• State 真正持久化 核心类

真正持久化方法

  1. // TODO [1] 真正的持久化
  2. @VisibleForTesting
  3. void snapshotState(
  4. CheckpointedStreamOperator streamOperator,
  5. Optional<InternalTimeServiceManager<?>> timeServiceManager,
  6. String operatorName,
  7. long checkpointId,
  8. long timestamp,
  9. CheckpointOptions checkpointOptions,
  10. CheckpointStreamFactory factory,
  11. OperatorSnapshotFutures snapshotInProgress,
  12. StateSnapshotContextSynchronousImpl snapshotContext,
  13. boolean isUsingCustomRawKeyedState)
  14. throws CheckpointException {
  15. try {
  16. LOG.info("(1) 真正的持久化 State [Operator/Keyed] : 在这里持久化 {StreamOperatorStateHandler.snapshotState() }");
  17. if (timeServiceManager.isPresent()) {
  18. checkState(
  19. keyedStateBackend != null,
  20. "keyedStateBackend should be available with timeServiceManager");
  21. final InternalTimeServiceManager<?> manager = timeServiceManager.get();
  22. if (manager.isUsingLegacyRawKeyedStateSnapshots()) {
  23. checkState(
  24. !isUsingCustomRawKeyedState,
  25. "Attempting to snapshot timers to raw keyed state, but this operator has custom raw keyed state to write.");
  26. manager.snapshotToRawKeyedState(
  27. snapshotContext.getRawKeyedOperatorStateOutput(), operatorName);
  28. }
  29. }
  30. // TODO 对状态进行快照,包括KeyedState和OperatorState
  31. streamOperator.snapshotState(snapshotContext);
  32. snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
  33. snapshotInProgress.setOperatorStateRawFuture(
  34. snapshotContext.getOperatorStateStreamFuture());
  35. // TODO 持久化 Operator State
  36. if (null != operatorStateBackend) {
  37. LOG.info("真正的持久化 Operator State : 在这里持久化");
  38. snapshotInProgress.setOperatorStateManagedFuture(
  39. operatorStateBackend.snapshot(
  40. checkpointId, timestamp, factory, checkpointOptions));
  41. }
  42. // TODO 持久化 Keyed State
  43. if (null != keyedStateBackend) {
  44. LOG.info("真正的持久化 Keyed State : 在这里持久化");
  45. snapshotInProgress.setKeyedStateManagedFuture(
  46. keyedStateBackend.snapshot(
  47. checkpointId, timestamp, factory, checkpointOptions));
  48. }
  49. } catch (Exception snapshotException) {
  50. try {
  51. snapshotInProgress.cancel();
  52. } catch (Exception e) {
  53. snapshotException.addSuppressed(e);
  54. }
  55. String snapshotFailMessage =
  56. "Could not complete snapshot "
  57. + checkpointId
  58. + " for operator "
  59. + operatorName
  60. + ".";
  61. try {
  62. snapshotContext.closeExceptionally();
  63. } catch (IOException e) {
  64. snapshotException.addSuppressed(e);
  65. }
  66. throw new CheckpointException(
  67. snapshotFailMessage,
  68. CheckpointFailureReason.CHECKPOINT_DECLINED,
  69. snapshotException);
  70. }
  71. }