Client部分
执行bin/flink run xxx会调用CliFrontend类的run方法。
用户Jar包的execute会执行真正的提交逻辑
用户代码启动入口
1、StreamExecutionEnvironment#execute
2、StreamExecutionEnvironment#executeAsync
3、StreamExecutionEnvironment#getStreamGraph 构建StreamGraph
决定走什么提交器
1、PipelineExecutorServiceLoader获取PipelineExecutorFactory(默认SPI加载)
2、PipelineExecutorFactory#getExecutor
3、PipelineExecutor#execute
PipelineExecutor会分出不同的模式
选出指定执行器后
1、PipelineExecutorUtils#getJobGraph 构建JobGraph
2、构建JobGraph的时候需要ExecutionConfigAccessor这个类。
Yarn相关
YarnClusterDescriptor#isReadyForDeployment对Yarn集群资源进行检查。
�
BlobServer支线
pipeline.cached-files 中的参数会
通过StreamGraphGenerator设置到StreamGraph#userArtifacts中
通过StreamingJobGraphGenerator设置到JobGraph#userArtifacts
上传文件PerJob 和 Remote Session模式不一样�
�
JobManager部分
ClusterEntrypoint启动主类
�
Dispatcher#submitJob
�
�
�
�
FlinkYarnSessionCli
userCodeClassLoader
�
*Hadoop配置加载
在HadoopUtils#getHadoopConfiguration方法,一共四种方式。最后一种是以 flink.hadoop. 开头的Flink配置会当做Hadoop配置。
Python Flink
flink run命令
根据
flink-python_2.12-1.14.0.jar
加载PythonProgramOptions类
org.apache.flink.client.cli.PythonProgramOptions
PythonProgramOptions启用条件判断
isPythonEntryPoint(line) || containsPythonDependencyOptions(line)
�python.requirements参数中的cached-dir会设置到BlobServer中