设置
代码示例
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
env.setStateBackend(new FsStateBackend("hdfs://flink/checkpoints"));
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
config.setMinPauseBetweenCheckpoints(500);
config.setCheckpointTimeout(60000);
config.setMaxConcurrentCheckpoints(1);
一次性语义
主要分两种:exactly-once、at-least-once。
exactly-once保证恢复的数据全是属于本次快照。
at-least-once恢复的数据不仅属于本次快照,还可能包含下次快照。
换句话说,如果第n次Checkpoint的state为x。那么exactly-once快照和恢复的值是x,而at-least-once很有可能比x大。
具体步骤
作业整体具体步骤
算子具体步骤
- 等待所有上游barrier到达。
- 所有到达后触发检查点。
- 向下游发送barrier。
同步与异步
根据checkpoint恢复状态,恢复的时候
是依据JobVertexID(hash值)进⾏状态恢复的。
Source支持数据重发,才可以exactly-once。
有且仅有一次 exaclty-once barrier对齐(align)
至少一次
config.setMinPauseBetweenCheckpoints(500) //两次间隔最小时间,检查点制作过慢,会导致频繁checkpoint,防止影响业务
作业停止默认清除
被触发后开始从数据源产生 Checkpoint barrier
通信流程
JM -> TM triggerCheckpoint 。JM通知Source,触发 Barrier。
TM -> JM acknowledgeCheckpoint 。所有subtask 上传 State Snapshot,并在TM本地内存中存一份。FsStateBackend模式会把文件路径发送给JM。例如:hdfs://tail1:9000/flink/checkpoint/43cf6b5040106bacc2e54f8ac2237800/chk-42/939659f6-378a-45ae-8c94-6bdd8656acab
JM -> TM confirmCheckpoint 向所有subtask发送确认
TM -> Acknowledge 所有subtask回复确认