一、一致性检查点(checkpoint)

  • Flink故障恢复机制的核心,就是应用状态的一致性检查点
  • 有状态流应用的一致性检查点,其实就是所有任务的状态,在某个时间点的一份拷贝(一份快照);

如下图所示:
image.png
说明:

  • input stream 是从 1 开始的递增序列
  • sum_even 是计算偶数的和
    • sum_even=2+4=6
  • sum_odd 是计算奇数的和
    • sum_even=1+3+5=9 说明:offset 等于 5 的序列已经计算完成
  • input offset 表示当 offset 等于 5 的时候,有一分快照,此时 offset 为 5 以前的数据都已经计算完成;
    • offset 并非 input stream 中的递增序列

二、从检查点恢复状态

  • 在执行流应用程序期间,Flink会定期保存状态的一致检查点
  • 如果发生故障,Flink将会使用最近的检查点来回复应用程序的状态,并重新启动处理流程;遇到故障之后:
    • 第一步:重启应用;如下图 “Recovery 1: Restart application”
    • 第二步:从 checkpoint 中读取装填,将状态重置;从检查点重新启动应用程序后,其内部状态与检查点完成时的状态完全相同;如下图:”Recovery2: Reset application state form Checkpoint”
    • 第三步:开始消费并处理检查点到发生故障之间的所有数据;这种检查点的保存和恢复机制可以为应用程序状态提供“精确一次(exactly-once)”的一致性,因为所有的算子都会保存检查点并恢复其所有状态,这样一来所有的输入流就都会被重置到检查点完成时的位置;如下图:”Recovery 3: Continue processing”

image.png
image.png
image.png
image.png

三、Flink检查点算法

  • 一种简单的想法
    • 暂停所有分区任务,保存状态到检查点,再重新恢复分区任务
  • Flink的改进实现
    • 基于Chandy-Lamport 算法的分布式快照
    • 将检查点的保存与数据处理分离开,不暂停所有分区任务

Flink检查点算法

  • 检查点分界线(Checkpoint Barrier)
    • Flink的检查点算法用到了一种称为分界线(barrier)的特殊数据形式,用来把一条流上数据按照不同的检查点分开
    • 分界线之前到来的数据导致的状态更改,都会被包含在当前分界线所属的检查点中;而基于分界线之后的数据导致的所有更改,就会被包含在之后的检查点中

详解:
image.png

  • 现在是一个有两个输入流 (每个流数据都是从1开始的递增序列) 的应用程序,用并行的两个source任务来读取

image.png

  • JobManager 会向每个 source 任务发送一条带有新检查点 ID 的消息 (如:三角形的 检查点ID),通过这种方式来启动检查点

image.png
说明:source1 的 1,2,3 都在2检查点;source2 的 1,2,3,4 都在2检查点

  • 数据源将它们的状态写入检查点,并发出一个检查点 barrier
  • 状态后端在状态存入检查点之后,会返回通知给 source 任务,source 任务就会向 JobManager 确认检查点完成

image.png

  • 分界线对齐:barrier 向下游传递,sum 任务会等待所有输入分区的barrier到达
  • 对于 barrier 已经到达的分区,继续到达的数据会被缓存 (比如:source1中的 4 后面的序列如果到了会被缓存,检查点状态按照分区缓存到了状态后端,如上图的左上角的3,4)
  • 而 barrier 尚未到达的分区,数据会被正常处理

image.png

  • 当收到所有输入分区的barrier 时,任务就将其状态保存到状态后端的检查点中,然后将 barrier 继续向下游转发

image.png

  • 向下游转发检查点 barrier 后,任务继续正常的数据处理

image.png

  • Sink任务向 JobManager 确认状态保存到 checkpoint 完毕
  • 当所有任务都确认已成功将状态保存到检查点时,检查点就真正完成了

检查点代码设置

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. // 开启检查点
  3. env.enableCheckpointing(60000);
  4. // 设置检查点的模式
  5. env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  6. // 设置checkpoint保存超时时间: 检查点保存到 store 的时间
  7. env.getCheckpointConfig().setCheckpointTimeout(100000);
  8. // 设置checkpoint保存状态失败后是否停止任务
  9. env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
  10. // 设置checkpoint保存状态的并发数: 因为有可能上次还没保存完成,下次的检查点又来了
  11. env.getCheckpointConfig().setMaxConcurrentCheckpoints(100);
  12. // 设置checkpoint最小间隔时间
  13. env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100);
  14. // 当任务失败后,检查点会被取消;按照下面设置后,即使任务失败,也不会删除检查点
  15. env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
  16. // 配置重启策略: 出现故障之后,重启最大尝试3次,每次间隔500ms
  17. env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 500));

四、保存点(save points)

  • Flink还提供了可以自定义的镜像保存功能,就是保存带你 (savepoints)
  • 原则上,创建保存点使用的算法与检查点完全相同,因此保存点可以认为就是具有一些额外元数据的检查点
  • Flink不会自动创建保存点,因此用户(或者外部调度程序)必须明确的触发创建操作
  • 保存点是一个强大的功能。除了故障恢复以外,保存点可以用于
    • 有计划的手动备份
    • 更新应用程序
    • 版本迁移
    • 暂停和重启应用

等等