SQL查询是用 TableEnvironmentsqlQuery() 方法指定的。该方法将 SQL 查询的结果作为 Table 返回。Table 可用于后续 SQ L和表 API 查询转换为 DataSet 或 DataStream写入TableSink。SQL 和表 API 查询可以无缝地混合在一起,并进行整体优化并转换成一个程序。

为了访问 SQL 查询中的表,必须在 TableEnvironment 中注册。可以从TableSourcetableDataStream,或DataSet注册表。或者,用户也可以在表环境中注册外部目录来指定数据源的位置。

为了方便起见,Table.toString() 会自动在其 TableEnvironment 中以唯一名称注册表并返回该名称。因此,Table 对象可以直接内联到 SQL 查询中(通过字符串连接),如下面的示例所示。

注意: Flink 的 SQL 支持尚未完成。包含不支持的 SQL 特性的查询会导致 TableException。下面几节列出了在批处理表和流表上支持的 SQL 特性。

指定一个查询

下面的示例显示如何在已注册和内联的表上指定 SQL 查询。

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
  3. // ingest a DataStream from an external source
  4. DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);
  5. // SQL query with an inlined (unregistered) table
  6. Table table = tableEnv.fromDataStream(ds, "user, product, amount");
  7. Table result = tableEnv.sqlQuery(
  8. "SELECT SUM(amount) FROM " + table + " WHERE product LIKE '%Rubber%'");
  9. // SQL query with a registered table
  10. // register the DataStream as table "Orders"
  11. tableEnv.registerDataStream("Orders", ds, "user, product, amount");
  12. // run a SQL query on the Table and retrieve the result as a new Table
  13. Table result2 = tableEnv.sqlQuery(
  14. "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
  15. // SQL update with a registered table
  16. // create and register a TableSink
  17. TableSink csvSink = new CsvTableSink("/path/to/file", ...);
  18. String[] fieldNames = {"product", "amount"};
  19. TypeInformation[] fieldTypes = {Types.STRING, Types.INT};
  20. tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink);
  21. // run a SQL update query on the Table and emit the result to the TableSink
  22. tableEnv.sqlUpdate(
  23. "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
  1. val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. val tableEnv = TableEnvironment.getTableEnvironment(env)
  3. // read a DataStream from an external source val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
  4. // SQL query with an inlined (unregistered) table val table = ds.toTable(tableEnv, 'user, 'product, 'amount)
  5. val result = tableEnv.sqlQuery(
  6. s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")
  7. // SQL query with a registered table
  8. // register the DataStream under the name "Orders" tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
  9. // run a SQL query on the Table and retrieve the result as a new Table val result2 = tableEnv.sqlQuery(
  10. "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
  11. // SQL update with a registered table
  12. // create and register a TableSink TableSink csvSink = new CsvTableSink("/path/to/file", ...)
  13. val fieldNames: Array[String] = Array("product", "amount")
  14. val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT)
  15. tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink)
  16. // run a SQL update query on the Table and emit the result to the TableSink tableEnv.sqlUpdate(
  17. "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

支持语法

Flink 使用 Apache Calcite解析SQL,它支持标准的 ANSI SQL。Flink 不支持 DDL 语句。

