Joins 是批数据处理中常见且易于理解的操作,用于连接两个关系的行。但是,动态表上的连接语义不太明显甚至令人困惑。

因此,有几种方法可以使用 Table API 或 SQL 实际执行连接。

有关语法的更多信息,请查看 Table APISQL 中的连接部分。

常规联接

常规联接是最通用的联接类型,其中任何新记录或对联接输入任一侧的更改都是可见的并且影响整个联接结果。例如,如果左侧有新记录,则它将与右侧的所有先前和未来记录进行连接。

  1. SELECT * FROM Orders
  2. INNER JOIN Product
  3. ON Orders.productId = Product.id

这些语义允许任何类型的更新(插入,更新,删除)输入表。

但是,此操作具有重要意义:它需要将连接输入的两侧永久保持在Flink的状态。 因此,如果一个或两个输入表不断增长,资源使用也将无限增长。

时间窗口连接

基于时间窗口的连接由连接谓词定义,该连接谓词检查输入记录的时间属性是否在某些时间限制内,即时间窗口。

  1. SELECT *
  2. FROM
  3. Orders o,
  4. Shipments s
  5. WHERE o.id = s.orderId AND
  6. o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime

与常规连接操作相比,此类连接仅支持具有时间属性的仅追加表(append-only tables)。由于时间属性是 quasi-monontic 增加,Flink可以从其状态中删除旧值而不影响结果的正确性。

时态表连接

具有时态表的连接将仅追加(append-only)表(左输入/探测侧)与时态表(右输入/构建侧)连接,即随时间变化并跟踪其变化的表。 有关时态表的更多信息,请查看相应的页面。

以下示例显示了一个仅附加表 Orders,它应与不断变化的货币汇率表 RatesHistory 连接。

Orders 是一个仅附加表,表示给定 金额 和给定 货币 的付款。 例如,在 10:15,订单金额为 2欧元

  1. SELECT * FROM Orders;
  2. rowtime amount currency
  3. ======= ====== =========
  4. 10:15 2 Euro
  5. 10:30 1 US Dollar
  6. 10:32 50 Yen
  7. 10:52 3 Euro
  8. 11:04 5 US Dollar

RatesHistory 代表一个不断变化的仅追加货币汇率表,相对于 Yen(汇率为 1)。例如,从 09:0010:45欧元日元期间的汇率为 114。 从 10:4511:15,它是 116

  1. SELECT * FROM RatesHistory;
  2. rowtime currency rate
  3. ======= ======== ======
  4. 09:00 US Dollar 102
  5. 09:00 Euro 114
  6. 09:00 Yen 1
  7. 10:45 Euro 116
  8. 11:15 Euro 119
  9. 11:49 Pounds 108

鉴于我们想要计算转换为通用货币(日元)的所有 Orders 的数量。

例如,我们希望使用给定 rowtime114)的适当转换率转换以下顺序。

  1. rowtime amount currency
  2. ======= ====== =========
  3. 10:15 2 Euro

如果不使用时态表的概念,就需要编写如下查询:

  1. SELECT
  2. SUM(o.amount * r.rate) AS amount
  3. FROM Orders AS o,
  4. RatesHistory AS r
  5. WHERE r.currency = o.currency
  6. AND r.rowtime = (
  7. SELECT MAX(rowtime)
  8. FROM RatesHistory AS r2
  9. WHERE r2.currency = o.currency
  10. AND r2.rowtime <= o.rowtime);

借助于一个时间表函数 Rates 而不是 RatesHistory,我们可以在SQL中表达这样一个查询:

  1. SELECT
  2. o.amount * r.rate AS amount
  3. FROM
  4. Orders AS o,
  5. LATERAL TABLE (Rates(o.rowtime)) AS r
  6. WHERE r.currency = o.currency

来自探测器侧的每个记录将在探测器侧记录的相关时间属性时与构建侧表的版本连接。为了支持构建端表上先前值的更新(覆盖),该表必须定义主键。

在我们的例子中,来自 Orders的每条记录将在 o.rowtime 时加入 Rates 的版本。currency 字段已被定义为之前 Rates 的主键,用于连接我们示例中的两个表。如果查询使用处理时间概念,则在执行操作时,新添加的订单将始终与最新版本的 Rates 连接。

常规连接相反,这意味着如果构建端有新记录,则不会影响以前的连接结果。这再次允许 Flink 限制必须保持在该状态的元素的数量。

time-windowed join相比,时态表连接不定义时间窗口(时间窗口内的数据将会被join)。探针端的记录始终与 time 属性指定的构建端版本连接。因此,构建方面的记录可能是任意旧的。随着时间的推移,将从状态中删除先前和不再需要的记录版本(对于给定的主键)。

这种行为使得时态表加入了一个很好的候选者来表达关系术语中的流富集。

用法

定义时态表函数后,我们可以开始使用它。时态表函数的使用方式与使用普通表函数的方式相同。

以下代码片段解决了我们从 订单 表转换货币的问题:

  1. SELECT
  2. SUM(o_amount * r_rate) AS amount
  3. FROM
  4. Orders,
  5. LATERAL TABLE (Rates(o_proctime))
  6. WHERE
  7. r_currency = o_currency
  1. Table result = orders
  2. .join(new Table(tEnv, "rates(o_proctime)"), "o_currency = r_currency")
  3. .select("(o_amount * r_rate).sum as amount");
  1. val result = orders
  2. .join(rates('o_proctime), 'r_currency === 'o_currency)
  3. .select(('o_amount * 'r_rate).sum as 'amount)

注意:在时间连接中尚未实现在查询配置中定义的状态保留。这意味着计算查询结果所需的状态可能会无限增长,具体取决于历史记录表的不同主键的数量。

处理时间时态连接

使用处理时间时间属性,不可能将 过去 时间属性作为参数传递给时态表函数。根据定义,它始终是当前时间戳。因此,处理时间时间表函数的调用将始终返回基础表的最新已知版本,并且基础历史表中的任何更新也将立即覆盖当前值。

只有构建端记录的最新版本(相对于已定义的主键)保留在该状态中。构建端的更新将不会影响先前发出的连接结果。

可以将处理时间时态连接视为一个简单的 HashMap<K, V>;,它存储来自构建方的所有记录。当构建端的新记录具有与先前记录相同的密钥 key 时,旧值只是被覆盖。探测器端的每条记录总是根据 HashMap 的最新/当前状态进行评估。

事件时间时态连接

利用事件时间属性(即 rowtime 属性),可以将 过去 时间属性传递给时态表函数。这允许在共同的时间点连接两个表。

与处理时间时态连接相比,时态表不仅保持状态中的构建侧记录的最新版本(相对于定义的主键),而且存储自上一个水印以来的所有版本(由时间标识)。

例如,根据时态表的概念附加到探测器侧表的事件时间时间戳为 12:30:00 的传入行与时间 12:30:00 的构建端表的版本连接在一起。因此,传入的行仅与时间戳小于或等于 12:30:00 的行连接,并根据主键应用更新,直到此时为止。

根据事件时间的定义,watermarks 允许连接操作及时向前移动并丢弃不再需要的构建表的版本,因为不期望具有较低或相等时间戳的传入行。