Streams ⇔ Dynamic Tables

持续的SQL查询

image.png
这些只是列举一些可用的连接器。在传统的SQL数据库中,SQL 查询构成了一个将输入表转换为输出表的关系代数。这里也有同样的想法:传入的流被解释为动态表,然后通过查询将其转换为新的动态表,然后再将其转换为输出流。 接下来,我们将了解这两种转换是如何执行的:

  • 流到动态表
  • 动态表回流

Stream → dynamic table conversion

大多数源产生流消息被追加到动态表。
image.png
这种转换有两种形式。第一种形式将传入的流记录追加到相应的动态表中。第二种传入记录也可以表示对动态表的更新。第二种形式源流消息可以对动态表进行更新进行。例如,可以从Postgres 数据库中以Debezium进行bin log日志抽取,即将CDC流转换更新动态表。

只有少部分源可以产生更新流

  • the Upsert Kafka connector
  • the Upsert Pulsar connector
  • the Kafka and FileSystem connectors when using changelog formats
    • Debezium JSON and Avro
    • Maxwell JSON
    • Canal JSON

Flink SQL 只能以某种有限的方式处理更新流。更多内容请看CDC and Changelog Streams

Dynamic table → stream conversion

查询动态表

追加结果表的行(append)

image.png
image.png
image.png
在某些情况下,就像上图查询只是过滤其输入,结果表只是追加。Flink 的任何一个 sinks 都可以支持这一点。例如,我们可以将此结果流追加到文件中,或将其插入到数据库中。

更新结果表的行updated

image.png
image.png
image.png

在其他情况下,就像这个计算每个用户点击次数的查询一样,结果流包括回溯更新以前发出的结果。 并非所有接收器都支持更新结果。例如,FileSink 不能覆盖以前的结果。

追加vs更新

image.png

  • 追加:仅仅只需要insert,所有sink都支持
  • 更新:
    • 某些sink不支持回溯,例如FileSink
    • 某些sink回溯是采用INSERT + DELETE
      • JDBC, Elasticsearch
    • 某些sink回溯是采用UPSERT + DELETE
      • JDBC, Elasticsearch, Upsert Kafka

有两种样式的动态表到流转换。一些操作符会产生一个 APPEND 流——例如,如果您只进行投影和过滤,则生成的流不需要进行任何撤回,而只需要对 sink 进行 INSERT。所有接收器都支持这一点。

另一种动态表到流转换的样式涉及更新先前发出的结果。有些接收器根本无法处理这个问题,或者只能以某些格式处理它。

一些接收器通过删除先前发出的行,然后插入新结果来实现更新,而其他接收器可以执行 UPSERT。此外,一些接收器,如 JDBC 数据库,支持这两种实现撤回的方案。

编目和表

编目

编目在 Flink 中扮演的角色与它们在传统数据库中扮演的角色相似——它们用于存储和管理与正在处理的数据相关的元数据,包括表定义。

使用 Flink SQL,正在处理的数据存在于外部系统中,例如 Kinesis、Kafka、Postgres 或 Hive。表定义是一种元数据,用于描述 Flink 如何将外部数据解释为 SQL 表。

如果您想使用目录来保存和共享此元数据,可以使用上面已经实现的编目。

使用这些外部编目不是必须的,但这样做可以为应用程序之间共享表定义、视图等带来方便。

使用编目的一个优点是设置创建 Flink 中使用的表所需的 SQL DDL 语句,需要了解 Flink 的要求和功能,以及所涉及的外部系统的知识。通过在编目中共享此配置数据,只需一个人来描述这些详细信息。

创建表

示例:由 Kafka 主题支持的 Orders 表

  1. CREATE TABLE Orders (
  2. order_id BIGINT,
  3. order_time TIMESTAMP(3),
  4. price DECIMAL(32, 2),
  5. quantity INT
  6. ) WITH (
  7. 'connector' = 'kafka',
  8. 'topic' = 'orders',
  9. 'properties.bootstrap.servers' = 'localhost:9092',
  10. 'properties.group.id' = 'orderGroup',
  11. 'format' = 'csv'
  12. )
  • 在默认编目的默认数据库中创建 Orders 表
  • 全表名字为 ..Orders
  • docs

请注意,此表不包含任何数据;数据位于支持此表的 Kafka 主题中。

提醒:Flink SQL 中的表不会持久存储任何数据。

数据保留在表定义基础的任何外部数据存储中。在此示例中,Orders 表的数据位于 Kafka 主题中。这段 SQL DDL 描述了存储在 Kafka 中的数据如何被 Flink SQL 解释为表。

