The Table API is a unified relational API for stream and batch processing: a Table API query can run on streaming or batch input without any modification. The Table API is a superset of the SQL language and is designed specifically for Apache Flink; it is a language-integrated API for Scala and Java. Unlike regular SQL, where queries are specified as strings, Table API queries are defined in a language-embedded style in Java or Scala, with IDE support such as autocompletion and syntax checking.

9.1 Required pom Dependencies

  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_2.11</artifactId>
      <version>1.10.0</version>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
      <version>1.10.0</version>
  </dependency>

9.2 A First Look at the Table API

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val inputStream = env.readTextFile("..\\sensor.txt")
    val dataStream = inputStream
      .map(data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
      })
    // create a tableEnv based on env
    val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
    // create a table from a stream
    val dataTable: Table = tableEnv.fromDataStream(dataStream)
    // select specific data from the table
    val selectedTable: Table = dataTable.select('id, 'temperature)
      .filter("id = 'sensor_1'")
    val selectedStream: DataStream[(String, Double)] = selectedTable
      .toAppendStream[(String, Double)]
    selectedStream.print()
    env.execute("table test")
  }
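The map step above turns each line of sensor.txt into a SensorReading case class. That parsing logic can be exercised in isolation, without Flink — a minimal sketch, assuming the three-field `id,timestamp,temperature` CSV layout implied by the split above:

```scala
// Same case class shape as used in the Flink job above.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object ParseDemo {
  // Mirrors the map() lambda: split on ',' and trim each field.
  def parse(line: String): SensorReading = {
    val dataArray = line.split(",")
    SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
  }

  def main(args: Array[String]): Unit = {
    // prints SensorReading(sensor_1,1547718199,35.8)
    println(parse("sensor_1, 1547718199, 35.8"))
  }
}
```

Note that `trim` makes the parser tolerant of spaces after the commas; a malformed line (missing fields, non-numeric values) would throw, which in the streaming job would fail the task.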

9.2.1 Dynamic Tables

If the data type in the stream is a case class, a table can be generated directly from the structure of the case class:

  tableEnv.fromDataStream(dataStream)

Or the fields can be named individually, in field order:

  tableEnv.fromDataStream(dataStream, 'id, 'timestamp, ...)

Finally, the dynamic table can be converted into a stream for output:

  table.toAppendStream[(String, String)]

9.2.2 Fields

A single quote placed in front of a name identifies a field, e.g. 'name, 'mid, 'amount.
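The leading single quote is Scala's symbol-literal syntax: in Scala 2, 'name is shorthand for Symbol("name"), and the Table API's Scala expression DSL implicitly converts such symbols into field-reference expressions. A tiny standalone sketch of what the quote denotes (plain Scala, no Flink involved):

```scala
object SymbolDemo {
  def main(args: Array[String]): Unit = {
    // In Scala 2, 'temperature is literal shorthand for Symbol("temperature").
    val field = Symbol("temperature")
    // The underlying field name is recoverable as a plain String.
    println(field.name) // prints "temperature"
  }
}
```

This is why unquoted names such as `id` cannot be used directly in `select` or `groupBy`: without the quote they would be resolved as ordinary Scala identifiers, not as table fields.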

9.3 Window Aggregation with the Table API

9.3.1 Understanding the Table API Through an Example

  // count the number of temperature readings per sensor every 10 seconds
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val inputStream = env.readTextFile("..\\sensor.txt")
    val dataStream = inputStream
      .map(data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
      })
    // create a tableEnv based on env
    val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
    // create a table from the stream, defining the fields and designating the event-time field
    val dataTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'ts.rowtime)
    // windowed aggregation by time
    val resultTable: Table = dataTable
      .window(Tumble over 10.seconds on 'ts as 'tw)
      .groupBy('id, 'tw)
      .select('id, 'id.count)
    val selectedStream: DataStream[(Boolean, (String, Long))] = resultTable
      .toRetractStream[(String, Long)]
    selectedStream.print()
    env.execute("table window test")
  }

9.3.2 About groupBy

If groupBy is used, the table can only be converted into a stream with toRetractStream:

  val dataStream: DataStream[(Boolean, (String, Long))] = table.toRetractStream[(String, Long)]

In the stream produced by toRetractStream, the first Boolean field marks each record: true means the latest data (Insert), false means expired old data (Delete):

  val dataStream: DataStream[(Boolean, (String, Long))] = table.toRetractStream[(String, Long)]
  dataStream.filter(_._1).print()
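The retract semantics can be simulated without Flink: replaying a sequence of (flag, record) pairs, where true adds a record and false removes a previously emitted one, yields the current result. A plain-Scala sketch of the semantics (not Flink's actual implementation):

```scala
object RetractDemo {
  // Replay a retract stream: true inserts a record, false deletes a previous one.
  def replay(stream: Seq[(Boolean, (String, Long))]): Seq[(String, Long)] =
    stream.foldLeft(Vector.empty[(String, Long)]) {
      case (state, (true, rec))  => state :+ rec          // Insert
      case (state, (false, rec)) => state.diff(Seq(rec))  // Delete
    }

  def main(args: Array[String]): Unit = {
    // The count for sensor_1 goes 1 -> 2: the old row is retracted first.
    val updates = Seq(
      (true, ("sensor_1", 1L)),
      (false, ("sensor_1", 1L)),
      (true, ("sensor_1", 2L))
    )
    println(replay(updates)) // prints Vector((sensor_1,2))
  }
}
```

This also shows why `filter(_._1)` above is only a display convenience: it keeps every Insert but ignores Deletes, so a downstream consumer would still see the superseded row ("sensor_1", 1) as well as the final one.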

If the API used includes a time window, the window field must appear in groupBy:

  val resultTable: Table = dataTable
    .window(Tumble over 10.seconds on 'ts as 'tw)
    .groupBy('id, 'tw)
    .select('id, 'id.count)

9.3.3 About Time Windows

When a time window is used, the time field must be declared in advance. For processing time, it can simply be appended when the dynamic table is created:

  val dataTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'ps.proctime)

For event time, it must be declared when the dynamic table is created:

  val dataTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'ts.rowtime)

A tumbling window is expressed with Tumble over 10000.millis on (equivalently, 10.seconds, as below):

  val resultTable: Table = dataTable
    .window(Tumble over 10.seconds on 'ts as 'tw)
    .groupBy('id, 'tw)
    .select('id, 'id.count)
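Conceptually, a tumbling window assigns each event to exactly one bucket by aligning its timestamp down to a multiple of the window size. A plain-Scala sketch of that assignment rule (illustrative only; not Flink's internal window assigner):

```scala
object TumbleDemo {
  // Start (in ms) of the tumbling window a given event timestamp falls into.
  def windowStart(timestampMs: Long, sizeMs: Long): Long =
    timestampMs - (timestampMs % sizeMs)

  def main(args: Array[String]): Unit = {
    val size = 10000L // 10.seconds == 10000.millis
    println(windowStart(1547718199000L, size)) // prints 1547718190000
    println(windowStart(1547718201000L, size)) // prints 1547718200000
  }
}
```

The two sample timestamps are 2 seconds apart yet land in different windows, which is why consecutive readings can contribute to different counts in the aggregation above.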

9.4 Writing Queries in SQL

  // count the number of temperature readings per sensor every 10 seconds
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val inputStream = env.readTextFile("..\\sensor.txt")
    val dataStream = inputStream
      .map(data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
      })
    // create a tableEnv based on env
    val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
    // create a table from the stream, defining the fields and designating the event-time field
    val dataTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'ts.rowtime)
    // write SQL directly for the windowed count
    val resultSqlTable: Table = tableEnv.sqlQuery("select id, count(id) from "
      + dataTable + " group by id, tumble(ts, interval '10' second)")
    val selectedStream: DataStream[(Boolean, (String, Long))] = resultSqlTable.toRetractStream[(String, Long)]
    selectedStream.print()
    env.execute("table window test")
  }
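What the SQL query computes can be checked against a plain-Scala equivalent: group the readings by (id, tumbling-window start) and count, using the same second-to-millisecond conversion as the timestamp extractor in the job. This is a sketch of the query's result semantics, not of Flink's execution:

```scala
object SqlCountDemo {
  case class SensorReading(id: String, timestamp: Long, temperature: Double)

  // Equivalent of: select id, count(id) ... group by id, tumble(ts, interval '10' second)
  // Timestamps are in seconds, so multiply by 1000 before aligning to the window.
  def countPerWindow(readings: Seq[SensorReading], sizeMs: Long): Map[(String, Long), Long] =
    readings
      .groupBy { r =>
        val tsMs = r.timestamp * 1000L
        (r.id, tsMs - (tsMs % sizeMs)) // key: (sensor id, window start in ms)
      }
      .map { case (key, rs) => key -> rs.size.toLong }

  def main(args: Array[String]): Unit = {
    val readings = Seq(
      SensorReading("sensor_1", 1547718199L, 35.8), // window starting at ...190000
      SensorReading("sensor_1", 1547718201L, 36.1), // window starting at ...200000
      SensorReading("sensor_1", 1547718202L, 36.3)  // same window as the previous one
    )
    println(countPerWindow(readings, 10000L))
  }
}
```

The three readings span a window boundary, so the result contains two counts for sensor_1: one reading in the first 10-second window and two in the next.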