
We have already walked through how the `StreamGraph` and the `JobGraph` are generated; both of those graphs are built on the client side. Next we turn our attention to the `ExecutionGraph`, the execution graph at the core of Flink's runtime scheduling layer.

Unlike the `StreamGraph` and the `JobGraph`, the `ExecutionGraph` is generated inside the JobManager: after the client submits a `JobGraph` to the JobManager, the JobManager creates the corresponding `ExecutionGraph` from it and uses it to schedule tasks.

This article does not cover the startup of the JobManager or the task scheduling process; those will be analyzed in separate articles.

Core Concepts

ExecutionJobVertex

In the `ExecutionGraph`, a node corresponds to the class `ExecutionJobVertex`, which maps to a `JobVertex` in the `JobGraph`. Every `ExecutionJobVertex` is created from exactly one `JobVertex`.

```java
private final JobVertex jobVertex;
private final List<OperatorID> operatorIDs;
private final List<OperatorID> userDefinedOperatorIds;
// one ExecutionVertex per parallel subtask
private final ExecutionVertex[] taskVertices;
private final IntermediateResult[] producedDataSets;
private final List<IntermediateResult> inputs;
private final int parallelism;
private final SlotSharingGroup slotSharingGroup;
private final CoLocationGroup coLocationGroup;
private final InputSplit[] inputSplits;
private int maxParallelism;
```

ExecutionVertex

The member variables of `ExecutionJobVertex` include an array of `ExecutionVertex`. As we know, the parallelism of a Flink job can be configured, so at runtime there are multiple parallel tasks executing at the same time; each of them corresponds to an `ExecutionVertex`. An `ExecutionVertex` is one parallel subtask: an operator with parallelism n gives rise to n `ExecutionVertex` instances.

```java
private final ExecutionJobVertex jobVertex;
private final Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions;
private final ExecutionEdge[][] inputEdges;
private final int subTaskIndex;
private final EvictingBoundedList<ArchivedExecution> priorExecutions;
private volatile CoLocationConstraint locationConstraint;

/** The current or latest execution attempt of this vertex's task. */
private volatile Execution currentExecution;   // this field must never be null
```
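As a quick illustration (a minimal example job, not code from the Flink sources analyzed here), a DataStream program might set the parallelism of an operator like the one below; with parallelism 4, the map operator's `ExecutionJobVertex` will hold 4 `ExecutionVertex` instances, one per parallel subtask.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    return value.toUpperCase();
                }
            })
            // parallelism 4 -> 4 parallel subtasks -> 4 ExecutionVertex instances
            // inside this operator's ExecutionJobVertex
            .setParallelism(4)
            .print();

        env.execute("parallelism-demo");
    }
}
```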

Execution

An `Execution` is one execution attempt of an `ExecutionVertex`, uniquely identified by an `ExecutionAttemptID`.

IntermediateResult

In the `JobGraph`, an `IntermediateDataSet` represents the output of a `JobVertex`; a `JobVertex` may have n (n >= 0) outputs. In the `ExecutionGraph`, the counterpart of the `IntermediateDataSet` is the `IntermediateResult`.

```java
// ID of the corresponding IntermediateDataSet
private final IntermediateDataSetID id;
// the producer
private final ExecutionJobVertex producer;
// parallelism of the producing ExecutionJobVertex
private final int numParallelProducers;
// one IntermediateResultPartition per parallel producer
private final IntermediateResultPartition[] partitions;
private final ResultPartitionType resultType;
```

Since an `ExecutionJobVertex` has `numParallelProducers` parallel subtasks, each `IntermediateResult` it produces has `numParallelProducers` producers, and the output of each producer on that `IntermediateResult` is one `IntermediateResultPartition`. In other words, an `IntermediateResultPartition` represents one output partition of an `ExecutionVertex`:

    ExecutionJobVertex --> IntermediateResult
    ExecutionVertex    --> IntermediateResultPartition

If an `ExecutionJobVertex` produces n `IntermediateResult`s, then each of its parallel subtasks (each `ExecutionVertex`) holds n `IntermediateResultPartition`s. The producer of an `IntermediateResultPartition` is an `ExecutionVertex`; its consumers are one or more `ExecutionEdge`s.

ExecutionEdge

An `ExecutionEdge` represents an input of an `ExecutionVertex`. `ExecutionEdge`s connect an `ExecutionVertex` to `IntermediateResultPartition`s, and thereby establish the connections between different `ExecutionVertex`s.

```java
private final IntermediateResultPartition source;
private final ExecutionVertex target;
private final int inputNum;
```
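How many `ExecutionEdge`s a consuming subtask ends up with depends on how the producing and consuming vertices are connected (in Flink this is driven by the `DistributionPattern` of the `JobEdge`). The following is only a simplified sketch of the two common patterns, not Flink's actual `ExecutionVertex#connectSource` logic:

```java
// Simplified sketch: which producer partitions one consumer subtask reads,
// depending on the distribution pattern between producer and consumer.
import java.util.ArrayList;
import java.util.List;

public class EdgeWiringSketch {

    enum DistributionPattern { POINTWISE, ALL_TO_ALL }

    // returns, for one consumer subtask, the indices of the producer partitions it consumes
    static List<Integer> connect(int consumerIndex, int numPartitions, DistributionPattern pattern) {
        List<Integer> sources = new ArrayList<>();
        if (pattern == DistributionPattern.ALL_TO_ALL) {
            // every consumer subtask reads from every producer partition
            for (int p = 0; p < numPartitions; p++) {
                sources.add(p);
            }
        } else {
            // POINTWISE with equal parallelism: consumer i reads partition i
            // (unequal parallelism is also handled in Flink; omitted here for brevity)
            sources.add(consumerIndex % numPartitions);
        }
        return sources;
    }

    public static void main(String[] args) {
        System.out.println(connect(1, 4, DistributionPattern.ALL_TO_ALL)); // [0, 1, 2, 3]
        System.out.println(connect(1, 4, DistributionPattern.POINTWISE));  // [1]
    }
}
```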

The Process of Building the ExecutionGraph

The entry point for creating the `ExecutionGraph` is `ExecutionGraphBuilder#buildGraph()`.

1. Create the ExecutionGraph object and set its basic properties

This step fills in the `JobInformation`, the `SlotProvider`, and other information. Some of the more important fields are listed below:

```java
/** Job specific information like the job id, job name, job configuration, etc. */
private final JobInformation jobInformation;

/** The slot provider to use for allocating slots for tasks as they are needed. */
private final SlotProvider slotProvider;

/** The classloader for the user code. Needed for calls into user code classes. */
private final ClassLoader userClassLoader;

/** All job vertices that are part of this graph. */
private final ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks;

/** All vertices, in the order in which they were created. */
private final List<ExecutionJobVertex> verticesInCreationOrder;

/** All intermediate results that are part of this graph. */
private final ConcurrentHashMap<IntermediateDataSetID, IntermediateResult> intermediateResults;

/** Current status of the job execution. */
private volatile JobStatus state = JobStatus.CREATED;

/** Listeners that receive messages when the entire job switches its status
 * (such as from RUNNING to FINISHED). */
private final List<JobStatusListener> jobStatusListeners;

/** Listeners that receive messages whenever a single task execution changes its status. */
private final List<ExecutionStatusListener> executionListeners;
```
2. Initialize the JobVertices

Each `JobVertex` is initialized on the master. The cases that matter here are `OutputFormatVertex` and `InputFormatVertex`; other vertex types need no special handling. File output formats prepare their output directories in this step, and input format vertices create their input splits:

```java
for (JobVertex vertex : jobGraph.getVertices()) {
    // ....
    try {
        vertex.initializeOnMaster(classLoader);
    }
    catch (Throwable t) {
        throw new JobExecutionException(jobId,
                "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t);
    }
}
```

4. Generate the nodes and edges inside the ExecutionGraph

All `JobVertex` instances are sorted topologically and then used to generate the nodes and edges inside the `ExecutionGraph`:

```java
// topologically sort the job vertices and attach the graph to the existing one
List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
if (log.isDebugEnabled()) {
    log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(), jobName, jobId);
}
executionGraph.attachJobGraph(sortedTopology);
```

4.1 Topologically sort the JobVertices

A topological sort guarantees that if there is a directed edge A -> B, then A appears before B in the sorted list. The concrete algorithm is not analyzed in detail here; a minimal sketch is shown below.
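For reference, topological sorting can be done with Kahn's algorithm: repeatedly take a node that has no remaining incoming edges (a "source"), append it to the result, and remove its outgoing edges. The following is only a minimal sketch over a toy integer graph, not Flink's `getVerticesSortedTopologicallyFromSources()` implementation:

```java
import java.util.*;

public class TopoSortSketch {

    // nodes 0..n-1, edges[i] = {from, to}; returns one valid topological order
    static List<Integer> topoSort(int n, int[][] edges) {
        List<List<Integer>> adj = new ArrayList<>();
        int[] inDegree = new int[n];
        for (int i = 0; i < n; i++) {
            adj.add(new ArrayList<>());
        }
        for (int[] e : edges) {
            adj.get(e[0]).add(e[1]);
            inDegree[e[1]]++;
        }

        // start from the "sources": nodes without incoming edges
        Deque<Integer> queue = new ArrayDeque<>();
        for (int i = 0; i < n; i++) {
            if (inDegree[i] == 0) {
                queue.add(i);
            }
        }

        List<Integer> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            int node = queue.poll();
            order.add(node);
            // removing this node may turn some of its successors into new sources
            for (int next : adj.get(node)) {
                if (--inDegree[next] == 0) {
                    queue.add(next);
                }
            }
        }

        if (order.size() != n) {
            throw new IllegalStateException("the graph contains a cycle");
        }
        return order;
    }

    public static void main(String[] args) {
        // source(0) -> map(1) -> sink(2)
        System.out.println(topoSort(3, new int[][] {{0, 1}, {1, 2}}));  // [0, 1, 2]
    }
}
```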
4.2 Create the ExecutionJobVertices

Following the topological order, an `ExecutionJobVertex` is created for each `JobVertex`:

```java
for (JobVertex jobVertex : topologiallySorted) {

    if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
        this.isStoppable = false;
    }

    // create the execution job vertex and attach it to the graph
    ExecutionJobVertex ejv = new ExecutionJobVertex(
        this,
        jobVertex,
        1,
        rpcTimeout,
        globalModVersion,
        createTimestamp);

    // connect to the upstream (predecessor) nodes
    ejv.connectToPredecessors(this.intermediateResults);

    ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
    if (previousTask != null) {
        throw new JobException(String.format(
            "Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
            jobVertex.getID(), ejv, previousTask));
    }

    for (IntermediateResult res : ejv.getProducedDataSets()) {
        IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
        if (previousDataSet != null) {
            throw new JobException(String.format(
                "Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
                res.getId(), res, previousDataSet));
        }
    }

    this.verticesInCreationOrder.add(ejv);
    this.numVerticesTotal += ejv.getParallelism();
    newExecJobVertices.add(ejv);
}
```

When an `ExecutionJobVertex` is created, the corresponding `ExecutionVertex`, `IntermediateResult`, `ExecutionEdge`, and `IntermediateResultPartition` objects are created along with it. Quite a few objects are involved, but the relationships can be summarized as follows (a toy sketch follows the list):

• Each `JobVertex` corresponds to one `ExecutionJobVertex`.
• Each `ExecutionJobVertex` has `parallelism` `ExecutionVertex` instances.
• Each `JobVertex` may have n (n >= 0) `IntermediateDataSet`s. In the `ExecutionJobVertex`, each `IntermediateDataSet` corresponds to one `IntermediateResult`, and each `IntermediateResult` has `parallelism` producers, i.e. `parallelism` `IntermediateResultPartition`s.
• Each `ExecutionJobVertex` is connected to its upstream `IntermediateResult`s; concretely, `ExecutionEdge`s are created that connect each `ExecutionVertex` to the corresponding `IntermediateResultPartition`s.
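To make the counting rules above concrete, here is a small toy calculation (plain Java, not Flink classes) that derives, for a single hypothetical job vertex, how many ExecutionGraph-level objects are created from its parallelism and its number of output data sets:

```java
// Toy model, not Flink code: counts the ExecutionGraph-level objects
// created for a single JobGraph-level vertex.
public class ExpansionSketch {

    static void expand(String name, int parallelism, int numIntermediateDataSets) {
        int executionJobVertices = 1;                       // one per JobVertex
        int executionVertices = parallelism;                // one per parallel subtask
        int intermediateResults = numIntermediateDataSets;  // one per IntermediateDataSet
        int partitionsPerResult = parallelism;              // one per producing subtask

        System.out.printf(
            "%s: %d ExecutionJobVertex, %d ExecutionVertex, %d IntermediateResult, %d partitions each%n",
            name, executionJobVertices, executionVertices, intermediateResults, partitionsPerResult);
    }

    public static void main(String[] args) {
        expand("map", 4, 1);                        // 4 subtasks, 1 result with 4 partitions
        expand("source-with-two-consumers", 2, 2);  // 2 subtasks, 2 results with 2 partitions each
    }
}
```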

5. Configure state checkpointing (skipped here)

From the ExecutionGraph to the Actual Running Tasks

The `ExecutionGraph` is built when the JobMaster is created; after that it can be scheduled for execution. The scheduling flow is only briefly summarized below; a detailed analysis will follow in later articles.

ExecutionGraph.scheduleForExecution

Resources are allocated to all `ExecutionJobVertex`s in topological order, and every `ExecutionVertex` needs one slot; one execution of an `ExecutionVertex` corresponds to one `Execution`. Slot allocation takes the `SlotSharingGroup` and `CoLocationConstraint` into account, so slots may be shared and reused.
Once all vertices have successfully obtained their resources, `Execution.deploy()` is called for each `Execution`: a `TaskDeploymentDescriptor` describing the `Execution` is built and submitted to the TaskManager that owns the slot allocated to that `Execution`, where it is eventually handed to the corresponding `TaskExecutor` for execution.
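A note on the slot sharing mentioned above: with the default `SlotSharingGroup`, one subtask of each different operator may be placed into the same slot, so a sharing group needs as many slots as the highest parallelism inside it rather than the sum of all parallelisms. The sketch below only illustrates this counting rule; it is not Flink's actual slot allocation code:

```java
// Simplified sketch of the slot-sharing rule (not Flink's SlotSharingManager):
// within one slot sharing group, one subtask of each distinct operator can share
// a slot, so the group needs max(parallelism) slots rather than sum(parallelism).
import java.util.Map;

public class SlotSharingSketch {

    static int requiredSlots(Map<String, Integer> parallelismPerOperator) {
        return parallelismPerOperator.values().stream()
                .mapToInt(Integer::intValue)
                .max()
                .orElse(0);
    }

    public static void main(String[] args) {
        // source(2) -> map(4) -> sink(1), all in the default sharing group
        int slots = requiredSlots(Map.of("source", 2, "map", 4, "sink", 1));
        System.out.println(slots);  // 4, not 7
    }
}
```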

Summary

This article briefly covered the concepts behind the `ExecutionGraph` and how it is generated.
So far we have looked at how the `StreamGraph`, the `JobGraph`, and the `ExecutionGraph` are generated, and at how their nodes and edges correspond to one another. In short, the `StreamGraph` is the most primitive DAG, the one closest to the user's logic; the `JobGraph` further optimizes the `StreamGraph` by chaining operators that can be merged into a single node, which reduces the cost of data transfer at runtime; and the `ExecutionGraph` is the execution graph used for scheduling when the job actually runs. It can be seen as a parallelized version of the `JobGraph`, splitting the DAG down to the basic units of scheduling.
      -EOF-