Yarn集群启动

四个阶段:Client提交Flink启动集群请求,RM拉起AM,AM拉起TM容器,容器内启动TM。

YarnClient创建Flink应用阶段

0x01 获取Yarn集群信息并创建应用

以下所有请求均是Client向yarn.resourcemanager.address = RM.ASM = 8032 发送。

获取全部节点信息:ApplicationClientProtocol.getClusterNodes
返回:节点ID(节点ID里有主机名和端口)、状态、机架名(rack)、http地址、总资源、已使用资源等信息 。

获取队列信息请求:ApplicationClientProtocol.getQueueInfo

创建应用请求:ApplicationClientProtocol.getNewApplication
返回:单调递增ApplicationId(例如application_1570379089091_0019)和maxCapability(单Container最大可申请内存和VCores)

0x02 上传依赖到HDFS

所有依赖上传应用暂存目录/user/${linux_user_name}/.flink/${yarn_application_id}。应用结束会删掉此目录。
依赖分三类:conf、lib、plugins。

  1. conf目录下三个文件:flink-conf.yaml、logback.xml、log4j.properties。
  2. lib目录下所有文件:flink-dist_2.11-1.9.0.jar、flink-table_2.11-1.9.0.jar、slf4j-log4j12-1.7.15.jar等。
  3. plugins目录下所有文件:README.txt等。

注意:
conf目录下文件和flink-dist2.11-1.9.0.jar文件比较特殊,上传到暂存目录的根目录。其他文件和在本地相对路径一样直接映射到HDFS。
flink-conf.yaml文件名会变为${yarn_application_id}-flink-conf.yaml${random}.tmp (random是随机的JavaLong类型正整数),TM启动时不会用这个文件,因为在Yarn中AM的地址和端口是动态的。
最后调用ClientProtocol.setPermission给暂存目录设置700权限。
Flink on Yarn 缓存文件类型为LocalResourceType._FILE,可见性为_LocalResourceVisibility._APPLICATION。

_

0x03 提交应用到Yarn集群,轮询获取提交结果

调用后RM会开始拉起AM。会发送所有依赖在HDFS上的路径和程序执行入口主类等信息。
ApplicationClientProtocol.submitApplication
Yarn-Session的入口主类是YarnSessionClusterEntrypoint,Per-Job是YarnJobClusterEntrypoint。

插曲:
由于Yarn的yarn.resourcemanager.store.class配置成ZKRMStateStore,所以RM收到Client的提交后,把所有启动AM的元信息(submitApplication提交的信息)通过ZK:2181写到/rmstore/ZKRMStateRoot/RMAppRoot/${yarn_application_id}节点。ZK:2888端口在节点间同步数据。作用是应用挂了后,从ZK读元信息来重启恢复应用。

submitApplication收到返回后,每250毫秒发送ApplicationClientProtocol.getApplicationReport一次,这样不停轮询。

轮询一共四种返回,前三种代表启动前的AM的三种状态ACTIVATED、ASSIGNED、LAUNCHED,第四种返回AM信息。
前三种具体返回:
ACTIVATED

  1. Application is Activated, waiting for resources to be assigned for AM
  2. Details : AM Partition = <DEFAULT_PARTITION> ;
  3. Partition Resource = <memory:6144, vCores:6> ;
  4. Queue's Absolute capacity = 100.0 % ;
  5. Queue's Absolute used capacity = 0.0 % ;
  6. Queue's Absolute max capacity = 100.0 %;

ASSIGNED

  1. Scheduler has assigned a container for AM, waiting for AM container to be launched

LAUNCHED

  1. AM container is launched, waiting for AM container to Register with RM

当AM启动调用ApplicationMasterProtocol.``registerApplicationMaster发送host、rpc_port、tracking_url到RM.scheduler:8030后,轮询出现第四种返回,应用创建成功。

  1. tail3 主机名
  2. http://tail2:8088/proxy/application_1570379089091_0019/ tracking_url web后台的跳转url
  3. http://tail3:36037 AM服务端口

如果RM没收到AM注册,则再次尝试拉起AM attemptid会加一。重复_ACTIVATED、ASSIGNED、LAUNCHED这三个状态,直到成功拉起,或达到最大阈值。
Flink设置的的重启最大次数会被Yarn本身限制住,不能超过Yarn本身的最大值。也就是说下面这俩参数以小的会生效。

  1. yarn.application-attempts: 10 #flink配置
  2. yarn.resourcemanager.am.max-attempts: 100 #yarn配置

getApplicationReport返回AM注册的信息后改为1秒轮询一次。

RM拉起AM阶段

0x00 创建日志聚合目录

