# The Four Graph Transformations in Flink
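- Program -> StreamGraph: the topology of operators, i.e. how the operators connect to one another
- StreamGraph -> JobGraph
  - Operator chaining: chained operators end up running inside the same Task
- JobGraph -> ExecutionGraph: the parallelized version
- ExecutionGraph -> physical execution plan: after the job is scheduled, the graph of Tasks deployed on each TaskManager

For each transformation, look at what changes: the vertices, the edges, and the intermediate results.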
## Program -> StreamGraph
- The Source is the head node.
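
To make this step concrete, here is a minimal sketch that turns a linear program (a list of transformation names) into nodes and edges, where the Source is the only node without input edges. The classes `SketchStreamNode`, `SketchStreamEdge`, and `SketchStreamGraph` are hypothetical stand-ins that only borrow the `inEdges`/`outEdges` idea from Flink's `StreamNode`; they are not Flink's API, and real programs can branch and merge instead of forming a single line.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins; these are not Flink's StreamEdge/StreamNode/StreamGraph.
class SketchStreamEdge {
    final int sourceId;
    final int targetId;

    SketchStreamEdge(int sourceId, int targetId) {
        this.sourceId = sourceId;
        this.targetId = targetId;
    }
}

class SketchStreamNode {
    final int id;
    final String operatorName;
    final List<SketchStreamEdge> inEdges = new ArrayList<>();
    final List<SketchStreamEdge> outEdges = new ArrayList<>();

    SketchStreamNode(int id, String operatorName) {
        this.id = id;
        this.operatorName = operatorName;
    }
}

class SketchStreamGraph {
    final Map<Integer, SketchStreamNode> streamNodes = new LinkedHashMap<>();

    // Each transformation of a linear program becomes a node;
    // each pair of consecutive transformations becomes an edge.
    static SketchStreamGraph fromLinearProgram(List<String> transformations) {
        SketchStreamGraph graph = new SketchStreamGraph();
        SketchStreamNode previous = null;
        for (int i = 0; i < transformations.size(); i++) {
            SketchStreamNode node = new SketchStreamNode(i + 1, transformations.get(i));
            graph.streamNodes.put(node.id, node);
            if (previous != null) {
                SketchStreamEdge edge = new SketchStreamEdge(previous.id, node.id);
                previous.outEdges.add(edge);
                node.inEdges.add(edge);
            }
            previous = node;
        }
        return graph;
    }

    // Head nodes have no input edges; in this linear model that is the Source.
    List<SketchStreamNode> headNodes() {
        List<SketchStreamNode> heads = new ArrayList<>();
        for (SketchStreamNode node : streamNodes.values()) {
            if (node.inEdges.isEmpty()) {
                heads.add(node);
            }
        }
        return heads;
    }

    int edgeCount() {
        int count = 0;
        for (SketchStreamNode node : streamNodes.values()) {
            count += node.outEdges.size();
        }
        return count;
    }
}

class StreamGraphSketchDemo {
    public static void main(String[] args) {
        SketchStreamGraph graph =
                SketchStreamGraph.fromLinearProgram(Arrays.asList("Source", "Map", "Sink"));
        System.out.println(graph.streamNodes.size() + " nodes, " + graph.edgeCount()
                + " edges, head: " + graph.headNodes().get(0).operatorName);
    }
}
```

Running `StreamGraphSketchDemo` prints `3 nodes, 2 edges, head: Source`.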
### StreamGraph composition
Stream nodes (StreamNode) and edges (StreamEdge)
- StateBackend
  - memory
  - RocksDB

Iterative computation
```java
private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
```
userArtifacts: the files and packages used by the user
```java
/** Information about the JARs and files uploaded by the user **/
private Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> userArtifacts;
```
Configuration: checkpoint settings
- TimeCharacteristic: the notion of time in use, e.g. EventTime, ProcessingTime, or IngestionTime

Topology
- Edges
- Nodes

```java
private TimeCharacteristic timeCharacteristic; // EventTime or ProcessingTime
/** Nodes */
private Map
protected Map<Integer, String> vertexIDtoBrokerID;
protected Map<Integer, Long> vertexIDtoLoopTimeout;
// Configuration
private String jobName;
private final ExecutionConfig executionConfig;
private final CheckpointConfig checkpointConfig;
private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();
private ScheduleMode scheduleMode;
```
<a name="qH8Vg"></a>
#### StreamNode fields
```java
/** Configuration */
// Maximum parallelism
private int maxParallelism;
// Buffer timeout: how long an output buffer may wait before it is flushed
private long bufferTimeout;
private ResourceSpec minResources = ResourceSpec.DEFAULT;
// Resources this node uses, e.g. how many cores and how many GB of memory
private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
private int managedMemoryWeight = Transformation.DEFAULT_MANAGED_MEMORY_WEIGHT;
// Name
private final String operatorName;
private @Nullable String slotSharingGroup;
private @Nullable String coLocationGroup;
/** Topology **/
// Input edges
private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
// Output edges
private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();
private InputFormat<?, ?> inputFormat;
private OutputFormat<?> outputFormat;
private List<OutputSelector<?>> outputSelectors;
// State configuration
private KeySelector<?, ?>[] statePartitioners = new KeySelector[0];
private TypeSerializer<?> stateKeySerializer;
// Operator configuration
private transient StreamOperatorFactory<?> operatorFactory;
// Class of the invokable that runs this node
private final Class<? extends AbstractInvokable> jobVertexClass;
private TypeSerializer<?>[] typeSerializersIn = new TypeSerializer[0];
private TypeSerializer<?> typeSerializerOut;
private String transformationUID;
private String userHash;
```
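
The `bufferTimeout` field trades latency for throughput. In the DataStream API it is set with `setBufferTimeout`: a positive value flushes output buffers periodically at that interval, `0` flushes after every record, and `-1` flushes only when a buffer is full. The sketch below is a hypothetical model of those three modes (`SketchOutputBuffer`, `write`, and `tick` are not Flink's API); `tick` stands in for a background flusher that wakes up periodically.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the bufferTimeout trade-off; not Flink's network stack.
// bufferTimeoutMillis > 0:  a periodic flusher sends pending records every timeout
// bufferTimeoutMillis == 0: every record is sent immediately (lowest latency)
// bufferTimeoutMillis == -1: records are sent only when the buffer is full (highest throughput)
class SketchOutputBuffer {
    private final int capacity;
    private final long bufferTimeoutMillis;
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> sentBuffers = new ArrayList<>();
    private long lastFlushMillis;

    SketchOutputBuffer(int capacity, long bufferTimeoutMillis, long startMillis) {
        this.capacity = capacity;
        this.bufferTimeoutMillis = bufferTimeoutMillis;
        this.lastFlushMillis = startMillis;
    }

    void write(String record) {
        pending.add(record);
        if (pending.size() >= capacity || bufferTimeoutMillis == 0) {
            sendPending();
        }
    }

    // Stands in for the periodic flusher: sends whatever is pending once the timeout elapsed.
    void tick(long nowMillis) {
        if (bufferTimeoutMillis > 0 && nowMillis - lastFlushMillis >= bufferTimeoutMillis) {
            sendPending();
            lastFlushMillis = nowMillis;
        }
    }

    // Sends the pending records (if any) as one buffer.
    private void sendPending() {
        if (!pending.isEmpty()) {
            sentBuffers.add(new ArrayList<>(pending));
            pending.clear();
        }
    }

    List<List<String>> sentBuffers() {
        return sentBuffers;
    }

    int pendingCount() {
        return pending.size();
    }
}

class BufferTimeoutDemo {
    public static void main(String[] args) {
        SketchOutputBuffer buffer = new SketchOutputBuffer(3, 100, 0);
        buffer.write("a");
        buffer.write("b");
        buffer.tick(50);  // 50 ms since the last flush: below the timeout, nothing is sent
        buffer.tick(100); // timeout reached: the partially filled buffer is sent
        System.out.println(buffer.sentBuffers());
    }
}
```

The demo prints `[[a, b]]`: the buffer was never full, so only the timeout caused it to be sent.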
<a name="PinRx"></a>
<a name="oyZtI"></a>
## StreamGraph -> JobGraph

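This step mainly merges operators into operator chains; from the outside, a chain exposes only its head node. Whether operators are chained can be specified explicitly. The step also introduces IntermediateDataSet. The changes are:
- Nodes: the StreamNodes of one chain become a single JobVertex
- Edges: StreamEdges between chains become JobEdges
- New: IntermediateDataSet, the intermediate result a JobVertex produces, which a JobEdge connects to the consuming JobVertex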
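
The merging can be sketched on a linear pipeline: consecutive operators that may be chained are grouped into one vertex, the first operator of each group is its head, and every boundary between groups becomes a job edge backed by an intermediate data set. `OperatorSpec`, `ChainedVertex`, and `ChainingSketch` are hypothetical, and `canChain` is a deliberately reduced rule (forward input, same parallelism, no explicit break); the `startNewChain` flag loosely mirrors calling `startNewChain()` on an operator in the DataStream API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical description of one operator in a linear pipeline.
class OperatorSpec {
    final String name;
    final int parallelism;
    final boolean forwardInput;  // connected to its predecessor one-to-one (forward)
    final boolean startNewChain; // the user explicitly starts a new chain here

    OperatorSpec(String name, int parallelism, boolean forwardInput, boolean startNewChain) {
        this.name = name;
        this.parallelism = parallelism;
        this.forwardInput = forwardInput;
        this.startNewChain = startNewChain;
    }
}

// Hypothetical stand-in for a JobVertex: a chain of operators exposing one head.
class ChainedVertex {
    final List<OperatorSpec> operators = new ArrayList<>();

    OperatorSpec head() {
        return operators.get(0);
    }

    String name() {
        List<String> names = new ArrayList<>();
        for (OperatorSpec op : operators) {
            names.add(op.name);
        }
        return String.join(" -> ", names);
    }
}

class ChainingSketch {
    // Deliberately simplified rule; Flink's real check covers more conditions.
    static boolean canChain(OperatorSpec upstream, OperatorSpec downstream) {
        return downstream.forwardInput
                && !downstream.startNewChain
                && upstream.parallelism == downstream.parallelism;
    }

    // Groups consecutive chainable operators; each group becomes one vertex, and each
    // boundary between groups becomes a job edge plus an intermediate data set.
    static List<ChainedVertex> chain(List<OperatorSpec> pipeline) {
        List<ChainedVertex> vertices = new ArrayList<>();
        ChainedVertex current = null;
        OperatorSpec previous = null;
        for (OperatorSpec op : pipeline) {
            if (current == null || !canChain(previous, op)) {
                current = new ChainedVertex();
                vertices.add(current);
            }
            current.operators.add(op);
            previous = op;
        }
        return vertices;
    }

    public static void main(String[] args) {
        List<ChainedVertex> vertices = chain(Arrays.asList(
                new OperatorSpec("Source", 2, false, false),
                new OperatorSpec("Map", 2, true, false),
                new OperatorSpec("KeyedSum", 3, false, false), // hash-partitioned input, not forward
                new OperatorSpec("Sink", 3, true, false)));
        for (ChainedVertex v : vertices) {
            System.out.println(v.name());
        }
        System.out.println("job edges: " + (vertices.size() - 1));
    }
}
```

Four stream nodes and three stream edges collapse into two vertices (`Source -> Map` and `KeyedSum -> Sink`) joined by one job edge; the edges inside each chain disappear from the job-level graph.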
<a name="y3WDp"></a>
### Conditions for chaining operators
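
Two StreamNodes connected by an edge can be merged into one chain only when several conditions hold together. The sketch below encodes them as we understand `StreamingJobGraphGenerator.isChainable` in Flink 1.10/1.11; the exact list changes between releases, so verify it against the source of your Flink version. `SketchNode`, `SketchEdge`, and `SketchChainingStrategy` are hypothetical simplifications of `StreamNode`, `StreamEdge`, and `ChainingStrategy`, not Flink classes.

```java
// Hypothetical mirror of the chaining strategies an operator can declare.
enum SketchChainingStrategy { ALWAYS, NEVER, HEAD }

// Hypothetical summary of a stream node; not Flink's StreamNode.
class SketchNode {
    final int parallelism;
    final String slotSharingGroup;
    final SketchChainingStrategy chainingStrategy;
    final int inEdgeCount;

    SketchNode(int parallelism, String slotSharingGroup,
               SketchChainingStrategy chainingStrategy, int inEdgeCount) {
        this.parallelism = parallelism;
        this.slotSharingGroup = slotSharingGroup;
        this.chainingStrategy = chainingStrategy;
        this.inEdgeCount = inEdgeCount;
    }
}

// Hypothetical summary of a stream edge; not Flink's StreamEdge.
class SketchEdge {
    final boolean forwardPartitioner; // false for e.g. hash (keyBy) or rebalance
    final boolean batchShuffle;       // blocking exchange instead of pipelined

    SketchEdge(boolean forwardPartitioner, boolean batchShuffle) {
        this.forwardPartitioner = forwardPartitioner;
        this.batchShuffle = batchShuffle;
    }
}

class ChainabilityCheck {
    static boolean isChainable(SketchNode up, SketchNode down, SketchEdge edge, boolean chainingEnabled) {
        return down.inEdgeCount == 1                                  // downstream has a single input
                && up.slotSharingGroup.equals(down.slotSharingGroup)  // same slot sharing group
                && down.chainingStrategy == SketchChainingStrategy.ALWAYS // downstream may join a chain
                && (up.chainingStrategy == SketchChainingStrategy.HEAD
                        || up.chainingStrategy == SketchChainingStrategy.ALWAYS) // upstream may lead or continue one
                && edge.forwardPartitioner                            // one-to-one forward connection
                && !edge.batchShuffle                                 // pipelined data exchange
                && up.parallelism == down.parallelism                 // same parallelism
                && chainingEnabled;                                   // chaining not disabled for the job
    }

    public static void main(String[] args) {
        SketchNode source = new SketchNode(2, "default", SketchChainingStrategy.HEAD, 0);
        SketchNode map = new SketchNode(2, "default", SketchChainingStrategy.ALWAYS, 1);
        System.out.println(isChainable(source, map, new SketchEdge(true, false), true));  // forward edge
        System.out.println(isChainable(source, map, new SketchEdge(false, false), true)); // e.g. after keyBy
    }
}
```

Because every condition must hold, a single `keyBy`, a parallelism change, or a different slot sharing group is enough to start a new chain.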
<a name="voKLg"></a>
### JobGraph composition
1. Basic configuration
```java
// Basic configuration
/** The job configuration attached to this job. */
private final Configuration jobConfiguration = new Configuration();
/** ID of this job. May be set if specific job id is desired (e.g. session management) */
private JobID jobID;
/** Name of this job. */
private final String jobName;
/** The mode in which the job is scheduled */
private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
// --- checkpointing ---
/** Job specific execution config */
private SerializedValue<ExecutionConfig> serializedExecutionConfig;
/** The settings for the job checkpoints */
private JobCheckpointingSettings snapshotSettings;
/** Savepoint restore settings. */
private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();
```
2. Topology configuration
```java
// Topology configuration
/** List of task vertices included in this job graph. */
private final Map
```
3. Environment configuration
```java
// Environment configuration
/** JAR files the user's job depends on */
private final List
/** Set of custom files required to run this job. */
private final Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = new HashMap<>();
/** Set of blob keys identifying the JAR files required to run this job. The user's JARs are all stored in the BlobServer. */
private final List<PermanentBlobKey> userJarBlobKeys = new ArrayList<>();
/** List of classpaths required to run this job. */
private List<URL> classpaths = Collections.emptyList();
```
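
The comment on `userJarBlobKeys` describes a split: the JAR bytes live in the BlobServer, while the JobGraph keeps only keys that identify them. A hypothetical content-addressed store illustrates the idea. `SketchBlobStore`, `SketchJobDescription`, and the choice of a hex SHA-1 digest as the key are assumptions for illustration; Flink's `BlobServer` and `PermanentBlobKey` have their own API and key format.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical content-addressed store; not Flink's BlobServer or PermanentBlobKey.
class SketchBlobStore {
    private final Map<String, byte[]> blobs = new HashMap<>();

    // Stores the bytes once and returns a key derived from their content (hex SHA-1).
    String put(byte[] content) {
        String key = sha1Hex(content);
        blobs.putIfAbsent(key, content.clone());
        return key;
    }

    // Returns a copy of the bytes stored under the key.
    byte[] get(String key) {
        byte[] content = blobs.get(key);
        if (content == null) {
            throw new IllegalArgumentException("unknown blob key: " + key);
        }
        return content.clone();
    }

    int size() {
        return blobs.size();
    }

    private static String sha1Hex(byte[] content) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-1").digest(content);
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 is not available", e);
        }
    }
}

// Hypothetical job description that, like userJarBlobKeys, keeps only keys, not JAR bytes.
class SketchJobDescription {
    final List<String> userJarBlobKeys = new ArrayList<>();
}

class BlobStoreDemo {
    public static void main(String[] args) {
        SketchBlobStore store = new SketchBlobStore();
        SketchJobDescription job = new SketchJobDescription();
        byte[] jar = "pretend these are JAR bytes".getBytes(StandardCharsets.UTF_8);
        job.userJarBlobKeys.add(store.put(jar));                // client side: upload, keep the key
        byte[] fetched = store.get(job.userJarBlobKeys.get(0)); // task side: fetch by key
        System.out.println(fetched.length + " bytes fetched, " + store.size() + " blob stored");
    }
}
```

Keeping only keys keeps the serialized job description small, and uploading the same JAR twice stores it once.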
<a name="r1WhX"></a>
### JobVertex configuration
```java
/** Basic configuration */
private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
/** Maximum number of subtasks to split this task into a runtime. */
private int maxParallelism = -1;
/** Minimum resources used by this vertex. */
private ResourceSpec minResources = ResourceSpec.DEFAULT;
/** The preferred resource of the vertex. */
private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
private Configuration configuration;
/** The class of the invokable. */
private String invokableClassName;
private final JobVertexID id;
/** Topology configuration */
private final List
private final ArrayList<IntermediateDataSet> results = new ArrayList<>();
private final ArrayList<JobEdge> inputs = new ArrayList<>();
/** Indicates of this job vertex is stoppable or not. */
private boolean isStoppable = false;
/** Optionally, a source of input splits. */
private InputSplitSource<?> inputSplitSource;
/** The name of the vertex. This will be shown in runtime logs and will be in the runtime environment. */
private String name;
/** Optionally, a sharing group that allows subtasks from different job vertices to run concurrently in one slot. */
private SlotSharingGroup slotSharingGroup;
/** The group inside which the vertex subtasks share slots. */
private CoLocationGroup coLocationGroup;
/** Optional, the name of the operator, such as 'Flat Map' or 'Join', to be included in the JSON plan. */
private String operatorName;
/** Optional, the description of the operator, like 'Hash Join', or 'Sorted Group Reduce',
 * to be included in the JSON plan. */
private String operatorDescription;
/** Optional, pretty name of the operator, to be displayed in the JSON plan. */
private String operatorPrettyName;
/** Optional, the JSON for the optimizer properties of the operator result,
 * to be included in the JSON plan. */
private String resultOptimizerProperties;
/** The input dependency constraint to schedule this vertex. */
private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY;
```
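
The `results` and `inputs` lists are how a JobGraph is wired: a producing vertex owns the intermediate data sets it writes, and an edge connects each data set to a consuming vertex. The sketch models only that wiring with hypothetical classes `SketchJobVertex`, `SketchDataSet`, and `SketchJobEdge`; they are not Flink's `JobVertex`, `IntermediateDataSet`, and `JobEdge`. In Flink, `JobVertex.connectNewDataSetAsInput` plays the role of `connectInputFrom` here, though its parameters differ between versions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified wiring model; not Flink's JobVertex/IntermediateDataSet/JobEdge.
class SketchJobVertex {
    final String name;
    final List<SketchDataSet> results = new ArrayList<>(); // data sets this vertex produces
    final List<SketchJobEdge> inputs = new ArrayList<>();  // edges this vertex reads from

    SketchJobVertex(String name) {
        this.name = name;
    }

    // Creates a new data set produced by 'upstream' and makes this vertex consume it.
    SketchJobEdge connectInputFrom(SketchJobVertex upstream) {
        SketchDataSet dataSet = new SketchDataSet(upstream);
        upstream.results.add(dataSet);
        SketchJobEdge edge = new SketchJobEdge(dataSet, this);
        dataSet.consumers.add(edge);
        inputs.add(edge);
        return edge;
    }

    // Follows results -> consumer edges -> target vertices.
    List<SketchJobVertex> downstreamVertices() {
        List<SketchJobVertex> downstream = new ArrayList<>();
        for (SketchDataSet dataSet : results) {
            for (SketchJobEdge edge : dataSet.consumers) {
                downstream.add(edge.target);
            }
        }
        return downstream;
    }
}

class SketchDataSet {
    final SketchJobVertex producer;
    final List<SketchJobEdge> consumers = new ArrayList<>();

    SketchDataSet(SketchJobVertex producer) {
        this.producer = producer;
    }
}

class SketchJobEdge {
    final SketchDataSet source;
    final SketchJobVertex target;

    SketchJobEdge(SketchDataSet source, SketchJobVertex target) {
        this.source = source;
        this.target = target;
    }
}

class JobGraphWiringDemo {
    public static void main(String[] args) {
        SketchJobVertex sourceMap = new SketchJobVertex("Source -> Map");
        SketchJobVertex sumSink = new SketchJobVertex("KeyedSum -> Sink");
        SketchJobEdge edge = sumSink.connectInputFrom(sourceMap);
        System.out.println(edge.source.producer.name + " => " + edge.target.name);
    }
}
```

The path vertex -> data set -> edge -> vertex is the shape later expanded per subtask when the ExecutionGraph is built.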
<a name="TNP8y"></a>
## JobGraph -> ExecutionGraph

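This step mainly splits the graph by parallelism: each JobVertex becomes an ExecutionJobVertex holding one ExecutionVertex per parallel subtask, and each ExecutionVertex corresponds to one Task.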
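
The parallel expansion is simple arithmetic, sketched below with hypothetical `SketchVertexSpec` and `SketchSubtask` classes (not Flink's `ExecutionJobVertex`/`ExecutionVertex`). A vertex with parallelism p yields p subtasks with indexes 0 to p-1, so a `Source -> Map` vertex at parallelism 2 and a `KeyedSum -> Sink` vertex at parallelism 3 give 2 + 3 = 5 Tasks. If the two are connected all-to-all, as a `keyBy` hash partitioning is, every producing subtask feeds every consuming one: 2 × 3 = 6 subtask-to-subtask connections. The `(i/p)` task-name format is illustrative only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical: a job vertex reduced to its name and parallelism.
class SketchVertexSpec {
    final String name;
    final int parallelism;

    SketchVertexSpec(String name, int parallelism) {
        this.name = name;
        this.parallelism = parallelism;
    }
}

// Hypothetical stand-in for an ExecutionVertex: one parallel subtask, deployed as one Task.
class SketchSubtask {
    final String vertexName;
    final int subtaskIndex; // 0-based
    final int parallelism;

    SketchSubtask(String vertexName, int subtaskIndex, int parallelism) {
        this.vertexName = vertexName;
        this.subtaskIndex = subtaskIndex;
        this.parallelism = parallelism;
    }

    String taskName() {
        return vertexName + " (" + (subtaskIndex + 1) + "/" + parallelism + ")";
    }
}

class ParallelExpansion {
    // One subtask per degree of parallelism.
    static List<SketchSubtask> expand(SketchVertexSpec vertex) {
        List<SketchSubtask> subtasks = new ArrayList<>();
        for (int i = 0; i < vertex.parallelism; i++) {
            subtasks.add(new SketchSubtask(vertex.name, i, vertex.parallelism));
        }
        return subtasks;
    }

    static int totalTasks(List<SketchVertexSpec> vertices) {
        int total = 0;
        for (SketchVertexSpec v : vertices) {
            total += v.parallelism;
        }
        return total;
    }

    // All-to-all: every producing subtask is connected to every consuming subtask.
    static int allToAllConnections(SketchVertexSpec upstream, SketchVertexSpec downstream) {
        return upstream.parallelism * downstream.parallelism;
    }

    public static void main(String[] args) {
        SketchVertexSpec sourceMap = new SketchVertexSpec("Source -> Map", 2);
        SketchVertexSpec sumSink = new SketchVertexSpec("KeyedSum -> Sink", 3);
        for (SketchSubtask t : expand(sumSink)) {
            System.out.println(t.taskName());
        }
        System.out.println("tasks: " + totalTasks(Arrays.asList(sourceMap, sumSink)));
        System.out.println("all-to-all connections: " + allToAllConnections(sourceMap, sumSink));
    }
}
```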
<a name="O9BvR"></a>
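### ExecutionGraph composition
- Basic configuration
- Environment configuration
- Fault-tolerance configuration
- Topology configuration
- Main components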
```java
/** Writes blob data **/
private final BlobWriter blobWriter;
/** Maintains the location information of key-value state */
private final KvStateLocationRegistry kvStateLocationRegistry;
/** Tracks the partitions of the tasks */
private final JobMasterPartitionTracker partitionTracker;
/** The slot provider strategy to use for allocating slots for tasks as they are needed. */
// Slot allocation and management
private final SlotProviderStrategy slotProviderStrategy;
/** Shuffle master to register partitions for task deployment. */
private final ShuffleMaster<?> shuffleMaster;
/** Listeners that receive messages when the entire job switches it status
 * (such as from RUNNING to FINISHED). */
private final List<JobStatusListener> jobStatusListeners;
/** Counts all restarts. Used by other Gauges/Meters and does not register to metric group. */
private final Counter numberOfRestartsCounter = new SimpleCounter();
/** Current status of the job execution. */
private volatile JobStatus state = JobStatus.CREATED;
// Checkpoint components
/** The coordinator for checkpoints, if snapshot checkpoints are enabled. */
private CheckpointCoordinator checkpointCoordinator;
/** TODO, replace it with main thread executor. */
private ScheduledExecutorService checkpointCoordinatorTimer;
/** Checkpoint stats tracker separate from the coordinator in order to be
 * available after archiving. */
private CheckpointStatsTracker checkpointStatsTracker;
```
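
The `jobStatusListeners`, `state`, and `numberOfRestartsCounter` fields fit together as an observer pattern: when the job switches status, the registered listeners are told about it. The sketch below models only that idea. `SketchJobStatus` is a hypothetical subset of Flink's `JobStatus` values, `SketchJobStatusListener` does not match the signature of Flink's `JobStatusListener`, and counting a restart on every entry into `RESTARTING` is our simplification, not a statement about where Flink increments its counter.

```java
import java.util.ArrayList;
import java.util.List;

// A subset of job states, for illustration only.
enum SketchJobStatus { CREATED, RUNNING, RESTARTING, FINISHED }

// Hypothetical listener interface; Flink's JobStatusListener has a different signature.
interface SketchJobStatusListener {
    void jobStatusChanges(SketchJobStatus oldStatus, SketchJobStatus newStatus);
}

// Hypothetical slice of an execution graph: current state, listeners, restart count.
class SketchExecutionGraph {
    private final List<SketchJobStatusListener> jobStatusListeners = new ArrayList<>();
    private volatile SketchJobStatus state = SketchJobStatus.CREATED;
    private long numberOfRestarts;

    synchronized void registerJobStatusListener(SketchJobStatusListener listener) {
        jobStatusListeners.add(listener);
    }

    // Switches state only if the graph is currently in 'expected'; notifies listeners on success.
    synchronized boolean transitionState(SketchJobStatus expected, SketchJobStatus next) {
        if (state != expected) {
            return false;
        }
        state = next;
        if (next == SketchJobStatus.RESTARTING) {
            numberOfRestarts++; // simplification: count each entry into RESTARTING
        }
        for (SketchJobStatusListener listener : jobStatusListeners) {
            listener.jobStatusChanges(expected, next);
        }
        return true;
    }

    SketchJobStatus getState() {
        return state;
    }

    synchronized long getNumberOfRestarts() {
        return numberOfRestarts;
    }
}

class JobStatusDemo {
    public static void main(String[] args) {
        SketchExecutionGraph graph = new SketchExecutionGraph();
        graph.registerJobStatusListener((from, to) -> System.out.println(from + " -> " + to));
        graph.transitionState(SketchJobStatus.CREATED, SketchJobStatus.RUNNING);
        graph.transitionState(SketchJobStatus.RUNNING, SketchJobStatus.RESTARTING);
        graph.transitionState(SketchJobStatus.RESTARTING, SketchJobStatus.RUNNING);
        graph.transitionState(SketchJobStatus.RUNNING, SketchJobStatus.FINISHED);
        System.out.println("restarts: " + graph.getNumberOfRestarts());
    }
}
```

Passing the expected current state makes a stale transition request (for example, a second `CREATED -> RUNNING`) fail instead of silently overwriting a newer state.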