Table API 的简单使用
1. 引用依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table_2.11</artifactId><version>1.7.2</version></dependency>
2. 导入table的隐式转换
import org.apache.flink.table.api.scala._ 
3. 基本操作
object TableAPi {def api1() = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval mapDS = env.readTextFile("c:/tmp/input/sensor-data.log").map(line => {val datas = line.split(",")WaterSensor(datas(0), datas(1).toLong, datas(2).toDouble)})// 获取Table API 环境TableEnvironment.getTableEnvironment(env).fromDataStream(mapDS).select("ts").toAppendStream[Long].print("table>>")env.execute()}
4. 使用 sql格式
package com.ylb.tableimport com.ylb.myCluss.WaterSensorimport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}import org.apache.flink.api.scala._import org.apache.flink.table.api.{Table, TableEnvironment}import org.apache.flink.table.api.scala._/*** @author yanglibin* @create 2020-03-07 17:16*/object tableAPI2 {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)// table envval tableEnv = TableEnvironment.getTableEnvironment(env)val mapDS = env.readTextFile("c:/tmp/input/sensor-data.log").map(line => {val datas = line.split(",")WaterSensor(datas(0), datas(1).toLong, datas(2).toDouble)})val table: Table = tableEnv.fromDataStream(mapDS,'id,'ts,'vc)val stream: DataStream[(String,Long, Double)] = tableEnv.sqlQuery("select * from " + table).toAppendStream[(String,Long, Double)] // 注意 from 后面的空格stream.print("sql>>")env.execute()}}
