• AS 计算列:原有字段+各种运算 生成一个额外的列

    使用process time

    DDL连接外部系统

    1. CREATE TABLE EventTable(
    2. user STRING,
    3. url STRING,
    4. ts AS PROCTIME()
    5. ) WITH ()

    fromDatastream

  • 不能用原有字段,只能加额外字段

    1. Table table = tEnv.fromDataStream(
    2. stream,
    3. $("user"),
    4. $("url"),
    5. $("ts").proctime());//ts是stream里原来不存在的字段

    使用event time

    DDL连接外部系统

  • 时间字段必须是TIMESTAMP 或者 TIMESTAMP_LTZ类型。如果是bigint毫秒值需要额外定义一个字段 ```sql CREATE TABLE EventTable( user STRING, url STRING, ts TIMESTAMP(3), WATERMARK FOR ts AS ts - INTERVAL ‘5’ SECOND ) WITH ();

// 利用毫秒值字段 生成额外字段 CREATE TABLE events ( user STRING, url STRING, ts BIGINT, ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3), WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL ‘5’ SECOND ) WITH ();

  1. <a name="Jzp6s"></a>
  2. #### fromDatastream
  3. - 需提前声明【时间戳的提取、watermark的生成】
  4. ```sql
  5. DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
  6. Table eventTable = tableEnv.fromDataStream(
  7. eventStream, // 要声明好 watermark
  8. $("user"),
  9. $("url"),
  10. $("timestamp").rowtime().as("ts") //将timestamp字段指定为eventtime,并命名为 ts
  11. );

分组窗口函数(group windows)-1.12及之前

  • 滚动窗口
    • TUMBLE(ts, INTERVAL ‘1’ HOUR)
  • 滑动窗口
    • HOP(rowtime, INTERVAL ‘1’ HOUR, INTERVAL ‘24’ HOUR)
  • 会话窗口

    • SESSION(time_attr, interval) 时间字段,第二个参数是窗口间隔
      // 将窗口本身作为一个group by字段,TUMBLE_END获取窗口结束时间
      select
      user,
      TUMBLE_END(ts, INTERVAL '1' HOUR) as endT, // TUMBLE_END是函数
      COUNT(url) as cnt
      from eventTable
      group BY
      user,
      TUMBLE(ts,INTERVAL '1' HOUR);
      

      窗口表值函数(table-valued function TVF)-1.13

  • 替代了分组窗口函数(group windows),后者只支持窗口聚合

  • 更符合SQL标准,更强大的窗口计算,如TOPN、window-join(1.13暂不支持)
  • 将每一行数据 按照时间字段分配到指定的窗口中,一行数据可能分配到多个窗口

    计算方式一:group by聚合 多对一

    四种内置TVF函数
  • 正是这一类的计算方式,取代了 分组窗口函数(group window)

四种内置的开窗方式

  • 滚动窗口

TUMBLE(TABLE EventTable,DESCRIPTOR(ts),INTERVAL '1' HOUR)

  • 滑动窗口

HOP(TABLE EventTable,DESCRIPTOR(ts),INTERVAL '5' MINUTES,INTERVAL '1' HOURS));//步长、窗口大小

  • 累积窗口
    • 统计窗口太长,但是需要隔一段时间输出一次结果。比如:计算一天的pv 不能一天结束后才输出结果 而是10分钟输出一次累计结果

CUMULATE(TABLE EventTable,DESCRIPTOR(ts),INTERVAL '1' HOURS,INTERVAL '1' DAYS))//累积步长(step)、最大窗口长度

  • 会话窗口

    • 目前尚未完全支持
      三个额外新增的函数
  • 窗口起始点 window_start

  • 窗口结束点 window_end
  • 窗口时间 window_time:(window_end - 1)表示本窗口的可以包含的最大时间戳 ```sql // 旧:使用分组窗口函数(group window) select TUMBLE_START(ts,INTERVAL ‘1’ HOUR) as window_start, user, COUNT(url) as cnt from EventTable group by TUMBLE(ts,INTERVAL ‘1’ HOUR), user;

// 新:使用窗口表值函数 select window_start, window_end, user, COUNT(url) as cnt from table(TUMBLE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL ‘1’ HOUR)) group by window_start, window_end, user;

<a name="c4UTX"></a>
#### 计算方式二:over开窗 一对一

- 排序方式:目前只支持时间的升序,所以order by 后加时间字段
- 开窗范围:上界、下界(目前只支持到本行)
- 开窗类型:时间范围RANGE、行数范围ROWS
```sql
// 时间范围:当前行之前一小时数据,(包括当前行)
XX_function over(partition by XX 
                 order by XX 
                 RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)
// 行数范围:当前行之前的5行数据,(包括当前行,一共6行!!!)
XX_function over(partition by XX 
                 order by XX 
                 ROWS BETWEEN 5 PRECEDING AND CURRENT ROW)
// 案例一:以ts作为时间属性字段,对表中的每行数据都选取它之前1小时的所有数据进行聚合,统计每个用户的pv
SELECT
user,
ts,
COUNT(url) OVER (
    PARTITION BY user
    ORDER BY ts
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS cnt
FROM EventTable;

// 案例二:在select外部单独定义一个window 并命名为w,可以在sql里多次利用,精简代码 增加可读性
select
user,
ts,
COUNT(url) OVER w AS cnt,
MAX(CHAR_LENGTH(url)) OVER w AS max_url
FROM EventTable
WINDOW w AS (
    PARTITION BY user
    ORDER BY ts
    ROWS BETWEEN 2 PRECEDING AND CURRENT ROW);

TOPN

1 、本来order的字段只能是时间字段 且升序,这里单独优化可使用其他字段或desc
2、table-api不支持row_number函数,但是sql支持
3、topN-sql必须 嵌套查询+where筛选

// 案例三:整体TOPN : 整体pv最多的user
SELECT
user,
url,
ts,
row_num
FROM (
    SELECT
    *,
    ROW_NUMBER() OVER (
        PARTITION BY user
        ORDER BY CHAR_LENGTH(url) desc) AS row_num
    FROM EventTable)
WHERE row_num <= 2;

// 案例四:分组TOPN : 每小时内pv最多的user
第一步:
sql_string = """
SELECT
window_start,
window_end,
user,
COUNT(url) as cnt
FROM TABLE(TUMBLE(TABLE EventTable,
                  DESCRIPTOR(ts),
                  INTERVAL '1' HOUR ))
GROUP BY
window_start,
window_end,
user"""
第二步:
SELECT
*
FROM (
    SELECT
    *,
    ROW_NUMBER() OVER (
        PARTITION BY window_start, window_end
        ORDER BY cnt desc) AS row_num
    FROM (" + sql_string + "))
WHERE row_num <= 2;

开窗中的状态(state)

因为动态表-持续查询的原因,可能导致状态持续增大,因此需要设置状态的生存时间ttl。配置ttl 可能导致结果不准确。 这就是牺牲了正确性 换取了资源

TableEnvironment tableEnv = ...
// 获取表环境的配置
TableConfig tableConfig = tableEnv.getConfig();
// 配置状态保持时间
tableConfig.setIdleStateRetention(Duration.ofMinutes(60));

// sql中配置
Flink SQL> SET 'table.exec.state.ttl' = '1000';