注意:Table API 和 SQL 现在还处于活跃开发阶段,还没有完全实现Flink中所有的特性。不是所有的 [Table API,SQL] 和 [流,批] 的组合都是支持的。

Table API和SQL的由来:
Flink针对标准的流处理和批处理提供了两种关系型API,Table APISQL
Table API允许用户以一种很直观的方式进行select 、filter和join操作。
Flink SQL基于 Apache Calcite实现标准SQL。针对批处理和流处理可以提供相同的处理语义和结果。

Flink Table API、SQL和Flink的DataStream API、DataSet API是紧密联系在一起的。 Table API和SQL是一种关系型 API,用户可以像操作 Mysql 数据库表一样的操作数据,而不需要写代码,更不需要手工的对代码进行调优。另外,SQL 作为一个非程序员可操作的语言,学习成本很低,如果一个系统提供 SQL 支持,将很容易被用户接受。

如果你想要使用Table API 和SQL的话,需要添加下面的依赖

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-table-api-java-bridge_2.12</artifactId>
  4. <version>1.11.0</version>
  5. <!--<scope>provided</scope>-->
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.flink</groupId>
  9. <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
  10. <version>1.11.0</version>
  11. <!--<scope>provided</scope>-->
  12. </dependency>

如果你想在 本地 IDE中运行程序,还需要添加下面的依赖

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.11.0</version>
            <!--<scope>provided</scope>-->
        </dependency>

如果你用到了老的执行引擎,还需要添加下面这个依赖

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>1.11.1</version>
            <!--<scope>provided</scope>-->
        </dependency>

注意:由于部分 table 相关的代码是用 Scala 实现的,所以,这个依赖也是必须的。【这个依赖我们在前面开发DataStream程序的时候已经添加过了】

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.12</artifactId>
  <version>1.11.0</version>
  <scope>provided</scope>
</dependency>

Table API和SQL通过join API集成在一起,这个join API的核心概念是Table,Table可以作为查询的输入和输出。
针对Table API和SQL我们主要研究以下内容
1:Table API和SQL的使用
2:DataStream、DataSet和Table之间的互相转换

创建TableEnvironment对象

想要使用Table API 和SQL,首先要创建一个TableEnvironment对象。

object CreateTableEnvironmentScala {
  def main(args: Array[String]): Unit = {

    /**
     * 注意:如果Table API 和 SQL不需要和DataStream或者DataSet互相转换
     * 则针对stream和batch都可以使用TableEnvironment
     */
    //指定底层引擎为Blink,以及数据处理模式-stream
    val sSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    //创建TableEnvironment对象
    val sTableEnv = TableEnvironment.create(sSettings)


    //指定底层引擎为Blink,以及数据处理模式-batch
    val bSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
    //创建TableEnvironment对象
    val bTableEnv = TableEnvironment.create(bSettings)

    /**
     * 注意:如果Table API和SQL需要和DataStream或者DataSet互相转换
     * 针对stream需要使用StreamTableEnvironment
     * 针对batch需要使用BatchTableEnvironment
     */
    //创建StreamTableEnvironment
    val ssEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val ssSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val ssTableEnv = StreamTableEnvironment.create(ssEnv, ssSettings)

    //创建BatchTableEnvironment
    //注意:此时只能使用旧的执行引擎,新的Blink执行引擎不支持和DataSet转换
    val bbEnv = ExecutionEnvironment.getExecutionEnvironment
    val bbTableEnv = BatchTableEnvironment.create(bbEnv)

  }
}

Table API 和SQL的使用

目前创建Table的很多方法都过时了,都不推荐使用了,例如:registerTableSource、connect等方法
目前官方推荐使用executeSql的方式,executeSql里面支持DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USE等语法
在D:\data\source目录创建user.txt :::info 1,jack
2,tom
3,mark :::

/**
 * TableAPI 和 SQL的使用
 */
