-
使用process time
DDL连接外部系统
CREATE TABLE EventTable(user STRING,url STRING,ts AS PROCTIME()) WITH ()
fromDatastream
不能用原有字段,只能加额外字段
Table table = tEnv.fromDataStream(stream,$("user"),$("url"),$("ts").proctime());//ts是stream里原来不存在的字段
使用event time
DDL连接外部系统
时间字段必须是TIMESTAMP 或者 TIMESTAMP_LTZ类型。如果是bigint毫秒值需要额外定义一个字段 ```sql CREATE TABLE EventTable( user STRING, url STRING, ts TIMESTAMP(3), WATERMARK FOR ts AS ts - INTERVAL ‘5’ SECOND ) WITH ();
// 利用毫秒值字段 生成额外字段 CREATE TABLE events ( user STRING, url STRING, ts BIGINT, ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3), WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL ‘5’ SECOND ) WITH ();
<a name="Jzp6s"></a>#### fromDatastream- 需提前声明【时间戳的提取、watermark的生成】```sqlDataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);Table eventTable = tableEnv.fromDataStream(eventStream, // 要声明好 watermark$("user"),$("url"),$("timestamp").rowtime().as("ts") //将timestamp字段指定为eventtime,并命名为 ts);
分组窗口函数(group windows)-1.12及之前
- 滚动窗口
- TUMBLE(ts, INTERVAL ‘1’ HOUR)
- 滑动窗口
- HOP(rowtime, INTERVAL ‘1’ HOUR, INTERVAL ‘24’ HOUR)
会话窗口
替代了分组窗口函数(group windows),后者只支持窗口聚合
- 更符合SQL标准,更强大的窗口计算,如TOPN、window-join(1.13暂不支持)
将每一行数据 按照时间字段分配到指定的窗口中,一行数据可能分配到多个窗口
计算方式一:group by聚合 多对一
四种内置TVF函数
正是这一类的计算方式,取代了 分组窗口函数(group window)
四种内置的开窗方式
- 滚动窗口
TUMBLE(TABLE EventTable,DESCRIPTOR(ts),INTERVAL '1' HOUR)
- 滑动窗口
HOP(TABLE EventTable,DESCRIPTOR(ts),INTERVAL '5' MINUTES,INTERVAL '1' HOURS));//步长、窗口大小
- 累积窗口
- 统计窗口太长,但是需要隔一段时间输出一次结果。比如:计算一天的pv 不能一天结束后才输出结果 而是10分钟输出一次累计结果
CUMULATE(TABLE EventTable,DESCRIPTOR(ts),INTERVAL '1' HOURS,INTERVAL '1' DAYS))//累积步长(step)、最大窗口长度
会话窗口
窗口起始点 window_start
- 窗口结束点 window_end
- 窗口时间 window_time:(window_end - 1)表示本窗口的可以包含的最大时间戳 ```sql // 旧:使用分组窗口函数(group window) select TUMBLE_START(ts,INTERVAL ‘1’ HOUR) as window_start, user, COUNT(url) as cnt from EventTable group by TUMBLE(ts,INTERVAL ‘1’ HOUR), user;
// 新:使用窗口表值函数 select window_start, window_end, user, COUNT(url) as cnt from table(TUMBLE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL ‘1’ HOUR)) group by window_start, window_end, user;
<a name="c4UTX"></a>
#### 计算方式二:over开窗 一对一
- 排序方式:目前只支持时间的升序,所以order by 后加时间字段
- 开窗范围:上界、下界(目前只支持到本行)
- 开窗类型:时间范围RANGE、行数范围ROWS
```sql
// 时间范围:当前行之前一小时数据,(包括当前行)
XX_function over(partition by XX
order by XX
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)
// 行数范围:当前行之前的5行数据,(包括当前行,一共6行!!!)
XX_function over(partition by XX
order by XX
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW)
// 案例一:以ts作为时间属性字段,对表中的每行数据都选取它之前1小时的所有数据进行聚合,统计每个用户的pv
SELECT
user,
ts,
COUNT(url) OVER (
PARTITION BY user
ORDER BY ts
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS cnt
FROM EventTable;
// 案例二:在select外部单独定义一个window 并命名为w,可以在sql里多次利用,精简代码 增加可读性
select
user,
ts,
COUNT(url) OVER w AS cnt,
MAX(CHAR_LENGTH(url)) OVER w AS max_url
FROM EventTable
WINDOW w AS (
PARTITION BY user
ORDER BY ts
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW);
TOPN
1 、本来order的字段只能是时间字段 且升序,这里单独优化可使用其他字段或desc
2、table-api不支持row_number函数,但是sql支持
3、topN-sql必须 嵌套查询+where筛选
// 案例三:整体TOPN : 整体pv最多的user
SELECT
user,
url,
ts,
row_num
FROM (
SELECT
*,
ROW_NUMBER() OVER (
PARTITION BY user
ORDER BY CHAR_LENGTH(url) desc) AS row_num
FROM EventTable)
WHERE row_num <= 2;
// 案例四:分组TOPN : 每小时内pv最多的user
第一步:
sql_string = """
SELECT
window_start,
window_end,
user,
COUNT(url) as cnt
FROM TABLE(TUMBLE(TABLE EventTable,
DESCRIPTOR(ts),
INTERVAL '1' HOUR ))
GROUP BY
window_start,
window_end,
user"""
第二步:
SELECT
*
FROM (
SELECT
*,
ROW_NUMBER() OVER (
PARTITION BY window_start, window_end
ORDER BY cnt desc) AS row_num
FROM (" + sql_string + "))
WHERE row_num <= 2;
开窗中的状态(state)
因为动态表-持续查询的原因,可能导致状态持续增大,因此需要设置状态的生存时间ttl。配置ttl 可能导致结果不准确。 这就是牺牲了正确性 换取了资源
TableEnvironment tableEnv = ...
// 获取表环境的配置
TableConfig tableConfig = tableEnv.getConfig();
// 配置状态保持时间
tableConfig.setIdleStateRetention(Duration.ofMinutes(60));
// sql中配置
Flink SQL> SET 'table.exec.state.ttl' = '1000';
