疑问

onElement方法的具体实现如下

  1. public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
  2. if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
  3. // if the watermark is already past the window fire immediately
  4. return TriggerResult.FIRE;
  5. } else {
  6. ctx.registerEventTimeTimer(window.maxTimestamp());
  7. return TriggerResult.CONTINUE;
  8. }
  9. }

下面是官方文档对于这个方法的解释

The onElement() method is called for each element that is added to a window.

也就是说每条数据加入这个窗口中都会调用一次这个方法,什么情况下这个方法会返回TriggerResult.FIRE

分析

关键问题在于WindowOperator#isWindowLate方法

  1. protected boolean isWindowLate(W window) {
  2. return (windowAssigner.isEventTime()
  3. && (cleanupTime(window) <= internalTimerService.currentWatermark()));
  4. }

如果allowedLateness没有设置默认为0,那么第二个判断条件相当于window.maxTimestamp()<=internalTimerService.currentWatermark()如果这个条件为true,那么上层WindowOperator#processElement方法中

  1. // drop if the window is already late
  2. if (isWindowLate(window)) {
  3. continue;
  4. }

会走到这个判断里面,那么就不会调用到EventTimeTrigger#onElement,反过来说,能调用到EventTimeTrigger#onElement方法的情况,window.maxTimestamp() <= ctx.getCurrentWatermark()就不会成立。

但是当设置allowedLateness大于0的情况,数据迟到的条件变成了window.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark(),那么假设滚动窗口的size是30秒,设置allowedLateness为10秒这时候来了一条数据的时间戳为1573441910000,那么此时window.maxTimestamp()=1573441919999,allowedLateness=10000,internalTimerService.currentWatermark()=1573441924000,不满足上面迟到的条件,进入EventTimeTrigger#onElement,这时就满足了window.maxTimestamp() <= ctx.getCurrentWatermark(),即返回值就是TriggerResult.FIRE

小结

上面的过程比较绕,简单的说,如果allowedLateness=0那么进入EventTimeTrigger#onElement后不可能返回TriggerResult.FIRE,因为满足这个判断条件的数据在前面isWindowLate(window)判断中已经过滤掉了。如果allowedLateness>0那么满足迟到的数据进入EventTimeTrigger#onElement后就会返回TriggerResult.FIRE。有兴趣的读者可以运行demo进行测试。

总结

结合上篇文章《onEventTime方法分析》可以得到下面的结论

  • 当没有设置allowedLateness的时候,即allowedLateness=0

    1. EventTimeTrigger#onElement用来注册窗口触发的定时器
    2. 定时器触发之后回调EventTimeTrigger#onEventTime触发窗口的计算
    3. 定时器是在InternalTimerServiceImpl#advanceWatermark方法中触发的,关键在timer.getTimestamp() <= time,这个time就是传进来的当前的watermark,也就是watermark的时间大于等于定时器的注册时间的时候就会调用triggerTarget.onEventTime ```java public void advanceWatermark(long time) throws Exception { currentWatermark = time;

    InternalTimer timer;

    while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) { eventTimeTimersQueue.poll(); keyContext.setCurrentKey(timer.getKey()); triggerTarget.onEventTime(timer); } } ```

  • 设置allowedLateness>0

    1. 没有迟到的数据调用逻辑如上
    2. 窗口已经触发计算之后,在允许迟到时间范围内到来的数据,会在EventTimeTrigger#onElement中返回TriggerResult.FIRE触发计算,每一条都会触发一次所在窗口的计算
    3. 迟到的数据不会在EventTimeTrigger#onEventTime触发计算,此时对于迟到的数据返回TriggerResult.CONTINUE