Flink periodically saves its state; after a failover the job recovers from the last successfully completed save point, which is the basis of its Exactly-Once delivery guarantee.
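This guarantee is switched on when the user enables checkpointing. The sketch below is a minimal example assuming the DataStream API; the 60-second interval and the trivial pipeline are illustrative assumptions, not values taken from the code discussed in this article.

  import org.apache.flink.streaming.api.CheckpointingMode;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

  public class CheckpointConfigExample {
      public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

          // Take a checkpoint every 60 s with exactly-once semantics (interval is an assumed value)
          env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

          // A trivial pipeline just to make the example runnable
          env.fromElements(1, 2, 3).print();
          env.execute("checkpoint-config-example");
      }
  }

The restore path on the JobManager side looks like this: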

  // Restore from a savepoint
  public boolean restoreSavepoint(
          String savepointPointer,
          boolean allowNonRestored,
          Map<JobVertexID, ExecutionJobVertex> tasks,
          ClassLoader userClassLoader) throws Exception {

      Preconditions.checkNotNull(savepointPointer, "The savepoint path cannot be null.");

      LOG.info("Starting job {} from savepoint {} ({})",
              job, savepointPointer, (allowNonRestored ? "allowing non restored state" : ""));

      // Resolve the given pointer to the checkpoint storage location (e.g. a path on HDFS)
      final CompletedCheckpointStorageLocation checkpointLocation =
              checkpointStorage.resolveCheckpoint(savepointPointer);

      // 1. Load the metadata
      // 2. Build the operator-to-task mapping
      // 3. Check the parallelism
      // 4. Convert this savepoint into a checkpoint so it can be used for recovery after a failure
      CompletedCheckpoint savepoint = Checkpoints.loadAndValidateCheckpoint(
              job, tasks, checkpointLocation, userClassLoader, allowNonRestored);

      // Write the checkpoint to be restored to ZooKeeper and asynchronously remove old checkpoints
      completedCheckpointStore.addCheckpoint(savepoint);

      // Reset the checkpoint ID counter
      long nextCheckpointId = savepoint.getCheckpointID() + 1;
      checkpointIdCounter.setCount(nextCheckpointId);

      LOG.info("Reset the checkpoint ID of job {} to {}.", job, nextCheckpointId);

      // Restore state from the latest checkpoint:
      // fetch each OperatorState and assign it to the corresponding tasks
      return restoreLatestCheckpointedStateInternal(
              new HashSet<>(tasks.values()), true, true, allowNonRestored);
  }
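For context, the savepoint path and the allowNonRestored flag reach this method from the user side: on the CLI they correspond to -s/--fromSavepoint and -n/--allowNonRestoredState, and programmatically they are carried by SavepointRestoreSettings attached to the JobGraph. The flag controls whether state in the savepoint that cannot be mapped to any operator of the new job may be skipped instead of failing the restore. A minimal sketch, with a placeholder path:

  import org.apache.flink.runtime.jobgraph.JobGraph;
  import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

  public final class SavepointRestoreExample {

      // Attach restore settings to a JobGraph before submission.
      // The path is a placeholder; the boolean maps to the allowNonRestored flag above.
      static void configureRestore(JobGraph jobGraph) {
          SavepointRestoreSettings settings = SavepointRestoreSettings.forPath(
                  "hdfs:///flink/savepoints/savepoint-abc123", // placeholder savepoint path
                  true);                                       // allow non-restored state
          jobGraph.setSavepointRestoreSettings(settings);
      }
  }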

Summary

  • First, the client provides the directory of a Checkpoint or Savepoint
  • The JM locates the _metadata file (the Checkpoint's metadata file) in the given directory
  • The JM parses the metadata file, performs some validation, writes the information to ZooKeeper, and prepares to recover the job from this Checkpoint
  • The JM obtains the State of every operator and assigns each subtask its StateHandle (a handle to the state files)
  • When the TM starts, i.e. during StreamTask initialization, it creates the KeyedStateBackend and OperatorStateBackend
  • During that creation it restores State from the DFS based on the StateHandle the JM assigned to it, as illustrated by the sketch after this list
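To make the TM side concrete, here is a minimal sketch of an operator using the CheckpointedFunction interface; the class name and the counting logic are illustrative assumptions. When the StreamTask initializes its state backends, initializeState() is invoked, and if the JM assigned this subtask a StateHandle, context.isRestored() is true and the ListState is already populated with the values read back from the DFS.

  import org.apache.flink.api.common.functions.RichMapFunction;
  import org.apache.flink.api.common.state.ListState;
  import org.apache.flink.api.common.state.ListStateDescriptor;
  import org.apache.flink.runtime.state.FunctionInitializationContext;
  import org.apache.flink.runtime.state.FunctionSnapshotContext;
  import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

  // Illustrative operator: counts records and keeps the count in operator state
  public class CountingMap extends RichMapFunction<String, String> implements CheckpointedFunction {

      private transient ListState<Long> countState; // backed by the OperatorStateBackend
      private long count;

      @Override
      public void initializeState(FunctionInitializationContext context) throws Exception {
          // Called during StreamTask initialization; on recovery the descriptor is matched
          // against the restored StateHandle and the state is reloaded from the checkpoint files
          countState = context.getOperatorStateStore()
                  .getListState(new ListStateDescriptor<>("count", Long.class));
          if (context.isRestored()) {
              for (Long c : countState.get()) {
                  count += c;
              }
          }
      }

      @Override
      public void snapshotState(FunctionSnapshotContext context) throws Exception {
          // Written into the checkpoint/savepoint that a later restore reads back
          countState.clear();
          countState.add(count);
      }

      @Override
      public String map(String value) {
          count++;
          return value;
      }
  }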