维表 JOIN 语法

由于维表是一张不断变化的表(静态表只是动态表的一种特例)。那如何 JOIN 一张不断变化的表呢?如果用传统的 JOIN 语法SELECT * FROM T JOIN dimtable on T.id = dim_table.id来表达维表 JOIN,是不完整的。因为维表是一直在更新变化的,如果用这个语法那么关联上的是哪个时刻的维表呢?我们是不知道的,结果是不确定的。所以 Flink SQL 的维表 JOIN 语法引入了 SQL:2011 Temporal Table 的标准语法,用来声明关联的是维表哪个时刻的快照。维表 JOIN 语法/示例如下。
假设我们有一个 Orders 订单数据流,希望根据产品 ID 补全流上的产品维度信息,所以需要跟 Products 维度表进行关联。Orders 和 Products 的 DDL 声明语句如下:
CREATE TABLE Orders (
orderId VARCHAR,
— 订单 id
productId VARCHAR,
— 产品 id
units INT,
— 购买数量
orderTime TIMESTAMP
— 下单时间
) with ( type = ‘tt’,
— tt 日志流_ …)

CREATE TABLE Products (
productId VARCHAR, — 产品 id
name VARCHAR, — 产品名称
unitPrice DOUBLE — 单价
PERIOD FOR SYSTEMTIME, — 这是一张随系统时间而变化的表,用来声明维表
PRIMARY KEY (productId)
— 维表必须声明主键
) with ( type = ‘alihbase’,
— HBase 数据源_ …)
如上声明了一张来自 TT 的 Orders 订单数据流,和一张存储于 HBase 中的 Products 产品维表。为了补齐订单流的产品信息,需要 JOIN 维表,这里可以分为 JOIN 当前表和 JOIN 历史表。

JOIN 当前维表

SELECT * FROM Orders AS o[LEFT] JOIN Products FOR SYSTEM_TIME AS OF PROCTIME() AS pON o.productId = p.productId
Flink SQL 支持 LEFT JOIN 和 INNER JOIN 的维表关联。如上语法所示的,维表 JOIN 语法与传统的 JOIN 语法并无二异。只是 Products 维表后面需要跟上 FOR SYSTEM_TIME AS OF PROCTIME() 的关键字,其含义是每条到达的数据所关联上的是到达时刻的维表快照,也就是说,当数据到达时,我们会根据数据上的 key 去查询远程数据库,拿到匹配的结果后关联输出。这里的 PROCTIME 即 processing time。使用 JOIN 当前维表功能需要注意的是,如果维表插入了一条数据能匹配上之前左表的数据时,JOIN的结果流,不会发出更新的数据以弥补之前的未匹配。JOIN行为只发生在处理时间(processing time),即使维表中的数据都被删了,之前JOIN流已经发出的关联上的数据也不会被撤回或改变。
FlinkSQL维表Lookup Join - 图1

JOIN 历史维表

SELECT *FROM Orders AS o[LEFT] JOIN Products FOR SYSTEMTIME AS OF o.orderTime AS pON o.productId = p.productId
有时候想关联上的维度数据,并不是当前时刻的值,而是某个历史时刻的值。比如,产品的价格一直在发生变化,订单流希望补全的是下单时的价格,而不是当前的价格,那就是 JOIN 历史维表。语法上只需要将上文的 PROCTIME() 改成 o.orderTime 即可。含义是关联上的是下单时刻的 Products 维表。
FlinkSQL维表Lookup Join - 图2
Flink 在获取维度数据时,会根据左流的时间去查对应时刻的快照数据。因此 JOIN 历史维表需要外部存储支持多版本存储,如 HBase,或者存储的数据中带有多版本信息。
注:JOIN 历史维表功能目前暂未开放_

维表优化

在实际使用的过程中,会遇到许多性能问题。为了解决这些性能问题,我们做了大量的优化,性能得到大幅提升,在双11的洪峰下表现特别淡定。
我们的优化主要是为了解决两方面的问题:
1. 提高吞吐。维表的IO请求严重阻塞了数据流的计算处理。
2. 降低维表数据库读压力。如 HBase 也只能承受单机每秒 20 万的读请求。

