Checkpoint管理类:org.apache.flink.runtime.checkpoint.CheckpointCoordinator
从Savepoint恢复
/*** Restore the state with given savepoint.** @param savepointPointer The pointer to the savepoint.* @param allowNonRestored True if allowing checkpoint state that cannot be mapped to any job* vertex in tasks.* @param tasks Map of job vertices to restore. State for these vertices is restored via {@link* Execution#setInitialState(JobManagerTaskRestore)}.* @param userClassLoader The class loader to resolve serialized classes in legacy savepoint* versions.*/public boolean restoreSavepoint(String savepointPointer,boolean allowNonRestored,Map<JobVertexID, ExecutionJobVertex> tasks,ClassLoader userClassLoader)throws Exception {Preconditions.checkNotNull(savepointPointer, "The savepoint path cannot be null.");LOG.info("Starting job {} from savepoint {} ({})",job,savepointPointer,(allowNonRestored ? "allowing non restored state" : ""));final CompletedCheckpointStorageLocation checkpointLocation =checkpointStorageView.resolveCheckpoint(savepointPointer);// Load the savepoint as a checkpoint into the systemCompletedCheckpoint savepoint =Checkpoints.loadAndValidateCheckpoint(job, tasks, checkpointLocation, userClassLoader, allowNonRestored);completedCheckpointStore.addCheckpoint(savepoint, checkpointsCleaner, this::scheduleTriggerRequest);// Reset the checkpoint ID counterlong nextCheckpointId = savepoint.getCheckpointID() + 1;checkpointIdCounter.setCount(nextCheckpointId);LOG.info("Reset the checkpoint ID of job {} to {}.", job, nextCheckpointId);final OptionalLong restoredCheckpointId =restoreLatestCheckpointedStateInternal(new HashSet<>(tasks.values()),OperatorCoordinatorRestoreBehavior.RESTORE_IF_CHECKPOINT_PRESENT,true,allowNonRestored,true);return restoredCheckpointId.isPresent();}