下面的 BNF 语法(BNF-grammar)描述了批处理和流查询中支持的 SQL 特性的超集。Operations小节展示了支持的特性的示例,并指出哪些特性只支持批处理或流查询。

  1. insert:
  2. INSERT INTO tableReference
  3. query
  4. query:
  5. values
  6. | {
  7. select
  8. | selectWithoutFrom
  9. | query UNION [ ALL ] query
  10. | query EXCEPT query
  11. | query INTERSECT query
  12. }
  13. [ ORDER BY orderItem [, orderItem ]* ]
  14. [ LIMIT { count | ALL } ]
  15. [ OFFSET start { ROW | ROWS } ]
  16. [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]
  17. orderItem:
  18. expression [ ASC | DESC ]
  19. select:
  20. SELECT [ ALL | DISTINCT ]
  21. { * | projectItem [, projectItem ]* }
  22. FROM tableExpression
  23. [ WHERE booleanExpression ]
  24. [ GROUP BY { groupItem [, groupItem ]* } ]
  25. [ HAVING booleanExpression ]
  26. [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
  27. selectWithoutFrom:
  28. SELECT [ ALL | DISTINCT ]
  29. { * | projectItem [, projectItem ]* }
  30. projectItem:
  31. expression [ [ AS ] columnAlias ]
  32. | tableAlias . *
  33. tableExpression:
  34. tableReference [, tableReference ]*
  35. | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]
  36. joinCondition:
  37. ON booleanExpression
  38. | USING '(' column [, column ]* ')'
  39. tableReference:
  40. tablePrimary
  41. [ matchRecognize ]
  42. [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]
  43. tablePrimary:
  44. [ TABLE ] [ [ catalogName . ] schemaName . ] tableName
  45. | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
  46. | UNNEST '(' expression ')'
  47. values:
  48. VALUES expression [, expression ]*
  49. groupItem:
  50. expression
  51. | '(' ')'
  52. | '(' expression [, expression ]* ')'
  53. | CUBE '(' expression [, expression ]* ')'
  54. | ROLLUP '(' expression [, expression ]* ')'
  55. | GROUPING SETS '(' groupItem [, groupItem ]* ')'
  56. windowRef:
  57. windowName
  58. | windowSpec
  59. windowSpec:
  60. [ windowName ]
  61. '('
  62. [ ORDER BY orderItem [, orderItem ]* ]
  63. [ PARTITION BY expression [, expression ]* ]
  64. [
  65. RANGE numericOrIntervalExpression {PRECEDING}
  66. | ROWS numericExpression {PRECEDING}
  67. ]
  68. ')'
  69. matchRecognize:
  70. MATCH_RECOGNIZE '('
  71. [ PARTITION BY expression [, expression ]* ]
  72. [ ORDER BY orderItem [, orderItem ]* ]
  73. [ MEASURES measureColumn [, measureColumn ]* ]
  74. [ ONE ROW PER MATCH ]
  75. [ AFTER MATCH
  76. ( SKIP TO NEXT ROW
  77. | SKIP PAST LAST ROW
  78. | SKIP TO FIRST variable
  79. | SKIP TO LAST variable
  80. | SKIP TO variable )
  81. ]
  82. PATTERN '(' pattern ')'
  83. DEFINE variable AS condition [, variable AS condition ]*
  84. ')'
  85. measureColumn:
  86. expression AS alias
  87. pattern:
  88. patternTerm [ '|' patternTerm ]*
  89. patternTerm:
  90. patternFactor [ patternFactor ]*
  91. patternFactor:
  92. variable [ patternQuantifier ]
  93. patternQuantifier:
  94. '*'
  95. | '*?'
  96. | '+'
  97. | '+?'
  98. | '?'
  99. | '??'
  100. | '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
  101. | '{' repeat '}'

Flink SQL 对标识符(表、属性、函数名)使用词法策略,类似于Java:

  • 无论是否引用标识符,都会保留标识符的大小写。
  • 之后,标识符将区分大小写地匹配。
  • 与 Java 不同,反号(back-ticks)允许标识符包含非字母数字字符(non-alphanumeric characters)(例如 "SELECT a ASmy fieldFROM t")。

字符串文字必须用单引号括起来(例如,SELECT 'Hello World')。复制一个转义单引号(例如,SELECT 'It''s me.')。Unicode 字符支持字符串文字。如果需要显式 unicode 代码点,请使用以下语法:

  • 使用反斜杠(\)作为转义字符(默认): SELECT U&'\263A'
  • 使用自定义转义字符: SELECT U&'#263A' UESCAPE '#'

操作 Operations

扫描、投影和过滤(Scan, Projection, and Filter)

操作 描述
Scan / Select / As
Batch Streaming
  1. SELECT * FROM Orders
  2. SELECT a, c AS d FROM Orders

|
| Where / Filter
Batch Streaming |

  1. SELECT * FROM Orders WHERE b = 'red'
  2. SELECT * FROM Orders WHERE a % 2 = 0

|
| User-defined Scalar Functions (Scalar UDF)
Batch Streaming | UDF 必须在表环境(TableEnvironment)中注册。有关如何指定和注册标量UDF的详细信息,请参阅UDF文档

  1. SELECT PRETTY_PRINT(user) FROM Orders

|

聚合(Aggregations)

操作 描述
GroupBy Aggregation

Batch Streaming
Result Updating | 注意: GroupBy在流表上生成更新结果。有关详细信息,请参阅Dynamic Tables Streaming Concepts页面。

  1. SELECT a, SUM(b) as d
  2. FROM Orders
  3. GROUP BY a

|
| GroupBy Window Aggregation
Batch Streaming | Use a group window to compute a single result row per group. See Group Windows section for more details.
使用组窗口(group window)计算每个组的单个结果行。有关更多细节,请参见Group Windows一节。

  1. SELECT user, SUM(amount)
  2. FROM Orders
  3. GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user

