概念和通用API

表API和SQL集成在一个联合API中。这个API的核心概念是“Table”,它用作查询的输入和输出。本文档展示了带有表API和SQL查询的程序的公共结构、如何注册“表”、如何查询“表”以及如何发出“表”。

表API和SQL程序的结构

用于批处理和流式处理的所有表API和SQL程序都遵循相同的模式。下面的代码示例显示了表API和SQL程序的公共结构。

  1. //创建TableEnvironment
  2. //对于批处理程序,使用executionenvironment而不是streamexecutionenvironmentStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3. StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
  4. / /注册一个table
  5. tableEnv.registerTable("table1", ...) // or
  6. tableEnv.registerTableSource("table2", ...); // or
  7. tableEnv.registerExternalCatalog("extCat", ...);
  8. //注册输出表
  9. tableEnv.registerTableSink("outputTable", ...);
  10. //从表API查询创建表
  11. Table tapiResult = tableEnv.scan("table1").select(...);
  12. //从SQL查询创建表
  13. Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");
  14. //向TableSink发出一个表API结果表,与SQL结果相同
  15. tapiResult.insertInto("outputTable");
  16. // execute
  17. env.execute();
  1. // for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. // create a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env)
  3. // register a Table tableEnv.registerTable("table1", ...) // or tableEnv.registerTableSource("table2", ...) // or tableEnv.registerExternalCatalog("extCat", ...)
  4. // register an output Table tableEnv.registerTableSink("outputTable", ...);
  5. // create a Table from a Table API query val tapiResult = tableEnv.scan("table1").select(...)
  6. // Create a Table from a SQL query val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ...")
  7. // emit a Table API result Table to a TableSink, same for SQL result tapiResult.insertInto("outputTable")
  8. // execute env.execute()

注意: 表API和SQL查询可以很容易地与数据流或数据集程序集成并嵌入其中。查看[与数据流和数据集API集成]部分,了解如何将数据流和数据集转换为表,反之亦然。

创建表环境

“TableEnvironment”是表API和SQL集成的核心概念。它负责:

  • 在内部目录中注册“table”
  • 注册外部目录
  • 执行SQL查询
  • 注册用户定义的(标量、表或聚合)函数
  • 将“DataStream”或“dataset”转换为“table”`
  • 保存对“ExecutionEnvironment”或“StreamExecutionEnvironment”的引用`

“table”始终绑定到特定的“TableEnvironment”。不能在同一查询中组合不同表环境的表,例如联接或联合它们。 “TableEnvironment”是通过使用“StreamExecutionEnvironment”或“Executionenvironment”以及可选的“tableconfig”调用静态的“TableEnvironment.getTableenvironment()”方法创建的。“tableconfig”可用于配置“TableEnvironment”或自定义查询优化和转换过程(请参见[查询优化](查询优化))。

  1. // ***************
  2. // STREAMING QUERY
  3. // ***************
  4. StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
  5. // create a TableEnvironment for streaming queries
  6. StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(sEnv);
  7. // ***********
  8. // BATCH QUERY
  9. // ***********
  10. ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
  11. // create a TableEnvironment for batch queries
  12. BatchTableEnvironment bTableEnv = TableEnvironment.getTableEnvironment(bEnv);
  1. // ***************
  2. // STREAMING QUERY
  3. // *************** val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
  4. // create a TableEnvironment for streaming queries val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)
  5. // ***********
  6. // BATCH QUERY
  7. // *********** val bEnv = ExecutionEnvironment.getExecutionEnvironment
  8. // create a TableEnvironment for batch queries val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)

在目录中注册表

“TableEnvironment”维护按名称注册的表的目录。有两种类型的表:“输入表”和“输出表”。可以在表API和SQL查询中引用输入表,并提供输入数据。输出表可用于向外部系统发出表API或SQL查询的结果。 输入表可以从各种来源注册:

  • 现有的“table”对象,通常是表API或SQL查询的结果。
  • “TableSOurce”访问外部数据,如文件、数据库或消息系统。
  • 来自数据流或数据集程序的“DataStream”或“dataset”。在[与数据流和数据集API集成]部分中讨论了注册“DataStream”或“dataset”。 可以使用“tablesink”注册输出表。

    注册表

    “table”在“tableenvironment”中注册,如下所示:
  1. // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
  2. StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
  3. // Table is the result of a simple projection query
  4. Table projTable = tableEnv.scan("X").select(...);
  5. // register the Table projTable as table "projectedX"
  6. tableEnv.registerTable("projectedTable", projTable);
  1. // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env)
  2. // Table is the result of a simple projection query val projTable: Table = tableEnv.scan("X").select(...)
  3. // register the Table projTable as table "projectedX" tableEnv.registerTable("projectedTable", projTable)

