JobGraph生成
JobGraph生成功能的主要类:org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator
形成算子链的条件
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);return downStreamVertex.getInEdges().size() == 1 && isChainableInput(edge, streamGraph);}private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) {StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex)&& areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)&& (edge.getPartitioner() instanceof ForwardPartitioner)&& edge.getExchangeMode() != StreamExchangeMode.BATCH&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()&& streamGraph.isChainingEnabled())) {return false;}// check that we do not have a union operation, because unions currently only work// through the network/byte-channel stack.// we check that by testing that each "type" (which means input position) is used only oncefor (StreamEdge inEdge : downStreamVertex.getInEdges()) {if (inEdge != edge && inEdge.getTypeNumber() == edge.getTypeNumber()) {return false;}}return true;}
