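The four graph transformations in Flink
- Program -> StreamGraph: the topology between operators
- StreamGraph -> JobGraph: operators are merged into operator chains, and each chain ends up running in a single Task
- JobGraph -> ExecutionGraph: the parallelized version of the JobGraph
- ExecutionGraph -> physical execution plan: the graph of Tasks deployed on each TaskManager after the job has been scheduled

Each transformation changes the vertices, the edges, and the intermediate results.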
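
To make the change in vertices concrete, here is a minimal sketch in plain Java. It does not use Flink's real classes: `Operator`, `ChainedVertex`, `chain` and `expand` are hypothetical names, the pipeline is assumed to be linear, and the chaining rule is deliberately simplified to "neighbours with equal parallelism are merged".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy walk through the graph layers for a linear pipeline (not Flink's real classes). */
final class GraphLayersSketch {

    /** One operator of the toy StreamGraph. */
    static final class Operator {
        final String name;
        final int parallelism;

        Operator(String name, int parallelism) {
            this.name = name;
            this.parallelism = parallelism;
        }
    }

    /** One vertex of the toy JobGraph: a chain of operators that run in the same Task. */
    static final class ChainedVertex {
        final List<Operator> operators = new ArrayList<>();

        int parallelism() {
            return operators.get(0).parallelism;
        }

        String name() {
            List<String> names = new ArrayList<>();
            for (Operator op : operators) {
                names.add(op.name);
            }
            return String.join(" -> ", names);
        }
    }

    /** StreamGraph -> JobGraph (simplified assumption): merge neighbours with equal parallelism. */
    static List<ChainedVertex> chain(List<Operator> pipeline) {
        List<ChainedVertex> vertices = new ArrayList<>();
        for (Operator op : pipeline) {
            ChainedVertex last = vertices.isEmpty() ? null : vertices.get(vertices.size() - 1);
            if (last != null && last.parallelism() == op.parallelism) {
                last.operators.add(op);
            } else {
                ChainedVertex vertex = new ChainedVertex();
                vertex.operators.add(op);
                vertices.add(vertex);
            }
        }
        return vertices;
    }

    /** JobGraph -> ExecutionGraph: one parallel subtask per unit of parallelism. */
    static List<String> expand(List<ChainedVertex> vertices) {
        List<String> subtasks = new ArrayList<>();
        for (ChainedVertex vertex : vertices) {
            for (int i = 0; i < vertex.parallelism(); i++) {
                subtasks.add(vertex.name() + " (" + (i + 1) + "/" + vertex.parallelism() + ")");
            }
        }
        return subtasks;
    }

    public static void main(String[] args) {
        List<Operator> pipeline = Arrays.asList(
                new Operator("Source", 2),
                new Operator("Map", 2),
                new Operator("Aggregate", 4),
                new Operator("Sink", 1));
        List<ChainedVertex> jobVertices = chain(pipeline);
        List<String> subtasks = expand(jobVertices);
        System.out.println("stream nodes:       " + pipeline.size());
        System.out.println("job vertices:       " + jobVertices.size());
        System.out.println("execution vertices: " + subtasks.size());
        for (String subtask : subtasks) {
            System.out.println("  " + subtask);
        }
    }
}
```

With this sample pipeline the four stream nodes become three job vertices (Source and Map are chained) and seven execution vertices (2 + 4 + 1 subtasks).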
Program -> StreamGraph

- Source nodes are the head nodes

StreamGraph composition
Stream nodes (StreamNode) and edges (StreamEdge)
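
The node-and-edge structure can be sketched as a small adjacency model. This is a toy in plain Java, not Flink's `StreamGraph`: the class `ToyStreamGraph` and its methods are hypothetical, and the only assumption carried over from these notes is that every node keeps a list of incoming and outgoing edges and that nodes without incoming edges act as head (source) nodes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy stream graph: nodes keyed by id, each holding its incoming and outgoing edges. */
final class ToyStreamGraph {

    /** Directed edge between two node ids. */
    static final class Edge {
        final int sourceId;
        final int targetId;

        Edge(int sourceId, int targetId) {
            this.sourceId = sourceId;
            this.targetId = targetId;
        }
    }

    /** Node with an operator name, a parallelism, and its edge lists. */
    static final class Node {
        final int id;
        final String operatorName;
        final int parallelism;
        final List<Edge> inEdges = new ArrayList<>();
        final List<Edge> outEdges = new ArrayList<>();

        Node(int id, String operatorName, int parallelism) {
            this.id = id;
            this.operatorName = operatorName;
            this.parallelism = parallelism;
        }
    }

    private final Map<Integer, Node> nodes = new LinkedHashMap<>();

    Node addNode(int id, String operatorName, int parallelism) {
        if (nodes.containsKey(id)) {
            throw new IllegalArgumentException("Duplicate node id: " + id);
        }
        Node node = new Node(id, operatorName, parallelism);
        nodes.put(id, node);
        return node;
    }

    Node getNode(int id) {
        Node node = nodes.get(id);
        if (node == null) {
            throw new IllegalArgumentException("Unknown node id: " + id);
        }
        return node;
    }

    /** Registers the edge on both endpoints so each node sees its own inputs and outputs. */
    void addEdge(int sourceId, int targetId) {
        Node source = getNode(sourceId);
        Node target = getNode(targetId);
        Edge edge = new Edge(sourceId, targetId);
        source.outEdges.add(edge);
        target.inEdges.add(edge);
    }

    /** Head nodes are the nodes without incoming edges. */
    List<Node> sources() {
        List<Node> heads = new ArrayList<>();
        for (Node node : nodes.values()) {
            if (node.inEdges.isEmpty()) {
                heads.add(node);
            }
        }
        return heads;
    }

    public static void main(String[] args) {
        ToyStreamGraph graph = new ToyStreamGraph();
        graph.addNode(1, "Source", 2);
        graph.addNode(2, "Map", 2);
        graph.addNode(3, "Sink", 1);
        graph.addEdge(1, 2);
        graph.addEdge(2, 3);
        for (Node head : graph.sources()) {
            System.out.println("head node: " + head.operatorName);
        }
    }
}
```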
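- StateBackend
  - memory
  - RocksDB

Iterative computation
`private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;`

userArtifacts: files and packages used by the user
`/** Information about the JAR packages and files uploaded by the user. */`
`private Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> userArtifacts;`

Configuration: checkpoint settings
- TimeCharacteristic: the notion of time, e.g. event time or processing time

Topology
- Edges
- Nodes

```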
private TimeCharacteristic timeCharacteristic; // EventTime, ProcessingTime
/** Nodes */
private Map
protected Map<Integer, String> vertexIDtoBrokerID;
protected Map<Integer, Long> vertexIDtoLoopTimeout;
// Configuration
private String jobName;
private final ExecutionConfig executionConfig;
private final CheckpointConfig checkpointConfig;
private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();
private ScheduleMode scheduleMode;
```

<a name="qH8Vg"></a>
#### StreamNode information

```
/** Configuration */
// Maximum parallelism
private int maxParallelism;
// Timeout for transmitting each buffer
private long bufferTimeout;
private ResourceSpec minResources = ResourceSpec.DEFAULT;
// Resources used by this node, e.g. number of CPU cores and amount of memory
private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
private int managedMemoryWeight = Transformation.DEFAULT_MANAGED_MEMORY_WEIGHT;
// Name
private final String operatorName;
private @Nullable String slotSharingGroup;
private @Nullable String coLocationGroup;

/** Topology */
// Input edges
private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
// Output edges
private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();
private InputFormat<?, ?> inputFormat;
private OutputFormat<?> outputFormat;
private List<OutputSelector<?>> outputSelectors;
// State configuration
private KeySelector<?, ?>[] statePartitioners = new KeySelector[0];
private TypeSerializer<?> stateKeySerializer;
// Operator configuration
private transient StreamOperatorFactory<?> operatorFactory;
// Class of the invokable
private final Class<? extends AbstractInvokable> jobVertexClass;
private TypeSerializer<?>[] typeSerializersIn = new TypeSerializer[0];
private TypeSerializer<?> typeSerializerOut;
private String transformationUID;
private String userHash;
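```

<a name="PinRx"></a>
<a name="oyZtI"></a>
## StreamGraph -> JobGraph
This step mainly merges operators into operator chains; from the outside, each chain exposes only a single head node. Whether operators are chained can be specified explicitly. The step also adds IntermediateDataSet.

- Node conversion
- Edge conversion
- Addition of IntermediateDataSet (intermediate data sets)

<a name="y3WDp"></a>
### Operator chaining conditions
<a name="voKLg"></a>
### JobGraph composition
1. Basic configuration

```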
// Basic configuration
/** The job configuration attached to this job. */
private final Configuration jobConfiguration = new Configuration();
/** ID of this job. May be set if specific job id is desired (e.g. session management) */
private JobID jobID;
/** Name of this job. */
private final String jobName;
/** The mode in which the job is scheduled */
private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
// --- checkpointing ---
/** Job specific execution config */
private SerializedValue<ExecutionConfig> serializedExecutionConfig;
/** The settings for the job checkpoints */
private JobCheckpointingSettings snapshotSettings;
/** Savepoint restore settings. */
private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();
```

2. Topology configuration

```
// Topology configuration
/** List of task vertices included in this job graph. */
private final Map
```

3. Environment configuration

```
// Environment configuration
/** JAR packages the user depends on. */
private final List
/** Set of custom files required to run this job. */
private final Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = new HashMap<>();
/** Set of blob keys identifying the JAR files required to run this job. The user's JARs are all stored in the BlobServer. */
private final List<PermanentBlobKey> userJarBlobKeys = new ArrayList<>();
/** List of classpaths required to run this job. */
private List<URL> classpaths = Collections.emptyList();
```

<a name="r1WhX"></a>
### JobVertex configuration

```
/** Basic configuration */
private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
/** Maximum number of subtasks to split this task into at runtime. */
private int maxParallelism = -1;
/** Minimum resources used by this vertex. */
private ResourceSpec minResources = ResourceSpec.DEFAULT;
/** The preferred resource of the vertex. */
private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
private Configuration configuration;
/** The class of the invokable. */
private String invokableClassName;
private final JobVertexID id;

/** Topology configuration */
private final List
private final ArrayList<IntermediateDataSet> results = new ArrayList<>();
private final ArrayList<JobEdge> inputs = new ArrayList<>();
/** Indicates whether this job vertex is stoppable or not. */
private boolean isStoppable = false;
/** Optionally, a source of input splits. */
private InputSplitSource<?> inputSplitSource;
/** The name of the vertex. This will be shown in runtime logs and will be in the runtime environment. */
private String name;
/** Optionally, a sharing group that allows subtasks from different job vertices to run concurrently in one slot. */
private SlotSharingGroup slotSharingGroup;
/** The group inside which the vertex subtasks share slots. */
private CoLocationGroup coLocationGroup;
/** Optional, the name of the operator, such as 'Flat Map' or 'Join', to be included in the JSON plan. */
private String operatorName;
/** Optional, the description of the operator, like 'Hash Join', or 'Sorted Group Reduce',
 * to be included in the JSON plan. */
private String operatorDescription;
/** Optional, pretty name of the operator, to be displayed in the JSON plan. */
private String operatorPrettyName;
/** Optional, the JSON for the optimizer properties of the operator result,
 * to be included in the JSON plan. */
private String resultOptimizerProperties;
/** The input dependency constraint to schedule this vertex. */
private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY;
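```

<a name="TNP8y"></a>
## JobGraph -> ExecutionGraph
This step mainly splits each vertex according to its parallelism.
Each ExecutionVertex corresponds to one Task.
<a name="O9BvR"></a>
### ExecutionGraph composition
- Basic configuration
- Environment configuration
- Fault-tolerance configuration
- Topology configuration
- Main components

```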
/** Writes blob data. */
private final BlobWriter blobWriter;
/** Maintains the location information of key-value state. */
private final KvStateLocationRegistry kvStateLocationRegistry;
/** Monitors the partitions of tasks. */
private final JobMasterPartitionTracker partitionTracker;
/** The slot provider strategy to use for allocating slots for tasks as they are needed.
 * Handles slot allocation and management. */
private final SlotProviderStrategy slotProviderStrategy;
/** Shuffle master to register partitions for task deployment. */
private final ShuffleMaster<?> shuffleMaster;
/** Listeners that receive messages when the entire job switches its status
 * (such as from RUNNING to FINISHED). */
private final List<JobStatusListener> jobStatusListeners;
/** Counts all restarts. Used by other Gauges/Meters and does not register to metric group. */
private final Counter numberOfRestartsCounter = new SimpleCounter();
/** Current status of the job execution. */
private volatile JobStatus state = JobStatus.CREATED;

/** Checkpoint components */
/** The coordinator for checkpoints, if snapshot checkpoints are enabled. */
private CheckpointCoordinator checkpointCoordinator;
/** TODO, replace it with main thread executor. */
private ScheduledExecutorService checkpointCoordinatorTimer;
/** Checkpoint stats tracker separate from the coordinator in order to be
 * available after archiving. */
private CheckpointStatsTracker checkpointStatsTracker;
```
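
The StreamGraph -> JobGraph step only chains two operators when a set of conditions holds. The sketch below is a toy check in plain Java, not Flink's implementation: `Op`, `Partitioner` and `isChainable` are hypothetical names, and the list of conditions is an assumption based on the rules usually described for Flink's job graph generation (single input, same slot sharing group, compatible chaining strategies, forward partitioning, equal parallelism, chaining enabled). The exact rules differ between Flink versions.

```java
/** Toy version of an operator chaining check (assumed conditions, hypothetical classes). */
final class ChainingSketch {

    enum ChainingStrategy { ALWAYS, HEAD, NEVER }

    enum Partitioner { FORWARD, REBALANCE, HASH }

    /** Minimal operator description holding only what the check needs. */
    static final class Op {
        final String name;
        final int parallelism;
        final String slotSharingGroup;
        final ChainingStrategy strategy;
        final int inputCount;

        Op(String name, int parallelism, String slotSharingGroup,
           ChainingStrategy strategy, int inputCount) {
            this.name = name;
            this.parallelism = parallelism;
            this.slotSharingGroup = slotSharingGroup;
            this.strategy = strategy;
            this.inputCount = inputCount;
        }
    }

    /** Returns true when the downstream operator may join the upstream operator's chain. */
    static boolean isChainable(Op up, Op down, Partitioner partitioner, boolean chainingEnabled) {
        return chainingEnabled
                // the downstream operator has exactly one input
                && down.inputCount == 1
                // both operators share a slot sharing group
                && up.slotSharingGroup.equals(down.slotSharingGroup)
                // the downstream operator allows being chained to a predecessor
                && down.strategy == ChainingStrategy.ALWAYS
                // the upstream operator allows successors in its chain
                && (up.strategy == ChainingStrategy.HEAD || up.strategy == ChainingStrategy.ALWAYS)
                // records are forwarded locally, not redistributed
                && partitioner == Partitioner.FORWARD
                // both sides are split into the same number of subtasks
                && up.parallelism == down.parallelism;
    }

    public static void main(String[] args) {
        Op source = new Op("Source", 2, "default", ChainingStrategy.HEAD, 0);
        Op map = new Op("Map", 2, "default", ChainingStrategy.ALWAYS, 1);
        Op agg = new Op("Aggregate", 4, "default", ChainingStrategy.ALWAYS, 1);

        System.out.println("Source -> Map chainable:    "
                + isChainable(source, map, Partitioner.FORWARD, true));
        System.out.println("Map -> Aggregate chainable: "
                + isChainable(map, agg, Partitioner.HASH, true));
    }
}
```

In this example Source and Map can be chained, while Map and Aggregate cannot, because the edge between them is hash-partitioned and the two operators have different parallelism.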