注意: 已注册的“table”与关系数据库系统中的“view”类似,即定义“table”的查询未优化,但在另一个查询引用已注册的“table”时将被内联。如果多个查询引用同一个已注册的“table”,则将为每个引用查询内联并执行多次,即不共享已注册的“table”的结果。

注册TableSource

“TableSource”提供对存储在存储系统中的外部数据的访问,例如数据库(mysql、hbase,…)、具有特定编码的文件(csv、apache[parquet、avro、orc]…)或消息系统(apache kafka、rabbitmq,…)。

Flink旨在为通用数据格式和存储系统提供表源。请查看[table sources and sinks](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/sourcesinks.html)页面,以获取支持的表源列表以及如何构建自定义“tablesource”的说明。

“TableSOurce”在“TableEnvironment”中注册,如下所示:

  1. // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
  2. StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
  3. // create a TableSource
  4. TableSource csvSource = new CsvTableSource("/path/to/file", ...);
  5. // register the TableSource as table "CsvTable"
  6. tableEnv.registerTableSource("CsvTable", csvSource);
  1. // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env)
  2. // create a TableSource val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)
  3. // register the TableSource as table "CsvTable" tableEnv.registerTableSource("CsvTable", csvSource)

注册TableSink

已注册的“tablesink”可用于[向外部存储系统(如数据库、键值存储、消息队列或文件系统)发送表API或SQL查询的结果(common.html emit-a-table)(在不同的编码中,如csv、apache[parquet、avro、orc]…)。

Flink的目标是为常用的数据格式和存储系统提供表格接收器。有关可用接收器的详细信息以及如何实现自定义“TableSink”的说明,请参见[表源和接收器](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/sourcesinks.html)页面。

“TableSink”在“TableEnvironment”中注册,如下所示:

  1. // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
  2. StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
  3. // create a TableSink
  4. TableSink csvSink = new CsvTableSink("/path/to/file", ...);
  5. // define the field names and types
  6. String[] fieldNames = {"a", "b", "c"};
  7. TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
  8. // register the TableSink as table "CsvSinkTable"
  9. tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);
  1. // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env)
  2. // create a TableSink val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
  3. // define the field names and types val fieldNames: Array[String] = Array("a", "b", "c")
  4. val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
  5. // register the TableSink as table "CsvSinkTable" tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)

注册外部目录

外部目录可以提供有关外部数据库和表的信息,例如它们的名称、模式、统计信息以及如何访问存储在外部数据库、表或文件中的数据的信息。

外部目录可以通过实现“externalCatalog”接口创建,并在“tableEnvironment”中注册,如下所示:

  1. // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
  2. StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
  3. // create an external catalog
  4. ExternalCatalog catalog = new InMemoryExternalCatalog();
  5. // register the ExternalCatalog catalog
  6. tableEnv.registerExternalCatalog("InMemCatalog", catalog);
  1. // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env)
  2. // create an external catalog val catalog: ExternalCatalog = new InMemoryExternalCatalog
  3. // register the ExternalCatalog catalog tableEnv.registerExternalCatalog("InMemCatalog", catalog)

一旦在“TableEnvironment”中注册,“ExternalCatalog”中定义的所有表都可以通过指定其完整路径(如“catalog.database.table”)从表API或SQL查询访问。

目前,Flink为演示和测试提供了一个“InMemoryExternalCatalog”。但是,“externalCatalog”接口也可用于将目录(如hcatalog或metastore)连接到表API。

查询一个表

表的API

表API是用于Scala和Java的语言集成查询API。与SQL不同,查询不是作为字符串指定的,而是在宿主语言中逐步组成的。

