Table API 和 SQL 现在还处于活跃开发阶段,还没有完全实现所有的特性。不是所有的 [Table API,SQL] 和 [流,批] 的组合都是支持的。
引用API
需要引入的 pom 依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.11</artifactId><version>1.11.2</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.11</artifactId><version>1.11.2</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>1.11.2</version><scope>provided</scope></dependency>
2.11为Scala版本,1.11.2为flink版本
表环境创建方式
//1.创建流式处理环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//并行度设置
env.setParallelism(1)
//2.创建表执行环境
val tableEnv = StreamTableEnvironment.create(env)
//2.1 基于老版本planner的流处理
val settings = EnvironmentSettings.newInstance()
.useOldPlanner()
.inStreamingMode()
.build()
val oldStreamTableEnv = StreamTableEnvironment.create(env,settings)
//2.2 基于老版本planner的批处理
val batchEnv = ExecutionEnvironment.getExecutionEnvironment
val oldBatchTableEnv = BatchTableEnvironment.create(batchEnv)
//2.3 基于新版本blink planner的流处理
val blinkStreamSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val blinkStreamTableEnv = StreamTableEnvironment.create(env,blinkStreamSettings)
//2.3 基于新版本blink planner的批处理
val blinkBatchSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build()
val blinkBatchTableEnv = TableEnvironment.create(blinkBatchSettings)
注意 对于生产环境,建议使用在1.11版本之后已经变成默认的Blink Planner
//2.3 基于新版本blink planner的流处理
val blinkStreamSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val blinkStreamTableEnv = StreamTableEnvironment.create(env,blinkStreamSettings)
//2.3 基于新版本blink planner的批处理
val blinkBatchSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build()
val blinkBatchTableEnv = TableEnvironment.create(blinkBatchSettings)
API调用
https://www.cnblogs.com/shengyang17/p/14342173.html
基本程序结构
val tableEnv = ... // 创建表环境
// 创建表
tableEnv.connect(...).createTemporaryTable("table1")
// 注册输出表
tableEnv.connect(...).createTemporaryTable("outputTable")
// 使用 Table API query 创建表
val tapiResult = tableEnv.from("table1").select(...)
// 使用 SQL query 创建表
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table1 ...")
// 输出一张结果表到 TableSink,SQL查询的结果表也一样
TableResult tableResult = tapiResult.executeInsert("outputTable");
tableResult...
// 执行
tableEnv.execute("scala_job")
注意
注意:需要导入的隐式类型转换
org.apache.flink.table.api._
org.apache.flink.api.scala._
org.apache.flink.table.api.bridge.scala._
创建表
TableEnvironment可以调用.connect()方法,连接外部系统,并调用.createTemporaryTable()方法,在Catalog中注册表
tableEnv
.connect(...) //定义表的数据结构,和外部系统建立连接
.withFormat(...) //定义数据格式化方法
.withSchema(...) //定义表结构
.createTemporaryTable("MyTable") //创建临时表
读取外部数据
从文件系统中读取数据
val tableEnv = StreamTableEnvironment.create(env)
//连接外部系统,读取数据,注册表
//读取文件
val filePath = "D:\\IDEA\\flink\\src\\main\\resources\\stock\\sensor.txt"
tableEnv.connect(new FileSystem().path(filePath))
.withFormat(new Csv())
.withSchema(new Schema()
.field("id",DataTypes.STRING())
.field("timestamp",DataTypes.BIGINT())
.field("temperature",DataTypes.DOUBLE())
)
.createTemporaryTable("inputTable")
val inputTable: Table = tableEnv.from("inputTable")
inputTable.toAppendStream[(String,Long,Double)].print()
env.execute("table api test")
其中Csv()为新版本的描述器,flink没有直接提供,要想使用新版本描述器,需要引用依赖flink-csv
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-csv -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.11.2</version>
</dependency>
从kafka读取数据
https://www.jianshu.com/p/4824641a757b
一个完整的配置实例:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
tableEnv.connect(new Kafka()
//kafka连接配置
.version("universal")
.topic("sensor")
.property("zookeeper.connect", "192.168.188.8:2181")
.property("bootstrap.servers", "192.168.188.8:9092")
)
//定义表结构
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temperature", DataTypes.DOUBLE())
)
//创建临时表
.createTemporaryTable("kafkaInputTable")
val inputTable: Table = tableEnv.from("kafkaInputTable")
inputTable.toAppendStream[(String, Long, Double)].print()
表查询
利用外部系统的连接器connector,我们可以读写数据,并在环境的Catalog中注册表。接下来就可以对表做查询转换了。
Flink给我们提供了两种查询方式:Table API和 SQL。
方式一:table api
//查询转换
val sensorTable = tableEnv.from("inputTable")
val resultTable = sensorTable
.select("id,temperature")
.filter('id === "")
- val sensorTable = tableEnv.from(“inputTable”),使用表API操作表之前,需要先进行转换
- 查询语句的表达式写法(方式一与方式二都是flink1.9与1.10中table api的写法,方式三是flink1.11以后官网推荐的写法)
- 方式一(使用
',):.select('id,'temperature) - 方式二(使用
" "):.select("id,temperature") - 方式三(使用
$(" ")):.select($"id",$"temperature")
- 方式一(使用
- 注意:filter判断字段值是否等于一个值时,使用
===(因为等号前面是一个对象,等号后面是一个字符串,不能使用==)
方式二:使用SQL语句
// 3.2 sql
val resultSqlTable = tableEnv.sqlQuery(
"""
|select id,temperature
|from inputTable
|where id = 'sensor_1'
|""".stripMargin)
推荐写法,模板字符串写SQL
将DataStream转化为表
Flink允许我们把Table和DataStream做转换:我们可以基于一个DataStream,先流式地读取数据源,然后map成POJO,再把它转成Table。Table的列字段(column fields),就是POJO里的字段,这样就不用再麻烦地定义schema了。(定义表的时候需要定义表结构,但将datastream转换为表后,就不要麻烦的定义表结构schema了,此时表结构结尾数据流中的结构,数据流中每一个字段就是表每一行对应的字段名)
代码表达
代码中实现非常简单,直接用tableEnv.fromDataStream()就可以了。默认转换后的 Table schema 和 DataStream 中的字段定义一一对应,也可以单独指定出来。
如何单独指定字段的对应关系?首先明白数据类型与scheme的对应关系
数据类型与scheme的对应
在上节的例子中,DataStream 中的数据类型,与表的 Schema 之间的对应关系,是按照类中的字段名来对应的(name-based mapping),所以还可以用as做重命名。
基于名称的对应:Table sensorTable = tableEnv.fromDataStream(dataStream, "timestamp as ts, id as myId, temperature");
基于位置的对应: val sensorTable = tableEnv.fromDataStream(dataStream, 'myId, 'ts)
Flink的DataStream和 DataSet API支持多种类型。比如元组(内置Scala和Java元组)、POJO、Scala case类和Flink的Row类型等,允许具有多个字段的嵌套数据结构,这些字段可以在Table的表达式中访问。其他类型,则被视为原子类型。元组类型,默认的名称是 “_1”, “_2”;而原子类型,默认名称是 ”f0”。
创建临时视图(Temporary View)
方式一:直接从DataStream转换
同样,可以直接对应字段转换;也可以在转换的时候,指定相应的字段。
tableEnv.createTemporaryView("datatable",dataTable)
tableEnv.createTemporaryView("sensorView", dataStream, "id, temperature, timestamp as ts");
方式二:基于table创建临时视图
tableEnv.createTemporaryView("sensorView", sensorTable);
注意:
- 创建临时视图的意义所在:在当前环境中注册了一张临时表,在当前环境中可以使用table的实例
- View和Table的Schema完全相同。事实上,在Table API中,可以认为View和Table是等价的。
输出表
表的输出,是通过将数据写入TableSink 来实现的。TableSink 是一个通用接口,可以支持不同的文件格式、存储数据库和消息队列。
实现方法
输出表最直接的方法,就是通过Table.insertInto() 方法将一个 Table 写入注册过的 TableSink 中。
输出到文件
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Kafka, OldCsv, Schema}
object FileOutputTest {
def main(args: Array[String]): Unit = {
//1.创建流式处理环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//并行度设置
env.setParallelism(1)
//2.创建表执行环境
val tableEnv = StreamTableEnvironment.create(env)
/**
* 文件读入,将读入的数据转化为表结构
*/
val filePath = "D:\\IDEA\\flink\\src\\main\\resources\\stock\\sensor.txt"
tableEnv.connect(new FileSystem().path(filePath))
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temperature", DataTypes.DOUBLE())
)
//创建临时表
.createTemporaryTable("inputTable")
// 转换操作
val sensorTable = tableEnv.from("inputTable")
val resultTable = sensorTable
.select($"id",$"temperature")
.filter($"id" === "sensor_1")
//输出打印
// resultTable.toAppendStream[(String,Double)].print("resultTable")
//输出到文件
val outFilePath = "D:\\IDEA\\flink\\src\\main\\resources\\stock\\output.txt"
tableEnv.connect(new FileSystem().path(outFilePath))
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("temperature", DataTypes.DOUBLE())
)
//创建临时表(注册输出表)
.createTemporaryTable("outputTable")
//输出到文件
result.executeInsert("outputTable")
}
}

