1. 提交流程

流程

Flink on yarn 提交流程 - 图2

1、客户端(入口类 CliFrontend)

⭐1.1 执行启动脚本,进入 CliFrontend 类的 main 方法中,获取 flink conf 目录配置的路径,然后对其进行加载,同时依次添加 3 种客户端类型,并创建 CliFrontend 对象;

⭐1.2 在 main 中执行 parseAndRun 对提交的命令行参数进行解析;

⭐1.3 在解析命令时,根据提交的 run 模式选择对于的run方法,在run方法中选择 FlinkYarnSessionCli 作为客户端;

⭐1.4 在 run 方法中调用 executeProgram 进入用户自定义代码

⭐1.5 在用户自定义代码中执行 execute(),通过 getStreamGraph() 方法生成 streamGraph;

⭐1.6 选择 YarnJobClusterExecutor 作为 pipelineExecutor,并生成 jobGraph;

1.7 创建并启动 yarn 客户端,获取集群配置参数

1.8 部署集群,将应用配置(Flink-conf.yaml、logback.xml、log4j.properties)和相关文件(Flink Jar、配置类文件、用户 Jar 文件、JobGraph 对象等)上传至分布式存储 HDFS 中。

⭐1.9 封装 ApplicationMaster 参数和命令,生出 ClusterClientJobClient

⭐1.10 ClusterClientJobClient 向 Yarn ResourceManager 提交任务信息

2、启动 ApplicationMaster

⭐2 Yarn ResourceManager 收到提交的任务信息后,将分配 Container 资源,并通知对应的 NodeManager 启动一个 ApplicationMaster (每提交一个 Flink job 就会启动一个 ApplicationMaster)

3、作业提交

⭐3.1 ApplicationMaster 启动 Dispatcher 和 ResourceManager ;

⭐3.2 Dispatcher 启动 JobMaster (该步和 Session 不同,Jabmaster 是由 Dispatcher 拉起,而不是 Client 传过来的)。

