真正持久化方法
// TODO [1] 真正的持久化 @VisibleForTesting void snapshotState( CheckpointedStreamOperator streamOperator, Optional<InternalTimeServiceManager<?>> timeServiceManager, String operatorName, long checkpointId, long timestamp, CheckpointOptions checkpointOptions, CheckpointStreamFactory factory, OperatorSnapshotFutures snapshotInProgress, StateSnapshotContextSynchronousImpl snapshotContext, boolean isUsingCustomRawKeyedState) throws CheckpointException { try { LOG.info("(1) 真正的持久化 State [Operator/Keyed] : 在这里持久化 {StreamOperatorStateHandler.snapshotState() }"); if (timeServiceManager.isPresent()) { checkState( keyedStateBackend != null, "keyedStateBackend should be available with timeServiceManager"); final InternalTimeServiceManager<?> manager = timeServiceManager.get(); if (manager.isUsingLegacyRawKeyedStateSnapshots()) { checkState( !isUsingCustomRawKeyedState, "Attempting to snapshot timers to raw keyed state, but this operator has custom raw keyed state to write."); manager.snapshotToRawKeyedState( snapshotContext.getRawKeyedOperatorStateOutput(), operatorName); } } // TODO 对状态进行快照,包括KeyedState和OperatorState streamOperator.snapshotState(snapshotContext); snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture()); snapshotInProgress.setOperatorStateRawFuture( snapshotContext.getOperatorStateStreamFuture()); // TODO 持久化 Operator State if (null != operatorStateBackend) { LOG.info("真正的持久化 Operator State : 在这里持久化"); snapshotInProgress.setOperatorStateManagedFuture( operatorStateBackend.snapshot( checkpointId, timestamp, factory, checkpointOptions)); } // TODO 持久化 Keyed State if (null != keyedStateBackend) { LOG.info("真正的持久化 Keyed State : 在这里持久化"); snapshotInProgress.setKeyedStateManagedFuture( keyedStateBackend.snapshot( checkpointId, timestamp, factory, checkpointOptions)); } } catch (Exception snapshotException) { try { snapshotInProgress.cancel(); } catch (Exception e) { snapshotException.addSuppressed(e); } String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " + operatorName + "."; try { snapshotContext.closeExceptionally(); } catch (IOException e) { snapshotException.addSuppressed(e); } throw new CheckpointException( snapshotFailMessage, CheckpointFailureReason.CHECKPOINT_DECLINED, snapshotException); } }