|
| Over Window aggregation
Streaming | 注意: 所有聚合必须在同一个窗口上定义,即相同的分区、排序和范围。目前,只支持具有当前行范围之前(无界和有界)的窗口。还不支持包含以下内容的范围。ORDER BY必须在单个[time属性]上指定(streaming/time_attributes.html)

  1. SELECT COUNT(amount) OVER (
  2. PARTITION BY user
  3. ORDER BY proctime
  4. ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
  5. FROM Orders
  6. SELECT COUNT(amount) OVER w, SUM(amount) OVER w
  7. FROM Orders
  8. WINDOW w AS (
  9. PARTITION BY user
  10. ORDER BY proctime
  11. ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)

|
| Distinct
Batch Streaming
Result Updating |

  1. SELECT DISTINCT users FROM Orders

注意: 对于流查询,计算查询结果所需的状态可能会根据不同字段的数量无限增长。请提供具有有效保留间隔的查询配置,以防止状态过大。有关详细信息,请参见查询配置。 |
| Grouping sets, Rollup, Cube
Batch |

  1. SELECT SUM(amount)
  2. FROM Orders
  3. GROUP BY GROUPING SETS ((user), (product))

|
| Having
Batch Streaming |

  1. SELECT SUM(amount)
  2. FROM Orders
  3. GROUP BY users
  4. HAVING SUM(amount) &gt; 50

|
| User-defined Aggregate Functions (UDAGG)
Batch Streaming | UDAGG 必须在表环境中注册。有关如何指定和注册 UDAGG 的详细信息,请参阅UDF文档

  1. SELECT MyAggregate(amount)
  2. FROM Orders
  3. GROUP BY users

|

连接 Joins

操作 描述
Inner Equi-join
Batch Streaming 目前,只支持等值连接(equi-joins),即,连接具有至少一个连接条件并带有相等谓词的连接。不支持任意交叉或连接。注: 连接顺序未优化。表按照 FROM 子句中指定的顺序连接。确保以不产生交叉连接(笛卡尔积)的顺序指定表,这些表不受支持并且会导致查询失败。
  1. SELECT *
  2. FROM Orders INNER JOIN Product ON Orders.productId = Product.id

注意: 对于流查询,计算查询结果所需的状态可能会根据不同的输入行数无限增长。请提供具有有效保留间隔的查询配置,以防止状态过大。有关详细信息,请参见查询配置。 |
| Outer Equi-join
Batch Streaming Result Updating | 目前,只支持等值连接(equi-joins),即,具有至少一个连接条件并带有相等谓词的连接。不支持任意交叉或连接。注意: 连接顺序未优化。表按照 FROM 子句中指定的顺序连接。确保以不产生交叉连接(笛卡尔积)的顺序指定表,这些表不受支持并且会导致查询失败。

  1. SELECT *
  2. FROM Orders LEFT JOIN Product ON Orders.productId = Product.id
  3. SELECT *
  4. FROM Orders RIGHT JOIN Product ON Orders.productId = Product.id
  5. SELECT *
  6. FROM Orders FULL OUTER JOIN Product ON Orders.productId = Product.id

注意: 对于流查询,计算查询结果所需的状态可能会根据不同的输入行数无限增长。请提供具有有效保留间隔的查询配置,以防止状态过大。有关详细信息,请参见查询配置。 |
| Time-windowed Join
Batch Streaming | 注意: 时间窗口连接是可以以流方式处理的常规连接的子集。有时间窗口的连接至少需要一个等值连接谓词和一个连接条件,该条件在连接两边限定时间。这样的条件可以由两个适当的范围谓词(&lt;, &lt;=, &gt;=, &gt;)、 BETWEEN 谓词或单个等式谓词来定义,该单个等式谓词比较两个输入表的相同类型(即处理时间或事件时间)的时间属性例如,以下谓词是有效的窗口连接条件:

  • ltime = rtime
  • ltime &gt;= rtime AND ltime &lt; rtime + INTERVAL '10' MINUTE
  • ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
  1. SELECT *
  2. FROM Orders o, Shipments s
  3. WHERE o.id = s.orderId AND
  4. o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime

如果订单是在收到订单后4小时发出的,那么上面的示例将把所有订单与其相应的发货连接起来。 |
| Expanding arrays into a relation
Batch Streaming | 不支持使用序数反嵌套(Unnesting WITH ORDINALITY)。

  1. SELECT users, tag
  2. FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)