JobMaster 负责作业调度,管理作业和 Task 的生命周期,构建 ExecutionGraph( JobGraph 的并行化版本,调度层最核心的数据结构。

4、作业调度执行

⭐4 JobMaster 向 ResourceManager 申请 Slot 资源,开始调度 ExecutionGraph。

⭐5 ResourceManager 将资源请求加入等待队列,通过心跳向 YarnResourceManager 申请新的 Container 来启动 TaskManager 进程。

⭐6 YarnResourceManager 启动,然后从 HDFS 加载 Jar 文件等所需相关资源,在容器中启动 TaskManager。

⭐7 TaskManager 在内部启动 TaskExecutor。

⭐8 TaskManager 启动后,向 ResourceManager 注册,并把自己的 Slot 资源情况汇报给 ResourceManager。

⭐9 ResourceManager 从等待队列取出 Slot 请求,向 TaskManager 确认资源可用情况,并告知 TaskManager 将 Slot 分配给哪个 JobMaster。

⭐10 TaskManager 向 JobMaster 回复自己的一个 Slot 属于你这个任务,JobMaser 会将 Slot 缓存到 SlotPool。

⭐11 JobMaster 调度 Task 到 TaskMnager 的 Slot 上执行。

2. 源码分析

包括三个部分:

  1. CliFrontend 客户端
  2. YarnJobClusterEntryPoint:AM 执行的入口类
  3. YarnTaskExecutorRunner:Yarn 模式下的 TaskManager 的入口类

1.1 提交命令

Flink on yarn 提交流程 - 图3

通过 flink on yarn per-job 模式提交,查看 flink 脚本可以看到,程序被提交后,会寻找 CliFrontend 类。

Flink on yarn 提交流程 - 图4

1.2 Main 方法执行

在 CliFrontend.main 方法中,会执行如下操作:

1 获取 flink conf 目录配置的路径; 2 根据 conf 目录加载配置; 3 根据三种方式 GenericCLI、flinkYarnSessionCLI、DefaultCLI按照顺序依次封装成命令行接口。 4 创建 CLiFrontend 客户端; 5 对 flink run 提交的命令行进行解析。(除过 flink 为脚本,run 及 后面的命令全部需要解析判断 ) Flink on yarn 提交流程 - 图5 具体如下源码: Flink on yarn 提交流程 - 图61.2.1 获取 flink conf 目录配置的路径; ⭐ CliFrontend.java 通过 getConfigurationDirectoryFromEnv 方法可以看到,这一步主要是用来获取 flink conf 的目录配置文件路径 Flink on yarn 提交流程 - 图7 1.2.2 根据 conf 目录加载配置; ⭐ GlobalConfiguration.java 在 loadConfiguration 方法中,加载之前获取的 conf 路径配置 Flink on yarn 提交流程 - 图8 1.2.3 选择创建的客户端类型; ⭐ CliFrontend.java 进入 loadCustomCommandLines 方法中,可以看到 这里依次添加了 GenericCLi、FlinkYarnSessionCli 和 DefaultCLi 三种命令行客户端(后面根据 isActive()按顺序选择): Flink on yarn 提交流程 - 图9 1.2.4 创建 CliFrontend 客户端 ⭐ CliFrontend.java Flink on yarn 提交流程 - 图10 1.2.5 调用 parseAndRun 解析参数 第五步:cli.parseAndRun(args) 为主要执行方法,我们进入该方法中进行查看: ⭐ CliFrontend.java 在 parseAndRun 方法中,主要是对之前提交的命令进行解析分析,我们以提交的命令行进行分析 flink run -t yarn-per-job -c xxx xxx.jar

其中 flink 为脚本,不算命令,从 run 开始,所以下图源码中的String action = args[0] 得到的第一个参数就是 run。说明提交方式为 run。

Flink on yarn 提交流程 - 图11

然后根据 run 动作解析输入参数

1.3 通过 run 方法解析输入参数

run 在 CliFrontend.java 类中

在 run 方法中主要执行以下五步操作:

  1. 使用 CliFrontendParser.getRunCommandOptions()获取默认的运行参数
  2. 使用 this.getCommandLine 根据用户指定的配置项进行解析,然后包装成commandLine
  3. 根据提交的命令选择对应的客户端
  4. 获取有效的配置信息
  5. 执行 executeProgram 进入用户自定义类中

Flink on yarn 提交流程 - 图12

我们主要分析一下 this.executeProgram

1.4 执行 this.executeProgram 进入到用户自定义类中

executeProgram() 在 CliFrontend.java 类中

通过调用 this.executeProgram 方法进入到用户定义的程序主类中,首先创建 StreamExecutionEnvironment 环境,然后接收自定义的 source,本文代码使用 kafka 作为 source 源,通过 transformation 对算子进行转换,最后执行 sink 操作,当提交集群时,需要执行 execute()

Flink on yarn 提交流程 - 图13

1.5 生成 StreamGraph

StreamGraph 在 StreamExecutionEnvironment.java 类生成

通过查看 execute() 方法,发现通过输入形参 jobMame 最终返回一个 JobExecutionResult,继续深入查看 getStreamGraph()方法

Flink on yarn 提交流程 - 图14

在 getStreamGraph 中,通过加载全局配置和转换操作来取得流图生成器,最后生成 StreamGraph

Flink on yarn 提交流程 - 图151.6 生成JobGraph

1.6.1 选择 pipelineExecutor

pipelineExecutor 在 StreamExecutionEnvironment.java 类生成

通过 getExecutorFactory(configuration) 选择 YarnJobClusterExecutorFactory 作为工厂类,选择 YarnJobClusterExecutor 作为 pipelineExecutor,

Flink on yarn 提交流程 - 图16

1.6.2 生成 jobGraph

jobGraph 在 AbstractJobClusterExecutor.java 类生成

当 pipelineExecutor 生选择之后,executorFactory 会调用 getExecutor(configuration) 获取之前的配置,然后执行 execute(streamGraph, configuration, userClassloader) 生成 jobClientFuture,在这其中会生成 JobGraph,如下图所示

Flink on yarn 提交流程 - 图17

Flink on yarn 提交流程 - 图18

进入 getJobGraph()方法

Flink on yarn 提交流程 - 图19

1.7 创建并启动 yarn 客户端

YarnClusterDescriptor 在 AbstractJobClusterExecutor.java 类生成

jobGraph 生成后,会进行如下操作:

  1. 会创建并启动 yarn 客户端;
  2. 获取集群配置参数;

Flink on yarn 提交流程 - 图20

1.8 部署集群,上传 jar 包和配置文件到 HDFS

集群部署在 YarnJobCluster.java类中

Flink on yarn 提交流程 - 图21

进入到 deployJobCluster()方法中,会获取 YarnJobClusterEntrypoint,启动 ApplicationMaster 的入口

Flink on yarn 提交流程 - 图22

ApplicationMaster 入口启动后,然后将 jar 包和配置文件上传到 HDFS 中。

1.9 封装 AM 参数和命令,生成 ClusterClientJobClient

⭐ YarnClusterDescriptor.java

在 YarnClusterDescriptor 类的 setupApplicationMasterContainer() 方法中会创建 AM 的容器启动上下文,然后封装 AM 参数和命令,生成 ClusterClientJobClient

1.10 提交任务信息

ClusterClientJobClient 生成后,会 进入到 YarnClientImpl 实现类中提交 jobGraph,提交方法为 submitApplication,具体执行流程图如下:

Flink on yarn 提交流程 - 图23

继续深入 执行 rmClient.submitApplication方法,在ApplicationClientProtocolPBClientImpl.java 类的 submitApplication 方法中会获取到http 报文,然后将获取到的报文发送到服务器,并将返回的结果构成 response

Flink on yarn 提交流程 - 图24

最后将应用请求提交到 Yarn 上的 RMAppManager 去提交任务。

Flink on yarn 提交流程 - 图25

2 启动 ApplicationMaster

在 1.8 部署集群的时候,已经启动了 ApplicationMaster

3.1 ApplicationMaster 创建 Dispatcher、ResourceManager

后面会保证大家更详细的看清楚 JobManager 和 TaskManager的执行过程,我将源码的代码复制出来,添加注释,帮助大家更好理解。

Per-job 模式的 AM container 加载运行入口是 YarnJobClusterEntryPoint 中的 main() 方法,具体如下:

Flink on yarn 提交流程 - 图26

Flink on yarn 提交流程 - 图27

Flink on yarn 提交流程 - 图28

我们进入 runCluster 方法中,

1 、创建 dispatcher、ResourceManager 对象的工厂类。其中包含从本地重新创建 JobGraph 的过程。

2、通过工厂类创建 dispatcher、ResourceManager 对象,Entry 启动 RpcService、HAService、BlobServer、HeartbeatServices、MetricRegistry、ExecutionGraphStore 等

Flink on yarn 提交流程 - 图29

我们调用 create()方法,该方法会执行以下操作:

1、创建接收前端 Rest 请求的节点

2、创建 ResourceManager对象,返回是 new YarnResourceManager

3、创建 dispatcherRunner 对象并启动

4、启动 ResourceManager

Flink on yarn 提交流程 - 图30

3.2 Dispatcher 启动 JobManager

Dispatcher.java Flink on yarn 提交流程 - 图31 ### 3.3 ResourceManager 启动 SlotManager ResourceManager.java Flink on yarn 提交流程 - 图32 3.3.1 创建 Yarn 的 RM 和 NM 客户端 ActiveResourceManager.java Flink on yarn 提交流程 - 图33 YarnResourceManagerDriver.java 在 YarnResourceManagerDriver 里面创建和启动 yarn 的 resourcemanager 客户端,创建和启动 yarn 的 nodemanager 客户端 Flink on yarn 提交流程 - 图34 3.3.2 启动 SlotManager 如下为类之间的方法调用,最后在 checkTaskManagerTimeoutsAndRedundancy 方法中 进行判断: 如果没有 job 在运行,释放 taskmanager,保证随时有足够的 taskmanager Flink on yarn 提交流程 - 图35 ### 4 JobMaster 向 ResourceManager 申请资源 JobMaster 向 ResourceManager 申请 Slot 资源,开始调度 ExecutionGraph。
### 5 ResourceManager 申请资源 ResourceManager.java Flink on yarn 提交流程 - 图36 ### 6 TaskManager 启动 入口类为 YarnTaskExecutorRunner

Flink on yarn 提交流程 - 图37

进入到 runTaskManagerSecurely()方法中

Flink on yarn 提交流程 - 图38

7 TaskManager 启动

TaskExecutor.java

Flink on yarn 提交流程 - 图39

8 向 ResourceManager 注册 slot

TaskExecutor.java

Flink on yarn 提交流程 - 图40

9 ResourceManager 分配 Slot

SlotManagerImpl.java

Flink on yarn 提交流程 - 图41

10 TaskManager 提供 Slot

TaskExecutor.java

Flink on yarn 提交流程 - 图42

11 提交 Task 任务

JobMaster 调度 Task 到 TaskMnager 的 Slot 上执行

3. 源码提交流程总结

1、客户端(入口类 CliFrontend)

⭐1.1 执行启动脚本,进入 CliFrontend 类的 main 方法中,获取 flink conf 目录配置的路径,然后对其进行加载,同时依次添加 3 种客户端类型,并创建 CliFrontend 对象;

⭐1.2 在 main 中执行 parseAndRun 对提交的命令行参数进行解析;

⭐1.3 在解析命令时,根据提交的 run 模式选择对于的run方法,在run方法中选择 FlinkYarnSessionCli 作为客户端;

⭐1.4 在 run 方法中调用 executeProgram 进入用户自定义代码

⭐1.5 在用户自定义代码中执行 execute(),通过 getStreamGraph() 方法生成 streamGraph;

⭐1.6 选择 YarnJobClusterExecutor 作为 pipelineExecutor,并生成 jobGraph;

1.7 创建并启动 yarn 客户端,获取集群配置参数

1.8 部署集群,将应用配置(Flink-conf.yaml、logback.xml、log4j.properties)和相关文件(Flink Jar、配置类文件、用户 Jar 文件、JobGraph 对象等)上传至分布式存储 HDFS 中。

⭐1.9 封装 ApplicationMaster 参数和命令,生出 ClusterClientJobClient

⭐1.10 ClusterClientJobClient 向 Yarn ResourceManager 提交任务信息

2、启动 ApplicationMaster

⭐2 Yarn ResourceManager 收到提交的任务信息后,将分配 Container 资源,并通知对应的 NodeManager 启动一个 ApplicationMaster (每提交一个 Flink job 就会启动一个 ApplicationMaster)

3、作业提交

⭐3.1 ApplicationMaster 启动 Dispatcher 和 ResourceManager ;

⭐3.2 Dispatcher 启动 JobMaster (该步和 Session 不同,Jabmaster 是由 Dispatcher 拉起,而不是 Client 传过来的)。

JobMaster 负责作业调度,管理作业和 Task 的生命周期,构建 ExecutionGraph( JobGraph 的并行化版本,调度层最核心的数据结构。

4、作业调度执行

⭐4 JobMaster 向 ResourceManager 申请 Slot 资源,开始调度 ExecutionGraph。

⭐5 ResourceManager 将资源请求加入等待队列,通过心跳向 YarnResourceManager 申请新的 Container 来启动 TaskManager 进程。

⭐6 YarnResourceManager 启动,然后从 HDFS 加载 Jar 文件等所需相关资源,在容器中启动 TaskManager。

⭐7 TaskManager 在内部启动 TaskExecutor。

⭐8 TaskManager 启动后,向 ResourceManager 注册,并把自己的 Slot 资源情况汇报给 ResourceManager。

⭐9 ResourceManager 从等待队列取出 Slot 请求,向 TaskManager 确认资源可用情况,并告知 TaskManager 将 Slot 分配给哪个 JobMaster。

⭐10 TaskManager 向 JobMaster 回复自己的一个 Slot 属于你这个任务,JobMaser 会将 Slot 缓存到 SlotPool。

⭐11 JobMaster 调度 Task 到 TaskMnager 的 Slot 上执行。