Flink SQL中的时间
- Flink SQL 同时支持事件时间和处理时间
- 通过使用表模式声明的时间属性
时间属性类似于常规时间戳,但附加了特殊的元数据
处理时间窗口
- 点击在处理时计数
- 用户⇒计数
- 本地系统时钟触发关闭窗口 在整点(例如,在 11:00),报告计数并丢弃状态
- 事件时间窗口
- 点击次数计入它们发生的小时数
- 事件可能无序到达
- 前一个窗口可能会在下一个窗口开始后关闭
- 窗口⇒(用户⇒计数)
- watermark触发关闭窗口和丢弃状态
处理时间属性
- 处理时间属性是计算列,不保存数据
- 每当访问属性时都会查询本地机器时间
- 处理时间属性可以像常规 TIMESTAMP_LTZ 一样使用
CREATE TABLE clicks (
user STRING,
url STRING,
cTime AS PROCTIME()
)
事件时间属性
- 事件时间属性带有一个实际的时间戳
- 事件时间属性可以像 TIMESTAMP 或 TIMESTAMP_LTZ 一样使用
- watermark是必需的,并且是相对于时间戳指定的
- 这是一种有界无序watermark策略
cTime (clickTime)是事件时间属性CREATE TABLE clicks (
user STRING,
url STRING,
cTime TIMESTAMP_LTZ(3),
WATERMARK FOR cTime AS cTime – INTERVAL ‘2’ MINUTE
)
时态算子与聚合
GROUP BY windows
查询每个用户每小时点击次数
TUMBLE和TUMBLE_END是内在窗口函数。这些窗口函数使用 cTime,我们的表时间属性。SELECT
user,
TUMBLE_END(cTime, INTERVAL '1' HOUR) AS endT,
COUNT(url) AS cnt
FROM clicks
GROUP BY
TUMBLE(cTime, INTERVAL '1' HOUR),
user
每一行都追加到结果表中
GROUP BY windows的各种类型选择窗口的开始和结束时间
返回相应翻滚、滑动或会话窗口的 TIMESTAMP 的函数滚动窗口
滚动窗口(TUMBLE)将每个元素分配到一个指定大小的窗口中。通常,滚动窗口有一个固定的大小,并且不会出现重叠。例如,如果指定了一个5分钟大小的滚动窗口,无限流的数据会根据时间划分为[0:00 - 0:05)、[0:05, 0:10)、[0:10, 0:15)等窗口。下图展示了一个30秒的滚动窗口。
- 这是一种有界无序watermark策略
窗口标识函数 | 返回类型 | 描述 |
---|---|---|
TUMBLE_START(time-attr, size-interval) | TIMESTAMP | 返回窗口的起始时间(包含边界)。例如[00:10, 00:15) 窗口,返回00:10 。 |
TUMBLE_END(time-attr, size-interval) | TIMESTAMP | 返回窗口的结束时间(包含边界)。例如[00:00, 00:15]窗口,返回00:15。 |
TUMBLE_ROWTIME(time-attr, size-interval) | TIMESTAMP(rowtime-attr) | 返回窗口的结束时间(不包含边界)。例如[00:00, 00:15]窗口,返回00:14:59.999 。返回值是一个rowtime attribute,即可以基于该字段做时间属性的操作,例如,级联窗口只能用在基于Event Time的Window上 |
TUMBLE_PROCTIME(time-attr, size-interval) | TIMESTAMP(rowtime-attr) | 返回窗口的结束时间(不包含边界)。例如[00:00, 00:15]窗口,返回00:14:59.999。返回值是一个proctime attribute,即可以基于该字段做时间属性的操作,例如,级联窗口只能用在基于Processing Time的Window上 |
Table result = tEnv.sqlQuery("SELECT TUMBLE_START(t, INTERVAL '10' SECOND) AS window_start," +
"TUMBLE_END(t, INTERVAL '10' SECOND) AS window_end, SUM(v) FROM "
+ logT + " GROUP BY TUMBLE(t, INTERVAL '10' SECOND)");
(2019-11-01 06:53:00.0,2019-11-01 06:53:10.0,603)
(2019-11-01 06:53:20.0,2019-11-01 06:53:30.0,208)
(2019-11-01 06:53:10.0,2019-11-01 06:53:20.0,204)
滑动窗口
滑动窗口(HOP),也被称作Sliding Window。不同于滚动窗口,滑动窗口的窗口可以重叠。
滑动窗口有两个参数:slide和size。slide为每次滑动的步长,size为窗口的大小。
slide < size,则窗口会重叠,每个元素会被分配到多个窗口。
slide = size,则等同于滚动窗口(TUMBLE)。
slide > size,则为跳跃窗口,窗口之间不重叠且有间隙。
通常,大部分元素符合多个窗口情景,窗口是重叠的。因此,滑动窗口在计算移动平均数(moving averages)时很实用。例如,计算过去5分钟数据的平均值,每10秒钟更新一次,可以设置slide为10秒,size为5分钟。下图为您展示间隔为30秒,窗口大小为1分钟的滑动窗口。
窗口标识函数 | 返回类型 | 描述 |
---|---|---|
HOP_START( |
TIMESTAMP | 返回窗口的起始时间(包含边界)。例如[00:10, 00:15) 窗口,返回00:10 。 |
HOP_END( |
TIMESTAMP | 返回窗口的结束时间(包含边界)。例如[00:00, 00:15) 窗口,返回00:15。 |
HOP_ROWTIME( |
TIMESTAMP(rowtime-attr) | 返回窗口的结束时间(不包含边界)。例如[00:00, 00:15) 窗口,返回00:14:59.999。返回值是一个rowtime attribute,即可以基于该字段做时间类型的操作,只能用在基于event time的window上。 |
HOP_PROCTIME( |
TIMESTAMP(rowtime-attr) | 返回窗口的结束时间(不包含边界)。例如[00:00, 00:15) 窗口,返回00:14:59.999 。返回值是一个proctime attribute |
SELECT HOP_START(t, INTERVAL '5' SECOND, INTERVAL '10' SECOND) AS window_start," +
"HOP_END(t, INTERVAL '5' SECOND, INTERVAL '10' SECOND) AS window_end, SUM(v) FROM "
+ logT + " GROUP BY HOP(t, INTERVAL '5' SECOND, INTERVAL '10' SECOND)
(2019-11-01 06:53:15.0,2019-11-01 06:53:25.0,208)
(2019-11-01 06:53:10.0,2019-11-01 06:53:20.0,204)
(2019-11-01 06:53:05.0,2019-11-01 06:53:15.0,507)
(2019-11-01 06:53:20.0,2019-11-01 06:53:30.0,208)
(2019-11-01 06:53:00.0,2019-11-01 06:53:10.0,603)
(2019-11-01 06:52:55.0,2019-11-01 06:53:05.0,300)
会话窗口
会话窗口(SESSION)通过Session活动来对元素进行分组。会话窗口与滚动窗口和滑动窗口相比,没有窗口重叠,没有固定窗口大小。相反,当它在一个固定的时间周期内不再收到元素,即会话断开时,这个窗口就会关闭。
会话窗口通过一个间隔时间(Gap)来配置,这个间隔定义了非活跃周期的长度。例如,一个表示鼠标点击活动的数据流可能具有长时间的空闲时间,并在两段空闲之间散布着高浓度的点击。 如果数据在指定的间隔(Gap)之后到达,则会开始一个新的窗口。
窗口标识函数 | 返回类型 | 描述 |
---|---|---|
SESSION_START( |
Timestamp | 返回窗口的起始时间(包含边界)。如[00:10, 00:15) 的窗口,返回 00:10 ,即为此会话窗口内第一条记录的时间。 |
SESSION_END( |
Timestamp | 返回窗口的结束时间(包含边界)。如[00:00, 00:15) 的窗口,返回 00:15,即为此会话窗口内最后一条记录的时间+ |
SESSION_ROWTIME( |
Timestamp(rowtime-attr) | 返回窗口的结束时间(不包含边界)。如 [00:00, 00:15) 的窗口,返回00:14:59.999 。返回值是一个rowtime attribute,也就是可以基于该字段进行时间类型的操作。该参数只能用于基于event time的window 。 |
SESSION_PROCTIME( |
Timestamp(rowtime-attr) | 返回窗口的结束时间(不包含边界)。如 [00:00, 00:15) 的窗口,返回 00:14:59.999 。返回值是一个 proctime attribute,也就是可以基于该字段进行时间类型的操作。该参数只能用于基于processing time的window 。 |
"SELECT SESSION_START(t, INTERVAL '5' SECOND) AS window_start," +
"SESSION_END(t, INTERVAL '5' SECOND) AS window_end, SUM(v) FROM "
+ logT + " GROUP BY SESSION(t, INTERVAL '5' SECOND)"
(2019-11-01 06:53:21.0,2019-11-01 06:53:26.0,208)
(2019-11-01 06:53:00.0,2019-11-01 06:53:05.0,300)
(2019-11-01 06:53:09.0,2019-11-01 06:53:17.0,507)
更多窗口数据,请看https://segmentfault.com/a/1190000023296719
OVER窗口
OVER窗口(OVER Window)是传统数据库的标准开窗,不同于Group By Window,OVER窗口中每1个元素都对应1个窗口。窗口内的元素是当前元素往前多少个或往前多长时间的元素集合,因此流数据元素分布在多个窗口中。
在应用OVER窗口的流式数据中,每1个元素都对应1个OVER窗口。每1个元素都触发1次数据计算,每个触发计算的元素所确定的行,都是该元素所在窗口的最后1行。在实时计算的底层实现中,OVER窗口的数据进行全局统一管理(数据只存储1份),逻辑上为每1个元素维护1个OVER窗口,为每1个元素进行窗口计算,完成计算后会清除过期的数据。
Flink SQL中对OVER窗口的定义遵循标准SQL的定义语法,传统OVER窗口没有对其进行更细粒度的窗口类型命名划分。按照计算行的定义方式,OVER Window可以分为以下两类:
- ROWS OVER Window:每一行元素都被视为新的计算行,即每一行都是一个新的窗口。
- RANGE OVER Window:具有相同时间值的所有元素行视为同一计算行,即具有相同时间值的所有行都是同一个窗口。
Rows OVER Window语义
窗口数据
ROWS OVER Window的每个元素都确定一个窗口。ROWS OVER Window分为Unbounded(无界流)和Bounded(有界流)两种情况。
Unbounded ROWS OVER Window数据示例如下图所示。
虽然上图所示窗口user1的w7、w8及user2的窗口w3、w4都是同一时刻到达,但它们仍然在不同的窗口,这一点与RANGE OVER Window不同。
Bounded ROWS OVER Window数据以3个元素(往前2个元素)的窗口为例,如下图所示。
对于每次点击,其 URL 在过去 2 小时内被点击的频率如何?
SELECT
user,
url,
COUNT(*) OVER w
FROM clicks
WINDOW w AS (
PARTITION BY url
ORDER BY cTime
RANGE BETWEEN INTERVAL ’2’ HOUR PRECEDING AND CURRENT ROW)
时间属性的有效性
TODO
补充 Validity of Time Attributes https://flink-training-slides.netlify.app/decks/sql/time/19及以后的ppt页dity of Time Attributes