由于Yarn开启了日志聚合(yarn.log-aggregation-enable设置true)在HDFS创建目录 ${remoteRootLogDir}/${user}/${suffix}/${appid}(例如/tmp/logs/root/logs/application_1570379089091_0027)
${remoteRootLogDir}=yarn.nmodemanager.remote-app-log-dir默认/tmp/logs
${suffix}=yarn.nmodemanager.remote-app-log-dir-suffix默认logs
设置目录权限770

0x01 RM向NM发起启动容器请求

RM先向NM通过DIGEST-MD5算法认证自己。
认证后,RM向NM发送 ContainerManagementProtocol.startContainers 远程调用
参数ContainerLaunchContextTokenContainerLaunchContext最主要三个属性:依赖资源在HDFS上的路径、环境变量、启动容器运行命令(命令就是运行YarnSessionClusterEntrypoint这个Java入口类)。
如果Container的Token没验证通过会返回

  1. new InvalidToken(
  2. "Invalid container token used for starting container on : "
  3. + context.getNodeId().toString());

真实例子:
Invalid container token used for starting container on : tail1:33016
Token校验失败会重新申请容器再拉起。

0x02 资源本地化

NM收到请求后,从HDFS下载运行所需的所有依赖至Container工作目录(路径为: ${yarn.nodemanager.local-dirs}/usercache/${user}/appcache//);

0x03 容器启动

运行YarnSessionClusterEntrypoint这个Java类。
启动Flink的ResourceManager包含YarnResourceManager(YarnResourceManager extends ResourceManagerSlotManager两个子组件。

*0x04 HA模式额外步骤

创建HDFS目录 /${high-availability.storageDir}/${application_id}/blob
通过ZK进行选主:
但是和Standalone不同的是,只会启动一个AM,所以这个AM指定会获得Leader(Active)。
向ZK写AM元数据:
写启动AM的容器上下文到/rmstore/ZKRMStateRoot/RMAppRoot/${application_id}
写AM容器主机、端口、Token到/rmstore/ZKRMStateRoot/RMAppRoot/${application_id}/${app_attempt_id}。attempt几次就写几个。

0x05 AM注册并开始心跳

AM向RM发送远程调用ApplicationMasterProtocol.registerApplicationMaster传递host、rpc_port、tracking_url到RM.scheduler:8030进行注册。
注册成功后每隔5秒ApplicationMasterProtocol.allocate进行一次心跳,AM申请Worker容器也是同一个TCP连接内使用allocate远程方法进行申请。即心跳和申请容器是同一个方法,所以在申请容器时allocate调用间隔会小于5秒,等申请完毕后恢复正常心跳频率。

Flink的RM请求拉起TM阶段

0x01 AM向RM申请容器

FlinkResourceManager向RM.scheduler:8030请求调用ApplicationMasterProtocol.allocate分配容器,由于此请求也用于5秒一次心跳,此时allocate会极小于5秒调用一次。RM返回分配的NM主机名、端口、ContainerToken。

0x02 生成taskmanager-conf.yaml

生成taskmanager-conf.yaml并上传HDFS暂存目录。文件格式为${UUID.random.UUID()}-taskmanager-conf.yaml,前缀是随机UUID。
例如:/user/root/.flink/application_1570379089091_0019/6de714ea-9cf5-4389-ac91-c8f066d529e9-taskmanager-conf.yaml
taskmanger-conf.yaml和flink-conf.yaml区别是TM里面有动态生成的Flink Master地址(JM和YarnAM在一台机器上所以可以获得地址)。供TM启动时读取。两文件具体区别如下:

  1. f3c83617-2943-4ef2-8555-5eece7f4465c-taskmanager-conf.yaml
  2. web.port: 0
  3. jobmanager.rpc.address: host3
  4. taskmanager.memory.size: 264241152b
  5. web.tmpdir: /tmp/flink-web-9a2d9747-1524-45cd-9e00-2838142dc9b4
  6. jobmanager.rpc.port: 46396
  7. rest.address: host3
  8. application_1570379089091_0011-flink-conf.yaml3560774894264262758.tmp
  9. jobmanager.rpc.address: host1
  10. jobmanager.rpc.port: 6123

0x04 AM向NM拉起容器

AM写完taskmanager-conf.yaml就带Token向分配NM的ContainerManagementProtocol.startContainers
TM容器启动这个类YarnTaskExecutorRunner。

*0x05 AM不断尝试拉起TM

如果TM Container没拉起来。会重复上面所有步骤。重复过多RM会换个NM节点返回给AM继续尝试拉起TM Container。每隔10秒重复一次。

0x06 TM与RM建立心跳

NM启动TM容器阶段

0x01 读HDFS依赖

TM读HDFS上的taskmanager-conf.yaml以及其他依赖

0x02 TM注册以及上报Slot信息

TM向FlinkRM发送远程调用ResourceManagerGateway.registerTaskExecutorResourceManagerGateway.sendSlotReport注册TM以及上报Slot信息。和Standalone模式一样上文已经有详解。