Checkpoint管理类:org.apache.flink.runtime.checkpoint.CheckpointCoordinator
从Savepoint恢复
/**
* Restore the state with given savepoint.
*
* @param savepointPointer The pointer to the savepoint.
* @param allowNonRestored True if allowing checkpoint state that cannot be mapped to any job
* vertex in tasks.
* @param tasks Map of job vertices to restore. State for these vertices is restored via {@link
* Execution#setInitialState(JobManagerTaskRestore)}.
* @param userClassLoader The class loader to resolve serialized classes in legacy savepoint
* versions.
*/
public boolean restoreSavepoint(
String savepointPointer,
boolean allowNonRestored,
Map<JobVertexID, ExecutionJobVertex> tasks,
ClassLoader userClassLoader)
throws Exception {
Preconditions.checkNotNull(savepointPointer, "The savepoint path cannot be null.");
LOG.info(
"Starting job {} from savepoint {} ({})",
job,
savepointPointer,
(allowNonRestored ? "allowing non restored state" : ""));
final CompletedCheckpointStorageLocation checkpointLocation =
checkpointStorageView.resolveCheckpoint(savepointPointer);
// Load the savepoint as a checkpoint into the system
CompletedCheckpoint savepoint =
Checkpoints.loadAndValidateCheckpoint(
job, tasks, checkpointLocation, userClassLoader, allowNonRestored);
completedCheckpointStore.addCheckpoint(
savepoint, checkpointsCleaner, this::scheduleTriggerRequest);
// Reset the checkpoint ID counter
long nextCheckpointId = savepoint.getCheckpointID() + 1;
checkpointIdCounter.setCount(nextCheckpointId);
LOG.info("Reset the checkpoint ID of job {} to {}.", job, nextCheckpointId);
final OptionalLong restoredCheckpointId =
restoreLatestCheckpointedStateInternal(
new HashSet<>(tasks.values()),
OperatorCoordinatorRestoreBehavior.RESTORE_IF_CHECKPOINT_PRESENT,
true,
allowNonRestored,
true);
return restoredCheckpointId.isPresent();
}