流式计算中的2个问题
- 流式数据到达计算引擎的时间不一定: 比如 A 流的数据先到了, A 流不知道 B 流对应同 key 的数据什么时候到, 没法关联. (数据质量问题)
- 流式数据不知何时、下发怎样的数据: A 流的数据到达后, 如果 B 流的数据永远不到, 那么 A 流的数据在什么时候以及是否要填充一个null值下发下去. (数据时效问题)
Window Join
DataStreamAPI:
可以用 join+where+equalTo+window, 但只能实现inner join的效果, 如果需要实现outer join
可以用 coGroup+where+equalTo+window
也可以用更灵活的connect
SQL:
SELECTL.num as L_Num, L.id as L_Id, R.num as R_Num, R.id as R_Id, L.window_start, L.window_endFROM (SELECT *FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))) LFULL JOIN (SELECT *FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))) RON L.num = R.numAND L.window_start = R.window_startAND L.window_end = R.window_end;
产出数据质量: 低
产出数据速度: 中
Tips: 由于数据质量和时效性问题在实际生产环境中很少使用这种方案
Interval Join
clickRecordStream.keyBy(record -> record.getMerchandiseId()).intervalJoin(orderRecordStream.keyBy(record -> record.getMerchandiseId()))// 定义 interval 的时间区间.between(Time.seconds(-30), Time.seconds(30)).process(new ProcessJoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {@Overridepublic void processElement(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord, Context context, Collector<String> collector) throws Exception {collector.collect(StringUtils.join(Arrays.asList(accessRecord.getMerchandiseId(),orderRecord.getPrice(),orderRecord.getCouponMoney(),orderRecord.getRebateAmount()), '\t'));}}).print();
Regular Join
TODO
参考资料:
2w 字详述双流 Join 3 种解决方案 + 2 种优化方案
Flink 对线面试官(一)
Flink面试八股文(上万字面试必备宝典)