该API基于“table”类,该类表示一个表(流式处理或批处理),并提供应用关系操作的方法。这些方法返回一个新的“table”对象,它表示对输入“table”应用关系操作的结果。有些关系操作由多个方法调用组成,如“table.groupby(…).select(),其中“groupby(…)指定了“table”的分组,而“select(…)`指定了“table”分组上的投影。

[table api](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/table api.html)文档描述了流式和批处理表支持的所有table api操作。 下面的示例显示了一个简单的表API聚合查询:

  1. // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
  2. StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
  3. // register Orders table
  4. // scan registered Orders table
  5. Table orders = tableEnv.scan("Orders");
  6. // compute revenue for all customers from France
  7. Table revenue = orders
  8. .filter("cCountry === 'FRANCE'")
  9. .groupBy("cID, cName")
  10. .select("cID, cName, revenue.sum AS revSum");
  11. // emit or convert Table
  12. // execute query
  1. // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env)
  2. // register Orders table
  3. // scan registered Orders table val orders = tableEnv.scan("Orders")
  4. // compute revenue for all customers from France val revenue = orders
  5. .filter('cCountry === "FRANCE")
  6. .groupBy('cID, 'cName)
  7. .select('cID, 'cName, 'revenue.sum AS 'revSum)
  8. // emit or convert Table // execute query

注意:scala表API使用scala符号,该符号以单勾号(`)开头,以引用“table”的属性。表API使用scala隐式。确保导入'org.apache.flink.api.scala.和’org.apache.flink.table.api.scala.`以便使用scala隐式转换。

SQL

