DAGScheduler会以TaskSet的方式以一个DAG构造的Stage中所有的任务提交给底层调度器TaskScheduler,TaskScheduler是一个接口(做接口的好处就是跟具体的任务调度解耦合 加到我们的TaskSetManager中,这里new了一个TaskSet。 构建TaskSetManager的参数包括this(TaskSchedulerImpl本身),taskSet,还有最大失败重试次数(是构建TaskSchedulerImpl的时候传进来的,如果没设置的话默认最大为 sortedTaskSets根据调度顺序,返回的是TaskSet集体的计算集合。 然后遍历所有排序后的TaskSet集合,如果有可用的新的executor,我们会把executorAdded交给taskSet。
为了让CPU在固定的核心上执行,我们可以使用taskset指令,让程序绑定逻辑核心。 taskset -c 0,10 . /bind_core a:999409723 b:994174648 设置绑定到0号CPU逻辑核心 taskset -c 0 . /bind_core a:563819215 b:564766868 设置绑定到0,1号CPU逻辑核心 taskset -c 0,1 . /bind_core a:1113333145 b:1072107088 设置绑定到0,1,2号CPU逻辑核心 taskset -c 0,1,2 .
构建TaskSet 基于以上信息,方法为每个分区创建一个Task对象(ShuffleMapTask或ResultTask),并封装成TaskSet: val tasks: Seq[Task[_]] = 提交TaskSet至TaskScheduler 最后,方法调用TaskScheduler的submitTasks方法,将TaskSet提交给底层调度器: if (tasks.nonEmpty) { ,下一步的关键环节是将这些Task封装成TaskSet并提交给下层调度器。 TaskSet的构建与提交 封装完成的Task会被组织成TaskSet对象,它代表了一组能够在同一Stage中并行执行的任务。 在submitTasks方法中,TaskScheduler会执行以下操作: 创建TaskSetManager来管理这个TaskSet的生命周期 将TaskSet加入到调度队列中,根据配置的调度算法(FIFO
(taskSet: TaskSet) //< TaskScheduler(实际上是TaskSchedulerImpl)为DAGScheduler提交的每个taskSet创建一个对应的TaskSetManager : TaskSet) { val tasks = taskSet.tasks logInfo("Adding task set " + taskSet.id + " with " + tasks.length (taskSet, maxTaskFailures) //< 建立taskset与TaskSetManager的对应关系 activeTaskSets(taskSet.id) = manager () } } var launchedTask = false //< 依次取出排序过的taskSet列表中的taskSet; //< 对于每个taskSet 若taskSet带有locality属性,则通过getAllowedLocalityLevel函数获得该taskSet能容忍的最低界别locality。
TaskSet类 如果Locust类代表蝗虫群,则可以说TaskSet类代表蝗虫的大脑。每个Locust类必须设置一个task_set属性,该属性指向TaskSet。 TaskSets可以嵌套,这意味着一个 TaskSet 的 tasks 属性可以包含其他的 TaskSet。如果计划执行嵌套的 TaskSet ,则将实例化它并从当前执行的 TaskSet 进行调用。 然后,当前运行的 TaskSet 中的执行将被移交给嵌套的 TaskSet ,嵌套的 TaskSet 将继续运行,直到遇到由 TaskSet.interrupt() 方法抛出的 InterruptTaskSet 如果计划执行嵌套的 TaskSet,则将实例化它并从当前执行的 TaskSet 调用它。 然后,当前运行的 TaskSet 中的执行将被移交给嵌套的 TaskSet ,这个嵌套的 TaskSet 将继续运行,直到遇到由 TaskSet.interrupt()抛出 InterruptTaskSet
– Phil Karlton 1 安装taskset $ yum install util-linux 如果系统没有taskset命令,使用yum安装util-linux即可,这是一个工具集,其中包含了 taskset命令。 2 查看应用的cpu亲和力(affinity) $ taskset -p 14795 pid 14795's current affinity mask: 3 $ taskset -cp 14795 pid 除了通过taskset命令绑定应用到指定的cpu上,也可以通过taskset命令启动应用,并指定应用运行的cpu,例如: $ taskset 0x1 sleep 10000 & [2] 14925 $ taskset -p 14925 pid 14925's current affinity mask: 1 $ taskset -cp 14925 pid 14925's current affinity
However, it’s also possible to define the tasks of a User or TaskSet by setting the tasks attribute ( 官方案例 普通写法 from locust import TaskSet,HttpLocust,between def login(x): x.client.post('/login',{ 每次启动locust时运行setup方法,退出时运行teardown方法,locust执行TaskSet时运行TaskSet的setup方法,退出时运行teardown方法,每个虚拟用户执行操作时运行on_start 方法,退出时执行on_stop方法,运行上面的脚本,执行顺序如下: 执行顺序:Locust setup → TaskSet setup → TaskSet on_start → TaskSet tasks → TaskSet on_stop → TaskSet teardown → Locust teardown 运行结果: [2020-06-22 21:30:10,993] WIN10-804191526
// TaskSchedulerImpl.scala override def submitTasks(taskSet: TaskSet) { val tasks = taskSet.tasks // 同步代码块 this.synchronized { // 创建TaskSet管理器,对同一个TaskSet中的任务进行调度,跟踪每个task的状态, // 如果失败则重试 manager = createTaskSetManager(taskSet, maxTaskFailures) val stage = taskSet.stageId // 获取 = taskSet && ! ("parentName: %s, name: %s, runningTasks: %s".format( taskSet.parent.name, taskSet.name, taskSet.runningTasks
这就是本文要探讨的内容,将通过以下四小节来进行剖析: 打散可用的 executors 对所有处于等待状态的 taskSet 进行排序 根据是否有新增的 executor 来决定是否更新各个 taskSet 对所有处于等待状态的 taskSet 进行排序 排序的目的是为了让优先级更高的 taskSet 所包含的 task 更优先的被调度执行,所执行的操作是: val sortedTaskSets: ArrayBuffer 的可用本地性集合 关于更新 taskSet 的可用本地性集合,这里值进行简单说明,更多内容请移步 Spark的位置优先: TaskSetManager 的有效 Locality Levels 若 taskSet 中有 task 的 partition 是存储在 executor 内存中的且对应 executor alive,那么该 taskSet 的最佳本地性为 PROCESS_LOCAL,可用本地性集合包括 ( taskSet, maxLocality, shuffledOffers, availableCpus, tasks) } while (launchedTask) } 含义是根据
: TaskSet, maxTaskFailures: Int): TaskSetManager = { new TaskSetManager(this, taskSet, maxTaskFailures = true,说明任务调度器已经分配当前TaskSet中的任务,发送给Executor去执行 hasLaunchedTask = false,说明15秒后,当前TaskSet中的任务还没有发送给Executor : TaskSet) { val tasks = taskSet.tasks logInfo("Adding task set " + taskSet.id + " with " + tasks.length = taskSet && ! , taskSet.name, taskSet.runningTasks)) if (newExecAvail) { taskSet.executorAdded()
,它将仅用于该TaskSet。 TaskSet类 如果Locust类代表蝗虫群,则可以说TaskSet类代表蝗虫的大脑。每个Locust类必须设置一个task_set属性,该属性指向TaskSet。 启动负载测试时,派生的Locust类的每个实例将开始执行其TaskSet。接下来的情况是每个TaskSet将选择一个任务并调用它。 TaskSet): @task def my_task(self): pass 引用Locust实例,或父TaskSet实例 TaskSet实例的属性 当模拟用户开始执行该TaskSet类时,将调用on_start方法;而当TaskSet停止时,将调用on_stop <locust.core.TaskSet.on_stop()方法。
: TaskSet) { val tasks = taskSet.tasks logInfo("Adding task set " + taskSet.id + " with " + tasks.length (taskSet, maxTaskFailures) val stage = taskSet.stageId val stageTaskSets = taskSetsByStageIdAndAttempt.getOrElseUpdate ) = manager val conflictingTaskSet = stageTaskSets.exists { case (_, ts) => ts.taskSet ! = taskSet && ! schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) if (!
overridedefsubmitTasks(taskSet: TaskSet) { valtasks =taskSet.tasks logInfo(“Addingtask set “ + taskSet.id maxTaskFailures) activeTaskSets(taskSet.id)= manager schedulableBuilder.addTaskSetManager(manager,manager.taskSet.properties :%s, name: %s, runningTasks: %s”.format( taskSet.parent.name,taskSet.name,taskSet.runningTasks)) } 计算 先从taskset列表中拿出一个tasetset, 子迭代是从PROCESS_LOCAL開始迭代locality的级别。 tasks(i)+= task valtid =task.taskId taskIdToTaskSetId(tid)= taskSet.taskSet.id taskSetTaskIds(taskSet.taskSet.id
在DAGScheduler划分为Stage并以TaskSet的形式提交给TaskScheduler后,再由TaskScheduler通过TaskSetMagager对taskSet的task进行调度与执行 先看整个实现: override def submitTasks(taskSet: TaskSet) { val tasks = taskSet.tasks logInfo("Adding = taskSet && ! TaskSet,则抛出异常,确保一个stage不能有两个taskSet同时运行。 , taskSet.name, taskSet.runningTasks)) if (newExecAvail) { // 如果该executor是新分配来的 taskSet.executorAdded
TaskScheduler的核心任务是提交TaskSet到集群运算并汇报结果。 为TaskSet创建和维护一个TaskSetManager, 并追踪任务的本地性及错误信息。 下面来分析TaskScheduler接收到DAGScheduler的Stage任务 后, 是如何管理Stage (TaskSet) 的生命周期的。 TaskScheduler源代码解析 下面通过源代码解析来看一下 TaskScheduler 是如何调度和管理 TaskSet 的任务。 submitTasks 源代码如下所示: override def submitTasks(taskSet: TaskSet): Unit = { val tasks = taskSet.tasks (taskSet, maxTaskFailures) val stage = taskSet.stageId val stageTaskSets = taskSetsByStageIdAndAttempt.getOrElseUpdate
作为初始化;从WebsiteTasks中随机挑选(如果定义了任务间的权重关系,那么就是按照权重关系随机挑选)一个任务执行;根据Locust类中min_wait和max_wait定义的间隔时间范围(如果TaskSet 类中也定义了min_wait或者max_wait,以TaskSet中的优先),在时间范围中随机取一个值,休眠等待;重复2~3步骤,直至测试任务终止。 from locust import HttpUser, between, TaskSet, task class MyTest(TaskSet): @task #@task 装饰的方法才会在 from locust import HttpUser, between, TaskSet, task class MyTest(TaskSet): @task def testdef( import json import random from locust import HttpUser, between, TaskSet, task class MyTest(TaskSet):
[root@www ~]# ps aux | grep gearman-manager | awk {'print $2;'} | sort -k1,1 | head -3 | xargs -n 1 taskset [root@www ~]# ps aux | grep gearman-manager | awk {'print $2;'} | sort -k1,1 | tail -3 | xargs -n 1 taskset 在Linux上修改进程的「CPU亲和力」 在Linux上,可以通过 taskset 命令进行修改。以 CentOS 为 例,taskset 在 util-linux-2.13-pre7 包中。 对运行中的进程,可用如下命令将 CPU #1, #2, #3 分配给 PID 为 12345 的进程: [root@www ~]# taskset -cp 1,2,3 12345 对于已经在运行中 如果父进程设置了affinity,之后其创建的子进程会继承父进程的affinity属性(其实用 taskset 启动进程就是一次fork+exec)。
: TaskSet) { val tasks = taskSet.tasks logInfo("Adding task set " + taskSet.id + " with " + tasks.length val conflictingTaskSet = stageTaskSets.exists { case (_, ts) => ts.taskSet ! = taskSet && ! , taskSet.name, taskSet.runningTasks)) //如果有新的executor加入 //则需要从新计算TaskSetManager的就近原则 if (newExecAvail) { taskSet.executorAdded() } } // 得到调度序列中的每个TaskSet,
下面是 locustfile.py 的一个例子,它定义了一个简单的用户行为,它由一个获取特定网页的“任务”组成: from locust import HttpLocust, TaskSet, task class UserBehavior(TaskSet): @task def get_something(self): self.client.get("/something ") class WebsiteUser(HttpLocust): task_set = UserBehavior 我们再来添加第二个任务: class UserBehavior(TaskSet) 如果你想为不同的任务定义权重,那么你可以按照下面的方法来加权: class UserBehavior(TaskSet): @task(2) def get_something(self): 类可以有选择地声明一个 on_start 函数,当模拟用户开始执行该 TaskSet 类时会调用该函数。
带宽测试server: taskset -c 10,11 ib_write_bw -d mlx5_bond_0 -x 3 -F --report_gbits -p 18500 -D 2 -q 16 -- run_infinitely taskset -c 12,13 ib_write_bw -d mlx5_bond_1 -x 3 -F --report_gbits -p 18501 -D 2 -q 16 --run_infinitely taskset -c 14,15 ib_write_bw -d mlx5_bond_2 -x 3 -F --report_gbits -p 18502 -D 2 -q 16 --run_infinitely taskset -c 16,17 ib_write_bw -d mlx5_bond_3 -x 3 -F --report_gbits -p 18503 -D 2 -q 16 --run_infinitely taskset -c 100,101 ib_write_bw -d mlx5_bond_4 -x 3 -F --report_gbits -p 18504