
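In the previous article we covered how the StreamGraph is generated. The StreamGraph only represents the logical execution plan of a Flink job. Flink transforms the StreamGraph further into another execution plan graph, the JobGraph.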

## JobVertex

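In the StreamGraph, each operator corresponds to one node (`StreamNode`) in the graph. The StreamGraph is then optimized: multiple nodes that meet certain conditions are chained together into a single node. This removes the serialization, deserialization and network transfer overhead that moving data between separate nodes would cost. In the JobGraph, a node formed by chaining several operators together is a `JobVertex`.

Each `JobVertex` contains one or more operators. The main member variables of `JobVertex` are: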

```java
/** The ID of the vertex. */
private final JobVertexID id;

/** The alternative IDs of the vertex. */
private final ArrayList<JobVertexID> idAlternatives = new ArrayList<>();

/** The IDs of all operators contained in this vertex. */
private final ArrayList<OperatorID> operatorIDs = new ArrayList<>();

/** The alternative IDs of all operators contained in this vertex. */
private final ArrayList<OperatorID> operatorIdsAlternatives = new ArrayList<>();

/** List of produced data sets, one per writer */
private final ArrayList<IntermediateDataSet> results = new ArrayList<IntermediateDataSet>();

/** List of edges with incoming data. One per Reader. */
private final ArrayList<JobEdge> inputs = new ArrayList<JobEdge>();

/** Number of subtasks to split this task into at runtime.*/
private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
```

Its inputs are a list of `JobEdge`s, and its outputs are a list of `IntermediateDataSet`s.

## JobEdge

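In the `StreamGraph`, `StreamNode`s are connected through `StreamEdge`s. The counterpart in the `JobGraph` is the `JobEdge`.

A `StreamEdge` keeps both its source and its target node (`sourceId` and `targetId`). A `JobEdge` does not point to its source node directly. Besides its target vertex, it only references the data set it reads from. Because every `JobVertex` stores all of its incoming `JobEdge`s, the two nodes can still be connected. More precisely, a `JobEdge` is associated with the output result of a node. Let's look at the main member variables of `JobEdge`: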

```java
/** The vertex connected to this edge. */
private final JobVertex target;

/** The distribution pattern that should be used for this job edge. */
// DistributionPattern decides how the subtasks of the upstream node (producer)
// are connected to the subtasks of the downstream node (consumer)
private final DistributionPattern distributionPattern;

/** The data set at the source of the edge, may be null if the edge is not yet connected*/
private IntermediateDataSet source;

/** The id of the source intermediate data set */
private IntermediateDataSetID sourceId;

/** Optional name for the data shipping strategy (forward, partition hash, rebalance, ...),
 * to be displayed in the JSON plan */
private String shipStrategyName;
```

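
The idea that "an edge only knows its target and the data set it reads from" can be made concrete with a minimal toy model in plain Java. The classes `Vertex`, `DataSet` and `Edge` and the helpers `connect` and `upstreamOf` are hypothetical simplifications of `JobVertex`, `IntermediateDataSet` and `JobEdge`, not Flink's API. The sketch shows that an edge has no direct reference to its upstream vertex, yet the upstream vertices of a node can still be recovered by walking input edge → source data set → producer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model, not Flink's API: an Edge knows its target vertex and the
// data set it reads from, but has no direct reference to the upstream vertex.
class JobEdgeSketch {

    static final class Vertex {
        final String name;
        final List<Edge> inputs = new ArrayList<>();     // plays the role of JobVertex#inputs
        final List<DataSet> results = new ArrayList<>(); // plays the role of JobVertex#results

        Vertex(String name) {
            this.name = name;
        }
    }

    static final class DataSet {
        final Vertex producer; // plays the role of IntermediateDataSet#producer

        DataSet(Vertex producer) {
            this.producer = producer;
        }
    }

    static final class Edge {
        final DataSet source; // the data set this edge consumes
        final Vertex target;  // the consuming vertex

        Edge(DataSet source, Vertex target) {
            this.source = source;
            this.target = target;
        }
    }

    // Creates a new data set on the upstream vertex and an edge consumed by the downstream vertex.
    static Edge connect(Vertex upstream, Vertex downstream) {
        DataSet dataSet = new DataSet(upstream);
        upstream.results.add(dataSet);
        Edge edge = new Edge(dataSet, downstream);
        downstream.inputs.add(edge);
        return edge;
    }

    // Recovers the upstream vertices of v by walking: input edge -> source data set -> producer.
    static List<String> upstreamOf(Vertex v) {
        List<String> names = new ArrayList<>();
        for (Edge edge : v.inputs) {
            names.add(edge.source.producer.name);
        }
        return names;
    }

    public static void main(String[] args) {
        Vertex left = new Vertex("LeftSource");
        Vertex right = new Vertex("RightSource");
        Vertex join = new Vertex("Join");
        connect(left, join);
        connect(right, join);
        System.out.println(upstreamOf(join)); // prints [LeftSource, RightSource]
    }
}
```

Keeping the producer on the data set rather than on the edge lets several consuming edges share one data set.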
## IntermediateDataSet
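The data produced by a `JobVertex` is abstracted as an `IntermediateDataSet`, which literally means an intermediate data set. As mentioned earlier, a `JobEdge` is associated with the output result of a node. Concretely, a `JobEdge` can be seen as a consumer of an `IntermediateDataSet`, and the `JobVertex` is naturally its producer.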

```java
private final IntermediateDataSetID id;     // the identifier

private final JobVertex producer;           // the operation that produced this data set

private final List<JobEdge> consumers = new ArrayList<JobEdge>();

// The type of partition to use at runtime
private final ResultPartitionType resultType;
```

Here `ResultPartitionType` denotes the type of the intermediate result. That sounds abstract, but its fields make it clear:

```java
/** Can the partition be consumed while being produced? */
private final boolean isPipelined;

/** Does the partition produce back pressure when not consumed? */
private final boolean hasBackPressure;

/** Does this partition use a limited number of (network) buffers? */
private final boolean isBounded;
```

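These flags only make full sense together with Flink's runtime memory management, which we will analyze in a later article. In streaming mode the type currently used is `PIPELINED_BOUNDED(true, true, true)`, which means all three properties above are true.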
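
The three flags can be pictured as a small enum. The sketch below is a hypothetical stand-in named `PartitionTypeSketch`, not Flink's `ResultPartitionType`. Only `PIPELINED_BOUNDED(true, true, true)` comes from the text above. `BLOCKING` and `PIPELINED` are added for contrast, using the flag values we believe they carry in the Flink 1.x sources, so check them against your version.

```java
// Hypothetical stand-in for ResultPartitionType, reduced to the three flags discussed above.
enum PartitionTypeSketch {
    BLOCKING(false, false, false),
    PIPELINED(true, true, false),
    PIPELINED_BOUNDED(true, true, true); // the type used in streaming mode, per the text

    final boolean isPipelined;     // can the partition be consumed while being produced?
    final boolean hasBackPressure; // does it produce back pressure when not consumed?
    final boolean isBounded;       // does it use a limited number of (network) buffers?

    PartitionTypeSketch(boolean isPipelined, boolean hasBackPressure, boolean isBounded) {
        this.isPipelined = isPipelined;
        this.hasBackPressure = hasBackPressure;
        this.isBounded = isBounded;
    }

    public static void main(String[] args) {
        for (PartitionTypeSketch type : values()) {
            System.out.println(type + ": pipelined=" + type.isPipelined
                    + ", backPressure=" + type.hasBackPressure
                    + ", bounded=" + type.isBounded);
        }
    }
}
```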

## StreamConfig

For every `StreamOperator`, that is, every `StreamNode` in the `StreamGraph`, `StreamingJobGraphGenerator` creates a corresponding `StreamConfig` while generating the `JobGraph`.

`StreamConfig` holds all the configuration this operator needs at runtime. The information is stored as key/value pairs in a `Configuration`. For example:

```java
// Stores the StreamOperator
public void setStreamOperator(StreamOperator<?> operator) {
    if (operator != null) {
        config.setClass(USER_FUNCTION, operator.getClass());

        try {
            InstantiationUtil.writeObjectToConfig(operator, this.config, SERIALIZEDUDF);
        } catch (IOException e) {
            throw new StreamTaskException("Cannot serialize operator object "
                    + operator.getClass() + ".", e);
        }
    }
}

public void setChainedOutputs(List<StreamEdge> chainedOutputs) {
    try {
        InstantiationUtil.writeObjectToConfig(chainedOutputs, this.config, CHAINED_OUTPUTS);
    } catch (IOException e) {
        throw new StreamTaskException("Cannot serialize chained outputs.", e);
    }
}

public void setNonChainedOutputs(List<StreamEdge> outputvertexIDs) {
    try {
        InstantiationUtil.writeObjectToConfig(outputvertexIDs, this.config, NONCHAINED_OUTPUTS);
    } catch (IOException e) {
        throw new StreamTaskException("Cannot serialize non chained outputs.", e);
    }
}

public void setInPhysicalEdges(List<StreamEdge> inEdges) {
    try {
        InstantiationUtil.writeObjectToConfig(inEdges, this.config, IN_STREAM_EDGES);
    } catch (IOException e) {
        throw new StreamTaskException("Cannot serialize inward edges.", e);
    }
}

// ......
```

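
These setters all follow one pattern. An object is serialized, its bytes are stored under a string key, and it is deserialized again at runtime. The pattern can be sketched with plain Java serialization. `StreamConfigSketch` and its `writeObject`/`readObject` methods are hypothetical. They only mimic the idea of `InstantiationUtil.writeObjectToConfig` writing into a `Configuration`, and the key `"chainedOutputs"` is illustrative.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a key/value config whose values are Java-serialized objects.
class StreamConfigSketch {
    private final Map<String, byte[]> entries = new HashMap<>();

    // Serializes the value and stores its bytes under the given key.
    void writeObject(String key, Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        entries.put(key, bytes.toByteArray());
    }

    // Deserializes the value stored under the key, or returns null if the key is absent.
    @SuppressWarnings("unchecked")
    <T> T readObject(String key) throws IOException, ClassNotFoundException {
        byte[] data = entries.get(key);
        if (data == null) {
            return null;
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        StreamConfigSketch config = new StreamConfigSketch();
        ArrayList<String> chainedOutputs = new ArrayList<>(Arrays.asList("edge-1", "edge-2"));
        config.writeObject("chainedOutputs", chainedOutputs); // illustrative key
        List<String> restored = config.readObject("chainedOutputs");
        System.out.println(restored); // prints [edge-1, edge-2]
    }
}
```

Storing bytes rather than live objects means the whole config can be shipped to the workers and turned back into objects there. This is also why the real setters wrap `IOException` in a "Cannot serialize ..." exception.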

## From StreamGraph to JobGraph

The conversion from `StreamGraph` to `JobGraph` starts in `StreamingJobGraphGenerator`.

First, let's look at the member variables and the entry function of `StreamingJobGraphGenerator`:

```java
// id -> JobVertex
private final Map<Integer, JobVertex> jobVertices;
// ids of the JobVertices that have already been built
private final Collection<Integer> builtVertices;
// physical edges (excluding edges inside a chain), in creation order
private List<StreamEdge> physicalEdgesInOrder;
// operator chain information, used at deployment time to build the OperatorChain:
// startNodeId -> (currentNodeId -> StreamConfig)
private Map<Integer, Map<Integer, StreamConfig>> chainedConfigs;
// configuration of all nodes, id -> StreamConfig
private Map<Integer, StreamConfig> vertexConfigs;
// name of each node, id -> chainedName
private Map<Integer, String> chainedNames;
// algorithms used to compute the hashes
private final StreamGraphHasher defaultStreamGraphHasher;
private final List<StreamGraphHasher> legacyStreamGraphHashers;
// .....

private JobGraph createJobGraph() {

    // scheduling mode: start all tasks immediately
    jobGraph.setScheduleMode(ScheduleMode.EAGER);

    // Traverse the StreamGraph breadth-first and generate a hash for every StreamNode;
    // the hash is used in the JobVertexID.
    // If the submitted topology has not changed, the same hashes are generated every time.
    Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

    List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
    for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
        legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
    }

    Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();

    // the main conversion logic: creates JobVertex, JobEdge, etc.
    setChaining(hashes, legacyHashes, chainedOperatorHashes);

    // serialize the input edges of each JobVertex into its StreamConfig
    // (the output edges were already written during setChaining)
    setPhysicalEdges();

    // assign each JobVertex to its SlotSharingGroup based on the group name,
    // and set up CoLocationGroups for iteration heads and tails
    setSlotSharingAndCoLocation();

    // configure checkpointing
    configureCheckpointing();

    // add the custom files (artifacts) provided by the user
    JobGraphGenerator.addUserArtifactEntries(streamGraph.getEnvironment().getCachedFiles(), jobGraph);

    // serialize the StreamGraph's ExecutionConfig into the JobGraph's configuration
    try {
        jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
    }
    catch (IOException e) {
        throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
                "This indicates that non-serializable types (like custom serializers) were registered");
    }

    return jobGraph;
}
```

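
`createJobGraph` starts by deriving a hash for every `StreamNode`. The hash stays the same across submissions as long as the topology does not change, and it does not depend on the `StreamNode.id` counter. A toy version of that idea visits nodes in a deterministic order starting from the sorted sources. A node is hashed only once all of its inputs have been hashed, and its hash covers its traversal position plus the hashes of its inputs. This is not Flink's hasher. `StableHashSketch` is hypothetical, it uses SHA-256 from `java.security.MessageDigest`, and it ignores things the real hasher considers, such as user-assigned operator uids.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: node hashes depend only on traversal position and input hashes, never on raw ids.
class StableHashSketch {

    static Map<Integer, String> hashes(List<Integer> sources, Map<Integer, List<Integer>> outEdges)
            throws NoSuchAlgorithmException {
        // derive the inputs of every node from the out-edge lists
        Map<Integer, List<Integer>> inEdges = new HashMap<>();
        for (Map.Entry<Integer, List<Integer>> entry : outEdges.entrySet()) {
            for (Integer target : entry.getValue()) {
                inEdges.computeIfAbsent(target, k -> new ArrayList<>()).add(entry.getKey());
            }
        }

        Map<Integer, String> result = new LinkedHashMap<>();
        List<Integer> sortedSources = new ArrayList<>(sources);
        Collections.sort(sortedSources); // deterministic starting order
        Deque<Integer> queue = new ArrayDeque<>(sortedSources);

        while (!queue.isEmpty()) {
            Integer node = queue.poll();
            List<Integer> inputs = inEdges.getOrDefault(node, Collections.emptyList());
            // skip nodes already hashed, and nodes whose inputs are not all hashed yet
            // (they are enqueued again when their last input gets hashed)
            if (result.containsKey(node) || !result.keySet().containsAll(inputs)) {
                continue;
            }
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            digest.update(Integer.toString(result.size()).getBytes(StandardCharsets.UTF_8)); // traversal position
            List<String> inputHashes = new ArrayList<>();
            for (Integer input : inputs) {
                inputHashes.add(result.get(input));
            }
            Collections.sort(inputHashes); // independent of the order in which inputs were collected
            for (String inputHash : inputHashes) {
                digest.update(inputHash.getBytes(StandardCharsets.UTF_8));
            }
            result.put(node, toHex(digest.digest()));
            queue.addAll(outEdges.getOrDefault(node, Collections.emptyList()));
        }
        return result;
    }

    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        // the same topology submitted twice, but with different node ids (1,2,3 vs 11,12,13)
        Map<Integer, List<Integer>> first = new HashMap<>();
        first.put(1, Arrays.asList(2));
        first.put(2, Arrays.asList(3));
        Map<Integer, List<Integer>> second = new HashMap<>();
        second.put(11, Arrays.asList(12));
        second.put(12, Arrays.asList(13));

        Map<Integer, String> a = hashes(Arrays.asList(1), first);
        Map<Integer, String> b = hashes(Arrays.asList(11), second);
        System.out.println(a.get(3).equals(b.get(13))); // prints true
    }
}
```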
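The logic of `StreamingJobGraphGenerator#createJobGraph` is straightforward. First it generates a unique hash ID for every node. If a node does not change across multiple submissions (including its parallelism, its upstream and downstream nodes, etc.), this ID stays the same, which is mainly used for failure recovery. `StreamNode.id` cannot be used instead, because it comes from a static counter starting at 1, so the same job may get different IDs. Next comes the most important step: the chaining logic, which generates the `JobVertex`es, `JobEdge`s and so on. Finally, various configuration-related information is written.

Let's first look at how Flink decides whether two operators can be chained into the same node: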

```java
// Can the nodes at both ends of a StreamEdge be chained into the same JobVertex?
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    // the upstream and downstream nodes
    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

    // the concrete StreamOperators of the upstream and downstream nodes
    StreamOperator<?> headOperator = upStreamVertex.getOperator();
    StreamOperator<?> outOperator = downStreamVertex.getOperator();

    return downStreamVertex.getInEdges().size() == 1 // the downstream node has exactly one input
            && outOperator != null
            && headOperator != null
            && upStreamVertex.isSameSlotSharingGroup(downStreamVertex) // in the same slot sharing group
            // the chaining strategies of both operators must allow chaining;
            // the default is ALWAYS, and disableChaining() forces NEVER when the operator is added
            && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
            && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
                headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
            // data must be exchanged with FORWARD, not REBALANCE or any other mode
            && (edge.getPartitioner() instanceof ForwardPartitioner)
            // upstream and downstream parallelism must match
            && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
            && streamGraph.isChainingEnabled();
}
```

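
Stripped of Flink's types, the checks in `isChainable` boil down to a pure predicate. In the sketch below, `ChainableSketch`, its `Node` class and the string-typed partitioner name are hypothetical simplifications. Operator null checks are omitted, and `inEdgeCount` stands in for `downStreamVertex.getInEdges().size()`.

```java
// Hypothetical simplification of isChainable: plain values instead of StreamNode/StreamEdge,
// and the partitioner reduced to a name such as "FORWARD" or "REBALANCE".
class ChainableSketch {

    enum Strategy { ALWAYS, HEAD, NEVER }

    static final class Node {
        final int parallelism;
        final String slotSharingGroup;
        final Strategy strategy;
        final int inEdgeCount; // stands in for getInEdges().size()

        Node(int parallelism, String slotSharingGroup, Strategy strategy, int inEdgeCount) {
            this.parallelism = parallelism;
            this.slotSharingGroup = slotSharingGroup;
            this.strategy = strategy;
            this.inEdgeCount = inEdgeCount;
        }
    }

    static boolean isChainable(Node up, Node down, String partitioner, boolean chainingEnabled) {
        return down.inEdgeCount == 1                                  // downstream has a single input
                && up.slotSharingGroup.equals(down.slotSharingGroup)  // same slot sharing group
                && down.strategy == Strategy.ALWAYS                   // downstream must allow chaining
                && (up.strategy == Strategy.HEAD || up.strategy == Strategy.ALWAYS)
                && "FORWARD".equals(partitioner)                      // only forward exchanges chain
                && up.parallelism == down.parallelism                 // equal parallelism
                && chainingEnabled;                                   // chaining not disabled globally
    }

    public static void main(String[] args) {
        Node source = new Node(2, "default", Strategy.HEAD, 0);
        Node map = new Node(2, "default", Strategy.ALWAYS, 1);
        Node wideMap = new Node(4, "default", Strategy.ALWAYS, 1);
        System.out.println(isChainable(source, map, "FORWARD", true));     // prints true
        System.out.println(isChainable(source, map, "REBALANCE", true));   // prints false
        System.out.println(isChainable(source, wideMap, "FORWARD", true)); // prints false
    }
}
```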
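As long as the nodes at both ends of an edge meet all the conditions checked in `isChainable`, they can be chained into the same `JobVertex`. Next, let's look at the logic of `setChaining`, the most important function: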

```java
/**
 * Sets up task chains from the source {@link StreamNode} instances.
 *
 * <p>This will recursively create all {@link JobVertex} instances.
 */
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
    for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
        createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes);
    }
}

// Builds an operator chain (which may contain one or more StreamNodes). Returns the actual
// output edges of this operator chain (edges inside the chain are not included).
// If currentNodeId != startNodeId, the current node is inside the operator chain.
private List<StreamEdge> createChain(
        Integer startNodeId,
        Integer currentNodeId,
        Map<Integer, byte[]> hashes,
        List<Map<Integer, byte[]>> legacyHashes,
        int chainIndex,
        Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {

    if (!builtVertices.contains(startNodeId)) {

        // the final output edges of the current operator chain, excluding internal edges
        List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();

        List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
        List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

        // split the output edges of the current node into two groups: chainable and non-chainable
        for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
            if (isChainable(outEdge, streamGraph)) { // can both ends of this StreamEdge be chained together?
                chainableOutputs.add(outEdge);
            } else {
                nonChainableOutputs.add(outEdge);
            }
        }

        // for chainable output edges, recurse to find the final output edges and add them to the output list
        for (StreamEdge chainable : chainableOutputs) {
            transitiveOutEdges.addAll(
                    createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
        }

        // for non-chainable edges
        for (StreamEdge nonChainable : nonChainableOutputs) {
            // the edge itself belongs to the output list of the current node
            transitiveOutEdges.add(nonChainable);
            // recurse, starting a new operator chain at the downstream node
            createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
        }

        // holds the hashes of all operators in one operator chain
        List<Tuple2<byte[], byte[]>> operatorHashes =
                chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());

        byte[] primaryHashBytes = hashes.get(currentNodeId);

        for (Map<Integer, byte[]> legacyHash : legacyHashes) {
            operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
        }

        // name, resource requirements, etc. of the current node
        chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
        chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
        chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));

        // If the current node is the start node, create the JobVertex directly and return its StreamConfig;
        // otherwise create an empty StreamConfig first.
        // createJobVertex creates the JobVertex for the StreamNode and returns an empty StreamConfig.
        StreamConfig config = currentNodeId.equals(startNodeId)
                ? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
                : new StreamConfig(new Configuration());

        // Fill in the StreamConfig, essentially by serializing the StreamNode's configuration into it,
        // including serializers, the StreamOperator, checkpoint-related settings, etc.
        setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);

        if (currentNodeId.equals(startNodeId)) {
            // the start node of a chain (a node that is not chained with anything is also marked as chain start)
            config.setChainStart();
            config.setChainIndex(0);
            config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
            // write the actual output edges into the config; they are used at deployment time
            config.setOutEdgesInOrder(transitiveOutEdges);
            // output edges of the head operator of the operator chain, including internal edges
            config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());

            // connect the current node (headOfChain) with all of its output edges
            for (StreamEdge edge : transitiveOutEdges) {
                // build a JobEdge from the StreamEdge and create an IntermediateDataSet
                // that links the JobVertex and the JobEdge
                connect(startNodeId, edge);
            }

            // write the StreamConfigs of all nodes in the operator chain into the
            // CHAINED_TASK_CONFIG entry of the headOfChain node
            config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

        } else {
            // a node inside an operator chain
            Map<Integer, StreamConfig> chainedConfs = chainedConfigs.get(startNodeId);

            if (chainedConfs == null) {
                chainedConfigs.put(startNodeId, new HashMap<Integer, StreamConfig>());
            }
            config.setChainIndex(chainIndex);
            StreamNode node = streamGraph.getStreamNode(currentNodeId);
            config.setOperatorName(node.getOperatorName());
            // add the current node's StreamConfig to the config map of its operator chain
            chainedConfigs.get(startNodeId).put(currentNodeId, config);
        }

        // set the OperatorID of the current operator
        config.setOperatorID(new OperatorID(primaryHashBytes));

        if (chainableOutputs.isEmpty()) {
            config.setChainEnd();
        }
        return transitiveOutEdges;

    } else {
        return new ArrayList<>();
    }
}
```

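
The recursion in `createChain` can be reproduced on a toy graph to see which nodes end up in which chain and which edges leave each chain. `ChainingSketch` is hypothetical. Nodes are plain integers, edges are `"from->to"` strings, and chainability is passed in as a fixed set of edges instead of being computed by `isChainable`. As in `createChain`:

- a chainable edge keeps the walk inside the current chain;
- a non-chainable edge is recorded as an output of the chain and starts a new chain at its target;
- a chain whose head is already built is skipped.

The sketch assumes a DAG in which chainable edges only enter nodes with a single input, which `isChainable` guarantees.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical toy version of the recursive createChain walk (not Flink code).
class ChainingSketch {
    final Map<Integer, List<Integer>> outEdges;                                  // node -> downstream nodes
    final Set<String> chainableEdges;                                            // edges written as "from->to"
    final Map<Integer, List<Integer>> chains = new LinkedHashMap<>();            // chain head -> members
    final Map<Integer, List<String>> transitiveOutEdges = new LinkedHashMap<>(); // chain head -> edges leaving the chain
    final Set<Integer> built = new HashSet<>();                                  // plays the role of builtVertices

    ChainingSketch(Map<Integer, List<Integer>> outEdges, Set<String> chainableEdges) {
        this.outEdges = outEdges;
        this.chainableEdges = chainableEdges;
    }

    void build(List<Integer> sources) {
        for (int source : sources) {
            createChain(source, source);
        }
    }

    // Returns the edges that leave the chain started at startNodeId, collected from currentNodeId downwards.
    private List<String> createChain(int startNodeId, int currentNodeId) {
        if (built.contains(startNodeId)) {
            return new ArrayList<>(); // this chain was already built
        }
        chains.computeIfAbsent(startNodeId, k -> new ArrayList<>()).add(currentNodeId);

        List<Integer> chainable = new ArrayList<>();
        List<Integer> nonChainable = new ArrayList<>();
        for (int target : outEdges.getOrDefault(currentNodeId, Collections.emptyList())) {
            if (chainableEdges.contains(currentNodeId + "->" + target)) {
                chainable.add(target);
            } else {
                nonChainable.add(target);
            }
        }

        List<String> transitive = new ArrayList<>();
        // chainable edges: keep walking inside the same chain
        for (int target : chainable) {
            transitive.addAll(createChain(startNodeId, target));
        }
        // non-chainable edges leave the chain and start a new chain at their target
        for (int target : nonChainable) {
            transitive.add(currentNodeId + "->" + target);
            createChain(target, target);
        }

        if (currentNodeId == startNodeId) {
            transitiveOutEdges.put(startNodeId, transitive);
            built.add(startNodeId); // mirrors builtVertices being filled when the JobVertex is created
        }
        return transitive;
    }

    public static void main(String[] args) {
        // 1: source, 2: map, 3: window (reached through a keyBy), 4: sink
        Map<Integer, List<Integer>> outEdges = new HashMap<>();
        outEdges.put(1, Arrays.asList(2));
        outEdges.put(2, Arrays.asList(3));
        outEdges.put(3, Arrays.asList(4));
        Set<String> chainableEdges = new HashSet<>(Arrays.asList("1->2", "3->4"));

        ChainingSketch sketch = new ChainingSketch(outEdges, chainableEdges);
        sketch.build(Arrays.asList(1));
        System.out.println(sketch.chains);             // prints {1=[1, 2], 3=[3, 4]}
        System.out.println(sketch.transitiveOutEdges); // prints {3=[], 1=[2->3]}
    }
}
```

The two resulting chains would become two `JobVertex`es. The single edge `2->3` leaving the first chain is the only one that turns into a `JobEdge`.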
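`createChain` essentially performs a depth-first traversal (DFS) over all `StreamNode`s. Following the chainable conditions, it keeps putting operators that can be chained into the same operator chain. The configuration of every `StreamNode` is serialized into its corresponding `StreamConfig`. Only the head node of an operator chain gets a `JobVertex`. The configs of all internal nodes of an operator chain are written, in serialized form, into the `CHAINED_TASK_CONFIG` entry of the head node.

Each operator chain creates a `JobEdge` for every one of its actual output edges and connects it to the `JobVertex`: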

```java
private void connect(Integer headOfChain, StreamEdge edge) {

    physicalEdgesInOrder.add(edge);

    Integer downStreamvertexID = edge.getTargetId();

    // the upstream and downstream JobVertex
    JobVertex headVertex = jobVertices.get(headOfChain);
    JobVertex downStreamVertex = jobVertices.get(downStreamvertexID);

    StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());

    // the downstream node gets one more input
    downStreamConfig.setNumberOfInputs(downStreamConfig.getNumberOfInputs() + 1);

    StreamPartitioner<?> partitioner = edge.getPartitioner();
    JobEdge jobEdge;
    // create the JobEdge and the IntermediateDataSet;
    // the StreamPartitioner type decides how the subtasks of the upstream node (producer)
    // are connected to the subtasks of the downstream node (consumer)
    if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) {
        jobEdge = downStreamVertex.connectNewDataSetAsInput(
                headVertex,
                DistributionPattern.POINTWISE,
                ResultPartitionType.PIPELINED_BOUNDED);
    } else {
        jobEdge = downStreamVertex.connectNewDataSetAsInput(
                headVertex,
                DistributionPattern.ALL_TO_ALL,
                ResultPartitionType.PIPELINED_BOUNDED);
    }
    // set strategy name so that web interface can show it.
    jobEdge.setShipStrategyName(partitioner.toString());

    if (LOG.isDebugEnabled()) {
        LOG.debug("CONNECTED: {} - {} -> {}", partitioner.getClass().getSimpleName(),
                headOfChain, downStreamvertexID);
    }
}
```
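
The `if` in `connect` is a small mapping from partitioner to `DistributionPattern`. The sketch below reproduces that mapping with partitioners modeled as plain names (hypothetical, not Flink's `StreamPartitioner` classes). It also adds a rough count of subtask-to-subtask channels, which is an assumption for illustration. `ALL_TO_ALL` connects every producer subtask to every consumer subtask (p × c channels). For `POINTWISE` we assume each subtask on the larger side is wired to exactly one subtask on the smaller side (max(p, c) channels).

```java
// Hypothetical sketch of the partitioner -> DistributionPattern decision in connect();
// partitioners are plain names here, not Flink's StreamPartitioner classes.
class DistributionPatternSketch {

    enum Pattern { POINTWISE, ALL_TO_ALL }

    static Pattern patternFor(String partitioner) {
        // forward and rescale only connect each subtask to a subset of the other side
        if ("FORWARD".equals(partitioner) || "RESCALE".equals(partitioner)) {
            return Pattern.POINTWISE;
        }
        // every other partitioner (rebalance, hash, broadcast, ...) is wired ALL_TO_ALL
        return Pattern.ALL_TO_ALL;
    }

    // Rough number of producer-to-consumer subtask channels; the POINTWISE formula is an assumption.
    static int channelCount(Pattern pattern, int producers, int consumers) {
        return pattern == Pattern.ALL_TO_ALL
                ? producers * consumers
                : Math.max(producers, consumers);
    }

    public static void main(String[] args) {
        System.out.println(patternFor("FORWARD"));                        // prints POINTWISE
        System.out.println(channelCount(patternFor("RESCALE"), 4, 2));   // prints 4
        System.out.println(channelCount(patternFor("REBALANCE"), 4, 2)); // prints 8
    }
}
```

The gap between p × c and max(p, c) is one reason forward and rescale exchanges are cheaper than a full shuffle.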

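## Summary

This article analyzed how a `StreamGraph` is converted into a `JobGraph`. The key to the `JobGraph` is that multiple `StreamNode`s are optimized into a single `JobVertex`, and the corresponding `StreamEdge`s become `JobEdge`s. `JobVertex` and `JobEdge` are then linked through `IntermediateDataSet`, forming a producer/consumer relationship.

-EOF-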