Table API

译者:flink.sojb.cn

Table API是用于流和批处理的统一关系API。 Table API查询可以在批量或流式输入上运行而无需修改。 Table API是SQL语言的超级集合,专门用于与Apache Flink一起使用。 Table API是Scala和Java语言集成API。 Table API查询不是像SQL一样将字符串值指定为SQL,而是在Java或Scala中以嵌入语言的样式定义,并支持自动完成和语法验证等IDE支持。

Table API与Flink的SQL集成共享其API的许多概念和部分。查看Common Concepts&API以了解如何注册表或创建Table对象。该流概念页讨论流如动态表和时间属性,具体的概念。

以下示例假定Orders使用属性调用的已注册表(a, b, c, rowtime)。该rowtime字段是流式传输中的逻辑时间属性或批处理中的常规时间戳字段。

概述和示例

Table API可用于Scala和Java。Scala Table API利用Scala表达式,Java Table API基于字符串,这些字符串被解析并转换为等效表达式。

以下示例显示了Scala和Java Table API之间的差异。表程序在批处理环境中执行。它Orders按字段Scan表,a并按组计算结果行。表程序的结果转换为DataSet类型Row并打印。

通过导入启用Java Table API org.apache.flink.table.api.java.*。以下示例显示如何构造Java Table API程序以及如何将表达式指定为字符串。

  1. // environment configuration
  2. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  3. BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
  4. // register Orders table in table environment
  5. // ...
  6. // specify table program
  7. Table orders = tEnv.scan("Orders"); // schema (a, b, c, rowtime)
  8. Table counts = orders
  9. .groupBy("a")
  10. .select("a, b.count as cnt");
  11. // conversion to DataSet
  12. DataSet<Row> result = tableEnv.toDataSet(counts, Row.class);
  13. result.print();

The Scala Table API is enabled by importing org.apache.flink.api.scala._ and org.apache.flink.table.api.scala._.

