1. Table API & SQL 介绍

1.1 为什么需要Table API & SQL

Flink的Table模块包括 Table API 和 SQL:
Table API 是一种类SQL的API,通过Table API,用户可以像操作表一样操作数据,非常直观和方便
SQL作为一种声明式语言,有着标准的语法和规范,用户可以不用关心底层实现即可进行数据的处理,非常易于上手
Flink Table API 和 SQL 的实现上有80%左右的代码是公用的。作为一个流批统一的计算引擎,Flink 的 Runtime 层是统一的。
Table API & SQL的特点
Flink之所以选择将Table API & SQL 作为未来的核心 API,是因为其具有一些非常重要的特点:
1. 声明式:属于设定式语言,用户只要表达清楚需求即可,不需要了解底层执行;
2. 高性能:可优化,内置多种查询优化器,这些查询优化器可为 SQL 翻译出最优执行计划;
3. 简单易学:易于理解,不同行业和领域的人都懂,学习成本较低;
4. 标准稳定:语义遵循SQL标准,非常稳定,在数据库 30 多年的历史中,SQL 本身变化较少;
5. 流批统一:可以做到API层面上流与批的统一,相同的SQL逻辑,既可流模式运行,也可批模式运行,Flink底层Runtime本身就是一个流与批统一的引擎;

1.2 Table API& SQL发展历程

架构升级:
自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink 存在的不足进行优化和改进,并且在 2019 年初将最终代码开源,也就是Blink。Blink 在原来的 Flink 基础上最显著的一个贡献就是 Flink SQL 的实现。随着版本的不断更新,API 也出现了很多不兼容的地方。
在 Flink 1.9 中,Table 模块迎来了核心架构的升级,引入了阿里巴巴Blink团队贡献的诸多功能;
图片2.png
在Flink 1.9 之前,Flink API 层 一直分为DataStream API 和 DataSet API,Table API & SQL 位于 DataStream API 和 DataSet API 之上。可以看处流处理和批处理有各自独立的api (流处理DataStream,批处理DataSet)。而且有不同的执行计划解析过程,codegen过程也完全不一样,完全没有流批一体的概念,面向用户不太友好。
在Flink1.9之后新的架构中,有两个查询处理器:Flink Query Processor,也称作Old Planner和Blink Query Processor,也称作Blink Planner。为了兼容老版本Table及SQL模块,插件化实现了Planner,Flink原有的Flink Planner不变,后期版本会被移除。新增加了Blink Planner,新的代码及特性会在Blink planner模块上实现。批或者流都是通过解析为Stream Transformation来实现的,不像Flink Planner,批是基于Dataset,流是基于DataStream。

查询处理器的选择:
查询处理器是Planner 的具体实现,通过parser、optimizer、codegen(代码生成技术)等流程将 Table API & SQL作业转换成 Flink Runtime 可识别的 Transformation DAG,最终由 Flink Runtime 进行作业的调度和执行。
Flink Query Processor查询处理器针对流计算和批处理作业有不同的分支处理,流计算作业底层的 API 是 DataStream API, 批处理作业底层的 API 是 DataSet API
Blink Query Processor查询处理器则实现流批作业接口的统一,底层的 API 都是Transformation,这就意味着我们和Dataset完全没有关系了
Flink1.11之后Blink Query Processor查询处理器已经是默认的了

  1. Blink 将批处理作业视为流式处理的一种特殊情况。因此也不支持Table和DataSet之间的转换,批处理作业不会翻译成DateSet程序而是翻译成DataStream程序,和流式作业一样。
  2. Blink 规划器不支持BatchTableSource,使用 boundedStreamTableSource代替它。
  3. FilterableTableSourceold planner 和 Blink planner 的实现不兼容。旧规划器会将 s 下推PlannerExpression到FilterableTableSource中,而 Blink 规划器会将 s 下推Expression。
  4. 基于字符串的键值配置选项(请参阅有关配置的文档了解详细信息)仅用于 Blink 规划器。
  5. 两个规划器中的 implementation( CalciteConfig)PlannerConfig是不同的。
  6. TableEnvironmentBlink 规划器将在和上将多个接收器优化为一个 DAG StreamTableEnvironment。旧的规划器总是将每个汇优化成一个新的 DAG,其中所有 DAG 相互独立。
  7. 旧的规划器现在不支持目录统计,而 Blink 规划器支持。

1.1 注意:

API稳定性: Flink1.11版本,Flink1.12版本则进行了优化
性能对比:
注意:目前FlinkSQL性能不如SparkSQL,未来FlinkSQL可能会越来越好
下图是Hive、Spark、Flink的SQL执行速度对比:
image.png