Async IO

我在 《Flink 原理与实现:Aysnc I/O》中介绍了 Async IO 的使用场景和实现原理。原始的维表 JOIN 是同步访问的方式,来一条数据,去数据库查询一次,等待返回后输出关联结果。可以发现网络等待时间极大地阻碍了吞吐和延迟。为了解决同步访问的问题,异步模式可以并发地处理多个请求和回复,从而连续的请求之间不需要阻塞等待。
FlinkSQL维表Lookup Join - 图3

缓存

数据库的维表查询请求,有大量相同 key 的重复请求。如何减少重复请求?本地缓存是常用的方案。Flink SQL 目前提供两种缓存方案:LRU 和 ALL。(详见文档)

LRU

通过 cache=’LRU’参数可以开启 LRU 缓存优化,Blink 会为每个 JoinTable 节点创建一个 LRU 本地缓存。当每个数据进来的时候,先去缓存中查询,如果存在则直接关联输出,减少了一次 IO 请求。如果不存在,再发起数据库查询请求(异步或同步方式),请求返回的结果会先存入缓存中以备下次查询。
为了防止缓存无限制增长,所以使用的是 LRU 缓存,并且可以通过 cacheSize 调整缓存的大小。为了定期更新维表数据,可以通过 cacheTTLMs 调整缓存的失效时间。cacheTTLMs 是作用于每条缓存数据上的,也就是某条缓存数据在指定 timeout 时间内没有被访问,则会从缓存中移除。

ALL

Async 和 LRU-Cache 能极大提高吞吐率并降低数据库的读压力,但是仍然会有大量的 IO 请求存在,尤其是当 miss key(维表中不存在的 key)很多的时候。如果维表数据不大(通常百万级以内),那么其实可以将整个维表缓存到本地。那么 miss key 永远不会去请求数据库。因为本地缓存就是维表的镜像,缓存中不存在那么远程数据库中也不存在。
ALL cache 可以通过 cache=’ALL’参数开启,通过cacheTTLMs控制缓存的刷新间隔。Flink SQL 会为 JoinTable 节点起一个异步线程去同步缓存。在 Job 刚启动时,会先阻塞主数据流,直到缓存数据加载完毕,保证主数据流流过时缓存就已经 ready。在之后的更新缓存的过程中,不会阻塞主数据流。因为异步更新线程会将维表数据加载到临时缓存中,加载完毕后再与主缓存做原子替换。只有替换操作是加了锁的。
因为几乎没有 IO 操作,所以使用 cache ALL 的维表 JOIN 性能可以非常高。但是由于内存需要能同时容纳下两份维表拷贝,因此需要加大内存的配置。

缓存未命中 key

在使用 LRU 缓存时,如果存在大量的 invalid key ,或者数据库中不存在的 key。由于命中不了缓存,导致缓存的收益较低,仍然会有大量请求打到数据库。因此我们将未命中的 key 也加进了缓存,提高了未命中 key 和 invalid key 情况下的缓存命中率。

Distribute By 提高缓存命中率

默认 JoinTable 节点与上游节点之间的数据是通过 shuffle 传输的。这在缓存大小有限、key总量大、热点不明显的情况下, 缓存的收益可能较低。这种情况下可以将上游节点与 JoinTable 节点的数据传输改成按 key 分区。这样通常可以缩小单个节点的 key 个数,提高缓存的命中率。比如一段时间内 JoinTable 节点总共需要处理100万个key, 节点并发100, 在数据不倾斜时单节点平均只需处理1万个key = 100万/100并发. 如果不做 key 分区, 单节点实际处理的key个数可能远大于1万。使用上也非常简单,在维表的 DDL 参数中加上partitionedJoin=’true’即可。

最佳实践

在使用维表 JOIN 时,如果维表数据不大,或者 miss key (维表中不存在的 key)非常多,则可以使用 ALL cache,但是可能需要适当调大节点的内存,因为内存需要能同时容纳下两份维表拷贝。如果用不了 ALL cache,则可以使用 Async + LRU 来提高节点的吞吐。