本文基于1.14

Client部分

执行bin/flink run xxx会调用CliFrontend类的run方法。

用户Jar包的execute会执行真正的提交逻辑

用户代码启动入口

1、StreamExecutionEnvironment#execute
2、StreamExecutionEnvironment#executeAsync
3、StreamExecutionEnvironment#getStreamGraph 构建StreamGraph

决定走什么提交器

1、PipelineExecutorServiceLoader获取PipelineExecutorFactory(默认SPI加载)
2、PipelineExecutorFactory#getExecutor
3、PipelineExecutor#execute
PipelineExecutor会分出不同的模式

选出指定执行器后

1、PipelineExecutorUtils#getJobGraph 构建JobGraph
2、构建JobGraph的时候需要ExecutionConfigAccessor这个类。

Yarn相关

YarnClusterDescriptor#isReadyForDeployment对Yarn集群资源进行检查。

BlobServer支线

pipeline.cached-files 中的参数会
通过StreamGraphGenerator设置到StreamGraph#userArtifacts中
通过StreamingJobGraphGenerator设置到JobGraph#userArtifacts

上传文件PerJob 和 Remote Session模式不一样�

JobManager部分

ClusterEntrypoint启动主类

Dispatcher#submitJob

FlinkYarnSessionCli

userCodeClassLoader

*Hadoop配置加载

在HadoopUtils#getHadoopConfiguration方法,一共四种方式。最后一种是以 flink.hadoop. 开头的Flink配置会当做Hadoop配置。

Python Flink

flink run命令

根据
flink-python_2.12-1.14.0.jar

加载PythonProgramOptions类

  1. org.apache.flink.client.cli.PythonProgramOptions

PythonProgramOptions启用条件判断

  1. isPythonEntryPoint(line) || containsPythonDependencyOptions(line)

�python.requirements参数中的cached-dir会设置到BlobServer中

FlinkSQL