时态表

时态表表示改变的历史记录表上的(参数化)视图的概念,该表返回特定时间点的表的内容。

Flink 可以跟踪应用于追加表的更改,在查询中的特定时间点,允许访问表的内容。

动机

假设我们有下表 RatesHistory

  1. SELECT * FROM RatesHistory;
  2. rowtime currency rate
  3. ======= ======== ======
  4. 09:00 US Dollar 102
  5. 09:00 Euro 114
  6. 09:00 Yen 1
  7. 10:45 Euro 116
  8. 11:15 Euro 119
  9. 11:49 Pounds 108

RatesHistory 代表一个不断增长的仅附加货币汇率表,相对于 Yen(其汇率为1)。例如,从 09:0010:45 期间的欧元日元汇率为 114。从 10:4511:15,汇率是 116

鉴于我们希望在 10:58 时输出所有当前汇率,我们需要以下 SQL 查询来计算结果表:

  1. SELECT *
  2. FROM RatesHistory AS r
  3. WHERE r.rowtime = (
  4. SELECT MAX(rowtime)
  5. FROM RatesHistory AS r2
  6. WHERE r2.currency = r.currency
  7. AND r2.rowtime <= TIME '10:58');

相关子查询确定相应货币的最大时间小于或等于所需时间。外部查询列出具有最大时间戳的汇率。

下表显示了这种计算的结果。在我们的例子中,考虑了对 10:45Euro 的更新,但是在时间 10:58,表格的版本中不会考虑在 11:15Euro的更新以及新的 Pounds 条目。

  1. rowtime currency rate
  2. ======= ======== ======
  3. 09:00 US Dollar 102
  4. 09:00 Yen 1
  5. 10:45 Euro 116

Temporal Tables 的概念旨在简化此类查询,加快执行速度并减少 Flink 的状态使用。 Temporal Table 是仅附加表的参数化视图,它将仅附加(append-only)表的行解释为表的更改日志,并在特定时间点提供该表的版本。将仅附加表解释为更改日志(changgelog)需要指定主键属性和时间戳属性。主键确定覆盖哪些行,时间戳确定行有效的时间。

在上面的例子中,currency 将是 RatesHistory 表的主键,rowtime 将是 timestamp 属性。

在 Flink 中,时态表由 时态表函数(Temporal Table Function) 表示。

时态表函数

为了访问时态表中的数据,必须传递一个时间属性,该属性确定将返回的表的版本。Flink 使用表函数的 SQL 语法来提供表达它的方法。

定义后,Temporal Table Function 采用单个时间参数 timeAttribute 并返回一组 rows。返回内容包含与给定时间属性相关的所有现有主键的最新行版本。

假设我们基于 RatesHistory 表定义了一个时态表函数 Rates(timeAttribute),我们可以通过以下方式查询这样的函数:

  1. SELECT * FROM Rates('10:15');
  2. rowtime currency rate
  3. ======= ======== ======
  4. 09:00 US Dollar 102
  5. 09:00 Euro 114
  6. 09:00 Yen 1
  7. SELECT * FROM Rates('11:00');
  8. rowtime currency rate
  9. ======= ======== ======
  10. 09:00 US Dollar 102
  11. 10:45 Euro 116
  12. 09:00 Yen 1

对于 Rates(timeAttribute) 的每个查询都将返回给定 timeAttributeRates 的状态。

注意:目前,Flink 不支持使用常量时间属性参数直接查询时态表函数。目前,时态表函数只能用于连接。上面的例子展示了函数 Rates(timeAttribute)的返回值。

有关如何连接时态表的更多信息,另请参阅有关连续查询的连接的页面。

定义时态表函数

以下代码段说明了如何从仅追加表创建时态表函数。

  1. import org.apache.flink.table.functions.TemporalTableFunction;
  2. (...)
  3. // Get the stream and table environments.
  4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5. StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
  6. // Provide a static data set of the rates history table.
  7. List<Tuple2<String, Long>> ratesHistoryData = new ArrayList<>();
  8. ratesHistoryData.add(Tuple2.of("US Dollar", 102L));
  9. ratesHistoryData.add(Tuple2.of("Euro", 114L));
  10. ratesHistoryData.add(Tuple2.of("Yen", 1L));
  11. ratesHistoryData.add(Tuple2.of("Euro", 116L));
  12. ratesHistoryData.add(Tuple2.of("Euro", 119L));
  13. // Create and register an example table using above data set.
  14. // In the real setup, you should replace this with your own table.
  15. DataStream<Tuple2<String, Long>> ratesHistoryStream = env.fromCollection(ratesHistoryData);
  16. Table ratesHistory = tEnv.fromDataStream(ratesHistoryStream, "r_currency, r_rate, r_proctime.proctime");
  17. tEnv.registerTable("RatesHistory", ratesHistory);
  18. // Create and register a temporal table function.
  19. // Define "r_proctime" as the time attribute and "r_currency" as the primary key.
  20. TemporalTableFunction rates = ratesHistory.createTemporalTableFunction("r_proctime", "r_currency"); // <==== (1)
  21. tEnv.registerFunction("Rates", rates); // <==== (2)
  1. // Get the stream and table environments. val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. val tEnv = TableEnvironment.getTableEnvironment(env)
  3. // Provide a static data set of the rates history table. val ratesHistoryData = new mutable.MutableList[(String, Long)]
  4. ratesHistoryData.+=(("US Dollar", 102L))
  5. ratesHistoryData.+=(("Euro", 114L))
  6. ratesHistoryData.+=(("Yen", 1L))
  7. ratesHistoryData.+=(("Euro", 116L))
  8. ratesHistoryData.+=(("Euro", 119L))
  9. // Create and register an example table using above data set.
  10. // In the real setup, you should replace this with your own table. val ratesHistory = env
  11. .fromCollection(ratesHistoryData)
  12. .toTable(tEnv, 'r_currency, 'r_rate, 'r_proctime.proctime)
  13. tEnv.registerTable("RatesHistory", ratesHistory)
  14. // Create and register TemporalTableFunction.
  15. // Define "r_proctime" as the time attribute and "r_currency" as the primary key. val rates = ratesHistory.createTemporalTableFunction('r_proctime, 'r_currency) // <==== (1) tEnv.registerFunction("Rates", rates) // <==== (2)

(1)行创建了一个 rates 时态表函数,它允许我们使用表API中的函数 rates

(2)行在我们的表环境中以名称 Rates 注册此函数,这允许我们在SQL中使用 Rates 函数。