窗口

根据窗口的不同,Flink的窗口分为三种:

  • 滚动窗口:窗口数据有固定的大小,窗口中的数据不会叠加;
  • 滑动窗口:窗口数据有固定大小,并且有生成间隔;
  • 窗口数据没有固定的大小,根据用户传入的参数进行划分,窗口数据无叠加;

滚动窗口

滚动窗口的特点是:有固定大小、窗口中的数据不会重叠。
Flink窗口、时间和水印 - 图1

滚动窗口的语法:

  1. SELECT
  2. [gk],
  3. [TUMBLE_START(timeCol, size)],
  4. [TUMBLE_END(timeCol, size)],
  5. agg1(col1),
  6. ...
  7. aggn(colN)
  8. FROM Tab1
  9. GROUP BY [gk], TUMBLE(timeCol, size)

举例说明,我们需要计算每个用户每天的订单数量:

  1. SELECT user, TUMBLE_START(timeLine, INTERVAL '1' DAY) as winStart, SUM(amount)
  2. FROM Orders GROUP BY TUMBLE(timeLine, INTERVAL '1' DAY), user;

其中,TUMBLE_START 和 TUMBLE_END 代表窗口的开始时间和窗口的结束时间,TUMBLE (timeLine, INTERVAL ‘1’ DAY) 中的 timeLine 代表时间字段所在的列,INTERVAL ‘1’ DAY 表示时间间隔为一天。

滑动窗口

滑动窗口有固定的大小,与滚动窗口不同的是滑动窗口可以通过 slide 参数控制滑动窗口的创建频率。需要注意的是,多个滑动窗口可能会发生数据重叠,具体语义如下:
Flink窗口、时间和水印 - 图2
滑动窗口的语法与滚动窗口相比,只多了一个 slide 参数:

  1. SELECT
  2. [gk],
  3. [HOP_START(timeCol, slide, size)] ,
  4. [HOP_END(timeCol, slide, size)],
  5. agg1(col1),
  6. ...
  7. aggN(colN)
  8. FROM Tab1
  9. GROUP BY [gk], HOP(timeCol, slide, size)

例如,我们要每间隔一小时计算一次过去 24 小时内每个商品的销量:

  1. SELECT
  2. product,
  3. SUM(amount)
  4. FROM Orders
  5. GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product

上述案例中的 INTERVAL ‘1’ HOUR 代表滑动窗口生成的时间间隔。

会话窗口

会话窗口定义了一个非活动时间,假如在指定的时间间隔内没有出现事件或消息,则会话窗口关闭。
Flink窗口、时间和水印 - 图3
会话窗口的语法如下:

  1. SELECT
  2. [gk],
  3. SESSION_START(timeCol, gap) AS winStart,
  4. SESSION_END(timeCol, gap) AS winEnd,
  5. agg1(col1),
  6. ...
  7. aggn(colN)
  8. FROM Tab1
  9. GROUP BY [gk], SESSION(timeCol, gap)

举例,我们需要计算每个用户过去 1 小时内的订单量:

  1. SELECT
  2. user,
  3. SESSION_START(rowtime, INTERVAL '1' HOUR) AS sStart,
  4. SESSION_ROWTIME(rowtime, INTERVAL '1' HOUR) AS sEnd,
  5. SUM(amount)
  6. FROM Orders
  7. GROUP BY SESSION(rowtime, INTERVAL '1' HOUR), user

时间

时间时间(Event time)事件时间(Event Time)

指的是数据产生的时间,这个时间一般由数据生产方自身携带,比如 Kafka 消息,每个生成的消息中自带一个时间戳代表每条数据的产生时间。Event Time 从消息的产生就诞生了,不会改变,也是我们使用最频繁的时间。

处理时间(Processing Time)

处理时间(Processing Time)指的是数据被 Flink 框架处理时机器的系统时间,Processing Time 是 Flink 的时间系统中最简单的概念,但是这个时间存在一定的不确定性,比如消息到达处理节点延迟等影响。

摄入时间(Ingestion Time)

摄入时间(Ingestion Time)是事件进入 Flink 系统的时间,在 Flink 的 Source 中,每个事件会把当前时间作为时间戳,后续做窗口处理都会基于这个时间。理论上 Ingestion Time 处于 Event Time 和 Processing Time之间。

与事件时间相比,摄入时间无法处理延时和无序的情况,但是不需要明确执行如何生成 watermark。在系统内部,摄入时间采用更类似于事件时间的处理方式进行处理,但是有自动生成的时间戳和自动的 watermark。

可以防止 Flink 内部处理数据是发生乱序的情况,但无法解决数据到达 Flink 之前发生的乱序问题。如果需要处理此类问题,建议使用 EventTime。

水印(WaterMark)

水印本质上是一个时间戳的概念。
在Flink的时间概念:

  • Event Time : 每条数据都带着时间戳;
  • Processing Time:数据不携带任何时间戳的信息;
  • Ingestion Time:和 Event time 类似,不同的是,Ingestion time 的时间戳是采用当前的系统时间,如此可以防止数据在Flink中乱序的情况,但是不能防止数据到Flink之前发生的乱序问题。

综上,在处理数据乱序的情况下,会采用 Event Time 和 WaterMark 进行配合使用。

首先需要明白几个基本的问题

水印的本质是什么

水印的出现是为了解决实时计算中的数据乱序问题,它的本质是 DataStream 中一个带有时间戳的元素。如果 Flink 系统中出现了一个 WaterMark T,那么就意味着 EventTime < T 的数据都已经到达,窗口的结束时间和 T 相同的那个窗口被触发进行计算了。

也就是说:水印是 Flink 判断迟到数据的标准,同时也是窗口触发的标记

在程序并行度大于 1 的情况下,会有多个流产生水印和窗口,这时候 Flink 会选取时间戳最小的水印

水印是如何生成的

Flink 提供了 assignTimestampsAndWatermarks() 方法来实现水印的提取和指定,该方法接受的入参有 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks 两种。

整体的类图如下:
Flink窗口、时间和水印 - 图4

水印种类