object TableAPIAndSQLOpScala {
  def main(args: Array[String]): Unit = {
    //获取TableEnvironment
    val sSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val sTableEnv = TableEnvironment.create(sSettings)

    //创建输入表:文件转表
    /**
     * connector.type:指定connector的类型
     * connector.path:指定文件或者目录地址
     * format.type:文件数据格式化类型,现在只支持csv格式
     * 注意:SQL语句中如果出现了换行,行尾末尾可以添加空格或者\n都可以,最后一行不用添加
     */
    sTableEnv.executeSql("" +
      "create table myTable(\n" +
      "id int,\n" +
      "name string\n" +
      ") with (\n" +
      "'connector.type' = 'filesystem',\n" +
      "'connector.path' = 'D:\\data\\source',\n" +
      "'format.type' = 'csv'\n" +
      ")")

    //使用TableAPI实现数据查询和过滤等操作
    //    import org.apache.flink.table.api._
    //    val result = sTableEnv.from("myTable")
    //      .select($"id", $"name")
    //      .filter($"id" > 1)

    //使用SQL实现数据查询和过滤等操作
    val result = sTableEnv.sqlQuery("select id,name from myTable where id > 1")

    //输出结果到控制台
    result.execute().print()

    //创建输出表
    sTableEnv.executeSql("" +
      "create table newTable(\n" +
      "id int,\n" +
      "name string\n" +
      ") with (\n" +
      "'connector.type' = 'filesystem',\n" +
      "'connector.path' = 'D:\\data\\res',\n" +
      "'format.type' = 'csv'\n" +
      ")")

    //保存查询结果到表newTable中
    result.executeInsert("newTable")
  }
}

运行结果:
image.pngimage.png

DataStream、DataSet和Table之间的互相转换

Table API和SQL可以很容易的和DataStream和DataSet程序集成到一块。
通过TableEnvironment ,可以把DataStream或者DataSet注册为Table,这样就可以使用Table API和SQL查询了。
通过TableEnvironment ,也可以把Table对象转换为DataStream或者DataSet,这样就可以使用DataStream或者DataSet中的相关API了。

DataStream→Table

1:使用DataStream创建表,主要包含下面这两种情况

  • 使用DataStream创建view视图
  • 使用DataStream创建table对象 ```scala /**

    • 将DataStream转换为表 */ object DataStreamToTableScala { def main(args: Array[String]): Unit = { //获取StreamTableEnvironment val ssEnv = StreamExecutionEnvironment.getExecutionEnvironment val ssSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val ssTableEnv = StreamTableEnvironment.create(ssEnv, ssSettings)

      //获取DataStream import org.apache.flink.api.scala._ val stream = ssEnv.fromCollection(Array((1, “jack”), (2, “tom”), (3, “mack”)))

      //第一种:将DataStream转换为view视图 import org.apache.flink.table.api._ ssTableEnv.createTemporaryView(“myTable”, stream, ‘id, ‘name) ssTableEnv.sqlQuery(“select * from myTable where id > 1”).execute().print()

//第二种:将DataStream转换为Table对象
val table = ssTableEnv.fromDataStream(stream, $"id", $"name")
table.select($"id", $"name")
  .filter($"id" > 1)
  .execute()
  .print()

//注意:'id,'name 和 $"id", $"name" 这两种写法是一样的效果

} }

![image.png](https://cdn.nlark.com/yuque/0/2022/png/5363330/1646038620211-2461e609-1ee8-4b09-a38f-040776298405.png#clientId=u45b7b884-fb3b-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=397&id=u210c6279&margin=%5Bobject%20Object%5D&name=image.png&originHeight=397&originWidth=569&originalType=binary&ratio=1&rotation=0&showTitle=false&size=22379&status=done&style=stroke&taskId=uef4f0289-be2b-48ef-9299-876d12222dd&title=&width=569)
<a name="OGSco"></a>
## DataSet→Table(旧)
2:使用DataSet创建表
> 注意:此时只能使用旧的执行引擎,新的Blink执行引擎不支持和DataSet转换

```scala
/**
 * 将DataSet转换为表
 */
object DataSetToTableScala {
  def main(args: Array[String]): Unit = {
    //获取BatchTableEnvironment
    val bbEnv = ExecutionEnvironment.getExecutionEnvironment
    val bbTableEnv = BatchTableEnvironment.create(bbEnv)

    //获取DataSet
    import org.apache.flink.api.scala._
    val dataSet = bbEnv.fromCollection(Array((1, "jack"), (2, "tom"), (3, "mack")))

    //第一种:将DataSet转换为view视图
    import org.apache.flink.table.api._
    bbTableEnv.createTemporaryView("myTable", dataSet, 'id, 'name)
    bbTableEnv.sqlQuery("select * from myTable where id > 1").execute().print()

    //第二种:将DataSet转换为table对象
    val table = bbTableEnv.fromDataSet(dataSet, 'id, 'name)
    table.select($"id", $"name")
      .filter($"id" > 1)
      .execute()
      .print()
  }

}

