Table API

Table API是用于流和批处理的统一关系API。Table API 查询可以在批处理或流输入上运行,无需修改。表 API 是 SQL 语言的超集,是专门为使用 Apache Flink 而设计的。Table API 是 Scala 和 Java 的语言集成 API。不像SQL那样将查询指定为字符串值,Table API 查询是在 Java 或 Scala 中以嵌入语言的样式定义的,并具有 IDE 支持,如自动完成和语法验证。

Table API 与 Flink 的 SQL 集成共享许多概念和部分API。查看常见概念和 API 学习如何注册表或创建一个 Table 对象。流概念页面讨论了流特定的概念,例如动态表和时间属性。

下面的示例假设一个名为 Orders 的注册表具有 (a, b, c, rowtime) 属性。rowtime 字段要么是流中的逻辑time属性,要么是批处理中的常规时间戳字段。

概述和例子

Table API 可用于 Scala 和 Java。Scala Table API 利用 Scala 表达式,Java Table API 基于被解析并转换成等价表达式的字符串。

下面的例子展示了 Scala 和 Java Table API 之间的区别。Table 程序在批处理环境中执行。它扫描 Orders 表,按字段 a 分组,并计算每个组的结果行数。Table 程序的结果被转换成 Row 类型的 DataSet 并打印出来。

Java Table API 是通过导入 org.apache.flink.table.api.java.* 来启用的。下面的示例展示了如何构造 Java Table API 程序,以及如何将表达式指定为字符串。

  1. // environment configuration
  2. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  3. BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
  4. // register Orders table in table environment
  5. // ...
  6. // specify table program
  7. Table orders = tEnv.scan("Orders"); // schema (a, b, c, rowtime)
  8. Table counts = orders
  9. .groupBy("a")
  10. .select("a, b.count as cnt");
  11. // conversion to DataSet
  12. DataSet<Row> result = tEnv.toDataSet(counts, Row.class);
  13. result.print();

Scala Table API 是通过导入 org.apache.flink.api.scala._org.apache.flink.table.api.scala._ 来启用的。

