一、一致性检查点(checkpoint)
- Flink故障恢复机制的核心,就是应用状态的一致性检查点
- 有状态流应用的一致性检查点,其实就是所有任务的状态,在某个时间点的一份拷贝(一份快照);
如下图所示:
说明:
- input stream 是从 1 开始的递增序列
- sum_even 是计算偶数的和
- sum_even=2+4=6
- sum_odd 是计算奇数的和
- sum_even=1+3+5=9 说明:offset 等于 5 的序列已经计算完成
- input offset 表示当 offset 等于 5 的时候,有一分快照,此时 offset 为 5 以前的数据都已经计算完成;
- offset 并非 input stream 中的递增序列
二、从检查点恢复状态
- 在执行流应用程序期间,Flink会定期保存状态的一致检查点
- 如果发生故障,Flink将会使用最近的检查点来回复应用程序的状态,并重新启动处理流程;遇到故障之后:
- 第一步:重启应用;如下图 “Recovery 1: Restart application”
- 第二步:从 checkpoint 中读取装填,将状态重置;从检查点重新启动应用程序后,其内部状态与检查点完成时的状态完全相同;如下图:”Recovery2: Reset application state form Checkpoint”
- 第三步:开始消费并处理检查点到发生故障之间的所有数据;这种检查点的保存和恢复机制可以为应用程序状态提供“精确一次(exactly-once)”的一致性,因为所有的算子都会保存检查点并恢复其所有状态,这样一来所有的输入流就都会被重置到检查点完成时的位置;如下图:”Recovery 3: Continue processing”
三、Flink检查点算法
- 一种简单的想法
- 暂停所有分区任务,保存状态到检查点,再重新恢复分区任务
- Flink的改进实现
- 基于Chandy-Lamport 算法的分布式快照
- 将检查点的保存与数据处理分离开,不暂停所有分区任务
Flink检查点算法
- 检查点分界线(Checkpoint Barrier)
- Flink的检查点算法用到了一种称为分界线(barrier)的特殊数据形式,用来把一条流上数据按照不同的检查点分开
- 分界线之前到来的数据导致的状态更改,都会被包含在当前分界线所属的检查点中;而基于分界线之后的数据导致的所有更改,就会被包含在之后的检查点中
详解:
- 现在是一个有两个输入流 (每个流数据都是从1开始的递增序列) 的应用程序,用并行的两个source任务来读取
- JobManager 会向每个 source 任务发送一条带有新检查点 ID 的消息 (如:三角形的 检查点ID),通过这种方式来启动检查点
说明:source1 的 1,2,3 都在2检查点;source2 的 1,2,3,4 都在2检查点
- 数据源将它们的状态写入检查点,并发出一个检查点 barrier
- 状态后端在状态存入检查点之后,会返回通知给 source 任务,source 任务就会向 JobManager 确认检查点完成
- 分界线对齐:barrier 向下游传递,sum 任务会等待所有输入分区的barrier到达
- 对于 barrier 已经到达的分区,继续到达的数据会被缓存 (比如:source1中的 4 后面的序列如果到了会被缓存,检查点状态按照分区缓存到了状态后端,如上图的左上角的3,4)
- 而 barrier 尚未到达的分区,数据会被正常处理
- 当收到所有输入分区的barrier 时,任务就将其状态保存到状态后端的检查点中,然后将 barrier 继续向下游转发
- 向下游转发检查点 barrier 后,任务继续正常的数据处理
- Sink任务向 JobManager 确认状态保存到 checkpoint 完毕
- 当所有任务都确认已成功将状态保存到检查点时,检查点就真正完成了
检查点代码设置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 开启检查点
env.enableCheckpointing(60000);
// 设置检查点的模式
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 设置checkpoint保存超时时间: 检查点保存到 store 的时间
env.getCheckpointConfig().setCheckpointTimeout(100000);
// 设置checkpoint保存状态失败后是否停止任务
env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
// 设置checkpoint保存状态的并发数: 因为有可能上次还没保存完成,下次的检查点又来了
env.getCheckpointConfig().setMaxConcurrentCheckpoints(100);
// 设置checkpoint最小间隔时间
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100);
// 当任务失败后,检查点会被取消;按照下面设置后,即使任务失败,也不会删除检查点
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
// 配置重启策略: 出现故障之后,重启最大尝试3次,每次间隔500ms
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 500));
四、保存点(save points)
- Flink还提供了可以自定义的镜像保存功能,就是保存带你 (savepoints)
- 原则上,创建保存点使用的算法与检查点完全相同,因此保存点可以认为就是具有一些额外元数据的检查点
- Flink不会自动创建保存点,因此用户(或者外部调度程序)必须明确的触发创建操作
- 保存点是一个强大的功能。除了故障恢复以外,保存点可以用于
- 有计划的手动备份
- 更新应用程序
- 版本迁移
- 暂停和重启应用
等等