真正执行状态持久化的方法(StreamOperatorStateHandler.snapshotState()):
// TODO [1] The actual persistence
/**
 * Snapshots the state of one operator for the given checkpoint and registers the resulting
 * futures on {@code snapshotInProgress}.
 *
 * <p>Steps, in order:
 *
 * <ol>
 *   <li>If a time service manager is present and reports that it uses legacy raw keyed state
 *       snapshots, its timers are written to the raw keyed state output of {@code
 *       snapshotContext}.
 *   <li>{@code streamOperator.snapshotState(snapshotContext)} is invoked.
 *   <li>The raw keyed / raw operator state futures of {@code snapshotContext} are handed to
 *       {@code snapshotInProgress}.
 *   <li>If {@code operatorStateBackend} / {@code keyedStateBackend} are non-null, their {@code
 *       snapshot(...)} results are registered as the managed state futures.
 * </ol>
 *
 * @param streamOperator operator whose {@code snapshotState} is called with {@code
 *     snapshotContext}
 * @param timeServiceManager optional timer service; only consulted when present
 * @param operatorName name passed to the timer snapshot and included in the failure message
 * @param checkpointId checkpoint id passed to the backends and included in the failure message
 * @param timestamp timestamp passed to the backends' {@code snapshot} calls
 * @param checkpointOptions options passed to the backends' {@code snapshot} calls
 * @param factory stream factory passed to the backends' {@code snapshot} calls
 * @param snapshotInProgress receives all futures; cancelled if anything fails
 * @param snapshotContext context given to the operator; closed exceptionally if anything fails
 * @param isUsingCustomRawKeyedState must be {@code false} when timers are written to raw keyed
 *     state, otherwise the {@code checkState} precondition fails
 * @throws CheckpointException with reason {@code CHECKPOINT_DECLINED}, wrapping whatever
 *     exception was raised; exceptions from the cleanup calls are attached to that cause as
 *     suppressed exceptions
 */
@VisibleForTesting
void snapshotState(
        CheckpointedStreamOperator streamOperator,
        Optional<InternalTimeServiceManager<?>> timeServiceManager,
        String operatorName,
        long checkpointId,
        long timestamp,
        CheckpointOptions checkpointOptions,
        CheckpointStreamFactory factory,
        OperatorSnapshotFutures snapshotInProgress,
        StateSnapshotContextSynchronousImpl snapshotContext,
        boolean isUsingCustomRawKeyedState)
        throws CheckpointException {
    try {
        LOG.info("(1) 真正的持久化 State [Operator/Keyed] : 在这里持久化 {StreamOperatorStateHandler.snapshotState() }");

        if (timeServiceManager.isPresent()) {
            checkState(
                    keyedStateBackend != null,
                    "keyedStateBackend should be available with timeServiceManager");

            final InternalTimeServiceManager<?> timers = timeServiceManager.get();
            final boolean timersGoToRawKeyedState = timers.isUsingLegacyRawKeyedStateSnapshots();
            if (timersGoToRawKeyedState) {
                // Timers and custom raw keyed state would both write to the raw keyed
                // state output, so the two must not be combined.
                checkState(
                        !isUsingCustomRawKeyedState,
                        "Attempting to snapshot timers to raw keyed state, but this operator has custom raw keyed state to write.");
                timers.snapshotToRawKeyedState(
                        snapshotContext.getRawKeyedOperatorStateOutput(), operatorName);
            }
        }

        // Snapshot the state (original note: covers both keyed state and operator state).
        streamOperator.snapshotState(snapshotContext);

        // Hand the raw state futures over; keyed first, then operator.
        snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
        snapshotInProgress.setOperatorStateRawFuture(
                snapshotContext.getOperatorStateStreamFuture());

        // Persist operator state.
        if (operatorStateBackend != null) {
            LOG.info("真正的持久化 Operator State : 在这里持久化");
            snapshotInProgress.setOperatorStateManagedFuture(
                    operatorStateBackend.snapshot(
                            checkpointId, timestamp, factory, checkpointOptions));
        }

        // Persist keyed state.
        if (keyedStateBackend != null) {
            LOG.info("真正的持久化 Keyed State : 在这里持久化");
            snapshotInProgress.setKeyedStateManagedFuture(
                    keyedStateBackend.snapshot(
                            checkpointId, timestamp, factory, checkpointOptions));
        }
    } catch (Exception cause) {
        final String failureMessage =
                "Could not complete snapshot " + checkpointId + " for operator " + operatorName + ".";

        // Best-effort cleanup: a failure here is recorded on the cause, not rethrown.
        try {
            snapshotInProgress.cancel();
        } catch (Exception cancelFailure) {
            cause.addSuppressed(cancelFailure);
        }

        try {
            snapshotContext.closeExceptionally();
        } catch (IOException closeFailure) {
            cause.addSuppressed(closeFailure);
        }

        throw new CheckpointException(
                failureMessage, CheckpointFailureReason.CHECKPOINT_DECLINED, cause);
    }
}