Table API 和 SQL 现在还处于活跃开发阶段,还没有完全实现所有的特性。不是所有的 [Table API,SQL] 和 [流,批] 的组合都是支持的。

引用API

需要引入的 pom 依赖

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
  4. <version>1.11.2</version>
  5. <scope>provided</scope>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.flink</groupId>
  9. <artifactId>flink-table-planner-blink_2.11</artifactId>
  10. <version>1.11.2</version>
  11. <scope>provided</scope>
  12. </dependency>
  13. <dependency>
  14. <groupId>org.apache.flink</groupId>
  15. <artifactId>flink-streaming-scala_2.11</artifactId>
  16. <version>1.11.2</version>
  17. <scope>provided</scope>
  18. </dependency>

2.11为Scala版本,1.11.2为flink版本

表环境创建方式

    //1.创建流式处理环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //并行度设置
    env.setParallelism(1)

    //2.创建表执行环境
    val tableEnv = StreamTableEnvironment.create(env)

    //2.1 基于老版本planner的流处理
    val settings = EnvironmentSettings.newInstance()
      .useOldPlanner()
      .inStreamingMode()
      .build()
    val oldStreamTableEnv = StreamTableEnvironment.create(env,settings)

    //2.2 基于老版本planner的批处理
    val batchEnv = ExecutionEnvironment.getExecutionEnvironment
    val oldBatchTableEnv = BatchTableEnvironment.create(batchEnv)


    //2.3 基于新版本blink planner的流处理
    val blinkStreamSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val blinkStreamTableEnv = StreamTableEnvironment.create(env,blinkStreamSettings)

    //2.3 基于新版本blink planner的批处理
    val blinkBatchSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inBatchMode()
      .build()
    val blinkBatchTableEnv = TableEnvironment.create(blinkBatchSettings)

注意 对于生产环境,建议使用在1.11版本之后已经变成默认的Blink Planner

//2.3 基于新版本blink planner的流处理
val blinkStreamSettings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
val blinkStreamTableEnv = StreamTableEnvironment.create(env,blinkStreamSettings)

//2.3 基于新版本blink planner的批处理
val blinkBatchSettings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inBatchMode()
  .build()
val blinkBatchTableEnv = TableEnvironment.create(blinkBatchSettings)

API调用

https://www.cnblogs.com/shengyang17/p/14342173.html
基本程序结构

val tableEnv = ... // 创建表环境

// 创建表
tableEnv.connect(...).createTemporaryTable("table1")
// 注册输出表
tableEnv.connect(...).createTemporaryTable("outputTable")

// 使用 Table API query 创建表
val tapiResult = tableEnv.from("table1").select(...)
// 使用 SQL query 创建表
val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table1 ...")

// 输出一张结果表到 TableSink,SQL查询的结果表也一样
TableResult tableResult = tapiResult.executeInsert("outputTable");
tableResult...

// 执行
tableEnv.execute("scala_job")

注意

注意:需要导入的隐式类型转换

org.apache.flink.table.api._
org.apache.flink.api.scala._
org.apache.flink.table.api.bridge.scala._

创建表

TableEnvironment可以调用.connect()方法,连接外部系统,并调用.createTemporaryTable()方法,在Catalog中注册表

tableEnv
    .connect(...)       //定义表的数据结构,和外部系统建立连接
    .withFormat(...)    //定义数据格式化方法
    .withSchema(...)    //定义表结构
    .createTemporaryTable("MyTable")  //创建临时表

读取外部数据

从文件系统中读取数据

val tableEnv = StreamTableEnvironment.create(env)

//连接外部系统,读取数据,注册表
//读取文件
val filePath = "D:\\IDEA\\flink\\src\\main\\resources\\stock\\sensor.txt"
    tableEnv.connect(new FileSystem().path(filePath))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id",DataTypes.STRING())
          .field("timestamp",DataTypes.BIGINT())
          .field("temperature",DataTypes.DOUBLE())
      )
      .createTemporaryTable("inputTable")

val inputTable: Table = tableEnv.from("inputTable")
inputTable.toAppendStream[(String,Long,Double)].print()

env.execute("table api test")

其中Csv()为新版本的描述器,flink没有直接提供,要想使用新版本描述器,需要引用依赖flink-csv

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-csv -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.11.2</version>
</dependency>

从kafka读取数据

https://www.jianshu.com/p/4824641a757b
一个完整的配置实例:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

