### StreamGraph generation

The `StreamGraph` is generated before a Flink job is submitted. The entry point for generating it is in `StreamExecutionEnvironment`:
```java
@Internal
public StreamGraph getStreamGraph() {
    return getStreamGraphGenerator().generate();
}

/**
 * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.
 *
 * @param jobName Desired name of the job
 * @return The streamgraph representing the transformations
 */
@Internal
public StreamGraph getStreamGraph(String jobName) {
    return getStreamGraphGenerator().setJobName(jobName).generate();
}

private StreamGraphGenerator getStreamGraphGenerator() {
    if (transformations.size() <= 0) {
        throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
    }
    return new StreamGraphGenerator(transformations, config, checkpointCfg)
        .setStateBackend(defaultStateBackend)
        .setChaining(isChainingEnabled)
        .setUserArtifacts(cacheFile)
        .setTimeCharacteristic(timeCharacteristic)
        .setDefaultBufferTimeout(bufferTimeout);
}
```
The `StreamGraph` is actually built inside `StreamGraphGenerator`. The generator walks upstream from a `SinkTransformation` until it reaches a `SourceTransformation`, and it builds the `StreamGraph` during that walk:
```java
public StreamGraph generate() {
    streamGraph = new StreamGraph(executionConfig, checkpointConfig);
    streamGraph.setStateBackend(stateBackend);
    streamGraph.setChaining(chaining);
    streamGraph.setScheduleMode(scheduleMode);
    streamGraph.setUserArtifacts(userArtifacts);
    streamGraph.setTimeCharacteristic(timeCharacteristic);
    streamGraph.setJobName(jobName);
    streamGraph.setBlockingConnectionsBetweenChains(blockingConnectionsBetweenChains);

    alreadyTransformed = new HashMap<>();

    // Note: every transformation is converted recursively, upstream first,
    // so the walk goes from the sink side back toward the sources.
    for (Transformation<?> transformation: transformations) {
        transform(transformation);
    }

    final StreamGraph builtStreamGraph = streamGraph;

    alreadyTransformed.clear();
    alreadyTransformed = null;
    streamGraph = null;

    return builtStreamGraph;
}
```
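The traversal order is easier to see in isolation. The following is a minimal sketch, not Flink code: `TraversalSketch` and its `Node` class are hypothetical stand-ins for the generator and for `Transformation`, and only the `alreadyTransformed` map mirrors a name from the source above. It shows that upstream nodes are converted before the node that depends on them, and that a node shared by two sinks is converted only once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the memoized, upstream-first traversal (hypothetical, not Flink code). */
class TraversalSketch {

    /** Hypothetical stand-in for a Transformation: a name plus its upstream inputs. */
    static final class Node {
        final String name;
        final List<Node> inputs;

        Node(String name, Node... inputs) {
            this.name = name;
            this.inputs = Arrays.asList(inputs);
        }
    }

    private final Map<Node, String> alreadyTransformed = new HashMap<>();
    private final List<String> conversionOrder = new ArrayList<>();

    /** Converts every listed node and returns the order in which nodes were converted. */
    static List<String> generate(List<Node> transformations) {
        TraversalSketch sketch = new TraversalSketch();
        for (Node node : transformations) {
            sketch.transform(node);
        }
        return sketch.conversionOrder;
    }

    private String transform(Node node) {
        // Skip anything an earlier recursive call has already converted.
        if (alreadyTransformed.containsKey(node)) {
            return alreadyTransformed.get(node);
        }
        // Upstream first: every input is converted before the node itself.
        for (Node input : node.inputs) {
            transform(input);
        }
        conversionOrder.add(node.name);
        alreadyTransformed.put(node, node.name);
        return node.name;
    }

    public static void main(String[] args) {
        Node source = new Node("source");
        Node map = new Node("map", source);
        Node sinkA = new Node("sinkA", map);
        Node sinkB = new Node("sinkB", map);
        // Only the sinks are listed, yet source and map are reached through recursion.
        System.out.println(generate(Arrays.asList(sinkA, sinkB)));
    }
}
```

With the two sinks sharing `map`, the recorded order is `source`, `map`, `sinkA`, `sinkB`: the second sink finds its upstream already converted and adds nothing twice.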
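While traversing the `Transformation`s, each kind of `Transformation` is converted differently:

- a physical `Transformation` is converted into a `StreamNode`;
- a virtual `Transformation` is registered as a virtual node rather than a real `StreamNode`.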
Each concrete type of `Transformation` is dispatched to its corresponding `transformXxx()` method:

```java
Collection<Integer> transformedIds;
if (transform instanceof OneInputTransformation<?, ?>) {
    transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
    transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
} else if (transform instanceof SourceTransformation<?>) {
    transformedIds = transformSource((SourceTransformation<?>) transform);
} else if (transform instanceof SinkTransformation<?>) {
    transformedIds = transformSink((SinkTransformation<?>) transform);
} else if (transform instanceof UnionTransformation<?>) {
    transformedIds = transformUnion((UnionTransformation<?>) transform);
} else if (transform instanceof SplitTransformation<?>) {
    transformedIds = transformSplit((SplitTransformation<?>) transform);
} else if (transform instanceof SelectTransformation<?>) {
    transformedIds = transformSelect((SelectTransformation<?>) transform);
} else if (transform instanceof FeedbackTransformation<?>) {
    transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
} else if (transform instanceof CoFeedbackTransformation<?>) {
    transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
} else if (transform instanceof PartitionTransformation<?>) {
    transformedIds = transformPartition((PartitionTransformation<?>) transform);
} else if (transform instanceof SideOutputTransformation<?>) {
    transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
} else {
    throw new IllegalStateException("Unknown transformation: " + transform);
}
```

For a physical `Transformation`, the `transformXxx()` method works in three steps:

- it first converts the upstream `Transformation`s recursively, which guarantees that everything upstream has already been converted;
- it then builds the `StreamNode` through `addOperator()`;
- finally it connects the new node to its upstream through `addEdge()`, which builds the `StreamEdge`.
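The three steps can be illustrated with a small sketch. It is not Flink code: `OneInputSketch`, its nested `Transformation` class, and the string-based `nodes` and `edges` lists are hypothetical simplifications, and the memoization that the real generator performs is left out for brevity.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/** Toy model of the three-step conversion of a single-input transformation (hypothetical). */
class OneInputSketch {

    /** Hypothetical stand-in for a Transformation with at most one input. */
    static final class Transformation {
        final int id;
        final String name;
        final Transformation input; // null for a source

        Transformation(int id, String name, Transformation input) {
            this.id = id;
            this.name = name;
            this.input = input;
        }
    }

    final List<String> nodes = new ArrayList<>();
    final List<String> edges = new ArrayList<>();

    Collection<Integer> transform(Transformation t) {
        // Step 1: convert the upstream transformation first and remember its ids.
        Collection<Integer> inputIds = t.input == null
                ? Collections.<Integer>emptyList()
                : transform(t.input);
        // Step 2: add a node for this transformation (the role of addOperator()).
        nodes.add(t.id + ":" + t.name);
        // Step 3: connect the new node to every upstream id (the role of addEdge()).
        for (Integer inputId : inputIds) {
            edges.add(inputId + "->" + t.id);
        }
        return Collections.singleton(t.id);
    }

    public static void main(String[] args) {
        Transformation source = new Transformation(1, "source", null);
        Transformation map = new Transformation(2, "map", source);
        Transformation sink = new Transformation(3, "sink", map);

        OneInputSketch sketch = new OneInputSketch();
        sketch.transform(sink);
        System.out.println(sketch.nodes);
        System.out.println(sketch.edges);
    }
}
```

Returning the node's own id from `transform` is what lets the downstream caller build its edges: the ids collected in step 1 are exactly the edge sources used in step 3.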
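While a `StreamNode` is being built, the key information needed at runtime is also fixed and stored in the `StreamNode`:

- the container class that executes the operator (`StreamTask` and its subclasses);
- the factory that instantiates the operator (`StreamOperatorFactory`).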
When a `StreamEdge` is added and the `ShuffleMode` is `null`, `ShuffleMode.UNDEFINED` is used.
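#### Converting a single-input physical Transformation

- Store the ids of the upstream `Transformation`s of this `OneInputTransformation` so that the edges can be built later. The recursion happens here and guarantees that every upstream `Transformation` has already been converted.
- Determine the slot sharing group.
- Add the operator to the `StreamGraph`.
- Set the `StateKeySelector`.
- Set the parallelism and the maximum parallelism.
- Build the `StreamEdge`s that connect the upstream and downstream `StreamNode`s.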
```java
/**
 * Transforms a {@code OneInputTransformation}.
 *
 * <p>This recursively transforms the inputs, creates a new {@code StreamNode} in the graph and
 * wired the inputs to this new node.
 */
private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
    // Store the ids of the upstream Transformations of this OneInputTransformation so that
    // edges can be built later. The recursion here ensures every upstream Transformation
    // has already been converted.
    Collection<Integer> inputIds = transform(transform.getInput());

    // Avoid converting twice: if this was already converted, return the earlier result.
    // the recursive call might have already transformed this
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }

    // Determine the slot sharing group
    String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);

    // Add the StreamNode to the StreamGraph
    streamGraph.addOperator(transform.getId(),
            slotSharingGroup,
            transform.getCoLocationGroupKey(),
            transform.getOperatorFactory(),
            transform.getInputType(),
            transform.getOutputType(),
            transform.getName());

    // Set the StateKeySelector
    if (transform.getStateKeySelector() != null) {
        TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(executionConfig);
        streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
    }

    // Determine the parallelism
    int parallelism = transform.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
        transform.getParallelism() : executionConfig.getParallelism();
    streamGraph.setParallelism(transform.getId(), parallelism);
    streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());

    // Build the StreamEdges that connect the upstream StreamNodes to this one
    for (Integer inputId: inputIds) {
        streamGraph.addEdge(inputId, transform.getId(), 0);
    }

    return Collections.singleton(transform.getId());
}
```
<a name="38cPV"></a>
#### Converting a virtual Transformation
A virtual `Transformation` is not converted into a `StreamNode`; a virtual node is added instead.
```java
/**
 * Transforms a {@code PartitionTransformation}.
 *
 * <p>For this we create a virtual node in the {@code StreamGraph} that holds the partition
 * property. @see StreamGraphGenerator
 */
private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
    Transformation<T> input = partition.getInput();
    List<Integer> resultIds = new ArrayList<>();

    // Recursively convert the direct upstream Transformation and collect its ids
    Collection<Integer> transformedIds = transform(input);
    for (Integer transformedId: transformedIds) {
        int virtualId = Transformation.getNewNodeId();
        // Add a virtual partition node; no StreamNode is created
        streamGraph.addVirtualPartitionNode(
                transformedId, virtualId, partition.getPartitioner(), partition.getShuffleMode());
        resultIds.add(virtualId);
    }

    return resultIds;
}
```
Converting a `PartitionTransformation` creates neither a `StreamNode` nor a `StreamEdge`. Instead, `streamGraph.addVirtualPartitionNode()` registers a virtual node. When the `Transformation` downstream of the partitioning adds its `StreamEdge` (by calling `streamGraph.addEdge()`), the `Partitioner` is packed into that `StreamEdge`:

```java
public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
    addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber,
            null, new ArrayList<String>(), null, null);
}

private void addEdgeInternal(Integer upStreamVertexID,
        Integer downStreamVertexID,
        int typeNumber,
        StreamPartitioner<?> partitioner,
        List<String> outputNames,
        OutputTag outputTag,
        ShuffleMode shuffleMode) {

    // Upstream is a virtual side-output node: recurse with its real upstream and pass the OutputTag along
    if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
        if (outputTag == null) {
            outputTag = virtualSideOutputNodes.get(virtualId).f1;
        }
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode);
    // Upstream is a virtual select node: recurse and pass the selected output names along
    } else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
        if (outputNames.isEmpty()) {
            // selections that happen downstream override earlier selections
            outputNames = virtualSelectNodes.get(virtualId).f1;
        }
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
    // Upstream is a virtual partition node: recurse and pass the partitioner along
    } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
        if (partitioner == null) {
            partitioner = virtualPartitionNodes.get(virtualId).f1;
        }
        shuffleMode = virtualPartitionNodes.get(virtualId).f2;
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
    // None of the above: the upstream is a physical node, so actually build the StreamEdge
    } else {
        StreamNode upstreamNode = getStreamNode(upStreamVertexID);
        StreamNode downstreamNode = getStreamNode(downStreamVertexID);

        // If no partitioner was specified and the parallelism of upstream and downstream
        // operator matches use forward partitioning, use rebalance otherwise.
        if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
            partitioner = new ForwardPartitioner<Object>();
        } else if (partitioner == null) {
            partitioner = new RebalancePartitioner<Object>();
        }

        if (partitioner instanceof ForwardPartitioner) {
            if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                throw new UnsupportedOperationException("Forward partitioning does not allow " +
                        "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
                        ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
                        " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
            }
        }

        if (shuffleMode == null) {
            shuffleMode = ShuffleMode.UNDEFINED;
        }

        // Create the StreamEdge and register it as an out-edge of the upstream node
        // and an in-edge of the downstream node
        StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag, shuffleMode);

        getStreamNode(edge.getSourceId()).addOutEdge(edge);
        getStreamNode(edge.getTargetId()).addInEdge(edge);
    }
}
```
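How a virtual partition node dissolves into an edge can be shown with a reduced model. This is a sketch under simplifying assumptions, not Flink's implementation: `VirtualNodeSketch` is a hypothetical class, partitioners are plain strings, only partition nodes are modelled (no select or side-output nodes), and the check that forward partitioning requires equal parallelism is omitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of resolving virtual partition nodes while adding an edge (hypothetical). */
class VirtualNodeSketch {

    /** What a virtual partition node remembers: its real upstream and the partitioner. */
    private static final class VirtualPartition {
        final int upstreamId;
        final String partitioner;

        VirtualPartition(int upstreamId, String partitioner) {
            this.upstreamId = upstreamId;
            this.partitioner = partitioner;
        }
    }

    private final Map<Integer, Integer> parallelismByNode = new HashMap<>();
    private final Map<Integer, VirtualPartition> virtualPartitionNodes = new HashMap<>();
    final List<String> edges = new ArrayList<>();

    void addNode(int id, int parallelism) {
        parallelismByNode.put(id, parallelism);
    }

    void addVirtualPartitionNode(int upstreamId, int virtualId, String partitioner) {
        virtualPartitionNodes.put(virtualId, new VirtualPartition(upstreamId, partitioner));
    }

    void addEdge(int upstreamId, int downstreamId) {
        addEdgeInternal(upstreamId, downstreamId, null);
    }

    private void addEdgeInternal(int upstreamId, int downstreamId, String partitioner) {
        VirtualPartition virtual = virtualPartitionNodes.get(upstreamId);
        if (virtual != null) {
            // Virtual upstream: hop to its real upstream and carry the partitioner along,
            // unless a partitioner was already picked up on the way.
            String carried = partitioner != null ? partitioner : virtual.partitioner;
            addEdgeInternal(virtual.upstreamId, downstreamId, carried);
            return;
        }
        if (partitioner == null) {
            // No explicit partitioner: forward if parallelism matches, rebalance otherwise.
            boolean sameParallelism =
                    parallelismByNode.get(upstreamId).equals(parallelismByNode.get(downstreamId));
            partitioner = sameParallelism ? "FORWARD" : "REBALANCE";
        }
        edges.add(upstreamId + "->" + downstreamId + " [" + partitioner + "]");
    }

    public static void main(String[] args) {
        VirtualNodeSketch graph = new VirtualNodeSketch();
        graph.addNode(1, 2);
        graph.addNode(2, 2);
        graph.addNode(3, 4);
        graph.addVirtualPartitionNode(1, 10, "HASH"); // e.g. the result of a keyBy
        graph.addEdge(10, 2); // downstream connects to the virtual node
        graph.addEdge(2, 3);  // plain edge between nodes of different parallelism
        System.out.println(graph.edges);
    }
}
```

The edge requested from virtual node `10` ends up as an edge from physical node `1` that carries the `HASH` partitioner, while the plain edge between nodes of different parallelism falls back to `REBALANCE`.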