Entry point:
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#getStreamGraph(java.lang.String)
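StreamGraph overview
- StreamGraph diagram
- What a StreamGraph is made of
All transformations are kept in the `transformations` list of the StreamExecutionEnvironment. Operator calls on a DataStream (flatMap, print, ...) append their transformation to this list while the job is being built. Source transformations are not added to it; they are only reachable through the inputs of downstream transformations.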
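A toy model may make this bookkeeping easier to see. The sketch below is not Flink code: `ToyEnv`, `ToyStream` and `ToyTransformation` are hypothetical stand-ins. Each operator wraps its upstream transformation and appends itself to the environment's list, while the source is never registered and can only be found by following `inputs`.

```java
import java.util.ArrayList;
import java.util.List;

class ToyEnvDemo {
    static ToyEnv buildExample() {
        ToyEnv env = new ToyEnv();
        env.fromElements("to be", "or not to be").apply("flatMap").apply("print");
        return env;
    }

    static List<String> registeredNames(ToyEnv env) {
        List<String> names = new ArrayList<>();
        for (ToyTransformation t : env.transformations) {
            names.add(t.name);
        }
        return names;
    }

    public static void main(String[] args) {
        ToyEnv env = buildExample();
        System.out.println(registeredNames(env)); // [flatMap, print]
        System.out.println(env.transformations.get(0).inputs.get(0).name); // source
    }
}

// Hypothetical toy model, not Flink's API.
class ToyTransformation {
    final String name;
    final List<ToyTransformation> inputs; // upstream transformations

    ToyTransformation(String name, List<ToyTransformation> inputs) {
        this.name = name;
        this.inputs = inputs;
    }
}

class ToyEnv {
    final List<ToyTransformation> transformations = new ArrayList<>();

    // A source creates a transformation but does NOT register it in the list.
    ToyStream fromElements(String... elements) {
        return new ToyStream(this, new ToyTransformation("source", List.of()));
    }
}

class ToyStream {
    final ToyEnv env;
    final ToyTransformation transformation;

    ToyStream(ToyEnv env, ToyTransformation transformation) {
        this.env = env;
        this.transformation = transformation;
    }

    // Any other operator points at its upstream transformation and registers itself.
    ToyStream apply(String operatorName) {
        ToyTransformation t = new ToyTransformation(operatorName, List.of(transformation));
        env.transformations.add(t);
        return new ToyStream(env, t);
    }
}
```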
Example job
```java
DataStream<String> text = env.fromElements(WordCountData.WORDS);
DataStream
```
In this job the source is a collection, the sink is a print output, and there are two transformations in between: flatMap and keyBy.
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#getStreamGraph(java.lang.String, boolean)
```java
public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
    StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
    if (clearTransformations) {
        this.transformations.clear();
    }
    return streamGraph;
}
```
It obtains a StreamGraphGenerator, and the StreamGraphGenerator builds the StreamGraph.
How the StreamGraphGenerator is assembled
```java
private StreamGraphGenerator getStreamGraphGenerator() {
    if (transformations.size() <= 0) {
        throw new IllegalStateException(
                "No operators defined in streaming topology. Cannot execute.");
    }

    final RuntimeExecutionMode executionMode = configuration.get(ExecutionOptions.RUNTIME_MODE);

    return new StreamGraphGenerator(transformations, config, checkpointCfg, getConfiguration())
            .setRuntimeExecutionMode(executionMode)
            .setStateBackend(defaultStateBackend)
            .setChaining(isChainingEnabled)
            .setUserArtifacts(cacheFile)
            .setTimeCharacteristic(timeCharacteristic)
            .setDefaultBufferTimeout(bufferTimeout);
}
```
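One practical consequence of the two methods above: with `clearTransformations = true` the environment's list is emptied after generation, so a second call on the same environment fails the `transformations.size() <= 0` check unless new operators are defined first. A minimal sketch under simplifying assumptions: `ToyExecutionEnvironment` and `ToyGraphGenerator` are hypothetical, the "graph" is just a copy of the operator names, and `jobName` is ignored.

```java
import java.util.ArrayList;
import java.util.List;

class ClearTransformationsDemo {
    public static void main(String[] args) {
        ToyExecutionEnvironment env = new ToyExecutionEnvironment();
        env.transformations.add("flatMap");
        env.transformations.add("print");
        System.out.println(env.getStreamGraph("demo", true)); // [flatMap, print]
        try {
            env.getStreamGraph("demo", true); // the list was cleared by the first call
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}

// Hypothetical simplification of getStreamGraph / getStreamGraphGenerator.
class ToyExecutionEnvironment {
    final List<String> transformations = new ArrayList<>();

    List<String> getStreamGraph(String jobName, boolean clearTransformations) {
        List<String> streamGraph = getStreamGraphGenerator().generate();
        if (clearTransformations) {
            transformations.clear();
        }
        return streamGraph;
    }

    // Same guard as the real method: refuse to build a graph without operators.
    private ToyGraphGenerator getStreamGraphGenerator() {
        if (transformations.size() <= 0) {
            throw new IllegalStateException(
                    "No operators defined in streaming topology. Cannot execute.");
        }
        return new ToyGraphGenerator(transformations);
    }
}

class ToyGraphGenerator {
    private final List<String> transformations;

    ToyGraphGenerator(List<String> transformations) {
        this.transformations = transformations;
    }

    // Copies the list, so clearing the environment afterwards leaves the result intact.
    List<String> generate() {
        return new ArrayList<>(transformations);
    }
}
```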
Building the StreamGraph
```java
public StreamGraph generate() {
    streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
    shouldExecuteInBatchMode = shouldExecuteInBatchMode(runtimeExecutionMode);
    configureStreamGraph(streamGraph);

    alreadyTransformed = new HashMap<>();

    for (Transformation<?> transformation : transformations) {
        transform(transformation);
    }

    final StreamGraph builtStreamGraph = streamGraph;

    // Reset the per-run fields so the generator keeps no references to the graph
    alreadyTransformed.clear();
    alreadyTransformed = null;
    streamGraph = null;

    return builtStreamGraph;
}
```
Generating StreamNodes
```java
for (Transformation<?> transformation : transformations) {
    transform(transformation);
}
```
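Converting each transformation into StreamNodes
In the example, `transformations` holds three entries: the transformations corresponding to flatMap, keyBy, and print. The source is not among them; it is reached recursively later, when translating flatMap looks up its upstream input.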
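The recursion described above behaves like a memoized depth-first traversal: translate the inputs first, then the node itself, and never translate the same transformation twice. The sketch below is hypothetical (`ToyNode`, `ToyGenerator` and the IDs are made up); only the `alreadyTransformed` name mirrors the field used in generate().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RecursiveTransformDemo {
    public static void main(String[] args) {
        ToyGenerator g = run();
        System.out.println(g.streamNodes); // {1=source, 2=flatMap, 3=print}
        System.out.println(g.edges);       // [1->2, 2->3]
    }

    static ToyGenerator run() {
        ToyNode source = new ToyNode(1, "source", List.of());
        ToyNode flatMap = new ToyNode(2, "flatMap", List.of(source));
        ToyNode print = new ToyNode(3, "print", List.of(flatMap));
        ToyGenerator g = new ToyGenerator();
        // Only flatMap and print are in the list; the source is found through flatMap.
        for (ToyNode t : List.of(flatMap, print)) {
            g.transform(t);
        }
        return g;
    }
}

// Hypothetical stand-in for a Transformation: an ID, a name and its inputs.
class ToyNode {
    final int id;
    final String name;
    final List<ToyNode> inputs;

    ToyNode(int id, String name, List<ToyNode> inputs) {
        this.id = id;
        this.name = name;
        this.inputs = inputs;
    }
}

class ToyGenerator {
    final Map<ToyNode, Integer> alreadyTransformed = new HashMap<>();
    final Map<Integer, String> streamNodes = new LinkedHashMap<>();
    final List<String> edges = new ArrayList<>();

    int transform(ToyNode t) {
        Integer existing = alreadyTransformed.get(t);
        if (existing != null) {
            return existing; // each transformation is translated only once
        }
        // Inputs first: this recursion is what reaches the source.
        List<Integer> inputIds = new ArrayList<>();
        for (ToyNode input : t.inputs) {
            inputIds.add(transform(input));
        }
        streamNodes.put(t.id, t.name);
        for (int inputId : inputIds) {
            edges.add(inputId + "->" + t.id);
        }
        alreadyTransformed.put(t, t.id);
        return t.id;
    }
}
```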
```java
private Collection<Integer> transform(Transformation<?> transform) {
    // Called for its side effect: fails early if the output type could not be determined
    transform.getOutputType();

    final TransformationTranslator<?, Transformation<?>> translator =
            (TransformationTranslator<?, Transformation<?>>) translatorMap.get(transform.getClass());

    Collection<Integer> transformedIds;
    if (translator != null) {
        transformedIds = translate(translator, transform);
    } else {
        transformedIds = legacyTransform(transform);
    }
    return transformedIds;
}
```
The matching TransformationTranslator is looked up in translatorMap by the transformation's class:
```java
static {
    @SuppressWarnings("rawtypes")
    Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>> tmp =
            new HashMap<>();
    tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator<>());
    tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator<>());
    tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
    tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
    tmp.put(SourceTransformation.class, new SourceTransformationTranslator<>());
    tmp.put(SinkTransformation.class, new SinkTransformationTranslator<>());
    tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator<>());
    tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator<>());
    tmp.put(UnionTransformation.class, new UnionTransformationTranslator<>());
    tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator<>());
    tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator<>());
    tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator<>());
    tmp.put(
            TimestampsAndWatermarksTransformation.class,
            new TimestampsAndWatermarksTransformationTranslator<>());
    tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator<>());
    translatorMap = Collections.unmodifiableMap(tmp);
}
```
- If a TransformationTranslator is found, the transformation is handled by
org.apache.flink.streaming.api.graph.StreamGraphGenerator#translate
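- If none is found, `legacyTransform` is used instead. In the version shown, that path only covers the iteration transformations (FeedbackTransformation, CoFeedbackTransformation) and throws for any other type; virtual partition nodes come from PartitionTransformationTranslator, which is registered in the map above.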
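The dispatch in this list can be modelled in a few lines. This is a hypothetical sketch, not Flink's code: the `Toy*` classes stand in for transformation types, translators are plain functions, and `ToyFeedback` merely plays the role of an iteration transformation. Because the map is keyed by the exact runtime class (as with `translatorMap.get(transform.getClass())`), lookup is a single hash access, and a subclass of a registered type would not match.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class TranslatorLookupDemo {
    public static void main(String[] args) {
        System.out.println(ToyDispatcher.transform(new ToyOneInput()));  // operator node
        System.out.println(ToyDispatcher.transform(new ToyFeedback()));  // legacy path
    }
}

// Hypothetical stand-ins for transformation classes.
abstract class ToyTransformationType {}
class ToyOneInput extends ToyTransformationType {}
class ToyPartition extends ToyTransformationType {}
class ToyFeedback extends ToyTransformationType {}
class ToyUnknown extends ToyTransformationType {}

class ToyDispatcher {
    private static final Map<Class<?>, Function<ToyTransformationType, String>> TRANSLATORS;

    static {
        Map<Class<?>, Function<ToyTransformationType, String>> tmp = new HashMap<>();
        tmp.put(ToyOneInput.class, t -> "operator node");
        tmp.put(ToyPartition.class, t -> "virtual partition node");
        TRANSLATORS = Collections.unmodifiableMap(tmp);
    }

    static String transform(ToyTransformationType t) {
        // Keyed by the exact runtime class, not by instanceof checks.
        Function<ToyTransformationType, String> translator = TRANSLATORS.get(t.getClass());
        return translator != null ? translator.apply(t) : legacyTransform(t);
    }

    // Fallback: accept only the "iteration" stand-in, reject everything else.
    private static String legacyTransform(ToyTransformationType t) {
        if (t instanceof ToyFeedback) {
            return "legacy path";
        }
        throw new IllegalStateException("Unknown transformation: " + t.getClass().getSimpleName());
    }
}
```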
For a one-input transformation such as flatMap, translation ends up in
org.apache.flink.streaming.runtime.translators.AbstractOneInputTransformationTranslator#translateInternal
```java
protected Collection<Integer> translateInternal(
        final Transformation<OUT> transformation,
        final StreamOperatorFactory<OUT> operatorFactory,
        final TypeInformation<IN> inputType,
        @Nullable final KeySelector<IN, ?> stateKeySelector,
        @Nullable final TypeInformation<?> stateKeyType,
        final Context context) {
    checkNotNull(transformation);
    checkNotNull(operatorFactory);
    checkNotNull(inputType);
    checkNotNull(context);

    final StreamGraph streamGraph = context.getStreamGraph();
    final String slotSharingGroup = context.getSlotSharingGroup();
    final int transformationId = transformation.getId();
    final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();

    // Add the StreamNode for this operator
    streamGraph.addOperator(
            transformationId,
            slotSharingGroup,
            transformation.getCoLocationGroupKey(),
            operatorFactory,
            inputType,
            transformation.getOutputType(),
            transformation.getName());

    // Keyed operator: register the key selector and key serializer
    if (stateKeySelector != null) {
        TypeSerializer<?> keySerializer = stateKeyType.createSerializer(executionConfig);
        streamGraph.setOneInputStateKey(transformationId, stateKeySelector, keySerializer);
    }

    // The operator's own parallelism if set, otherwise the job-wide default
    int parallelism =
            transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
                    ? transformation.getParallelism()
                    : executionConfig.getParallelism();
    streamGraph.setParallelism(transformationId, parallelism);
    streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());

    final List<Transformation<?>> parentTransformations = transformation.getInputs();
    checkState(
            parentTransformations.size() == 1,
            "Expected exactly one input transformation but found "
                    + parentTransformations.size());

    // Add a StreamEdge from every node ID the parent resolved to;
    // addEdge treats virtual upstream nodes according to their type
    for (Integer inputId : context.getStreamNodeIds(parentTransformations.get(0))) {
        streamGraph.addEdge(inputId, transformationId, 0);
    }

    return Collections.singleton(transformationId);
}
```
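Stripped of Flink types, translateInternal does three things: register the node, resolve its parallelism (the operator's own value unless it equals `PARALLELISM_DEFAULT`, else the ExecutionConfig value), and add one edge per upstream node ID. The loop matters because one parent transformation can resolve to several node IDs, for example when the parent is a union. `MiniStreamGraph` is a hypothetical toy, not Flink's StreamGraph; its `-1` constant mirrors `ExecutionConfig.PARALLELISM_DEFAULT`, and the IDs are made up.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class OneInputTranslationDemo {
    public static void main(String[] args) {
        MiniStreamGraph graph = build();
        System.out.println(graph.parallelism); // {1=1, 2=2, 3=4}
        System.out.println(graph.edges);       // [1->3, 2->3]
    }

    static MiniStreamGraph build() {
        MiniStreamGraph graph = new MiniStreamGraph(4); // job-wide default parallelism 4
        graph.translate(1, "sourceA", 1, List.of());
        graph.translate(2, "sourceB", 2, List.of());
        // A union of A and B resolves to node IDs [1, 2]; "map" has that union as its one parent.
        graph.translate(3, "map", MiniStreamGraph.PARALLELISM_DEFAULT, List.of(1, 2));
        return graph;
    }
}

// Hypothetical toy StreamGraph that keeps only names, parallelism and edges.
class MiniStreamGraph {
    // Mirrors ExecutionConfig.PARALLELISM_DEFAULT: "not set on the operator".
    static final int PARALLELISM_DEFAULT = -1;

    final int defaultParallelism;
    final Map<Integer, String> nodes = new LinkedHashMap<>();
    final Map<Integer, Integer> parallelism = new LinkedHashMap<>();
    final List<String> edges = new ArrayList<>();

    MiniStreamGraph(int defaultParallelism) {
        this.defaultParallelism = defaultParallelism;
    }

    // inputIds: every stream node ID the single parent transformation resolved to.
    Collection<Integer> translate(int id, String name, int requested, Collection<Integer> inputIds) {
        nodes.put(id, name);
        int p = requested != PARALLELISM_DEFAULT ? requested : defaultParallelism;
        parallelism.put(id, p);
        for (int inputId : inputIds) {
            edges.add(inputId + "->" + id);
        }
        return Collections.singleton(id);
    }
}
```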
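keyBy produces a PartitionTransformation, which does not become a real StreamNode: its translator registers a virtual partition node that only records the upstream node ID and the partitioner. union gets no node at all; its translator simply returns the node IDs of its inputs.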
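How a virtual partition node disappears from the final graph can be sketched with a toy. Assumptions: `VirtualNodeGraph` is hypothetical, partitioners are plain strings, node IDs are made up, and an edge without a partitioner is simply labelled "forward" (real Flink chooses between forward and rebalance based on the two nodes' parallelism, which the toy omits). The virtual node only stores (upstream ID, partitioner); an edge starting at it is rewritten to start at the real upstream node and carries that partitioner.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class VirtualPartitionDemo {
    public static void main(String[] args) {
        System.out.println(build().edges); // [2->4 (hash)]
        System.out.println(VirtualNodeGraph.unionIds(List.of(List.of(1), List.of(2, 3)))); // [1, 2, 3]
    }

    static VirtualNodeGraph build() {
        VirtualNodeGraph graph = new VirtualNodeGraph();
        // keyBy after node 2 becomes virtual node 6, which has no operator of its own.
        graph.addVirtualPartitionNode(2, 6, "hash");
        // The downstream node 4 is wired to the virtual node...
        graph.addEdge(6, 4);
        // ...but the stored edge runs 2->4 and carries the hash partitioner.
        return graph;
    }
}

// Hypothetical toy graph, not Flink's StreamGraph.
class VirtualNodeGraph {
    private final Map<Integer, Integer> upstreamOfVirtual = new HashMap<>();
    private final Map<Integer, String> partitionerOfVirtual = new HashMap<>();
    final List<String> edges = new ArrayList<>();

    void addVirtualPartitionNode(int originalId, int virtualId, String partitioner) {
        upstreamOfVirtual.put(virtualId, originalId);
        partitionerOfVirtual.put(virtualId, partitioner);
    }

    void addEdge(int upstreamId, int downstreamId) {
        addEdgeInternal(upstreamId, downstreamId, null);
    }

    private void addEdgeInternal(int upstreamId, int downstreamId, String partitioner) {
        Integer original = upstreamOfVirtual.get(upstreamId);
        if (original != null) {
            // Keep a partitioner that is already set; otherwise take the virtual node's one.
            String p = partitioner != null ? partitioner : partitionerOfVirtual.get(upstreamId);
            addEdgeInternal(original, downstreamId, p);
        } else {
            String p = partitioner != null ? partitioner : "forward"; // toy simplification
            edges.add(upstreamId + "->" + downstreamId + " (" + p + ")");
        }
    }

    // A union gets no node at all: it only forwards the node IDs of its inputs.
    static List<Integer> unionIds(List<List<Integer>> inputIds) {
        List<Integer> result = new ArrayList<>();
        for (List<Integer> ids : inputIds) {
            result.addAll(ids);
        }
        return result;
    }
}
```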