输出到kafka
结合前面Kafka作为输入数据,构建数据管道,kafka进,kafka出
object outputKafkaTest {
def main(args: Array[String]): Unit = {
//1.创建流式处理环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//并行度设置
env.setParallelism(1)
//2.创建表执行环境
val tableEnv = StreamTableEnvironment.create(env)
//从kafka读取数据
tableEnv.connect(new Kafka()
.version("universal")
.topic("sensor")
.property("zookeeper.connect", "192.168.188.8:2181")
.property("bootstrap.servers", "192.168.188.8:9092")
)
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temperature", DataTypes.DOUBLE())
)
.createTemporaryTable("kafkaInputTable")
//3.查询转换
val sensorTable:Table = tableEnv.from("kafkaInputTable")
//3.1简单转换
val resultTable = sensorTable
.select($"id", $"temperature")
.filter($"id" === "sensor_1")
// 输出到kafka
tableEnv.connect(new Kafka()
.version("universal")
.topic("sinktest")
.property("zookeeper.connect", "192.168.188.8:2181")
.property("bootstrap.servers", "192.168.188.8:9092")
)
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("temperature", DataTypes.DOUBLE())
)
.createTemporaryTable("kafkaOutputTable")
resultTable.insertInto("kafkaOutputTable")
tableEnv.execute("kafka test")
}
}
注意:
(1)输入与输出的topic名称不一样
(2)从 1.11 版本开始,sqlUpdate 方法 和 insertInto 方法被废弃,从这两个方法构建的 Table 程序必须通过 StreamTableEnvironment.execute() 方法执行,而不能通过StreamExecutionEnvironment.execute() 方法来执行。
更新模式
1)追加模式(Append Mode)
在追加模式下,表(动态表)和外部连接器只交换插入(Insert)消息。
2)撤回模式(Retract Mode)
在撤回模式下,表和外部连接器交换的是:添加(Add)和撤回(Retract)消息。
- 插入(Insert)会被编码为添加消息;
- 删除(Delete)则编码为撤回消息;
- 更新(Update)则会编码为,已更新行(上一行)的撤回消息,和更新行(新行)的添加消息。
在此模式下,不能定义key,这一点跟upsert模式完全不同。
3)Upsert(更新插入)模式
在Upsert模式下,动态表和外部连接器交换Upsert和Delete消息。
这个模式需要一个唯一的key,通过这个key可以传递更新消息。为了正确应用消息,外部连接器需要知道这个唯一key的属性。
- 插入(Insert)和更新(Update)都被编码为Upsert消息;
- 删除(Delete)编码为Delete信息。
这种模式和Retract模式的主要区别在于,Update操作是用单个消息编码的,所以效率会更高。
将表转换为datastream
动态表
下图显示了流、动态表和连续查询的关系:
流式持续查询的过程:
- 流被转换为动态表。(该动态表不涉及更改删除操作,只会不断追加数据)
- 对动态表计算连续查询,生成新的动态表。
- 生成的动态表被转换回流。
下图显示了如何将访问URL事件流,或者叫点击事件流(左侧)转换为表(右侧)