下面的示例展示了如何构造 Scala Table API 程序。表属性使用Scala 符号引用,Scala 符号以撇号字符(')开头。

  1. import org.apache.flink.api.scala._
  2. import org.apache.flink.table.api.scala._
  3. // environment configuration val env = ExecutionEnvironment.getExecutionEnvironment
  4. val tEnv = TableEnvironment.getTableEnvironment(env)
  5. // register Orders table in table environment
  6. // ...
  7. // specify table program val orders = tEnv.scan("Orders") // schema (a, b, c, rowtime)
  8. val result = orders
  9. .groupBy('a)
  10. .select('a, 'b.count as 'cnt)
  11. .toDataSet[Row] // conversion to DataSet
  12. .print()

下一个示例显示一个更复杂的 Table API 程序。程序再次扫描 Orders 表。它过滤 null 值,规范化类型为 String 的字段 a,并为每个小时和产品 a 计算平均账单金额 b

  1. // environment configuration
  2. // ...
  3. // specify table program
  4. Table orders = tEnv.scan("Orders"); // schema (a, b, c, rowtime)
  5. Table result = orders
  6. .filter("a.isNotNull && b.isNotNull && c.isNotNull")
  7. .select("a.lowerCase() as a, b, rowtime")
  8. .window(Tumble.over("1.hour").on("rowtime").as("hourlyWindow"))
  9. .groupBy("hourlyWindow, a")
  10. .select("a, hourlyWindow.end as hour, b.avg as avgBillingAmount");
  1. // environment configuration
  2. // ...
  3. // specify table program val orders: Table = tEnv.scan("Orders") // schema (a, b, c, rowtime)
  4. val result: Table = orders
  5. .filter('a.isNotNull && 'b.isNotNull && 'c.isNotNull)
  6. .select('a.lowerCase() as 'a, 'b, 'rowtime)
  7. .window(Tumble over 1.hour on 'rowtime as 'hourlyWindow)
  8. .groupBy('hourlyWindow, 'a)
  9. .select('a, 'hourlyWindow.end as 'hour, 'b.avg as 'avgBillingAmount)

由于 Table API 是批处理和流数据的统一 API,因此两个示例程序都可以在批处理和流输入上执行,而无需修改表程序本身。在这两种情况下,程序产生相同的结果,因为流记录不会延迟(有关详细信息,请参见流概念)。

算子

Table API 支持以下操作。请注意,并非所有操作都可用于批处理和流式处理;它们被相应地标记。

扫描,投影和过滤 Scan, Projection, and Filter

Operators Description
Scan

Batch Streaming | 类似于SQL查询中的FROM子句。执行已注册表的扫描。

  1. Table orders = tableEnv.scan("Orders");

| | Select Batch Streaming | 类似于SQL SELECT语句。执行选择操作。

  1. Table orders = tableEnv.scan("Orders");
  2. Table result = orders.select("a, c as d");

可以使用星号(*)作为通配符,选择表中的所有列。

  1. Table result = orders.select("*");

| | As Batch Streaming | 重命名字段。

  1. Table orders = tableEnv.scan("Orders");
  2. Table result = orders.as("x, y, z, t");

| | Where / Filter Batch Streaming | 类似于SQL WHERE子句。过滤掉不传递过滤器谓词(filter predicate)的行。

  1. Table orders = tableEnv.scan("Orders");
  2. Table result = orders.where("b === 'red'");

  1. Table orders = tableEnv.scan("Orders");
  2. Table result = orders.filter("a % 2 === 0");

|

Operators Description
Scan

Batch Streaming | 类似于SQL查询中的FROM子句。执行已注册表的扫描。

  1. val orders: Table = tableEnv.scan("Orders")

| | Select Batch Streaming | 类似于SQL SELECT语句。执行选择操作。

  1. val orders: Table = tableEnv.scan("Orders")
  2. val result = orders.select('a, 'c as 'd)

可以使用星号(*)作为通配符,选择表中的所有列。

  1. val orders: Table = tableEnv.scan("Orders")
  2. val result = orders.select('*)

| | As Batch Streaming | 重命名字段。

  1. val orders: Table = tableEnv.scan("Orders").as('x, 'y, 'z, 't)

| | Where / Filter Batch Streaming | 类似于SQL WHERE子句。过滤掉不传递过滤器谓词的行。

  1. val orders: Table = tableEnv.scan("Orders")
  2. val result = orders.filter('a % 2 === 0)

  1. val orders: Table = tableEnv.scan("Orders")
  2. val result = orders.where('b === "red")

|

聚合 Aggregations

Operators Description
GroupBy Aggregation

Batch Streaming Result Updating | 类似于SQL GROUP BY子句。使用下面正在运行的聚合操作符对分组键上的行进行分组,以按组方式聚合行。

  1. Table orders = tableEnv.scan("Orders");
  2. Table result = orders.groupBy("a").select("a, b.sum as d");

注意: 对于流查询,计算查询结果所需的状态可能无限增长,这取决于聚合的类型和不同分组键的数量。请提供具有有效保留间隔的查询配置,以防止状态大小过大。有关详细信息,请参见查询配置。 | | GroupBy 窗口聚合(GroupBy Window Aggregation) 批处理流 |在分组窗口和一个或多个分组键上对表进行分组和聚合。

  1. Table orders = tableEnv.scan("Orders");
  2. Table result = orders
  3. .window(Tumble.over("5.minutes").on("rowtime").as("w")) // define window
  4. .groupBy("a, w") // group by key and window
  5. .select("a, w.start, w.end, w.rowtime, b.sum as d"); // access window properties and aggregate

| | 在窗口聚合(Over Window Aggregation) 流 | 类似于 SQL OVER 子句。基于前一行和后一行的窗口(范围),为每一行计算窗口聚合。有关详细信息,请参见over windows 部分

  1. Table orders = tableEnv.scan("Orders");
  2. Table result = orders
  3. // define window
  4. .window(Over
  5. .partitionBy("a")
  6. .orderBy("rowtime")
  7. .preceding("UNBOUNDED_RANGE")
  8. .following("CURRENT_RANGE")
  9. .as("w"))
  10. .select("a, b.avg over w, b.max over w, b.min over w"); // sliding aggregate

注意: 所有聚合必须在同一个窗口上定义,即,同样的分区、排序和范围。目前,只支持前面(无界和有界)到当前行范围的窗口。还不支持包含以下内容的范围。必须在单个time 属性上指定 ORDER BY。 | | 不同的聚合(Distinct Aggregation) 批处理流 结果更新| 类似于 SQL 的 DISTINCT 聚合子句,比如 COUNT(DISTINCT a), DISTINCT 聚合声明聚合函数(内置的或用户定义的)只应用于不同的输入值。可分别应用于 GroupBy 聚合(GroupBy Aggregation)GroupBy 窗口聚合(GroupBy Window Aggregation)Over 窗口聚合(Over Window Aggregation)

  1. Table orders = tableEnv.scan("Orders");
  2. // Distinct aggregation on group by
  3. Table groupByDistinctResult = orders
  4. .groupBy("a")
  5. .select("a, b.sum.distinct as d");
  6. // Distinct aggregation on time window group by
  7. Table groupByWindowDistinctResult = orders
  8. .window(Tumble.over("5.minutes").on("rowtime").as("w")).groupBy("a, w")
  9. .select("a, b.sum.distinct as d");
  10. // Distinct aggregation on over window
  11. Table result = orders
  12. .window(Over
  13. .partitionBy("a")
  14. .orderBy("rowtime")
  15. .preceding("UNBOUNDED_RANGE")
  16. .as("w"))
  17. .select("a, b.avg.distinct over w, b.max over w, b.min over w");

用户定义的聚合函数也可以与 DISTINCT 修饰符一起使用。要仅计算不同值的聚合结果,只需向聚合函数添加不同的修饰符。

  1. Table orders = tEnv.scan("Orders");
  2. // Use distinct aggregation for user-defined aggregate functions
  3. tEnv.registerFunction("myUdagg", new MyUdagg());
  4. orders.groupBy("users").select("users, myUdagg.distinct(points) as myDistinctResult");

注意 对于流查询,计算查询结果所需的状态可能会无限增长,这取决于不同字段的数量。请提供具有有效保留间隔的查询配置,以防止状态大小过大。有关详细信息,请参见查询配置。 | | 不同的(Distinct) 批次流(Batch Streaming) 结果更新(Result Updating) | 类似于 SQL DISTINCT 子句。返回具有不同值组合的记录。

  1. Table orders = tableEnv.scan("Orders");
  2. Table result = orders.distinct();

注意: 对于流查询,计算查询结果所需的状态可能会根据不同字段的数量无限增长。请提供具有有效保留间隔的查询配置,以防止状态大小过大。有关详细信息,请参见查询配置。|

Operators Description
GroupBy Aggregation

Batch Streaming Result Updating | 类似于SQL GROUP BY子句。使用下面正在运行的聚合操作符对分组键上的行进行分组,以按组方式聚合行。

  1. val orders: Table = tableEnv.scan("Orders")
  2. val result = orders.groupBy('a).select('a, 'b.sum as 'd)

注意: 对于流查询,计算查询结果所需的状态可能会无限增长,这取决于聚合类型和不同分组键的数量。请提供具有有效保留间隔的查询配置,以防止状态大小过大。有关详细信息,请参见查询配置。 | | GroupBy Window Aggregation Batch Streaming | 在组窗口和可能的一个或多个分组 keys 上对表进行分组和聚合。

  1. val orders: Table = tableEnv.scan("Orders")
  2. val result: Table = orders
  3. .window(Tumble over 5.minutes on 'rowtime as 'w) // define window
  4. .groupBy('a, 'w) // group by key and window
  5. .select('a, w.start, 'w.end, 'w.rowtime, 'b.sum as 'd) // access window properties and aggregate

| | Over Window Aggregation Streaming | 类似于SQL OVER子句。基于前一行和后一行的窗口(范围)计算每行的窗口聚合。 有关详细信息,请参阅over windows部分

  1. val orders: Table = tableEnv.scan("Orders")
  2. val result: Table = orders
  3. // define window
  4. .window(Over
  5. partitionBy 'a
  6. orderBy 'rowtime
  7. preceding UNBOUNDED_RANGE
  8. following CURRENT_RANGE
  9. as 'w)
  10. .select('a, 'b.avg over 'w, 'b.max over 'w, 'b.min over 'w) // sliding aggregate

注意: 所有聚合必须在同一个窗口上定义,即,同样的分区、排序和范围。目前,只支持前面(无界和有界)到当前行范围的窗口。还不支持包含以下内容的范围。ORDER BY 必须在单个time 属性上指定。| | Distinct Aggregation Batch Streaming Result Updating | 与SQL DISTINCT AGGREGATION子句类似,比如 COUNT(DISTINCT a)。Distinct aggregation 声明聚合函数(内置的或用户定义的)只应用于不同的输入值。Distinct 可以应用于 GroupBy Aggregation, GroupBy Window AggregationOver Window Aggregation

  1. val orders: Table = tableEnv.scan("Orders");
  2. // Distinct aggregation on group by val groupByDistinctResult = orders
  3. .groupBy('a)
  4. .select('a, 'b.sum.distinct as 'd)
  5. // Distinct aggregation on time window group by val groupByWindowDistinctResult = orders
  6. .window(Tumble over 5.minutes on 'rowtime as 'w).groupBy('a, 'w)
  7. .select('a, 'b.sum.distinct as 'd)
  8. // Distinct aggregation on over window val result = orders
  9. .window(Over
  10. partitionBy 'a
  11. orderBy 'rowtime
  12. preceding UNBOUNDED_RANGE
  13. as 'w)
  14. .select('a, 'b.avg.distinct over 'w, 'b.max over 'w, 'b.min over 'w)

用户定义的聚合函数也可以与 DISTINCT 修饰符一起使用。要仅计算不同值的聚合结果,只需向聚合函数添加不同的修饰符。

  1. val orders: Table = tEnv.scan("Orders");
  2. // Use distinct aggregation for user-defined aggregate functions val myUdagg = new MyUdagg();
  3. orders.groupBy('users).select('users, myUdagg.distinct('points) as 'myDistinctResult);

注意: 对于流查询,计算查询结果所需的状态可能会根据不同字段的数量无限增长。请提供具有有效保留间隔的查询配置,以防止状态大小过大。有关详细信息,请参见查询配置。 | | Distinct Batch | 类似于SQL DISTINCT子句。返回具有不同值组合的记录。

  1. val orders: Table = tableEnv.scan("Orders")
  2. val result = orders.distinct()

注意: 对于流查询,计算查询结果所需的状态可能会根据不同字段的数量无限增长。请提供具有有效保留间隔的查询配置,以防止状态大小过大。有关详细信息,请参见查询配置。|

连接 Joins

Operators Description
Inner Join

Batch Streaming | 类似于 SQL JOIN 子句。连接两个表。两个表必须具有不同的字段名,并且必须通过 join 操作符或使用 where 或 filter 操作符定义至少一个相等连接谓词。

  1. Table left = tableEnv.fromDataSet(ds1, "a, b, c");
  2. Table right = tableEnv.fromDataSet(ds2, "d, e, f");
  3. Table result = left.join(right).where("a = d").select("a, b, e");

注意: 对于流查询,计算查询结果所需的状态可能会根据不同的输入行数无限增长。请提供具有有效保留间隔的查询配置,以防止状态大小过大。有关详细信息,请参见查询配置。 | | Outer Join Batch Streaming Result Updating | 类似于 SQL LEFT/RIGHT/FULL OUTER JOIN 子句。连接两个表。两个表必须具有不同的字段名,并且必须定义至少一个相等连接谓词。

  1. Table left = tableEnv.fromDataSet(ds1, "a, b, c");
  2. Table right = tableEnv.fromDataSet(ds2, "d, e, f");
  3. Table leftOuterResult = left.leftOuterJoin(right, "a = d").select("a, b, e");
  4. Table rightOuterResult = left.rightOuterJoin(right, "a = d").select("a, b, e");
  5. Table fullOuterResult = left.fullOuterJoin(right, "a = d").select("a, b, e");

注意: 对于流查询,计算查询结果所需的状态可能会根据不同的输入行数无限增长。请提供具有有效保留间隔的查询配置,以防止状态大小过大。有关详细信息,请参见查询配置。 | | Time-windowed Join Batch Streaming | 注意: 时间窗口连接是可以以流方式处理的常规连接的子集。有时间窗口的连接至少需要一个等值连接谓词和一个连接条件,该条件在连接两边限定时间。这种条件可以由两个适当的范围谓词(&lt;, &lt;=, &gt;=, &gt;)或一个相等谓词来定义,该谓词比较相同类型的时间属性(即,处理时间或事件时间)。例如,以下谓词是有效的窗口连接条件:

  • ltime === rtime
  • ltime &gt;= rtime && ltime &lt; rtime + 10.minutes
  1. Table left = tableEnv.fromDataSet(ds1, "a, b, c, ltime.rowtime");
  2. Table right = tableEnv.fromDataSet(ds2, "d, e, f, rtime.rowtime");
  3. Table result = left.join(right)
  4. .where("a = d && ltime &gt;= rtime - 5.minutes && ltime &lt; rtime + 10.minutes")
  5. .select("a, b, e, ltime");

| | Inner Join with Table Function Batch Streaming | 将表与表函数的结果连接起来。左侧(外部)表的每一行都与表函数的相应调用生成的所有行相连接。如果左(外部)表的表函数调用返回空结果,则删除该表的一行。

  1. // register User-Defined Table Function
  2. TableFunction&lt;String&gt; split = new MySplitUDTF();
  3. tableEnv.registerFunction("split", split);
  4. // join
  5. Table orders = tableEnv.scan("Orders");
  6. Table result = orders
  7. .join(new Table(tableEnv, "split(c)").as("s", "t", "v"))
  8. .select("a, b, s, t, v");

| | Left Outer Join with Table Function Batch Streaming | 将表与表函数的结果连接起来。左侧(外部)表的每一行都与表函数的相应调用生成的所有行相连接。如果表函数调用返回空结果,则保留相应的外行,并用空值填充结果。注意: 当前,表函数左外连接的谓词只能为空或字面上的 true

  1. // register User-Defined Table Function
  2. TableFunction&lt;String&gt; split = new MySplitUDTF();
  3. tableEnv.registerFunction("split", split);
  4. // join
  5. Table orders = tableEnv.scan("Orders");
  6. Table result = orders
  7. .leftOuterJoin(new Table(tableEnv, "split(c)").as("s", "t", "v"))
  8. .select("a, b, s, t, v");

| | Join with Temporal Table Streaming | 时态表是跟踪随时间变化的表。时态表函数提供对特定时间点时态表状态的访问。使用时态表函数联接表的语法与 Inner join with table Function 中的语法相同。目前只支持与时态表的内部连接。

  1. Table ratesHistory = tableEnv.scan("RatesHistory");
  2. // register temporal table function with a time attribute and primary key
  3. TemporalTableFunction rates = ratesHistory.createTemporalTableFunction(
  4. "r_proctime",
  5. "r_currency");
  6. tableEnv.registerFunction("rates", rates);
  7. // join with "Orders" based on the time attribute and key
  8. Table orders = tableEnv.scan("Orders");
  9. Table result = orders
  10. .join(new Table(tEnv, "rates(o_proctime)"), "o_currency = r_currency")

如需更多信息,请查看更详细的时态表概念描述 |

Operators Description
Inner Join

Batch Streaming | 类似于 SQL JOIN 子句。连接两个表。两个表必须具有不同的字段名,并且必须通过 join 操作符或使用 where 或 filter 操作符定义至少一个相等连接谓词。

  1. val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
  2. val right = ds2.toTable(tableEnv, 'd, 'e, 'f)
  3. val result = left.join(right).where('a === 'd).select('a, 'b, 'e)

注意: 对于流查询,计算查询结果所需的状态可能会根据不同的输入行数无限增长。请提供具有有效保留间隔的查询配置,以防止状态过大。有关详细信息,请参见查询配置。 | | Outer Join Batch Streaming Result Updating | 类似于 SQL LEFT/RIGHT/FULL OUTER JOIN 子句。连接两个表。两个表必须具有不同的字段名,并且必须定义至少一个相等连接谓词。

  1. val left = tableEnv.fromDataSet(ds1, 'a, 'b, 'c)
  2. val right = tableEnv.fromDataSet(ds2, 'd, 'e, 'f)
  3. val leftOuterResult = left.leftOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
  4. val rightOuterResult = left.rightOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
  5. val fullOuterResult = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)

注意: 对于流查询,计算查询结果所需的状态可能会根据不同的输入行数无限增长。请提供具有有效保留间隔的查询配置,以防止状态过大。有关详细信息,请参见查询配置。 | | Time-windowed Join Batch Streaming | 注意: 时间窗口连接是可以以流方式处理的常规连接的子集。有时间窗口的连接至少需要一个等值连接谓词和一个连接条件,该条件在连接两边限定时间。这种条件可以由两个适当的范围谓词 (&lt;, &lt;=, &gt;=, &gt;) 定义,也可以由一个相等谓词来比较相同类型的时间属性(即,处理时间或事件时间)。例如,以下谓词是有效的窗口连接条件:

  • 'ltime === 'rtime
  • 'ltime &gt;= 'rtime && 'ltime &lt; 'rtime + 10.minutes
  1. val left = ds1.toTable(tableEnv, 'a, 'b, 'c, 'ltime.rowtime)
  2. val right = ds2.toTable(tableEnv, 'd, 'e, 'f, 'rtime.rowtime)
  3. val result = left.join(right)
  4. .where('a === 'd && 'ltime &gt;= 'rtime - 5.minutes && 'ltime &lt; 'rtime + 10.minutes)
  5. .select('a, 'b, 'e, 'ltime)

| | Inner Join with Table Function Batch Streaming | 将表与表函数的结果连接在一起。左边(外部)表的每一行都与表函数的相应调用生成的所有行连接在一起。如果其表函数调用返回空结果,则删除左侧(外部)表的一行。

  1. // instantiate User-Defined Table Function val split: TableFunction[_] = new MySplitUDTF()
  2. // join val result: Table = table
  3. .join(split('c) as ('s, 't, 'v))
  4. .select('a, 'b, 's, 't, 'v)

| | Left Outer Join with Table Function Batch Streaming | 将表与表函数的结果连接在一起。左边(外部)表的每一行都与表函数的相应调用生成的所有行连接在一起。如果表函数调用返回空结果,则保留相应的外部行,并使用空值填充结果。注意: 当前,表函数左外连接的谓词只能为空或字面上的 true

  1. // instantiate User-Defined Table Function val split: TableFunction[_] = new MySplitUDTF()
  2. // join val result: Table = table
  3. .leftOuterJoin(split('c) as ('s, 't, 'v))
  4. .select('a, 'b, 's, 't, 'v)

| | Join with Temporal Table Streaming | 时态表是跟踪它们随时间变化的表。时态表函数 提供对特定时间点时态表状态的访问。使用时态表函数联接表的语法与 Inner join with table Function 中的语法相同。目前只支持与时态表的内部连接。

  1. val ratesHistory = tableEnv.scan("RatesHistory")
  2. // register temporal table function with a time attribute and primary key val rates = ratesHistory.createTemporalTableFunction('r_proctime, 'r_currency)
  3. // join with "Orders" based on the time attribute and key val orders = tableEnv.scan("Orders")
  4. val result = orders
  5. .join(rates('o_rowtime), 'r_currency === 'o_currency)

有关更多信息,请查看更详细的时态表概念描述。 |

集合操作 Set Operations

Operators Description
Union

Batch | 与 SQL UNION 子句类似。联合两个表删除了重复记录。两个表必须具有相同的字段类型。

  1. Table left = tableEnv.fromDataSet(ds1, "a, b, c");
  2. Table right = tableEnv.fromDataSet(ds2, "a, b, c");
  3. Table result = left.union(right);

| | UnionAll Batch Streaming | 类似于SQL UNION ALL子句。连接(Unions)两个表。两个表必须具有相同的字段类型。

  1. Table left = tableEnv.fromDataSet(ds1, "a, b, c");
  2. Table right = tableEnv.fromDataSet(ds2, "a, b, c");
  3. Table result = left.unionAll(right);

| | Intersect Batch | 类似于 SQL INTERSECT 子句。Intersect 返回存在于两个表中的记录。如果一条记录不止一次出现在一个或两个表中,那么它只返回一次,即,生成的表没有重复的记录。两个表必须具有相同的字段类型。

  1. Table left = tableEnv.fromDataSet(ds1, "a, b, c");
  2. Table right = tableEnv.fromDataSet(ds2, "d, e, f");
  3. Table result = left.intersect(right);

| | IntersectAll Batch |类似于 SQL INTERSECT ALL 子句。IntersectAll 返回存在于两个表中的记录。如果一条记录在两个表中出现不止一次,那么它返回的次数与在两个表中出现的次数相同,即,结果表可能有重复的记录。两个表必须具有相同的字段类型。

  1. Table left = tableEnv.fromDataSet(ds1, "a, b, c");
  2. Table right = tableEnv.fromDataSet(ds2, "d, e, f");
  3. Table result = left.intersectAll(right);

| | Minus Batch | 类似于 SQL EXCEPT 子句。Minus 返回左表中不存在于右表中的记录。左表中的重复记录只返回一次,即,删除重复项。两个表必须具有相同的字段类型。

  1. Table left = tableEnv.fromDataSet(ds1, "a, b, c");
  2. Table right = tableEnv.fromDataSet(ds2, "a, b, c");
  3. Table result = left.minus(right);

| | MinusAll Batch | 类似于 SQL EXCEPT ALL 子句。MinusAll 返回不存在于正确表中的记录。如果一条记录在左表中出现 n 次,在右表中出现 m 次,则返回 (n - m) 次,即,删除右表中出现的所有副本。两个表必须具有相同的字段类型。

  1. Table left = tableEnv.fromDataSet(ds1, "a, b, c");
  2. Table right = tableEnv.fromDataSet(ds2, "a, b, c");
  3. Table result = left.minusAll(right);

| | In Batch Streaming | 类似于 SQL IN 子句。如果表达式存在于给定的表子查询中,则返回 true。子查询表必须由一列组成。该列必须具有与表达式相同的数据类型。

  1. Table left = ds1.toTable(tableEnv, "a, b, c");
  2. Table right = ds2.toTable(tableEnv, "a");
  3. // using implicit registration
  4. Table result = left.select("a, b, c").where("a.in(" + right + ")");
  5. // using explicit registration
  6. tableEnv.registerTable("RightTable", right);
  7. Table result = left.select("a, b, c").where("a.in(RightTable)");

注意: 对于流查询,操作在 join 和 group 操作中重写。计算查询结果所需的状态可能会无限增长,这取决于不同输入行的数量。请提供具有有效保留间隔的查询配置,以防止状态过大。有关详细信息,请参见查询配置。 |

Operators Description
Union

Batch | 类似于 SQL UNION 子句。将删除重复记录的两个表合并,两个表必须具有相同的字段类型。

  1. val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
  2. val right = ds2.toTable(tableEnv, 'a, 'b, 'c)
  3. val result = left.union(right)

| | UnionAll Batch Streaming | 类似于 SQL UNION ALL 子句。联合两个表,两个表必须具有相同的字段类型。

  1. val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
  2. val right = ds2.toTable(tableEnv, 'a, 'b, 'c)
  3. val result = left.unionAll(right)

| | Intersect Batch | 类似于SQL INTERSECT子句。Intersect 返回存在于两个表中的记录。如果一条记录在一个或两个表中出现超过一次,则只返回一次,即,生成的表没有重复的记录。两个表必须具有相同的字段类型。

  1. val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
  2. val right = ds2.toTable(tableEnv, 'e, 'f, 'g)
  3. val result = left.intersect(right)

| | IntersectAll Batch | 类似于 SQL INTERSECT ALL 子句。IntersectAll 返回存在于两个表中的记录。如果一条记录在两个表中出现不止一次,那么它返回的次数与在两个表中出现的次数相同,即,结果表可能有重复的记录。两个表必须具有相同的字段类型。

  1. val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
  2. val right = ds2.toTable(tableEnv, 'e, 'f, 'g)
  3. val result = left.intersectAll(right)

| | Minus Batch | 类似于 SQL EXCEPT 子句。Minus 返回左表中不存在于右表中的记录。左表中的重复记录只返回一次,即,删除重复项。两个表必须具有相同的字段类型。

  1. val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
  2. val right = ds2.toTable(tableEnv, 'a, 'b, 'c)
  3. val result = left.minus(right)

| | MinusAll Batch | 类似于 SQL EXCEPT ALL 子句。MinusAll 返回不存在于正确表中的记录。如果一条记录在左表中出现 n 次,在右表中出现 m 次,则返回(n - m)次,即,删除右表中出现的所有副本。两个表必须具有相同的字段类型。

  1. val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
  2. val right = ds2.toTable(tableEnv, 'a, 'b, 'c)
  3. val result = left.minusAll(right)

| | In Batch Streaming | 类似于 SQL IN 子句。如果表达式存在于给定表的子查询中,则返回 true。子查询表必须由一列组成。该列必须具有与表达式相同的数据类型。

  1. val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
  2. val right = ds2.toTable(tableEnv, 'a)
  3. val result = left.select('a, 'b, 'c).where('a.in(right))

注意: 对于流查询,操作在 join 和 group 操作中重写。计算查询结果所需的状态可能会无限增长,这取决于不同输入行的数量。请提供具有有效保留间隔的查询配置,以防止状态过大。有关详细信息,请参见查询配置。 |

OrderBy, Offset & Fetch

Operators Description
Order By

Batch | 类似于 SQL ORDER BY 子句。返回在所有并行分区中全局排序的记录。

  1. Table in = tableEnv.fromDataSet(ds, "a, b, c");
  2. Table result = in.orderBy("a.asc");

| | Offset & Fetch Batch | 类似于 SQL 偏移量和 FETCH 子句。偏移量(Offset)和取值(Fetch)限制从排序结果返回的记录数。偏移量和取值在技术上是操作符命令的一部分,因此必须在它之前。

  1. Table in = tableEnv.fromDataSet(ds, "a, b, c");
  2. // returns the first 5 records from the sorted result
  3. Table result1 = in.orderBy("a.asc").fetch(5);
  4. // skips the first 3 records and returns all following records from the sorted result
  5. Table result2 = in.orderBy("a.asc").offset(3);
  6. // skips the first 10 records and returns the next 5 records from the sorted result
  7. Table result3 = in.orderBy("a.asc").offset(10).fetch(5);

|

Operators Description
Order By

Batch | 类似于 SQL ORDER BY 子句。返回在所有并行分区中全局排序的记录。

  1. val in = ds.toTable(tableEnv, 'a, 'b, 'c)
  2. val result = in.orderBy('a.asc)

| | Offset & Fetch Batch | 类似于 SQL 偏移量和 FETCH 子句。偏移量(Offset)和取值(Fetch)限制从排序结果返回的记录数。偏移量和取值在技术上是操作符命令的一部分,因此必须在它之前。

  1. val in = ds.toTable(tableEnv, 'a, 'b, 'c)
  2. // returns the first 5 records from the sorted result val result1: Table = in.orderBy('a.asc).fetch(5)
  3. // skips the first 3 records and returns all following records from the sorted result val result2: Table = in.orderBy('a.asc).offset(3)
  4. // skips the first 10 records and returns the next 5 records from the sorted result val result3: Table = in.orderBy('a.asc).offset(10).fetch(5)

|

Insert

Operators Description
Insert Into

Batch Streaming | 类似于 SQL 查询中的 INSERT INTO 子句。执行对已注册输出表的插入。输出表必须在 TableEnvironment 中注册(参见Register a TableSink)。此外,已注册表的模式必须与查询的模式匹配。

  1. Table orders = tableEnv.scan("Orders");
  2. orders.insertInto("OutOrders");

|

Operators Description
Insert Into

Batch Streaming | 类似于 SQL 查询中的 INSERT INTO 子句。执行对已注册输出表的插入。输出表必须在TableEnvironment中注册(参见Register a TableSink)。此外,已注册表的模式必须与查询的模式匹配。

  1. val orders: Table = tableEnv.scan("Orders")
  2. orders.insertInto("OutOrders")

|

Group Windows

组窗口(Group Windows)根据时间或行计数间隔将组行聚合为有限组,并对每个组计算一次聚合函数。对于批处理表,窗口是按时间间隔对记录进行分组的方便快捷方式。

窗口是使用 window(w: Window) 子句定义的,需要一个别名,别名使用 as 子句指定。为了按窗口对表进行分组,必须在 groupBy(...) 子句中引用窗口别名,就像常规的分组属性一样。下面的示例显示如何在表上定义窗口聚合。

  1. Table table = input
  2. .window([Window w].as("w")) // define window with alias w
  3. .groupBy("w") // group the table by window w
  4. .select("b.sum"); // aggregate
  1. val table = input
  2. .window([w: Window] as 'w) // define window with alias w
  3. .groupBy('w) // group the table by window w
  4. .select('b.sum) // aggregate

在流(streaming)环境中,窗口聚合只有在对除窗口外的一个或多个属性进行分组时才能并行计算,即,groupBy(...) 子句引用窗口别名和至少一个附加属性。仅引用窗口别名的 groupBy(...) 子句(如上面示例所示)只能由单个非并行任务计算。下面的示例显示如何定义具有附加分组属性的窗口聚合。

  1. Table table = input
  2. .window([Window w].as("w")) // define window with alias w
  3. .groupBy("w, a") // group the table by attribute a and window w
  4. .select("a, b.sum"); // aggregate
  1. val table = input
  2. .window([w: Window] as 'w) // define window with alias w
  3. .groupBy('w, 'a) // group the table by attribute a and window w
  4. .select('a, 'b.sum) // aggregate

窗口属性,如时间窗口的开始、结束或行时间戳,可以作为窗口别名的属性分别添加到select语句中,如 w.startw.endw.rowtime。窗口开始和 rowtime 时间戳是包含上下窗口边界的时间戳。相反,窗口结束时间戳是唯一的上窗口边界。例如,从下午2点开始的30分钟滚动窗口将有 14:00:00.000 作为开始时间戳,14:29:59.999 作为行时间戳,14:30:00.000 作为结束时间戳。

  1. Table table = input
  2. .window([Window w].as("w")) // define window with alias w
  3. .groupBy("w, a") // group the table by attribute a and window w
  4. .select("a, w.start, w.end, w.rowtime, b.count"); // aggregate and add window start, end, and rowtime timestamps
  1. val table = input
  2. .window([w: Window] as 'w) // define window with alias w
  3. .groupBy('w, 'a) // group the table by attribute a and window w
  4. .select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count) // aggregate and add window start, end, and rowtime timestamps

窗口(Window) 参数定义如何将行映射到窗口。窗口(Window) 不是用户可以实现的界面。相反,Table API 提供了一组预定义的具有特定语义的 Window 类,这些类被转换为底层的 DataStreamDataSet 操作。支持的窗口定义如下所示。

(Tumble)(滚动窗口(Tumbling Windows))

滚动窗口将行分配给长度固定的非重叠连续窗口。例如,一个5分钟滚动窗口以5分钟为间隔将行分组。翻滚窗口可以在事件时间、处理时间或行计数上定义。

Tumbling windows are defined by using the class as follows: 翻滚窗口的定义使用 滚动(Tumble) 类如下:

方法 描述
over 定义窗口的长度,可以是时间间隔,也可以是行计数间隔。
on 组(时间间隔)或排序(行数)的时间属性。对于批处理查询,这可能是任何 Long 或 Timestamp 属性。对于流查询,这必须是一个声明的事件时间或处理时间时间属性
as 为窗口分配别名。别名用于引用下面的 groupBy() 子句中的窗口,并可选地在 select() 子句中选择窗口属性,如窗口开始、结束或行时间戳。
  1. // Tumbling Event-time Window
  2. .window(Tumble.over("10.minutes").on("rowtime").as("w"));
  3. // Tumbling Processing-time Window (assuming a processing-time attribute "proctime")
  4. .window(Tumble.over("10.minutes").on("proctime").as("w"));
  5. // Tumbling Row-count Window (assuming a processing-time attribute "proctime")
  6. .window(Tumble.over("10.rows").on("proctime").as("w"));
  1. // Tumbling Event-time Window .window(Tumble over 10.minutes on 'rowtime as 'w)
  2. // Tumbling Processing-time Window (assuming a processing-time attribute "proctime") .window(Tumble over 10.minutes on 'proctime as 'w)
  3. // Tumbling Row-count Window (assuming a processing-time attribute "proctime") .window(Tumble over 10.rows on 'proctime as 'w)

滑动 Slide(滑动窗口 Sliding Windows)

滑动窗口具有固定的大小,并按指定的滑动间隔滑动。如果滑动间隔小于窗口大小,则滑动窗口重叠。因此,可以将行分配给多个窗口。例如,一个15分钟大小的滑动窗口和5分钟的滑动间隔将每一行分配给3个15分钟大小的不同窗口,这些窗口在5分钟的间隔内计算。滑动窗口可以在事件时间、处理时间或行计数上定义。

滑动窗口使用 Slide 类定义如下:

方法 描述
over 定义窗口的长度,可以是时间间隔,也可以是行计数间隔。
every 将滑动间隔定义为时间间隔或行计数间隔。滑动间隔必须与大小间隔类型相同。
on 组(时间间隔)或排序(行数)的时间属性。对于批处理查询,这可能是任何 Long 或 Timestamp 属性。对于流查询,这必须是一个声明的事件时间或处理时间时间属性
as Assigns an alias to the window. The alias is used to reference the window in the following groupBy() clause and optionally to select window properties such as window start, end, or rowtime timestamps in the select() clause.为窗口分配别名。别名用于引用下面的 groupBy() 子句中的窗口,并可选地在 select() 子句中选择窗口属性,如窗口开始、结束或行时间戳。
  1. // Sliding Event-time Window
  2. .window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"));
  3. // Sliding Processing-time window (assuming a processing-time attribute "proctime")
  4. .window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"));
  5. // Sliding Row-count window (assuming a processing-time attribute "proctime")
  6. .window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"));
  1. // Sliding Event-time Window .window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w)
  2. // Sliding Processing-time window (assuming a processing-time attribute "proctime") .window(Slide over 10.minutes every 5.minutes on 'proctime as 'w)
  3. // Sliding Row-count window (assuming a processing-time attribute "proctime") .window(Slide over 10.rows every 5.rows on 'proctime as 'w)

会话 Session(会话窗口 Session Windows)

会话窗口没有固定的大小,但它们的边界由不活动的时间间隔定义,即,如果在定义的间隔期间没有出现事件,则会话窗口将关闭。例如,如果在 30 分钟不活动之后观察到一行(否则该行将被添加到现有窗口),则会启动一个间隔为30分钟的会话窗口,如果在30分钟内没有添加行,则会关闭该会话窗口。会话窗口可以在事件时间或处理时间上工作。

会话窗口是使用 Session 类定义的,如下:

方法 描述
withGap 将两个窗口之间的间隔定义为时间间隔。
on 组(时间间隔)或排序(行数)的时间属性。对于批处理查询,这可能是任何 Long 或 Timestamp 属性。对于流查询,这必须是一个声明的事件时间或处理时间时间属性
as 为窗口分配别名。别名用于引用下面的 groupBy() 子句中的窗口,并可选地在 select() 子句中选择窗口属性,如窗口开始、结束或行时间戳。
  1. // Session Event-time Window
  2. .window(Session.withGap("10.minutes").on("rowtime").as("w"));
  3. // Session Processing-time Window (assuming a processing-time attribute "proctime")
  4. .window(Session.withGap("10.minutes").on("proctime").as("w"));
  1. // Session Event-time Window .window(Session withGap 10.minutes on 'rowtime as 'w)
  2. // Session Processing-time Window (assuming a processing-time attribute "proctime") .window(Session withGap 10.minutes on 'proctime as 'w)

Over 窗口

Over 窗口聚合可以从标准SQL (OVER 子句)中得知,并在查询的 SELECT 子句中定义。与组(group)窗口不同,组窗口是在 GROUP BY 子句中指定的,在窗口上不会折叠行。相反,在窗口聚合上计算每个输入行在其邻近行范围内的聚合。

Over windows are defined using the window(w: OverWindow*) clause and referenced via an alias in the select() method. The following example shows how to define an over window aggregation on a table. Over 窗口使用 window(w: OverWindow*) 子句定义,并通过 select() 方法中的别名引用。下面的示例显示如何在表上定义 over 窗口聚合。

  1. Table table = input
  2. .window([OverWindow w].as("w")) // define over window with alias w
  3. .select("a, b.sum over w, c.min over w"); // aggregate over the over window w
  1. val table = input
  2. .window([w: OverWindow] as 'w) // define over window with alias w
  3. .select('a, 'b.sum over 'w, 'c.min over 'w) // aggregate over the over window w

OverWindow 定义了计算聚合的行范围。OverWindow 不是用户可以实现的接口。相反,Table API 提供了 Over 类来配置 Over 窗口的属性。Over 窗口可以在事件时间或处理时间上定义,也可以在指定为时间间隔或行数的范围上定义。受支持的 over 窗口定义公开为 Over (和其他类)上的方法,如下所示:

方法 必须的 描述
partitionBy 可选的 在一个或多个属性上定义输入的分区。每个分区都是单独排序的,聚合函数分别应用于每个分区。注意: 在流环境中,只有当窗口包含一个 partition by 子句时,才能并行计算 over 窗口聚合。如果没有 partitionBy(...),流将由单个非并行任务处理。
orderBy 必须的 定义每个分区中的行顺序,从而定义将聚合函数应用于行的顺序。注意: 对于流查询,这必须是一个声明的事件时间或处理时间时间属性。目前,只支持一个 sort 属性。
preceding 必须的 定义窗口中包含并位于当前行之前的行间隔。间隔可以指定为时间间隔,也可以指定为行计数间隔。Bounded over windows是用间隔的大小指定的,例如,一个时间间隔的 10.minutes 或一个行计数间隔的 10.rowsUnbounded over windows使用常量指定,即,时间间隔为 UNBOUNDED_RANGE ,行计数间隔为 UNBOUNDED_ROW。Unbounded over windows 从分区的第一行开始。
following 可选的 定义窗口中包含的行之间的窗口间隔,并跟随当前行。间隔必须在与前一个间隔(时间或行数)相同的单元中指定。目前,不支持在具有当前行之后的行的窗口上运行。相反,您可以指定两个常量中的一个:
  • CURRENT_ROW 将窗口的上界设置为当前行。
  • CURRENT_RANGE 设置窗口的上界来对当前行的键排序,即,所有具有与当前行相同排序键的行都包含在窗口中。

如果省略 following 子句,则时间间隔窗口的上界定义为 CURRENT_RANGE,行计数间隔窗口的上界定义为 CURRENT_ROW。 | | as | 必须的 | 为 over 窗口分配别名。别名用于引用下面的 select() 子句中的 over 窗口。 |

注: 当前,相同的 select() 调用中的所有聚合函数必须计算相同的 over 窗口。

Unbounded Over Windows

  1. // Unbounded Event-time over window (assuming an event-time attribute "rowtime")
  2. .window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_range").as("w"));
  3. // Unbounded Processing-time over window (assuming a processing-time attribute "proctime")
  4. .window(Over.partitionBy("a").orderBy("proctime").preceding("unbounded_range").as("w"));
  5. // Unbounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
  6. .window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_row").as("w"));
  7. // Unbounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
  8. .window(Over.partitionBy("a").orderBy("proctime").preceding("unbounded_row").as("w"));
  1. // Unbounded Event-time over window (assuming an event-time attribute "rowtime") .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
  2. // Unbounded Processing-time over window (assuming a processing-time attribute "proctime") .window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)
  3. // Unbounded Event-time Row-count over window (assuming an event-time attribute "rowtime") .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)
  4. // Unbounded Processing-time Row-count over window (assuming a processing-time attribute "proctime") .window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)

Bounded Over Windows

  1. // Bounded Event-time over window (assuming an event-time attribute "rowtime")
  2. .window(Over.partitionBy("a").orderBy("rowtime").preceding("1.minutes").as("w"))
  3. // Bounded Processing-time over window (assuming a processing-time attribute "proctime")
  4. .window(Over.partitionBy("a").orderBy("proctime").preceding("1.minutes").as("w"))
  5. // Bounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
  6. .window(Over.partitionBy("a").orderBy("rowtime").preceding("10.rows").as("w"))
  7. // Bounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
  8. .window(Over.partitionBy("a").orderBy("proctime").preceding("10.rows").as("w"))
  1. // Bounded Event-time over window (assuming an event-time attribute "rowtime") .window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w)
  2. // Bounded Processing-time over window (assuming a processing-time attribute "proctime") .window(Over partitionBy 'a orderBy 'proctime preceding 1.minutes as 'w)
  3. // Bounded Event-time Row-count over window (assuming an event-time attribute "rowtime") .window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w)
  4. // Bounded Processing-time Row-count over window (assuming a processing-time attribute "proctime") .window(Over partitionBy 'a orderBy 'proctime preceding 10.rows as 'w)

数据类型 Data Types

表 API 构建在 Flink 的 DataSet 和 DataStream API 之上。在内部,它还使用 Flink 的 TypeInformation 来定义数据类型。完全支持的类型列在 org.apache.flink.table.api.Types 中。下表总结了表 API 类型、SQL 类型和生成的 Java 类之间的关系。

Table API SQL Java type
Types.STRING VARCHAR java.lang.String
Types.BOOLEAN BOOLEAN java.lang.Boolean
Types.BYTE TINYINT java.lang.Byte
Types.SHORT SMALLINT java.lang.Short
Types.INT INTEGER, INT java.lang.Integer
Types.LONG BIGINT java.lang.Long
Types.FLOAT REAL, FLOAT java.lang.Float
Types.DOUBLE DOUBLE java.lang.Double
Types.DECIMAL DECIMAL java.math.BigDecimal
Types.SQL_DATE DATE java.sql.Date
Types.SQL_TIME TIME java.sql.Time
Types.SQL_TIMESTAMP TIMESTAMP(3) java.sql.Timestamp
Types.INTERVAL_MONTHS INTERVAL YEAR TO MONTH java.lang.Integer
Types.INTERVAL_MILLIS INTERVAL DAY TO SECOND(3) java.lang.Long
Types.PRIMITIVE_ARRAY ARRAY e.g. int[]
Types.OBJECT_ARRAY ARRAY e.g. java.lang.Byte[]
Types.MAP MAP java.util.HashMap
Types.MULTISET MULTISET e.g. java.util.HashMap&lt;String, Integer&gt; for a multiset of String
Types.ROW ROW org.apache.flink.types.Row

泛型类型和(嵌套的)复合类型(例如 POJOs, tuples, rows, Scala case classes)也可以是一行的字段。

可以使用value access functions访问具有任意嵌套的复合类型字段。

Generic types are treated as a black box and can be passed on or processed by user-defined functions. 泛型类型被视为一个黑盒子,可以通过用户定义函数传递或处理。

表达式语法 Expression Syntax

前几节中的一些运算符需要一个或多个表达式。表达式可以使用嵌入的 Scala DSL 或字符串来指定。请参考上面的例子来学习如何指定表达式。

这是表达式的 EBNF 语法:

  1. expressionList = expression , { "," , expression } ;
  2. expression = timeIndicator | overConstant | alias ;
  3. alias = logic | ( logic , "as" , fieldReference ) | ( logic , "as" , "(" , fieldReference , { "," , fieldReference } , ")" ) ;
  4. logic = comparison , [ ( "&&" | "||" ) , comparison ] ;
  5. comparison = term , [ ( "=" | "==" | "===" | "!=" | "!==" | ">" | ">=" | "<" | "<=" ) , term ] ;
  6. term = product , [ ( "+" | "-" ) , product ] ;
  7. product = unary , [ ( "*" | "/" | "%") , unary ] ;
  8. unary = [ "!" | "-" | "+" ] , composite ;
  9. composite = over | suffixed | nullLiteral | prefixed | atom ;
  10. suffixed = interval | suffixAs | suffixCast | suffixIf | suffixDistinct | suffixFunctionCall ;
  11. prefixed = prefixAs | prefixCast | prefixIf | prefixDistinct | prefixFunctionCall ;
  12. interval = timeInterval | rowInterval ;
  13. timeInterval = composite , "." , ("year" | "years" | "quarter" | "quarters" | "month" | "months" | "week" | "weeks" | "day" | "days" | "hour" | "hours" | "minute" | "minutes" | "second" | "seconds" | "milli" | "millis") ;
  14. rowInterval = composite , "." , "rows" ;
  15. suffixCast = composite , ".cast(" , dataType , ")" ;
  16. prefixCast = "cast(" , expression , dataType , ")" ;
  17. dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" | "STRING" | "DECIMAL" | "SQL_DATE" | "SQL_TIME" | "SQL_TIMESTAMP" | "INTERVAL_MONTHS" | "INTERVAL_MILLIS" | ( "MAP" , "(" , dataType , "," , dataType , ")" ) | ( "PRIMITIVE_ARRAY" , "(" , dataType , ")" ) | ( "OBJECT_ARRAY" , "(" , dataType , ")" ) ;
  18. suffixAs = composite , ".as(" , fieldReference , ")" ;
  19. prefixAs = "as(" , expression, fieldReference , ")" ;
  20. suffixIf = composite , ".?(" , expression , "," , expression , ")" ;
  21. prefixIf = "?(" , expression , "," , expression , "," , expression , ")" ;
  22. suffixDistinct = composite , "distinct.()" ;
  23. prefixDistinct = functionIdentifier , ".distinct" , [ "(" , [ expression , { "," , expression } ] , ")" ] ;
  24. suffixFunctionCall = composite , "." , functionIdentifier , [ "(" , [ expression , { "," , expression } ] , ")" ] ;
  25. prefixFunctionCall = functionIdentifier , [ "(" , [ expression , { "," , expression } ] , ")" ] ;
  26. atom = ( "(" , expression , ")" ) | literal | fieldReference ;
  27. fieldReference = "*" | identifier ;
  28. nullLiteral = "Null(" , dataType , ")" ;
  29. timeIntervalUnit = "YEAR" | "YEAR_TO_MONTH" | "MONTH" | "QUARTER" | "WEEK" | "DAY" | "DAY_TO_HOUR" | "DAY_TO_MINUTE" | "DAY_TO_SECOND" | "HOUR" | "HOUR_TO_MINUTE" | "HOUR_TO_SECOND" | "MINUTE" | "MINUTE_TO_SECOND" | "SECOND" ;
  30. timePointUnit = "YEAR" | "MONTH" | "DAY" | "HOUR" | "MINUTE" | "SECOND" | "QUARTER" | "WEEK" | "MILLISECOND" | "MICROSECOND" ;
  31. over = composite , "over" , fieldReference ;
  32. overConstant = "current_row" | "current_range" | "unbounded_row" | "unbounded_row" ;
  33. timeIndicator = fieldReference , "." , ( "proctime" | "rowtime" ) ;

这里,literal 是一个有效的 Java 文字。字符串文本可以使用单引号或双引号指定。复制转义引用(例如 'It''s me.' or "I ""like"" dogs.")。

fieldReference 指定数据中的一列(如果使用 * 则指定所有列),functionIdentifier 指定受支持的标量函数(scalar function)。列名和函数名遵循 Java 标识符语法。

指定为字符串的表达式也可以使用前缀表示法而不是后缀表示法来调用操作符和函数。

如果需要处理精确的数值或大小数,那么表 API 还支持 Java 的 BigDecimal 类型。在 Scala 表 API 中,小数可以用 BigDecimal("123456") 来定义,而在 Java 中,可以在后面加上“p”来精确定义,例如 123456p

为了处理时态值,表 API 支持 Java SQL 的日期、时间和时间戳类型。在 Scala 表 API 中,可以使用 java.sql.Date.valueOf("2016-06-27")java.sql.Time.valueOf("10:10:42")java.sql.Timestamp.valueOf("2016-06-27 10:10:42.123")来定义字面量。Java 和 Scala 表 API 还支持调用 "2016-06-27".toDate()"10:10:42".toTime()"2016-06-27 10:10:42.123".toTimestamp() 来将字符串转换为时态类型。注意: 由于 Java 的时态 SQL 类型依赖于时区,请确保 Flink 客户机和所有任务管理器使用相同的时区。

时间间隔可以表示为月数(Types.INTERVAL_MONTHS)或毫秒数(Types.INTERVAL_MILLIS)。可以添加或减去相同类型的区间(例如 1.hour + 10.minutes)。可以将毫秒间隔添加到时间点(例如`”2016-08-10”.toDate + 5.days)。