Checkpoint管理类:org.apache.flink.runtime.checkpoint.CheckpointCoordinator

从Savepoint恢复

  1. /**
  2. * Restore the state with given savepoint.
  3. *
  4. * @param savepointPointer The pointer to the savepoint.
  5. * @param allowNonRestored True if allowing checkpoint state that cannot be mapped to any job
  6. * vertex in tasks.
  7. * @param tasks Map of job vertices to restore. State for these vertices is restored via {@link
  8. * Execution#setInitialState(JobManagerTaskRestore)}.
  9. * @param userClassLoader The class loader to resolve serialized classes in legacy savepoint
  10. * versions.
  11. */
  12. public boolean restoreSavepoint(
  13. String savepointPointer,
  14. boolean allowNonRestored,
  15. Map<JobVertexID, ExecutionJobVertex> tasks,
  16. ClassLoader userClassLoader)
  17. throws Exception {
  18. Preconditions.checkNotNull(savepointPointer, "The savepoint path cannot be null.");
  19. LOG.info(
  20. "Starting job {} from savepoint {} ({})",
  21. job,
  22. savepointPointer,
  23. (allowNonRestored ? "allowing non restored state" : ""));
  24. final CompletedCheckpointStorageLocation checkpointLocation =
  25. checkpointStorageView.resolveCheckpoint(savepointPointer);
  26. // Load the savepoint as a checkpoint into the system
  27. CompletedCheckpoint savepoint =
  28. Checkpoints.loadAndValidateCheckpoint(
  29. job, tasks, checkpointLocation, userClassLoader, allowNonRestored);
  30. completedCheckpointStore.addCheckpoint(
  31. savepoint, checkpointsCleaner, this::scheduleTriggerRequest);
  32. // Reset the checkpoint ID counter
  33. long nextCheckpointId = savepoint.getCheckpointID() + 1;
  34. checkpointIdCounter.setCount(nextCheckpointId);
  35. LOG.info("Reset the checkpoint ID of job {} to {}.", job, nextCheckpointId);
  36. final OptionalLong restoredCheckpointId =
  37. restoreLatestCheckpointedStateInternal(
  38. new HashSet<>(tasks.values()),
  39. OperatorCoordinatorRestoreBehavior.RESTORE_IF_CHECKPOINT_PRESENT,
  40. true,
  41. allowNonRestored,
  42. true);
  43. return restoredCheckpointId.isPresent();
  44. }