Storage on HDFS

Flink stages its dependencies on HDFS under /user/${linux_user_name}/.flink/${yarn_application_id}.
The directory mainly contains the following (a listing sketch follows the list):

  • flink/lib
  • flink/plugins
  • log4j.properties
  • logback.xml
  • flink-dist_2.11-1.9.0.jar (originally under flink/lib; in this HDFS directory it is pulled out to the top level)
  • ${UUID.random.UUID()}-taskmanager-conf.yaml
  • ${yarn_application_id}-flink-conf.yaml${random}.tmp (random is a random positive Java Long)
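
A minimal sketch that lists this staging directory via the Hadoop FileSystem API; the NameNode address tail1:9000, user root and the application id are assumptions taken from the captures later in these notes:

```scala
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object ListStagingDir {
  def main(args: Array[String]): Unit = {
    // NameNode address and application id are assumptions, not values Flink hardcodes
    val fs = FileSystem.get(new URI("hdfs://tail1:9000"), new Configuration())
    val staging = new Path("/user/root/.flink/application_1570379089091_0019")
    // prints size and full path of every staged file (lib/, plugins/, the dist jar, conf files)
    fs.listStatus(staging).foreach(s => println(s"${s.getLen}\t${s.getPath}"))
    fs.close()
  }
}
```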
The yarn-session.sh startup flow:

  1. After yarn-session.sh starts, check whether the resources (memory and vcores) are sufficient.
  2. Upload the JARs and the configuration to HDFS.
  3. Ask the YARN RM for a container in which to start the AM (the AM and the Flink JM share one container).
  4. Generate a new configuration file and upload it to HDFS (so that the TMs can learn the JM's location from it).

Other HDFS-related directories

  1. high-availability.storageDir: hdfs://tail1:9000/flink/ha/
  2. /flink/ha/completedCheckpoint15edb8fd10df
  3. /flink/ha/submittedJobGraph018da5f8af06
  4. /flink/ha/application_1570379089091_0001
  5. /flink/ha/default # standalone mode
  6. # state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
  7. env.setStateBackend(new FsStateBackend(s"hdfs://${HOST}:9000/flink/checkpoint"))
  8. /flink/checkpoint == checkpointDataUri; it can be set both in code and in the config file (see the sketch after this list)
  9. /flink/checkpoint/61f94fa056e171f7f6180be75d22cf79/chk-80
  10. /flink/checkpoint/61f94fa056e171f7f6180be75d22cf79/shared
  11. /flink/checkpoint/61f94fa056e171f7f6180be75d22cf79/taskowned
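
Expanding the one-line Scala snippet above into a runnable sketch (the host tail1:9000 and the 10 s interval are assumptions); with a long-running source, the chk-N, shared and taskowned directories listed above appear under the job-id directory:

```scala
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala._

object CheckpointDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(10000) // one checkpoint every 10 s -> chk-1, chk-2, ...
    // same effect as state.checkpoints.dir in flink-conf.yaml
    env.setStateBackend(new FsStateBackend("hdfs://tail1:9000/flink/checkpoint"))
    // bounded demo source; a real job would use an unbounded one so checkpoints actually fire
    env.fromElements("a", "b", "a")
      .map((_, 1))
      .keyBy(0)
      .sum(1)
      .print()
    env.execute("checkpoint-demo")
  }
}
```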

Detailed getApplicationReport return values

After the client sends ApplicationClientProtocol.submitApplication to RM.ASM:8032, it polls ApplicationClientProtocol.getApplicationReport every 0.25 s.
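
A hedged reconstruction of that polling loop with the plain YARN client API (the application id is the one from the capture; the 250 ms interval matches the observed 0.25 s):

```scala
import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

object PollApplicationReport {
  def main(args: Array[String]): Unit = {
    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(new YarnConfiguration())
    yarnClient.start()
    // cluster timestamp and id taken from application_1570379089091_0019
    val appId = ApplicationId.newInstance(1570379089091L, 19)
    var report = yarnClient.getApplicationReport(appId)
    // a real loop would also bail out on FINISHED/FAILED/KILLED
    while (report.getYarnApplicationState != YarnApplicationState.RUNNING) {
      Thread.sleep(250) // the 0.25 s interval observed in the capture
      report = yarnClient.getApplicationReport(appId)
      println(report.getDiagnostics) // "Application is Activated, ..." etc.
    }
    println(s"AM at ${report.getHost}:${report.getRpcPort}, ${report.getTrackingUrl}")
    yarnClient.stop()
  }
}
```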

The RM's initial responses during polling:

    Application is Activated, waiting for resources to be assigned for AM.
    Details :
    AM Partition = <DEFAULT_PARTITION> ;
    Partition Resource = <memory:6144, vCores:6> ;
    Queue's Absolute capacity = 100.0 % ;
    Queue's Absolute used capacity = 0.0 % ;
    Queue's Absolute max capacity = 100.0 % ;

After the RM calls ContainerManagementProtocol.startContainers to launch org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint, the polling returns:

  1. Scheduler has assigned a container for AM, waiting for AM container to be launched
  2. AM container is launched, waiting for AM container to Register with RM

After ApplicationMasterProtocol.registerApplicationMaster sends host, rpc_port and tracking_url to RM.scheduler:8030, the polling returns:

  1. queue root.default, application name "Flink session cluster"
  2. host: tail3
  3. tracking URL: http://tail2:8088/proxy/application_1570379089091_0019/
  4. original tracking URL (the AM's web endpoint): http://tail3:36037

Polling stops; the AM launch is complete.
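
What the AM sends at this step, sketched with Hadoop's AMRMClient; the host, port and tracking URL are the values seen on the wire, and this is only an illustration, not Flink's own code path (Flink goes through its YarnResourceManager):

```scala
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration

object RegisterAm {
  def main(args: Array[String]): Unit = {
    // only meaningful inside a container launched as an AM (needs the AMRMToken)
    val amRmClient = AMRMClient.createAMRMClient[ContainerRequest]()
    amRmClient.init(new YarnConfiguration())
    amRmClient.start()
    // exactly the three fields observed on the wire: host, rpc_port, tracking_url
    val resp = amRmClient.registerApplicationMaster("tail3", 36037, "http://tail3:36037")
    println(s"max container capability: ${resp.getMaximumResourceCapability}")
  }
}
```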

Yarn-Session packet-capture details

4,5 23527 ZK sync on port 2888
18 23515 The RM writes the app info to ZK (port 2181) at /rmstore/ZKRMStateRoot/RMAppRoot/application_1570379089091_0019
(1) 25 286-316 ApplicationClientProtocol.getClusterNodes, getQueueInfo, getNewApplication to port 8032 (yarn.resourcemanager.address)
(2) 26 337-23506 The client writes all dependencies to NN:9000, finishing with setPermission on the directory
(3) 40 23507-41244 ApplicationClientProtocol.submitApplication to port 8032
getApplicationReport
The raw responses, stripped of protobuf framing (runs of dots in the capture were unreadable bytes); only the recoverable fields are kept:

  • queue root.default, name "Flink session cluster", host N/A, application type Apache Flink, tracking URL http://tail2:8088/proxy/application_1570379089091_0019/, diagnostics: "[... 10 17:28:24 +0800 2019] Application is Activated, waiting for resources to be assigned for AM. Details : AM Partition = ... ; Partition Resource = ... ; Queue's Absolute capacity = 100.0 % ; Queue's Absolute used capacity = 0.0 % ; Queue's Absolute max capacity = 100.0 %"
  • frames 23585-23655: diagnostics "Scheduler has assigned a container for AM, waiting for AM container to be launched"
  • frames 23567-38870: diagnostics "AM container is launched, waiting for AM container to Register with RM"
  • frame 38901: diagnostics "Application is Activated, waiting for resources to be assigned for AM. ..." (same details as above)
  • frame 38957: "Scheduler has assigned a container for AM, waiting for AM container to be launched"
  • frames 39004-39838: "AM container is launched, waiting for AM container to Register with RM"
  • frame 39850: host tail3, tracking URL http://tail3:36037 (the AM has registered)

(4) 41 42 23660-23702 The AM mkdirs /tmp/logs/root/logs/application_1570379089091_0019, i.e. ${remoteRootLogDir}/${user}/${suffix}/${appid}.
remoteRootLogDir is yarn.nodemanager.remote-app-log-dir (default /tmp/logs); suffix is yarn.nodemanager.remote-app-log-dir-suffix (default logs). A resolution sketch follows.
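
A small sketch that resolves the same path pattern from a YarnConfiguration (user root and the application id are the ones from this capture):

```scala
import org.apache.hadoop.yarn.conf.YarnConfiguration

object RemoteLogDir {
  def main(args: Array[String]): Unit = {
    val conf = new YarnConfiguration()
    val root = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
      YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)        // /tmp/logs
    val suffix = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
      YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX) // logs
    // ${remoteRootLogDir}/${user}/${suffix}/${appid}
    println(s"$root/root/$suffix/application_1570379089091_0019")
  }
}
```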

The RM's ApplicationMasterLauncher:
(5) 43 23806 The RM reads HDFS
(6) 71 38943-39411 The RM first exchanges tokens, then calls ContainerManagementProtocol.startContainers to launch org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint
Initiated by the AM:
72 73 74 38985-39016-39213 The AM reads HDFS
77 39802-39843 39848-41183 The AM sends ApplicationMasterProtocol registerApplicationMaster and allocate to the RM

What is the "prelaunch" step in Flink on YARN?

Job-submission packet-capture details

7 AM allocate 20811-20846 21049-21051
AM to RM scheduler:8030, requesting worker containers (a hedged AMRMClient sketch follows).
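
A hedged sketch of such a worker request via AMRMClient; the 1024 MB / 1 vcore container size is an assumption, and Flink's actual request goes through its YarnResourceManager:

```scala
import org.apache.hadoop.yarn.api.records.{Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration
import scala.collection.JavaConverters._

object RequestWorker {
  def main(args: Array[String]): Unit = {
    val amRmClient = AMRMClient.createAMRMClient[ContainerRequest]()
    amRmClient.init(new YarnConfiguration())
    amRmClient.start()
    // assumed container size; no locality constraints (nodes = racks = null)
    val req = new ContainerRequest(Resource.newInstance(1024, 1), null, null,
      Priority.newInstance(0))
    amRmClient.addContainerRequest(req)
    // each allocate() call both sends pending requests and acts as a heartbeat
    val allocated = amRmClient.allocate(0.1f).getAllocatedContainers.asScala
    allocated.foreach(c => println(c.getId))
  }
}
```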

9 121 AM getApplicationReport http://tail3:36037
The client (a different client from the one that ran yarn-session) obtains the AM's host and address.

20 POST to http://tail3:36037 submits the job 5281-20512

24 create HDFS taskmanager-conf 20852-20913; all the getFileInfo calls finish at 21121 //todo
The AM:
mkdir
flink/checkpoint/cd68c50927a2c0121e74891b2f10938d/shared
flink/checkpoint/cd68c50927a2c0121e74891b2f10938d/taskowned
addBlock
/user/root/.flink/application_1570379089091_0019/6de714ea-9cf5-4389-ac91-c8f066d529e9-taskmanager-conf.yaml

35 20879-20909 The AM writes taskmanager-conf to DN tail2
37 21004-21007 startContainers: invalid token

39 21062-21091 The AM writes taskmanager-conf to DN tail1
40 startContainers 21151-21156 return 75 mapreduce_shuffle
Sent by the AM to the NM, carrying the token

42 The TM container reads tm-conf 21255-62
43 Reads from the DN; 21256 TCP handshake 21266-77
The TM reads tm-conf

63 NamenodeProtocol.getTransactionId and rollEditLog, once every 60 seconds
rollEditLog closes the current edit log and opens a new one;
the call fails if the file system is in SafeMode.
Sent by the DN to the NN

68 The TM registers and communicates with the Flink RM:

RemoteHandshakeMessage
HandshakeSuccessMessage

registerTaskExecutor (to the RM)
RemoteHandshakeMessage
HandshakeSuccessMessage
RegistrationResponse$Success

sendSlotReport

requestSlot

registerTaskManager (to the JM)
RegistrationResponse$Success
offerSlots

73 15->14 sends the WordCount JAR 22534-44040

86 44253 mkdirs /flink/checkpoint/cd68c50927a2c0121e74891b2f10938d/chk-1; 44431 delete chk-1

Per-Job (HA) packet capture

YarnJobClusterEntrypoint
The client fetches all NMs and queues from the RM and requests an Application instance.
9:75 - 91 getClusterNodes、getQueueInfo、getNewApplication、getClusterNodes

9:18403-19023 getApplicationReport

The client uploads the dependencies to HDFS
10-21:113-18375

The client submits the Application to the RM
9:18381-18388 submitApplication

The RM launches the AM
22:18443-18461

The AM creates the log directory
23:18471-18475 getFileInfo/tmp/logs
24:18476-18489 getFileInfo /tmp/logs/root/logs/application_1570379089091_0024
24:18495-18502 mkdirs setPermission

The AM reads the dependencies from HDFS
25:18503-18625

216: 34949
AM mkdirs /flink/ha/application_1570379089091_0013/blob
The AM elects a leader via ZK (a Curator sketch follows).
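
A sketch of the same election primitive with Curator's LeaderLatch, which Flink's ZooKeeper HA services wrap internally; the ZK address is an assumption, the znode layout follows the capture:

```scala
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.leader.LeaderLatch
import org.apache.curator.retry.ExponentialBackoffRetry

object FlinkStyleLeaderElection {
  def main(args: Array[String]): Unit = {
    val client = CuratorFrameworkFactory.newClient(
      "tail1:2181", new ExponentialBackoffRetry(1000, 3)) // assumed ZK address
    client.start()
    client.blockUntilConnected()
    // same znode layout as in the capture: .../leaderlatch next to .../leader
    val latch = new LeaderLatch(client,
      "/flink/application_1570379089091_0013/leaderlatch")
    latch.start()
    latch.await() // blocks until this participant is elected leader
    println(s"leader id: ${latch.getId}")
    latch.close(); client.close()
  }
}
```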

The AM registers with the RM
28:18991-19009

The AM requests worker containers from the RM.
28:19397-
Second worker request:
28:33945-

The AM writes tm-conf
34:19298-19313 mkdirs the checkpoint directory
34-35:19412-19483 writes tm-conf and calls getFileInfo on the other dependencies (presumably to check that they all exist).

*A mechanism of the HDFS DataNode itself.
29 NamenodeProtocol.getTransactionId, rollEditLog

The AM, carrying the token, asks the NM to start the container
36:19487-19506

The NM calls getFileInfo on the log directory (to check whether it exists?)
37:19500-19510
38:19514-19520

The TM reads the dependencies from HDFS
39:19521-28893
40:

Per-Job-no-ha

20:38965 The client obtains the AM's host and port

37:38973 The AM sends allocate to the RM

The client POSTs the JobGraph to the AM. The Flink RM launches the TMs right away; it does not wait for the Dispatcher to receive the JobGraph.
38:39053-39105-50692

41:50763-50778 The AM mkdirs the checkpoint directory
41:50933-50995 The AM calls getFileInfo on all dependencies

55:51011-51025 The TM reads task-conf

In Per-Job mode, TMs can be launched over and over again (observed).

per-job-no-ha2

4 20359 The RM writes to ZK (it also writes on every attempt)
7 submitApplication 20350 client to RM; 21456 obtains the AM address
8-25 The client writes to HDFS
26-27 RM? mkdirs /tmp/logs/root/logs/application_1570379089091_0028
28 RM? reads the dependencies
29 RM startContainer for the AM
32-33 AM getFileInfo /tmp/logs/root/logs/application_1570379089091_0028
34 The AM reads HDFS
35 The AM registers and allocates

38 The AM mkdirs the checkpoint directory
A new launch round every 20 seconds of polling:
38,39 The AM writes TM-conf and calls getFileInfo on all dependencies
40 AM startContainers to the NM for a worker; returns mapreduce_shuffle
41,42 The worker reads TM-conf

50,51 The AM writes TM-conf and calls getFileInfo on all dependencies
52 AM startContainers to the NM for a worker; returns mapreduce_shuffle
53 The worker reads TM-conf

AM writes TM-conf and getFileInfo on all dependencies -> AM startContainers to the NM for a worker -> the worker reads TM-conf
54,55 -> 56 -> 57,58
60,61 -> 62 -> 63,64
23585-23647-> 23648-23666 -> 23667-23691

68,69 -> 70 -> 71,72
74
78

86 TM to the JobManager

88 TM to HDFS, checkpoint 09df75ca8bb8c125981faf3ef8900cd4
90 AM to HDFS checkpoint; deletes the checkpoint directory with the old number
92- The TM writes the checkpoint to the HDFS DataNode

73 167.92
102 229.64

Packet-capture details

tcp.stream eq 7-17 The Flink YARN client uploads the JARs and configuration to HDFS.
The RM asks one NM to launch a container (the one that will run the AM)
18 8270 AM container launch via startContainers; Flink entry class YarnSessionClusterEntrypoint
19-20 8275 The AM creates the logs directory on HDFS; 21 reads the dependency info
21 8302-8384 The AM reads the required resources from HDFS
The AM asks the NM to launch containers

33 mkdirs the HA directory
36 Flink leader election: /flink/${yarn_application_id}/leaderlatch and leader
38 15141 ApplicationMasterProtocol registerApplicationMaster
allocate once every 5 seconds
Workers are also brought up via allocate

23812
ContainerToken for 192.168.124.14:40349

Failure:

Exception from container-launch.
Container id: container_e08_1570379089091_0013_01_000009
Exit code: 1

Success:
34649 NM Token for 192.168.124.15:37474

40 15446 Reads the Flink leader info from ZK; who reads it still needs to be confirmed.
44-46 The AM writes these three: flink/ha/application_1570379089091_0013/blob, /flink/ha/submittedJobGraph01f99fe0f0bd, /user/root/.flink/application_1570379089091_0013/82d26394-1c91-4582-b8b7-2f52d7d2bb20-taskmanager-conf.yaml
tm-conf written at 23847
61 62 Fetch the HDFS log info

Write the config, request a container from the NM, read the config; repeats every 10 seconds (startContainers, ContainerManagementProtocolPB)
The TM container starts the class YarnTaskExecutorRunner
58,59,64;72,73,74,75;82,83,84,85;91,92,93,94;103,104,105,106;129,130,131,132;145,154,155,156;194,195,196,197;206,207,209 (launch missing)
226 232 238-243 245 246 252-254 256 (complete?) 260-263 271-274 checkpoints

56512

Per-Job packet-capture memo

  1. jobmanager.execution.failover-strategy: region
  2. high-availability.cluster-id: application_1572569671434_0003
  3. jobmanager.rpc.address: tail1
  4. FLINK_PLUGINS_DIR: /opt/flink/plugins
  5. jobmanager.rpc.port: 6123
  6. internal.cluster.execution-mode: DETACHED
  7. parallelism.default: 2
  8. yarn.application-attempts: 10
  9. taskmanager.numberOfTaskSlots: 2
  10. web.submit.enable: true
  11. taskmanager.heap.size: 1024m
  12. jobmanager.heap.size: 2048m

25
submitApplication carries the JobGraph and the user JAR.
27
Invalid container token used for starting container on : ${host}:${port}

30 The AM reads application_1572569671434_00035234493208555882243.tmp from HDFS; it contains the JobGraph.

Logs

Cancel Job

  1. Job Running -> Canceling
  2. ExecutionGraph vertices RUNNING -> CANCELING; once all vertices are CANCELING -> CANCELED
  3. Job Canceling -> Canceled
  4. Shut down the checkpoint coordinator
  5. After the Dispatcher receives CANCELED, it shuts down the JM
  6. Suspend the SlotPool
  7. Close the SlotPool
  8. Disconnect from the RM.

Per-Job Cancel Job

49
GET /proxy/application_1572569671434_0004/jobs/eb6527ecaab2a35480cb93a9a40d66c1/yarn-cancel HTTP/1.1
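
The same call as a tiny HTTP client sketch; the proxy host and both ids are from this capture, and yarn-cancel is Flink's GET-based cancel endpoint (handy behind the YARN web proxy, which only forwards GET):

```scala
import java.net.{HttpURLConnection, URL}
import scala.io.Source

object YarnCancelJob {
  def main(args: Array[String]): Unit = {
    // host, application id and job id are the ones from this capture
    val url = new URL("http://tail2:8088/proxy/application_1572569671434_0004" +
      "/jobs/eb6527ecaab2a35480cb93a9a40d66c1/yarn-cancel")
    val conn = url.openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("GET") // same verb as in the captured request line
    println(s"HTTP ${conn.getResponseCode}")
    Source.fromInputStream(conn.getInputStream).getLines().foreach(println)
    conn.disconnect()
  }
}
```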

115 TaskManagerGateway.freeSlot and "Stopping JobMaster for job", AM to TM, sent twice back-to-back
118 freeSlot and "Stopping JobMaster for job", AM to TM, sent twice back-to-back

124 notifySlotAvailable
125 TaskExecutorGateway.disconnectJobManager(JobId)
131 notifySlotAvailable
0x04 to the JM
138 taskmanager_0 has no more allocated slots for job
139 closeJobManagerConnectionIfNoAllocatedResources

286-417 ApplicationMasterProtocol.finishApplicationMaster (the call itself at 417)

291-398
After the RM receives finishApplicationMaster, it hits ZK at /rmstore/ZKRMStateRoot/RMAppRoot/${application_id}, here
/rmstore/ZKRMStateRoot/RMAppRoot/application_1572569671434_0005
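
A sketch for peeking at that rmstore subtree with Curator (the ZK address tail1:2181 is an assumption; the znode path is the one from the capture):

```scala
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import scala.collection.JavaConverters._

object InspectRmStore {
  def main(args: Array[String]): Unit = {
    val client = CuratorFrameworkFactory.newClient(
      "tail1:2181", new ExponentialBackoffRetry(1000, 3)) // assumed ZK address
    client.start()
    client.blockUntilConnected()
    // each child is one stored application, e.g. application_1572569671434_0005
    val apps = client.getChildren.forPath("/rmstore/ZKRMStateRoot/RMAppRoot").asScala
    apps.foreach(println)
    client.close()
  }
}
```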


421
delete: removes the staging directory

The AM shuts down the TM containers first, then the RM shuts down the AM container.
427 459 541 569

The Dispatcher receives the JobGraph

  1. The Dispatcher spins up a JM

//TODO The failover strategies still need to be explored

  1. The JM chooses a failover strategy
  2. Job Created -> Running
  3. ExecutionGraph vertices CREATED -> SCHEDULED
  4. The JM registers itself with the RM
  5. Requests slots from the RM, i.e. the SlotPool inside the JM asks the SlotManager inside the RM for slots. On receiving its first slot request, the TM registers itself with the job's JM. The TM then offers its slots to the JM, after which the slot state goes Allocated -> Active.
  6. ExecutionGraph vertices SCHEDULED -> DEPLOYING; each vertex is flipped to DEPLOYING as it is deployed into a slot
  7. Starting from the sinks and walking the DAG backwards, vertices go DEPLOYING -> RUNNING one by one

//TODO Deployment strategies other than round robin remain to be explored
//TODO Submit a job with operator chaining and look at the logs again (a minimal chained job is sketched after the log lines below)
2019-09-26 22:11:23,722 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job recovers via failover strategy: New Pipelined Region Failover
2019-09-26 22:11:23,739 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy - Start building failover regions.
2019-09-26 22:11:23,750 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy - Created 1 failover regions.
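
For the second TODO above, a minimal job with an operator chain (the socket host/port are assumptions): source, flatMap and map chain into a single vertex, while keyBy forces a network shuffle and starts a new chain.

```scala
import org.apache.flink.streaming.api.scala._

object ChainedWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // socket source host/port are assumptions (e.g. nc -lk 9999 on tail1)
    env.socketTextStream("tail1", 9999)
      .flatMap(_.split("\\s+")) // chains with the source
      .map((_, 1))              // still the same chain
      .keyBy(0)                 // network boundary: breaks the chain
      .sum(1)
      .print()
    env.execute("chained-wordcount")
  }
}
```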