JobGraph Generation

Main class implementing JobGraph generation: org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator

Conditions for forming an operator chain

    public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
        StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
        return downStreamVertex.getInEdges().size() == 1 && isChainableInput(edge, streamGraph);
    }

    private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) {
        StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
        StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

        if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
                && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
                && (edge.getPartitioner() instanceof ForwardPartitioner)
                && edge.getExchangeMode() != StreamExchangeMode.BATCH
                && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
                && streamGraph.isChainingEnabled())) {
            return false;
        }

        // check that we do not have a union operation, because unions currently only work
        // through the network/byte-channel stack.
        // we check that by testing that each "type" (which means input position) is used only once
        for (StreamEdge inEdge : downStreamVertex.getInEdges()) {
            if (inEdge != edge && inEdge.getTypeNumber() == edge.getTypeNumber()) {
                return false;
            }
        }
        return true;
    }
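  • The target (downstream) operator has exactly one incoming edge.
  • The edge's source (upstream) operator and target operator are in the same SlotSharingGroup (slot sharing group; the default is "default").
  • The ChainingStrategy of the two nodes allows them to be chained.
  • The physical partitioning between the two operators is a ForwardPartitioner.
  • The exchange (shuffle) mode between the two operators is not batch mode.
  • The upstream and downstream operators have the same parallelism.
  • Operator chaining has not been disabled (setting pipeline.operator-chaining to true enables chaining; the default is true).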
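To make the checklist concrete, here is a minimal, self-contained Java sketch that evaluates the same conditions on a toy graph. Node, Edge and Strategy are hypothetical stand-ins invented for this example (they are not Flink's StreamNode, StreamEdge or ChainingStrategy). The chaining-strategy rule is an assumed simplification of areOperatorsChainable ("upstream is not NEVER and downstream is ALWAYS"), and the pipeline.operator-chaining setting is passed in as a plain boolean.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model of the conditions checked by isChainable / isChainableInput.
 * Node, Edge and Strategy are hypothetical stand-ins, not Flink classes.
 */
public class ChainabilitySketch {

    /** Hypothetical enum mirroring three ChainingStrategy names, for illustration only. */
    enum Strategy { ALWAYS, HEAD, NEVER }

    /** Only the node properties the check reads. */
    static final class Node {
        final String slotSharingGroup;
        final int parallelism;
        final Strategy strategy;
        final List<Edge> inEdges = new ArrayList<>();

        Node(String slotSharingGroup, int parallelism, Strategy strategy) {
            this.slotSharingGroup = slotSharingGroup;
            this.parallelism = parallelism;
            this.strategy = strategy;
        }
    }

    /** Only the edge properties the check reads. */
    static final class Edge {
        final Node source;
        final Node target;
        final boolean forward;       // partitioner is a ForwardPartitioner
        final boolean batchExchange; // exchange mode is BATCH

        Edge(Node source, Node target, boolean forward, boolean batchExchange) {
            this.source = source;
            this.target = target;
            this.forward = forward;
            this.batchExchange = batchExchange;
            target.inEdges.add(this); // register as an incoming edge of the target
        }
    }

    /** Assumed simplification: upstream must not be NEVER, downstream must be ALWAYS. */
    static boolean strategiesAllowChaining(Node up, Node down) {
        return up.strategy != Strategy.NEVER && down.strategy == Strategy.ALWAYS;
    }

    /** One clause per condition in the list above. */
    static boolean isChainable(Edge edge, boolean chainingEnabled) {
        Node up = edge.source;
        Node down = edge.target;
        return down.inEdges.size() == 1                              // single incoming edge
                && up.slotSharingGroup.equals(down.slotSharingGroup) // same slot sharing group
                && strategiesAllowChaining(up, down)                 // compatible chaining strategies
                && edge.forward                                      // ForwardPartitioner
                && !edge.batchExchange                               // not a batch exchange
                && up.parallelism == down.parallelism                // equal parallelism
                && chainingEnabled;                                  // pipeline.operator-chaining
    }

    public static void main(String[] args) {
        Node source = new Node("default", 4, Strategy.HEAD);
        Node map = new Node("default", 4, Strategy.ALWAYS);
        Node rescaled = new Node("default", 2, Strategy.ALWAYS);
        Edge toMap = new Edge(source, map, true, false);
        Edge toRescaled = new Edge(source, rescaled, true, false);
        System.out.println(isChainable(toMap, true));      // true
        System.out.println(isChainable(toRescaled, true)); // false: parallelism differs
    }
}
```

In main, the edge into the operator with parallelism 2 fails only the equal-parallelism condition, so in this model that operator would start a new chain instead of joining the source's chain.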

Generating operator IDs

Class that generates them: org.apache.flink.streaming.api.graph.StreamGraphHasherV2
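As a rough illustration of what a hasher of this kind does, the sketch below derives a deterministic 16-byte ID for each operator, loosely modeled on the approach StreamGraphHasherV2 takes. If a user-assigned uid is present, the ID is a hash of that string alone; otherwise it hashes the operator's position in a traversal from the sources (once more per chainable output) and then folds in the IDs of its inputs. This is a simplified sketch, not Flink's code: Op, connect, hashAll, hashOne and chainedOutputs are hypothetical names, the traversal is a plain topological order, and MD5 from the JDK is swapped in for the Murmur3 128-bit hash Flink uses.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OperatorIdSketch {

    /** Hypothetical operator node; not a Flink class. Uses identity equality. */
    static final class Op {
        final String uid;  // user-assigned uid, or null if none was set
        final List<Op> inputs = new ArrayList<>();
        final List<Op> outputs = new ArrayList<>();
        int chainedOutputs; // hypothetical: number of chainable out-edges

        Op(String uid) { this.uid = uid; }
    }

    static void connect(Op from, Op to) {
        from.outputs.add(to);
        to.inputs.add(from);
    }

    static MessageDigest md5() {
        try {
            return MessageDigest.getInstance("MD5"); // stand-in for Murmur3 128-bit
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Hashes every operator reachable from the sources, each only after all its inputs. */
    static Map<Op, byte[]> hashAll(List<Op> sources) {
        Map<Op, byte[]> hashes = new HashMap<>();
        Map<Op, Integer> pendingInputs = new HashMap<>();
        Deque<Op> ready = new ArrayDeque<>(sources);
        while (!ready.isEmpty()) {
            Op op = ready.poll();
            hashes.put(op, hashOne(op, hashes));
            for (Op child : op.outputs) {
                int left = pendingInputs.getOrDefault(child, child.inputs.size()) - 1;
                pendingInputs.put(child, left);
                if (left == 0) {
                    ready.add(child); // all inputs hashed: child can be hashed next
                }
            }
        }
        return hashes;
    }

    static byte[] hashOne(Op op, Map<Op, byte[]> hashes) {
        MessageDigest digest = md5();
        if (op.uid != null) {
            // A user-specified uid alone determines the ID.
            return digest.digest(op.uid.getBytes(StandardCharsets.UTF_8));
        }
        // Traversal position (operators hashed so far), not a runtime-assigned
        // counter, so two identically built programs get identical IDs.
        int position = hashes.size();
        digest.update(ByteBuffer.allocate(4).putInt(position).array());
        for (int i = 0; i < op.chainedOutputs; i++) {
            digest.update(ByteBuffer.allocate(4).putInt(position).array());
        }
        byte[] hash = digest.digest();
        // Fold in each input's ID so the result depends on the upstream topology.
        for (Op in : op.inputs) {
            byte[] other = hashes.get(in);
            for (int j = 0; j < hash.length; j++) {
                hash[j] = (byte) (hash[j] * 37 ^ other[j]);
            }
        }
        return hash;
    }

    public static void main(String[] args) {
        Op source = new Op(null);
        Op map = new Op("my-map");
        Op sink = new Op(null);
        connect(source, map);
        connect(map, sink);
        Map<Op, byte[]> ids = hashAll(List.of(source));
        byte[] expected = md5().digest("my-map".getBytes(StandardCharsets.UTF_8));
        System.out.println(Arrays.equals(ids.get(map), expected)); // true
    }
}
```

Hashing the traversal position rather than a global counter keeps IDs stable when the same program is built twice, while an explicit uid makes an operator's ID independent of where it sits in the graph.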