流式计算中的2个问题
- 流式数据到达计算引擎的时间不一定: 比如 A 流的数据先到了, A 流不知道 B 流对应同 key 的数据什么时候到, 没法关联. (数据质量问题)
- 流式数据不知何时、下发怎样的数据: A 流的数据到达后, 如果 B 流的数据永远不到, 那么 A 流的数据在什么时候以及是否要填充一个null值下发下去. (数据时效问题)
Window Join
DataStreamAPI:
可以用 join+where+equalTo+window, 但只能实现inner join的效果, 如果需要实现outer join
可以用 coGroup+where+equalTo+window
也可以用更灵活的connect
SQL:
SELECT
L.num as L_Num
, L.id as L_Id
, R.num as R_Num
, R.id as R_Id
, L.window_start
, L.window_end
FROM (
SELECT *
FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) L
FULL JOIN (
SELECT *
FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) R
ON L.num = R.num
AND L.window_start = R.window_start
AND L.window_end = R.window_end;
产出数据质量: 低
产出数据速度: 中
Tips: 由于数据质量和时效性问题在实际生产环境中很少使用这种方案
Interval Join
clickRecordStream
.keyBy(record -> record.getMerchandiseId())
.intervalJoin(orderRecordStream.keyBy(record -> record.getMerchandiseId()))
// 定义 interval 的时间区间
.between(Time.seconds(-30), Time.seconds(30))
.process(new ProcessJoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
@Override
public void processElement(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord, Context context, Collector<String> collector) throws Exception {
collector.collect(StringUtils.join(Arrays.asList(
accessRecord.getMerchandiseId(),
orderRecord.getPrice(),
orderRecord.getCouponMoney(),
orderRecord.getRebateAmount()
), '\t'));
}
})
.print();
Regular Join
TODO
参考资料:
2w 字详述双流 Join 3 种解决方案 + 2 种优化方案
Flink 对线面试官(一)
Flink面试八股文(上万字面试必备宝典)