Yarn集群启动
四个阶段:Client提交Flink启动集群请求,RM拉起AM,AM拉起TM容器,容器内启动TM。
YarnClient创建Flink应用阶段
0x01 获取Yarn集群信息并创建应用
以下所有请求均是Client向yarn.resourcemanager.address = RM.ASM = 8032 发送。
获取全部节点信息:ApplicationClientProtocol.getClusterNodes
返回:节点ID(节点ID里有主机名和端口)、状态、机架名(rack)、http地址、总资源、已使用资源等信息 。
获取队列信息请求:ApplicationClientProtocol.getQueueInfo
创建应用请求:ApplicationClientProtocol.getNewApplication
返回:单调递增ApplicationId(例如application_1570379089091_0019)和maxCapability(单Container最大可申请内存和VCores)
0x02 上传依赖到HDFS
所有依赖上传应用暂存目录/user/${linux_user_name}/.flink/${yarn_application_id}。应用结束会删掉此目录。
依赖分三类:conf、lib、plugins。
- conf目录下三个文件:flink-conf.yaml、logback.xml、log4j.properties。
- lib目录下所有文件:flink-dist_2.11-1.9.0.jar、flink-table_2.11-1.9.0.jar、slf4j-log4j12-1.7.15.jar等。
- plugins目录下所有文件:README.txt等。
注意:
conf目录下文件和flink-dist2.11-1.9.0.jar文件比较特殊,上传到暂存目录的根目录。其他文件和在本地相对路径一样直接映射到HDFS。
flink-conf.yaml文件名会变为${yarn_application_id}-flink-conf.yaml${random}.tmp (random是随机的JavaLong类型正整数),TM启动时不会用这个文件,因为在Yarn中AM的地址和端口是动态的。
最后调用ClientProtocol.setPermission
给暂存目录设置700权限。
Flink on Yarn 缓存文件类型为LocalResourceType._FILE,可见性为_LocalResourceVisibility._APPLICATION。
_
0x03 提交应用到Yarn集群,轮询获取提交结果
调用后RM会开始拉起AM。会发送所有依赖在HDFS上的路径和程序执行入口主类等信息。ApplicationClientProtocol.submitApplication
Yarn-Session的入口主类是YarnSessionClusterEntrypoint,Per-Job是YarnJobClusterEntrypoint。
插曲:
由于Yarn的yarn.resourcemanager.store.class配置成ZKRMStateStore,所以RM收到Client的提交后,把所有启动AM的元信息(submitApplication提交的信息)通过ZK:2181写到/rmstore/ZKRMStateRoot/RMAppRoot/${yarn_application_id}节点。ZK:2888端口在节点间同步数据。作用是应用挂了后,从ZK读元信息来重启恢复应用。
submitApplication收到返回后,每250毫秒发送ApplicationClientProtocol.getApplicationReport
一次,这样不停轮询。
轮询一共四种返回,前三种代表启动前的AM的三种状态ACTIVATED、ASSIGNED、LAUNCHED,第四种返回AM信息。
前三种具体返回:
ACTIVATED
Application is Activated, waiting for resources to be assigned for AM
Details : AM Partition = <DEFAULT_PARTITION> ;
Partition Resource = <memory:6144, vCores:6> ;
Queue's Absolute capacity = 100.0 % ;
Queue's Absolute used capacity = 0.0 % ;
Queue's Absolute max capacity = 100.0 %;
ASSIGNED
Scheduler has assigned a container for AM, waiting for AM container to be launched
LAUNCHED
AM container is launched, waiting for AM container to Register with RM
当AM启动调用ApplicationMasterProtocol.``registerApplicationMaster
发送host、rpc_port、tracking_url到RM.scheduler:8030后,轮询出现第四种返回,应用创建成功。
tail3 主机名
http://tail2:8088/proxy/application_1570379089091_0019/ tracking_url web后台的跳转url
http://tail3:36037 AM服务端口
如果RM没收到AM注册,则再次尝试拉起AM attemptid会加一。重复_ACTIVATED、ASSIGNED、LAUNCHED这三个状态,直到成功拉起,或达到最大阈值。
Flink设置的的重启最大次数会被Yarn本身限制住,不能超过Yarn本身的最大值。也就是说下面这俩参数以小的会生效。
yarn.application-attempts: 10 #flink配置
yarn.resourcemanager.am.max-attempts: 100 #yarn配置
getApplicationReport返回AM注册的信息后改为1秒轮询一次。
RM拉起AM阶段
0x00 创建日志聚合目录
由于Yarn开启了日志聚合(yarn.log-aggregation-enable设置true)在HDFS创建目录 ${remoteRootLogDir}/${user}/${suffix}/${appid}(例如/tmp/logs/root/logs/application_1570379089091_0027)
${remoteRootLogDir}=yarn.nmodemanager.remote-app-log-dir默认/tmp/logs
${suffix}=yarn.nmodemanager.remote-app-log-dir-suffix默认logs
设置目录权限770
0x01 RM向NM发起启动容器请求
RM先向NM通过DIGEST-MD5算法认证自己。
认证后,RM向NM发送 ContainerManagementProtocol.startContainers
远程调用
参数ContainerLaunchContext和Token。ContainerLaunchContext最主要三个属性:依赖资源在HDFS上的路径、环境变量、启动容器运行命令(命令就是运行YarnSessionClusterEntrypoint这个Java入口类)。
如果Container的Token没验证通过会返回
new InvalidToken(
"Invalid container token used for starting container on : "
+ context.getNodeId().toString());
真实例子:
Invalid container token used for starting container on : tail1:33016
Token校验失败会重新申请容器再拉起。
0x02 资源本地化
NM收到请求后,从HDFS下载运行所需的所有依赖至Container工作目录(路径为: ${yarn.nodemanager.local-dirs}/usercache/${user}/appcache//);
0x03 容器启动
运行YarnSessionClusterEntrypoint这个Java类。
启动Flink的ResourceManager包含YarnResourceManager(YarnResourceManager extends ResourceManager
*0x04 HA模式额外步骤
创建HDFS目录 /${high-availability.storageDir}/${application_id}/blob
通过ZK进行选主:
但是和Standalone不同的是,只会启动一个AM,所以这个AM指定会获得Leader(Active)。
向ZK写AM元数据:
写启动AM的容器上下文到/rmstore/ZKRMStateRoot/RMAppRoot/${application_id}
写AM容器主机、端口、Token到/rmstore/ZKRMStateRoot/RMAppRoot/${application_id}/${app_attempt_id}。attempt几次就写几个。
0x05 AM注册并开始心跳
AM向RM发送远程调用ApplicationMasterProtocol.registerApplicationMaster
传递host、rpc_port、tracking_url到RM.scheduler:8030进行注册。
注册成功后每隔5秒ApplicationMasterProtocol.allocate
进行一次心跳,AM申请Worker容器也是同一个TCP连接内使用allocate远程方法进行申请。即心跳和申请容器是同一个方法,所以在申请容器时allocate调用间隔会小于5秒,等申请完毕后恢复正常心跳频率。
Flink的RM请求拉起TM阶段
0x01 AM向RM申请容器
FlinkResourceManager向RM.scheduler:8030请求调用ApplicationMasterProtocol.allocate
分配容器,由于此请求也用于5秒一次心跳,此时allocate会极小于5秒调用一次。RM返回分配的NM主机名、端口、ContainerToken。
0x02 生成taskmanager-conf.yaml
生成taskmanager-conf.yaml并上传HDFS暂存目录。文件格式为${UUID.random.UUID()}-taskmanager-conf.yaml,前缀是随机UUID。
例如:/user/root/.flink/application_1570379089091_0019/6de714ea-9cf5-4389-ac91-c8f066d529e9-taskmanager-conf.yaml
taskmanger-conf.yaml和flink-conf.yaml区别是TM里面有动态生成的Flink Master地址(JM和YarnAM在一台机器上所以可以获得地址)。供TM启动时读取。两文件具体区别如下:
f3c83617-2943-4ef2-8555-5eece7f4465c-taskmanager-conf.yaml
web.port: 0
jobmanager.rpc.address: host3
taskmanager.memory.size: 264241152b
web.tmpdir: /tmp/flink-web-9a2d9747-1524-45cd-9e00-2838142dc9b4
jobmanager.rpc.port: 46396
rest.address: host3
application_1570379089091_0011-flink-conf.yaml3560774894264262758.tmp
jobmanager.rpc.address: host1
jobmanager.rpc.port: 6123
0x04 AM向NM拉起容器
AM写完taskmanager-conf.yaml就带Token向分配NM的ContainerManagementProtocol.startContainers
TM容器启动这个类YarnTaskExecutorRunner。
*0x05 AM不断尝试拉起TM
如果TM Container没拉起来。会重复上面所有步骤。重复过多RM会换个NM节点返回给AM继续尝试拉起TM Container。每隔10秒重复一次。
0x06 TM与RM建立心跳
NM启动TM容器阶段
0x01 读HDFS依赖
TM读HDFS上的taskmanager-conf.yaml以及其他依赖
0x02 TM注册以及上报Slot信息
TM向FlinkRM发送远程调用ResourceManagerGateway.registerTaskExecutor
、ResourceManagerGateway.sendSlotReport
注册TM以及上报Slot信息。和Standalone模式一样上文已经有详解。