Flink On Yarn 集群的高可用其实就是 JobManager 的高可用。

默认情况下,每个 Flink 集群只有一个 JobManager 实例。这会导致单点故障(SPOF):如果 JobManager 崩溃,则不能提交任何新程序,运行中的程序也会失败。

使用 JobManager 高可用模式,你可以从 JobManager 失败中恢复,从而消除单点故障。

如下是一个使用三个 JobManager 实例的例子:

Flink On Yarn 高可用配置 - 图1

一个 Leader JobManager 以及两个 Standby JobManager,Leader 和 Standby 之间的切换是依赖于 zookeeper,所以部署之前必须安装好 Zookeeper 集群。

Flink On Yarn 部署

Flink On Yarn 部署指南

修改配置文件

修改 flink-conf.yaml

  1. # jobmanager和taskmanager、其他client的RPC通信IP地址,TaskManager用于连接到JobManager/ResourceManager。
  2. # HA模式不用配置此项,在master文件中配置,由zookeeper选出leader与standby
  3. jobmanager.rpc.address: localhost
  4. # jobmanager和taskmanager、其他client的RPC通信端口,TaskManager用于连接到JobManager/ResourceManager。
  5. # HA模式不用配置此项,在master文件中配置,由zookeeper选出leader与standby
  6. jobmanager.rpc.port: 6123
  7. # jobmanager JVM heap 内存大小
  8. jobmanager.memory.process.size: 1024m
  9. # taskmanager JVM heap 内存大小
  10. taskmanager.memory.process.size: 1024m
  11. # 每个taskmanager提供的任务slots数量
  12. taskmanager.numberOfTaskSlots: 4
  13. # 并行计算个数
  14. parallelism.default: 4
  15. # 高可用模式
  16. high-availability: zookeeper
  17. # JobManager元数据保留在文件系统storageDir中 指向此状态的指针存储在ZooKeeper中
  18. high-availability.storageDir: hdfs://node01:9000/flink/ha/
  19. # Zookeeper集群
  20. high-availability.zookeeper.quorum: node01:2181,node02:2181,node03:2181,node04:2181,node05:2181
  21. # 在zookeeper下的根目录
  22. high-availability.zookeeper.path.root: /flink_yarn
  23. # zookeeper节点下的集群ID 该节点下放置了集群所需的所有协调数据 多个flink集群连接同一套zookeeper集群需要配置各自不同的集群ID
  24. # 重要: 在 YARN、原生 Kubernetes 或其他集群管理器上运行时,不应该手动设置此值。在这些情况下,将自动生成一个集群 ID。如果在未使用集群管理器的机器上运行多个 Flink 高可用集群,则必须为每个集群手动配置单独的集群 ID(cluster-ids)。
  25. # high-availability.cluster-id: /default_ns
  26. # 单个flink job重启次数 必须小于等于yarn-site.xml中Application Master配置的尝试次数
  27. yarn.application-attempts: 10
  28. #==============================================================================
  29. # Fault tolerance and checkpointing
  30. #==============================================================================
  31. # jobmanager (MemoryStateBackend), filesystem (FsStateBackend), rocksdb (RocksDBStateBackend)
  32. state.backend: rocksdb
  33. # 检查点的默认目录。Flink支持的文件系统中存储检查点的数据文件和元数据的默认目录。必须从所有参与的进程/节点(即所有TaskManager和JobManager)访问存储路径。
  34. state.checkpoints.dir: hdfs://node01:9000/flink/checkpoints
  35. # 保存点的默认目录。由状态后端用于将保存点写入文件系统(MemoryStateBackend,FsStateBackend,RocksDBStateBackend)。
  36. state.savepoints.dir: hdfs://node01:9000/flink/savepoints
  37. # 是否应该创建增量检查点。对于增量检查点,只存储前一个检查点的差异,而不存储完整的检查点状态。一些状态后端可能不支持增量检查点并忽略此选项。
  38. state.backend.incremental: false
  39. # jobmanager故障恢复策略,指定作业计算如何从任务失败中恢复 full重新启动所有任务以恢复作业 region重新启动可能受任务失败影响的所有任务
  40. jobmanager.execution.failover-strategy: region
  41. # 全局检查点的保留数量
  42. state.checkpoints.num-retained: 3
  43. # 本地恢复。默认情况下,本地恢复处于禁用状态。本地恢复当前仅涵盖keyed state backends。当前,MemoryStateBackend不支持本地恢复。
  44. state.backend.local-recovery: true
  45. # 存储基于文件的状态以进行本地恢复的根目录。本地恢复当前仅涵盖keyed state backends
  46. taskmanager.state.local.root-dirs: /opt/flink-tm-state
  47. # Task 重启策略
  48. restart-strategy: fixed-delay
  49. restart-strategy.fixed-delay.attempts: 3
  50. restart-strategy.fixed-delay.delay: 10 s

