Entry point:

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#getStreamGraph(java.lang.String)

Introduction to StreamGraph

  • Diagram of the StreamGraph: [Figure: image.png]
  • Components of a StreamGraph: [Figure: image.png]

    • addSource returns a DataStreamSource
    • addSink returns a DataStreamSink

Source code analysis

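  • Every transformation is stored in transformations, the collection of transformation operators. The collection does not include the source. While the job is being built, each transformation is further wrapped into a DataStream.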
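To make the "source is not in the list" point concrete, here is a minimal toy model. It is only a sketch: ToyEnvironment, ToyTransformation, addSource and apply are hypothetical stand-ins written for this note, not Flink classes, and the assumption is simply that creating a source registers nothing while applying an operator does.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: these are hypothetical stand-ins, not Flink classes.
class ToyEnvironment {

    static final class ToyTransformation {
        final String name;
        final ToyTransformation input; // null for a source

        ToyTransformation(String name, ToyTransformation input) {
            this.name = name;
            this.input = input;
        }
    }

    // Plays the role of the environment's transformations list.
    final List<ToyTransformation> transformations = new ArrayList<>();

    // Creating a source returns a handle but registers nothing.
    ToyTransformation addSource(String name) {
        return new ToyTransformation(name, null);
    }

    // Applying an operator registers it and remembers its upstream input.
    ToyTransformation apply(String name, ToyTransformation input) {
        ToyTransformation t = new ToyTransformation(name, input);
        transformations.add(t);
        return t;
    }

    static ToyEnvironment wordCountLikeJob() {
        ToyEnvironment env = new ToyEnvironment();
        ToyTransformation source = env.addSource("source");
        ToyTransformation flatMap = env.apply("flatMap", source);
        env.apply("print", flatMap);
        return env;
    }

    public static void main(String[] args) {
        for (ToyTransformation t : wordCountLikeJob().transformations) {
            // The source never appears as an entry, only as somebody's input.
            System.out.println(t.name + " <- " + t.input.name);
        }
    }
}
```

The list ends up with two entries, yet the source is still reachable through the input link of the first one. That link is what makes a later recursive walk possible.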

Example job

```java
DataStream<String> text = env.fromElements(WordCountData.WORDS);

DataStream<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        text.flatMap(new Tokenizer())
                // group by the tuple field "0" and sum up tuple field "1"
                .keyBy(value -> value.f0)
                .sum(1);

counts.print();
```

The source of this job is a collection and the sink is print. In between there are two steps: flatMap, then keyBy followed by sum.

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#getStreamGraph(java.lang.String, boolean)

```java
public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
    // Build the graph from the transformations registered so far.
    StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
    if (clearTransformations) {
        // Optionally forget them so the next graph starts from an empty list.
        this.transformations.clear();
    }
    return streamGraph;
}
```

First a StreamGraphGenerator is obtained; the StreamGraphGenerator then builds the StreamGraph.
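The effect of the clearTransformations flag is easy to miss when reading the method, so here is a small sketch of it. ToyGraphEnv and its methods are hypothetical names, and copying the list stands in for the real generate() call. The empty-list check mirrors the one in getStreamGraphGenerator().

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only; the names are hypothetical and not part of Flink.
class ToyGraphEnv {
    private final List<String> transformations = new ArrayList<>();

    void addOperator(String name) {
        transformations.add(name);
    }

    int pending() {
        return transformations.size();
    }

    // Same shape as getStreamGraph(jobName, clearTransformations): build, then optionally clear.
    List<String> getGraph(boolean clearTransformations) {
        if (transformations.isEmpty()) {
            throw new IllegalStateException("No operators defined. Cannot execute.");
        }
        List<String> graph = new ArrayList<>(transformations); // stand-in for generate()
        if (clearTransformations) {
            transformations.clear();
        }
        return graph;
    }

    public static void main(String[] args) {
        ToyGraphEnv env = new ToyGraphEnv();
        env.addOperator("flatMap");
        env.addOperator("print");
        System.out.println(env.getGraph(false) + ", pending=" + env.pending());
        System.out.println(env.getGraph(true) + ", pending=" + env.pending());
    }
}
```

In this sketch, once the list has been cleared a further getGraph call fails with the "no operators" error until new operators are registered.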

What goes into the StreamGraphGenerator

```java
private StreamGraphGenerator getStreamGraphGenerator() {
    // A job without any registered transformation cannot be executed.
    if (transformations.size() <= 0) {
        throw new IllegalStateException(
                "No operators defined in streaming topology. Cannot execute.");
    }
    final RuntimeExecutionMode executionMode = configuration.get(ExecutionOptions.RUNTIME_MODE);
    return new StreamGraphGenerator(transformations, config, checkpointCfg, getConfiguration())
            .setRuntimeExecutionMode(executionMode)
            .setStateBackend(defaultStateBackend)
            .setChaining(isChainingEnabled)
            .setUserArtifacts(cacheFile)
            .setTimeCharacteristic(timeCharacteristic)
            .setDefaultBufferTimeout(bufferTimeout);
}
```

Building the StreamGraph

```java
public StreamGraph generate() {
    streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
    shouldExecuteInBatchMode = shouldExecuteInBatchMode(runtimeExecutionMode);
    configureStreamGraph(streamGraph);

    alreadyTransformed = new HashMap<>();

    // Translate every registered transformation.
    for (Transformation<?> transformation : transformations) {
        transform(transformation);
    }

    // Hand the finished graph to the caller and reset the generator's working state.
    final StreamGraph builtStreamGraph = streamGraph;

    alreadyTransformed.clear();
    alreadyTransformed = null;
    streamGraph = null;

    return builtStreamGraph;
}
```

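Generating StreamNodes

```java
for (Transformation<?> transformation : transformations) {
    transform(transformation);
}
```

This loop converts the transformations into StreamNodes.
The transformations of the example job are: [Figure: image.png]
There are three transformations in total, corresponding to flatMap, the keyed aggregation (keyBy plus sum), and print. The source is not among them: it is picked up later, when flatMap is translated and its upstream inputs are resolved recursively.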
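The recursive upstream walk can be sketched as follows. This is a simplified toy, not the Flink implementation: Node, ToyGenerator and creationOrder are hypothetical names, keyBy is left out, and the only ideas borrowed from the real generator are the alreadyTransformed map and the rule that inputs are translated before the node itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only; hypothetical names, not Flink classes.
class ToyGenerator {

    static final class Node {
        final int id;
        final String name;
        final List<Node> inputs;

        Node(int id, String name, Node... inputs) {
            this.id = id;
            this.name = name;
            this.inputs = Arrays.asList(inputs);
        }
    }

    // Remembers what has been translated so nothing is translated twice.
    private final Map<Node, Integer> alreadyTransformed = new HashMap<>();
    final List<String> creationOrder = new ArrayList<>();

    int transform(Node node) {
        Integer known = alreadyTransformed.get(node);
        if (known != null) {
            return known;
        }
        // Translate upstream inputs first; this is how the unregistered source is reached.
        for (Node input : node.inputs) {
            transform(input);
        }
        creationOrder.add(node.name);
        alreadyTransformed.put(node, node.id);
        return node.id;
    }

    static List<String> run() {
        Node source = new Node(1, "source");
        Node flatMap = new Node(2, "flatMap", source);
        Node sum = new Node(3, "sum", flatMap);
        Node print = new Node(4, "print", sum);

        ToyGenerator generator = new ToyGenerator();
        // Only these three are "registered"; the source is not in the list.
        for (Node registered : Arrays.asList(flatMap, sum, print)) {
            generator.transform(registered);
        }
        return generator.creationOrder;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Although the loop only sees three registered entries, four nodes are created, with the source first. The memoization map keeps sum and print from re-translating flatMap.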

```java
private Collection<Integer> transform(Transformation<?> transform) {
    transform.getOutputType();
    final TransformationTranslator<?, Transformation<?>> translator =
            (TransformationTranslator<?, Transformation<?>>) translatorMap.get(transform.getClass());
    Collection<Integer> transformedIds;
    if (translator != null) {
        transformedIds = translate(translator, transform);
    } else {
        transformedIds = legacyTransform(transform);
    }
    return transformedIds;
}
```

The matching TransformationTranslator is looked up in translatorMap by the class of the transform.

```java
static {
    @SuppressWarnings("rawtypes")
    Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>> tmp = new HashMap<>();
    tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator<>());
    tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator<>());
    tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
    tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
    tmp.put(SourceTransformation.class, new SourceTransformationTranslator<>());
    tmp.put(SinkTransformation.class, new SinkTransformationTranslator<>());
    tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator<>());
    tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator<>());
    tmp.put(UnionTransformation.class, new UnionTransformationTranslator<>());
    tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator<>());
    tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator<>());
    tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator<>());
    tmp.put(TimestampsAndWatermarksTransformation.class, new TimestampsAndWatermarksTransformationTranslator<>());
    tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator<>());
    translatorMap = Collections.unmodifiableMap(tmp);
}
```
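The lookup pattern itself, a class-keyed map with a fallback when nothing is registered, can be tried out in isolation. The sketch below uses hypothetical types (OneInput, Sink, Unregistered) and plain functions in place of translators; it only illustrates the dispatch, not what Flink's translators do.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Toy model only; hypothetical types, not Flink's transformations or translators.
class ToyTranslatorRegistry {

    static class OneInput { }
    static class Sink { }
    static class Unregistered { } // deliberately has no translator

    private static final Map<Class<?>, Function<Object, String>> TRANSLATORS;

    static {
        Map<Class<?>, Function<Object, String>> tmp = new HashMap<>();
        tmp.put(OneInput.class, t -> "one-input translator");
        tmp.put(Sink.class, t -> "sink translator");
        TRANSLATORS = Collections.unmodifiableMap(tmp);
    }

    static String transform(Object transformation) {
        // Exact-class lookup, same idea as translatorMap.get(transform.getClass()).
        Function<Object, String> translator = TRANSLATORS.get(transformation.getClass());
        return translator != null ? translator.apply(transformation) : "legacy path";
    }

    public static void main(String[] args) {
        System.out.println(transform(new OneInput()));
        System.out.println(transform(new Unregistered()));
        // A subclass has a different runtime class, so the exact lookup misses it.
        System.out.println(transform(new OneInput() { }));
    }
}
```

One consequence of keying on getClass() is that a subclass of a registered type is not matched automatically. In this sketch it falls through to the fallback branch.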
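  • If a TransformationTranslator is found:

org.apache.flink.streaming.api.graph.StreamGraphGenerator#translate

  • If no TransformationTranslator is found, does that mean the transformation is a virtual partition node? According to the code above, the fallback is legacyTransform(transform). PartitionTransformation and UnionTransformation are both registered in translatorMap, so they do not take this path.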

org.apache.flink.streaming.runtime.translators.AbstractOneInputTransformationTranslator#translateInternal

```java
protected Collection<Integer> translateInternal(
        final Transformation<OUT> transformation,
        final StreamOperatorFactory<OUT> operatorFactory,
        final TypeInformation<IN> inputType,
        @Nullable final KeySelector<IN, ?> stateKeySelector,
        @Nullable final TypeInformation<?> stateKeyType,
        final Context context) {
    checkNotNull(transformation);
    checkNotNull(operatorFactory);
    checkNotNull(inputType);
    checkNotNull(context);

    final StreamGraph streamGraph = context.getStreamGraph();
    final String slotSharingGroup = context.getSlotSharingGroup();
    final int transformationId = transformation.getId();
    final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();

    // Add the StreamNode
    streamGraph.addOperator(
            transformationId,
            slotSharingGroup,
            transformation.getCoLocationGroupKey(),
            operatorFactory,
            inputType,
            transformation.getOutputType(),
            transformation.getName());

    if (stateKeySelector != null) {
        TypeSerializer<?> keySerializer = stateKeyType.createSerializer(executionConfig);
        streamGraph.setOneInputStateKey(transformationId, stateKeySelector, keySerializer);
    }

    // Use the operator's own parallelism if set, otherwise the environment default
    int parallelism =
            transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
                    ? transformation.getParallelism()
                    : executionConfig.getParallelism();
    streamGraph.setParallelism(transformationId, parallelism);
    streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());

    final List<Transformation<?>> parentTransformations = transformation.getInputs();
    checkState(
            parentTransformations.size() == 1,
            "Expected exactly one input transformation but found "
                    + parentTransformations.size());

    for (Integer inputId : context.getStreamNodeIds(parentTransformations.get(0))) {
        // Add the StreamEdge; edges are added according to their type
        streamGraph.addEdge(inputId, transformationId, 0);
    }

    return Collections.singleton(transformationId);
}
```
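Two small rules in translateInternal are worth isolating: the parallelism fallback, and the fact that one edge is added per upstream node ID (a single parent transformation may resolve to several IDs). The sketch below is a toy under stated assumptions: ToyOneInputTranslation, resolveParallelism and edgesFor are hypothetical helpers, and the sentinel value -1 is assumed for the "not set" parallelism.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Toy model only; hypothetical helpers, not Flink API.
class ToyOneInputTranslation {

    // Assumed sentinel meaning "no parallelism set on the operator".
    static final int PARALLELISM_DEFAULT = -1;

    // Operator-level setting wins; otherwise fall back to the environment-level value.
    static int resolveParallelism(int operatorParallelism, int environmentParallelism) {
        return operatorParallelism != PARALLELISM_DEFAULT
                ? operatorParallelism
                : environmentParallelism;
    }

    // One {upstreamId, nodeId} edge for every upstream ID the parent resolved to.
    static List<int[]> edgesFor(Collection<Integer> upstreamIds, int nodeId) {
        List<int[]> edges = new ArrayList<>();
        for (int upstreamId : upstreamIds) {
            edges.add(new int[] {upstreamId, nodeId});
        }
        return edges;
    }

    public static void main(String[] args) {
        System.out.println(resolveParallelism(PARALLELISM_DEFAULT, 4));
        System.out.println(resolveParallelism(2, 4));
        for (int[] edge : edgesFor(Arrays.asList(1, 3), 5)) {
            System.out.println(edge[0] + " -> " + edge[1]);
        }
    }
}
```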

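A keyBy does not become a StreamNode of its own; it is recorded as a virtual partition node. A union is handled virtually as well: it adds no node and simply passes on the node IDs of its inputs.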
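How a virtual partition node disappears from the final graph can be shown with a toy graph. This is a sketch loosely modeled on the idea, not the StreamGraph code: ToyVirtualGraph, the string partitioner labels ("HASH", "DEFAULT") and the edge format are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only; hypothetical names and string labels, not Flink's StreamGraph.
class ToyVirtualGraph {

    private static final class VirtualNode {
        final int upstreamId;
        final String partitioner;

        VirtualNode(int upstreamId, String partitioner) {
            this.upstreamId = upstreamId;
            this.partitioner = partitioner;
        }
    }

    private final Map<Integer, VirtualNode> virtualNodes = new HashMap<>();
    final List<String> edges = new ArrayList<>();

    // Registers a placeholder that only remembers its upstream node and a partitioner.
    void addVirtualPartitionNode(int upstreamId, int virtualId, String partitioner) {
        virtualNodes.put(virtualId, new VirtualNode(upstreamId, partitioner));
    }

    void addEdge(int upstreamId, int downstreamId) {
        addEdgeInternal(upstreamId, downstreamId, "DEFAULT");
    }

    private void addEdgeInternal(int upstreamId, int downstreamId, String partitioner) {
        VirtualNode virtual = virtualNodes.get(upstreamId);
        if (virtual != null) {
            // Skip over the virtual node, but carry its partitioner onto the real edge.
            addEdgeInternal(virtual.upstreamId, downstreamId, virtual.partitioner);
        } else {
            edges.add(upstreamId + " -[" + partitioner + "]-> " + downstreamId);
        }
    }

    static List<String> demo() {
        ToyVirtualGraph graph = new ToyVirtualGraph();
        // 2 = flatMap (real node), 3 = keyBy (virtual), 4 = sum (real node)
        graph.addVirtualPartitionNode(2, 3, "HASH");
        graph.addEdge(3, 4);
        return graph.edges;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The downstream operator asks for an edge from the virtual ID, and what ends up in the graph is a single edge between the two real nodes that carries the partitioning information.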