何为流表
将无限数据流抽象成表的概念。流中每条数据对应表中每一行。Table API和SQL相当于对一张流表持续处理,不断输出值。每来一条数据都会进入SQL编译好的程序中进行处理。
为何需要Table/SQL API
开发简单,业务人员也可以使用,极大的降低开发成本。
通用原理
retraction机制
每次聚合要清除groupBy中group上一次聚合计算的值。retract方法。
数据类型
新版(1.10)之后数据类型用DataTypes不用TypeInfomation了。DataTypes更贴近SQL。
Table API
依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.9.0</version>
</dependency>
执行下面代码必须有上面依赖
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
输入输出
TableSource和Sink就是Source和Sink加上Schema。
注册表
Flink 1.10标准用法,其他过时。
tEnv.connect().createTemporaryTable("table_name");
三种方式
tEnv.connect().registerTableSource("foo");
// defined table source
csv = new CsvTableSource();
tEnv.registerTableSource(csv) #过时
// from data stream
tEnv.registerDataStream("foo", dataStream, "word");
其中connect()三要素:连接器、序列化器、表结构。这三个可以做成中台的接口,动态配置。
输出表
tEnv.connect().registerTableSink("foo");
// defined table sink
csv = new CsvTableSink();
tEnv.registerTableSink(csv); #过时
// to data stream
tEnv.toAppendStream(result, Row.class)
tEnv.toRetractStream(result, Row.class);
操作表
Columns Operation
table.dropColumns("a, b");
Colunmns Function
select("withColumns(2 to 4)");
select("withoutColumns(2 to 4)");
Row-based operation
Group By Window
groupBy之后的select只能调用w.xxx 以及xx.sum, xx.max等聚合函数。
.window(Tumble.over("4.seconds").on("proctime").as("w"))
.groupBy("w")
.select("w.end, reply_count.sum");
优化
count函数调用两次,最终只会算一次。
Schema自动推倒序列化Format
TABLE本身的坑
时间格式必须是TZ这种
“%Y-%m-%dT%H:%M:%S.%fZ”
ZK设置成空也可以的
.property("zookeeper.connect", "")
连接外部系统
坑
kafka必须加.version(“”)。要不报如下错误
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory
对接Kafka必须TIMESTAMP(3)。
Flink 1.10.0 的坑
createTemporaryTable不能识别proctime和rowtime。需等版本更新
https://issues.apache.org/jira/browse/FLINK-16160
调用栈
createTermporaryTable()
ConnectTableDescriptor.getTableSchema()
DescriptorProperties.getTableSchema(Schema.SCHEMA);
DescriptorProperties.getOptionalTableSchema("schema")
数据类型
Array类型声明
ARRAY<ROW(user_id222 STRING, name222 STRING)>