Streams ⇔ Dynamic Tables
持续的SQL查询
这些只是列举一些可用的连接器。在传统的SQL数据库中,SQL 查询构成了一个将输入表转换为输出表的关系代数。这里也有同样的想法:传入的流被解释为动态表,然后通过查询将其转换为新的动态表,然后再将其转换为输出流。 接下来,我们将了解这两种转换是如何执行的:
- 流到动态表
- 动态表回流
Stream → dynamic table conversion
大多数源产生流消息被追加到动态表。
这种转换有两种形式。第一种形式将传入的流记录追加到相应的动态表中。第二种传入记录也可以表示对动态表的更新。第二种形式源流消息可以对动态表进行更新进行。例如,可以从Postgres 数据库中以Debezium进行bin log日志抽取,即将CDC流转换更新动态表。
只有少部分源可以产生更新流
- the Upsert Kafka connector
- the Upsert Pulsar connector
- the Kafka and FileSystem connectors when using changelog formats
- Debezium JSON and Avro
- Maxwell JSON
- Canal JSON
Flink SQL 只能以某种有限的方式处理更新流。更多内容请看CDC and Changelog Streams
Dynamic table → stream conversion
查询动态表
追加结果表的行(append)
在某些情况下,就像上图查询只是过滤其输入,结果表只是追加。Flink 的任何一个 sinks 都可以支持这一点。例如,我们可以将此结果流追加到文件中,或将其插入到数据库中。
更新结果表的行updated
在其他情况下,就像这个计算每个用户点击次数的查询一样,结果流包括回溯更新以前发出的结果。 并非所有接收器都支持更新结果。例如,FileSink 不能覆盖以前的结果。
追加vs更新
- 追加:仅仅只需要insert,所有sink都支持
- 更新:
- 某些sink不支持回溯,例如FileSink
- 某些sink回溯是采用INSERT + DELETE
- JDBC, Elasticsearch
- 某些sink回溯是采用UPSERT + DELETE
- JDBC, Elasticsearch, Upsert Kafka
有两种样式的动态表到流转换。一些操作符会产生一个 APPEND 流——例如,如果您只进行投影和过滤,则生成的流不需要进行任何撤回,而只需要对 sink 进行 INSERT。所有接收器都支持这一点。
另一种动态表到流转换的样式涉及更新先前发出的结果。有些接收器根本无法处理这个问题,或者只能以某些格式处理它。
一些接收器通过删除先前发出的行,然后插入新结果来实现更新,而其他接收器可以执行 UPSERT。此外,一些接收器,如 JDBC 数据库,支持这两种实现撤回的方案。
编目和表
编目
- 保存有关的元数据
- 数据库、表、函数、视图
- 实现
- 编目并不是必须的
- 你也可以简单的使用临时表、视图、函数
编目在 Flink 中扮演的角色与它们在传统数据库中扮演的角色相似——它们用于存储和管理与正在处理的数据相关的元数据,包括表定义。
使用 Flink SQL,正在处理的数据存在于外部系统中,例如 Kinesis、Kafka、Postgres 或 Hive。表定义是一种元数据,用于描述 Flink 如何将外部数据解释为 SQL 表。
如果您想使用目录来保存和共享此元数据,可以使用上面已经实现的编目。
使用这些外部编目不是必须的,但这样做可以为应用程序之间共享表定义、视图等带来方便。
使用编目的一个优点是设置创建 Flink 中使用的表所需的 SQL DDL 语句,需要了解 Flink 的要求和功能,以及所涉及的外部系统的知识。通过在编目中共享此配置数据,只需一个人来描述这些详细信息。
创建表
示例:由 Kafka 主题支持的 Orders 表
CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'orderGroup',
'format' = 'csv'
)
- 在默认编目的默认数据库中创建 Orders 表
- 全表名字为
. .Orders - docs
请注意,此表不包含任何数据;数据位于支持此表的 Kafka 主题中。
提醒:Flink SQL 中的表不会持久存储任何数据。
数据保留在表定义基础的任何外部数据存储中。在此示例中,Orders 表的数据位于 Kafka 主题中。这段 SQL DDL 描述了存储在 Kafka 中的数据如何被 Flink SQL 解释为表。
列类型
- 物理(常规)列
- 元数据列
- 这些允许访问每行连接器和/或格式元数据
计算列
- 可以引用常量、内置函数和其他列的虚拟列
- 计算列不能使用子查询或引用其他计算列
上面表包含上述3种类型列CREATE TABLE Orders (
`order_id` BIGINT,
`order_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp', -- read/write metadata
`offset` BIGINT METADATA VIRTUAL, -- read-only metadata
`price` DECIMAL(32, 2),
`quantity` INT,
`cost` AS price * quanitity -- computed column
) WITH (
'connector' = 'kafka', ...
)
order_time 来自kafka头的时间戳
- VIRTUAL 关键字将此偏移量 METADATA 列标记为只读
- 在这种情况下,Kafka 偏移量是 SELECT 查询中可用,但是 INSERT INTO不可用
- 这个cost列不会被物理存储,它的类型是自动派生的
算子和状态
算子类型
- 无状态类型
- 投影(select)
- 过滤(where)
- 物化(有状态)算子:最终可能会使用无界状态。
- Aggregation (GROUP BY)
- Joins
- 时态算子
- 窗口聚合(GROUP BY, OVER,window table-valued functions)
- Time-based joins (interval joins, temporal table joins)
- 模式匹配 (MATCH_RECOGNIZE)
时态算子能够利用时间约束来限制他们保留的状态量。
物化算子
结果表的行存储在 Flink 状态,并随着新数据的到来而更新。
- 聚合需要永远为每个用户维护一个计数
- 每个用户都可以随时点击
- 聚合状态随着每个新用户而增长
- 对于某些聚合函数,状态随着每个新的输入行而增长
时态算子
每行追加到结果表中,并且从不更新
通过时态算子,例如这个计算每个用户每小时点击次数的窗口,Flink 会在内部为正在进行的窗口保持状态。但是,一旦每个小时的结果完成,它们就会追加到输出中,并且内部状态会被清除。
状态算子
- 物化算子
- 计算不受时间条件的限制,并且永远不会完成
- 输入和输出记录可以更新或删除
- 永久保存记录和/或结果
- 状态可以随时间增长(取决于查询和数据)
- 时态算子
- 每条记录都与一个时态条件相关联
- 只接收新输入
- 以前添加的记录无法更新或删除
- 将记录和/或结果保持在状态,不再需要后就不存了
上面是对我们前面内容的小结,理解物化运算符和时态运算符之间的区别是使用 Flink SQL 的关键。Flink 的 SQL 计划器识别特定的流式查询,它可以针对这些查询生成优化的执行计划,利用时间的流逝(或watermark的到来)知道何时可以安全地从托管的 Flink 状态中清除物化查询结果。
详细关于时态算子的细节,参考time and windows and joins
管理物化算子的状态大小
- 查询状态可能无限增长
- 取决于查询和输入表
- 可以通过扩展集群来解决缓慢增长的状态
- SELECT user, COUNT(*) FROM logins GROUP BY user;
- 状态可以自动修剪
- SELECT session, COUNT(*) FROM clicks GROUP BY session;
- 空闲超时后可以删除行和持久化结果
在某些情况下,物化运算符对状态的永不满足的需求可以通过扩展集群来管理,或者通过添加约束将查询转换为使用时间运算符的查询。但在其他情况下,有必要使用状态 TTL 来删除(希望)过时的状态。
空闲状态清理
- 配置 Flink 自动移除 一段 时间未访问的状态
- table.exec.state.ttl
- 移除状态时查询结果不更新
- 如果被移除的状态不再需要,查询结果保持一致
- 如果查询需要被删除的状态,查询结果将变得不一致
- 需要权衡准确性和状态大小
总结
- 在(动态)表上执行查询
- 生成的动态表可能是仅追加或更新的,具体取决于输入和查询
- 动态表可以来回转换成流
- 仅 INSERT,Changelog 流(如 INSERT+DELETE 或 UPSERT+DELETE)
- 注意你的查询状态不会无限增长
- 尽可能尝试使用具有高效状态管理的时态运算符
- 可以使用 RocksDB 状态后端来管理非常大的状态
- 能需要设置一个空闲状态保留间隔(table.exec.state.ttl)