The Table API is a unified relational API for stream and batch processing: Table API queries can run on streaming or batch input without any modification. The Table API is a superset of SQL designed specifically for Apache Flink, and it is a language-integrated API for Scala and Java. Unlike regular SQL, where queries are specified as strings, Table API queries are defined in a language-embedded style in Java or Scala, with IDE support such as auto-completion and syntax checking.
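As a quick illustration of the difference, the same query can be written as a SQL string or as a language-embedded Table API expression. A minimal sketch, assuming a registered table ecommerceLog and a Table object ecommerceLogTable like those used later in this chapter:

    // SQL: the query is an opaque string, checked only at runtime
    val sqlResult: Table = tableEnv.sqlQuery(
      "SELECT mid, ch FROM ecommerceLog WHERE ch = 'appstore'")

    // Table API: the query is ordinary Scala code, checked by the compiler and IDE
    val apiResult: Table = ecommerceLogTable.select('mid, 'ch).filter('ch === "appstore")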

10.1 Required pom Dependencies

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
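In Flink 1.7 the Scala Table API also builds on the Scala DataStream API, so a typical project additionally declares the streaming Scala module (a common setup; adjust the Scala and Flink versions to your environment):

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>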

10.2 A First Look at the Table API

    import com.alibaba.fastjson.JSON
    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
    import org.apache.flink.table.api.{Table, TableEnvironment}
    import org.apache.flink.table.api.scala._

    def main(args: Array[String]): Unit = {
      val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
      // MyKafkaUtil and EcommerceLog are project-specific helper classes
      val myKafkaConsumer: FlinkKafkaConsumer011[String] = MyKafkaUtil.getConsumer("ECOMMERCE")
      val dstream: DataStream[String] = env.addSource(myKafkaConsumer)
      val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)
      // parse each JSON string into an EcommerceLog case class
      val ecommerceLogDstream: DataStream[EcommerceLog] = dstream.map { jsonString =>
        JSON.parseObject(jsonString, classOf[EcommerceLog])
      }
      // create a dynamic table from the stream
      val ecommerceLogTable: Table = tableEnv.fromDataStream(ecommerceLogDstream)
      // select mid and ch, keeping only the appstore channel
      val table: Table = ecommerceLogTable.select("mid,ch").filter("ch='appstore'")
      // convert the table back to an append-only stream and print it
      val midchDataStream: DataStream[(String, String)] = table.toAppendStream[(String, String)]
      midchDataStream.print()
      env.execute()
    }
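Run against the Kafka topic ECOMMERCE, this job parses each JSON record into an EcommerceLog, keeps only records whose channel is appstore, and continuously prints the resulting (mid, ch) pairs as an append-only stream.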

10.2.1 Dynamic Tables

If the data type in the stream is a case class, a table can be generated directly from the case class's structure:

    tableEnv.fromDataStream(ecommerceLogDstream)
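For reference, the EcommerceLog case class used throughout this chapter would look roughly like the sketch below; the exact definition is project-specific, and the fields are reconstructed from those referenced later in this chapter:

    case class EcommerceLog(
      mid: String,            // device id
      uid: String,            // user id
      appid: String,
      area: String,
      os: String,
      ch: String,             // channel
      logType: String,
      vs: String,
      logDate: String,
      logHour: String,
      logHourMinute: String,
      ts: Long)               // event timestamp in milliseconds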

Alternatively, the fields can be named individually, in field order:

    tableEnv.fromDataStream(ecommerceLogDstream, 'mid, 'uid .......)

Finally, the dynamic table can be converted back to a stream for output:

    table.toAppendStream[(String, String)]

10.2.2 Fields

A single quote placed in front of a name identifies it as a field, e.g. 'name, 'mid, 'amount.
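The single quote is Scala's Symbol literal; the implicit conversion that turns it into a Table API expression comes from the org.apache.flink.table.api.scala._ import. With that import in scope, the string style and the expression style are interchangeable (a minimal sketch using the ecommerceLogTable from above):

    import org.apache.flink.table.api.scala._   // enables the 'field syntax

    // string style: fields referenced inside one string
    val t1: Table = ecommerceLogTable.select("mid, ch")

    // expression style: fields referenced as quoted symbols
    val t2: Table = ecommerceLogTable.select('mid, 'ch)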

10.3 Windowed Aggregation with the Table API

10.3.1 Learning the Table API Through an Example

    // count of appstore-channel records every 10 seconds

    // imports as in section 10.2, plus:
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.table.api.Tumble

    def main(args: Array[String]): Unit = {
      // analogous to creating a SparkContext
      val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
      // switch the time characteristic to event time
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
      val myKafkaConsumer: FlinkKafkaConsumer011[String] = MyKafkaUtil.getConsumer("ECOMMERCE")
      val dstream: DataStream[String] = env.addSource(myKafkaConsumer)
      val ecommerceLogDstream: DataStream[EcommerceLog] = dstream.map { jsonString =>
        JSON.parseObject(jsonString, classOf[EcommerceLog])
      }
      // tell Flink how to extract the event time and generate watermarks
      val ecommerceLogWithEventTimeDStream: DataStream[EcommerceLog] =
        ecommerceLogDstream.assignTimestampsAndWatermarks(
          new BoundedOutOfOrdernessTimestampExtractor[EcommerceLog](Time.seconds(0L)) {
            override def extractTimestamp(element: EcommerceLog): Long = element.ts
          }).setParallelism(1)
      val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)
      // convert the data stream into a Table, declaring ts as the rowtime field
      val ecommerceTable: Table = tableEnv.fromDataStream(ecommerceLogWithEventTimeDStream,
        'mid, 'uid, 'appid, 'area, 'os, 'ch, 'logType, 'vs,
        'logDate, 'logHour, 'logHourMinute, 'ts.rowtime)
      // count per channel every 10 seconds with the Table API:
      // 1. groupBy  2. a window is required  3. event time determines the window boundaries
      val resultTable: Table = ecommerceTable
        .window(Tumble over 10000.millis on 'ts as 'tt)
        .groupBy('ch, 'tt)
        .select('ch, 'ch.count)
      // convert the Table back into a data stream
      val resultDstream: DataStream[(Boolean, (String, Long))] =
        resultTable.toRetractStream[(String, Long)]
      resultDstream.filter(_._1).print()
      env.execute()
    }
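To experiment with the windowed aggregation without a Kafka cluster, the same tumbling-window count can be run against an in-memory collection. The following is a self-contained sketch under that assumption; the Click case class and the sample data are invented for illustration, and only the source differs from the example above:

    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.table.api.{Table, TableEnvironment, Tumble}
    import org.apache.flink.table.api.scala._

    case class Click(ch: String, ts: Long)

    object TumbleDemo {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        // a handful of in-memory events instead of a Kafka source
        val clicks: DataStream[Click] = env
          .fromCollection(Seq(
            Click("appstore", 1000L), Click("appstore", 2000L),
            Click("huawei", 3000L), Click("appstore", 12000L)))
          .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor[Click](Time.seconds(0L)) {
              override def extractTimestamp(e: Click): Long = e.ts
            })

        val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)
        val clickTable: Table = tableEnv.fromDataStream(clicks, 'ch, 'ts.rowtime)

        // same pattern as above: tumbling 10-second window, grouped by channel
        val result: Table = clickTable
          .window(Tumble over 10000.millis on 'ts as 'tt)
          .groupBy('ch, 'tt)
          .select('ch, 'ch.count)

        result.toRetractStream[(String, Long)].filter(_._1).print()
        env.execute()
      }
    }

Because the input is finite, Flink emits a final watermark when the collection is exhausted, so the last window fires: the first window [0 s, 10 s) yields (appstore, 2) and (huawei, 1), and the second window yields (appstore, 1).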

10.3.2 Notes on groupBy

1. If groupBy is used, the table can only be converted to a stream with toRetractStream:

       val rDstream: DataStream[(Boolean, (String, Long))] = table
         .toRetractStream[(String, Long)]

2. The first Boolean field returned by toRetractStream marks the record type: true means the latest data (Insert), false means expired old data (Delete). Filtering on it keeps only the current results (see the sketch after this list):

       val rDstream: DataStream[(Boolean, (String, Long))] = table
         .toRetractStream[(String, Long)]
       rDstream.filter(_._1).print()

3. If the query includes a time window, the window field must appear in the groupBy clause:

       val table: Table = ecommerceLogTable
         .filter("ch ='appstore'")
         .window(Tumble over 10000.millis on 'ts as 'tt)
         .groupBy('ch, 'tt)
         .select("ch, ch.count")

10.3.3 Notes on Time Windows

1. To use a time window, the time field must be declared in advance. For processing time, it can simply be appended when the dynamic table is created:

       val ecommerceLogTable: Table = tableEnv
         .fromDataStream(ecommerceLogWithEtDstream,
           'mid, 'uid, 'appid, 'area, 'os, 'ch, 'logType, 'vs,
           'logDate, 'logHour, 'logHourMinute, 'ps.proctime)

2. For event time, the field must be declared when the dynamic table is created:

       val ecommerceLogTable: Table = tableEnv
         .fromDataStream(ecommerceLogWithEtDstream,
           'mid, 'uid, 'appid, 'area, 'os, 'ch, 'logType, 'vs,
           'logDate, 'logHour, 'logHourMinute, 'ts.rowtime)

3. A tumbling window is expressed as Tumble over 10000.millis on ...:

       val table: Table = ecommerceLogTable
         .filter("ch ='appstore'")
         .window(Tumble over 10000.millis on 'ts as 'tt)
         .groupBy('ch, 'tt)
         .select("ch, ch.count")

10.4 How to Write SQL

    // imports as in sections 10.2 and 10.3.1

    def main(args: Array[String]): Unit = {
      // analogous to creating a SparkContext
      val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
      // switch the time characteristic to event time
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
      val myKafkaConsumer: FlinkKafkaConsumer011[String] = MyKafkaUtil.getConsumer("ECOMMERCE")
      val dstream: DataStream[String] = env.addSource(myKafkaConsumer)
      val ecommerceLogDstream: DataStream[EcommerceLog] = dstream.map { jsonString =>
        JSON.parseObject(jsonString, classOf[EcommerceLog])
      }
      // tell Flink how to extract the event time and generate watermarks
      val ecommerceLogWithEventTimeDStream: DataStream[EcommerceLog] =
        ecommerceLogDstream.assignTimestampsAndWatermarks(
          new BoundedOutOfOrdernessTimestampExtractor[EcommerceLog](Time.seconds(0L)) {
            override def extractTimestamp(element: EcommerceLog): Long = element.ts
          }).setParallelism(1)
      // analogous to creating a SparkSession
      val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)
      // convert the data stream into a Table
      val ecommerceTable: Table =
        tableEnv.fromDataStream(ecommerceLogWithEventTimeDStream,
          'mid, 'uid, 'appid, 'area, 'os, 'ch, 'logType, 'vs,
          'logDate, 'logHour, 'logHourMinute, 'ts.rowtime)
      // for comparison, the same aggregation via the Table API (see 10.3.1)
      val resultTable: Table = ecommerceTable
        .window(Tumble over 10000.millis on 'ts as 'tt)
        .groupBy('ch, 'tt)
        .select('ch, 'ch.count)
      // the same aggregation via SQL; note the space after "from":
      // concatenating the Table object registers it under a generated name
      val resultSQLTable: Table = tableEnv.sqlQuery(
        "select ch, count(ch) from " + ecommerceTable +
          " group by ch, Tumble(ts, interval '10' SECOND)")
      // convert the Table back into a data stream
      val resultDstream: DataStream[(Boolean, (String, Long))] =
        resultSQLTable.toRetractStream[(String, Long)]
      resultDstream.filter(_._1).print()
      env.execute()
    }
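Relying on string concatenation works because Table.toString registers the table in the catalog under a generated name, but the query reads better if the table is registered explicitly first. A sketch of that variant, where the table name ecommerceLog is an arbitrary choice:

    tableEnv.registerTable("ecommerceLog", ecommerceTable)
    val resultSQLTable: Table = tableEnv.sqlQuery(
      """select ch, count(ch)
        |from ecommerceLog
        |group by ch, TUMBLE(ts, INTERVAL '10' SECOND)""".stripMargin)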