在编写 Flink 的程序的时候,核心的要点是构造出数据处理的拓扑结构,即任务执行逻辑的 DAG。我们先来看一下 Flink 任务的拓扑在逻辑上是怎么保存的。

StreamExecutionEnvironment

StreamExecutionEnvironment 是 Flink 在流模式下任务执行的上下文,也是我们编写 Flink 程序的入口。根据具体的执行环境不同,StreamExecutionEnvironment 有不同的具体实现类,如 LocalStreamEnvironment, RemoteStreamEnvironment 等。StreamExecutionEnvironment 也提供了用来配置默认并行度、Checkpointing 等机制的方法,这些配置主要都保存在 ExecutionConfigCheckpointConfig 中。我们现在先只关注拓扑结构的产生。
通常一个 Flink 任务是按照下面的流程来编写处理逻辑的:

  1. senv.addSource(XXX)
  2. .map(XXX)
  3. .filter(XXX)
  4. .addSink(XXX)

添加数据源后获得 DataStream, 之后通过不同的算子不停地在 DataStream 上实现转换过滤等逻辑,最终将结果输出到 DataSink 中。
在 StreamExecutionEnvironment 内部使用一个 List<StreamTransformation<?>> transformations 来保留生成 DataStream 的所有转换。

StreamTransformation

StreamTransformation 代表了生成 DataStream 的操作,每一个 DataStream 的底层都有对应的一个 StreamTransformation。在 DataStream 上面通过 map 等算子不断进行转换,就得到了由 StreamTransformation 构成的图。当需要执行的时候,底层的这个图就会被转换成 StreamGraph
StreamTransformation 在运行时并不一定对应着一个物理转换操作,有一些操作只是逻辑层面上的,比如 split/select/partitioning 等。
每一个 StreamTransformation 都有一个关联的 Id,这个 Id 是全局递增的。除此以外,还有 uid, slotSharingGroup, parallelism 等信息。
StreamTransformation 有很多具体的子类,如SourceTransformationOneInputStreamTransformationTwoInputTransformationSideOutputTransformationSinkTransformation 等等,这些分别对应了DataStream 上的不同转换操作。
由于 StreamTransformation 中通常保留了其前向的 StreamTransformation,即其输入,因此可以据此还原出 DAG 的拓扑结构。

| ``` // OneInputTransformation public OneInputTransformation( StreamTransformation input, String name, OneInputStreamOperator operator, TypeInformation outputType, int parallelism) { super(name, outputType, parallelism); this.input = input; this.operator = operator; } // TwoInputTransformation public TwoInputTransformation( StreamTransformation input1, StreamTransformation input2, String name, TwoInputStreamOperator operator, TypeInformation outputType, int parallelism) { super(name, outputType, parallelism); this.input1 = input1; this.input2 = input2; this.operator = operator; }

  1. |
  2. | --- |
  3. <a name="y1SJ5"></a>
  4. ## DataStream
  5. 一个 `DataStream` 就表征了由同一种类型元素构成的数据流。通过对 `DataStream` 应用 map/filter 等操作,可以将一个 `DataStream` 转换为另一个 `DataStream`,这个转换的过程就是根据不同的操作生成不同的 `StreamTransformation`,并将其加入 `StreamExecutionEnvironment` `transformations` 列表中。<br />例如:
  6. |

//构造 StreamTransformation OneInputTransformation resultTransform = new OneInputTransformation<>( this.transformation, operatorName, operator, outTypeInfo, environment.getParallelism()); @SuppressWarnings({ “unchecked”, “rawtypes” }) SingleOutputStreamOperator returnStream = new SingleOutputStreamOperator(environment, resultTransform); //加入到 StreamExecutionEnvironment 的列表中 getExecutionEnvironment().addOperator(resultTransform);

  1. |
  2. | --- |
  3. `DataStream` 的子类包括 `SingleOutputStreamOperator` `DataStreamSource` `KeyedStream` `IterativeStream`, `SplitStream`(已弃用)。这里要吐槽一下 `SingleOutputStreamOperator` 的这个类的命名,太容易和 `StreamOperator` 混淆了。`StreamOperator` 的介绍见下一小节。<br />除了 `DataStream` 及其子类以外,其它的表征数据流的类还有 `ConnectedStreams` (两个流连接在一起)、 `WindowedStream``AllWindowedStream` 。这些数据流之间的转换可以参考 Flink 的[官方文档](https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/)。
  4. <a name="streamoperator"></a>
  5. ## StreamOperator
  6. 在操作 `DataStream` 的时候,比如 `DataStream#map` 等,会要求我们提供一个自定义的处理函数。那么这些信息时如何保存在 `StreamTransformation` 中的呢?这里就要引入一个新的接口 `StreamOperator`。<br />`StreamOperator` 定义了对一个具体的算子的生命周期的管理,包括:
  7. |