修改 masters

修改 conf 目录下 masters 文件:

  1. node01:8081
  2. node02:8081
  3. node03:8081
  4. node04:8081
  5. node05:8081

修改 workers

修改 conf 目录下 workers 文件:

  1. node01
  2. node02
  3. node03
  4. node04
  5. node05

主备切换验证

启动 Flink session 集群

  1. cd $FLINK_HOME
  2. ./bin/yarn-session.sh --detached

提交 Job

  1. ./bin/flink run ./examples/streaming/TopSpeedWindowing.jar

查看集群配置以及运行的节点

  • 通过 Yarn 管理界面打开 Apache Flink Dashboard

拖动下面的滚动条至最右边,然后点击 ApplicationMaster 进入 Apache Flink Dashboard

Flink On Yarn 高可用配置 - 图2

Flink On Yarn 高可用配置 - 图3

  • 点击 Job Manager,可以看到 JobManager 的高可用配置,并且此时 JobManager 运行在 node02,端口号为 38343

Flink On Yarn 高可用配置 - 图4

  • 在 node02 上查看运行的进程,其中 YarnSessionClusterEntrypoint 就是 JobManager 的进程
  1. root@node02:~# jps
  2. 26896 NodeManager
  3. 25746 JournalNode
  4. 26099 DataNode
  5. 2177269 Jps
  6. 2176890 YarnSessionClusterEntrypoint
  7. 22878 QuorumPeerMain
  • 点击 Task Managers,可以看到 TaskManager 运行在 node01,端口号为 41963

Flink On Yarn 高可用配置 - 图5

  • 在 node01 上查看运行的进程,其中 YarnTaskExecutorRunner 就是 TaskManager 的进程
  1. root@node01:/usr/local/flink# jps
  2. 27043 NameNode
  3. 2301152 Jps
  4. 28965 ResourceManager
  5. 29174 NodeManager
  6. 23529 QuorumPeerMain
  7. 28715 SecondaryNameNode
  8. 27260 DataNode
  9. 26494 JournalNode
  10. 2300685 YarnTaskExecutorRunner

JobManager 高可用

Kill 掉 Active JobManager

在 node02 节点 kill YarnSessionClusterEntrypoint 进程

  1. kill -9 2176890

查看 JobManager 运行的节点

  • 刷新后点击 Job Manager,可以看到此时 JobManager 运行在 node01,端口号为 42831

Flink On Yarn 高可用配置 - 图6

  • 在 node01 上查看运行的进程,此时 YarnSessionClusterEntrypoint 和 YarnTaskExecutorRunner 都在 node01 节点上
  1. root@node01:/usr/local/flink# jps
  2. 27043 NameNode
  3. 28965 ResourceManager
  4. 29174 NodeManager
  5. 23529 QuorumPeerMain
  6. 2301481 Jps
  7. 28715 SecondaryNameNode
  8. 2301400 YarnSessionClusterEntrypoint
  9. 27260 DataNode
  10. 26494 JournalNode
  11. 2300685 YarnTaskExecutorRunner

TaskManager 高可用

Kill 掉 Active TaskManager

在 node01 节点 kill YarnTaskExecutorRunner 进程

  1. kill -9 2300685

查看 JobManager 运行的节点

  • 刷新后点击 Task Managers,可以看到此时 JobManager 运行在 node01,端口号为 37241

Flink On Yarn 高可用配置 - 图7

  • 在 node01 上查看运行的进程,此时 YarnTaskExecutorRunner 进程号已经为新的进程
  1. root@node01:/usr/local/flink# jps
  2. 27649 YarnTaskExecutorRunner