Table API 的简单使用

1. 引用依赖

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-table_2.11</artifactId>
  4. <version>1.7.2</version>
  5. </dependency>

2. 导入 Table API 的隐式转换

import org.apache.flink.table.api.scala._

3. 基本操作

  /**
   * Minimal Table API example: converts a stream of sensor readings into a
   * `Table`, projects a single column and prints it as an append stream.
   */
  object TableAPi {

    /**
     * Reads sensor readings from a text file, selects the `ts` column through
     * the Table API and prints every value with the `table>>` prefix.
     *
     * Each input line is split on `,`; the first three fields are used to build
     * a `WaterSensor` (the second parsed as `Long`, the third as `Double`).
     * A line with fewer than three fields, or with non-numeric values in the
     * second/third position, makes the map function throw.
     *
     * @return whatever `env.execute()` returns for the submitted job
     */
    def api1() = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment

      val mapDS = env.readTextFile("c:/tmp/input/sensor-data.log")
        .map(
          line => {
            val datas = line.split(",")
            WaterSensor(datas(0), datas(1).toLong, datas(2).toDouble)
          }
        )

      // Obtain the Table API environment, wrap the stream as a Table,
      // keep only the "ts" column and turn the result back into a stream.
      TableEnvironment.getTableEnvironment(env)
        .fromDataStream(mapDS)
        .select("ts")
        .toAppendStream[Long]
        .print("table>>")

      // Nothing runs until the job is explicitly executed.
      env.execute()
    }
  }

4. 使用 SQL 查询

  1. package com.ylb.table
  2. import com.ylb.myCluss.WaterSensor
  3. import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
  4. import org.apache.flink.api.scala._
  5. import org.apache.flink.table.api.{Table, TableEnvironment}
  6. import org.apache.flink.table.api.scala._
  /**
   * SQL-style example: exposes a stream of sensor readings as a `Table` and
   * queries it with `sqlQuery`, printing every resulting row.
   *
   * @author yanglibin
   * @create 2020-03-07 17:16
   */
  object tableAPI2 {

    /**
     * Builds and runs the job: read the log file, map each line to a
     * `WaterSensor`, select all columns via SQL and print them with the
     * `sql>>` prefix.
     *
     * @param args command-line arguments (not used)
     */
    def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      env.setParallelism(1)

      // Table environment bound to the streaming environment
      val tableEnv = TableEnvironment.getTableEnvironment(env)

      val sensors = env
        .readTextFile("c:/tmp/input/sensor-data.log")
        .map { line =>
          val fields = line.split(",")
          WaterSensor(fields(0), fields(1).toLong, fields(2).toDouble)
        }

      val sensorTable: Table = tableEnv.fromDataStream(sensors, 'id, 'ts, 'vc)

      // The Table is concatenated into the SQL text; mind the trailing
      // space after "from" so the table reference is a separate token.
      val query = "select * from " + sensorTable

      val rows: DataStream[(String, Long, Double)] =
        tableEnv.sqlQuery(query).toAppendStream[(String, Long, Double)]

      rows.print("sql>>")
      env.execute()
    }
  }