Flink 四种Graph转换

  1. Program->StreamGraph:算子和算子之间的拓扑关系
  2. StreamGraph -> JobGraph
    1. Operator Chain:被链接在一起的算子最终在同一个Task里运行
  3. JobGraph -> ExecutionGraph: 并行化版本
  4. ExecutionGraph -> 物理执行计划:job进行调度后,在各个TaskManager部署Task的图

顶点的变化 边的变化 中间结果的变化

Program->StreamGraph

image.png

  • Source是头结点

image.png

StreamGraph的组成

流节点 和 边

  • StateBackend
    • memory
    • rocksdb
  • 迭代计算

    1. private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
  • userArtifacts:用户使用的文件和包

    1. /** 用户上传的jar包 文件信息 **/
    2. private Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> userArtifacts;
  • 配置信息:checkPoint信息

  • TimeCharacteristic:时间的概念 eventTime或者其他等等
  • 拓扑关系

    • 结点 ```

    private TimeCharacteristic timeCharacteristic; // EventTime ProcessingTime

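/** 节点 **/
private Map<Integer, StreamNode> streamNodes;
private Set<Integer> sources;
private Set<Integer> sinks;
private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes;
private Map<Integer, Tuple2<Integer, OutputTag>> virtualSideOutputNodes;
private Map<Integer, Tuple3<Integer, StreamPartitioner<?>, ShuffleMode>> virtualPartitionNodes;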

  1. protected Map<Integer, String> vertexIDtoBrokerID;
  2. protected Map<Integer, Long> vertexIDtoLoopTimeout;

//配置信息
private String jobName;
private final ExecutionConfig executionConfig;
private final CheckpointConfig checkpointConfig;
private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();
private ScheduleMode scheduleMode;

  1. <a name="qH8Vg"></a>
  2. #### StreamNode相关信息

/** 配置信息 **/

//最大的并行度
private int maxParallelism;
//每一个buffer传输的超时时间
private long bufferTimeout;
private ResourceSpec minResources = ResourceSpec.DEFAULT;
//当前节点使用的资源 如几核 几G内存
private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
private int managedMemoryWeight = Transformation.DEFAULT_MANAGED_MEMORY_WEIGHT;
//名称
private final String operatorName;
private @Nullable String slotSharingGroup;
private @Nullable String coLocationGroup;

  1. /** 拓扑关系 **/
  2. //输入边
  3. private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
  4. //输出边
  5. private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();
  6. private InputFormat<?, ?> inputFormat;
  7. private OutputFormat<?> outputFormat;
  8. private List<OutputSelector<?>> outputSelectors;
  9. //状态配置
  10. private KeySelector<?, ?>[] statePartitioners = new KeySelector[0];
  11. private TypeSerializer<?> stateKeySerializer;
  12. //算子配置
  13. private transient StreamOperatorFactory<?> operatorFactory;
  14. // invoke的class名称
  15. private final Class<? extends AbstractInvokable> jobVertexClass;
  16. private TypeSerializer<?>[] typeSerializersIn = new TypeSerializer[0];
  17. private TypeSerializer<?> typeSerializerOut;
  18. private String transformationUID;
  19. private String userHash;
  1. <a name="PinRx"></a>
  2. ## <br />
  3. <a name="oyZtI"></a>
  4. ## StreamGraph -> JobGraph
  5. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/661881/1612099521578-a7a674b1-59fb-4269-8eb3-298b9c93f772.png#align=left&display=inline&height=521&margin=%5Bobject%20Object%5D&name=image.png&originHeight=1042&originWidth=3008&size=1190472&status=done&style=none&width=1504)
  6. 主要是operatorChain的合并操作,对外只有一个头结点。可以显性的指定是否进行合并操作,同时添加了IntermediateDataSet<br />节点的转换<br />边的转换<br />增加intermediateDataSet 中间数据集
  7. <a name="y3WDp"></a>
  8. ### 算子的合并条件
  9. <a name="voKLg"></a>
  10. ### JobGraph组成
  11. 1. 基本配置

//基本配置
/** The job configuration attached to this job. */
private final Configuration jobConfiguration = new Configuration();

  1. /** ID of this job. May be set if specific job id is desired (e.g. session management) */
  2. private JobID jobID;
  3. /** Name of this job. */
  4. private final String jobName;
  5. /** The mode in which the job is scheduled */
  6. private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
  7. // --- checkpointing ---
  8. /** Job specific execution config */
  9. private SerializedValue<ExecutionConfig> serializedExecutionConfig;
  10. /** The settings for the job checkpoints */
  11. private JobCheckpointingSettings snapshotSettings;
  12. /** Savepoint restore settings. */
  13. private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();
  1. 2. 拓扑配置

//拓扑配置
/** List of task vertices included in this job graph. */
private final Map<JobVertexID, JobVertex> taskVertices = new LinkedHashMap<JobVertexID, JobVertex>();

  1. 3. 环境配置

//环境配置
/** 用户依赖的一些jar包 */
private final List<Path> userJars = new ArrayList<Path>();

  1. /** Set of custom files required to run this job. */
  2. private final Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = new HashMap<>();
  3. /** Set of blob keys identifying the JAR files required to run this job. 用户的jar包都存储到blobServer里*/
  4. private final List<PermanentBlobKey> userJarBlobKeys = new ArrayList<>();
  5. /** List of classpaths required to run this job. */
  6. private List<URL> classpaths = Collections.emptyList();
  1. <a name="r1WhX"></a>
  2. ### JobVertex 配置信息

/** 基本配置 **/

  1. private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
  2. /** Maximum number of subtasks to split this task into a runtime. */
  3. private int maxParallelism = -1;
  4. /** 当前节点使用的最少资源 */
  5. private ResourceSpec minResources = ResourceSpec.DEFAULT;
  6. /** The preferred resource of the vertex. */
  7. private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
  8. private Configuration configuration;
  9. /** The class of the invokable. */
  10. private String invokableClassName;

private final JobVertexID id;

/** 拓扑配置 **/
private final List<OperatorIDPair> operatorIDs;

  1. private final ArrayList<IntermediateDataSet> results = new ArrayList<>();
  2. private final ArrayList<JobEdge> inputs = new ArrayList<>();
  3. /** Indicates of this job vertex is stoppable or not. */
  4. private boolean isStoppable = false;
  5. /** Optionally, a source of input splits. */
  6. private InputSplitSource<?> inputSplitSource;
  7. /** The name of the vertex. This will be shown in runtime logs and will be in the runtime environment. */
  8. private String name;
  9. /** Optionally, a sharing group that allows subtasks from different job vertices to run concurrently in one slot. */
  10. private SlotSharingGroup slotSharingGroup;
  11. /** The group inside which the vertex subtasks share slots. */
  12. private CoLocationGroup coLocationGroup;
  13. /** Optional, the name of the operator, such as 'Flat Map' or 'Join', to be included in the JSON plan. */
  14. private String operatorName;
  15. /** Optional, the description of the operator, like 'Hash Join', or 'Sorted Group Reduce',
  16. * to be included in the JSON plan. */
  17. private String operatorDescription;
  18. /** Optional, pretty name of the operator, to be displayed in the JSON plan. */
  19. private String operatorPrettyName;
  20. /** Optional, the JSON for the optimizer properties of the operator result,
  21. * to be included in the JSON plan. */
  22. private String resultOptimizerProperties;
  23. /** The input dependency constraint to schedule this vertex. */
  24. private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY;
  1. <a name="TNP8y"></a>
  2. ## JobGraph -> ExecutionGraph
  3. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/661881/1612101043127-67bb7132-15d2-49f0-9706-adefba8265e2.png#align=left&display=inline&height=705&margin=%5Bobject%20Object%5D&name=image.png&originHeight=1410&originWidth=2350&size=2050801&status=done&style=none&width=1175)
  4. 主要进行并行度的拆分<br />每一个ExecutionVertex对应一个Task任务
  5. <a name="O9BvR"></a>
  6. ### ExecutionGraph组成
  7. - 基本配置
  8. - 环境配置
  9. - 容错配置
  10. - 拓扑配置
  11. - 主要组件
  1. /** blob数据写入 **/

private final BlobWriter blobWriter;
/** state状态的位置信息的维护 **/
private final KvStateLocationRegistry kvStateLocationRegistry;
/** 对task里的partition的监控 **/
private final JobMasterPartitionTracker partitionTracker;
/** The slot provider strategy to use for allocating slots for tasks as they are needed. */
/** slot的分配与管理 **/
private final SlotProviderStrategy slotProviderStrategy;
/** Shuffle master to register partitions for task deployment. */
private final ShuffleMaster<?> shuffleMaster;
/** Listeners that receive messages when the entire job switches its status

  1. * (such as from RUNNING to FINISHED). */
  2. private final List<JobStatusListener> jobStatusListeners;
  3. /** Counts all restarts. Used by other Gauges/Meters and does not register to metric group. */
  4. private final Counter numberOfRestartsCounter = new SimpleCounter();
  5. /** Current status of the job execution. */
  6. private volatile JobStatus state = JobStatus.CREATED;

/** checkPoint组件 **/
/** The coordinator for checkpoints, if snapshot checkpoints are enabled. */
private CheckpointCoordinator checkpointCoordinator;

  1. /** TODO, replace it with main thread executor. */
  2. private ScheduledExecutorService checkpointCoordinatorTimer;
  3. /** Checkpoint stats tracker separate from the coordinator in order to be
  4. * available after archiving. */
  5. private CheckpointStatsTracker checkpointStatsTracker;

```