DriverEndpoint 最终生成多个可执行的 TaskDescription 对象,并向各个 ExecutorEndpoint 发送 LaunchTask 指令,本节内容将关注 ExecutorEndpoint 如何处理 LaunchTask 指令,处理完成后如何回馈给 DriverEndpoint,以及整个 job 最终如何多次调度直至结束。

6.10.1 Task 的执行流程

Executor 接受 LaunchTask 指令后,开启一个新线程 TaskRunner 解析 RDD,并调用 RDD 的 compute 方法,归并函数得到最终任务执行结果。

image.png

详解如下:

  1. ExecutorEndpoint 接受到 LaunchTask 指令后,解码出 TaskDescription,调用 ExecutorlaunchTask 方法。
  2. Executor 创建一个 TaskRunner 线程,并启动线程,同时将改线程添加到 Executor 的成员对象中,代码 private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]; runningTasks.put(taskDescription.taskId, taskRunner)
  3. TaskRunner
    1. 首先向 DriverEndpoint 发送任务最新状态为 RUNNING
    2. TaskDescription 解析出 Task,并调用 Task 的 run 方法。
  4. Task
    1. 创建 TaskContext 以及 CallerContext (与 HDFS 交互的上下文对象)。
    2. 执行 TaskrunTask 方法:
      1. 如果 Task 实例为 ShuffleMapTask:解析出 RDD 以及 ShuffleDependency 信息,调用 RDD 的 compute() 方法将结果写 Writer 中(Writer 这里不介绍,可以作为黑盒理解,比如写入一个文件中),返回 MapStatus 对象。
      2. 如果 Task 实例为 ResultTask:解析出 RDD 以及合并函数信息,调用函数将调用后的结果返回。
  5. TaskRunner 将 Task 执行的结果序列化,再次向 DriverEndpoint 发送任务最新状态为 FINISHED

6.10.2 Task 的回馈流程

TaskRunner 执行结束后,都将执行状态发送至 DriverEndpoint,DriverEndpoint 最终反馈指令 CompletionEvent 发送至 DAGSchedulerEventProcessLoop 中。

image.png

详解如下:

(1)DriverEndpoint 接收到 StatusUpdate 消息后,调用 TaskSchedulerstatusUpdate(taskId, state, result) 方法。

(2)TaskScheduler 如果任务结果是完成,那么清除该任务处理中的状态,并调动 TaskResultGetter 相关方法。

关键代码如下:

  1. val taskSet = taskIdToTaskSetManager.get(tid)
  2. taskIdToTaskSetManager.remove(tid)
  3. taskIdToExecutorId.remove(tid).foreach { executorId =>
  4. executorIdToRunningTaskIds.get(executorId).foreach { _.remove(tid) }
  5. }
  6. taskSet.removeRunningTask(tid)
  7. if (state == TaskState.FINISHED) {
  8. taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
  9. } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
  10. taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
  11. }

(3)TaskResultGetter 启动线程启动线程【task-result-getter】进行相关处理。

首先,通过解析或者远程获取得到 TaskTaskResult 对象。

其次,调用 TaskSethandleSuccessfulTask 方法,TaskSet 的 handleSuccessfulTask 方法直接调用 TaskSetManagerhandleSuccessfulTask 方法。

(4)TaskSetManager

首先,更新内部 TaskInfo 对象状态,并将该 Task 从运行中 Task 的集合删除,代码如下:

  1. val info = taskInfos(tid)
  2. info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
  3. removeRunningTask(tid)


其次,调用 DAGSchedulertaskEnded 方法,关键代码如下:

  1. sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)

最后,DAGSchedulerDAGSchedulerEventProcessLoop 存入 CompletionEvent 指令,CompletionEvent 对象定义如下:

  1. private[scheduler] case class CompletionEvent(
  2. task: Task[_],
  3. reason: TaskEndReason,
  4. result: Any,
  5. accumUpdates: Seq[AccumulatorV2[_, _]],
  6. taskInfo: TaskInfo) extends DAGSchedulerEvent

6.10.3 Task 的迭代流程

DAGSchedulerEventProcessLoop 中针对于 CompletionEvent 指令,调用 DAGScheduler 进行处理,DAGScheduler 更新 Stage 与该 Task 的关系状态,如果 Stage 下 Task 都返回,则做下一层 Stage 的任务拆解与运算工作,直至 Job 被执行完毕。

image.png

详解如下:

(1)DAGSchedulerEventProcessLoop 接收到 CompletionEvent 指令后,调用 DAGSchedulerhandleTaskCompletion 方法。

(2)DAGScheduler 根据 Task 的类型分别处理。

如果 Task 为 ShuffleMapTask

  1. 等待回馈的 Partitions 减去当前 partitionId。
  2. 如果所有 task 都返回,则 markStageAsFinished(shuffleStage),同时向 MapOutputTrackerMaster 注册 MapOutputs 信息,且 markMapStageJobAsFinished
  3. 调用 submitWaitingChildStages(shuffleStage) 进行下层 Stages 的处理,从而迭代处理,最终处理到 ResultTask,job 结束。

submitWaitingChildStages 关键代码如下:

  1. private def submitWaitingChildStages(parent: Stage) {
  2. ...
  3. val childStages = waitingStages.filter(_.parents.contains(parent)).toArray
  4. waitingStages --= childStages
  5. for (stage <- childStages.sortBy(_.firstJobId)) {
  6. submitStage(stage)
  7. }
  8. }

(4)如果 Task 为 ResultTask

该 job 的 partitions 都已返回,则 **markStageAsFinished(resultStage)**,并 **cleanupStateForJobAndIndependentStages(job)**,关键代码如下:

  1. for (stage <- stageIdToStage.get(stageId)) {
  2. if (runningStages.contains(stage)) {
  3. logDebug("Removing running stage %d".format(stageId))
  4. runningStages -= stage
  5. }
  6. for ((k, v) <- shuffleIdToMapStage.find(_._2 == stage)) {
  7. shuffleIdToMapStage.remove(k)
  8. }
  9. if (waitingStages.contains(stage)) {
  10. logDebug("Removing stage %d from waiting set.".format(stageId))
  11. waitingStages -= stage
  12. }
  13. if (failedStages.contains(stage)) {
  14. logDebug("Removing stage %d from failed set.".format(stageId))
  15. failedStages -= stage
  16. }
  17. }
  18. stageIdToStage -= stageId
  19. jobIdToStageIds -= job.jobId
  20. jobIdToActiveJob -= job.jobId
  21. activeJobs -= job

至此,用户编写的代码最终调用 Spark 分布式计算完毕。

6.10.4 精彩图解

Spark 的交互流程 – 节点启动:

image.png

Spark 的交互流程 – 应用提交:

image.png

Spark 的交互流程 – 任务运行:

image.png

Spark 的交互流程 – 任务运行:

image.png