列类型

  • 物理(常规)列
  • 元数据列
    • 这些允许访问每行连接器和/或格式元数据
  • 计算列

    • 可以引用常量、内置函数和其他列的虚拟列
    • 计算列不能使用子查询或引用其他计算列
      1. CREATE TABLE Orders (
      2. `order_id` BIGINT,
      3. `order_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp', -- read/write metadata
      4. `offset` BIGINT METADATA VIRTUAL, -- read-only metadata
      5. `price` DECIMAL(32, 2),
      6. `quantity` INT,
      7. `cost` AS price * quanitity -- computed column
      8. ) WITH (
      9. 'connector' = 'kafka', ...
      10. )
      上面表包含上述3种类型列
  • order_time 来自kafka头的时间戳

  • VIRTUAL 关键字将此偏移量 METADATA 列标记为只读
    • 在这种情况下,Kafka 偏移量是 SELECT 查询中可用,但是 INSERT INTO不可用
  • 这个cost列不会被物理存储,它的类型是自动派生的

算子和状态

算子类型

  • 无状态类型
    • 投影(select)
    • 过滤(where)
  • 物化(有状态)算子:最终可能会使用无界状态。
    • Aggregation (GROUP BY)
    • Joins
  • 时态算子
    • 窗口聚合(GROUP BY, OVER,window table-valued functions)
    • Time-based joins (interval joins, temporal table joins)
    • 模式匹配 (MATCH_RECOGNIZE)

时态算子能够利用时间约束来限制他们保留的状态量。

物化算子

结果表的行存储在 Flink 状态,并随着新数据的到来而更新。
image.png

  • 聚合需要永远为每个用户维护一个计数
    • 每个用户都可以随时点击
  • 聚合状态随着每个新用户而增长
    • 对于某些聚合函数,状态随着每个新的输入行而增长

时态算子

每行追加到结果表中,并且从不更新
image.png
通过时态算子,例如这个计算每个用户每小时点击次数的窗口,Flink 会在内部为正在进行的窗口保持状态。但是,一旦每个小时的结果完成,它们就会追加到输出中,并且内部状态会被清除。

状态算子

  • 物化算子
    • 计算不受时间条件的限制,并且永远不会完成
    • 输入和输出记录可以更新或删除
    • 永久保存记录和/或结果
    • 状态可以随时间增长(取决于查询和数据)
  • 时态算子
    • 每条记录都与一个时态条件相关联
    • 只接收新输入
    • 以前添加的记录无法更新或删除
    • 将记录和/或结果保持在状态,不再需要后就不存了

上面是对我们前面内容的小结,理解物化运算符和时态运算符之间的区别是使用 Flink SQL 的关键。Flink 的 SQL 计划器识别特定的流式查询,它可以针对这些查询生成优化的执行计划,利用时间的流逝(或watermark的到来)知道何时可以安全地从托管的 Flink 状态中清除物化查询结果。

详细关于时态算子的细节,参考time and windows and joins

管理物化算子的状态大小

  • 查询状态可能无限增长
    • 取决于查询和输入表
  • 可以通过扩展集群来解决缓慢增长的状态
    • SELECT user, COUNT(*) FROM logins GROUP BY user;
  • 状态可以自动修剪
    • SELECT session, COUNT(*) FROM clicks GROUP BY session;
    • 空闲超时后可以删除行和持久化结果

在某些情况下,物化运算符对状态的永不满足的需求可以通过扩展集群来管理,或者通过添加约束将查询转换为使用时间运算符的查询。但在其他情况下,有必要使用状态 TTL 来删除(希望)过时的状态。

空闲状态清理

  • 配置 Flink 自动移除 一段 时间未访问的状态
    • table.exec.state.ttl
    • 移除状态时查询结果不更新
  • 如果被移除的状态不再需要,查询结果保持一致
  • 如果查询需要被删除的状态,查询结果将变得不一致
  • 需要权衡准确性和状态大小

总结

  • 在(动态)表上执行查询
    • 生成的动态表可能是仅追加或更新的,具体取决于输入和查询
  • 动态表可以来回转换成流
    • 仅 INSERT,Changelog 流(如 INSERT+DELETE 或 UPSERT+DELETE)
  • 注意你的查询状态不会无限增长
    • 尽可能尝试使用具有高效状态管理的时态运算符
    • 可以使用 RocksDB 状态后端来管理非常大的状态
    • 能需要设置一个空闲状态保留间隔(table.exec.state.ttl)