Table API 的简单使用
1. 引用依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.7.2</version>
</dependency>
2. 导入table的隐式转换
import org.apache.flink.table.api.scala._
3. 基本操作
object TableAPi {
def api1() = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val mapDS = env.readTextFile("c:/tmp/input/sensor-data.log")
.map(
line => {
val datas = line.split(",")
WaterSensor(datas(0), datas(1).toLong, datas(2).toDouble)
}
)
// 获取Table API 环境
TableEnvironment.getTableEnvironment(env)
.fromDataStream(mapDS)
.select("ts")
.toAppendStream[Long]
.print("table>>")
env.execute()
}
4. 使用 sql格式
package com.ylb.table
import com.ylb.myCluss.WaterSensor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._
/**
* @author yanglibin
* @create 2020-03-07 17:16
*/
object tableAPI2 {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// table env
val tableEnv = TableEnvironment.getTableEnvironment(env)
val mapDS = env.readTextFile("c:/tmp/input/sensor-data.log")
.map(
line => {
val datas = line.split(",")
WaterSensor(datas(0), datas(1).toLong, datas(2).toDouble)
}
)
val table: Table = tableEnv.fromDataStream(mapDS,'id,'ts,'vc)
val stream: DataStream[(String,Long, Double)] = tableEnv.sqlQuery("select * from " + table)
.toAppendStream[(String,Long, Double)] // 注意 from 后面的空格
stream.print("sql>>")
env.execute()
}
}