Entry point:
After CliFrontend invokes the main method of the user's entry class, the user code calls env.execute().
public JobExecutionResult execute(String jobName) throws Exception {
    Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
    // Build the StreamGraph and continue with execution
    return execute(getStreamGraph(jobName));
}
Creating the ExecutorFactory via SPI
Further down the call chain, the executor is obtained and its execute method submits the job.
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
    checkNotNull(streamGraph, "StreamGraph cannot be null.");
    checkNotNull(
            configuration.get(DeploymentOptions.TARGET),
            "No execution.target specified in your configuration file.");
    // Use SPI to look up the executor factory for this pipeline
    final PipelineExecutorFactory executorFactory =
            executorServiceLoader.getExecutorFactory(configuration);
    // Create the executor and submit the job through it
    CompletableFuture<JobClient> jobClientFuture =
            executorFactory
                    .getExecutor(configuration)
                    .execute(streamGraph, configuration, userClassloader);
    // ... (rest of the method omitted)
}
Executors loaded via SPI
SPI configuration in the flink-clients module
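The selection step behind getExecutorFactory(configuration) can be sketched as follows. This is a simplified toy model, not Flink's code: the ExecutorFactory interface, factoryFor and select are hypothetical names, and a plain list stands in for the candidates that java.util.ServiceLoader would discover from META-INF/services files. The assumption illustrated is that each factory reports whether it matches the configured execution.target and that exactly one match is expected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Toy model of factory selection; none of these types are Flink classes. */
final class ExecutorFactorySelectionSketch {

    /** Hypothetical stand-in for an executor factory discovered via SPI. */
    interface ExecutorFactory {
        String getName();

        boolean isCompatibleWith(Map<String, String> configuration);
    }

    /** Builds a factory that matches exactly one value of "execution.target". */
    static ExecutorFactory factoryFor(String target) {
        return new ExecutorFactory() {
            @Override
            public String getName() {
                return target;
            }

            @Override
            public boolean isCompatibleWith(Map<String, String> configuration) {
                // A missing key yields null, which never matches.
                return target.equalsIgnoreCase(configuration.get("execution.target"));
            }
        };
    }

    /**
     * Picks the single compatible factory. With real SPI the candidates would
     * come from java.util.ServiceLoader; a list keeps the sketch runnable
     * without any service files on the classpath.
     */
    static ExecutorFactory select(
            List<ExecutorFactory> candidates, Map<String, String> configuration) {
        List<ExecutorFactory> compatible = new ArrayList<>();
        for (ExecutorFactory factory : candidates) {
            if (factory.isCompatibleWith(configuration)) {
                compatible.add(factory);
            }
        }
        if (compatible.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one compatible factory, found " + compatible.size());
        }
        return compatible.get(0);
    }

    public static void main(String[] args) {
        List<ExecutorFactory> candidates =
                List.of(factoryFor("remote"), factoryFor("yarn-session"), factoryFor("yarn-per-job"));
        ExecutorFactory chosen = select(candidates, Map.of("execution.target", "yarn-per-job"));
        System.out.println(chosen.getName());
    }
}
```

Failing loudly on zero or several matches is a design choice of this sketch; it keeps a misconfigured target from silently picking the wrong deployment path.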
Once the factory has been obtained, RemoteExecutorFactory creates a RemoteExecutor, whose execute method is then called.
Standalone and yarn-session both enter this method: org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor#execute
public CompletableFuture<JobClient> execute(
        @Nonnull final Pipeline pipeline,
        @Nonnull final Configuration configuration,
        @Nonnull final ClassLoader userCodeClassloader)
        throws Exception {
    // Translate the pipeline (StreamGraph) into a JobGraph
    final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
    try (final ClusterDescriptor<ClusterID> clusterDescriptor =
            clusterClientFactory.createClusterDescriptor(configuration)) {
        // Session mode: the cluster must already exist, so its ID is read from the configuration
        final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
        checkState(clusterID != null);
        final ClusterClientProvider<ClusterID> clusterClientProvider =
                clusterDescriptor.retrieve(clusterID);
        ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();
        return clusterClient
                .submitJob(jobGraph)
                // ... (the rest of the call chain is omitted in these notes)
    }
}
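AbstractSessionClusterExecutor: entry point for Standalone / yarn-session
- Create the JobGraph
- Obtain the ClusterDescriptor
- Obtain the client
- Submit the job with submitJob
ClusterDescriptor
In Standalone mode the descriptor is a StandaloneClusterDescriptor.
In YARN mode the descriptor is a YarnClusterDescriptor.
Obtaining the client
clusterDescriptor.retrieve(clusterID).getClusterClient()
The ClusterDescriptor hands out the client that matches its cluster type:
In Standalone mode the client is a RestClusterClient.
In YARN mode the client is also a RestClusterClient, keyed by the YARN ApplicationId; YarnClient is what YarnClusterDescriptor uses internally to talk to YARN.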
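The order of steps in the session path (look up an existing cluster, get a client from it, submit) can be illustrated with a small self-contained model. Everything here is hypothetical: Client, Descriptor, recordingDescriptor and the string-based job and cluster ids are stand-ins chosen for the sketch, not Flink types. It only assumes what the excerpt shows, namely that the descriptor is scoped by try-with-resources and that submission returns a future.

```java
import java.util.concurrent.CompletableFuture;

/** Toy model of session-mode submission; none of these types are Flink classes. */
final class SessionSubmitSketch {

    /** Hypothetical client that talks to an already running cluster. */
    interface Client {
        CompletableFuture<String> submitJob(String jobGraph);
    }

    /** Hypothetical descriptor that looks up a running cluster by its id. */
    interface Descriptor extends AutoCloseable {
        Client retrieve(String clusterId);

        @Override
        void close(); // narrowed so callers need not handle a checked exception
    }

    /** Descriptor that records every call in the given log. */
    static Descriptor recordingDescriptor(StringBuilder log) {
        return new Descriptor() {
            @Override
            public Client retrieve(String clusterId) {
                log.append("retrieve(").append(clusterId).append(");");
                return jobGraph -> {
                    log.append("submit(").append(jobGraph).append(");");
                    return CompletableFuture.completedFuture("id-of-" + jobGraph);
                };
            }

            @Override
            public void close() {
                log.append("close;");
            }
        };
    }

    /** Mirrors the order of steps: look up the cluster, get a client, submit. */
    static CompletableFuture<String> execute(
            Descriptor descriptorToUse, String clusterId, String jobGraph) {
        try (Descriptor descriptor = descriptorToUse) {
            if (clusterId == null) {
                throw new IllegalStateException("Session mode needs an existing cluster id.");
            }
            Client client = descriptor.retrieve(clusterId);
            return client.submitJob(jobGraph);
        }
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        String jobId = execute(recordingDescriptor(log), "cluster-1", "wordcount").join();
        System.out.println(jobId);
        System.out.println(log);
    }
}
```

Nothing is deployed in this path: the descriptor is only used to locate a cluster that is already running, and it is closed as soon as the submission has been handed to the client.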
AbstractJobClusterExecutor: entry point for per-job mode
@Override
public CompletableFuture<JobClient> execute(
        @Nonnull final Pipeline pipeline,
        @Nonnull final Configuration configuration,
        @Nonnull final ClassLoader userCodeClassloader)
        throws Exception {
    // Translate the StreamGraph into a JobGraph
    final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
    // Obtain the cluster descriptor; on YARN this creates and starts the YARN client
    // and carries the YARN and Flink configuration and environment information
    try (final ClusterDescriptor<ClusterID> clusterDescriptor =
            clusterClientFactory.createClusterDescriptor(configuration)) {
        final ExecutionConfigAccessor configAccessor =
                ExecutionConfigAccessor.fromConfiguration(configuration);
        // Cluster-specific resource settings: JobManager memory, TaskManager memory,
        // and the number of slots per TaskManager
        final ClusterSpecification clusterSpecification =
                clusterClientFactory.getClusterSpecification(configuration);
        // Deploy a dedicated cluster for this job
        final ClusterClientProvider<ClusterID> clusterClientProvider =
                clusterDescriptor.deployJobCluster(
                        clusterSpecification, jobGraph, configAccessor.getDetachedMode());
        return CompletableFuture.completedFuture(
                new ClusterClientJobClientAdapter<>(
                        clusterClientProvider, jobGraph.getJobID(), userCodeClassloader));
    }
}
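Unlike the session path, which retrieves an existing cluster, the per-job path deploys a new cluster for each job and returns an already completed future. The toy model below illustrates that difference under stated assumptions: ClusterSpec, Descriptor and the string ids are hypothetical stand-ins rather than Flink types, and deployment is reduced to incrementing a counter.

```java
import java.util.concurrent.CompletableFuture;

/** Toy model of per-job submission; none of these types are Flink classes. */
final class PerJobDeploySketch {

    /** Hypothetical resource request: memory sizes and slots per TaskManager. */
    static final class ClusterSpec {
        final int jobManagerMemoryMb;
        final int taskManagerMemoryMb;
        final int slotsPerTaskManager;

        ClusterSpec(int jobManagerMemoryMb, int taskManagerMemoryMb, int slotsPerTaskManager) {
            this.jobManagerMemoryMb = jobManagerMemoryMb;
            this.taskManagerMemoryMb = taskManagerMemoryMb;
            this.slotsPerTaskManager = slotsPerTaskManager;
        }
    }

    /** Hypothetical descriptor: every deploy call starts a fresh cluster for one job. */
    static final class Descriptor implements AutoCloseable {
        private int deployedClusters = 0;

        String deployJobCluster(ClusterSpec spec, String jobId) {
            if (spec.slotsPerTaskManager < 1) {
                throw new IllegalArgumentException("At least one slot per TaskManager is required.");
            }
            deployedClusters++;
            return "cluster-" + deployedClusters + "-for-" + jobId;
        }

        int deployedClusters() {
            return deployedClusters;
        }

        @Override
        public void close() {
            // Nothing to release in this toy model.
        }
    }

    /** Deploys first, then returns an already completed future naming the new cluster. */
    static CompletableFuture<String> execute(
            Descriptor descriptorToUse, ClusterSpec spec, String jobId) {
        try (Descriptor descriptor = descriptorToUse) {
            String clusterId = descriptor.deployJobCluster(spec, jobId);
            return CompletableFuture.completedFuture(clusterId);
        }
    }

    public static void main(String[] args) {
        Descriptor descriptor = new Descriptor();
        ClusterSpec spec = new ClusterSpec(1024, 2048, 2);
        System.out.println(execute(descriptor, spec, "job-a").join());
        System.out.println(execute(descriptor, spec, "job-b").join());
        System.out.println(descriptor.deployedClusters());
    }
}
```

Because deployment happens synchronously inside execute, the returned future is complete by the time the caller sees it, which is why the excerpt can use CompletableFuture.completedFuture.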