2. 案例准备

2.1 依赖

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
  4. <version>${flink.version}</version>
  5. <scope>provided</scope>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.flink</groupId>
  9. <artifactId>flink-table-api-java-bridge_2.12</artifactId>
  10. <version>${flink.version}</version>
  11. <scope>provided</scope>
  12. </dependency>
  13. <!-- flink执行计划,这是1.9版本之前的-->
  14. <dependency>
  15. <groupId>org.apache.flink</groupId>
  16. <artifactId>flink-table-planner_2.12</artifactId>
  17. <version>${flink.version}</version>
  18. </dependency>
  19. <!-- blink执行计划,1.11+默认的-->
  20. <dependency>
  21. <groupId>org.apache.flink</groupId>
  22. <artifactId>flink-table-planner-blink_2.12</artifactId>
  23. <version>${flink.version}</version>
  24. <scope>provided</scope>
  25. </dependency>
  26. <dependency>
  27. <groupId>org.apache.flink</groupId>
  28. <artifactId>flink-table-common</artifactId>
  29. <version>${flink.version}</version>
  30. <scope>provided</scope>
  31. </dependency>

● flink-table-common:这个包中主要是包含 Flink Planner 和 Blink Planner一些共用的代码。
● flink-table-api-java:这部分是用户编程使用的 API,包含了大部分的 API。
● flink-table-api-scala:这里只是非常薄的一层,仅和 Table API 的 Expression 和 DSL 相关。
● 两个 Planner:flink-table-planner 和 flink-table-planner-blink。
● 两个 Bridge:flink-table-api-scala-bridge 和 flink-table-api-java-bridge,
Flink Planner 和 Blink Planner 都会依赖于具体的 JavaAPI,也会依赖于具体的 Bridge,通过 Bridge 可以将 API 操作相应的转化为Scala 的 DataStream、DataSet,或者转化为 JAVA 的 DataStream 或者Data Set

2.2 程序结构

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html#structure-of-table-api-and-sql-programs

  1. // create a TableEnvironment for specific planner batch or streaming
  2. TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
  3. // create a Table
  4. tableEnv.connect(...).createTemporaryTable("table1");
  5. // register an output Table
  6. tableEnv.connect(...).createTemporaryTable("outputTable");
  7. // create a Table object from a Table API query
  8. Table tapiResult = tableEnv.from("table1").select(...);
  9. // create a Table object from a SQL query
  10. Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");
  11. // emit a Table API result Table to a TableSink, same for SQL result
  12. TableResult tableResult = tapiResult.executeInsert("outputTable");
  13. tableResult...

2.3 API

2.3.1 获取环境

  1. // **********************
  2. // BLINK STREAMING QUERY
  3. // **********************
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. import org.apache.flink.table.api.EnvironmentSettings;
  6. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  7. StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
  8. EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
  9. StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
  10. // or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
  11. // ******************
  12. // BLINK BATCH QUERY
  13. // ******************
  14. import org.apache.flink.table.api.EnvironmentSettings;
  15. import org.apache.flink.table.api.TableEnvironment;
  16. EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
  17. TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

2.3.2 创建表

  1. // get a TableEnvironment
  2. TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
  3. // table is the result of a simple projection query
  4. Table projTable = tableEnv.from("X").select(...);
  5. // register the Table projTable as table "projectedTable"
  6. tableEnv.createTemporaryView("projectedTable", projTable);
  1. tableEnvironment
  2. .connect(...)
  3. .withFormat(...)
  4. .withSchema(...)
  5. .inAppendMode()
  6. .createTemporaryTable("MyTable")

