疑问
The default Trigger of a WindowAssigner is appropriate for many use cases. For example, all the event-time window assigners have an EventTimeTrigger as default trigger. This trigger simply fires once the watermark passes the end of a window.
EventTimeTrigger是Flink中默认的处理event time窗口时候的trigger。其中onEventTime方法的官方解释为:
The onEventTime() method is called when a registered event-time timer fires.
这里所说的timer就是在onElement方法里面注册的,ctx.registerEventTimeTimer(window.maxTimestamp());,下面就看一下onEventTime方法的具体实现,很简单就一句话:
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {return time == window.maxTimestamp() ?TriggerResult.FIRE :TriggerResult.CONTINUE;}
刚看到这个方法的时候会有一个疑问,注册timer的时候就是用的window.maxTimestamp(),触发timer的时候应该time == window.maxTimestamp()永远返回true啊,什么情况下会返回false呢,也就是这个方法返回的是TriggerResult.CONTINUE?难道还有其他地方注册了timer?
分析
通过调用栈,可以发现在WindowOperator#processElement方法里面调用了registerCleanupTimer
protected void registerCleanupTimer(W window) {long cleanupTime = cleanupTime(window);if (cleanupTime == Long.MAX_VALUE) {// don't set a GC timer for "end of time"return;}if (windowAssigner.isEventTime()) {triggerContext.registerEventTimeTimer(cleanupTime);} else {triggerContext.registerProcessingTimeTimer(cleanupTime);}}
这里面注册了一个清理的timer,这个时间是cleanupTime返回的
private long cleanupTime(W window) {if (windowAssigner.isEventTime()) {long cleanupTime = window.maxTimestamp() + allowedLateness;return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;} else {return window.maxTimestamp();}}
至此就可以回答上面的问题了,如果allowedLateness不是0,那么就会有一个timer的时间不等于window.maxTimestamp()而是window.maxTimestamp() + allowedLateness。
总结
触发onEventTime的timer有两种,一种就是在onElement注册的timer,用于触发计算,一定会返回TriggerResult.FIRE;另一种是清理窗口的timer,如果配置了allowedLateness大于零,那么返回就是TriggerResult.CONTINUE。
