继承

  1. public abstract class AbstractStreamOperator<OUT>
  2. implements StreamOperator<OUT>,
  3. SetupableStreamOperator<OUT>,
  4. CheckpointedStreamOperator,
  5. Serializable { ... }
  • 实现 StreamOperator
  • 实现 CheckpointedStreamOperator
  • 实现 Serializable

核心方法


属性

private transient StreamOperatorStateHandler stateHandler;  // (持久化对象)

算子持久化

    //  TODO 算子持久化准备
    @Override
    public final OperatorSnapshotFutures snapshotState(
            long checkpointId,
            long timestamp,
            CheckpointOptions checkpointOptions,
            CheckpointStreamFactory factory)
            throws Exception {
        return stateHandler.snapshotState(
                this,
                Optional.ofNullable(timeServiceManager),
                getOperatorName(),
                checkpointId,
                timestamp,
                checkpointOptions,
                factory,
                isUsingCustomRawKeyedState());
    }

处理单流水印

   // TODO 处理水印
    public void processWatermark(Watermark mark) throws Exception {
        if (timeServiceManager != null) {
            timeServiceManager.advanceWatermark(mark);
        }
        // TODO 发出水印
        output.emitWatermark(mark);
    }
  • 调用 advanceWatermark 方法

    @Override
      public void advanceWatermark(Watermark watermark) throws Exception {
          for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
              service.advanceWatermark(watermark.getTimestamp());
          }
      }
    
  • 设置 onEventTime 是走这块的逻辑吗 ?

      public void advanceWatermark(long time) throws Exception {
          currentWatermark = time;
    
          InternalTimer<K, N> timer;
    
          while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
              eventTimeTimersQueue.poll();
              keyContext.setCurrentKey(timer.getKey());
              triggerTarget.onEventTime(timer);
          }
      }
    

处理双流水印

  // TODO 双流 Watermark 1
    public void processWatermark1(Watermark mark) throws Exception {
        input1Watermark = mark.getTimestamp();
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            processWatermark(new Watermark(combinedWatermark));
        }
    }

    // TODO 双流 Watermark 2
    public void processWatermark2(Watermark mark) throws Exception {
        input2Watermark = mark.getTimestamp();
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            processWatermark(new Watermark(combinedWatermark));
        }
    }