2.3.3 查询表

  1. // get a TableEnvironment
  2. TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
  3. // register Orders table
  4. // scan registered Orders table
  5. Table orders = tableEnv.from("Orders");// compute revenue for all customers from France
  6. Table revenue = orders
  7. .filter($("cCountry")
  8. .isEqual("FRANCE"))
  9. .groupBy($("cID"), $("cName")
  10. .select($("cID"), $("cName"), $("revenue")
  11. .sum()
  12. .as("revSum"));
  13. // emit or convert Table
  14. // execute query
  1. // get a TableEnvironment
  2. TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
  3. // register Orders table
  4. // compute revenue for all customers from France
  5. Table revenue = tableEnv.sqlQuery(
  6. "SELECT cID, cName, SUM(revenue) AS revSum " +
  7. "FROM Orders " +
  8. "WHERE cCountry = 'FRANCE' " +
  9. "GROUP BY cID, cName"
  10. );
  11. // emit or convert Table
  12. // execute query
  1. // get a TableEnvironment
  2. TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
  3. // register "Orders" table
  4. // register "RevenueFrance" output table
  5. // compute revenue for all customers from France and emit to "RevenueFrance"
  6. tableEnv.executeSql(
  7. "INSERT INTO RevenueFrance " +
  8. "SELECT cID, cName, SUM(revenue) AS revSum " +
  9. "FROM Orders " +
  10. "WHERE cCountry = 'FRANCE' " +
  11. "GROUP BY cID, cName"
  12. );

2.3.4 写出表

  1. // get a TableEnvironment
  2. TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
  3. // create an output Table
  4. final Schema schema = new Schema()
  5. .field("a", DataTypes.INT())
  6. .field("b", DataTypes.STRING())
  7. .field("c", DataTypes.BIGINT());
  8. tableEnv.connect(new FileSystem().path("/path/to/file"))
  9. .withFormat(new Csv().fieldDelimiter('|').deriveSchema())
  10. .withSchema(schema)
  11. .createTemporaryTable("CsvSinkTable");
  12. // compute a result Table using Table API operators and/or SQL queries
  13. Table result = ...
  14. // emit the result Table to the registered TableSink
  15. result.executeInsert("CsvSinkTable");

2.3.5 与DataSet/DataStream集成

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html#integration-with-datastream-and-dataset-api

  1. // get StreamTableEnvironment
  2. // registration of a DataSet in a BatchTableEnvironment is equivalent
  3. StreamTableEnvironment tableEnv = ...;
  4. // see "Create a TableEnvironment" section
  5. DataStream<Tuple2<Long, String>> stream = ...
  6. // register the DataStream as View "myTable" with fields "f0", "f1"
  7. tableEnv.createTemporaryView("myTable", stream);
  8. // register the DataStream as View "myTable2" with fields "myLong", "myString"
  9. tableEnv.createTemporaryView("myTable2", stream, $("myLong"), $("myString"));
  1. // get StreamTableEnvironment// registration of a DataSet in a BatchTableEnvironment is equivalent
  2. StreamTableEnvironment tableEnv = ...;
  3. // see "Create a TableEnvironment" section
  4. DataStream<Tuple2<Long, String>> stream = ...
  5. // Convert the DataStream into a Table with default fields "f0", "f1"
  6. Table table1 = tableEnv.fromDataStream(stream);
  7. // Convert the DataStream into a Table with fields "myLong", "myString"
  8. Table table2 = tableEnv.fromDataStream(stream, $("myLong"), $("myString"));

Convert a Table into a DataStream or DataSet:
Convert a Table into a DataStream:
Append Mode: This mode can only be used if the dynamic Table is only modified by INSERT changes, i.e, it is append-only and previously emitted results are never updated.
追加模式:只有当动态表仅通过插入更改进行修改时,才能使用此模式,即,它是仅追加模式,并且以前发出的结果从不更新。
Retract Mode: This mode can always be used. It encodes INSERT and DELETE changes with a boolean flag.
撤回模式:此模式始终可用。它使用布尔标志对插入和删除更改进行编码。

  1. // get StreamTableEnvironment.
  2. StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
  3. // Table with two fields (String name, Integer age)
  4. Table table = ...
  5. // convert the Table into an append DataStream of Row by specifying the class
  6. DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
  7. // convert the Table into an append DataStream of Tuple2<String, Integer>
  8. // via a TypeInformation
  9. TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  10. Types.STRING(),
  11. Types.INT());
  12. DataStream<Tuple2<String, Integer>> dsTuple =
  13. tableEnv.toAppendStream(table, tupleType);
  14. // convert the Table into a retract DataStream of Row.
  15. // A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
  16. // The boolean field indicates the type of the change.
  17. // True is INSERT, false is DELETE.
  18. DataStream<Tuple2<Boolean, Row>> retractStream =
  19. tableEnv.toRetractStream(table, Row.class);
  1. // get BatchTableEnvironment
  2. BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
  3. // Table with two fields (String name, Integer age)
  4. Table table = ...
  5. // convert the Table into a DataSet of Row by specifying a class
  6. DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);
  7. // convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformationTupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  8. Types.STRING(),
  9. Types.INT());
  10. DataSet<Tuple2<String, Integer>> dsTuple =
  11. tableEnv.toDataSet(table, tupleType);

2.3.6 TableAPI

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html

2.3.7 SQLAPI

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/

2.4 相关概念

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/dynamic_tables.html

2.4.1 Dynamic Tables & Continuous Queries

