中文翻译成检查点。实际就是定期备份状态。

设置

从备份中恢复

代码示例

  1. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.enableCheckpointing(5000);
  3. env.setStateBackend(new FsStateBackend("hdfs://flink/checkpoints"));
  4. CheckpointConfig config = env.getCheckpointConfig();
  5. config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  6. config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  7. config.setMinPauseBetweenCheckpoints(500);
  8. config.setCheckpointTimeout(60000);
  9. config.setMaxConcurrentCheckpoints(1);

一次性语义

主要分两种:exactly-once、at-least-once。
exactly-once保证恢复的数据全是属于本次快照。
at-least-once恢复的数据不仅属于本次快照,还可能包含下次快照。
换句话说,如果第n次Checkpoint的state为x。那么exactly-once快照和恢复的值是x,而at-least-once很有可能比x大。

具体步骤

作业整体具体步骤

算子具体步骤

  1. 等待所有上游barrier到达。
  2. 所有到达后触发检查点。
  3. 向下游发送barrier。

同步与异步

根据checkpoint恢复状态,恢复的时候 是依据JobVertexID(hash值)进⾏状态恢复的。
Source支持数据重发,才可以exactly-once。
有且仅有一次 exaclty-once barrier对齐(align)
至少一次

  1. config.setMinPauseBetweenCheckpoints(500) //两次间隔最小时间,检查点制作过慢,会导致频繁checkpoint,防止影响业务

作业停止默认清除

被触发后开始从数据源产生 Checkpoint barrier
通信流程
JM -> TM triggerCheckpoint 。JM通知Source,触发 Barrier。
TM -> JM acknowledgeCheckpoint 。所有subtask 上传 State Snapshot,并在TM本地内存中存一份。FsStateBackend模式会把文件路径发送给JM。例如:hdfs://tail1:9000/flink/checkpoint/43cf6b5040106bacc2e54f8ac2237800/chk-42/939659f6-378a-45ae-8c94-6bdd8656acab
JM -> TM confirmCheckpoint 向所有subtask发送确认
TM -> Acknowledge 所有subtask回复确认

//TODO Checkpoint恢复