|
| Join with Table Function
Batch Streaming | 将表与表函数的结果连接在一起。左边(外部)表的每一行都与表函数的相应调用生成的所有行连接在一起。必须在此之前注册用户定义的表函数(UDTFs)。有关如何指定和注册UDTFs的详细信息,请参阅UDF文档内连接(Inner Join) 如果表函数调用返回空结果,则删除左侧(外部)表的一行。

  1. SELECT users, tag
  2. FROM Orders, LATERAL TABLE(unnest_udtf(tags)) t AS tag

左外连接(Left Outer Join) 如果表函数调用返回空结果,则保留相应的外部行,并使用空值填充结果。

  1. SELECT users, tag
  2. FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE

注意: 目前,只支持字面上的 TRUE 作为针对横向表的左外连接的谓词。 |
| Join with Temporal Table
Streaming | 时态表是跟踪随时间变化的表。时态表函数提供对特定时间点时态表状态的访问。使用时态表函数连接表的语法与 表函数联接(Join with Table Function) 中的语法相同。注意: 目前只支持带时态表的内部连接。假设 Rates时态表函数,连接可以用 SQL 表示如下:

  1. SELECT
  2. o_amount, r_rate
  3. FROM
  4. Orders,
  5. LATERAL TABLE (Rates(o_proctime))
  6. WHERE
  7. r_currency = o_currency

有关更多信息,请查看更详细的时态表概念描述。 |

集合操作(Set Operations)

操作 描述
Union
Batch
  1. SELECT *
  2. FROM (
  3. (SELECT user FROM Orders WHERE a % 2 = 0)
  4. UNION
  5. (SELECT user FROM Orders WHERE b = 0)
  6. )

|
| UnionAll
Batch Streaming |

  1. SELECT *
  2. FROM (
  3. (SELECT user FROM Orders WHERE a % 2 = 0)
  4. UNION ALL
  5. (SELECT user FROM Orders WHERE b = 0)
  6. )

|
| Intersect / Except
Batch |

  1. SELECT *
  2. FROM (
  3. (SELECT user FROM Orders WHERE a % 2 = 0)
  4. INTERSECT
  5. (SELECT user FROM Orders WHERE b = 0)
  6. )
  1. SELECT *
  2. FROM (
  3. (SELECT user FROM Orders WHERE a % 2 = 0)
  4. EXCEPT
  5. (SELECT user FROM Orders WHERE b = 0)
  6. )

|
| In
Batch Streaming | 如果给定表子查询中存在表达式,则返回 true。子查询表必须由一列组成。该列必须具有与表达式相同的数据类型。

  1. SELECT user, amount
  2. FROM Orders
  3. WHERE product IN (
  4. SELECT product FROM NewProducts
  5. )

注意: 对于流查询,操作在 join 和 group 操作中重写。计算查询结果所需的状态可能会无限增长,这取决于不同输入行的数量。请提供具有有效保留间隔的查询配置,以防止状态过大。有关详细信息,请参见查询配置。 |
| Exists
Batch Streaming | 如果子查询至少返回一行,则返回true。只有在可以在 join 和 group 操作中重写操作时,才支持该操作。

  1. SELECT user, amount
  2. FROM Orders
  3. WHERE product EXISTS (
  4. SELECT product FROM NewProducts
  5. )

注意 对于流查询,操作在 join 和 group 操作中重写。计算查询结果所需的状态可能会无限增长,这取决于不同输入行的数量。请提供具有有效保留间隔的查询配置,以防止状态过大。有关详细信息,请参见查询配置。 |

OrderBy & Limit

操作 描述
Order By
Batch Streaming 注意: 流查询的结果必须主要根据升序time 属性进行排序。还支持其他排序属性。
  1. SELECT *
  2. FROM Orders
  3. ORDER BY orderTime

|
| Limit
Batch |

  1. SELECT *
  2. FROM Orders
  3. LIMIT 3

|

插入(Insert)

操作 描述
Insert Into
Batch Streaming 输出表必须在 TableEnvironment 中注册(参见Register a TableSink)。此外,已注册表的模式必须与查询的模式匹配。
  1. INSERT INTO OutputTable
  2. SELECT users, tag
  3. FROM Orders

|

Group Windows

Group 窗口(Group windows)是在SQL查询的 GROUP BY 子句中定义的。就像使用常规的 GROUP BY 子句查询一样,使用包含 GROUP 窗口函数的 GROUP BY 子句查询会为每个组计算一个结果行。批处理表和流表上的 SQL 支持以下 group windows 函数。

