Entry point:
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#getStreamGraph(java.lang.String)
Introduction to StreamGraph
- Diagram of the StreamGraph


- Components of the StreamGraph

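All transformations are stored in `transformations`, the collection of transformation operators registered on the environment. The source is not part of this collection. When the job is built, each transformation is translated further into a StreamNode of the StreamGraph.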
Example job
```java
DataStream<String> text = env.fromElements(WordCountData.WORDS);
// DataStream ... (the rest of the snippet is cut off)
```
In this job the source is a collection and the sink is a print output. In between are two transformations, flatMap and keyBy.

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#getStreamGraph(java.lang.String, boolean)
```java
public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
    StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
    if (clearTransformations) {
        this.transformations.clear();
    }
    return streamGraph;
}
```
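The effect of the `clearTransformations` flag can be illustrated with a small stand-alone model. This is only a sketch using the JDK: `MiniEnv` and its string-based "transformations" are hypothetical stand-ins, not Flink classes, and the "graph" is just a copy of the list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for StreamExecutionEnvironment; not a Flink class.
class MiniEnv {
    final List<String> transformations = new ArrayList<>();

    // Same shape as getStreamGraph(jobName, clearTransformations):
    // build the graph from the registered operators, then optionally clear them.
    // jobName is accepted only to mirror the signature; this model ignores it.
    List<String> getStreamGraph(String jobName, boolean clearTransformations) {
        if (transformations.isEmpty()) {
            throw new IllegalStateException(
                    "No operators defined in streaming topology. Cannot execute.");
        }
        List<String> graph = new ArrayList<>(transformations); // the "graph" is a plain copy here
        if (clearTransformations) {
            transformations.clear();
        }
        return graph;
    }

    public static void main(String[] args) {
        MiniEnv env = new MiniEnv();
        env.transformations.add("flatMap");
        env.transformations.add("print");

        // Inspect the graph without consuming the registered operators.
        System.out.println(env.getStreamGraph("job", false).size() + " / " + env.transformations.size());
        // Build it again with clearing enabled: the list is empty afterwards.
        System.out.println(env.getStreamGraph("job", true).size() + " / " + env.transformations.size());
    }
}
```

In this model, calling `getStreamGraph` again after a clearing call throws the `IllegalStateException`, because no operators remain registered.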
First a StreamGraphGenerator is obtained; the StreamGraphGenerator then builds the StreamGraph.
How the StreamGraphGenerator is assembled
```java
private StreamGraphGenerator getStreamGraphGenerator() {
    if (transformations.size() <= 0) {
        throw new IllegalStateException(
                "No operators defined in streaming topology. Cannot execute.");
    }

    final RuntimeExecutionMode executionMode = configuration.get(ExecutionOptions.RUNTIME_MODE);

    return new StreamGraphGenerator(transformations, config, checkpointCfg, getConfiguration())
            .setRuntimeExecutionMode(executionMode)
            .setStateBackend(defaultStateBackend)
            .setChaining(isChainingEnabled)
            .setUserArtifacts(cacheFile)
            .setTimeCharacteristic(timeCharacteristic)
            .setDefaultBufferTimeout(bufferTimeout);
}
```
Building the StreamGraph
```java
public StreamGraph generate() {
    streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
    shouldExecuteInBatchMode = shouldExecuteInBatchMode(runtimeExecutionMode);
    configureStreamGraph(streamGraph);

    alreadyTransformed = new HashMap<>();

    // Translate every registered transformation into the graph.
    for (Transformation<?> transformation : transformations) {
        transform(transformation);
    }

    final StreamGraph builtStreamGraph = streamGraph;

    // Release the generator's working state before handing out the result.
    alreadyTransformed.clear();
    alreadyTransformed = null;
    streamGraph = null;

    return builtStreamGraph;
}
```
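StreamNode generation
```java
for (Transformation<?> transformation : transformations) {
    transform(transformation);
}
```
Each transformation is converted into a StreamNode.
Transformations in the example job:
There are three transformations in total, corresponding to flatMap, keyBy, and print. The source is not in the list. It is reached later: when the flatMap transformation is translated, its upstream inputs are resolved recursively, and that is how the source is found.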
```java
private Collection<Integer> transform(Transformation<?> transform) {
    transform.getOutputType();

    final TransformationTranslator<?, Transformation<?>> translator =
            (TransformationTranslator<?, Transformation<?>>)
                    translatorMap.get(transform.getClass());

    Collection<Integer> transformedIds;
    if (translator != null) {
        transformedIds = translate(translator, transform);
    } else {
        transformedIds = legacyTransform(transform);
    }
    return transformedIds;
}
```
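The recursive lookup of upstream inputs can be sketched without Flink. The model below assumes that the `alreadyTransformed` map created in `generate()` acts as a cache so that each transformation is translated only once. `MiniTransformation` and `RecursiveTransformSketch` are hypothetical names, and the cache stores a single id rather than a collection of ids.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified transformation: an id, a name and its upstream inputs.
class MiniTransformation {
    final int id;
    final String name;
    final List<MiniTransformation> inputs;

    MiniTransformation(int id, String name, MiniTransformation... inputs) {
        this.id = id;
        this.name = name;
        this.inputs = Arrays.asList(inputs);
    }
}

class RecursiveTransformSketch {
    private final Map<MiniTransformation, Integer> alreadyTransformed = new HashMap<>();
    final List<String> createdNodes = new ArrayList<>();

    // Translate the inputs first, then the transformation itself; each one only once.
    int transform(MiniTransformation t) {
        Integer known = alreadyTransformed.get(t);
        if (known != null) {
            return known;
        }
        for (MiniTransformation input : t.inputs) {
            transform(input);
        }
        createdNodes.add(t.name);
        alreadyTransformed.put(t, t.id);
        return t.id;
    }

    public static void main(String[] args) {
        MiniTransformation source = new MiniTransformation(1, "source");
        MiniTransformation flatMap = new MiniTransformation(2, "flatMap", source);
        MiniTransformation print = new MiniTransformation(3, "print", flatMap);

        RecursiveTransformSketch generator = new RecursiveTransformSketch();
        // Only flatMap and print are "registered"; the source is reached through flatMap's inputs.
        for (MiniTransformation t : Arrays.asList(flatMap, print)) {
            generator.transform(t);
        }
        System.out.println(generator.createdNodes); // [source, flatMap, print]
    }
}
```

Because inputs are visited before the node itself, upstream nodes always exist by the time an edge to them would be needed.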
The matching TransformationTranslator is looked up in translatorMap by the class of the transformation.
```java
static {
    @SuppressWarnings("rawtypes")
    Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>>
            tmp = new HashMap<>();
    tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator<>());
    tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator<>());
    tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
    tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
    tmp.put(SourceTransformation.class, new SourceTransformationTranslator<>());
    tmp.put(SinkTransformation.class, new SinkTransformationTranslator<>());
    tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator<>());
    tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator<>());
    tmp.put(UnionTransformation.class, new UnionTransformationTranslator<>());
    tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator<>());
    tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator<>());
    tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator<>());
    tmp.put(
            TimestampsAndWatermarksTransformation.class,
            new TimestampsAndWatermarksTransformationTranslator<>());
    tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator<>());
    translatorMap = Collections.unmodifiableMap(tmp);
}
```
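The lookup-with-fallback pattern used by `transform` can be reproduced in a few lines of plain Java. This is a minimal sketch: the nested classes and the string results are hypothetical placeholders for Flink's transformation types and translators.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class TranslatorLookupSketch {
    // Hypothetical transformation types standing in for Flink's Transformation subclasses.
    static class OneInput {}
    static class OneInputSubclass extends OneInput {}
    static class Partition {}

    private static final Map<Class<?>, Function<Object, String>> TRANSLATORS;

    static {
        Map<Class<?>, Function<Object, String>> tmp = new HashMap<>();
        tmp.put(OneInput.class, t -> "one-input translator");
        tmp.put(Partition.class, t -> "partition translator");
        TRANSLATORS = Collections.unmodifiableMap(tmp);
    }

    // Exact-class lookup with a fallback, like transform() choosing
    // between translate(...) and legacyTransform(...).
    static String transform(Object transformation) {
        Function<Object, String> translator = TRANSLATORS.get(transformation.getClass());
        return translator != null ? translator.apply(transformation) : "legacy path";
    }

    public static void main(String[] args) {
        System.out.println(transform(new OneInput()));         // one-input translator
        System.out.println(transform(new Partition()));        // partition translator
        System.out.println(transform(new OneInputSubclass())); // legacy path
    }
}
```

Since the key is the result of `getClass()`, only exact class matches hit the map; in this sketch a subclass of a registered type takes the fallback path.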
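- If a TransformationTranslator is found, translation goes through:
org.apache.flink.streaming.api.graph.StreamGraphGenerator#translate
- If no TransformationTranslator is found, transform falls back to legacyTransform. Does that mean the node is a virtual partition node?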
org.apache.flink.streaming.runtime.translators.AbstractOneInputTransformationTranslator#translateInternal
```java
protected Collection<Integer> translateInternal(
        final Transformation<OUT> transformation,
        final StreamOperatorFactory<OUT> operatorFactory,
        final TypeInformation<IN> inputType,
        @Nullable final KeySelector<IN, ?> stateKeySelector,
        @Nullable final TypeInformation<?> stateKeyType,
        final Context context) {
    checkNotNull(transformation);
    checkNotNull(operatorFactory);
    checkNotNull(inputType);
    checkNotNull(context);

    final StreamGraph streamGraph = context.getStreamGraph();
    final String slotSharingGroup = context.getSlotSharingGroup();
    final int transformationId = transformation.getId();
    final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();

    // Add the StreamNode
    streamGraph.addOperator(
            transformationId,
            slotSharingGroup,
            transformation.getCoLocationGroupKey(),
            operatorFactory,
            inputType,
            transformation.getOutputType(),
            transformation.getName());

    if (stateKeySelector != null) {
        TypeSerializer<?> keySerializer = stateKeyType.createSerializer(executionConfig);
        streamGraph.setOneInputStateKey(transformationId, stateKeySelector, keySerializer);
    }

    // Use the transformation's own parallelism if set, otherwise the environment default.
    int parallelism =
            transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
                    ? transformation.getParallelism()
                    : executionConfig.getParallelism();
    streamGraph.setParallelism(transformationId, parallelism);
    streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());

    final List<Transformation<?>> parentTransformations = transformation.getInputs();
    checkState(
            parentTransformations.size() == 1,
            "Expected exactly one input transformation but found "
                    + parentTransformations.size());

    for (Integer inputId : context.getStreamNodeIds(parentTransformations.get(0))) {
        // Add the StreamEdge; the edge is added according to its type
        streamGraph.addEdge(inputId, transformationId, 0);
    }

    return Collections.singleton(transformationId);
}
```
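Two details of `translateInternal` are easy to miss: the parallelism fallback and the fact that one parent transformation can resolve to several node ids, each of which gets its own edge. The sketch below isolates both. The helper names are hypothetical, and the sentinel value `-1` is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

class OneInputTranslationSketch {
    // Assumed sentinel meaning "parallelism not set on the transformation".
    static final int PARALLELISM_DEFAULT = -1;

    // The transformation's own setting wins; otherwise fall back to the environment value.
    static int resolveParallelism(int transformationParallelism, int envParallelism) {
        return transformationParallelism != PARALLELISM_DEFAULT
                ? transformationParallelism
                : envParallelism;
    }

    // One edge per upstream node id, all pointing at the newly added node.
    static List<String> edgesFor(Collection<Integer> inputIds, int transformationId) {
        List<String> edges = new ArrayList<>();
        for (int inputId : inputIds) {
            edges.add(inputId + "->" + transformationId);
        }
        return edges;
    }

    public static void main(String[] args) {
        System.out.println(resolveParallelism(PARALLELISM_DEFAULT, 4)); // 4
        System.out.println(resolveParallelism(2, 4));                   // 2
        System.out.println(edgesFor(Arrays.asList(1, 5), 7));           // [1->7, 5->7]
    }
}
```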
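For transformations such as keyBy and union, no real StreamNode is created. keyBy, for example, is recorded as a virtual partition node.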
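One way to picture a virtual partition node: it is an entry in a lookup table, and when a downstream operator connects to it, the edge is redirected to the real upstream node and carries the partitioner along. The following is a rough model of that idea only. `VirtualNodeSketch`, its string partitioner names, and the `FORWARD` default are all assumptions for illustration, not Flink's actual data structures.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class VirtualNodeSketch {
    // Hypothetical record of a virtual node: where it points and how it partitions.
    static class VirtualNode {
        final int originalId;
        final String partitioner;

        VirtualNode(int originalId, String partitioner) {
            this.originalId = originalId;
            this.partitioner = partitioner;
        }
    }

    private final Map<Integer, VirtualNode> virtualPartitionNodes = new HashMap<>();
    final List<String> edges = new ArrayList<>();

    void addVirtualPartitionNode(int originalId, int virtualId, String partitioner) {
        virtualPartitionNodes.put(virtualId, new VirtualNode(originalId, partitioner));
    }

    void addEdge(int upstreamId, int downstreamId) {
        addEdge(upstreamId, downstreamId, "FORWARD"); // assumed default partitioner
    }

    private void addEdge(int upstreamId, int downstreamId, String partitioner) {
        VirtualNode virtual = virtualPartitionNodes.get(upstreamId);
        if (virtual != null) {
            // Skip the virtual node: connect to its upstream and keep its partitioner.
            addEdge(virtual.originalId, downstreamId, virtual.partitioner);
        } else {
            edges.add(upstreamId + " -[" + partitioner + "]-> " + downstreamId);
        }
    }

    public static void main(String[] args) {
        VirtualNodeSketch graph = new VirtualNodeSketch();
        graph.addVirtualPartitionNode(2, 6, "HASH"); // keyBy placed after node 2
        graph.addEdge(6, 4);                         // downstream attaches to the virtual id
        graph.addEdge(1, 2);                         // ordinary edge between real nodes
        System.out.println(graph.edges);             // [2 -[HASH]-> 4, 1 -[FORWARD]-> 2]
    }
}
```

In this model the virtual id never appears in an edge, which matches the idea that such a node exists only to describe how two real nodes are connected.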
