Question

The default Trigger of a WindowAssigner is appropriate for many use cases. For example, all the event-time window assigners have an EventTimeTrigger as default trigger. This trigger simply fires once the watermark passes the end of a window.

EventTimeTrigger is the default trigger Flink uses for event-time windows. The official documentation describes its onEventTime method as follows:

The onEventTime() method is called when a registered event-time timer fires.

The timer mentioned here is the one registered in onElement via ctx.registerEventTimeTimer(window.maxTimestamp());. The implementation of onEventTime itself is a single statement:

    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ?
            TriggerResult.FIRE :
            TriggerResult.CONTINUE;
    }

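On first reading, this method raises a question: the timer is registered with window.maxTimestamp(), so when it fires, time == window.maxTimestamp() should always be true. When would the comparison be false, so that the method returns TriggerResult.CONTINUE? Is a timer registered somewhere else as well?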

Analysis

Following the call stack shows that WindowOperator#processElement calls registerCleanupTimer:

    protected void registerCleanupTimer(W window) {
        long cleanupTime = cleanupTime(window);
        if (cleanupTime == Long.MAX_VALUE) {
            // don't set a GC timer for "end of time"
            return;
        }
        if (windowAssigner.isEventTime()) {
            triggerContext.registerEventTimeTimer(cleanupTime);
        } else {
            triggerContext.registerProcessingTimeTimer(cleanupTime);
        }
    }

This registers a cleanup timer whose timestamp is the value returned by cleanupTime:

    private long cleanupTime(W window) {
        if (windowAssigner.isEventTime()) {
            long cleanupTime = window.maxTimestamp() + allowedLateness;
            // if the addition overflowed, fall back to Long.MAX_VALUE ("end of time")
            return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
        } else {
            return window.maxTimestamp();
        }
    }
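The arithmetic in the event-time branch can be tried out in isolation. The following is only a minimal sketch in plain Java with no Flink dependency: the class name CleanupTimeSketch and the two-argument signature are made up for illustration, and the sample values assume a window [0, 10000) whose maxTimestamp() is 9999.

```java
public class CleanupTimeSketch {

    // Hypothetical stand-in for the event-time branch of cleanupTime(W window):
    // maxTimestamp + allowedLateness, or Long.MAX_VALUE if the sum overflows.
    public static long cleanupTime(long maxTimestamp, long allowedLateness) {
        long cleanupTime = maxTimestamp + allowedLateness;
        return cleanupTime >= maxTimestamp ? cleanupTime : Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        // No lateness: the cleanup time equals the window's maxTimestamp.
        System.out.println(cleanupTime(9_999L, 0L));      // 9999
        // 5 seconds of lateness: the cleanup time is later than maxTimestamp.
        System.out.println(cleanupTime(9_999L, 5_000L));  // 14999
        // maxTimestamp already at Long.MAX_VALUE: the sum wraps to a negative
        // number, so the guard returns Long.MAX_VALUE instead.
        System.out.println(cleanupTime(Long.MAX_VALUE, 5_000L) == Long.MAX_VALUE); // true
    }
}
```

The last case is the one registerCleanupTimer skips: a cleanup time of Long.MAX_VALUE means no cleanup timer is registered at all.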

This answers the question above: if allowedLateness is not 0, there is a second timer whose timestamp is not window.maxTimestamp() but window.maxTimestamp() + allowedLateness.

Summary

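Two kinds of timers trigger onEventTime. The first is the timer registered in onElement, which triggers the window computation and always returns TriggerResult.FIRE. The second is the window cleanup timer; if allowedLateness is configured to be greater than zero, onEventTime returns TriggerResult.CONTINUE for it.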
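The two cases can be replayed with a small simulation. This is a rough sketch in plain Java, not Flink code: TwoTimersSketch, simulate and the local TriggerResult enum are hypothetical names, the overflow guard is left out for brevity, and the timers are kept in a set on the assumption that two registrations with the same timestamp for the same key and window collapse into a single timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class TwoTimersSketch {

    // Local stand-in for Flink's TriggerResult, reduced to the two values used here.
    public enum TriggerResult { FIRE, CONTINUE }

    // Same comparison as EventTimeTrigger#onEventTime.
    public static TriggerResult onEventTime(long time, long windowMaxTimestamp) {
        return time == windowMaxTimestamp ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    // Registers both timers for one window and returns the results in firing order.
    public static List<TriggerResult> simulate(long windowMaxTimestamp, long allowedLateness) {
        TreeSet<Long> timers = new TreeSet<>();            // sorted, duplicates collapse
        timers.add(windowMaxTimestamp);                    // timer from onElement
        timers.add(windowMaxTimestamp + allowedLateness);  // cleanup timer
        List<TriggerResult> results = new ArrayList<>();
        for (long time : timers) {
            results.add(onEventTime(time, windowMaxTimestamp));
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(simulate(9_999L, 5_000L)); // [FIRE, CONTINUE]
        System.out.println(simulate(9_999L, 0L));     // [FIRE]
    }
}
```

With allowedLateness set to 0 both registrations use the same timestamp, so under the assumption above onEventTime runs once and returns FIRE.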