4. Flink Table & SQL

4.1 Introduction

Flink's Table module consists of the Table API and SQL:

  • The Table API is a SQL-like API; with it, users can work with data as if they were operating on tables, which is very intuitive and convenient;
  • SQL, as a declarative language with a standard syntax and specification, lets users process data without caring about the underlying implementation, so it is very easy to get started with;
  • The two share roughly 80% of their code. As a unified stream/batch compute engine, Flink has a single, unified Runtime layer underneath both.

Characteristics of the Table API & SQL

  1. Declarative: users only state what they want; there is no need to understand how it is executed underneath;
  2. High performance: queries are optimizable; built-in query optimizers translate the SQL into an optimal execution plan;
  3. Easy to learn and understand: people from different industries and domains already know SQL, so the learning cost is low;
  4. Standard and stable: the semantics follow the SQL standard, which has changed very little over the 30-plus years of database history;
  5. Unified for streams and batches: the API supports both stream and batch execution, so the same SQL logic can run in either streaming or batch mode, because Flink's underlying Runtime is itself a unified stream/batch engine (see the sketch below).
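A minimal sketch of this unification at the API level, assuming Flink 1.12 with the Blink planner: only the mode flag of the environment settings changes, and the rest of the program can stay the same.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class StreamBatchUnification {
    public static void main(String[] args) {
        // Streaming mode: queries run continuously and keep updating their results
        EnvironmentSettings streamSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        TableEnvironment streamTableEnv = TableEnvironment.create(streamSettings);

        // Batch mode: the same SQL text runs once over bounded input and then terminates
        EnvironmentSettings batchSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
        TableEnvironment batchTableEnv = TableEnvironment.create(batchSettings);
    }
}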

4.2 Core Concepts

  • Dynamic Table:
    • A table over an unbounded stream, i.e. the table is constantly changing;
    • It can be thought of as a logical structure mapped onto the data: as records keep flowing in, it looks as if rows keep being appended to the table;
  • Continuous Queries (they rely on State):
    • Data keeps flowing into the dynamic table; we define a table over this stream and then write SQL to query and process it. Such a SQL statement is executed continuously rather than just once, which is why this kind of query is called a continuous query;
    • Because the dynamic table changes in real time, the result returned by a continuous query also keeps changing (a minimal sketch follows this list).
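A minimal sketch of a continuous query, assuming Flink 1.12; the clicks stream and its field names are made up for illustration. Every record that flows into the dynamic table causes the per-user count in the result table to be updated, and the query itself never terminates.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class ContinuousQueryDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // In a real job this would be an unbounded source such as Kafka
        DataStreamSource<Tuple2<String, String>> clickStream = env.fromElements(
                Tuple2.of("Mary", "/home"), Tuple2.of("Bob", "/cart"), Tuple2.of("Mary", "/prod"));

        // Map the stream onto a dynamic table and register it as a view
        tenv.createTemporaryView("clicks", clickStream, $("user_name"), $("url"));

        // The continuous query: its result is again a dynamic table whose counts are
        // retracted and re-emitted (this is where State is needed) as new clicks arrive
        Table counts = tenv.sqlQuery("SELECT user_name, COUNT(url) AS cnt FROM clicks GROUP BY user_name");

        tenv.toRetractStream(counts, Row.class).print();
        env.execute();
    }
}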

4.3 API

  • Required dependencies:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
  • Program structure (figure omitted; a sketch of the overall structure follows this list)
  • Creating the environment (figure omitted)
  • Creating tables (figure omitted)
  • Queries (figures omitted)
    • Table API
    • SQL
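A minimal sketch of this program structure, assuming Flink 1.12 with the Blink planner (flink-table-planner-blink_2.12 and flink-clients_2.12 would also have to be on the classpath) and using the built-in datagen connector as an illustrative source:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class TableProgramSkeleton {
    public static void main(String[] args) throws Exception {
        // 1. Create the stream and table environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);

        // 2. Create a table (here via DDL; a table can also be created from a DataStream, as in the cases below)
        tenv.executeSql(
                "CREATE TABLE source_table (f0 INT, f1 STRING) " +
                "WITH ('connector' = 'datagen', 'rows-per-second' = '1')");

        // 3. Query it with the Table API ...
        Table apiResult = tenv.from("source_table").select($("f0"), $("f1"));
        apiResult.printSchema();

        //    ... or with SQL
        Table sqlResult = tenv.sqlQuery("SELECT f0, f1 FROM source_table");

        // 4. Emit the result and run the job
        tenv.toAppendStream(sqlResult, Row.class).print();
        env.execute();
    }
}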

4.4 Case 1: Converting a DataStream into a Table and a View and running SQL aggregations

package org.example.sql;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Arrays;

import static org.apache.flink.table.api.Expressions.$;

public class TableAndSQL {
    public static void main(String[] args) throws Exception {
        // Prepare the environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        // Table environment
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);

        // Source
        DataStreamSource<Order> orderA = env.fromCollection(
                Arrays.asList(
                        new Order(1L, "beer", 3),
                        new Order(1L, "diaper", 3),
                        new Order(3L, "rubber", 3)
                ));
        DataStreamSource<Order> orderB = env.fromCollection(
                Arrays.asList(
                        new Order(2L, "pen", 3),
                        new Order(2L, "rubber", 3),
                        new Order(4L, "beer", 1)
                ));

        // Transformation
        // Convert the DataStreams into a Table and a View, then query them
        Table tableA = tenv.fromDataStream(orderA, $("user"), $("product"), $("amount"));
        tableA.printSchema();
        System.out.println(tableA);

        tenv.createTemporaryView("tableB", orderB, $("user"), $("product"), $("amount"));

        // Query rows with amount > 2 from tableA and amount > 1 from tableB, then UNION them
        // Note: there must be a space between the table name and the WHERE clause
        String sql = "select * from " + tableA + " where amount > 2 \n" +
                "union \n" +
                "select * from tableB where amount > 1";

        Table resultTable = tenv.sqlQuery(sql);
        resultTable.printSchema();
        System.out.println(resultTable);

        // Convert the Table back into a DataStream
        // For UNION ALL, toAppendStream would be enough
        // toAppendStream  -> appends newly computed rows to the result DataStream
        // toRetractStream -> emits (true, row) for additions and (false, row) for retractions of earlier rows
        // Comparable to the append/update/complete output modes in Structured Streaming
        DataStream<Tuple2<Boolean, Order>> resultDS = tenv.toRetractStream(resultTable, Order.class);

        // Sink
        resultDS.print();

        // Submit the job and wait for it to finish
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        private Long user;
        private String product;
        private int amount;
    }
}
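As mentioned in the comments, a query with UNION ALL produces an append-only result, so toAppendStream would be sufficient. A sketch of that variant, reusing tenv, tableA, tableB and Order from the example above:

        // Hypothetical variant: UNION ALL keeps duplicates and never retracts rows,
        // so the result can be converted with toAppendStream instead of toRetractStream
        String appendSql = "select * from " + tableA + " where amount > 2 " +
                "union all " +
                "select * from tableB where amount > 1";
        Table appendTable = tenv.sqlQuery(appendSql);
        DataStream<Order> appendDS = tenv.toAppendStream(appendTable, Order.class);
        appendDS.print();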

4.5 Case 2: WordCount in both Table/DSL style and SQL style

package org.example.sql;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.$;

public class WordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);

        DataStreamSource<String> lines = env.fromElements("hadoop spark flink", "kafka flume", "flink spark java");
        SingleOutputStreamOperator<WordAndOne> wordAndOne = lines.flatMap(
                new FlatMapFunction<String, WordAndOne>() {
                    @Override
                    public void flatMap(String s, Collector<WordAndOne> collector) throws Exception {
                        String[] arr = s.split(" ");
                        for (String word : arr) {
                            collector.collect(new WordAndOne(word, 1));
                        }
                    }
                });

        tenv.createTemporaryView("t_words", wordAndOne, $("word"), $("frequency"));

        String sql = "select word, sum(frequency) as frequency from t_words group by word";

        Table resultTable = tenv.sqlQuery(sql);

        // Convert to a DataStream
        DataStream<Tuple2<Boolean, WordAndOne>> resultDS = tenv.toRetractStream(resultTable, WordAndOne.class);

        // Sink
        resultDS.print();

        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class WordAndOne {
        private String word;
        private Integer frequency;
    }
}

The DSL-style code is as follows:

package org.example.sql;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.$;

public class WordCountWithDSL {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);

        DataStreamSource<String> lines = env.fromElements("hadoop spark flink", "kafka flume", "flink spark java");
        SingleOutputStreamOperator<WordAndOne> wordAndOne = lines.flatMap(
                new FlatMapFunction<String, WordAndOne>() {
                    @Override
                    public void flatMap(String s, Collector<WordAndOne> collector) throws Exception {
                        String[] arr = s.split(" ");
                        for (String word : arr) {
                            collector.collect(new WordAndOne(word, 1));
                        }
                    }
                });

        Table table = tenv.fromDataStream(wordAndOne);

        // DSL style: keep only the words whose count equals 2
        Table resultTable = table
                .groupBy($("word"))
                .select($("word"), $("frequency").sum().as("frequency"))
                .filter($("frequency").isEqual(2));
        resultTable.printSchema();

        // Convert to a DataStream
        DataStream<Tuple2<Boolean, WordAndOne>> resultDS = tenv.toRetractStream(resultTable, WordAndOne.class);

        // Sink
        resultDS.print();

        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class WordAndOne {
        private String word;
        private Integer frequency;
    }
}
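For comparison, the DSL filter above can also be expressed in the SQL style with a HAVING clause. A sketch, assuming the t_words view from the previous (SQL-style) example is registered:

        // Keep only the words whose total frequency equals 2, equivalent to the DSL filter above
        String sql = "select word, sum(frequency) as frequency from t_words group by word having sum(frequency) = 2";
        Table resultTable = tenv.sqlQuery(sql);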

4.6 Case 3: Combining windows and watermarks

Requirement: use Flink SQL to compute, per user, the total number of orders, the maximum order amount, and the minimum order amount within 5-second windows.
Hint: to compute these statistics every 5 seconds over the previous 5 seconds, a time-based tumbling window can be used.
package org.example.sql;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.table.api.Expressions.$;

public class Statistics {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);

        // Source: generate a random order every second
        DataStreamSource<Order> orderDS = env.addSource(
                new RichSourceFunction<Order>() {
                    private boolean flag = true;

                    @Override
                    public void run(SourceContext<Order> sourceContext) throws Exception {
                        Random random = new Random();
                        while (flag) {
                            Order order = new Order(
                                    UUID.randomUUID().toString(),
                                    random.nextInt(3),
                                    random.nextInt(101),
                                    System.currentTimeMillis()
                            );
                            TimeUnit.SECONDS.sleep(1);
                            sourceContext.collect(order);
                        }
                    }

                    @Override
                    public void cancel() {
                        flag = false;
                    }
                });

        // Transformation: assign event-time timestamps and a watermark allowing 5 seconds of out-of-orderness
        SingleOutputStreamOperator<Order> orderDSWithWatermark = orderDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((order, recordTimestamp) -> order.getCreateTime()));

        tenv.createTemporaryView("t_order", orderDSWithWatermark,
                $("orderId"), $("userId"), $("money"), $("createTime").rowtime());

        String sql = "select userId, count(orderId) as orderCount, max(money) as maxMoney, min(money) as minMoney \n" +
                "from t_order\n" +
                "group by userId, \n" +
                // 5-second tumbling event-time window
                "tumble(createTime, INTERVAL '5' SECOND)";

        Table resultTable = tenv.sqlQuery(sql);
        resultTable.printSchema();

        DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(resultTable, Row.class);

        resultDS.print();

        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long createTime;
    }
}

The DSL version is written as follows:

package org.example.sql;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

public class StatisticsWithDSL {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);

        // Source: generate a random order every second
        DataStreamSource<Order> orderDS = env.addSource(
                new RichSourceFunction<Order>() {
                    private boolean flag = true;

                    @Override
                    public void run(SourceContext<Order> sourceContext) throws Exception {
                        Random random = new Random();
                        while (flag) {
                            Order order = new Order(
                                    UUID.randomUUID().toString(),
                                    random.nextInt(3),
                                    random.nextInt(101),
                                    System.currentTimeMillis()
                            );
                            TimeUnit.SECONDS.sleep(1);
                            sourceContext.collect(order);
                        }
                    }

                    @Override
                    public void cancel() {
                        flag = false;
                    }
                });

        // Transformation: assign event-time timestamps and a watermark allowing 5 seconds of out-of-orderness
        SingleOutputStreamOperator<Order> orderDSWithWatermark = orderDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((order, recordTimestamp) -> order.getCreateTime()));

        tenv.createTemporaryView("t_order", orderDSWithWatermark,
                $("orderId"), $("userId"), $("money"), $("createTime").rowtime());

        Table resultTable = tenv.from("t_order")
                // Define the tumbling window with the Table API
                .window(Tumble.over(lit(5).second())
                        .on($("createTime"))
                        .as("tumbleWindow"))
                // Aggregate per user id and window
                .groupBy($("tumbleWindow"), $("userId"))
                .select(
                        $("userId"),
                        $("orderId").count().as("orderCount"),
                        $("money").max().as("maxMoney"),
                        $("money").min().as("minMoney")
                );

        resultTable.printSchema();

        DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(resultTable, Row.class);

        resultDS.print();

        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long createTime;
    }
}
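If the window boundaries should also appear in the output, the SQL variant can additionally select the window start and end through the tumble_start/tumble_end auxiliary functions. A sketch against the t_order view defined above:

        String sqlWithWindow =
                "select userId, " +
                "       tumble_start(createTime, INTERVAL '5' SECOND) as winStart, " +
                "       tumble_end(createTime, INTERVAL '5' SECOND) as winEnd, " +
                "       count(orderId) as orderCount, max(money) as maxMoney, min(money) as minMoney " +
                "from t_order " +
                "group by userId, tumble(createTime, INTERVAL '5' SECOND)";
        Table windowedResult = tenv.sqlQuery(sqlWithWindow);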

4.7 Case 4: Integrating with Kafka

Requirement: consume data from Kafka, filter out the records whose status is success, and write them back to Kafka.
Sample data:
{"user_id": "1", "page_id": "1", "status": "success"}
{"user_id": "1", "page_id": "1", "status": "success"}
{"user_id": "1", "page_id": "1", "status": "success"}
{"user_id": "1", "page_id": "1", "status": "success"}
{"user_id": "1", "page_id": "1", "status": "fail"}
package org.example.sql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class KafkaDemo {
    public static void main(String[] args) throws Exception {
        // Environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);

        // Source
        TableResult inputTable = tenv.executeSql(
                "CREATE TABLE input_kafka (\n" +
                        "  `user_id` BIGINT,\n" +
                        "  `page_id` BIGINT,\n" +
                        "  `status` STRING\n" +
                        ") WITH (\n" +
                        "  'connector' = 'kafka',\n" +
                        "  'topic' = 'input_kafka',\n" +
                        "  'properties.bootstrap.servers' = 'node01:9092',\n" +
                        "  'properties.group.id' = 'testGroup',\n" +
                        "  'scan.startup.mode' = 'latest-offset',\n" +
                        "  'format' = 'json'\n" +
                        ")"
        );

        // Transformation: keep only the records whose status is success
        String sql = "select * from input_kafka where status = 'success' ";
        Table etlTable = tenv.sqlQuery(sql);

        // Sink
        DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(etlTable, Row.class);
        resultDS.print();

        TableResult outputTable = tenv.executeSql(
                "CREATE TABLE output_kafka (\n" +
                        "  `user_id` BIGINT,\n" +
                        "  `page_id` BIGINT,\n" +
                        "  `status` STRING\n" +
                        ") WITH (\n" +
                        "  'connector' = 'kafka',\n" +
                        "  'topic' = 'output_kafka',\n" +
                        "  'properties.bootstrap.servers' = 'node01:9092',\n" +
                        "  'format' = 'json',\n" +
                        "  'sink.partitioner' = 'round-robin'\n" +
                        ")"
        );

        tenv.executeSql("insert into output_kafka select * from " + etlTable);

        // Submit the job and wait for it to finish
        env.execute();
    }
}

The execution steps are as follows:
  1. Start ZooKeeper and Kafka.
  2. Create the Kafka topics:
  • /export/servers/kafka-2.11/bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 2 --partitions 3 --topic input_kafka
  • /export/servers/kafka-2.11/bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 2 --partitions 3 --topic output_kafka
  3. Start a console producer and consumer:
  • /export/servers/kafka-2.11/bin/kafka-console-producer.sh --broker-list node01:9092 --topic input_kafka
  • /export/servers/kafka-2.11/bin/kafka-console-consumer.sh --bootstrap-server node01:9092 --topic output_kafka --from-beginning
  4. Start the program and feed the following records into the producer:
  • {"user_id": "1", "page_id": "1", "status": "success"}
    {"user_id": "1", "page_id": "1", "status": "success"}
    {"user_id": "1", "page_id": "1", "status": "success"}
    {"user_id": "1", "page_id": "1", "status": "success"}
    {"user_id": "1", "page_id": "1", "status": "fail"}
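As an alternative to concatenating the INSERT statement as a string, the Table API offers executeInsert. A sketch, reusing the etlTable from the code above (executeInsert submits its own job, so the trailing env.execute() is not required for this part):

        // Equivalent to the "insert into output_kafka select * from ..." statement above
        etlTable.executeInsert("output_kafka");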

4.8 Integrating with Hive

4.8.1 How Flink integrates with Hive

The integration mainly covers two aspects:

  • Persisting metadata
    • Flink can use Hive's MetaStore as a persistent Catalog;
    • Through the HiveCatalog, Flink metadata from different sessions can be stored in the Hive MetaStore;
    • For example, a Kafka source table can be stored in the Hive MetaStore via the HiveCatalog; its metadata is then persisted in the metadata database backing Hive's MetaStore and can be reused in later SQL queries.
  • Reading and writing Hive tables from Flink
    • Flink is integrated with Hive, so Hive tables can be read and written directly from Flink, much like operating on Hive data with Spark SQL;
    • The HiveCatalog is designed to be compatible with Hive, so existing Hive tables can be accessed "out of the box": no changes to the existing Hive MetaStore are required, and neither the data location nor the partitioning of the tables has to be modified (a sketch of registering a HiveCatalog follows this list).
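A minimal sketch of registering a HiveCatalog, assuming Flink 1.12 with the flink-connector-hive and Hive client dependencies on the classpath; the catalog name, default database and hive-site.xml directory below are placeholders:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveCatalogDemo {
    public static void main(String[] args) {
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        TableEnvironment tenv = TableEnvironment.create(settings);

        String catalogName = "myhive";           // placeholder catalog name
        String defaultDatabase = "default";      // placeholder default database
        String hiveConfDir = "/etc/hive/conf";   // placeholder directory containing hive-site.xml

        // Register the HiveCatalog so that table metadata is persisted in the Hive MetaStore
        HiveCatalog hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
        tenv.registerCatalog(catalogName, hiveCatalog);

        // Make it the current catalog; tables created from now on are stored in Hive's metadata store
        tenv.useCatalog(catalogName);
    }
}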

4.8.2 Configuration

(To be completed.)