Group 窗口函数 描述
TUMBLE(time_attr, interval) 定义一个滚动时间窗口。滚动时间窗口将行分配给具有固定持续时间(间隔(interval))的非重叠连续窗口。例如,一个5分钟滚动窗口以5分钟为间隔将行分组。滚动窗口可以在事件时间(流+批处理)或处理时间(流)上定义。
HOP(time_attr, interval, interval) 定义一个跳转时间窗口(hopping time window)(在表 API 中称为滑动窗口)。跳转时间窗口具有固定的持续时间(第二个 interval 参数),并按指定的跳转间隔(第一个 interval 参数)跳转。如果跳转间隔小于窗口大小,则跳转窗口重叠。因此,可以将行分配给多个窗口。例如,一个15分钟大小的跳转窗口和5分钟的跳转间隔将每一行分配给3个15分钟大小的不同窗口,这些窗口在5分钟的间隔内计算。跳跃窗口可以在事件时间(流+批处理)或处理时间(流)上定义。
SESSION(time_attr, interval) 定义会话时间窗口(session time window)。会话时间窗口没有固定的持续时间,但它们的边界由不活动的时间 间隔(interval) 定义,即,如果在定义的间隔期间没有出现事件,则关闭会话窗口。例如,如果在30分钟不活动之后观察到一行(否则该行将被添加到现有窗口),则会启动一个间隔为30分钟的会话窗口,如果在30分钟内没有添加行,则会关闭该会话窗口。会话窗口可以在事件时间(流+批处理)或处理时间(流)上工作。

时间属性

对于流表上的 SQL 查询,组窗口函数的 time_attr 参数必须引用一个有效的时间属性,该属性指定行处理时间或事件时间。参见时间属性文档了解如何定义时间属性。

对于批处理表上的 SQL,组窗口函数的 time_attr 参数必须是 TIMESTAMP 类型的属性。

选择 Group 窗口开始和结束时间戳

Group 窗口的开始和结束时间戳以及时间属性可以通过以下辅助函数(auxiliary functions)来选择:

辅助函数 描述
TUMBLE_START(time_attr, interval)

HOP_START(time_attr, interval, interval)
SESSION_START(time_attr, interval)
| 返回相应滚动、跳跃或会话窗口的包含下界的时间戳。 |
| TUMBLE_END(time_attr, interval)
HOP_END(time_attr, interval, interval)
SESSION_END(time_attr, interval)
| 返回相应滚动、跳跃或会话窗口的 独占(exclusive) 上界的时间戳。注意: 在后续基于时间的操作中,例如时间窗口连接组窗口或窗口之上的聚合)中,不能 将独占的上界时间戳用作rowtime 属性。 |
| TUMBLE_ROWTIME(time_attr, interval)
HOP_ROWTIME(time_attr, interval, interval)
SESSION_ROWTIME(time_attr, interval)
| 返回相应滚动、跳跃或会话窗口的 独占(inclusive) 上界的时间戳。得到的属性是一个rowtime 属性,可以在后续基于时间的操作中使用,比如时间窗口连接组窗口或窗口之上的聚合。 |
| TUMBLE_PROCTIME(time_attr, interval)
HOP_PROCTIME(time_attr, interval, interval)
SESSION_PROCTIME(time_attr, interval)
| 返回一个proctime 属性,该属性可用于后续基于时间的操作,如时间窗口连接组窗口或窗口聚合之上的窗口。 |

注意: 必须使用与 GROUP BY 子句中的组窗口函数完全相同的参数调用辅助函数。

下面的示例显示如何在流表上使用组窗口指定 SQL 查询。

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
  3. // ingest a DataStream from an external source
  4. DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);
  5. // register the DataStream as table "Orders"
  6. tableEnv.registerDataStream("Orders", ds, "user, product, amount, proctime.proctime, rowtime.rowtime");
  7. // compute SUM(amount) per day (in event-time)
  8. Table result1 = tableEnv.sqlQuery(
  9. "SELECT user, " +
  10. " TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart, " +
  11. " SUM(amount) FROM Orders " +
  12. "GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user");
  13. // compute SUM(amount) per day (in processing-time)
  14. Table result2 = tableEnv.sqlQuery(
  15. "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user");
  16. // compute every hour the SUM(amount) of the last 24 hours in event-time
  17. Table result3 = tableEnv.sqlQuery(
  18. "SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product");
  19. // compute SUM(amount) per session with 12 hour inactivity gap (in event-time)
  20. Table result4 = tableEnv.sqlQuery(
  21. "SELECT user, " +
  22. " SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart, " +
  23. " SESSION_ROWTIME(rowtime, INTERVAL '12' HOUR) AS snd, " +
  24. " SUM(amount) " +
  25. "FROM Orders " +
  26. "GROUP BY SESSION(rowtime, INTERVAL '12' HOUR), user");
  1. val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. val tableEnv = TableEnvironment.getTableEnvironment(env)
  3. // read a DataStream from an external source val ds: DataStream[(Long, String, Int)] = env.addSource(...)
  4. // register the DataStream under the name "Orders" tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount, 'proctime.proctime, 'rowtime.rowtime)
  5. // compute SUM(amount) per day (in event-time) val result1 = tableEnv.sqlQuery(
  6. """
  7. |SELECT
  8. | user,
  9. | TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart,
  10. | SUM(amount)
  11. | FROM Orders
  12. | GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user
  13. """.stripMargin)
  14. // compute SUM(amount) per day (in processing-time) val result2 = tableEnv.sqlQuery(
  15. "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user")
  16. // compute every hour the SUM(amount) of the last 24 hours in event-time val result3 = tableEnv.sqlQuery(
  17. "SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product")
  18. // compute SUM(amount) per session with 12 hour inactivity gap (in event-time) val result4 = tableEnv.sqlQuery(
  19. """
  20. |SELECT
  21. | user,
  22. | SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart,
  23. | SESSION_END(rowtime, INTERVAL '12' HOUR) AS sEnd,
  24. | SUM(amount)
  25. | FROM Orders
  26. | GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user
  27. """.stripMargin)