在Flink中,它把针对无界流的表称之为Dynamic Table(动态表)。它是Flink Table API和SQL的核心概念。顾名思义,它表示了Table是不断变化的。
我们可以这样来理解,当我们用Flink的API,建立一个表,其实把它理解为建立一个逻辑结构,这个逻辑结构需要映射到数据上去。Flink source源源不断的流入数据,就好比每次都往表上新增一条数据。表中有了数据,我们就可以使用SQL去查询了。要注意一下,流处理中的数据是只有新增的,所以看起来数据会源源不断地添加到表中。
动态表也是一种表,既然是表,就应该能够被查询。我们来回想一下原先我们查询表的场景。
打开编译工具,编写一条SQL语句
将SQL语句放入到mysql的终端执行
查看结果
再编写一条SQL语句
再放入到终端执行
再查看结果
…..如此反复

而针对动态表,Flink的source端肯定是源源不断地会有数据流入,然后我们基于这个数据流建立了一张表,再编写SQL语句查询数据,进行处理。这个SQL语句一定是不断地执行的。而不是只执行一次。注意:针对流处理的SQL绝对不会像批式处理一样,执行一次拿到结果就完了。而是会不停地执行,不断地查询获取结果处理。所以,官方给这种查询方式取了一个名字,叫Continuous Query,中文翻译过来叫连续查询。而且每一次查询出来的数据也是不断变化的。
image.png
这是一个非常简单的示意图。该示意图描述了:我们通过建立动态表和连续查询来实现在无界流中的SQL操作。大家也可以看到,在Continuous上面有一个State,表示查询出来的结果会存储在State中,再下来Flink最终还是使用流来进行处理。
所以,我们可以理解为Flink的Table API和SQL,是一个逻辑模型,通过该逻辑模型可以让我们的数据处理变得更加简单。
image.pngimage.png

3. 案例1

3.1 需求

  1. package cn.itcast.sql;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import org.apache.flink.streaming.api.datastream.DataStream;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import org.apache.flink.table.api.Table;
  8. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  9. import java.util.Arrays;
  10. import static org.apache.flink.table.api.Expressions.$;
  11. /**
  12. * Author itcast
  13. * Desc
  14. */
  15. public class FlinkSQL_Table_Demo01 {
  16. public static void main(String[] args) throws Exception {
  17. //1.准备环境
  18. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  19. //EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
  20. //StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
  21. StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
  22. //2.Source
  23. DataStream<Order> orderA = env.fromCollection(Arrays.asList(
  24. new Order(1L, "beer", 3),
  25. new Order(1L, "diaper", 4),
  26. new Order(3L, "rubber", 2)));
  27. DataStream<Order> orderB = env.fromCollection(Arrays.asList(
  28. new Order(2L, "pen", 3),
  29. new Order(2L, "rubber", 3),
  30. new Order(4L, "beer", 1)));
  31. //3.注册表
  32. // convert DataStream to Table
  33. Table tableA = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount"));
  34. // register DataStream as Table
  35. tEnv.createTemporaryView("OrderB", orderB, $("user"), $("product"), $("amount"));
  36. //4.执行查询
  37. System.out.println(tableA);
  38. // union the two tables
  39. Table resultTable = tEnv.sqlQuery(
  40. "SELECT * FROM " + tableA + " WHERE amount > 2 " +
  41. "UNION ALL " +
  42. "SELECT * FROM OrderB WHERE amount < 2"
  43. );
  44. //5.输出结果
  45. DataStream<Order> resultDS = tEnv.toAppendStream(resultTable, Order.class);
  46. resultDS.print();
  47. env.execute();
  48. }
  49. @Data
  50. @NoArgsConstructor
  51. @AllArgsConstructor
  52. public static class Order {
  53. public Long user;
  54. public String product;
  55. public int amount;
  56. }
  57. }

4. 案例2

4.1 需求

使用SQL和Table两种方式对DataStream中的单词进行统计

1.1 代码实现-SQL

见Idea

5. 案例3

5.1 需求

使用Flink SQL来统计5秒内 每个用户的 订单总数、订单的最大金额、订单的最小金额
也就是每隔5秒统计最近5秒的每个用户的订单总数、订单的最大金额、订单的最小金额
上面的需求使用流处理的Window的基于时间的滚动窗口就可以搞定!
那么接下来使用FlinkTable&SQL-API来实现

6. 案例4

6.1 需求

从Kafka中消费数据并过滤出状态为success的数据再写入到Kafka

  1. {"user_id": "1", "page_id":"1", "status": "success"}
  2. {"user_id": "1", "page_id":"1", "status": "success"}
  3. {"user_id": "1", "page_id":"1", "status": "success"}
  4. {"user_id": "1", "page_id":"1", "status": "success"}
  5. {"user_id": "1", "page_id":"1", "status": "fail"}

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kafka.html