介绍

Window 是用来划分无限数据流一个个数据集合的一种机制。它会在第一个元素进入到 窗口范围内时创建一个窗口,当元窗口里的元素积累到了一定要求就会触发Trigger结束收集,然后执行 **WindowOperator** ,最后按照 Evict 清除窗口。
这里涉及到了四个部分:

  1. Window
  2. Trigger
  3. WindowOperator(又叫做Window Function)
  4. Evict

我们先从Window的整个生命周期来看,然后再逐个击破上述四个部分。

生命周期

  1. 当第一个元素进入指定时间窗口范围内时,就创建一个窗口。

点击查看【processon】
比如基于事件时间设置了 size60S,一个数据在22:**50:38**Flink 接收,那么会将它放在22:**50:00** ~ 22:**51:00**的时间窗口内。时间窗口的起始时间不是由第一个进入的元素决定的,而是客观的Flink时间决定的,换句话说,就算你数据不进来,每个窗口的划分都已经决定好了,只不过因为你有数据进来了,所以这个窗口有效(没有数据的窗口就略过了)。

  1. 当一个元素进入时,会触发一个叫做 Trigger 的东西。比如当在 Processing Time 下,时间窗口为 1分钟 ,假设第一个元素进来时是 22:50:00 ,最后 Trigger 会在系统时间到达 22:50:59.99 触发(注意不同的时间语义下,默认的Trigger是不一样的。当然,开发人员也能对其进行自定义)Trigger

点击查看【processon】 :::info 比如CounterTrigger在每个元素进来时进行计数,ProcessintTimeTrigger在每个元素进来时注册一些事件。当Trigger满足条件后,就会触发Evict。 :::

  1. Trigger 触发时,就会执行对应的 WindowOperator :::info 另外,如果有增量聚合函数,那么每次执行Trigger都会触发;如果是全窗口函数,是在执行Evict时执行 :::

  2. 最后使用 Evict 清除窗口。 :::info 个别 Evict 会清理一部分元素,剩余的用作下一个窗口 :::

Window(窗口类型)

窗口的作用对象可以是 KeyedStream 也可以是普通的 DatkaStream ,前者对数据按 key 分流,不同键的流互不干扰,可以并行执行;后者相当于所有的数据都在一条流里,只能串行处理处理,相当于单线程执行,如下图所示:
点击查看【processon】
随后需要创建窗口分配器 WindowAssigner ,窗口分配器用来指挥元素如何分配到窗口里面。Flink内置了 tumbling windowssliding windowssession windowsglobal window 窗口分配器,当然开发人员也能通过实现 WindowAssigner 接口来自定义窗口分配器。Flink内置的窗口分配器中除了 global window 以外都是基于时间(可以是事件时间 Event Time ,也可以是处理时间 Process Time )来分配元素的。

TumblingWindow

Tumbling Window ,滚动窗口。它的窗口大小相同,且各个窗口不重叠。Y轴左侧的 user x 表示不同的键,其示意图如下所示:
tumbling-windows.jpg
From 《Apache Flink》
Tumbling Window 在不同的时间语义下,使用的 TriggerEvict 均不同,使用的效果也不一样。笔者整理了 TirggerEvict 放在下面:

时间语义 Trigger Evict
Processing Time EventTimeTrigger TimeEvictor
Event Time ProcessingTimeTrigger TimeEvictor

不同的时间语义,数据的时间来源也不同,用来触发 WindowOperator 聚合计算的 Trigger 也不同,最终跑代码的效果也不一样,下面实战一波 ProcessingTimeEventTime

  1. ProcessingTime实战戳这里👉WindowWithProcessingTime.java
  2. EventTime实战戳这里👉WindowWithEventTime.java

总结一下:

  • ProcessingTime 语义下的 TumblingWindow

当一个数据进入Flink系统被处理时,它会被打上一个时间标签,该时间标签记录了进入系统时的时间,即 Processing Time 。随后将这个数据放入对应窗口范围内的窗口里(如果没有这个窗口,就创建一个。这个行为取决于 WindowAssigner ),后续的数据依次类推。当系统时间越过了窗口范围的终点,对应的 ProcessingTimeTrigger 会被触发,停止收集该窗口的数据,并对该窗口执行对应的 WindowOperator

  • EventTime 语义下的 TumblingWindow

当一个带有日志时间戳的数据进入Flink系统时,它首先会被 WatermarkStrategy#createTimestampAssigner 分离出要判断的时间戳字段并分配到对应的窗口里,然后会回调 WatermarkStrategy#createWatermarkGenerator 尝试生成水位线,检查此时水位线是否越过某个窗口的结束位置(此处的行为由 EventTimeTrigger 决定),如果是就对水位线之前的所有未执行的窗口执行 WindowOperator ;反之不执行,继续处理下个数据

这里注意一下,窗口分配器在设置了之后每个时间戳要分配到哪些窗口里都已经是确定的,比如某个数据在某个时间点到来了,那么包含该时间点的所有窗口都会被创建。窗口的创建是由数据进入时触发的!

Sliding Windows

