Entry point:

After CliFrontend invokes the main method of the user's entry class, the user code calls env.execute().

    public JobExecutionResult execute(String jobName) throws Exception {
        Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
        // Build the StreamGraph, then pass it on to execute(StreamGraph)
        return execute(getStreamGraph(jobName));
    }

StreamGraph generation

Creating the ExecutorFactory via SPI

Further down the call chain, the executor is obtained and its execute method submits the job.

    public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
        checkNotNull(streamGraph, "StreamGraph cannot be null.");
        checkNotNull(
                configuration.get(DeploymentOptions.TARGET),
                "No execution.target specified in your configuration file.");
        // Look up the PipelineExecutorFactory through SPI
        final PipelineExecutorFactory executorFactory =
                executorServiceLoader.getExecutorFactory(configuration);
        // Create the executor and submit the job
        CompletableFuture<JobClient> jobClientFuture =
                executorFactory
                        .getExecutor(configuration)
                        .execute(streamGraph, configuration, userClassloader);
        // ...
    }

Executors loaded via SPI
The SPI configuration lives in the flink-clients module.
Once the factory has been loaded, RemoteExecutorFactory creates a RemoteExecutor, and control enters its execute method.
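The factory lookup described above can be sketched without any Flink dependency. This is a minimal illustration, not Flink's implementation: SketchExecutorFactory, SketchFactoryLoader and forTarget are hypothetical names, and the candidate list is passed in by hand where Flink would discover implementations through java.util.ServiceLoader. The assumption is that each factory declares whether it is compatible with the configured execution.target and that exactly one factory must match.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a pipeline executor factory; not the real Flink interface.
interface SketchExecutorFactory {
    String getName();

    boolean isCompatibleWith(Map<String, String> configuration);
}

class SketchFactoryLoader {
    // Candidates are passed in explicitly so the example runs without
    // META-INF/services files; a real SPI setup would use ServiceLoader.
    static SketchExecutorFactory select(
            List<SketchExecutorFactory> candidates, Map<String, String> configuration) {
        List<SketchExecutorFactory> compatible = new ArrayList<>();
        for (SketchExecutorFactory factory : candidates) {
            if (factory.isCompatibleWith(configuration)) {
                compatible.add(factory);
            }
        }
        // Zero matches or several matches are both treated as configuration errors.
        if (compatible.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one compatible factory, found " + compatible.size());
        }
        return compatible.get(0);
    }

    // Builds a factory that matches a single value of the "execution.target" key.
    static SketchExecutorFactory forTarget(String target) {
        return new SketchExecutorFactory() {
            @Override
            public String getName() {
                return target;
            }

            @Override
            public boolean isCompatibleWith(Map<String, String> configuration) {
                return target.equalsIgnoreCase(configuration.get("execution.target"));
            }
        };
    }
}

class ExecutorSpiSketch {
    public static void main(String[] args) {
        List<SketchExecutorFactory> candidates = List.of(
                SketchFactoryLoader.forTarget("remote"),
                SketchFactoryLoader.forTarget("yarn-session"),
                SketchFactoryLoader.forTarget("yarn-per-job"));
        SketchExecutorFactory chosen = SketchFactoryLoader.select(
                candidates, Map.of("execution.target", "remote"));
        System.out.println(chosen.getName());
    }
}
```

The point of the design is that the calling code never names a concrete executor: adding a new deployment target only requires registering another factory.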

Standalone and YARN session modes enter org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor#execute:

    public CompletableFuture<JobClient> execute(
            @Nonnull final Pipeline pipeline,
            @Nonnull final Configuration configuration,
            @Nonnull final ClassLoader userCodeClassloader)
            throws Exception {
        // Translate the pipeline (StreamGraph) into a JobGraph
        final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
        try (final ClusterDescriptor<ClusterID> clusterDescriptor =
                clusterClientFactory.createClusterDescriptor(configuration)) {
            // The session cluster already exists, so its ID must be configured
            final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
            checkState(clusterID != null);
            final ClusterClientProvider<ClusterID> clusterClientProvider =
                    clusterDescriptor.retrieve(clusterID);
            ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();
            return clusterClient
                    .submitJob(jobGraph)
                    // ... the rest of the call chain is omitted in these notes
        }
    }

AbstractSessionClusterExecutor: entry point for Standalone / YARN session

  • Create the JobGraph
  • Obtain the cluster descriptor (ClusterDescriptor)
  • Obtain the client
  • Submit the job with submitJob

  • Standalone obtains a RestClusterClient

  • YARN session also obtains a RestClusterClient, through YarnClusterDescriptor
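The four steps above can be sketched with simplified stand-ins. Every type here (SketchJobGraph, SketchClusterClient, SketchClusterDescriptor, SessionExecutorSketch) is hypothetical and only mirrors the shape of the flow. The key assumption is that in session mode the cluster is already running, so the descriptor only looks up a client by cluster ID and never deploys anything.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// All types below are simplified, hypothetical stand-ins, not Flink classes.
class SketchJobGraph {
    final String jobId;

    SketchJobGraph(String jobId) {
        this.jobId = jobId;
    }
}

interface SketchClusterClient {
    CompletableFuture<String> submitJob(SketchJobGraph jobGraph);
}

// Knows which clusters are already running and hands out clients for them.
class SketchClusterDescriptor implements AutoCloseable {
    private final Map<String, SketchClusterClient> runningClusters = new HashMap<>();
    boolean closed = false;

    void register(String clusterId, SketchClusterClient client) {
        runningClusters.put(clusterId, client);
    }

    SketchClusterClient retrieve(String clusterId) {
        SketchClusterClient client = runningClusters.get(clusterId);
        if (client == null) {
            throw new IllegalStateException("No running cluster with ID " + clusterId);
        }
        return client;
    }

    @Override
    public void close() {
        closed = true;
    }
}

class SessionExecutorSketch {
    static CompletableFuture<String> execute(
            String pipelineName, String clusterId, SketchClusterDescriptor descriptor) {
        // Step 1: create the job graph from the pipeline.
        SketchJobGraph jobGraph = new SketchJobGraph("job-" + pipelineName);
        // Step 2: use the cluster descriptor; it is closed when the block exits.
        try (SketchClusterDescriptor d = descriptor) {
            // Step 3: obtain a client for the cluster that is already running.
            SketchClusterClient client = d.retrieve(clusterId);
            // Step 4: submit the job.
            return client.submitJob(jobGraph);
        }
    }

    public static void main(String[] args) {
        SketchClusterDescriptor descriptor = new SketchClusterDescriptor();
        descriptor.register(
                "session-1",
                graph -> CompletableFuture.completedFuture("accepted:" + graph.jobId));
        System.out.println(execute("wordcount", "session-1", descriptor).join());
    }
}
```

Submitting against an unknown cluster ID fails in retrieve, which corresponds to the checkState(clusterID != null) guard in the excerpt: a session executor has nothing to fall back on if the cluster does not exist.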

The cluster descriptor (ClusterDescriptor)

In Standalone mode the descriptor is a StandaloneClusterDescriptor.
On YARN the descriptor is a YarnClusterDescriptor.

Obtaining the client

    clusterDescriptor.retrieve(clusterID).getClusterClient()

The ClusterDescriptor supplies the matching client.
In Standalone mode the client is a RestClusterClient.
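On YARN the client is also a RestClusterClient; the Hadoop YarnClient is used inside YarnClusterDescriptor to talk to YARN and is not what retrieve(...).getClusterClient() returns.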

AbstractJobClusterExecutor: entry point for per-job mode

    @Override
    public CompletableFuture<JobClient> execute(
            @Nonnull final Pipeline pipeline,
            @Nonnull final Configuration configuration,
            @Nonnull final ClassLoader userCodeClassloader)
            throws Exception {
        // Translate the StreamGraph into a JobGraph
        final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
        // Get the cluster descriptor. On YARN this creates and starts the YARN client
        // and carries the YARN and Flink configuration and environment information.
        try (final ClusterDescriptor<ClusterID> clusterDescriptor =
                clusterClientFactory.createClusterDescriptor(configuration)) {
            final ExecutionConfigAccessor configAccessor =
                    ExecutionConfigAccessor.fromConfiguration(configuration);
            // Get the cluster specification (resource settings): JobManager memory,
            // TaskManager memory, and the number of slots per TaskManager
            final ClusterSpecification clusterSpecification =
                    clusterClientFactory.getClusterSpecification(configuration);
            // Deploy a dedicated cluster for this job
            final ClusterClientProvider<ClusterID> clusterClientProvider =
                    clusterDescriptor.deployJobCluster(
                            clusterSpecification, jobGraph, configAccessor.getDetachedMode());
            return CompletableFuture.completedFuture(
                    new ClusterClientJobClientAdapter<>(
                            clusterClientProvider, jobGraph.getJobID(), userCodeClassloader));
        }
    }
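
To contrast this with the session path, here is a minimal sketch of the per-job flow using hypothetical stand-ins (PerJobSpec, PerJobClusterHandle, PerJobDescriptor, PerJobExecutorSketch are illustrative names, not Flink classes). The assumption it illustrates is that nothing is looked up by cluster ID: each call deploys a fresh cluster sized by the specification and returns a handle that is already complete.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-ins; none of these are Flink classes.
class PerJobSpec {
    final int jobManagerMemoryMb;
    final int taskManagerMemoryMb;
    final int slotsPerTaskManager;

    PerJobSpec(int jobManagerMemoryMb, int taskManagerMemoryMb, int slotsPerTaskManager) {
        this.jobManagerMemoryMb = jobManagerMemoryMb;
        this.taskManagerMemoryMb = taskManagerMemoryMb;
        this.slotsPerTaskManager = slotsPerTaskManager;
    }
}

// Identifies the dedicated cluster and the job running on it.
class PerJobClusterHandle {
    final String clusterId;
    final String jobId;

    PerJobClusterHandle(String clusterId, String jobId) {
        this.clusterId = clusterId;
        this.jobId = jobId;
    }
}

class PerJobDescriptor implements AutoCloseable {
    int deployedClusters = 0;

    // Every call starts a new cluster dedicated to the given job.
    PerJobClusterHandle deployJobCluster(PerJobSpec spec, String jobId, boolean detached) {
        deployedClusters++;
        return new PerJobClusterHandle("cluster-" + deployedClusters + "-for-" + jobId, jobId);
    }

    @Override
    public void close() {
        // Nothing to release in this sketch.
    }
}

class PerJobExecutorSketch {
    static CompletableFuture<PerJobClusterHandle> execute(
            String jobId, PerJobSpec spec, PerJobDescriptor descriptor) {
        try (PerJobDescriptor d = descriptor) {
            // Deployment and submission are a single step: the job ships with the cluster.
            PerJobClusterHandle handle = d.deployJobCluster(spec, jobId, false);
            return CompletableFuture.completedFuture(handle);
        }
    }

    public static void main(String[] args) {
        PerJobDescriptor descriptor = new PerJobDescriptor();
        PerJobSpec spec = new PerJobSpec(1024, 2048, 2);
        PerJobClusterHandle handle = execute("job-a", spec, descriptor).join();
        System.out.println(handle.clusterId + " runs " + handle.jobId);
    }
}
```

Running two jobs through this sketch yields two distinct cluster IDs, which is the essential difference from session mode, where every job shares the cluster retrieved by ID.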