The following example shows how a Scala Table API program is constructed. Table attributes are referenced using Scala Symbols, which start with an apostrophe character (').

  1. import org.apache.flink.api.scala._
  2. import org.apache.flink.table.api.scala._
  3. // environment configuration val env = ExecutionEnvironment.getExecutionEnvironment
  4. val tEnv = TableEnvironment.getTableEnvironment(env)
  5. // register Orders table in table environment
  6. // ...
  7. // specify table program val orders = tEnv.scan("Orders") // schema (a, b, c, rowtime)
  8. val result = orders
  9. .groupBy('a)
  10. .select('a, 'b.count as 'cnt)
  11. .toDataSet[Row] // conversion to DataSet
  12. .print()

下一个示例显示了一个更复杂的 Table API程序。程序再次扫描Orders表格。它过滤空值,规范化aString类型的字段,并计算每小时和产品a的平均计费金额b

  1. // environment configuration
  2. // ...
  3. // specify table program
  4. Table orders = tEnv.scan("Orders"); // schema (a, b, c, rowtime)
  5. Table result = orders
  6. .filter("a.isNotNull && b.isNotNull && c.isNotNull")
  7. .select("a.lowerCase(), b, rowtime")
  8. .window(Tumble.over("1.hour").on("rowtime").as("hourlyWindow"))
  9. .groupBy("hourlyWindow, a")
  10. .select("a, hourlyWindow.end as hour, b.avg as avgBillingAmount");
  1. // environment configuration
  2. // ...
  3. // specify table program val orders: Table = tEnv.scan("Orders") // schema (a, b, c, rowtime)
  4. val result: Table = orders
  5. .filter('a.isNotNull && 'b.isNotNull && 'c.isNotNull)
  6. .select('a.lowerCase(), 'b, 'rowtime)
  7. .window(Tumble over 1.hour on 'rowtime as 'hourlyWindow)
  8. .groupBy('hourlyWindow, 'a)
  9. .select('a, 'hourlyWindow.end as 'hour, 'b.avg as 'avgBillingAmount)

由于 Table API是批处理和流数据的统一API,因此两个示例程序都可以在批处理和流输入上执行,而无需对表程序本身进行任何修改。在这两种情况下,程序产生相同的结果,因为流记录不会延迟(有关详细信息,请参阅流式概念)。

算子操作

Table API支持以下 算子操作。请注意,并非所有 算子操作都可用于批处理和流式传输; 他们被相应地标记。

Scan,Projection和过滤


算子:Scan 批量 流

描述:与SQL查询中的FROM子句类似。执行已注册表的扫描。

  1. Table orders = tableEnv.scan("Orders");

算子:Select 批量 流

描述:与SQL SELECT语句类似。执行选择 算子操作。

  1. Table orders = tableEnv.scan("Orders");
  2. Table result = orders.select("a, c as d");

您可以使用star(*)作为通配符,选择表中的所有列。

  1. Table result = orders.select("*");

算子:As 批量 流

描述:重命名字段。

  1. Table orders = tableEnv.scan("Orders");
  2. Table result = orders.as("x, y, z, t");

算子:Where / Filter Batch Streaming

描述:与SQL WHERE子句类似。过滤掉未通过过滤谓词的行。

  1. Table orders = tableEnv.scan("Orders");
  2. Table result = orders.where("b === 'red'");

要么

  1. Table orders = tableEnv.scan("Orders");
  2. Table result = orders.filter("a % 2 === 0");

聚合


算子:GroupBy聚合 批处理 流 结果更新

描述:与SQL GROUP BY子句类似。使用以下运行的聚合 算子对分组键上的行进行分组,以按组聚合行。

  1. Table orders = tableEnv.scan("Orders");
  2. Table result = orders.groupBy("a").select("a, b.sum as d");

注意:对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于聚合类型和不同分组键的数量。请提供具有有效保存间隔的查询配置,以防止过大的状态。有关详细信息,请参阅Streaming Concepts


算子:GroupBy窗口聚合 批量 流

描述:组和聚合组窗口上的表以及可能的一个或多个分组键。

  1. Table orders = tableEnv.scan("Orders");
  2. Table result = orders
  3. .window(Tumble.over("5.minutes").on("rowtime").as("w")) // define window
  4. .groupBy("a, w") // group by key and window
  5. .select("a, w.start, w.end, w.rowtime, b.sum as d"); // access window properties and aggregate

算子:Over Windows聚合

描述:与SQL OVER子句类似。基于前一行和后一行的窗口(范围)计算每行的窗口聚合。有关更多详细信息,请参阅over windows部分

  1. Table orders = tableEnv.scan("Orders");
  2. Table result = orders
  3. // define window
  4. .window(Over
  5. .partitionBy("a")
  6. .orderBy("rowtime")
  7. .preceding("UNBOUNDED_RANGE")
  8. .following("CURRENT_RANGE")
  9. .as("w"))
  10. .select("a, b.avg over w, b.max over w, b.min over w"); // sliding aggregate

注意:必须在同一窗口中定义所有聚合,即相同的分区,排序和范围。目前,仅支持具有PRREDING(UNBOUNDED和有界)到CURRENT ROW范围的窗口。尚不支持使用FOLLOWING的范围。必须在单个时间属性上指定ORDER BY 。

算子:Distinct 批量 流 结果更新

描述:与SQL DISTINCT子句类似。返回具有不同值组合的记录。

  1. Table orders = tableEnv.scan("Orders");
  2. Table result = orders.distinct();

注意:对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于不同字段的数量。请提供具有有效保存间隔的查询配置,以防止过大的状态。有关详细信息,请参阅Streaming Concepts

Join


算子:Inner Join 批量 流

描述:与SQL JOIN子句类似。关联两张桌子。两个表必须具有不同的字段名称,并且必须通过连接 算子或使用where或filter 算子定义至少一个相等连接谓词。

  1. Table left = tableEnv.fromDataSet(ds1, "a, b, c");
  2. Table right = tableEnv.fromDataSet(ds2, "d, e, f");
  3. Table result = left.join(right).where("a = d").select("a, b, e");

注意:对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于不同输入行的数量。请提供具有有效保存间隔的查询配置,以防止过大的状态。有关详细信息,请参阅Streaming Concepts


算子:Outer Join 批处理 流 结果更新

描述:与SQL LEFT / RIGHT / FULL OUTER JOIN子句类似。关联两张桌子。两个表必须具有不同的字段名称,并且必须至少定义一个等于连接谓词。

  1. Table left = tableEnv.fromDataSet(ds1, "a, b, c");
  2. Table right = tableEnv.fromDataSet(ds2, "d, e, f");
  3. Table leftOuterResult = left.leftOuterJoin(right, "a = d").select("a, b, e");
  4. Table rightOuterResult = left.rightOuterJoin(right, "a = d").select("a, b, e");
  5. Table fullOuterResult = left.fullOuterJoin(right, "a = d").select("a, b, e");

注意:对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于不同输入行的数量。请提供具有有效保存间隔的查询配置,以防止过大的状态。有关详细信息,请参阅Streaming Concepts


算子:Time-windowed Join 批量 流

描述:注意:时间窗口连接是可以以流方式处理的常规连接的子集。时间窗口连接需要至少一个等连接谓词和一个限制双方时间的连接条件。这样的条件可以由两个适当的范围谓词(&lt;, &lt;=, &gt;=, &gt;)或单个等式谓词来定义,该单个等式谓词比较两个输入表的相同类型的时间属性(即,处理时间或事件时间)。例如,以下谓词是有效的窗口连接条件:

  • ltime === rtime
  • ltime &gt;= rtime && ltime &lt; rtime + 10.minutes
  1. Table left = tableEnv.fromDataSet(ds1, "a, b, c, ltime.rowtime");
  2. Table right = tableEnv.fromDataSet(ds2, "d, e, f, rtime.rowtime");
  3. Table result = left.join(right)
  4. .where("a = d && ltime &gt;= rtime - 5.minutes && ltime &lt; rtime + 10.minutes")
  5. .select("a, b, e, ltime");

算子:TableFunction Inner Join 批量 流

描述:使用表函数的结果连接表。左(外)表的每一行与表函数的相应调用产生的所有行连接。如果其表函数调用返回空结果,则删除左(外)表的一行。

  1. // register function
  2. TableFunction&lt;String&gt; split = new MySplitUDTF();
  3. tEnv.registerFunction("split", split);
  4. // join
  5. Table orders = tableEnv.scan("Orders");
  6. Table result = orders
  7. .join(new Table(tEnv, "split(c)").as("s", "t", "v"))
  8. .select("a, b, s, t, v");

算子:TableFunction Left Outer Join Batch Streaming

描述:使用表函数的结果连接表。左(外)表的每一行与表函数的相应调用产生的所有行连接。如果表函数调用返回空结果,则保存相应的外部行,并使用空值填充结果。注意:目前,左外连接的表函数的谓词只能是空或文字true

  1. // register function
  2. TableFunction&lt;String&gt; split = new MySplitUDTF();
  3. tEnv.registerFunction("split", split);
  4. // join
  5. Table orders = tableEnv.scan("Orders");
  6. Table result = orders
  7. .leftOuterJoin(new Table(tEnv, "split(c)").as("s", "t", "v"))
  8. .select("a, b, s, t, v");

设置 算子操作


算子:Union 批次

描述:与SQL UNION子句类似。联合两个表删除了重复记录。两个表必须具有相同的字段类型。

  1. Table left = tableEnv.fromDataSet(ds1, "a, b, c");
  2. Table right = tableEnv.fromDataSet(ds2, "a, b, c");
  3. Table result = left.union(right);

算子:UnionAll Batch Streaming

描述:与SQL UNION ALL子句类似。工会两张桌子。两个表必须具有相同的字段类型。

  1. Table left = tableEnv.fromDataSet(ds1, "a, b, c");
  2. Table right = tableEnv.fromDataSet(ds2, "a, b, c");
  3. Table result = left.unionAll(right);

算子:Intersect 批次

描述:类似于SQL INTERSECT子句。Intersect返回两个表中存在的记录。如果一个或两个表不止一次出现记录,则只返回一次,即结果表没有重复记录。两个表必须具有相同的字段类型。

  1. Table left = tableEnv.fromDataSet(ds1, "a, b, c");
  2. Table right = tableEnv.fromDataSet(ds2, "d, e, f");
  3. Table result = left.intersect(right);

算子:IntersectAll Batch

描述:类似于SQL INTERSECT ALL子句。IntersectAll返回两个表中存在的记录。如果两个表中的记录多次出现,则返回的值与两个表中的记录一样多,即生成的表可能具有重复记录。两个表必须具有相同的字段类型。

  1. Table left = tableEnv.fromDataSet(ds1, "a, b, c");
  2. Table right = tableEnv.fromDataSet(ds2, "d, e, f");
  3. Table result = left.intersectAll(right);

算子:Minus

描述:与SQL EXCEPT子句类似。减号返回左表中右表中不存在的记录。左表中的重复记录只返回一次,即删除重复项。两个表必须具有相同的字段类型。

  1. Table left = tableEnv.fromDataSet(ds1, "a, b, c");
  2. Table right = tableEnv.fromDataSet(ds2, "a, b, c");
  3. Table result = left.minus(right);

算子:MinusAll Batch

描述:类似于SQL EXCEPT ALL子句。MinusAll返回右表中不存在的记录。在左表中出现n次并在右表中出现m次的记录返回(n-m)次,即,删除右表中存在的重复次数。两个表必须具有相同的字段类型。

  1. Table left = tableEnv.fromDataSet(ds1, "a, b, c");
  2. Table right = tableEnv.fromDataSet(ds2, "a, b, c");
  3. Table result = left.minusAll(right);

算子:In 批量 流中

描述:与SQL IN子句类似。如果表达式存在于给定的表子查询中,则返回true。子查询表必须包含一列。此列必须与表达式具有相同的数据类型。

  1. Table left = ds1.toTable(tableEnv, "a, b, c");
  2. Table right = ds2.toTable(tableEnv, "a");
  3. // using implicit registration
  4. Table result = left.select("a, b, c").where("a.in(" + right + ")");
  5. // using explicit registration
  6. tableEnv.registerTable("RightTable", right);
  7. Table result = left.select("a, b, c").where("a.in(RightTable)");

注意:对于流式查询, 算子操作将在连接和组 算子操作中重写。计算查询结果所需的状态可能会无限增长,具体取决于不同输入行的数量。请提供具有有效保存间隔的查询配置,以防止过大的状态。有关详细信息,请参阅Streaming Concepts

OrderBy,Offset&Fetch


算子:Order By 批次

描述:与SQL ORDER BY子句类似。返回跨所有并行分区全局排序的记录。

  1. Table in = tableEnv.fromDataSet(ds, "a, b, c");
  2. Table result = in.orderBy("a.asc");

算子:Offset & Fetch 批次

描述:类似于SQL OFFSET和FETCH子句。偏移和提取限制从排序结果返回的记录数。Offset和Fetch在技术上是Order By 算子的一部分,因此必须以它为前缀。

  1. Table in = tableEnv.fromDataSet(ds, "a, b, c");
  2. // returns the first 5 records from the sorted result
  3. Table result1 = in.orderBy("a.asc").fetch(5);
  4. // skips the first 3 records and returns all following records from the sorted result
  5. Table result2 = in.orderBy("a.asc").offset(3);
  6. // skips the first 10 records and returns the next 5 records from the sorted result
  7. Table result3 = in.orderBy("a.asc").offset(10).fetch(5);

Insert


算子:Insert 批量 流处理

描述:类似于SQL查询中的INSERT INTO子句。执行插入已注册的输出表。输出表必须在TableEnvironment中注册(请参阅注册TableSink)。此外,已注册表的模式必须与查询的模式匹配。

  1. Table orders = tableEnv.scan("Orders");
  2. orders.insertInto("OutOrders");

GroupWindows

组窗口根据时间或行计数间隔将行组聚合为有限组,并按组评估聚合函数。对于批处理表,窗口是按时间间隔对记录进行分组的便捷快捷方式。

Windows是使用该window(w: Window)子句定义的,需要使用该as子句指定的别名。为了按窗口对表进行分组,必须在groupBy(...)子句中引用窗口别名,就像常规分组属性一样。以下示例显示如何在表上定义窗口聚合。

  1. Table table = input
  2. .window([Window w].as("w")) // define window with alias w
  3. .groupBy("w") // group the table by window w
  4. .select("b.sum"); // aggregate
  1. val table = input
  2. .window([w: Window] as 'w) // define window with alias w
  3. .groupBy('w) // group the table by window w
  4. .select('b.sum) // aggregate

在流式传输环境中,如果窗口聚合除了窗口之外还在一个或多个属性上进行分组,则它们只能并行计算,即groupBy(...)子句引用窗口别名和至少一个附加属性。一个groupBy(...)仅引用一个窗口别名(如在上面的例子)子句只能由一个单一的,非平行的任务进行评估。以下示例显示如何使用其他分组属性定义窗口聚合。

  1. Table table = input
  2. .window([Window w].as("w")) // define window with alias w
  3. .groupBy("w, a") // group the table by attribute a and window w
  4. .select("a, b.sum"); // aggregate
  1. val table = input
  2. .window([w: Window] as 'w) // define window with alias w
  3. .groupBy('w, 'a) // group the table by attribute a and window w
  4. .select('a, 'b.sum) // aggregate

窗口属性诸如开始,结束,或一个时间窗口的rowtime时间戳可以在选择语句被添加为窗口别名作为的一个属性w.startw.end以及w.rowtime分别。窗口开始和行时间戳是包含的低窗口和超窗口边界。相反,窗口结束时间戳是独占的上窗口边界。例如,从下午2点开始的30分钟的翻滚窗口将具有14:00:00.000开始时间戳,14:29:59.999行时间戳和14:30:00.000结束时间戳。

  1. Table table = input
  2. .window([Window w].as("w")) // define window with alias w
  3. .groupBy("w, a") // group the table by attribute a and window w
  4. .select("a, w.start, w.end, w.rowtime, b.count"); // aggregate and add window start, end, and rowtime timestamps
  1. val table = input
  2. .window([w: Window] as 'w) // define window with alias w
  3. .groupBy('w, 'a) // group the table by attribute a and window w
  4. .select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count) // aggregate and add window start, end, and rowtime timestamps

Window参数定义行如何映射到窗口。Window不是用户可以实现的接口。相反, Table API提供了一组Window具有特定语义的预定义类,这些类被转换为底层DataStreamDataSet 算子操作。支持的窗口定义如下所示。

翻滚(翻滚的Windows)

翻滚窗口将行分配给固定长度的非重叠连续窗口。例如,5分钟的翻滚窗口以5分钟为间隔对行进行分组。可以在事件时间,处理时间或行数上定义翻滚窗口。

使用Tumble类定义翻滚窗口如下:

方法 描述
over 定义窗口的长度,可以是时间或行计数间隔。
on 组的时间属性(时间间隔)或排序(行数)。对于批处理查询,这可能是任何Long或Timestamp属性。对于流式查询,这必须是声明的事件时间或处理时间属性
as 为窗口指定别名。别名用于引用以下groupBy()子句中的窗口,并可选择在子句中选择窗口属性,如窗口开始,结束或行时间戳select()
  1. // Tumbling Event-time Window
  2. .window(Tumble.over("10.minutes").on("rowtime").as("w"));
  3. // Tumbling Processing-time Window (assuming a processing-time attribute "proctime")
  4. .window(Tumble.over("10.minutes").on("proctime").as("w"));
  5. // Tumbling Row-count Window (assuming a processing-time attribute "proctime")
  6. .window(Tumble.over("10.rows").on("proctime").as("w"));
  1. // Tumbling Event-time Window .window(Tumble over 10.minutes on 'rowtime as 'w)
  2. // Tumbling Processing-time Window (assuming a processing-time attribute "proctime") .window(Tumble over 10.minutes on 'proctime as 'w)
  3. // Tumbling Row-count Window (assuming a processing-time attribute "proctime") .window(Tumble over 10.rows on 'proctime as 'w)

滑动(滑动窗口)

滑动窗口具有固定大小,并按指定的滑动间隔滑动。如果滑动间隔小于窗口大小,则滑动窗口重叠。因此,可以将行分配给多个窗口。例如,15分钟大小和5分钟滑动间隔的滑动窗口将每行分配给3个不同的15分钟大小的窗口,这些窗口以5分钟的间隔进行评估。可以在事件时间,处理时间或行数上定义滑动窗口。

滑动窗口使用Slide如下类定义:

方法 描述
over 定义窗口的长度,可以是时间或行计数间隔。
every 定义滑动间隔,可以是时间间隔也可以是行计数间隔。滑动间隔必须与大小间隔的类型相同。
on 组的时间属性(时间间隔)或排序(行数)。对于批处理查询,这可能是任何Long或Timestamp属性。对于流式查询,这必须是声明的事件时间或处理时间属性
as 为窗口指定别名。别名用于引用以下groupBy()子句中的窗口,并可选择在子句中选择窗口属性,如窗口开始,结束或行时间戳select()
  1. // Sliding Event-time Window
  2. .window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"));
  3. // Sliding Processing-time window (assuming a processing-time attribute "proctime")
  4. .window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"));
  5. // Sliding Row-count window (assuming a processing-time attribute "proctime")
  6. .window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"));
  1. // Sliding Event-time Window .window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w)
  2. // Sliding Processing-time window (assuming a processing-time attribute "proctime") .window(Slide over 10.minutes every 5.minutes on 'proctime as 'w)
  3. // Sliding Row-count window (assuming a processing-time attribute "proctime") .window(Slide over 10.rows every 5.rows on 'proctime as 'w)

会话(会话窗口)

会话窗口没有固定的大小,但它们的边界由不活动的间隔定义,即如果在定义的间隙期间没有出现事件,则会话窗口关闭。例如,如果在30分钟不活动后观察到一行,则会开始一个30分钟间隙的会话窗口(否则该行将被添加到现有窗口中),如果在30分钟内未添加任何行,则会关闭。会话窗口可以在事件时间或处理时间上工作。

使用Session类定义会话窗口,如下所示:

方法 描述
withGap 将两个窗口之间的间隔定义为时间间隔。
on 组的时间属性(时间间隔)或排序(行数)。对于批处理查询,这可能是任何Long或Timestamp属性。对于流式查询,这必须是声明的事件时间或处理时间属性
as 为窗口指定别名。别名用于引用以下groupBy()子句中的窗口,并可选择在子句中选择窗口属性,如窗口开始,结束或行时间戳select()
  1. // Session Event-time Window
  2. .window(Session.withGap("10.minutes").on("rowtime").as("w"));
  3. // Session Processing-time Window (assuming a processing-time attribute "proctime")
  4. .window(Session.withGap("10.minutes").on("proctime").as("w"));
  1. // Session Event-time Window .window(Session withGap 10.minutes on 'rowtime as 'w)
  2. // Session Processing-time Window (assuming a processing-time attribute "proctime") .window(Session withGap 10.minutes on 'proctime as 'w)

OverWindows

从标准SQL(OVER子句)已知过窗口聚合,并在SELECT查询的子句中定义。与GROUP BY子句中指定的组窗口不同,在窗口上不会折叠行。而是通过窗口聚合计算每个输入行在其相邻行的范围内的聚合。

OverWindows是使用window(w: OverWindow*)子句定义的,并通过select()方法中的别名引用。以下示例显示如何在表上定义过窗口聚合。

  1. Table table = input
  2. .window([OverWindow w].as("w")) // define over window with alias w
  3. .select("a, b.sum over w, c.min over w"); // aggregate over the over window w
  1. val table = input
  2. .window([w: OverWindow] as 'w) // define over window with alias w
  3. .select('a, 'b.sum over 'w, 'c.min over 'w) // aggregate over the over window w

OverWindow限定范围在其聚集计算的行。OverWindow不是用户可以实现的接口。相反, Table API提供了Over用于配置覆盖窗口属性的类。可以在事件时间或处理时间以及指定为时间间隔或行计数的范围上定义窗口。支持的窗口定义在Over(和其他类)上公开为方法,如下所示:

方法 Required 描述
partitionBy Optional 定义一个或多个属性上的输入分区。每个分区都单独排序,聚合函数分别应用于每个分区。注意:在流式环境中,如果窗口包含partition by子句,则只能并行计算窗口聚合。no / notpartitionBy(...)流由单个非并行任务处理。
orderBy Required 定义每个分区中行的顺序,从而定义聚合函数应用于行的顺序。注意:对于流式查询,这必须是声明的事件时间或处理时间属性。目前,仅支持单个排序属性。
preceding Required 定义窗口中包含的行的间隔,并在当前行之前。间隔可以指定为时间或行计数间隔。在窗口上限定具有间隔的大小,例如,10.minutes对于时间间隔或10.rows行计数间隔。使用常量(即,UNBOUNDED_RANGE时间间隔或UNBOUNDED_ROW行计数间隔)指定在窗口无界限。在Windows上无限制地从分区的第一行开始。
following Optional 定义窗口中包含的行的窗口间隔,并跟随当前行。必须在与前一个间隔(时间或行计数)相同的单位中指定间隔。目前,不支持在当前行之后包含行的窗口。相反,您可以指定两个常量之一:
CURRENT_ROW 将窗口的上限设置为当前行。
CURRENT_RANGE 设置窗口的上限以对当前行的键进行排序,即窗口中包含与当前行具有相同排序键的所有行。
如果following省略该子句,则将时间间隔窗口CURRENT_RANGE的上限定义为,并将行计数间隔窗口的上限定义为CURRENT_ROW
as Required 为覆盖窗口指定别名。别名用于引用以下select()子句中的over window 。

注意:目前,同一select()调用中的所有聚合函数必须计算相同的窗口。

在Windows上无限制

  1. // Unbounded Event-time over window (assuming an event-time attribute "rowtime")
  2. .window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_range").as("w"));
  3. // Unbounded Processing-time over window (assuming a processing-time attribute "proctime")
  4. .window(Over.partitionBy("a").orderBy("proctime").preceding("unbounded_range").as("w"));
  5. // Unbounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
  6. .window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_row").as("w"));
  7. // Unbounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
  8. .window(Over.partitionBy("a").orderBy("proctime").preceding("unbounded_row").as("w"));
  1. // Unbounded Event-time over window (assuming an event-time attribute "rowtime") .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
  2. // Unbounded Processing-time over window (assuming a processing-time attribute "proctime") .window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)
  3. // Unbounded Event-time Row-count over window (assuming an event-time attribute "rowtime") .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)
  4. // Unbounded Processing-time Row-count over window (assuming a processing-time attribute "proctime") .window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)

限制在Windows上

  1. // Bounded Event-time over window (assuming an event-time attribute "rowtime")
  2. .window(Over.partitionBy("a").orderBy("rowtime").preceding("1.minutes").as("w"))
  3. // Bounded Processing-time over window (assuming a processing-time attribute "proctime")
  4. .window(Over.partitionBy("a").orderBy("proctime").preceding("1.minutes").as("w"))
  5. // Bounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
  6. .window(Over.partitionBy("a").orderBy("rowtime").preceding("10.rows").as("w"))
  7. // Bounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
  8. .window(Over.partitionBy("a").orderBy("proctime").preceding("10.rows").as("w"))
  1. // Bounded Event-time over window (assuming an event-time attribute "rowtime") .window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w)
  2. // Bounded Processing-time over window (assuming a processing-time attribute "proctime") .window(Over partitionBy 'a orderBy 'proctime preceding 1.minutes as 'w)
  3. // Bounded Event-time Row-count over window (assuming an event-time attribute "rowtime") .window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w)
  4. // Bounded Processing-time Row-count over window (assuming a processing-time attribute "proctime") .window(Over partitionBy 'a orderBy 'proctime preceding 10.rows as 'w)

数据类型

Table API建立在Flink的DataSet和DataStream API之上。在内部,它还使用Flink TypeInformation来定义数据类型。完全支持的类型列在org.apache.flink.table.api.Types。下表总结了 Table API类型,SQL类型和生成的Java类之间的关系。

Table API SQL Java类型
Types.STRING VARCHAR java.lang.String
Types.BOOLEAN BOOLEAN java.lang.Boolean
Types.BYTE TINYINT java.lang.Byte
Types.SHORT SMALLINT java.lang.Short
Types.INT INTEGER, INT java.lang.Integer
Types.LONG BIGINT java.lang.Long
Types.FLOAT REAL, FLOAT java.lang.Float
Types.DOUBLE DOUBLE java.lang.Double
Types.DECIMAL DECIMAL java.math.BigDecimal
Types.SQL_DATE DATE java.sql.Date
Types.SQL_TIME TIME java.sql.Time
Types.SQL_TIMESTAMP TIMESTAMP(3) java.sql.Timestamp
Types.INTERVAL_MONTHS INTERVAL YEAR TO MONTH java.lang.Integer
Types.INTERVAL_MILLIS INTERVAL DAY TO SECOND(3) java.lang.Long
Types.PRIMITIVE_ARRAY ARRAY 例如 int[]
Types.OBJECT_ARRAY ARRAY 例如 java.lang.Byte[]
Types.MAP MAP java.util.HashMap
Types.MULTISET MULTISET 例如,java.util.HashMap&lt;String, Integer&gt;对于多重集合String
Types.ROW ROW org.apache.flink.types.Row

通用类型和(嵌套)复合类型(例如,POJO,元组,Row,Scala案例类)也可以是行的字段。

可以使用值访问函数访问具有任意嵌套的复合类型的字段。

通用类型被视为黑盒子,可以由用户定义的函数传递或处理。

表达式语法

前面部分中的一些 算子需要一个或多个表达式。可以使用嵌入式Scala DSL或字符串指定表达式。请参阅上面的示例以了解如何指定表达式。

这是表达式的EBNF语法:

  1. expressionList = expression , { "," , expression } ;
  2. expression = timeIndicator | overConstant | alias ;
  3. alias = logic | ( logic , "as" , fieldReference ) | ( logic , "as" , "(" , fieldReference , { "," , fieldReference } , ")" ) ;
  4. logic = comparison , [ ( "&&" | "||" ) , comparison ] ;
  5. comparison = term , [ ( "=" | "==" | "===" | "!=" | "!==" | ">" | ">=" | "<" | "<=" ) , term ] ;
  6. term = product , [ ( "+" | "-" ) , product ] ;
  7. product = unary , [ ( "*" | "/" | "%") , unary ] ;
  8. unary = [ "!" | "-" ] , composite ;
  9. composite = over | nullLiteral | suffixed | atom ;
  10. suffixed = interval | cast | as | if | functionCall ;
  11. interval = timeInterval | rowInterval ;
  12. timeInterval = composite , "." , ("year" | "years" | "quarter" | "quarters" | "month" | "months" | "week" | "weeks" | "day" | "days" | "hour" | "hours" | "minute" | "minutes" | "second" | "seconds" | "milli" | "millis") ;
  13. rowInterval = composite , "." , "rows" ;
  14. cast = composite , ".cast(" , dataType , ")" ;
  15. dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" | "STRING" | "DECIMAL" | "SQL_DATE" | "SQL_TIME" | "SQL_TIMESTAMP" | "INTERVAL_MONTHS" | "INTERVAL_MILLIS" | ( "MAP" , "(" , dataType , "," , dataType , ")" ) | ( "PRIMITIVE_ARRAY" , "(" , dataType , ")" ) | ( "OBJECT_ARRAY" , "(" , dataType , ")" ) ;
  16. as = composite , ".as(" , fieldReference , ")" ;
  17. if = composite , ".?(" , expression , "," , expression , ")" ;
  18. functionCall = composite , "." , functionIdentifier , [ "(" , [ expression , { "," , expression } ] , ")" ] ;
  19. atom = ( "(" , expression , ")" ) | literal | fieldReference ;
  20. fieldReference = "*" | identifier ;
  21. nullLiteral = "Null(" , dataType , ")" ;
  22. timeIntervalUnit = "YEAR" | "YEAR_TO_MONTH" | "MONTH" | "QUARTER" | "WEEK" | "DAY" | "DAY_TO_HOUR" | "DAY_TO_MINUTE" | "DAY_TO_SECOND" | "HOUR" | "HOUR_TO_MINUTE" | "HOUR_TO_SECOND" | "MINUTE" | "MINUTE_TO_SECOND" | "SECOND" ;
  23. timePointUnit = "YEAR" | "MONTH" | "DAY" | "HOUR" | "MINUTE" | "SECOND" | "QUARTER" | "WEEK" | "MILLISECOND" | "MICROSECOND" ;
  24. over = composite , "over" , fieldReference ;
  25. overConstant = "current_row" | "current_range" | "unbounded_row" | "unbounded_row" ;
  26. timeIndicator = fieldReference , "." , ( "proctime" | "rowtime" ) ;

这里literal是一个有效的Java文字,fieldReference指定数据中的列(或使用的所有列*),并functionIdentifier指定支持的标量函数。列名和函数名遵循Java标识符语法。指定为字符串的表达式也可以使用前缀表示法而不是后缀表示法来调用 算子和函数。

如果需要使用精确数值或大小数, Table API也支持JavaBigDecimal类型。在Scala Table API中,小数可以BigDecimal("123456")通过附加“p” 来定义,也可以在Java中定义,例如123456p

为了使用时态值, Table API支持Java SQL的Date,Time和Timestamp类型。在Scala的 Table API文本可以通过使用来定义java.sql.Date.valueOf("2016-06-27")java.sql.Time.valueOf("10:10:42")java.sql.Timestamp.valueOf("2016-06-27 10:10:42.123")。在Java和Scala Table API还支持调用"2016-06-27".toDate()"10:10:42".toTime()以及"2016-06-27 10:10:42.123".toTimestamp()对String转化成时间类型。注意:由于Java临时SQL类型与时区有关,因此请确保Flink Client和所有TaskManagers使用相同的时区。

时间间隔可以表示为月数(Types.INTERVAL_MONTHS)或毫秒数(Types.INTERVAL_MILLIS)。可以添加或减去相同类型的间隔(例如1.hour + 10.minutes)。可以将毫秒的间隔添加到时间点(例如"2016-08-10".toDate + 5.days)。