Flink On Yarn 集群的高可用其实就是 JobManager 的高可用。
默认情况下,每个 Flink 集群只有一个 JobManager 实例。这会导致单点故障(SPOF):如果 JobManager 崩溃,则不能提交任何新程序,运行中的程序也会失败。
使用 JobManager 高可用模式,你可以从 JobManager 失败中恢复,从而消除单点故障。
如下是一个使用三个 JobManager 实例的例子:一个 Leader JobManager 以及两个 Standby JobManager,Leader 和 Standby 之间的切换是依赖于 zookeeper,所以部署之前必须安装好 Zookeeper 集群。
Flink On Yarn 部署
修改配置文件
修改 flink-conf.yaml
# jobmanager和taskmanager、其他client的RPC通信IP地址,TaskManager用于连接到JobManager/ResourceManager。
# HA模式不用配置此项,在master文件中配置,由zookeeper选出leader与standby
jobmanager.rpc.address: localhost
# jobmanager和taskmanager、其他client的RPC通信端口,TaskManager用于连接到JobManager/ResourceManager。
# HA模式不用配置此项,在master文件中配置,由zookeeper选出leader与standby
jobmanager.rpc.port: 6123
# jobmanager JVM heap 内存大小
jobmanager.memory.process.size: 1024m
# taskmanager JVM heap 内存大小
taskmanager.memory.process.size: 1024m
# 每个taskmanager提供的任务slots数量
taskmanager.numberOfTaskSlots: 4
# 并行计算个数
parallelism.default: 4
# 高可用模式
high-availability: zookeeper
# JobManager元数据保留在文件系统storageDir中 指向此状态的指针存储在ZooKeeper中
high-availability.storageDir: hdfs://node01:9000/flink/ha/
# Zookeeper集群
high-availability.zookeeper.quorum: node01:2181,node02:2181,node03:2181,node04:2181,node05:2181
# 在zookeeper下的根目录
high-availability.zookeeper.path.root: /flink_yarn
# zookeeper节点下的集群ID 该节点下放置了集群所需的所有协调数据 多个flink集群连接同一套zookeeper集群需要配置各自不同的集群ID
# 重要: 在 YARN、原生 Kubernetes 或其他集群管理器上运行时,不应该手动设置此值。在这些情况下,将自动生成一个集群 ID。如果在未使用集群管理器的机器上运行多个 Flink 高可用集群,则必须为每个集群手动配置单独的集群 ID(cluster-ids)。
# high-availability.cluster-id: /default_ns
# 单个flink job重启次数 必须小于等于yarn-site.xml中Application Master配置的尝试次数
yarn.application-attempts: 10
#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
# jobmanager (MemoryStateBackend), filesystem (FsStateBackend), rocksdb (RocksDBStateBackend)
state.backend: rocksdb
# 检查点的默认目录。Flink支持的文件系统中存储检查点的数据文件和元数据的默认目录。必须从所有参与的进程/节点(即所有TaskManager和JobManager)访问存储路径。
state.checkpoints.dir: hdfs://node01:9000/flink/checkpoints
# 保存点的默认目录。由状态后端用于将保存点写入文件系统(MemoryStateBackend,FsStateBackend,RocksDBStateBackend)。
state.savepoints.dir: hdfs://node01:9000/flink/savepoints
# 是否应该创建增量检查点。对于增量检查点,只存储前一个检查点的差异,而不存储完整的检查点状态。一些状态后端可能不支持增量检查点并忽略此选项。
state.backend.incremental: false
# jobmanager故障恢复策略,指定作业计算如何从任务失败中恢复 full重新启动所有任务以恢复作业 region重新启动可能受任务失败影响的所有任务
jobmanager.execution.failover-strategy: region
# 全局检查点的保留数量
state.checkpoints.num-retained: 3
# 本地恢复。默认情况下,本地恢复处于禁用状态。本地恢复当前仅涵盖keyed state backends。当前,MemoryStateBackend不支持本地恢复。
state.backend.local-recovery: true
# 存储基于文件的状态以进行本地恢复的根目录。本地恢复当前仅涵盖keyed state backends
taskmanager.state.local.root-dirs: /opt/flink-tm-state
# Task 重启策略
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
修改 masters
修改 conf 目录下 masters 文件:
node01:8081
node02:8081
node03:8081
node04:8081
node05:8081
修改 workers
修改 conf 目录下 workers 文件:
node01
node02
node03
node04
node05
主备切换验证
启动 Flink session 集群
cd $FLINK_HOME
./bin/yarn-session.sh --detached
提交 Job
./bin/flink run ./examples/streaming/TopSpeedWindowing.jar
查看集群配置以及运行的节点
- 通过 Yarn 管理界面打开 Apache Flink Dashboard
拖动下面的滚动条至最右边,然后点击 ApplicationMaster 进入 Apache Flink Dashboard
- 点击 Job Manager,可以看到 JobManager 的高可用配置,并且此时 JobManager 运行在 node02,端口号为 38343
- 在 node02 上查看运行的进程,其中 YarnSessionClusterEntrypoint 就是 JobManager 的进程
root@node02:~# jps
26896 NodeManager
25746 JournalNode
26099 DataNode
2177269 Jps
2176890 YarnSessionClusterEntrypoint
22878 QuorumPeerMain
- 点击 Task Managers,可以看到 TaskManager 运行在 node01,端口号为 41963
- 在 node01 上查看运行的进程,其中 YarnTaskExecutorRunner 就是 TaskManager 的进程
root@node01:/usr/local/flink# jps
27043 NameNode
2301152 Jps
28965 ResourceManager
29174 NodeManager
23529 QuorumPeerMain
28715 SecondaryNameNode
27260 DataNode
26494 JournalNode
2300685 YarnTaskExecutorRunner
JobManager 高可用
Kill 掉 Active JobManager
在 node02 节点 kill YarnSessionClusterEntrypoint 进程
kill -9 2176890
查看 JobManager 运行的节点
- 刷新后点击 Job Manager,可以看到此时 JobManager 运行在 node01,端口号为 42831
- 在 node01 上查看运行的进程,此时 YarnSessionClusterEntrypoint 和 YarnTaskExecutorRunner 都在 node01 节点上
root@node01:/usr/local/flink# jps
27043 NameNode
28965 ResourceManager
29174 NodeManager
23529 QuorumPeerMain
2301481 Jps
28715 SecondaryNameNode
2301400 YarnSessionClusterEntrypoint
27260 DataNode
26494 JournalNode
2300685 YarnTaskExecutorRunner
TaskManager 高可用
Kill 掉 Active TaskManager
在 node01 节点 kill YarnTaskExecutorRunner 进程
kill -9 2300685
查看 JobManager 运行的节点
- 刷新后点击 Task Managers,可以看到此时 JobManager 运行在 node01,端口号为 37241
- 在 node01 上查看运行的进程,此时 YarnTaskExecutorRunner 进程号已经为新的进程
root@node01:/usr/local/flink# jps
27649 YarnTaskExecutorRunner