Flink SQL中的时间

  • Flink SQL 同时支持事件时间和处理时间
    • 通过使用表模式声明的时间属性
  • 时间属性类似于常规时间戳,但附加了特殊的元数据

    • 事件时间属性可以是 TIMESTAMP 或 TIMESTAMP_LTZ
    • 处理时间属性始终是 TIMESTAMP_LTZ
    • Flink 1.13 增加了 TIMESTAMP_LTZ 类型;以前所有时间属性都是 TIMESTAMP

      时态算子采用时间来使状态过期

      例如,在长达一小时的窗口中计算每个用户的点击次数
  • 处理时间窗口

    • 点击在处理时计数
    • 用户⇒计数
    • 本地系统时钟触发关闭窗口 在整点(例如,在 11:00),报告计数并丢弃状态
  • 事件时间窗口
    • 点击次数计入它们发生的小时数
    • 事件可能无序到达
    • 前一个窗口可能会在下一个窗口开始后关闭
    • 窗口⇒(用户⇒计数)
    • watermark触发关闭窗口和丢弃状态

处理时间属性

  • 处理时间属性是计算列,不保存数据
    • 每当访问属性时都会查询本地机器时间
  • 处理时间属性可以像常规 TIMESTAMP_LTZ 一样使用
    1. CREATE TABLE clicks (
    2. user STRING,
    3. url STRING,
    4. cTime AS PROCTIME()
    5. )

事件时间属性

  • 事件时间属性带有一个实际的时间戳
  • 事件时间属性可以像 TIMESTAMP 或 TIMESTAMP_LTZ 一样使用
  • watermark是必需的,并且是相对于时间戳指定的
    • 这是一种有界无序watermark策略
      1. CREATE TABLE clicks (
      2. user STRING,
      3. url STRING,
      4. cTime TIMESTAMP_LTZ(3),
      5. WATERMARK FOR cTime AS cTime INTERVAL 2 MINUTE
      6. )
      cTime (clickTime)是事件时间属性

      时态算子与聚合

      GROUP BY windows

      查询每个用户每小时点击次数
      1. SELECT
      2. user,
      3. TUMBLE_END(cTime, INTERVAL '1' HOUR) AS endT,
      4. COUNT(url) AS cnt
      5. FROM clicks
      6. GROUP BY
      7. TUMBLE(cTime, INTERVAL '1' HOUR),
      8. user
      TUMBLE和TUMBLE_END是内在窗口函数。这些窗口函数使用 cTime,我们的表时间属性。
      每一行都追加到结果表中
      image.png
      GROUP BY windows的各种类型
      image.png

      选择窗口的开始和结束时间

      返回相应翻滚、滑动或会话窗口的 TIMESTAMP 的函数

      滚动窗口

      滚动窗口(TUMBLE)将每个元素分配到一个指定大小的窗口中。通常,滚动窗口有一个固定的大小,并且不会出现重叠。例如,如果指定了一个5分钟大小的滚动窗口,无限流的数据会根据时间划分为[0:00 - 0:05)、[0:05, 0:10)、[0:10, 0:15)等窗口。下图展示了一个30秒的滚动窗口。
      Time, Watermarks, and Windows with Flink SQL - 图3
窗口标识函数 返回类型 描述
TUMBLE_START(time-attr, size-interval) TIMESTAMP 返回窗口的起始时间(包含边界)。例如[00:10, 00:15) 窗口,返回00:10 。
TUMBLE_END(time-attr, size-interval) TIMESTAMP 返回窗口的结束时间(包含边界)。例如[00:00, 00:15]窗口,返回00:15。
TUMBLE_ROWTIME(time-attr, size-interval) TIMESTAMP(rowtime-attr) 返回窗口的结束时间(不包含边界)。例如[00:00, 00:15]窗口,返回00:14:59.999 。返回值是一个rowtime attribute,即可以基于该字段做时间属性的操作,例如,级联窗口只能用在基于Event Time的Window上
TUMBLE_PROCTIME(time-attr, size-interval) TIMESTAMP(rowtime-attr) 返回窗口的结束时间(不包含边界)。例如[00:00, 00:15]窗口,返回00:14:59.999。返回值是一个proctime attribute,即可以基于该字段做时间属性的操作,例如,级联窗口只能用在基于Processing Time的Window上
  1. Table result = tEnv.sqlQuery("SELECT TUMBLE_START(t, INTERVAL '10' SECOND) AS window_start," +
  2. "TUMBLE_END(t, INTERVAL '10' SECOND) AS window_end, SUM(v) FROM "
  3. + logT + " GROUP BY TUMBLE(t, INTERVAL '10' SECOND)");