//生命周期 void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output> output); void open() throws Exception; void close() throws Exception; @Override void dispose() throws Exception; //状态管理 OperatorSnapshotFutures snapshotState( long checkpointId, long timestamp, CheckpointOptions checkpointOptions, CheckpointStreamFactory storageLocation) throws Exception; void initializeState() throws Exception; //其它方法暂时省略

  1. |
  2. | --- |
  3. `StreamOperator` 的两个子接口 `OneInputStreamOperator` `TwoInputStreamOperator` 则提供了操作数据流中具体元素的方法,而 `AbstractUdfStreamOperator` 这个抽象子类则提供了自定义处理函数对应的算子的基本实现:
  4. |

//OneInputStreamOperator void processElement(StreamRecord element) throws Exception; void processWatermark(Watermark mark) throws Exception; void processLatencyMarker(LatencyMarker latencyMarker) throws Exception; //TwoInputStreamOperator void processElement1(StreamRecord element) throws Exception; void processElement2(StreamRecord element) throws Exception; //AbstractUdfStreamOperator 接受一个用户自定义的处理函数 public AbstractUdfStreamOperator(F userFunction) { this.userFunction = requireNonNull(userFunction); checkUdfCheckpointingPreconditions(); }

  1. |
  2. | --- |
  3. 至于具体到诸如 map/fliter 等操作对应的 StreamOperator,基本都是在 `AbstractUdfStreamOperator` 的基础上实现的。以 `StreamMap` 为例:
  4. |

public class StreamMap extends AbstractUdfStreamOperator> implements OneInputStreamOperator { private static final long serialVersionUID = 1L; public StreamMap(MapFunction mapper) { super(mapper); chainingStrategy = ChainingStrategy.ALWAYS; } @Override public void processElement(StreamRecord element) throws Exception { output.collect(element.replace(userFunction.map(element.getValue()))); } }

  1. |
  2. | --- |
  3. 由此,通过 **DataStream –> StreamTransformation –> StreamOperator** 这样的依赖关系,就可以完成 DataStream 的转换,并且保留数据流和应用在流上的算子之间的关系。
  4. <a name="streamgraph"></a>
  5. ## StreamGraph
  6. `StreamGraphGenerator` 会基于 `StreamExecutionEnvironment` `transformations` 列表来生成 `StreamGraph`。<br />在遍历 `List<StreamTransformation>` 生成 `StreamGraph` 的时候,会递归调用`StreamGraphGenerator#transform`方法。对于每一个 `StreamTransformation`, 确保当前其上游已经完成转换。`StreamTransformations` 被转换为 `StreamGraph` 中的节点 `StreamNode`,并为上下游节点添加边 `StreamEdge`
  7. |

Collection transformedIds; if (transform instanceof OneInputTransformation<?, ?>) { transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform); } else if (transform instanceof TwoInputTransformation<?, ?, ?>) { transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform); } else if (transform instanceof SourceTransformation<?>) { transformedIds = transformSource((SourceTransformation<?>) transform); } else if (transform instanceof SinkTransformation<?>) { transformedIds = transformSink((SinkTransformation<?>) transform); } else if (transform instanceof UnionTransformation<?>) { transformedIds = transformUnion((UnionTransformation<?>) transform); } else if (transform instanceof SplitTransformation<?>) { transformedIds = transformSplit((SplitTransformation<?>) transform); } else if (transform instanceof SelectTransformation<?>) { transformedIds = transformSelect((SelectTransformation<?>) transform); } else if (transform instanceof FeedbackTransformation<?>) { transformedIds = transformFeedback((FeedbackTransformation<?>) transform); } else if (transform instanceof CoFeedbackTransformation<?>) { transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform); } else if (transform instanceof PartitionTransformation<?>) { transformedIds = transformPartition((PartitionTransformation<?>) transform); } else if (transform instanceof SideOutputTransformation<?>) { transformedIds = transformSideOutput((SideOutputTransformation<?>) transform); } else { throw new IllegalStateException(“Unknown transformation: “ + transform); }

  1. |
  2. | --- |
  3. 对于不同类型的 `StreamTransformation`,分别调用对应的转换方法,以 最典型的 `transformOneInputTransform` 为例:
  4. |

private Collection transformOneInputTransform(OneInputTransformation transform) { //首先确保上游节点完成转换 Collection inputIds = transform(transform.getInput()); // the recursive call might have already transformed this // 由于是递归调用的,可能已经完成了转换 if (alreadyTransformed.containsKey(transform)) { return alreadyTransformed.get(transform); } //确定资源共享组,用户如果没有指定,默认是default String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds); //向 StreamGraph 中添加 Operator, 这一步会生成对应的 StreamNode streamGraph.addOperator(transform.getId(), slotSharingGroup, transform.getCoLocationGroupKey(), transform.getOperator(), transform.getInputType(), transform.getOutputType(), transform.getName()); if (transform.getStateKeySelector() != null) { TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig()); streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer); } streamGraph.setParallelism(transform.getId(), transform.getParallelism()); streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism()); //依次连接到上游节点,创建 StreamEdge for (Integer inputId: inputIds) { streamGraph.addEdge(inputId, transform.getId(), 0); } return Collections.singleton(transform.getId()); }

  1. |
  2. | --- |
  3. 接着看一看 `StreamGraph` 中对应的添加节点和边的方法:
  4. |

protected StreamNode addNode(Integer vertexID, String slotSharingGroup, @Nullable String coLocationGroup, Class<? extends AbstractInvokable> vertexClass, StreamOperator<?> operatorObject, String operatorName) { if (streamNodes.containsKey(vertexID)) { throw new RuntimeException(“Duplicate vertexID “ + vertexID); } StreamNode vertex = new StreamNode(environment, vertexID, slotSharingGroup, coLocationGroup, operatorObject, operatorName, new ArrayList>(), vertexClass); //创建 StreamNode,这里保存了 StreamOperator 和 vertexClass 信息 streamNodes.put(vertexID, vertex); return vertex; }

  1. |
  2. | --- |
  3. `StreamNode` 中,保存了对应的 `StreamOperator` (从 `StreamTransformation` 得到),并且还引入了变量 jobVertexClass 来表示该节点在 TaskManager 中运行时的实际任务类型。
  4. |

private final Class<? extends AbstractInvokable> jobVertexClass;

  1. |
  2. | --- |
  3. `AbstractInvokable` 是所有可以在 TaskManager 中运行的任务的抽象基础类,包括流式任务和批任务。`StreamTask` 是所有流式任务的基础类,其具体的子类包括 `SourceStreamTask`, `OneInputStreamTask`, `TwoInputStreamTask` 等。<br />对于一些不包含物理转换操作的 `StreamTransformation`,如 Partitioning, split/select, union,并不会生成 StreamNode,而是生成一个带有特定属性的虚拟节点。当添加一条有虚拟节点指向下游节点的边时,会找到虚拟节点上游的物理节点,在两个物理节点之间添加边,并把虚拟转换操作的属性附着上去。<br />以 `PartitionTansformation` 为例, `PartitionTansformation` `KeyedStream` 对应的转换:
  4. |

//StreamGraphGenerator#transformPartition private Collection transformPartition(PartitionTransformation partition) { StreamTransformation input = partition.getInput(); List resultIds = new ArrayList<>(); //递归地转换上游节点 Collection transformedIds = transform(input); for (Integer transformedId: transformedIds) { int virtualId = StreamTransformation.getNewNodeId(); //添加虚拟的 Partition 节点 streamGraph.addVirtualPartitionNode(transformedId, virtualId, partition.getPartitioner()); resultIds.add(virtualId); } return resultIds; } // StreamGraph#addVirtualPartitionNode public void addVirtualPartitionNode(Integer originalId, Integer virtualId, StreamPartitioner<?> partitioner) { if (virtualPartitionNodes.containsKey(virtualId)) { throw new IllegalStateException(“Already has virtual partition node with id “ + virtualId); } //添加一个虚拟节点,后续添加边的时候会连接到实际的物理节点 virtualPartitionNodes.put(virtualId, new Tuple2>(originalId, partitioner)); }

  1. |
  2. | --- |
  3. 前面提到,在每一个物理节点的转换上,会调用 `StreamGraph#addEdge` 在输入节点和当前节点之间建立边的连接:

private void addEdgeInternal(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber, StreamPartitioner<?> partitioner, List outputNames, OutputTag outputTag) { //先判断是不是虚拟节点上的边,如果是,则找到虚拟节点上游对应的物理节点 //在两个物理节点之间添加边,并把对应的 StreamPartitioner,或者 OutputTag 等补充信息添加到StreamEdge中 if (virtualSideOutputNodes.containsKey(upStreamVertexID)) { …… } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) { int virtualId = upStreamVertexID; upStreamVertexID = virtualPartitionNodes.get(virtualId).f0; if (partitioner == null) { partitioner = virtualPartitionNodes.get(virtualId).f1; } addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag); } else { //两个物理节点 StreamNode upstreamNode = getStreamNode(upStreamVertexID); StreamNode downstreamNode = getStreamNode(downStreamVertexID); // If no partitioner was specified and the parallelism of upstream and downstream // operator matches use forward partitioning, use rebalance otherwise. if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) { partitioner = new ForwardPartitioner(); } else if (partitioner == null) { partitioner = new RebalancePartitioner(); } if (partitioner instanceof ForwardPartitioner) { if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) { throw new UnsupportedOperationException(“Forward partitioning does not allow “ + “change of parallelism. Upstream operation: “ + upstreamNode + “ parallelism: “ + upstreamNode.getParallelism() + “, downstream operation: “ + downstreamNode + “ parallelism: “ + downstreamNode.getParallelism() + “ You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.”); } } //创建 StreamEdge,保留了 StreamPartitioner 等属性 StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag); //分别将StreamEdge添加到上游节点和下游节点 getStreamNode(edge.getSourceId()).addOutEdge(edge); getStreamNode(edge.getTargetId()).addInEdge(edge); } } ``` 这样通过 StreamNode 和 SteamEdge,就得到了 DAG 中的所有节点和边,以及它们之间的连接关系,拓扑结构也就建立了。

小结

本文简单分析了从 DataStream API 到 StramGraph 的过程。 StreamGraph 是 Flink 任务最接近用户逻辑的 DAG 表示,后面到具体执行的时候还会进行一系列转换,我们在后续的文章中再逐一加以分析。
-EOF-