StreamGraph Generation Process
The StreamGraph is generated before a Flink job is submitted. The entry point for generating it is in `StreamExecutionEnvironment`:
```java
@Internal
public StreamGraph getStreamGraph() {
    return getStreamGraphGenerator().generate();
}

/**
 * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.
 *
 * @param jobName Desired name of the job
 * @return The streamgraph representing the transformations
 */
@Internal
public StreamGraph getStreamGraph(String jobName) {
    return getStreamGraphGenerator().setJobName(jobName).generate();
}

private StreamGraphGenerator getStreamGraphGenerator() {
    if (transformations.size() <= 0) {
        throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
    }
    return new StreamGraphGenerator(transformations, config, checkpointCfg)
        .setStateBackend(defaultStateBackend)
        .setChaining(isChainingEnabled)
        .setUserArtifacts(cacheFile)
        .setTimeCharacteristic(timeCharacteristic)
        .setDefaultBufferTimeout(bufferTimeout);
}
```
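The StreamGraph is actually built inside `StreamGraphGenerator`. Conversion proceeds from the `SinkTransformation` upward until it reaches the `SourceTransformation`, and the StreamGraph is constructed step by step during that traversal: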
```java
public StreamGraph generate() {
    streamGraph = new StreamGraph(executionConfig, checkpointConfig);
    streamGraph.setStateBackend(stateBackend);
    streamGraph.setChaining(chaining);
    streamGraph.setScheduleMode(scheduleMode);
    streamGraph.setUserArtifacts(userArtifacts);
    streamGraph.setTimeCharacteristic(timeCharacteristic);
    streamGraph.setJobName(jobName);
    streamGraph.setBlockingConnectionsBetweenChains(blockingConnectionsBetweenChains);

    alreadyTransformed = new HashMap<>();

    // Note: StreamGraph generation starts from the sink;
    // transform() recurses into the upstream inputs first
    for (Transformation<?> transformation: transformations) {
        transform(transformation);
    }

    final StreamGraph builtStreamGraph = streamGraph;

    // Release the intermediate state so the generator holds no stale references
    alreadyTransformed.clear();
    alreadyTransformed = null;
    streamGraph = null;

    return builtStreamGraph;
}
```
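The traversal order can be illustrated with a small, self-contained sketch. It is not Flink code: `Node`, `TraversalSketch`, and `conversionOrder` are hypothetical names, and a transformation is reduced to an id plus its inputs. Under those assumptions it shows the two properties `generate()` relies on: inputs are converted before the node that consumes them, and the `alreadyTransformed` map keeps a shared upstream from being converted twice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a Transformation: an id plus its upstream inputs. */
final class Node {
    final int id;
    final List<Node> inputs;

    Node(int id, Node... inputs) {
        this.id = id;
        this.inputs = Arrays.asList(inputs);
    }
}

/** Hypothetical, simplified model of the generate()/transform() pair. */
final class TraversalSketch {
    private final Map<Node, Integer> alreadyTransformed = new HashMap<>();
    private final List<Integer> conversionOrder = new ArrayList<>();

    /** Converts every registered node and returns the order of conversion. */
    List<Integer> generate(List<Node> transformations) {
        for (Node t : transformations) {
            transform(t);
        }
        return conversionOrder;
    }

    private int transform(Node node) {
        // Already converted (for example, a shared upstream): reuse the result
        Integer cached = alreadyTransformed.get(node);
        if (cached != null) {
            return cached;
        }
        // Recurse first, so every input is converted before this node
        for (Node input : node.inputs) {
            transform(input);
        }
        conversionOrder.add(node.id);
        alreadyTransformed.put(node, node.id);
        return node.id;
    }

    public static void main(String[] args) {
        Node source = new Node(1);
        Node map = new Node(2, source);
        Node sinkA = new Node(3, map);
        Node sinkB = new Node(4, map);
        // Only the sinks are registered; upstream nodes are reached by recursion
        System.out.println(new TraversalSketch().generate(Arrays.asList(sinkA, sinkB)));
        // prints [1, 2, 3, 4]
    }
}
```

Although only the two sinks are passed in, the source and the map are converted first, and the map shared by both sinks appears once.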
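While traversing the `Transformation`s, each type of `Transformation` is converted in its own way:
- A physical `Transformation` is converted into a `StreamNode`.
- A virtual `Transformation` is registered as a virtual `StreamNode`.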
Each concrete type of `Transformation` is handled by its corresponding `transformXXX()` method:

```java
Collection<Integer> transformedIds;
if (transform instanceof OneInputTransformation<?, ?>) {
    transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
    transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
} else if (transform instanceof SourceTransformation<?>) {
    transformedIds = transformSource((SourceTransformation<?>) transform);
} else if (transform instanceof SinkTransformation<?>) {
    transformedIds = transformSink((SinkTransformation<?>) transform);
} else if (transform instanceof UnionTransformation<?>) {
    transformedIds = transformUnion((UnionTransformation<?>) transform);
} else if (transform instanceof SplitTransformation<?>) {
    transformedIds = transformSplit((SplitTransformation<?>) transform);
} else if (transform instanceof SelectTransformation<?>) {
    transformedIds = transformSelect((SelectTransformation<?>) transform);
} else if (transform instanceof FeedbackTransformation<?>) {
    transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
} else if (transform instanceof CoFeedbackTransformation<?>) {
    transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
} else if (transform instanceof PartitionTransformation<?>) {
    transformedIds = transformPartition((PartitionTransformation<?>) transform);
} else if (transform instanceof SideOutputTransformation<?>) {
    transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
} else {
    throw new IllegalStateException("Unknown transformation: " + transform);
}
```

A `transformXXX()` method first converts the upstream `Transformation`s recursively, which guarantees that everything upstream has already been converted. It then builds a `StreamNode` through `addOperator()` and connects it to the upstream nodes through `addEdge()`, which builds the `StreamEdge`.
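While a `StreamNode` is being built, the key information needed at runtime is also determined and stored in the `StreamNode`:
- the container class that executes the operator (`StreamTask` and its subclasses)
- the factory that instantiates the operator (`StreamOperatorFactory`)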
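As a rough illustration of why both pieces are stored, the sketch below models a node that remembers which task class should run the operator and defers operator creation to a factory. All names here (`NodeSketch`, `Operator`, `createOperator`) are hypothetical simplifications; Flink's `StreamNode` and `StreamOperatorFactory` carry considerably more state, and the task class is represented only by its name.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for an operator that processes one record at a time. */
interface Operator {
    String process(String record);
}

/** Hypothetical, simplified view of the runtime information kept on a node. */
final class NodeSketch {
    final int id;
    final String taskClassName;                       // which container (task) runs the operator
    private final Supplier<Operator> operatorFactory; // how to create the operator later

    NodeSketch(int id, String taskClassName, Supplier<Operator> operatorFactory) {
        this.id = id;
        this.taskClassName = taskClassName;
        this.operatorFactory = operatorFactory;
    }

    /** Called at runtime, when the operator is actually needed. */
    Operator createOperator() {
        return operatorFactory.get();
    }

    public static void main(String[] args) {
        // No operator exists while the graph is built; only the recipe is stored
        NodeSketch node = new NodeSketch(1, "OneInputStreamTask", () -> record -> record.toUpperCase());
        Operator operator = node.createOperator();
        System.out.println(node.taskClassName + " runs operator, result: " + operator.process("a"));
    }
}
```

Storing a factory rather than an operator instance means the graph describes how to build the operator and leaves construction to the task that runs it.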
When a `StreamEdge` is added and its `ShuffleMode` is null, `ShuffleMode.UNDEFINED` is used, as the `addEdgeInternal()` code at the end of this section shows.
#### Converting a one-input physical Transformation
- Store the ids of the upstream `Transformation`s of this `OneInputTransformation` so that edges can be built later. The recursion happens here, which ensures that all upstream `Transformation`s have been converted.
- Determine the slot sharing group.
- Add the operator to the `StreamGraph`.
- Set the `StateKeySelector`.
- Set the parallelism and the maximum parallelism.
- Build the `StreamEdge`s that connect the upstream and downstream `StreamNode`s.

```java
/**
 * Transforms a {@code OneInputTransformation}.
 *
 * <p>This recursively transforms the inputs, creates a new {@code StreamNode} in the graph and
 * wired the inputs to this new node.
 */
private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
    // Store the ids of the upstream Transformations so edges can be built later;
    // the recursion here ensures all upstream Transformations are converted
    Collection<Integer> inputIds = transform(transform.getInput());

    // Avoid converting twice: if already converted, return the earlier result
    // the recursive call might have already transformed this
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }

    // Determine the slot sharing group
    String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);

    // Add the StreamNode to the StreamGraph
    streamGraph.addOperator(transform.getId(),
        slotSharingGroup,
        transform.getCoLocationGroupKey(),
        transform.getOperatorFactory(),
        transform.getInputType(),
        transform.getOutputType(),
        transform.getName());

    // Set the StateKeySelector
    if (transform.getStateKeySelector() != null) {
        TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(executionConfig);
        streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
    }

    // Determine the parallelism: the operator's own setting, else the environment default
    int parallelism = transform.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
        transform.getParallelism() : executionConfig.getParallelism();
    streamGraph.setParallelism(transform.getId(), parallelism);
    streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());

    // Add StreamEdges to link this StreamNode with its upstream StreamNodes
    for (Integer inputId: inputIds) {
        streamGraph.addEdge(inputId, transform.getId(), 0);
    }

    return Collections.singleton(transform.getId());
}
```
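Two of the steps above are small decision rules that are easy to isolate. The sketch below restates them outside Flink. The parallelism fallback follows the code above, with `PARALLELISM_DEFAULT = -1` mirroring `ExecutionConfig.PARALLELISM_DEFAULT`. The slot sharing rule (an explicit group wins, otherwise the inputs' common group is inherited, otherwise `"default"`) is an assumption about what `determineSlotSharingGroup()` does, since its body is not shown here. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

/** Hypothetical helper that isolates two decision rules of the one-input conversion. */
final class OneInputRulesSketch {
    static final int PARALLELISM_DEFAULT = -1;    // mirrors ExecutionConfig.PARALLELISM_DEFAULT
    static final String DEFAULT_GROUP = "default"; // assumed name of the fallback group

    /** The operator's own parallelism wins; otherwise fall back to the environment's. */
    static int resolveParallelism(int transformParallelism, int envParallelism) {
        return transformParallelism != PARALLELISM_DEFAULT ? transformParallelism : envParallelism;
    }

    /** Assumed rule: explicit group, else the group shared by all inputs, else the default. */
    static String resolveSlotSharingGroup(String specified, Collection<String> inputGroups) {
        if (specified != null) {
            return specified;
        }
        String common = null;
        for (String group : inputGroups) {
            if (common == null) {
                common = group;
            } else if (!common.equals(group)) {
                return DEFAULT_GROUP; // inputs disagree, so nothing can be inherited
            }
        }
        return common == null ? DEFAULT_GROUP : common;
    }

    public static void main(String[] args) {
        System.out.println(resolveParallelism(PARALLELISM_DEFAULT, 4));                 // prints 4
        System.out.println(resolveParallelism(2, 4));                                   // prints 2
        System.out.println(resolveSlotSharingGroup(null, Arrays.asList("a", "a")));     // prints a
        System.out.println(resolveSlotSharingGroup(null, Arrays.asList("a", "b")));     // prints default
        System.out.println(resolveSlotSharingGroup("x", Collections.singleton("a")));   // prints x
    }
}
```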
<a name="38cPV"></a>
#### Converting a virtual Transformation
A virtual `Transformation` is not converted into a `StreamNode` when the graph is generated; a virtual node is added instead.
```java
/**
 * Transforms a {@code PartitionTransformation}.
 *
 * <p>For this we create a virtual node in the {@code StreamGraph} that holds the partition
 * property. @see StreamGraphGenerator
 */
private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
    Transformation<T> input = partition.getInput();
    List<Integer> resultIds = new ArrayList<>();

    // Recursively convert the direct upstream Transformation and collect its ids
    Collection<Integer> transformedIds = transform(input);
    for (Integer transformedId: transformedIds) {
        int virtualId = Transformation.getNewNodeId();
        // Add a virtual partition node; no StreamNode is created
        streamGraph.addVirtualPartitionNode(
            transformedId, virtualId, partition.getPartitioner(), partition.getShuffleMode());
        resultIds.add(virtualId);
    }

    return resultIds;
}
```

- Converting a `PartitionTransformation` produces no concrete `StreamNode` or `StreamEdge`. Instead, `streamGraph.addVirtualPartitionNode()` adds a virtual node.
- When the `Transformation` downstream of the partitioning adds its `StreamEdge` (by calling `streamGraph.addEdge()`), the `Partitioner` is wrapped into that `StreamEdge`.

```java
public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
    addEdgeInternal(upStreamVertexID,
        downStreamVertexID,
        typeNumber,
        null,
        new ArrayList<String>(),
        null,
        null);
}

private void addEdgeInternal(Integer upStreamVertexID,
        Integer downStreamVertexID,
        int typeNumber,
        StreamPartitioner<?> partitioner,
        List<String> outputNames,
        OutputTag outputTag,
        ShuffleMode shuffleMode) {

    // Upstream is a side-output virtual node: recurse and pass the OutputTag along
    if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
        if (outputTag == null) {
            outputTag = virtualSideOutputNodes.get(virtualId).f1;
        }
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode);
    // Upstream is a select virtual node: recurse and pass the selected names along
    } else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
        if (outputNames.isEmpty()) {
            // selections that happen downstream override earlier selections
            outputNames = virtualSelectNodes.get(virtualId).f1;
        }
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
    // Upstream is a partition virtual node: recurse and pass the partitioner along
    } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
        if (partitioner == null) {
            partitioner = virtualPartitionNodes.get(virtualId).f1;
        }
        shuffleMode = virtualPartitionNodes.get(virtualId).f2;
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
    // None of the virtual cases apply: build the actual StreamEdge
    } else {
        StreamNode upstreamNode = getStreamNode(upStreamVertexID);
        StreamNode downstreamNode = getStreamNode(downStreamVertexID);

        // If no partitioner was specified and the parallelism of upstream and downstream
        // operator matches use forward partitioning, use rebalance otherwise.
        if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
            partitioner = new ForwardPartitioner<Object>();
        } else if (partitioner == null) {
            partitioner = new RebalancePartitioner<Object>();
        }

        if (partitioner instanceof ForwardPartitioner) {
            if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                throw new UnsupportedOperationException("Forward partitioning does not allow " +
                    "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
                    ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
                    " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
            }
        }

        if (shuffleMode == null) {
            shuffleMode = ShuffleMode.UNDEFINED;
        }

        // Create the StreamEdge and register it as an out-edge of the upstream node
        // and an in-edge of the downstream node
        StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag, shuffleMode);

        getStreamNode(edge.getSourceId()).addOutEdge(edge);
        getStreamNode(edge.getTargetId()).addInEdge(edge);
    }
}
```
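The recursion in `addEdgeInternal()` can be reduced to a compact sketch. This is a simplified model, not Flink's implementation: it covers only virtual partition nodes, represents partitioners as plain strings, and uses hypothetical names (`EdgeSketch`, `addNode`, `addVirtualPartition`). Under those assumptions it shows how a virtual id is unwound to the real upstream node and how a missing partitioner defaults to forward or rebalance depending on parallelism.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of edge creation with virtual partition nodes. */
final class EdgeSketch {
    private final Map<Integer, Integer> parallelism = new HashMap<>();       // real node id -> parallelism
    private final Map<Integer, Integer> virtualUpstream = new HashMap<>();   // virtual id -> upstream id
    private final Map<Integer, String> virtualPartitioner = new HashMap<>(); // virtual id -> partitioner
    final List<String> edges = new ArrayList<>();                            // "up->down:PARTITIONER"

    void addNode(int id, int nodeParallelism) {
        parallelism.put(id, nodeParallelism);
    }

    /** Registers a virtual node; no real node and no edge is created here. */
    void addVirtualPartition(int upstreamId, int virtualId, String partitioner) {
        virtualUpstream.put(virtualId, upstreamId);
        virtualPartitioner.put(virtualId, partitioner);
    }

    void addEdge(int upstreamId, int downstreamId) {
        addEdgeInternal(upstreamId, downstreamId, null);
    }

    private void addEdgeInternal(int upstreamId, int downstreamId, String partitioner) {
        if (virtualUpstream.containsKey(upstreamId)) {
            // Unwind the virtual node and carry its partitioner toward the real upstream
            if (partitioner == null) {
                partitioner = virtualPartitioner.get(upstreamId);
            }
            addEdgeInternal(virtualUpstream.get(upstreamId), downstreamId, partitioner);
            return;
        }
        int up = parallelism.get(upstreamId);
        int down = parallelism.get(downstreamId);
        if (partitioner == null) {
            // No explicit partitioner: forward if parallelism matches, rebalance otherwise
            partitioner = (up == down) ? "FORWARD" : "REBALANCE";
        }
        if (partitioner.equals("FORWARD") && up != down) {
            throw new UnsupportedOperationException("Forward partitioning does not allow change of parallelism");
        }
        edges.add(upstreamId + "->" + downstreamId + ":" + partitioner);
    }

    public static void main(String[] args) {
        EdgeSketch graph = new EdgeSketch();
        graph.addNode(1, 2);
        graph.addNode(2, 2);
        graph.addNode(3, 4);
        graph.addVirtualPartition(1, 10, "HASH"); // e.g. a keyBy between node 1 and node 2
        graph.addEdge(10, 2);                     // resolved to the real upstream node 1
        graph.addEdge(1, 3);
        System.out.println(graph.edges);          // prints [1->2:HASH, 1->3:REBALANCE]
    }
}
```

The edge list never mentions the virtual id 10: the virtual node only exists to hand its partitioner to the edge that is eventually created between two real nodes.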