(2019-11-01 06:53:00.0,2019-11-01 06:53:10.0,603)
(2019-11-01 06:53:20.0,2019-11-01 06:53:30.0,208)
(2019-11-01 06:53:10.0,2019-11-01 06:53:20.0,204)

滑动窗口

滑动窗口(HOP),也被称作Sliding Window。不同于滚动窗口,滑动窗口的窗口可以重叠。

滑动窗口有两个参数:slide和size。slide为每次滑动的步长,size为窗口的大小。

slide < size,则窗口会重叠,每个元素会被分配到多个窗口。
slide = size,则等同于滚动窗口(TUMBLE)。
slide > size,则为跳跃窗口,窗口之间不重叠且有间隙。
通常,大部分元素符合多个窗口情景,窗口是重叠的。因此,滑动窗口在计算移动平均数(moving averages)时很实用。例如,计算过去5分钟数据的平均值,每10秒钟更新一次,可以设置slide为10秒,size为5分钟。下图为您展示间隔为30秒,窗口大小为1分钟的滑动窗口。
1632100568-779824cf2aee01c5.png

窗口标识函数 返回类型 描述
HOP_START(, , TIMESTAMP 返回窗口的起始时间(包含边界)。例如[00:10, 00:15) 窗口,返回00:10 。
HOP_END(, , TIMESTAMP 返回窗口的结束时间(包含边界)。例如[00:00, 00:15) 窗口,返回00:15。
HOP_ROWTIME(, , TIMESTAMP(rowtime-attr) 返回窗口的结束时间(不包含边界)。例如[00:00, 00:15) 窗口,返回00:14:59.999。返回值是一个rowtime attribute,即可以基于该字段做时间类型的操作,只能用在基于event time的window上。
HOP_PROCTIME(, , TIMESTAMP(rowtime-attr) 返回窗口的结束时间(不包含边界)。例如[00:00, 00:15) 窗口,返回00:14:59.999 。返回值是一个proctime attribute
  1. SELECT HOP_START(t, INTERVAL '5' SECOND, INTERVAL '10' SECOND) AS window_start," +
  2. "HOP_END(t, INTERVAL '5' SECOND, INTERVAL '10' SECOND) AS window_end, SUM(v) FROM "
  3. + logT + " GROUP BY HOP(t, INTERVAL '5' SECOND, INTERVAL '10' SECOND)

(2019-11-01 06:53:15.0,2019-11-01 06:53:25.0,208)
(2019-11-01 06:53:10.0,2019-11-01 06:53:20.0,204)
(2019-11-01 06:53:05.0,2019-11-01 06:53:15.0,507)
(2019-11-01 06:53:20.0,2019-11-01 06:53:30.0,208)
(2019-11-01 06:53:00.0,2019-11-01 06:53:10.0,603)
(2019-11-01 06:52:55.0,2019-11-01 06:53:05.0,300)

会话窗口

会话窗口(SESSION)通过Session活动来对元素进行分组。会话窗口与滚动窗口和滑动窗口相比,没有窗口重叠,没有固定窗口大小。相反,当它在一个固定的时间周期内不再收到元素,即会话断开时,这个窗口就会关闭。

会话窗口通过一个间隔时间(Gap)来配置,这个间隔定义了非活跃周期的长度。例如,一个表示鼠标点击活动的数据流可能具有长时间的空闲时间,并在两段空闲之间散布着高浓度的点击。 如果数据在指定的间隔(Gap)之后到达,则会开始一个新的窗口。

Time, Watermarks, and Windows with Flink SQL - 图5

窗口标识函数 返回类型 描述
SESSION_START(, Timestamp 返回窗口的起始时间(包含边界)。如[00:10, 00:15) 的窗口,返回 00:10 ,即为此会话窗口内第一条记录的时间。
SESSION_END(, Timestamp 返回窗口的结束时间(包含边界)。如[00:00, 00:15) 的窗口,返回 00:15,即为此会话窗口内最后一条记录的时间+
SESSION_ROWTIME(, Timestamp(rowtime-attr) 返回窗口的结束时间(不包含边界)。如 [00:00, 00:15) 的窗口,返回00:14:59.999 。返回值是一个rowtime attribute,也就是可以基于该字段进行时间类型的操作。该参数只能用于基于event time的window 。
SESSION_PROCTIME(, Timestamp(rowtime-attr) 返回窗口的结束时间(不包含边界)。如 [00:00, 00:15) 的窗口,返回 00:14:59.999 。返回值是一个 proctime attribute,也就是可以基于该字段进行时间类型的操作。该参数只能用于基于processing time的window 。
  1. "SELECT SESSION_START(t, INTERVAL '5' SECOND) AS window_start," +
  2. "SESSION_END(t, INTERVAL '5' SECOND) AS window_end, SUM(v) FROM "
  3. + logT + " GROUP BY SESSION(t, INTERVAL '5' SECOND)"

(2019-11-01 06:53:21.0,2019-11-01 06:53:26.0,208)
(2019-11-01 06:53:00.0,2019-11-01 06:53:05.0,300)
(2019-11-01 06:53:09.0,2019-11-01 06:53:17.0,507)

更多窗口数据,请看https://segmentfault.com/a/1190000023296719

OVER窗口

OVER窗口(OVER Window)是传统数据库的标准开窗,不同于Group By Window,OVER窗口中每1个元素都对应1个窗口。窗口内的元素是当前元素往前多少个或往前多长时间的元素集合,因此流数据元素分布在多个窗口中。
在应用OVER窗口的流式数据中,每1个元素都对应1个OVER窗口。每1个元素都触发1次数据计算,每个触发计算的元素所确定的行,都是该元素所在窗口的最后1行。在实时计算的底层实现中,OVER窗口的数据进行全局统一管理(数据只存储1份),逻辑上为每1个元素维护1个OVER窗口,为每1个元素进行窗口计算,完成计算后会清除过期的数据。
Flink SQL中对OVER窗口的定义遵循标准SQL的定义语法,传统OVER窗口没有对其进行更细粒度的窗口类型命名划分。按照计算行的定义方式,OVER Window可以分为以下两类:

  • ROWS OVER Window:每一行元素都被视为新的计算行,即每一行都是一个新的窗口。
  • RANGE OVER Window:具有相同时间值的所有元素行视为同一计算行,即具有相同时间值的所有行都是同一个窗口。

Rows OVER Window语义
窗口数据
ROWS OVER Window的每个元素都确定一个窗口。ROWS OVER Window分为Unbounded(无界流)和Bounded(有界流)两种情况。
Unbounded ROWS OVER Window数据示例如下图所示。
Time, Watermarks, and Windows with Flink SQL - 图6
虽然上图所示窗口user1的w7、w8及user2的窗口w3、w4都是同一时刻到达,但它们仍然在不同的窗口,这一点与RANGE OVER Window不同。
Bounded ROWS OVER Window数据以3个元素(往前2个元素)的窗口为例,如下图所示。
Time, Watermarks, and Windows with Flink SQL - 图7
对于每次点击,其 URL 在过去 2 小时内被点击的频率如何?

  1. SELECT
  2. user,
  3. url,
  4. COUNT(*) OVER w
  5. FROM clicks
  6. WINDOW w AS (
  7. PARTITION BY url
  8. ORDER BY cTime
  9. RANGE BETWEEN INTERVAL 2 HOUR PRECEDING AND CURRENT ROW)

image.png

时间属性的有效性

TODO
补充 Validity of Time Attributes https://flink-training-slides.netlify.app/decks/sql/time/19及以后的ppt页dity of Time Attributes