Flink定期保存数据,failover后从上次成功的保存点处恢复,并提供Exactly-Once的投递保障机制
// 恢复保存点public boolean restoreSavepoint(String savepointPointer,boolean allowNonRestored,Map<JobVertexID, ExecutionJobVertex> tasks,ClassLoader userClassLoader) throws Exception {Preconditions.checkNotNull(savepointPointer, "The savepoint path cannot be null.");LOG.info("Starting job {} from savepoint {} ({})",job, savepointPointer, (allowNonRestored ? "allowing non restored state" : ""));// 从指定目录获取hdfs的地址final CompletedCheckpointStorageLocation checkpointLocation = checkpointStorage.resolveCheckpoint(savepointPointer);// 1、加载 metadata 信息// 2、生成operator 到 task 的映射// 3、检查 并行度// 4、转换这次的 savepoint为checkpoint,以便失败后恢复CompletedCheckpoint savepoint = Checkpoints.loadAndValidateCheckpoint(job, tasks, checkpointLocation, userClassLoader, allowNonRestored);// 将要恢复的checkpoint信息写到zk,并异步删除旧的checkpointcompletedCheckpointStore.addCheckpoint(savepoint);// 重置checkpoint 计数器long nextCheckpointId = savepoint.getCheckpointID() + 1;checkpointIdCounter.setCount(nextCheckpointId);LOG.info("Reset the checkpoint ID of job {} to {}.", job, nextCheckpointId);// 从最近一次 Checkpoint 处恢复 State// 获取OperatorState,分配statereturn restoreLatestCheckpointedStateInternal(new HashSet<>(tasks.values()), true, true, allowNonRestored);}
总结
- 首先客户端提供 Checkpoint 或 Savepoint 的目录
- JM 从给定的目录中找到 _metadata 文件(Checkpoint 的元数据文件)
- JM 解析元数据文件,做一些校验,将信息写入到 zk 中,然后准备从这一次 Checkpoint 中恢复任务
- JM 拿到所有算子对应的 State,给各个 subtask 分配 StateHandle(状态文件句柄)
- TM 启动时,也就是 StreamTask 的初始化阶段会创建 KeyedStateBackend 和 OperatorStateBackend
- 创建过程中就会根据 JM 分配给自己的 StateHandle 从 dfs 上恢复 State
