何为流表

将无限数据流抽象成表的概念。流中每条数据对应表中每一行。Table API和SQL相当于对一张流表持续处理,不断输出值。每来一条数据都会进入SQL编译好的程序中进行处理。

为何需要Table/SQL API

开发简单,业务人员也可以使用,极大的降低开发成本。

通用原理

retraction机制

每次聚合要清除groupBy中group上一次聚合计算的值。retract方法。

数据类型

新版(1.10)之后数据类型用DataTypes不用TypeInfomation了。DataTypes更贴近SQL。

Table API

依赖

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-table-planner_2.11</artifactId>
  4. <version>1.9.0</version>
  5. </dependency>

执行下面代码必须有上面依赖

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

输入输出

TableSource和Sink就是Source和Sink加上Schema。

注册表

Flink 1.10标准用法,其他过时。

  1. tEnv.connect().createTemporaryTable("table_name");

三种方式

  1. tEnv.connect().registerTableSource("foo");
  2. // defined table source
  3. csv = new CsvTableSource();
  4. tEnv.registerTableSource(csv) #过时
  5. // from data stream
  6. tEnv.registerDataStream("foo", dataStream, "word");

其中connect()三要素:连接器、序列化器、表结构。这三个可以做成中台的接口,动态配置。

输出表

  1. tEnv.connect().registerTableSink("foo");
  2. // defined table sink
  3. csv = new CsvTableSink();
  4. tEnv.registerTableSink(csv); #过时
  5. // to data stream
  6. tEnv.toAppendStream(result, Row.class)
  7. tEnv.toRetractStream(result, Row.class);

操作表

Columns Operation

  1. table.dropColumns("a, b");

增删改查列

Colunmns Function

  1. select("withColumns(2 to 4)");
  2. select("withoutColumns(2 to 4)");

Row-based operation

Group By Window

groupBy之后的select只能调用w.xxx 以及xx.sum, xx.max等聚合函数。

  1. .window(Tumble.over("4.seconds").on("proctime").as("w"))
  2. .groupBy("w")
  3. .select("w.end, reply_count.sum");

优化

count函数调用两次,最终只会算一次。

Schema自动推倒序列化Format

TABLE本身的坑

时间格式必须是TZ这种

“%Y-%m-%dT%H:%M:%S.%fZ”

ZK设置成空也可以的

  1. .property("zookeeper.connect", "")

连接外部系统

kafka必须加.version(“”)。要不报如下错误

org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory

对接Kafka必须TIMESTAMP(3)。

Flink 1.10.0 的坑

createTemporaryTable不能识别proctime和rowtime。需等版本更新
https://issues.apache.org/jira/browse/FLINK-16160

调用栈

  1. createTermporaryTable()
  2. ConnectTableDescriptor.getTableSchema()
  3. DescriptorProperties.getTableSchema(Schema.SCHEMA);
  4. DescriptorProperties.getOptionalTableSchema("schema")

数据类型

Array类型声明

  1. ARRAY<ROW(user_id222 STRING, name222 STRING)>