Sliding Windows ,滑动窗口。 window size 就是我们前面理解的窗口大小, window slide 可以理解为上一个窗口和下一个窗口的创建间隔时间(至于为什么叫滑动呢,如果全程只有一个窗口, Sliding Window 的运作机制很像移动 window slide 个大小):
sliding-windows.png
From 《Apache Flink》
这个窗口可以理解为 在近X分钟(秒、小时,总之是个时间单位就行)XXX业务的数据分析。我们先从参数分析:

  1. size ,窗口的总大小,执行 WindowOperator 操作时的数据数量。比如设置了5秒,那最终执行 WindowOperator 的数据就是5秒内的所有数据。
  2. slide ,可滑动大小,创建窗口的间隔大小。比如 size5秒slide1秒,那么每隔1秒生成一个大小为5秒的窗口;如果 size5秒slide2秒,那么每隔2秒生成一个大小为5秒的窗口。

小提示:如果 size5秒slide5秒,那么就相当于窗口大小设置称5秒的滚动窗口
举个栗子:比如在 ProcessingTime 下,size 为5分钟, slide 为1分钟,当前时间为12:00,若有一个数据在12:00进入,那么就会生成一个窗口A;随后,新的一个数据在12:01进入会生成一个窗口B,并且该数据会被保存到窗口A和窗口B里;当又有一个数据在12:02进入,会生成一个窗口C,并且该数据会被保存到窗口A、窗口B、窗口C里;…省略后续的数据;当新的数据一直持续到12:05时,时间已到(假设水位线能够触发 Trigger ),就对窗口A执行 WindowOperator ,后续的数据就不会再保存到A里了。此时此刻窗口B(12:01~12:06)已经过去4分钟、窗口C(12:02~12:07)过去三分钟、窗口D(12:03~12:08)过去2分钟、窗口E(12:04~12:09)过去1分钟。示意图如下所示(以窗口增长来表示数据放入窗口): sliding_window_example_gif.gif (104.46KB)窗口与窗口重叠的部分,就是它们都拥有的数据。比如窗口A窗口B,窗口A和窗口B都会收到来12:01~12:05 的数据。
不知道此时大家对滑动窗口是不是理解更深了呢?大家可以自己动手试试呗~

Count Windows

Count Windows 表示计数窗口,通过DataStream#countWindowAll()方法实现,该方法有两个重载方法:

  • countWindowAll(size, slide)
  • countWindowAll(size)

size表示这次计数的窗口大小,slide表示每有多少个元素进来就滑动一下(产生一个新窗口)。跟了一下执行机制,它的TriggerCountTrigger():每有一个元素进来就进行计数,当计数器达到了slide,就激活EvictCountEvict:激活Evict动作时,判断当前整个窗口的元素是否大于size,如果大于就把头部的几个元素移除(还有一些机制);否则就直接结束。对于CountEvict来说,如果每次Evict还有剩余元素就会作用于下一个窗口里。咱们看一下动图:

窗口总结

现在回过头来发现前面主要是因为EventTime和ProcessingTime对应的 Trigger 不同,执行 WindowOperator 的时机也不同,导致懵逼。
总结起来看,整个 Window 机制其实还是离不开窗口创建、触发Trigger、执行WindowOperaotr、执行Evict这样四个阶段。窗口的种类很多,我们只需要把握窗口分配的特性就行了,因为数据落在哪个窗口里从一开始就确定了,即使有时候没有触发 Trigger 没有输出,但是窗口是实实在在到点了就创建的!

Trigger

Window Operator

Evict

一些小配置

allowLatency():当水位线超过窗口的结束时间后,窗口会被关闭,如果有迟到的元素进来,就会被丢弃。如果设置了allowLatency(...)后,窗口结束时间得算上allowLatency(...)的时间。当一个迟到数据进来时会立即触发所有的aggregate(...)WindowFunction(...)

在测试allowLatency(…)时,最好在纸上画好窗口时间,方便确认。

sideOutputLateData:当窗口都关闭后,迟到的数据会被丢入侧输出流,导入一个数据库种等待处理。

关于窗口的一些小知识

  1. EventTime 下,水位线产生了之后,后续任何小于水位线的值都无法进入对应的窗口,但是能进入普通的DataStream。证明戳这里👉WaterMarkIsBarrier.java
  2. EventTime 下,如果数据输入结束后,不管水位线有没有、越没越窗口终点,都对所有窗口应用WindowOperator。证明戳这里👉FinallyOutputAllWindow.java

官方文档上说 window(窗口) 是处理无界流的关键,它能将无界流分割并放到各个桶中分别处理。

窗口有分键和无键,前者使用 keyBy()+window() 后者使用 windowAll() ,两者的区别:

  1. 分键:分键了之后可以并行处理各个键的数据,因为它们互不干扰
  2. 不分键:所有的数据只会被放到一个单独的任务进行处理,即并行度为1

窗口的生命周期:
当第一个元素到达时,窗口就会创建;当时间到期(截至时间+延迟时间)了,就会自动移除窗口。比如在12:00时设置了结束时间为12:05,然后延迟1分钟。假设在12:00第一个元素进入时,窗口被创建,在12:06时被移除

另外窗口提供了触发器和对应的算子函数,触发器是用来决定何时使用算子函数处理窗口内的数据

分键和不分键

Window Assigners
用来决定元素分配到哪些窗口里的接口。Flink内置了一些Window Assigners

Event Time 和 Processing Time timestamp和watermarks的生成

参考资料