- 1. 提交流程">1. 提交流程
- 2. 源码分析">2. 源码分析
- 1.1 提交命令
- 1.2 Main 方法执行
- 1.3 通过 run 方法解析输入参数
- 1.4 执行 this.executeProgram 进入到用户自定义类中
- 1.5 生成 StreamGraph
- 1.6 生成JobGraph">
1.6 生成JobGraph
- 1.7 创建并启动 yarn 客户端
- 1.8 部署集群,上传 jar 包和配置文件到 HDFS
- 1.9 封装 AM 参数和命令,生成 ClusterClientJobClient
- 1.10 提交任务信息
- 2 启动 ApplicationMaster
- 3.1 ApplicationMaster 创建 Dispatcher、ResourceManager
- 3.2 Dispatcher 启动 JobManager
- 7 TaskManager 启动
- 8 向 ResourceManager 注册 slot
- 9 ResourceManager 分配 Slot
- 10 TaskManager 提供 Slot
- 11 提交 Task 任务
- 3. 源码提交流程总结">3. 源码提交流程总结
1. 提交流程
流程1、客户端(入口类 CliFrontend)
⭐1.1 执行启动脚本,进入 CliFrontend 类的 main 方法中,获取 flink conf 目录配置的路径,然后对其进行加载,同时依次添加 3 种客户端类型,并创建 CliFrontend 对象;
⭐1.2 在 main 中执行 parseAndRun 对提交的命令行参数进行解析;
⭐1.3 在解析命令时,根据提交的 run 模式选择对于的run方法,在run方法中选择 FlinkYarnSessionCli 作为客户端;
⭐1.4 在 run 方法中调用 executeProgram 进入用户自定义代码
⭐1.5 在用户自定义代码中执行 execute(),通过 getStreamGraph() 方法生成 streamGraph;
⭐1.6 选择 YarnJobClusterExecutor 作为 pipelineExecutor,并生成 jobGraph;
⭐1.7 创建并启动 yarn 客户端,获取集群配置参数
⭐1.8 部署集群,将应用配置(Flink-conf.yaml、logback.xml、log4j.properties)和相关文件(Flink Jar、配置类文件、用户 Jar 文件、JobGraph 对象等)上传至分布式存储 HDFS 中。
⭐1.9 封装 ApplicationMaster 参数和命令,生出 ClusterClientJobClient
⭐1.10 ClusterClientJobClient 向 Yarn ResourceManager 提交任务信息
2、启动 ApplicationMaster
⭐2 Yarn ResourceManager 收到提交的任务信息后,将分配 Container 资源,并通知对应的 NodeManager 启动一个 ApplicationMaster (每提交一个 Flink job 就会启动一个 ApplicationMaster)
3、作业提交
⭐3.1 ApplicationMaster 启动 Dispatcher 和 ResourceManager ;
⭐3.2 Dispatcher 启动 JobMaster (该步和 Session 不同,Jabmaster 是由 Dispatcher 拉起,而不是 Client 传过来的)。
JobMaster 负责作业调度,管理作业和 Task 的生命周期,构建 ExecutionGraph( JobGraph 的并行化版本,调度层最核心的数据结构。
4、作业调度执行
⭐4 JobMaster 向 ResourceManager 申请 Slot 资源,开始调度 ExecutionGraph。
⭐5 ResourceManager 将资源请求加入等待队列,通过心跳向 YarnResourceManager 申请新的 Container 来启动 TaskManager 进程。
⭐6 YarnResourceManager 启动,然后从 HDFS 加载 Jar 文件等所需相关资源,在容器中启动 TaskManager。
⭐7 TaskManager 在内部启动 TaskExecutor。
⭐8 TaskManager 启动后,向 ResourceManager 注册,并把自己的 Slot 资源情况汇报给 ResourceManager。
⭐9 ResourceManager 从等待队列取出 Slot 请求,向 TaskManager 确认资源可用情况,并告知 TaskManager 将 Slot 分配给哪个 JobMaster。
⭐10 TaskManager 向 JobMaster 回复自己的一个 Slot 属于你这个任务,JobMaser 会将 Slot 缓存到 SlotPool。
⭐11 JobMaster 调度 Task 到 TaskMnager 的 Slot 上执行。
2. 源码分析
包括三个部分:
- CliFrontend 客户端
- YarnJobClusterEntryPoint:AM 执行的入口类
- YarnTaskExecutorRunner:Yarn 模式下的 TaskManager 的入口类
1.1 提交命令
通过 flink on yarn per-job 模式提交,查看 flink 脚本可以看到,程序被提交后,会寻找 CliFrontend 类。
1.2 Main 方法执行
在 CliFrontend.main 方法中,会执行如下操作:
1 获取 flink conf 目录配置的路径; 2 根据 conf 目录加载配置; 3 根据三种方式 GenericCLI、flinkYarnSessionCLI、DefaultCLI按照顺序依次封装成命令行接口。 4 创建 CLiFrontend 客户端; 5 对 flink run 提交的命令行进行解析。(除过 flink 为脚本,run 及 后面的命令全部需要解析判断 )





其中 flink 为脚本,不算命令,从 run 开始,所以下图源码中的String action = args[0] 得到的第一个参数就是 run。说明提交方式为 run。
然后根据 run 动作解析输入参数
1.3 通过 run 方法解析输入参数
⭐run 在 CliFrontend.java 类中
在 run 方法中主要执行以下五步操作:
- 使用 CliFrontendParser.getRunCommandOptions()获取默认的运行参数
- 使用 this.getCommandLine 根据用户指定的配置项进行解析,然后包装成commandLine
- 根据提交的命令选择对应的客户端
- 获取有效的配置信息
- 执行 executeProgram 进入用户自定义类中
我们主要分析一下 this.executeProgram
1.4 执行 this.executeProgram 进入到用户自定义类中
⭐executeProgram() 在 CliFrontend.java 类中
通过调用 this.executeProgram 方法进入到用户定义的程序主类中,首先创建 StreamExecutionEnvironment 环境,然后接收自定义的 source,本文代码使用 kafka 作为 source 源,通过 transformation 对算子进行转换,最后执行 sink 操作,当提交集群时,需要执行 execute()
1.5 生成 StreamGraph
⭐StreamGraph 在 StreamExecutionEnvironment.java 类生成
通过查看 execute() 方法,发现通过输入形参 jobMame 最终返回一个 JobExecutionResult,继续深入查看 getStreamGraph()方法
在 getStreamGraph 中,通过加载全局配置和转换操作来取得流图生成器,最后生成 StreamGraph
1.6 生成JobGraph
1.6.1 选择 pipelineExecutor
⭐pipelineExecutor 在 StreamExecutionEnvironment.java 类生成
通过 getExecutorFactory(configuration) 选择 YarnJobClusterExecutorFactory 作为工厂类,选择 YarnJobClusterExecutor 作为 pipelineExecutor,
1.6.2 生成 jobGraph
⭐jobGraph 在 AbstractJobClusterExecutor.java 类生成
当 pipelineExecutor 生选择之后,executorFactory 会调用 getExecutor(configuration) 获取之前的配置,然后执行 execute(streamGraph, configuration, userClassloader) 生成 jobClientFuture,在这其中会生成 JobGraph,如下图所示
进入 getJobGraph()方法
1.7 创建并启动 yarn 客户端
⭐YarnClusterDescriptor 在 AbstractJobClusterExecutor.java 类生成
jobGraph 生成后,会进行如下操作:
- 会创建并启动 yarn 客户端;
- 获取集群配置参数;
1.8 部署集群,上传 jar 包和配置文件到 HDFS
⭐ 集群部署在 YarnJobCluster.java类中
进入到 deployJobCluster()方法中,会获取 YarnJobClusterEntrypoint,启动 ApplicationMaster 的入口
ApplicationMaster 入口启动后,然后将 jar 包和配置文件上传到 HDFS 中。
1.9 封装 AM 参数和命令,生成 ClusterClientJobClient
⭐ YarnClusterDescriptor.java
在 YarnClusterDescriptor 类的 setupApplicationMasterContainer() 方法中会创建 AM 的容器启动上下文,然后封装 AM 参数和命令,生成 ClusterClientJobClient
1.10 提交任务信息
ClusterClientJobClient 生成后,会 进入到 YarnClientImpl 实现类中提交 jobGraph,提交方法为 submitApplication,具体执行流程图如下:
继续深入 执行 rmClient.submitApplication方法,在ApplicationClientProtocolPBClientImpl.java 类的 submitApplication 方法中会获取到http 报文,然后将获取到的报文发送到服务器,并将返回的结果构成 response
最后将应用请求提交到 Yarn 上的 RMAppManager 去提交任务。
2 启动 ApplicationMaster
在 1.8 部署集群的时候,已经启动了 ApplicationMaster。
3.1 ApplicationMaster 创建 Dispatcher、ResourceManager
后面会保证大家更详细的看清楚 JobManager 和 TaskManager的执行过程,我将源码的代码复制出来,添加注释,帮助大家更好理解。
Per-job 模式的 AM container 加载运行入口是 YarnJobClusterEntryPoint 中的 main() 方法,具体如下:
我们进入 runCluster 方法中,
1 、创建 dispatcher、ResourceManager 对象的工厂类。其中包含从本地重新创建 JobGraph 的过程。
2、通过工厂类创建 dispatcher、ResourceManager 对象,Entry 启动 RpcService、HAService、BlobServer、HeartbeatServices、MetricRegistry、ExecutionGraphStore 等
我们调用 create()方法,该方法会执行以下操作:
1、创建接收前端 Rest 请求的节点
2、创建 ResourceManager对象,返回是 new YarnResourceManager
3、创建 dispatcherRunner 对象并启动
4、启动 ResourceManager
3.2 Dispatcher 启动 JobManager
⭐ Dispatcher.java




### 5 ResourceManager 申请资源 ⭐ ResourceManager.java

进入到 runTaskManagerSecurely()方法中
7 TaskManager 启动
⭐ TaskExecutor.java
8 向 ResourceManager 注册 slot
⭐ TaskExecutor.java
9 ResourceManager 分配 Slot
⭐ SlotManagerImpl.java
10 TaskManager 提供 Slot
⭐ TaskExecutor.java
11 提交 Task 任务
JobMaster 调度 Task 到 TaskMnager 的 Slot 上执行
3. 源码提交流程总结
1、客户端(入口类 CliFrontend)
⭐1.1 执行启动脚本,进入 CliFrontend 类的 main 方法中,获取 flink conf 目录配置的路径,然后对其进行加载,同时依次添加 3 种客户端类型,并创建 CliFrontend 对象;
⭐1.2 在 main 中执行 parseAndRun 对提交的命令行参数进行解析;
⭐1.3 在解析命令时,根据提交的 run 模式选择对于的run方法,在run方法中选择 FlinkYarnSessionCli 作为客户端;
⭐1.4 在 run 方法中调用 executeProgram 进入用户自定义代码
⭐1.5 在用户自定义代码中执行 execute(),通过 getStreamGraph() 方法生成 streamGraph;
⭐1.6 选择 YarnJobClusterExecutor 作为 pipelineExecutor,并生成 jobGraph;
⭐1.7 创建并启动 yarn 客户端,获取集群配置参数
⭐1.8 部署集群,将应用配置(Flink-conf.yaml、logback.xml、log4j.properties)和相关文件(Flink Jar、配置类文件、用户 Jar 文件、JobGraph 对象等)上传至分布式存储 HDFS 中。
⭐1.9 封装 ApplicationMaster 参数和命令,生出 ClusterClientJobClient
⭐1.10 ClusterClientJobClient 向 Yarn ResourceManager 提交任务信息
2、启动 ApplicationMaster
⭐2 Yarn ResourceManager 收到提交的任务信息后,将分配 Container 资源,并通知对应的 NodeManager 启动一个 ApplicationMaster (每提交一个 Flink job 就会启动一个 ApplicationMaster)
3、作业提交
⭐3.1 ApplicationMaster 启动 Dispatcher 和 ResourceManager ;
⭐3.2 Dispatcher 启动 JobMaster (该步和 Session 不同,Jabmaster 是由 Dispatcher 拉起,而不是 Client 传过来的)。
JobMaster 负责作业调度,管理作业和 Task 的生命周期,构建 ExecutionGraph( JobGraph 的并行化版本,调度层最核心的数据结构。
4、作业调度执行
⭐4 JobMaster 向 ResourceManager 申请 Slot 资源,开始调度 ExecutionGraph。
⭐5 ResourceManager 将资源请求加入等待队列,通过心跳向 YarnResourceManager 申请新的 Container 来启动 TaskManager 进程。
⭐6 YarnResourceManager 启动,然后从 HDFS 加载 Jar 文件等所需相关资源,在容器中启动 TaskManager。
⭐7 TaskManager 在内部启动 TaskExecutor。
⭐8 TaskManager 启动后,向 ResourceManager 注册,并把自己的 Slot 资源情况汇报给 ResourceManager。
⭐9 ResourceManager 从等待队列取出 Slot 请求,向 TaskManager 确认资源可用情况,并告知 TaskManager 将 Slot 分配给哪个 JobMaster。
⭐10 TaskManager 向 JobMaster 回复自己的一个 Slot 属于你这个任务,JobMaser 会将 Slot 缓存到 SlotPool。
⭐11 JobMaster 调度 Task 到 TaskMnager 的 Slot 上执行。