DriverEndpoint 最终生成多个可执行的 TaskDescription 对象,并向各个 ExecutorEndpoint 发送 LaunchTask 指令,本节内容将关注 ExecutorEndpoint 如何处理 LaunchTask 指令,处理完成后如何回馈给 DriverEndpoint,以及整个 job 最终如何多次调度直至结束。
6.10.1 Task 的执行流程
Executor 接受 LaunchTask 指令后,开启一个新线程 TaskRunner 解析 RDD,并调用 RDD 的 compute 方法,归并函数得到最终任务执行结果。
详解如下:
- ExecutorEndpoint 接受到 LaunchTask 指令后,解码出 TaskDescription,调用 Executor 的 launchTask 方法。
- Executor 创建一个 TaskRunner 线程,并启动线程,同时将改线程添加到 Executor 的成员对象中,代码
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]; runningTasks.put(taskDescription.taskId, taskRunner)
。 - TaskRunner:
- 首先向 DriverEndpoint 发送任务最新状态为 RUNNING。
- 从 TaskDescription 解析出 Task,并调用 Task 的 run 方法。
- Task:
- 创建 TaskContext 以及 CallerContext (与 HDFS 交互的上下文对象)。
- 执行 Task 的 runTask 方法:
- 如果 Task 实例为 ShuffleMapTask:解析出 RDD 以及 ShuffleDependency 信息,调用 RDD 的
compute()
方法将结果写 Writer 中(Writer 这里不介绍,可以作为黑盒理解,比如写入一个文件中),返回 MapStatus 对象。 - 如果 Task 实例为 ResultTask:解析出 RDD 以及合并函数信息,调用函数将调用后的结果返回。
- 如果 Task 实例为 ShuffleMapTask:解析出 RDD 以及 ShuffleDependency 信息,调用 RDD 的
- TaskRunner 将 Task 执行的结果序列化,再次向 DriverEndpoint 发送任务最新状态为 FINISHED。
6.10.2 Task 的回馈流程
TaskRunner 执行结束后,都将执行状态发送至 DriverEndpoint,DriverEndpoint 最终反馈指令 CompletionEvent 发送至 DAGSchedulerEventProcessLoop 中。
详解如下:
(1)DriverEndpoint 接收到 StatusUpdate 消息后,调用 TaskScheduler 的 statusUpdate(taskId, state, result)
方法。
(2)TaskScheduler 如果任务结果是完成,那么清除该任务处理中的状态,并调动 TaskResultGetter 相关方法。
关键代码如下:
val taskSet = taskIdToTaskSetManager.get(tid)
taskIdToTaskSetManager.remove(tid)
taskIdToExecutorId.remove(tid).foreach { executorId =>
executorIdToRunningTaskIds.get(executorId).foreach { _.remove(tid) }
}
taskSet.removeRunningTask(tid)
if (state == TaskState.FINISHED) {
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
}
(3)TaskResultGetter 启动线程启动线程【task-result-getter】进行相关处理。
首先,通过解析或者远程获取得到 Task 的 TaskResult 对象。
其次,调用 TaskSet 的 handleSuccessfulTask 方法,TaskSet 的 handleSuccessfulTask 方法直接调用 TaskSetManager 的 handleSuccessfulTask 方法。
(4)TaskSetManager
首先,更新内部 TaskInfo 对象状态,并将该 Task 从运行中 Task 的集合删除,代码如下:
val info = taskInfos(tid)
info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
removeRunningTask(tid)
其次,调用 DAGScheduler 的 taskEnded 方法,关键代码如下:
sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
最后,DAGScheduler 向 DAGSchedulerEventProcessLoop 存入 CompletionEvent 指令,CompletionEvent 对象定义如下:
private[scheduler] case class CompletionEvent(
task: Task[_],
reason: TaskEndReason,
result: Any,
accumUpdates: Seq[AccumulatorV2[_, _]],
taskInfo: TaskInfo) extends DAGSchedulerEvent
6.10.3 Task 的迭代流程
DAGSchedulerEventProcessLoop 中针对于 CompletionEvent 指令,调用 DAGScheduler 进行处理,DAGScheduler 更新 Stage 与该 Task 的关系状态,如果 Stage 下 Task 都返回,则做下一层 Stage 的任务拆解与运算工作,直至 Job 被执行完毕。
详解如下:
(1)DAGSchedulerEventProcessLoop 接收到 CompletionEvent 指令后,调用 DAGScheduler 的 handleTaskCompletion 方法。
(2)DAGScheduler 根据 Task 的类型分别处理。
如果 Task 为 ShuffleMapTask:
- 等待回馈的 Partitions 减去当前 partitionId。
- 如果所有 task 都返回,则
markStageAsFinished(shuffleStage)
,同时向 MapOutputTrackerMaster 注册 MapOutputs 信息,且 markMapStageJobAsFinished。 - 调用
submitWaitingChildStages(shuffleStage)
进行下层 Stages 的处理,从而迭代处理,最终处理到 ResultTask,job 结束。
submitWaitingChildStages 关键代码如下:
private def submitWaitingChildStages(parent: Stage) {
...
val childStages = waitingStages.filter(_.parents.contains(parent)).toArray
waitingStages --= childStages
for (stage <- childStages.sortBy(_.firstJobId)) {
submitStage(stage)
}
}
(4)如果 Task 为 ResultTask。
该 job 的 partitions 都已返回,则 **markStageAsFinished(resultStage)**
,并 **cleanupStateForJobAndIndependentStages(job)**
,关键代码如下:
for (stage <- stageIdToStage.get(stageId)) {
if (runningStages.contains(stage)) {
logDebug("Removing running stage %d".format(stageId))
runningStages -= stage
}
for ((k, v) <- shuffleIdToMapStage.find(_._2 == stage)) {
shuffleIdToMapStage.remove(k)
}
if (waitingStages.contains(stage)) {
logDebug("Removing stage %d from waiting set.".format(stageId))
waitingStages -= stage
}
if (failedStages.contains(stage)) {
logDebug("Removing stage %d from failed set.".format(stageId))
failedStages -= stage
}
}
stageIdToStage -= stageId
jobIdToStageIds -= job.jobId
jobIdToActiveJob -= job.jobId
activeJobs -= job
至此,用户编写的代码最终调用 Spark 分布式计算完毕。
6.10.4 精彩图解
Spark 的交互流程 – 节点启动:
Spark 的交互流程 – 应用提交:
Spark 的交互流程 – 任务运行:
Spark 的交互流程 – 任务运行: