StreamGraph generation process

The StreamGraph is generated before a Flink job is submitted. The entry point for generating it is in StreamExecutionEnvironment:

    @Internal
    public StreamGraph getStreamGraph() {
        return getStreamGraphGenerator().generate();
    }

    /**
     * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.
     *
     * @param jobName Desired name of the job
     * @return The streamgraph representing the transformations
     */
    @Internal
    public StreamGraph getStreamGraph(String jobName) {
        return getStreamGraphGenerator().setJobName(jobName).generate();
    }

    private StreamGraphGenerator getStreamGraphGenerator() {
        if (transformations.size() <= 0) {
            throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
        }
        return new StreamGraphGenerator(transformations, config, checkpointCfg)
            .setStateBackend(defaultStateBackend)
            .setChaining(isChainingEnabled)
            .setUserArtifacts(cacheFile)
            .setTimeCharacteristic(timeCharacteristic)
            .setDefaultBufferTimeout(bufferTimeout);
    }

The StreamGraph is actually built by StreamGraphGenerator, which walks from each SinkTransformation upstream until it reaches a SourceTransformation and builds the StreamGraph as it traverses:

    public StreamGraph generate() {
        streamGraph = new StreamGraph(executionConfig, checkpointConfig);
        streamGraph.setStateBackend(stateBackend);
        streamGraph.setChaining(chaining);
        streamGraph.setScheduleMode(scheduleMode);
        streamGraph.setUserArtifacts(userArtifacts);
        streamGraph.setTimeCharacteristic(timeCharacteristic);
        streamGraph.setJobName(jobName);
        streamGraph.setBlockingConnectionsBetweenChains(blockingConnectionsBetweenChains);

        alreadyTransformed = new HashMap<>();

        // Note: StreamGraph generation starts from the sinks; transform() recurses into the upstream inputs
        for (Transformation<?> transformation: transformations) {
            transform(transformation);
        }

        final StreamGraph builtStreamGraph = streamGraph;

        alreadyTransformed.clear();
        alreadyTransformed = null;
        streamGraph = null;

        return builtStreamGraph;
    }

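While traversing the Transformations, each type of Transformation is converted differently:

  • A physical Transformation is converted into a StreamNode.
  • A virtual Transformation is recorded as a virtual node and does not become a StreamNode.

          Collection<Integer> transformedIds;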
          if (transform instanceof OneInputTransformation<?, ?>) {
              transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
          } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
              transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
          } else if (transform instanceof SourceTransformation<?>) {
              transformedIds = transformSource((SourceTransformation<?>) transform);
          } else if (transform instanceof SinkTransformation<?>) {
              transformedIds = transformSink((SinkTransformation<?>) transform);
          } else if (transform instanceof UnionTransformation<?>) {
              transformedIds = transformUnion((UnionTransformation<?>) transform);
          } else if (transform instanceof SplitTransformation<?>) {
              transformedIds = transformSplit((SplitTransformation<?>) transform);
          } else if (transform instanceof SelectTransformation<?>) {
              transformedIds = transformSelect((SelectTransformation<?>) transform);
          } else if (transform instanceof FeedbackTransformation<?>) {
              transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
          } else if (transform instanceof CoFeedbackTransformation<?>) {
              transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
          } else if (transform instanceof PartitionTransformation<?>) {
              transformedIds = transformPartition((PartitionTransformation<?>) transform);
          } else if (transform instanceof SideOutputTransformation<?>) {
              transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
          } else {
              throw new IllegalStateException("Unknown transformation: " + transform);
          }
    
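Each Transformation type is converted by its own transformXxx() method (for example transformOneInputTransform()). The method first recursively transforms the upstream Transformations, so that everything upstream has already been converted. It then builds the StreamNode with addOperator() and connects it to the upstream nodes with addEdge(), which creates the StreamEdges.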
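
The recursion described above can be looked at in isolation. The following is a minimal, self-contained model and not Flink's code: `ToyTransformation`, `ToyGraphBuilder`, and their fields are hypothetical names. Only the order of the steps (transform the inputs first, skip anything already transformed, then add the node, then add the edges) follows the description.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a physical Transformation: an id, a name, and its upstream inputs.
final class ToyTransformation {
    final int id;
    final String name;
    final List<ToyTransformation> inputs;

    ToyTransformation(int id, String name, ToyTransformation... inputs) {
        this.id = id;
        this.name = name;
        this.inputs = Arrays.asList(inputs);
    }
}

// Hypothetical builder that mimics the "inputs first, then node, then edges" order.
final class ToyGraphBuilder {
    final List<String> nodes = new ArrayList<>();   // node names in creation order
    final List<String> edges = new ArrayList<>();   // "upstreamId->downstreamId"
    private final Map<ToyTransformation, Integer> alreadyTransformed = new HashMap<>();

    int transform(ToyTransformation t) {
        // A transformation reachable via several paths is converted only once.
        Integer cached = alreadyTransformed.get(t);
        if (cached != null) {
            return cached;
        }
        // 1. Recurse so that every upstream node exists before this one.
        List<Integer> inputIds = new ArrayList<>();
        for (ToyTransformation input : t.inputs) {
            inputIds.add(transform(input));
        }
        // 2. Create the node for this transformation.
        nodes.add(t.name);
        // 3. Connect it to each upstream node.
        for (int inputId : inputIds) {
            edges.add(inputId + "->" + t.id);
        }
        alreadyTransformed.put(t, t.id);
        return t.id;
    }
}
```

Calling `transform()` on the sink of a source → map → sink chain creates the nodes in the order source, map, sink, even though the traversal starts at the sink. A second call for an already converted transformation returns the cached id and adds nothing.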

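While a StreamNode is being built, the key information needed at runtime is also determined and stored in the StreamNode:

  • the container class that executes the operator (StreamTask and its subclasses)
  • the factory that instantiates the operator (StreamOperatorFactory)

When a StreamEdge is added and the ShuffleMode is null, ShuffleMode.UNDEFINED is used, as the addEdgeInternal() code at the end of this section shows.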


Converting a single-input physical Transformation

  1. Recursively transform the upstream Transformation of this OneInputTransformation and keep the resulting ids for building the edges. The recursion ensures that every upstream Transformation has already been converted.
  2. Determine the slot sharing group.
  3. Add the operator to the StreamGraph.
  4. Set the StateKeySelector.
  5. Set the parallelism and the max parallelism.
  6. Build the StreamEdges that link the upstream StreamNodes to the new StreamNode.

```java
/**
 * Transforms a {@code OneInputTransformation}.
 *
 * <p>This recursively transforms the inputs, creates a new {@code StreamNode} in the graph and
 * wires the inputs to this new node.
 */
private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {

    // Transform the upstream Transformation first and keep its ids for building the edges.
    // The recursion ensures that every upstream Transformation has already been converted.
    Collection<Integer> inputIds = transform(transform.getInput());

    // Avoid converting twice: if already converted, return the earlier result.
    // the recursive call might have already transformed this
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }

    // Determine the slot sharing group
    String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);

    // Add the StreamNode to the StreamGraph
    streamGraph.addOperator(transform.getId(),
            slotSharingGroup,
            transform.getCoLocationGroupKey(),
            transform.getOperatorFactory(),
            transform.getInputType(),
            transform.getOutputType(),
            transform.getName());

    // Set the StateKeySelector
    if (transform.getStateKeySelector() != null) {
        TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(executionConfig);
        streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
    }

    // Determine the parallelism
    int parallelism = transform.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
        transform.getParallelism() : executionConfig.getParallelism();
    streamGraph.setParallelism(transform.getId(), parallelism);
    streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());

    // Add the StreamEdges that link the upstream StreamNodes to this StreamNode
    for (Integer inputId: inputIds) {
        streamGraph.addEdge(inputId, transform.getId(), 0);
    }

    return Collections.singleton(transform.getId());
}
```
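
Step 2 resolves the slot sharing group before the node is added. The sketch below models the rule as it is documented for slot sharing: an explicitly set group wins, otherwise the group is inherited from the inputs when they all agree, otherwise the default group is used. It is a simplified stand-alone illustration. `SlotSharingSketch`, its `groupByNodeId` map, and the `DEFAULT_GROUP` constant are hypothetical, and the real `determineSlotSharingGroup()` may differ in detail.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

final class SlotSharingSketch {
    static final String DEFAULT_GROUP = "default";

    // Hypothetical registry: the slot sharing group already assigned to each upstream node id.
    final Map<Integer, String> groupByNodeId = new HashMap<>();

    String determineGroup(String specifiedGroup, Collection<Integer> inputIds) {
        if (specifiedGroup != null) {
            return specifiedGroup;              // an explicit setting wins
        }
        String inherited = null;
        for (int id : inputIds) {
            String candidate = groupByNodeId.get(id);
            if (inherited == null) {
                inherited = candidate;          // the first known input sets the candidate
            } else if (!inherited.equals(candidate)) {
                return DEFAULT_GROUP;           // the inputs disagree
            }
        }
        return inherited == null ? DEFAULT_GROUP : inherited;
    }
}
```

Because the inputs are always converted first, their groups are already known when the downstream operator asks for its own group, which is one reason the recursion has to run before `addOperator()`.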
#### Converting virtual Transformations
A virtual `Transformation` is not converted into a StreamNode. A virtual node is added to the StreamGraph instead.
```java

    /**
     * Transforms a {@code PartitionTransformation}.
     *
     * <p>For this we create a virtual node in the {@code StreamGraph} that holds the partition
     * property. @see StreamGraphGenerator
     */
    private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
        Transformation<T> input = partition.getInput();
        List<Integer> resultIds = new ArrayList<>();

        // Recursively transform the direct upstream Transformation and collect the resulting upstream ids
        Collection<Integer> transformedIds = transform(input);
        for (Integer transformedId: transformedIds) {
            int virtualId = Transformation.getNewNodeId();

            // Add a virtual partition node; no StreamNode is created
            streamGraph.addVirtualPartitionNode(
                    transformedId, virtualId, partition.getPartitioner(), partition.getShuffleMode());
            resultIds.add(virtualId);
        }

        return resultIds;
    }
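```

  • Converting a PartitionTransformation creates neither a StreamNode nor a StreamEdge. It only registers a virtual node through streamGraph.addVirtualPartitionNode().
  • When the Transformation downstream of the partitioning adds its StreamEdge (by calling streamGraph.addEdge()), the Partitioner is packed into that StreamEdge.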

      public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
          addEdgeInternal(upStreamVertexID,
                  downStreamVertexID,
                  typeNumber,
                  null,                     // partitioner
                  new ArrayList<String>(),  // outputNames
                  null,                     // outputTag
                  null);                    // shuffleMode
      }
    
      private void addEdgeInternal(Integer upStreamVertexID,
              Integer downStreamVertexID,
              int typeNumber,
              StreamPartitioner<?> partitioner,
              List<String> outputNames,
              OutputTag outputTag,
              ShuffleMode shuffleMode) {
    
          // Upstream is a virtual side-output node: recurse with its real upstream id and pass the OutputTag along
          if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
              int virtualId = upStreamVertexID;
              upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
              if (outputTag == null) {
                  outputTag = virtualSideOutputNodes.get(virtualId).f1;
              }
              addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode);
    
          // Upstream is a virtual select node: recurse with its real upstream id and pass the selected output names along
          } else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
              int virtualId = upStreamVertexID;
              upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
              if (outputNames.isEmpty()) {
                  // selections that happen downstream override earlier selections
                  outputNames = virtualSelectNodes.get(virtualId).f1;
              }
              addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
    
          // Upstream is a virtual partition node: recurse with its real upstream id and pass the partitioner along
          } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
              int virtualId = upStreamVertexID;
              upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
              if (partitioner == null) {
                  partitioner = virtualPartitionNodes.get(virtualId).f1;
              }
              shuffleMode = virtualPartitionNodes.get(virtualId).f2;
              addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
    
          // None of the virtual cases applies: build the actual StreamEdge
          } else {
              StreamNode upstreamNode = getStreamNode(upStreamVertexID);
              StreamNode downstreamNode = getStreamNode(downStreamVertexID);
    
              // If no partitioner was specified and the parallelism of upstream and downstream
              // operator matches use forward partitioning, use rebalance otherwise.
              if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
                  partitioner = new ForwardPartitioner<Object>();
              } else if (partitioner == null) {
                  partitioner = new RebalancePartitioner<Object>();
              }
    
              if (partitioner instanceof ForwardPartitioner) {
                  if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                      throw new UnsupportedOperationException("Forward partitioning does not allow " +
                              "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
                              ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
                              " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
                  }
              }
    
              if (shuffleMode == null) {
                  shuffleMode = ShuffleMode.UNDEFINED;
              }
    
              // Create the StreamEdge and add it to the upstream node's out-edges and the downstream node's in-edges
              StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag, shuffleMode);
    
              getStreamNode(edge.getSourceId()).addOutEdge(edge);
              getStreamNode(edge.getTargetId()).addInEdge(edge);
          }
      }
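
The effect of the virtual partition branch is easier to see in a reduced model. This is a hedged sketch and not Flink's StreamGraph: `VirtualEdgeSketch`, its `Edge` class, and the use of plain strings as partitioner names are hypothetical. Only the virtual-partition case and the forward/rebalance default are modeled. Side outputs, select nodes, and the ShuffleMode are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class VirtualEdgeSketch {
    // Hypothetical edge record: physical endpoints plus the partitioner name carried on the edge.
    static final class Edge {
        final int sourceId;
        final int targetId;
        final String partitioner;

        Edge(int sourceId, int targetId, String partitioner) {
            this.sourceId = sourceId;
            this.targetId = targetId;
            this.partitioner = partitioner;
        }
    }

    final Map<Integer, Integer> parallelismByNode = new HashMap<>();   // physical node id -> parallelism
    final Map<Integer, Integer> virtualUpstream = new HashMap<>();     // virtual id -> upstream id
    final Map<Integer, String> virtualPartitioner = new HashMap<>();   // virtual id -> partitioner
    final List<Edge> edges = new ArrayList<>();

    void addVirtualPartitionNode(int upstreamId, int virtualId, String partitioner) {
        virtualUpstream.put(virtualId, upstreamId);
        virtualPartitioner.put(virtualId, partitioner);
    }

    void addEdge(int upstreamId, int downstreamId) {
        addEdgeInternal(upstreamId, downstreamId, null);
    }

    private void addEdgeInternal(int upstreamId, int downstreamId, String partitioner) {
        if (virtualUpstream.containsKey(upstreamId)) {
            // Virtual node: pick up its partitioner (unless one was already found closer to the
            // downstream node) and retry with the node it points to.
            if (partitioner == null) {
                partitioner = virtualPartitioner.get(upstreamId);
            }
            addEdgeInternal(virtualUpstream.get(upstreamId), downstreamId, partitioner);
            return;
        }
        if (partitioner == null) {
            // No explicit partitioner: forward when the parallelism matches, rebalance otherwise.
            boolean same = parallelismByNode.get(upstreamId).equals(parallelismByNode.get(downstreamId));
            partitioner = same ? "forward" : "rebalance";
        }
        edges.add(new Edge(upstreamId, downstreamId, partitioner));
    }
}
```

In this model, an edge requested from a virtual id ends up connecting the two physical nodes, and the partitioner stored on the virtual node is carried on that edge. The virtual node itself never appears in `edges`, which mirrors why a PartitionTransformation produces no StreamNode of its own.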