Storage on HDFS
Flink's dependencies are staged on HDFS under /user/${linux_user_name}/.flink/${yarn_application_id}
This mainly contains:
- flink/lib
- flink/plugins
- log4j.properties
- logback.xml
- flink-dist_2.11-1.9.0.jar (normally inside flink/lib; in this HDFS directory it is hoisted out to the top level)
- ${UUID.randomUUID()}-taskmanager-conf.yaml
- ${yarn_application_id}-flink-conf.yaml${random}.tmp (random is a random positive Java Long)
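A minimal sketch, assuming HADOOP_CONF_DIR is on the classpath (so fs.defaultFS resolves to the cluster NameNode) and using a hypothetical user and application id, that lists this staging directory via the Hadoop FileSystem API:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object ListFlinkStagingDir {
  def main(args: Array[String]): Unit = {
    // Picks up core-site.xml / hdfs-site.xml from the classpath.
    val fs = FileSystem.get(new Configuration())
    // Hypothetical user and application id; substitute your own.
    val staging = new Path("/user/root/.flink/application_1570379089091_0019")
    // Should print flink/lib, flink/plugins, log4j.properties, the hoisted
    // flink-dist jar, the *-taskmanager-conf.yaml files, etc.
    fs.listStatus(staging).foreach(status => println(status.getPath))
  }
}
```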
What running yarn-session.sh does:
- Checks whether the resources (memory and vcores) are sufficient.
- Uploads the jars and configuration to HDFS.
- Asks YARN's RM for a container in which to launch the AM (the AM and the Flink JM share one container).
- Generates a new configuration file and uploads it to HDFS (so that TMs can learn the JM's location from it).
Other HDFS-related directories
high-availability.storageDir: hdfs://tail1:9000/flink/ha/
/flink/ha/completedCheckpoint15edb8fd10df
/flink/ha/submittedJobGraph018da5f8af06
/flink/ha/application_1570379089091_0001
/flink/ha/default # standalone mode
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
env.setStateBackend(new FsStateBackend(s"hdfs://${HOST}:9000/flink/checkpoint"))
/flink/checkpoint == checkpointDataUri; settable both in code and in the config file
/flink/checkpoint/61f94fa056e171f7f6180be75d22cf79/chk-80
/flink/checkpoint/61f94fa056e171f7f6180be75d22cf79/shared
/flink/checkpoint/61f94fa056e171f7f6180be75d22cf79/taskowned
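A minimal sketch of the code-side setup, assuming Flink 1.9's Scala API and the hdfs://tail1:9000 NameNode used elsewhere in these notes; once a job runs, the chk-N / shared / taskowned subdirectories appear under its job id:

```scala
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object CheckpointSetup {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Trigger a checkpoint every 10 s; checkpoint N is written to
    // /flink/checkpoint/${jobId}/chk-N on HDFS.
    env.enableCheckpointing(10000)
    // Equivalent to state.checkpoints.dir / checkpointDataUri in the config.
    env.setStateBackend(new FsStateBackend("hdfs://tail1:9000/flink/checkpoint"))
    // ... build the topology here, then env.execute("...")
  }
}
```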
getApplicationReport: detailed responses
After the client sends ApplicationClientProtocol.submitApplication to RM.ASM:8032, it polls ApplicationClientProtocol.getApplicationReport every 0.25 seconds.
The RM's initial responses while polling:
Application is Activated, waiting for resources to be assigned for AM.
Details :
AM Partition = <DEFAULT_PARTITION> ;
Partition Resource = <memory:6144, vCores:6> ;
Queue's Absolute capacity = 100.0 % ;
Queue's Absolute used capacity = 0.0 % ;
Queue's Absolute max capacity = 100.0 % ;
After the RM runs ContainerManagementProtocol.startContainers for org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint, polling returns:
Scheduler has assigned a container for AM, waiting for AM container to be launched
AM container is launched, waiting for AM container to Register with RM
After ApplicationMasterProtocol.registerApplicationMaster sends host, rpc_port, and tracking_url to RM.scheduler:8030, polling returns:
Decoded final report: user root, queue default, name "Flink session cluster", host tail3, tracking URL http://tail2:8088/proxy/application_1570379089091_0019/, AM web UI http://tail3:36037
Polling stops; the AM launch is complete.
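The same polling loop, sketched against the YARN client API (the application id is hypothetical; the real client polls every 0.25 s):

```scala
import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

object PollApplicationReport {
  def main(args: Array[String]): Unit = {
    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(new YarnConfiguration())
    yarnClient.start()
    val appId = ApplicationId.newInstance(1570379089091L, 19) // hypothetical
    var report = yarnClient.getApplicationReport(appId)
    while (report.getYarnApplicationState == YarnApplicationState.ACCEPTED) {
      // "Application is Activated...", "Scheduler has assigned...", etc.
      println(report.getDiagnostics)
      Thread.sleep(250)
      report = yarnClient.getApplicationReport(appId)
    }
    println(s"AM host ${report.getHost}, tracking URL ${report.getTrackingUrl}")
    yarnClient.stop()
  }
}
```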
Yarn-Session packet-capture details
4,5 23527 ZK sync over 2888
18 23515 RM writes the app info to ZK (2181): /rmstore/ZKRMStateRoot/RMAppRoot/application_1570379089091_0019
Step 1: 25 286-316 ApplicationClientProtocol.getClusterNodes, getQueueInfo, getNewApplication on 8032 (yarn.resourcemanager.address)
Step 2: 26 337-23506 Client writes all dependencies to NN:9000, ending with setPermission on the directory
Step 3: 40 23507-41244 ApplicationClientProtocol.submitApplication to 8032
getApplicationReport
Decoded getApplicationReport payloads (binary protobuf residue trimmed; "Apache Flink" in the dumps is the applicationType field):
- user root, queue default, name "Flink session cluster", host N/A, tracking URL http://tail2:8088/proxy/application_1570379089091_0019/, diagnostics "[... 10 17:28:24 +0800 2019] Application is Activated, waiting for resources to be assigned for AM. Details : AM Partition = ..."
- 23585-23655: diagnostics "Scheduler has assigned a container for AM, waiting for AM container to be launched"
- 23567-38870: diagnostics "AM container is launched, waiting for AM container to Register with RM"
- 38901: diagnostics "Application is Activated, waiting for resources to be assigned for AM. Details : AM Partition = ..."
- 38957: diagnostics "Scheduler has assigned a container for AM, waiting for AM container to be launched"
- 39004-39838: diagnostics "AM container is launched, waiting for AM container to Register with RM"
- 39850: host tail3, AM web UI http://tail3:36037 (final report)
Step 4: 41 42 23660-23702 AM mkdir /tmp/logs/root/logs/application_1570379089091_0019, i.e. ${remoteRootLogDir}/${user}/${suffix}/${appid}
remoteRootLogDir == yarn.nodemanager.remote-app-log-dir (default /tmp/logs); suffix == yarn.nodemanager.remote-app-log-dir-suffix (default logs).
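A minimal sketch of how that path is assembled, reading both keys with their documented defaults (user and application id are hypothetical):

```scala
import org.apache.hadoop.yarn.conf.YarnConfiguration

object RemoteAppLogDir {
  def main(args: Array[String]): Unit = {
    val conf = new YarnConfiguration()
    val root   = conf.get("yarn.nodemanager.remote-app-log-dir", "/tmp/logs")
    val suffix = conf.get("yarn.nodemanager.remote-app-log-dir-suffix", "logs")
    val user   = "root"                             // hypothetical
    val appId  = "application_1570379089091_0019"   // hypothetical
    // => /tmp/logs/root/logs/application_1570379089091_0019
    println(s"$root/$user/$suffix/$appId")
  }
}
```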
The RM's ApplicationMasterLauncher
Step 5: 43 23806 RM reads HDFS
Step 6: 71 38943-39411 RM first exchanges tokens, then ContainerManagementProtocol.startContainers for org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint
Initiated by the AM:
72 73 74 38985-39016-39213 AM reads HDFS
77 39802-39843 39848-41183 AM to RM: ApplicationMasterProtocol registerApplicationMaster, allocate
What exactly is Flink on YARN's prelaunch?
Job-submission packet-capture details
7 AM allocate 20811-20846 21049-21051
AM to RM.scheduler:8030, requesting worker containers
9 121 AM getApplicationReport http://tail3:36037
The Client (not the same client that ran Yarn-Session) obtains the AM's host and address.
20 POST http://tail3:36037 to submit the Job 5281-20512
24 create HDFS taskmanager-conf 20852-20913; the getFileInfo calls all finish at 21121 //todo
AM:
- mkdir
  flink/checkpoint/cd68c50927a2c0121e74891b2f10938d/shared
  flink/checkpoint/cd68c50927a2c0121e74891b2f10938d/taskowned
- addBlock
  /user/root/.flink/application_1570379089091_0019/6de714ea-9cf5-4389-ac91-c8f066d529e9-taskmanager-conf.yaml
35 20879-20909 AM writes taskmanager-conf to DN tail2
37 21004-21007 startContainers with an invalid token
39 21062-21091 AM writes taskmanager-conf to DN tail1
40 startContainers 21151-21156 returns 75 mapreduce_shuffle
Sent from the AM to the NM, carrying the token
42 TM container reads tm-conf 21255-62
43 reads from the DN 21256, TCP handshake 21266-77
TM reads tm-conf
63 NamenodeProtocol.getTransactionId and rollEditLog, once every 60 seconds
Closes the current edit log and opens a new one
The call fails if the file system is in SafeMode.
Sent from the DN to the NN
68 TM registers and communicates with the Flink RM:
- RemoteHandshakeMessage
- HandshakeSuccessMessage
- registerTaskExecutor (to the RM)
  - RemoteHandshakeMessage
  - HandshakeSuccessMessage
  - RegistrationResponse$Success
- sendSlotReport
- requestSlot
- registerTaskManager (to the JM)
  - RegistrationResponse$Success
- offerSlots
73 15->14 sends the WordCount jar 22534-44040
86 44253 mkdirs /flink/checkpoint/cd68c50927a2c0121e74891b2f10938d/chk-1; 44431 delete chk-1
Yarn-Session packet capture
YarnJobClusterEntrypoint
The Client asks the RM for all NMs and queues, then requests a new Application instance.
9:75-91 getClusterNodes, getQueueInfo, getNewApplication, getClusterNodes
9:18403-19023 getApplicationReport
Client uploads the dependencies to HDFS
10-21:113-18375
Client submits the Application to the RM
9:18381-18388 submitApplication
RM launches the AM
22:18443-18461
AM creates the log directory
23:18471-18475 getFileInfo /tmp/logs
24:18476-18489 getFileInfo /tmp/logs/root/logs/application_1570379089091_0024
24:18495-18502 mkdirs setPermission
AM reads the dependencies from HDFS
25:18503-18625
216: 34949
AM mkdirs /flink/ha/application_1570379089091_0013/blob
AM elects a leader via ZK
AM registers with the RM
28:18991-19009
AM requests worker containers from the RM.
28:19397-
Second worker request
28:33945-
AM writes tm-conf
34:19298-19313 mkdirs the checkpoint directory
34-35:19412-19483 writes tm-conf and getFileInfo on the other dependencies (probably to verify they all exist)
*The HDFS DataNode's own mechanism:
29 NamenodeProtocol.getTransactionId, rollEditLog
AM, carrying the token, asks the NM to start the container
36:19487-19506
NM getFileInfo on the log directory (to check whether it exists?)
37:19500-19510
38:19514-19520
TM reads the dependencies from HDFS
39:19521-28893
40:
Per-Job-no-ha
20:38965 Client obtains the AM's host and port
37:38973 AM to RM: allocate
Client POSTs the JobGraph to the AM. The Flink RM launches TMs straight away; it does not wait for the Dispatcher to receive the JobGraph.
38:39053-39105-50692
41:50763-50778 AM mkdirs checkpoint
41:50933-50995 AM getFileInfo on all dependencies
55:51011-51025 TM reads task-conf
Per-Job can exhibit endless TM relaunching.
per-job-no-ha2
4 20359 RM writes to ZK (it also writes on each attempt)
7 submitApp 20350 client to RM; 21456 obtains the AM address
8-25 client writes to HDFS
26-27 RM? mkdirs /tmp/logs/root/logs/application_1570379089091_0028
28 RM? reads the dependencies
29 RM startContainer AM
32-33 AM getFileInfo /tmp/logs/root/logs/application_1570379089091_0028
34 AM reads HDFS
35 AM registers and allocates
38 AM mkdir checkpoint
Launches once per 20-second polling round
38,39 AM writes TM-conf, getFileInfo on all dependencies
40 AM startContainer on the NM for a worker, returns mapreduce_shuffle
41,42 worker reads TM-conf
50,51 AM writes TM-conf, getFileInfo on all dependencies
52 AM startContainer on the NM for a worker, returns mapreduce_shuffle
53 worker reads TM-conf
The cycle: AM writes TM-conf and getFileInfo on all dependencies -> AM startContainer on the NM for a worker -> worker reads TM-conf
54,55 -> 56 -> 57,58
60,61 -> 62 -> 63,64
23585-23647-> 23648-23666 -> 23667-23691
68,69 -> 70 -> 71,72
74
78
86 TM to the JobManager
88 TM to HDFS checkpoint 09df75ca8bb8c125981faf3ef8900cd4
90 AM to HDFS checkpoint; deletes the old-numbered checkpoint directory
92- TM writes the checkpoint to the HDFS DataNode
Packet-capture details
tcp.stream eq 7-17 FlinkYarnClient uploads the jars and configuration to HDFS.
RM asks one NM to launch a container (the one that runs the AM)
18 8270 AM container launch requested via the RM; Flink entry class YarnSessionClusterEntrypoint, startContainers
19-20 8275 AM creates the logs directory on HDFS; 21 reads the dependency info
21 8302-8384 AM reads the required resources from HDFS
AM asks the NM to launch containers
33 mk ha dir
36 elects the Flink leader: /flink/${yarn_application_id}/leaderlatch and leader
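Flink's ZK leader election is built on Curator; a minimal LeaderLatch sketch against that latch path (the connect string is an assumption, the path mirrors this capture; Flink itself also maintains the companion leader node):

```scala
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.leader.LeaderLatch
import org.apache.curator.retry.ExponentialBackoffRetry

object LeaderLatchSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical ZK connect string.
    val client = CuratorFrameworkFactory.newClient(
      "tail1:2181", new ExponentialBackoffRetry(1000, 3))
    client.start()
    val latch = new LeaderLatch(
      client, "/flink/application_1570379089091_0013/leaderlatch")
    latch.start()
    latch.await() // blocks until this participant becomes leader
    println(s"has leadership: ${latch.hasLeadership}")
    latch.close()
    client.close()
  }
}
```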
38 15141 ApplicationMasterProtocol registerApplicationMaster
allocate once every 5 seconds
launching workers also goes through allocate
23812
ContainerToken: 192.168.124.14:40349
Failure:
Exception from container-launch.
Container id: container_e08_1570379089091_0013_01_000009
Exit code: 1
Success:
34649 NM Token: 192.168.124.15:37474
40 15446 reads the Flink leader info from ZK; who reads it still needs confirming.
44-46 the AM writes these three: flink/ha/application_1570379089091_0013/blob, /flink/ha/submittedJobGraph01f99fe0f0bd, /user/root/.flink/application_1570379089091_0013/82d26394-1c91-4582-b8b7-2f52d7d2bb20-taskmanager-conf.yaml
tm-conf written at 23847
61 62 fetch the HDFS log info
Write config, request a container from the NM, read config; repeats every 10 seconds. startContainers, ContainerManagementProtocolPB
The TM container starts this class: YarnTaskExecutorRunner
58,59,64;72,73,74,75;82,83,84,85;91,92,93,94;103,104,105,106;129,130,131,132;145,154,155,156;194,195,196,197;206,207,209 (launch missing)
226 232 238-243 245 246 252-254 256 (complete?) 260-263 271-274 checkpoints
Per-Job packet-capture memo
jobmanager.execution.failover-strategy: region
high-availability.cluster-id: application_1572569671434_0003
jobmanager.rpc.address: tail1
FLINK_PLUGINS_DIR: /opt/flink/plugins
jobmanager.rpc.port: 6123
internal.cluster.execution-mode: DETACHED
parallelism.default: 2
yarn.application-attempts: 10
taskmanager.numberOfTaskSlots: 2
web.submit.enable: true
taskmanager.heap.size: 1024m
jobmanager.heap.size: 2048m
25
submitApplication carries the JobGraph and the user jar.
27
Invalid container token used for starting container on : ${host}:${port}
30 AM reads HDFS; application_1572569671434_00035234493208555882243.tmp contains the JobGraph
Logs
Cancel Job
- Job Running -> Canceling
- ExecutionGraph vertices RUNNING -> CANCELING; after all vertices are CANCELING, they go -> CANCELED
- Job Canceling -> Canceled
- Shut down the checkpoint coordinator
- After the Dispatcher receives CANCELED, it shuts down the JM
- Suspend the SlotPool
- Shut down the SlotPool
- Disconnect from the RM.
Per-Job Cancel Job
49
GET /proxy/application_1572569671434_0004/jobs/eb6527ecaab2a35480cb93a9a40d66c1/yarn-cancel HTTP/1.1
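A minimal sketch of firing that same request from code with plain JDK HTTP (proxy host, application id, and job id are copied from the capture; treat them as placeholders):

```scala
import java.net.{HttpURLConnection, URL}

object YarnCancelJob {
  def main(args: Array[String]): Unit = {
    // yarn-cancel is the GET-friendly variant of the job cancel endpoint,
    // reached here through the YARN web proxy.
    val url = new URL(
      "http://tail2:8088/proxy/application_1572569671434_0004" +
        "/jobs/eb6527ecaab2a35480cb93a9a40d66c1/yarn-cancel")
    val conn = url.openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("GET")
    println(s"HTTP ${conn.getResponseCode}") // expect a 2xx on success
    conn.disconnect()
  }
}
```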
115 TaskManagerGateway.freeSlot, "Stopping JobMaster for job"; AM to TM, sent twice back-to-back
118 freeSlot, "Stopping JobMaster for job"; AM to TM, sent twice back-to-back
124
notifySlotAvailable
125 TaskExecutorGateway.disconnectJobManager(JobId)
131 notifySlotAvailable
0x04 to JM
138
taskmanager_0 has no more allocated slots for job
139
closeJobManagerConnectionIfNoAllocatedResources
286-417 ApplicationMasterProtocol.finishApplicationMaster
417
finishApplicationMaster
291-398
After the RM receives finishApplicationMaster, it hits ZK at /rmstore/ZKRMStateRoot/RMAppRoot/${application_id}
/rmstore/ZKRMStateRoot/RMAppRoot/application_1572569671434_0005
421
delete: removes the staging directory
The AM shuts down the TM containers first; the RM then shuts down the AM container.
427 459 541 569
Dispatcher receives the JobGraph
- The Dispatcher spins up a JM
//TODO the failover strategies need exploring
- The JM picks a failover strategy
- Job Created -> Running
- ExecutionGraph Vertex CREATED -> SCHEDULED
- The JM registers itself with the RM
- Requests slots from the RM, i.e. the JM's SlotPool asks the RM's SlotManager for slots. A TM registers itself with the job's JM upon receiving its first slot request. The TM then offers its slots to the JM, after which the slot state goes Allocated -> Active.
- ExecutionGraph Vertex SCHEDULED -> DEPLOYING; each vertex is switched to DEPLOYING as it is deployed into a slot
- Starting from the sinks and walking the DAG backwards, vertices go DEPLOYING -> RUNNING one by one
//TODO deployment strategies other than round robin remain to be explored
//TODO submit a job with chaining and re-check the logs (a sketch follows the log lines below)
2019-09-26 22:11:23,722 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job recovers via failover strategy: New Pipelined Region Failover
2019-09-26 22:11:23,739 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy - Start building failover regions.
2019-09-26 22:11:23,750 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy - Created 1 failover regions.
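For the chaining TODO above, a minimal Scala job whose flatMap -> map stages chain into a single task (the socket host and port are assumptions):

```scala
import org.apache.flink.streaming.api.scala._

object ChainedWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.socketTextStream("tail1", 9999)      // hypothetical source host/port
      .flatMap(_.toLowerCase.split("\\s+"))  // chains with the map below
      .map((_, 1))
      .keyBy(0)                              // keyBy forces a network shuffle
      .sum(1)
      .print()
    env.execute("chained-wordcount")
  }
}
```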