image.png

Table→DataStream

将 Table 转换为 DataStream 或者 DataSet 时,你需要指定生成的 DataStream 或者 DataSet 的数据类型,即,Table 的每行数据要转换成的数据类型。通常最方便的选择是转换成 Row 。以下列表概述了不同选项的功能:
Row: 通过角标映射字段,支持任意数量的字段,支持 null 值,无类型安全(type-safe)检查。
POJO: Java中的实体类,这个实体类中的字段名称需要和Table中的字段名称保持一致,支持任意数量的字段,支持null值,有类型安全检查。
Case Class: 通过角标映射字段,不支持null值,有类型安全检查。
Tuple: 通过角标映射字段,Scala中限制22个字段,Java中限制25个字段,不支持null值,有类型安全检查。
Atomic Type: Table 必须有一个字段,不支持 null 值,有类型安全检查。

3:将表转换成 DataStream
流式查询的结果Table会被动态地更新,即每个新的记录到达输入流时结果就会发生变化。因此,转换此动态查询的DataStream需要对表的更新进行编码。
有几种模式可以将Table转换为DataStream。

  • Append Mode:这种模式只适用于当动态表仅由INSERT更改修改时(仅附加),之前添加的数据不会被更新。
  • Retract Mode:可以始终使用此模式,它使用一个Boolean标识来编码INSERT和DELETE更改。 ```scala /**

    • 将table转换成DataStream */ object TableToDataStream { def main(args: Array[String]): Unit = { //获取StreamTableEnvironment val ssEnv = StreamExecutionEnvironment.getExecutionEnvironment val ssSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val ssTableEnv = StreamTableEnvironment.create(ssEnv, ssSettings)

      //创建输入表 ssTableEnv.executeSql(“” + “create table myTable(\n” + “id int,\n” + “name string\n” + “) with (\n” + “‘connector.type’ = ‘filesystem’,\n” + “‘connector.path’ = ‘D:\data\source’,\n” + “‘format.type’ = ‘csv’\n” + “)”)

      //获取table val table = ssTableEnv.from(“myTable”)

      //将table转换为DataStream //如果只有新增(追加)操作,可以使用toAppendStream import org.apache.flink.api.scala._ val appStream = ssTableEnv.toAppendStreamRow appStream.map(row => (row.getField(0).toString.toInt, row.getField(1).toString)) .print()

//如果有增加操作,还有删除操作,则使用toRetractStream
val retStream = ssTableEnv.toRetractStream[Row](table)
retStream.map(tup => {
  val flag = tup._1
  val row = tup._2
  val id = row.getField(0).toString.toInt
  val name = row.getField(1).toString
  (flag, id, name)
}).print()

//注意:将table转换为DataStream之后,就需要调用StreamExecutionEnvironment中的execute方法了
ssEnv.execute("TableToDataStreamScala")

} }

<a name="douWl"></a>
## Table→DataSet
```scala
/**
 * 将table转换成DataSet
 */
object TableToDataSet {
  def main(args: Array[String]): Unit = {
    //获取BatchTableEnvironment
    val bbEnv = ExecutionEnvironment.getExecutionEnvironment
    val bbTableEnv = BatchTableEnvironment.create(bbEnv)

    //创建输入表
    bbTableEnv.executeSql("" +
      "create table myTable(\n" +
      "id int,\n" +
      "name string\n" +
      ") with (\n" +
      "'connector.type' = 'filesystem',\n" +
      "'connector.path' = 'D:\\data\\source',\n" +
      "'format.type' = 'csv'\n" +
      ")")

    //获取table
    val table = bbTableEnv.from("myTable")

    //将table转换成DataSet
    import org.apache.flink.api.scala._
    val set = bbTableEnv.toDataSet[Row](table)
    set.map(row => (row.getField(0).toString.toInt, row.getField(1).toString))
      .print()

  }
}