tableEnv.connect(new Kafka()
  //kafka连接配置
  .version("universal")
  .topic("sensor")
  .property("zookeeper.connect", "192.168.188.8:2181")
  .property("bootstrap.servers", "192.168.188.8:9092")
)
    //定义表结构
  .withFormat(new Csv())
  .withSchema(new Schema()
    .field("id", DataTypes.STRING())
    .field("timestamp", DataTypes.BIGINT())
    .field("temperature", DataTypes.DOUBLE())
  )

    //创建临时表
  .createTemporaryTable("kafkaInputTable")

val inputTable: Table = tableEnv.from("kafkaInputTable")
inputTable.toAppendStream[(String, Long, Double)].print()

表查询

利用外部系统的连接器connector,我们可以读写数据,并在环境的Catalog中注册表。接下来就可以对表做查询转换了。
Flink给我们提供了两种查询方式:Table API和 SQL。


方式一:table api

//查询转换
val sensorTable = tableEnv.from("inputTable")
val resultTable = sensorTable
  .select("id,temperature")
  .filter('id === "")
  • val sensorTable = tableEnv.from(“inputTable”),使用表API操作表之前,需要先进行转换
  • 查询语句的表达式写法(方式一与方式二都是flink1.9与1.10中table api的写法,方式三是flink1.11以后官网推荐的写法)
    • 方式一(使用' ,):.select('id,'temperature)
    • 方式二(使用" "):.select("id,temperature")
    • 方式三(使用$(" ")): .select($"id",$"temperature")
  • 注意:filter判断字段值是否等于一个值时,使用 ===(因为等号前面是一个对象,等号后面是一个字符串,不能使用==)

方式二:使用SQL语句

// 3.2 sql
val resultSqlTable = tableEnv.sqlQuery(
  """
    |select id,temperature
    |from inputTable
    |where id = 'sensor_1'
    |""".stripMargin)

推荐写法,模板字符串写SQL

将DataStream转化为表

Flink允许我们把Table和DataStream做转换:我们可以基于一个DataStream,先流式地读取数据源,然后map成POJO,再把它转成Table。Table的列字段(column fields),就是POJO里的字段,这样就不用再麻烦地定义schema了。(定义表的时候需要定义表结构,但将datastream转换为表后,就不要麻烦的定义表结构schema了,此时表结构结尾数据流中的结构,数据流中每一个字段就是表每一行对应的字段名

代码表达

代码中实现非常简单,直接用tableEnv.fromDataStream()就可以了。默认转换后的 Table schema 和 DataStream 中的字段定义一一对应,也可以单独指定出来。


如何单独指定字段的对应关系?首先明白数据类型与scheme的对应关系

数据类型与scheme的对应

在上节的例子中,DataStream 中的数据类型,与表的 Schema 之间的对应关系,是按照类中的字段名来对应的(name-based mapping),所以还可以用as做重命名。

基于名称的对应:Table sensorTable = tableEnv.fromDataStream(dataStream, "timestamp as ts, id as myId, temperature");

基于位置的对应: val sensorTable = tableEnv.fromDataStream(dataStream, 'myId, 'ts)

Flink的DataStream和 DataSet API支持多种类型。比如元组(内置Scala和Java元组)、POJO、Scala case类和Flink的Row类型等,允许具有多个字段的嵌套数据结构,这些字段可以在Table的表达式中访问。其他类型,则被视为原子类型。元组类型,默认的名称是 “_1”, “_2”;而原子类型,默认名称是 ”f0”。

创建临时视图(Temporary View)

方式一:直接从DataStream转换

同样,可以直接对应字段转换;也可以在转换的时候,指定相应的字段。

tableEnv.createTemporaryView("datatable",dataTable)

tableEnv.createTemporaryView("sensorView", dataStream, "id, temperature, timestamp as ts");

方式二:基于table创建临时视图

tableEnv.createTemporaryView("sensorView", sensorTable);

注意

  • 创建临时视图的意义所在:在当前环境中注册了一张临时表,在当前环境中可以使用table的实例
  • View和Table的Schema完全相同。事实上,在Table API中,可以认为View和Table是等价的。

输出表

表的输出,是通过将数据写入TableSink 来实现的。TableSink 是一个通用接口,可以支持不同的文件格式、存储数据库和消息队列。

实现方法

输出表最直接的方法,就是通过Table.insertInto() 方法将一个 Table 写入注册过的 TableSink 中。

输出到文件

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Kafka, OldCsv, Schema}

object FileOutputTest {
  def main(args: Array[String]): Unit = {
    //1.创建流式处理环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //并行度设置
    env.setParallelism(1)

    //2.创建表执行环境
    val tableEnv = StreamTableEnvironment.create(env)

    /**
     * 文件读入,将读入的数据转化为表结构
     */
    val filePath = "D:\\IDEA\\flink\\src\\main\\resources\\stock\\sensor.txt"
    tableEnv.connect(new FileSystem().path(filePath))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
      )
      //创建临时表
      .createTemporaryTable("inputTable")


//    转换操作
    val sensorTable = tableEnv.from("inputTable")
    val resultTable = sensorTable
        .select($"id",$"temperature")
        .filter($"id" === "sensor_1")

    //输出打印
//    resultTable.toAppendStream[(String,Double)].print("resultTable")

    //输出到文件
    val outFilePath = "D:\\IDEA\\flink\\src\\main\\resources\\stock\\output.txt"
    tableEnv.connect(new FileSystem().path(outFilePath))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("temperature", DataTypes.DOUBLE())
      )
      //创建临时表(注册输出表)
      .createTemporaryTable("outputTable")

    //输出到文件
    result.executeInsert("outputTable")
  }

}