Flink的SQL集成基于实现SQL标准的[apache-calcite](https://calcite.apache.org)。SQL查询被指定为常规字符串。

[sql](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/sql.html)文档描述了Flink对流式和批处理表的SQL支持。

下面的示例演示如何指定查询并将结果作为“table”返回。

  1. // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
  2. StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
  3. // register Orders table
  4. // compute revenue for all customers from France
  5. Table revenue = tableEnv.sqlQuery(
  6. "SELECT cID, cName, SUM(revenue) AS revSum " +
  7. "FROM Orders " +
  8. "WHERE cCountry = 'FRANCE' " +
  9. "GROUP BY cID, cName"
  10. );
  11. // emit or convert Table
  12. // execute query
  1. // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env)
  2. // register Orders table
  3. // compute revenue for all customers from France val revenue = tableEnv.sqlQuery("""
  4. |SELECT cID, cName, SUM(revenue) AS revSum
  5. |FROM Orders
  6. |WHERE cCountry = 'FRANCE'
  7. |GROUP BY cID, cName
  8. """.stripMargin)
  9. // emit or convert Table // execute query

下面的示例演示如何指定将其结果插入已注册表的更新查询。

  1. // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
  2. StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
  3. // register "Orders" table
  4. // register "RevenueFrance" output table
  5. // compute revenue for all customers from France and emit to "RevenueFrance"
  6. tableEnv.sqlUpdate(
  7. "INSERT INTO RevenueFrance " +
  8. "SELECT cID, cName, SUM(revenue) AS revSum " +
  9. "FROM Orders " +
  10. "WHERE cCountry = 'FRANCE' " +
  11. "GROUP BY cID, cName"
  12. );
  13. // execute query
  1. // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env)
  2. // register "Orders" table
  3. // register "RevenueFrance" output table
  4. // compute revenue for all customers from France and emit to "RevenueFrance" tableEnv.sqlUpdate("""
  5. |INSERT INTO RevenueFrance
  6. |SELECT cID, cName, SUM(revenue) AS revSum
  7. |FROM Orders
  8. |WHERE cCountry = 'FRANCE'
  9. |GROUP BY cID, cName
  10. """.stripMargin)
  11. // execute query

混合表API和SQL

表API和SQL查询可以很容易地混合使用,因为它们都返回“table”对象:

  • 可以在SQL查询返回的“table”对象上定义表API查询。
  • 通过在“tableenvironment”中[注册结果表](register-a-table),并在SQL查询的“from”子句中引用,可以对表API查询的结果定义SQL查询。

发出一张表

“table”是通过将其写入“tablesink”而发出的。“tablesink”是一个通用接口,支持各种文件格式(如csv、apache parquet、apache avro)、存储系统(如jdbc、apache hbase、apache cassandra、elasticsearch)或消息系统(如apache kafka、rabbitmq)。

批处理“table”只能写入“batchTableSink”,而流式“table”需要“appendstreamTableSink”、“retractstreamTableSink”或“upsertstreamTableSink”。

有关可用接收器的详细信息以及如何实现自定义“tablesink”的说明,请参阅有关[表源和接收器](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/sourcesinks.html)的文档。

table.insertinto(string tablename)方法将“table”发送到已注册的“tablesink”。该方法按名称从目录中查找“tablesink”,并验证“table”的架构是否与“tablesink”的架构相同。

以下示例显示如何发出“table”:

  1. // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
  2. StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
  3. // create a TableSink
  4. TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|");
  5. // register the TableSink with a specific schema
  6. String[] fieldNames = {"a", "b", "c"};
  7. TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
  8. tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink);
  9. // compute a result Table using Table API operators and/or SQL queries
  10. Table result = ...
  11. // emit the result Table to the registered TableSink
  12. result.insertInto("CsvSinkTable");
  13. // execute the program
  1. // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env)
  2. // create a TableSink val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")
  3. // register the TableSink with a specific schema val fieldNames: Array[String] = Array("a", "b", "c")
  4. val fieldTypes: Array[TypeInformation] = Array(Types.INT, Types.STRING, Types.LONG)
  5. tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink)
  6. // compute a result Table using Table API operators and/or SQL queries val result: Table = ...
  7. // emit the result Table to the registered TableSink result.insertInto("CsvSinkTable")
  8. // execute the program

翻译并执行查询

表API和SQL查询根据输入是流输入还是批输入,被转换为[DataStream](//ci.apache.org/projects/flink-docs-release-1.7/dev/datastream_api.html)或[dataset](//ci apache.org/projects/flink/flink-docs-release-1.7/dev/batch)程序。查询在内部表示为逻辑查询计划,并分两个阶段进行转换:

1.优化逻辑计划, 2.转换为数据流或数据集程序。

表API或SQL查询在以下情况下转换:

  • “table”被发送到“tablesink”,即当调用“table.insertinto()”时。
  • 指定了SQL更新查询,即当调用’tableenvironment.sql update()’时。
  • “table”转换为“DataStream”或“dataset”(请参见[与DataStream和dataset api集成](与DataStream和dataset api集成))。 转换后,表API或SQL查询将像常规数据流或数据集程序一样处理,并在调用“streamExecutionenvironment.execute()”或“executionenvironment.execute()”时执行。

与数据流和数据集API集成

表API和SQL查询可以很容易地与[DataStream](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/datastream_api.html)和[dataset](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch)程序集成和嵌入。例如,可以查询一个外部表(例如从RDBMS),进行一些预处理,例如筛选、投影、聚合或与元数据结合,然后使用数据流或数据集API(以及在这些API之上构建的任何库,例如CEP或GELLY)进一步处理数据。相反,表API或SQL查询也可以应用于数据流或数据集程序的结果。

这种交互可以通过将“DataStream”或“DataSet”转换为“table”来实现,反之亦然。在本节中,我们将描述如何完成这些转换。

scala的隐式转换

scala表API为“DataSet”、“DataStream”和“table”类提供隐式转换。除了scala数据流api的’org.apache.flink.table.api.scala.’外,还可以导入包’org.apache.flink.api.scala.’来启用这些转换。

将数据流或数据集注册为表

“DataStream”或“DataSet”可以在“TableEnvironment”中注册为表。结果表的架构取决于已注册的“datastream”或“dataset”的数据类型。有关详细信息,请查看[数据类型到表架构的映射]部分(数据类型到表架构的映射)。

  1. // get StreamTableEnvironment
  2. // registration of a DataSet in a BatchTableEnvironment is equivalent
  3. StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
  4. DataStream<Tuple2<Long, String>> stream = ...
  5. // register the DataStream as Table "myTable" with fields "f0", "f1"
  6. tableEnv.registerDataStream("myTable", stream);
  7. // register the DataStream as table "myTable2" with fields "myLong", "myString"
  8. tableEnv.registerDataStream("myTable2", stream, "myLong, myString");
  1. // get TableEnvironment
  2. // registration of a DataSet is equivalent val tableEnv = TableEnvironment.getTableEnvironment(env)
  3. val stream: DataStream[(Long, String)] = ...
  4. // register the DataStream as Table "myTable" with fields "f0", "f1" tableEnv.registerDataStream("myTable", stream)
  5. // register the DataStream as table "myTable2" with fields "myLong", "myString" tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)

注意:datastreamtable`的名称不能与`^ datastream table`[0-9]+`模式匹配,` dataset table的名称不能与^ dataset table[0-9]+`模式匹配。这些模式仅供内部使用。

将数据流或数据集转换为表

它也可以直接转换为“table”,而不是在“TableEnvironment”中注册“DataStream”或“DataSet”。如果要在表API查询中使用表,这很方便。

  1. // get StreamTableEnvironment
  2. // registration of a DataSet in a BatchTableEnvironment is equivalent
  3. StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
  4. DataStream<Tuple2<Long, String>> stream = ...
  5. // Convert the DataStream into a Table with default fields "f0", "f1"
  6. Table table1 = tableEnv.fromDataStream(stream);
  7. // Convert the DataStream into a Table with fields "myLong", "myString"
  8. Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
  1. // get TableEnvironment
  2. // registration of a DataSet is equivalent val tableEnv = TableEnvironment.getTableEnvironment(env)
  3. val stream: DataStream[(Long, String)] = ...
  4. // convert the DataStream into a Table with default fields '_1, '_2 val table1: Table = tableEnv.fromDataStream(stream)
  5. // convert the DataStream into a Table with fields 'myLong, 'myString val table2: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)

将表转换为数据流或数据集

“table”可以转换为“datastream”或“dataset”。通过这种方式,可以对表API或SQL查询的结果运行自定义数据流或数据集程序。

将“table”转换为“datastream”或“dataset”时,需要指定生成的“datastream”或“dataset”的数据类型,即要将“table”的行转换为的数据类型。通常最方便的转换类型是“row”。以下列表概述了不同选项的功能:

  • :字段按位置、任意数量的字段进行映射,支持“空”值,没有类型安全访问。
  • pojo:字段按名称映射(pojo字段必须命名为“table”字段),字段数量任意,支持“null”值,键入safe access。
  • case class:字段按位置映射,不支持“null”值,请键入safe access。
  • 元组*:字段由位置映射,限制为22(Scala)或25(Java)字段,不支持“NULL”值,类型安全访问。
  • 原子类型table必须有单个字段,不支持’null’值,类型安全访问。

将表转换为数据流

流式查询的结果是“table”,它将动态更新,即,当新记录到达查询的输入流时,它将发生更改。因此,将这种动态查询转换为的“DataStream”需要对表的更新进行编码。

将“table”转换为“DataStream”有两种模式:

1.追加模式:只有当动态“table”只被“insert”更改时,即它是“append only”,以前发出的结果从不更新时,才能使用此模式。

2.收回模式:始终可以使用此模式。它用“boolean”标志对“insert”和“delete”更改进行编码。

  1. // get StreamTableEnvironment.
  2. StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
  3. // Table with two fields (String name, Integer age)
  4. Table table = ...
  5. // convert the Table into an append DataStream of Row by specifying the class
  6. DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
  7. // convert the Table into an append DataStream of Tuple2<String, Integer>
  8. // via a TypeInformation
  9. TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  10. Types.STRING(),
  11. Types.INT());
  12. DataStream<Tuple2<String, Integer>> dsTuple =
  13. tableEnv.toAppendStream(table, tupleType);
  14. // convert the Table into a retract DataStream of Row.
  15. // A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
  16. // The boolean field indicates the type of the change.
  17. // True is INSERT, false is DELETE.
  18. DataStream<Tuple2<Boolean, Row>> retractStream =
  19. tableEnv.toRetractStream(table, Row.class);
  1. // get TableEnvironment.
  2. // registration of a DataSet is equivalent val tableEnv = TableEnvironment.getTableEnvironment(env)
  3. // Table with two fields (String name, Integer age) val table: Table = ...
  4. // convert the Table into an append DataStream of Row val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)
  5. // convert the Table into an append DataStream of Tuple2[String, Int] val dsTuple: DataStream[(String, Int)] dsTuple =
  6. tableEnv.toAppendStream[(String, Int)](table)
  7. // convert the Table into a retract DataStream of Row.
  8. // A retract stream of type X is a DataStream[(Boolean, X)].
  9. // The boolean field indicates the type of the change.
  10. // True is INSERT, false is DELETE. val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)

注: 有关动态表及其属性的详细讨论,请参见[动态表](streaming/dynamic_tables.html)文档。

将表转换为数据集

“table”转换为“dataset”,如下所示:

  1. // get BatchTableEnvironment
  2. BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
  3. // Table with two fields (String name, Integer age)
  4. Table table = ...
  5. // convert the Table into a DataSet of Row by specifying a class
  6. DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);
  7. // convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformation
  8. TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  9. Types.STRING(),
  10. Types.INT());
  11. DataSet<Tuple2<String, Integer>> dsTuple =
  12. tableEnv.toDataSet(table, tupleType);
  1. // get TableEnvironment
  2. // registration of a DataSet is equivalent val tableEnv = TableEnvironment.getTableEnvironment(env)
  3. // Table with two fields (String name, Integer age) val table: Table = ...
  4. // convert the Table into a DataSet of Row val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)
  5. // convert the Table into a DataSet of Tuple2[String, Int] val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)

将数据类型映射到表架构

Flink的数据流和数据集API支持非常多样的类型。复合类型(如Tuples(内置Scala和FLink Java元组)、POJOs、scala case类和弗林克的行类型)允许嵌套的数据结构,它们可以在表表达式中访问多个字段。其他类型被视为原子类型。在下面的内容中,我们将描述表API如何将这些类型转换为内部行表示,并显示将“datastream”转换为“table”的示例。

数据类型到表模式的映射有两种方式:基于字段位置或基于字段名称

基于位置的映射

基于位置的映射可用于在保持字段顺序的同时为字段提供更有意义的名称。此映射可用于具有定义的字段顺序和原子类型的复合数据类型。元组、行和case类等复合数据类型具有这样的字段顺序。但是,必须基于字段名映射POJO的字段(请参见下一节)。

定义基于位置的映射时,输入数据类型中不能存在指定的名称,否则API将假定映射应基于字段名称进行。如果未指定字段名,则使用复合类型的默认字段名和字段顺序,或“f0”表示原子类型。

  1. // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
  2. StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
  3. DataStream<Tuple2<Long, Integer>> stream = ...
  4. // convert DataStream into Table with default field names "f0" and "f1"
  5. Table table = tableEnv.fromDataStream(stream);
  6. // convert DataStream into Table with field names "myLong" and "myInt"
  7. Table table = tableEnv.fromDataStream(stream, "myLong, myInt");
  1. // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env)
  2. val stream: DataStream[(Long, Int)] = ...
  3. // convert DataStream into Table with default field names "_1" and "_2" val table: Table = tableEnv.fromDataStream(stream)
  4. // convert DataStream into Table with field names "myLong" and "myInt" val table: Table = tableEnv.fromDataStream(stream, 'myLong 'myInt)

基于名称的映射 基于名称的映射可以用于任何数据类型,包括POJO。它是定义表模式映射的最灵活的方法。映射中的所有字段都由名称引用,并且可以使用别名“as”重命名。字段可以重新排序和投影。

如果未指定字段名,则使用复合类型的默认字段名和字段顺序,或“f0”表示原子类型。

  1. // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
  2. StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
  3. DataStream<Tuple2<Long, Integer>> stream = ...
  4. // convert DataStream into Table with default field names "f0" and "f1"
  5. Table table = tableEnv.fromDataStream(stream);
  6. // convert DataStream into Table with field "f1" only
  7. Table table = tableEnv.fromDataStream(stream, "f1");
  8. // convert DataStream into Table with swapped fields
  9. Table table = tableEnv.fromDataStream(stream, "f1, f0");
  10. // convert DataStream into Table with swapped fields and field names "myInt" and "myLong"
  11. Table table = tableEnv.fromDataStream(stream, "f1 as myInt, f0 as myLong");
  1. // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env)
  2. val stream: DataStream[(Long, Int)] = ...
  3. // convert DataStream into Table with default field names "_1" and "_2" val table: Table = tableEnv.fromDataStream(stream)
  4. // convert DataStream into Table with field "_2" only val table: Table = tableEnv.fromDataStream(stream, '_2)
  5. // convert DataStream into Table with swapped fields val table: Table = tableEnv.fromDataStream(stream, '_2, '_1)
  6. // convert DataStream into Table with swapped fields and field names "myInt" and "myLong" val table: Table = tableEnv.fromDataStream(stream, '_2 as 'myInt, '_1 as 'myLong)

原子类型

Flink将基本类型(integerdoublestring)或泛型类型(无法分析和分解的类型)视为原子类型。原子类型的“Datastream”或“Dataset”转换为具有单个属性的“table”。属性的类型是从原子类型推断出来的,并且可以指定属性的名称。

  1. // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
  2. StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
  3. DataStream<Long> stream = ...
  4. // convert DataStream into Table with default field name "f0"
  5. Table table = tableEnv.fromDataStream(stream);
  6. // convert DataStream into Table with field name "myLong"
  7. Table table = tableEnv.fromDataStream(stream, "myLong");
  1. // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env)
  2. val stream: DataStream[Long] = ...
  3. // convert DataStream into Table with default field name "f0" val table: Table = tableEnv.fromDataStream(stream)
  4. // convert DataStream into Table with field name "myLong" val table: Table = tableEnv.fromDataStream(stream, 'myLong)

元组 (Scala and Java) 和 Case 类 (Scala )

FLink支持Scala的内置元组,并为Java提供了自己的元组类。这两种元组的数据流和数据集都可以转换为表。通过为所有字段提供名称(基于位置的映射),可以重命名字段。如果未指定字段名,则使用默认字段名。如果引用了原始字段名(f0f1、…(对于flink tuples)和u 1`、u 2、…(对于scala tuples),则API假定映射是基于名称而不是基于位置的。基于名称的映射允许使用别名(as`)重新排序字段和投影。

  1. // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
  2. StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
  3. DataStream<Tuple2<Long, String>> stream = ...
  4. // convert DataStream into Table with default field names "f0", "f1"
  5. Table table = tableEnv.fromDataStream(stream);
  6. // convert DataStream into Table with renamed field names "myLong", "myString" (position-based)
  7. Table table = tableEnv.fromDataStream(stream, "myLong, myString");
  8. // convert DataStream into Table with reordered fields "f1", "f0" (name-based)
  9. Table table = tableEnv.fromDataStream(stream, "f1, f0");
  10. // convert DataStream into Table with projected field "f1" (name-based)
  11. Table table = tableEnv.fromDataStream(stream, "f1");
  12. // convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based)
  13. Table table = tableEnv.fromDataStream(stream, "f1 as 'myString', f0 as 'myLong'");
  1. // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env)
  2. val stream: DataStream[(Long, String)] = ...
  3. // convert DataStream into Table with renamed default field names '_1, '_2 val table: Table = tableEnv.fromDataStream(stream)
  4. // convert DataStream into Table with field names "myLong", "myString" (position-based) val table: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)
  5. // convert DataStream into Table with reordered fields "_2", "_1" (name-based) val table: Table = tableEnv.fromDataStream(stream, '_2, '_1)
  6. // convert DataStream into Table with projected field "_2" (name-based) val table: Table = tableEnv.fromDataStream(stream, '_2)
  7. // convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based) val table: Table = tableEnv.fromDataStream(stream, '_2 as 'myString, '_1 as 'myLong)
  8. // define case class case class Person(name: String, age: Int)
  9. val streamCC: DataStream[Person] = ...
  10. // convert DataStream into Table with default field names 'name, 'age val table = tableEnv.fromDataStream(streamCC)
  11. // convert DataStream into Table with field names 'myName, 'myAge (position-based) val table = tableEnv.fromDataStream(streamCC, 'myName, 'myAge)
  12. // convert DataStream into Table with reordered and aliased fields "myAge", "myName" (name-based) val table: Table = tableEnv.fromDataStream(stream, 'age as 'myAge, 'name as 'myName)

POJO (Java 和 Scala)

Flink支持pojos作为复合类型。确定POJO的规则记录在[这里](/ci.apache.org/projects/flink/flink-docs-release-1.7/dev/api-concepts.html pojos)。

如果将Pojo Datastream'或 Dataset’转化为` Table’,而没有具体的名称,则最初的Pojo Fields的名称被使用。名称映射需要原始名称,不能按位置进行。可以使用别名(使用“as”关键字)、重新排序和投影来重命名字段。

  1. // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
  2. StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
  3. // Person is a POJO with fields "name" and "age"
  4. DataStream<Person> stream = ...
  5. // convert DataStream into Table with default field names "age", "name" (fields are ordered by name!)
  6. Table table = tableEnv.fromDataStream(stream);
  7. // convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
  8. Table table = tableEnv.fromDataStream(stream, "age as myAge, name as myName");
  9. // convert DataStream into Table with projected field "name" (name-based)
  10. Table table = tableEnv.fromDataStream(stream, "name");
  11. // convert DataStream into Table with projected and renamed field "myName" (name-based)
  12. Table table = tableEnv.fromDataStream(stream, "name as myName");
  1. // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env)
  2. // Person is a POJO with field names "name" and "age" val stream: DataStream[Person] = ...
  3. // convert DataStream into Table with default field names "age", "name" (fields are ordered by name!) val table: Table = tableEnv.fromDataStream(stream)
  4. // convert DataStream into Table with renamed fields "myAge", "myName" (name-based) val table: Table = tableEnv.fromDataStream(stream, 'age as 'myAge, 'name as 'myName)
  5. // convert DataStream into Table with projected field "name" (name-based) val table: Table = tableEnv.fromDataStream(stream, 'name)
  6. // convert DataStream into Table with projected and renamed field "myName" (name-based) val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)

ROW

“row”数据类型支持任意数量的字段和值为“null”的字段。字段名可以通过“RowTypeInfo”或在将“row`datastream或’dataset`转换为“table”时指定。行类型支持按位置和名称映射字段。可以通过为所有字段提供名称(基于位置的映射)来重命名字段,也可以单独选择进行投影/排序/重命名(基于名称的映射)。

  1. // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
  2. StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
  3. // DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
  4. DataStream<Row> stream = ...
  5. // convert DataStream into Table with default field names "name", "age"
  6. Table table = tableEnv.fromDataStream(stream);
  7. // convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
  8. Table table = tableEnv.fromDataStream(stream, "myName, myAge");
  9. // convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
  10. Table table = tableEnv.fromDataStream(stream, "name as myName, age as myAge");
  11. // convert DataStream into Table with projected field "name" (name-based)
  12. Table table = tableEnv.fromDataStream(stream, "name");
  13. // convert DataStream into Table with projected and renamed field "myName" (name-based)
  14. Table table = tableEnv.fromDataStream(stream, "name as myName");
  1. // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env)
  2. // DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo` val stream: DataStream[Row] = ...
  3. // convert DataStream into Table with default field names "name", "age" val table: Table = tableEnv.fromDataStream(stream)
  4. // convert DataStream into Table with renamed field names "myName", "myAge" (position-based) val table: Table = tableEnv.fromDataStream(stream, 'myName, 'myAge)
  5. // convert DataStream into Table with renamed fields "myName", "myAge" (name-based) val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName, 'age as 'myAge)
  6. // convert DataStream into Table with projected field "name" (name-based) val table: Table = tableEnv.fromDataStream(stream, 'name)
  7. // convert DataStream into Table with projected and renamed field "myName" (name-based) val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)

查询优化

ApacheFlink利用Apache来优化和转换查询。目前执行的优化包括投影和过滤器下推、子查询去相关和其他类型的查询重写。Flink还没有优化联接的顺序,而是按照查询中定义的相同顺序(从子句中表的顺序和/或从子句中联接谓词的顺序)执行它们。 通过提供“CalciteConfig”对象,可以调整在不同阶段应用的优化规则集。这可以通过调用“CalciteConfig.createbuilder())”通过生成器创建,并通过调用“tableEnv.getConfig.setCalciteConfig(calciteConfig)”提供给TableEnvironment。

解释Table

表API提供了一种机制来解释计算“table”的逻辑和优化查询计划。这是通过“tableenvironment.explain(table)”方法完成的。它返回一个字符串,描述三个计划:

  • 1.关系查询的抽象语法树,即未优化的逻辑查询计划,
  • 2.优化的逻辑查询计划,以及
  • 3.实际执行计划。

下面的代码显示了一个示例和相应的输出:

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
  3. DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
  4. DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));
  5. Table table1 = tEnv.fromDataStream(stream1, "count, word");
  6. Table table2 = tEnv.fromDataStream(stream2, "count, word");
  7. Table table = table1
  8. .where("LIKE(word, 'F%')")
  9. .unionAll(table2);
  10. String explanation = tEnv.explain(table);
  11. System.out.println(explanation);
  1. val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. val tEnv = TableEnvironment.getTableEnvironment(env)
  3. val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
  4. val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
  5. val table = table1
  6. .where('word.like("F%"))
  7. .unionAll(table2)
  8. val explanation: String = tEnv.explain(table)
  9. println(explanation)
  1. == Abstract Syntax Tree ==
  2. LogicalUnion(all=[true])
  3. LogicalFilter(condition=[LIKE($1, 'F%')])
  4. LogicalTableScan(table=[[_DataStreamTable_0]])
  5. LogicalTableScan(table=[[_DataStreamTable_1]])
  6. == Optimized Logical Plan ==
  7. DataStreamUnion(union=[count, word])
  8. DataStreamCalc(select=[count, word], where=[LIKE(word, 'F%')])
  9. DataStreamScan(table=[[_DataStreamTable_0]])
  10. DataStreamScan(table=[[_DataStreamTable_1]])
  11. == Physical Execution Plan ==
  12. Stage 1 : Data Source
  13. content : collect elements with CollectionInputFormat
  14. Stage 2 : Data Source
  15. content : collect elements with CollectionInputFormat
  16. Stage 3 : Operator
  17. content : from: (count, word)
  18. ship_strategy : REBALANCE
  19. Stage 4 : Operator
  20. content : where: (LIKE(word, 'F%')), select: (count, word)
  21. ship_strategy : FORWARD
  22. Stage 5 : Operator
  23. content : from: (count, word)
  24. ship_strategy : REBALANCE