继承
/**
 * Declaration excerpt of the abstract base class for stream operators; the class body is
 * elided ({@code { ... }}) in these notes.
 *
 * <p>The class is generic in the output element type {@code OUT} and implements four
 * interfaces: {@code StreamOperator<OUT>}, {@code SetupableStreamOperator<OUT>},
 * {@code CheckpointedStreamOperator} and {@code Serializable}.
 *
 * @param <OUT> the type parameter passed through to {@code StreamOperator} and
 *     {@code SetupableStreamOperator}
 */
public abstract class AbstractStreamOperator<OUT>
implements StreamOperator<OUT>,
SetupableStreamOperator<OUT>,
CheckpointedStreamOperator,
Serializable { ... }
- 实现 `StreamOperator`
- 实现 `SetupableStreamOperator`
- 实现 `CheckpointedStreamOperator`
- 实现 `Serializable`
核心方法
- 处理单流水印
- 处理双流水印
- 提供持久化对象 `StreamOperatorStateHandler`,并通过它执行算子状态的持久化(快照)
属性
// Handler the operator uses for persistence; declared transient, so it is not part of the
// operator's Java-serialized form.
private transient StreamOperatorStateHandler stateHandler; // (persistence object)
算子持久化
// TODO: operator persistence (snapshot) preparation
/**
 * Snapshots this operator by delegating the whole job to {@code stateHandler}.
 *
 * <p>Besides the four arguments received here, the handler is given this operator itself,
 * the timer service manager wrapped in an {@code Optional} (empty when the field is
 * {@code null}), the operator name, and the result of {@code isUsingCustomRawKeyedState()}.
 * The method is {@code final}, so subclasses cannot override it.
 *
 * @param checkpointId forwarded unchanged to the state handler
 * @param timestamp forwarded unchanged to the state handler
 * @param checkpointOptions forwarded unchanged to the state handler
 * @param factory forwarded unchanged to the state handler
 * @return whatever {@code stateHandler.snapshotState(...)} returns
 * @throws Exception propagated from the state handler or from the helper calls above
 */
@Override
public final OperatorSnapshotFutures snapshotState(
long checkpointId,
long timestamp,
CheckpointOptions checkpointOptions,
CheckpointStreamFactory factory)
throws Exception {
return stateHandler.snapshotState(
this,
// timeServiceManager may be null; Optional.ofNullable turns that into an empty Optional.
Optional.ofNullable(timeServiceManager),
getOperatorName(),
checkpointId,
timestamp,
checkpointOptions,
factory,
isUsingCustomRawKeyedState());
}
处理单流水印
// TODO: process the watermark
/**
 * Handles a watermark for a single-input operator.
 *
 * <p>If a timer service manager is present, it is advanced to the watermark first; the
 * watermark is then always emitted through {@code output}, whether or not a manager exists.
 *
 * @param mark the watermark to process and forward
 * @throws Exception propagated from advancing the timer service manager or from emitting
 */
public void processWatermark(Watermark mark) throws Exception {
// The manager is optional: skip the timer step entirely when it is null.
if (timeServiceManager != null) {
timeServiceManager.advanceWatermark(mark);
}
// TODO: emit the watermark
output.emitWatermark(mark);
}
调用 `advanceWatermark`
方法@Override public void advanceWatermark(Watermark watermark) throws Exception { for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) { service.advanceWatermark(watermark.getTimestamp()); } }
设置 `onEventTime`
是走这块的逻辑吗 ?public void advanceWatermark(long time) throws Exception { currentWatermark = time; InternalTimer<K, N> timer; while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) { eventTimeTimersQueue.poll(); keyContext.setCurrentKey(timer.getKey()); triggerTarget.onEventTime(timer); } }
处理双流水印
/**
 * Handles a watermark arriving on the first input of a two-input operator.
 *
 * <p>Records the timestamp as the first input's watermark, then takes the minimum of both
 * inputs' watermarks. Only when that minimum is strictly greater than the current combined
 * watermark is the combined watermark raised and a new {@code Watermark} carrying it handed to
 * {@code processWatermark}.
 *
 * @param mark the watermark received on input 1
 * @throws Exception propagated from {@code processWatermark}
 */
public void processWatermark1(Watermark mark) throws Exception {
    input1Watermark = mark.getTimestamp();

    final long lowestInputWatermark = Math.min(input1Watermark, input2Watermark);
    if (lowestInputWatermark <= combinedWatermark) {
        // The combined watermark would not advance; nothing to forward.
        return;
    }

    combinedWatermark = lowestInputWatermark;
    processWatermark(new Watermark(combinedWatermark));
}
/**
 * Handles a watermark arriving on the second input of a two-input operator.
 *
 * <p>Records the timestamp as the second input's watermark, then takes the minimum of both
 * inputs' watermarks. Only when that minimum is strictly greater than the current combined
 * watermark is the combined watermark raised and a new {@code Watermark} carrying it handed to
 * {@code processWatermark}.
 *
 * @param mark the watermark received on input 2
 * @throws Exception propagated from {@code processWatermark}
 */
public void processWatermark2(Watermark mark) throws Exception {
    input2Watermark = mark.getTimestamp();

    final long lowestInputWatermark = Math.min(input1Watermark, input2Watermark);
    if (lowestInputWatermark <= combinedWatermark) {
        // The combined watermark would not advance; nothing to forward.
        return;
    }

    combinedWatermark = lowestInputWatermark;
    processWatermark(new Watermark(combinedWatermark));
}