image.png

输出到kafka

结合前面Kafka作为输入数据,构建数据管道,kafka进,kafka出

object outputKafkaTest {
  def main(args: Array[String]): Unit = {
    //1.创建流式处理环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //并行度设置
    env.setParallelism(1)

    //2.创建表执行环境
    val tableEnv = StreamTableEnvironment.create(env)

    //从kafka读取数据
    tableEnv.connect(new Kafka()
      .version("universal")
      .topic("sensor")
      .property("zookeeper.connect", "192.168.188.8:2181")
      .property("bootstrap.servers", "192.168.188.8:9092")
    )
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
      )
      .createTemporaryTable("kafkaInputTable")


    //3.查询转换
    val sensorTable:Table = tableEnv.from("kafkaInputTable")
    //3.1简单转换
    val resultTable = sensorTable
      .select($"id", $"temperature")
      .filter($"id" === "sensor_1")


    //  输出到kafka
    tableEnv.connect(new Kafka()
      .version("universal")
      .topic("sinktest")
      .property("zookeeper.connect", "192.168.188.8:2181")
      .property("bootstrap.servers", "192.168.188.8:9092")
    )
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("temperature", DataTypes.DOUBLE())
      )
      .createTemporaryTable("kafkaOutputTable")


    resultTable.insertInto("kafkaOutputTable")
    tableEnv.execute("kafka test")
  }

}

注意
(1)输入与输出的topic名称不一样
(2)从 1.11 版本开始,sqlUpdate 方法 和 insertInto 方法被废弃,从这两个方法构建的 Table 程序必须通过 StreamTableEnvironment.execute() 方法执行,而不能通过StreamExecutionEnvironment.execute() 方法来执行。


更新模式

Flink Table API中的更新模式有以下三种:

1)追加模式(Append Mode)

在追加模式下,表(动态表)和外部连接器只交换插入(Insert)消息。

2)撤回模式(Retract Mode)

在撤回模式下,表和外部连接器交换的是:添加(Add)和撤回(Retract)消息。

  • 插入(Insert)会被编码为添加消息;
  • 删除(Delete)则编码为撤回消息;
  • 更新(Update)则会编码为,已更新行(上一行)的撤回消息,和更新行(新行)的添加消息。

在此模式下,不能定义key,这一点跟upsert模式完全不同。

3)Upsert(更新插入)模式

在Upsert模式下,动态表和外部连接器交换Upsert和Delete消息。
这个模式需要一个唯一的key,通过这个key可以传递更新消息。为了正确应用消息,外部连接器需要知道这个唯一key的属性。

  • 插入(Insert)和更新(Update)都被编码为Upsert消息;
  • 删除(Delete)编码为Delete信息。

这种模式和Retract模式的主要区别在于,Update操作是用单个消息编码的,所以效率会更高。


将表转换为datastream


动态表

下图显示了流、动态表和连续查询的关系:
image.png
流式持续查询的过程:

    1. 流被转换为动态表。(该动态表不涉及更改删除操作,只会不断追加数据)
    1. 对动态表计算连续查询,生成新的动态表。
    1. 生成的动态表被转换回流。

下图显示了如何将访问URL事件流,或者叫点击事件流(左侧)转换为表(右侧)
image.png