模式识别

操作 描述
MATCH_RECOGNIZE
Streaming 根据 MATCH_RECOGNIZE ISO标准在流表中搜索给定的模式。这使得在 SQL 查询中表达复杂事件处理(CEP)逻辑成为可能。有关更详细的描述,请参见检测表中的模式的专用页面。
  1. SELECT T.aid, T.bid, T.cid
  2. FROM MyTable
  3. MATCH_RECOGNIZE (
  4. PARTITION BY userid
  5. ORDER BY proctime
  6. MEASURES
  7. A.id AS aid,
  8. B.id AS bid,
  9. C.id AS cid
  10. PATTERN (A B C)
  11. DEFINE
  12. A AS name = 'a',
  13. B AS name = 'b',
  14. C AS name = 'c'
  15. ) AS T

|

数据类型

The SQL runtime is built on top of Flink’s DataSet and DataStream APIs. Internally, it also uses Flink’s TypeInformation to define data types. Fully supported types are listed in org.apache.flink.table.api.Types. The following table summarizes the relation between SQL Types, Table API types, and the resulting Java class.
SQL 运行时构建在 Flink 的数据集(DataSet)和数据流(DataStream) API 之上。在内部,它还使用 Flink 的 TypeInformation 来定义数据类型。完全支持的类型列在 org.apache.flink.table.api.Types 中。下表总结了 SQL 类型、表 API 类型和生成的 Java 类之间的关系。

Table API SQL Java type
Types.STRING VARCHAR java.lang.String
Types.BOOLEAN BOOLEAN java.lang.Boolean
Types.BYTE TINYINT java.lang.Byte
Types.SHORT SMALLINT java.lang.Short
Types.INT INTEGER, INT java.lang.Integer
Types.LONG BIGINT java.lang.Long
Types.FLOAT REAL, FLOAT java.lang.Float
Types.DOUBLE DOUBLE java.lang.Double
Types.DECIMAL DECIMAL java.math.BigDecimal
Types.SQL_DATE DATE java.sql.Date
Types.SQL_TIME TIME java.sql.Time
Types.SQL_TIMESTAMP TIMESTAMP(3) java.sql.Timestamp
Types.INTERVAL_MONTHS INTERVAL YEAR TO MONTH java.lang.Integer
Types.INTERVAL_MILLIS INTERVAL DAY TO SECOND(3) java.lang.Long
Types.PRIMITIVE_ARRAY ARRAY e.g. int[]
Types.OBJECT_ARRAY ARRAY e.g. java.lang.Byte[]
Types.MAP MAP java.util.HashMap
Types.MULTISET MULTISET e.g. java.util.HashMap&lt;String, Integer&gt; for a multiset of String
Types.ROW ROW org.apache.flink.types.Row

泛型类型和(嵌套的)复合类型(例如POJOs, tuples, rows, Scala case classes)也可以是一行的字段。

可以使用value access functions访问具有任意嵌套的复合类型字段。

泛型类型被视为一个黑盒子,可以通过用户定义函数传递或处理。

保留关键字

虽然还没有实现所有 SQL 特性,但是一些字符串组合已经被保留为关键字,以供将来使用。如果您想使用下列字符串之一作为字段名,请确保用反引号(例如 value, count)。

  1. A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATALOG, CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERISTICS, CHARACTERS, CHARACTER_LENGTH, CHARACTER_SET_CATALOG, CHARACTER_SET_NAME, CHARACTER_SET_SCHEMA, CHAR_LENGTH, CHECK, CLASS_ORIGIN, CLOB, CLOSE, COALESCE, COBOL, COLLATE, COLLATION, COLLATION_CATALOG, COLLATION_NAME, COLLATION_SCHEMA, COLLECT, COLUMN, COLUMN_NAME, COMMAND_FUNCTION, COMMAND_FUNCTION_CODE, COMMIT, COMMITTED, CONDITION, CONDITION_NUMBER, CONNECT, CONNECTION, CONNECTION_NAME, CONSTRAINT, CONSTRAINTS, CONSTRAINT_CATALOG, CONSTRAINT_NAME, CONSTRAINT_SCHEMA, CONSTRUCTOR, CONTAINS, CONTINUE, CONVERT, CORR, CORRESPONDING, COUNT, COVAR_POP, COVAR_SAMP, CREATE, CROSS, CUBE, CUME_DIST, CURRENT, CURRENT_CATALOG, CURRENT_DATE, CURRENT_DEFAULT_TRANSFORM_GROUP, CURRENT_PATH, CURRENT_ROLE, CURRENT_SCHEMA, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_TRANSFORM_GROUP_FOR_TYPE, CURRENT_USER, CURSOR, CURSOR_NAME, CYCLE, DATA, DATABASE, DATE, DATETIME_INTERVAL_CODE, DATETIME_INTERVAL_PRECISION, DAY, DEALLOCATE, DEC, DECADE, DECIMAL, DECLARE, DEFAULT, DEFAULTS, DEFERRABLE, DEFERRED, DEFINED, DEFINER, DEGREE, DELETE, DENSE_RANK, DEPTH, DEREF, DERIVED, DESC, DESCRIBE, DESCRIPTION, DESCRIPTOR, DETERMINISTIC, DIAGNOSTICS, DISALLOW, DISCONNECT, DISPATCH, DISTINCT, DOMAIN, DOUBLE, DOW, DOY, DROP, DYNAMIC, DYNAMIC_FUNCTION, DYNAMIC_FUNCTION_CODE, EACH, ELEMENT, ELSE, END, END-EXEC, EPOCH, EQUALS, ESCAPE, EVERY, EXCEPT, EXCEPTION, EXCLUDE, EXCLUDING, EXEC, EXECUTE, EXISTS, EXP, EXPLAIN, EXTEND, EXTERNAL, EXTRACT, FALSE, FETCH, FILTER, FINAL, FIRST, FIRST_VALUE, FLOAT, FLOOR, FOLLOWING, FOR, FOREIGN, FORTRAN, FOUND, FRAC_SECOND, FREE, FROM, FULL, FUNCTION, FUSION, G, GENERAL, GENERATED, GET, GLOBAL, GO, GOTO, GRANT, GRANTED, GROUP, GROUPING, HAVING, HIERARCHY, HOLD, HOUR, IDENTITY, IMMEDIATE, IMPLEMENTATION, IMPORT, IN, INCLUDING, INCREMENT, INDICATOR, INITIALLY, INNER, INOUT, INPUT, INSENSITIVE, INSERT, INSTANCE, INSTANTIABLE, INT, INTEGER, INTERSECT, INTERSECTION, INTERVAL, INTO, INVOKER, IS, ISOLATION, JAVA, JOIN, K, KEY, KEY_MEMBER, KEY_TYPE, LABEL, LANGUAGE, LARGE, LAST, LAST_VALUE, LATERAL, LEADING, LEFT, LENGTH, LEVEL, LIBRARY, LIKE, LIMIT, LN, LOCAL, LOCALTIME, LOCALTIMESTAMP, LOCATOR, LOWER, M, MAP, MATCH, MATCHED, MAX, MAXVALUE, MEMBER, MERGE, MESSAGE_LENGTH, MESSAGE_OCTET_LENGTH, MESSAGE_TEXT, METHOD, MICROSECOND, MILLENNIUM, MIN, MINUTE, MINVALUE, MOD, MODIFIES, MODULE, MONTH, MORE, MULTISET, MUMPS, NAME, NAMES, NATIONAL, NATURAL, NCHAR, NCLOB, NESTING, NEW, NEXT, NO, NONE, NORMALIZE, NORMALIZED, NOT, NULL, NULLABLE, NULLIF, NULLS, NUMBER, NUMERIC, OBJECT, OCTETS, OCTET_LENGTH, OF, OFFSET, OLD, ON, ONLY, OPEN, OPTION, OPTIONS, OR, ORDER, ORDERING, ORDINALITY, OTHERS, OUT, OUTER, OUTPUT, OVER, OVERLAPS, OVERLAY, OVERRIDING, PAD, PARAMETER, PARAMETER_MODE, PARAMETER_NAME, PARAMETER_ORDINAL_POSITION, PARAMETER_SPECIFIC_CATALOG, PARAMETER_SPECIFIC_NAME, PARAMETER_SPECIFIC_SCHEMA, PARTIAL, PARTITION, PASCAL, PASSTHROUGH, PATH, PERCENTILE_CONT, PERCENTILE_DISC, PERCENT_RANK, PLACING, PLAN, PLI, POSITION, POWER, PRECEDING, PRECISION, PREPARE, PRESERVE, PRIMARY, PRIOR, PRIVILEGES, PROCEDURE, PUBLIC, QUARTER, RANGE, RANK, READ, READS, REAL, RECURSIVE, REF, REFERENCES, REFERENCING, REGR_AVGX, REGR_AVGY, REGR_COUNT, REGR_INTERCEPT, REGR_R2, REGR_SLOPE, REGR_SXX, REGR_SXY, REGR_SYY, RELATIVE, RELEASE, REPEATABLE, RESET, RESTART, RESTRICT, RESULT, RETURN, RETURNED_CARDINALITY, RETURNED_LENGTH, RETURNED_OCTET_LENGTH, RETURNED_SQLSTATE, RETURNS, REVOKE, RIGHT, ROLE, ROLLBACK, ROLLUP, ROUTINE, ROUTINE_CATALOG, ROUTINE_NAME, ROUTINE_SCHEMA, ROW, ROWS, ROW_COUNT, ROW_NUMBER, SAVEPOINT, SCALE, SCHEMA, SCHEMA_NAME, SCOPE, SCOPE_CATALOGS, SCOPE_NAME, SCOPE_SCHEMA, SCROLL, SEARCH, SECOND, SECTION, SECURITY, SELECT, SELF, SENSITIVE, SEQUENCE, SERIALIZABLE, SERVER, SERVER_NAME, SESSION, SESSION_USER, SET, SETS, SIMILAR, SIMPLE, SIZE, SMALLINT, SOME, SOURCE, SPACE, SPECIFIC, SPECIFICTYPE, SPECIFIC_NAME, SQL, SQLEXCEPTION, SQLSTATE, SQLWARNING, SQL_TSI_DAY, SQL_TSI_FRAC_SECOND, SQL_TSI_HOUR, SQL_TSI_MICROSECOND, SQL_TSI_MINUTE, SQL_TSI_MONTH, SQL_TSI_QUARTER, SQL_TSI_SECOND, SQL_TSI_WEEK, SQL_TSI_YEAR, SQRT, START, STATE, STATEMENT, STATIC, STDDEV_POP, STDDEV_SAMP, STREAM, STRUCTURE, STYLE, SUBCLASS_ORIGIN, SUBMULTISET, SUBSTITUTE, SUBSTRING, SUM, SYMMETRIC, SYSTEM, SYSTEM_USER, TABLE, TABLESAMPLE, TABLE_NAME, TEMPORARY, THEN, TIES, TIME, TIMESTAMP, TIMESTAMPADD, TIMESTAMPDIFF, TIMEZONE_HOUR, TIMEZONE_MINUTE, TINYINT, TO, TOP_LEVEL_COUNT, TRAILING, TRANSACTION, TRANSACTIONS_ACTIVE, TRANSACTIONS_COMMITTED, TRANSACTIONS_ROLLED_BACK, TRANSFORM, TRANSFORMS, TRANSLATE, TRANSLATION, TREAT, TRIGGER, TRIGGER_CATALOG, TRIGGER_NAME, TRIGGER_SCHEMA, TRIM, TRUE, TYPE, UESCAPE, UNBOUNDED, UNCOMMITTED, UNDER, UNION, UNIQUE, UNKNOWN, UNNAMED, UNNEST, UPDATE, UPPER, UPSERT, USAGE, USER, USER_DEFINED_TYPE_CATALOG, USER_DEFINED_TYPE_CODE, USER_DEFINED_TYPE_NAME, USER_DEFINED_TYPE_SCHEMA, USING, VALUE, VALUES, VARBINARY, VARCHAR, VARYING, VAR_POP, VAR_SAMP, VERSION, VIEW, WEEK, WHEN, WHENEVER, WHERE, WIDTH_BUCKET, WINDOW, WITH, WITHIN, WITHOUT, WORK, WRAPPER, WRITE, XML, YEAR, ZONE