流式计算中的2个问题

  • 流式数据到达计算引擎的时间不一定: 比如 A 流的数据先到了, A 流不知道 B 流对应同 key 的数据什么时候到, 没法关联. (数据质量问题)
  • 流式数据不知何时、下发怎样的数据: A 流的数据到达后, 如果 B 流的数据永远不到, 那么 A 流的数据在什么时候以及是否要填充一个null值下发下去. (数据时效问题)

Window Join

DataStreamAPI:
可以用 join+where+equalTo+window, 但只能实现inner join的效果, 如果需要实现outer join
可以用 coGroup+where+equalTo+window
也可以用更灵活的connect

SQL:

  1. SELECT
  2. L.num as L_Num
  3. , L.id as L_Id
  4. , R.num as R_Num
  5. , R.id as R_Id
  6. , L.window_start
  7. , L.window_end
  8. FROM (
  9. SELECT *
  10. FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
  11. ) L
  12. FULL JOIN (
  13. SELECT *
  14. FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
  15. ) R
  16. ON L.num = R.num
  17. AND L.window_start = R.window_start
  18. AND L.window_end = R.window_end;

产出数据质量: 低
产出数据速度: 中
Tips: 由于数据质量和时效性问题在实际生产环境中很少使用这种方案

Interval Join

  1. clickRecordStream
  2. .keyBy(record -> record.getMerchandiseId())
  3. .intervalJoin(orderRecordStream.keyBy(record -> record.getMerchandiseId()))
  4. // 定义 interval 的时间区间
  5. .between(Time.seconds(-30), Time.seconds(30))
  6. .process(new ProcessJoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
  7. @Override
  8. public void processElement(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord, Context context, Collector<String> collector) throws Exception {
  9. collector.collect(StringUtils.join(Arrays.asList(
  10. accessRecord.getMerchandiseId(),
  11. orderRecord.getPrice(),
  12. orderRecord.getCouponMoney(),
  13. orderRecord.getRebateAmount()
  14. ), '\t'));
  15. }
  16. })
  17. .print();

Regular Join

TODO

参考资料:
2w 字详述双流 Join 3 种解决方案 + 2 种优化方案
Flink 对线面试官(一)
Flink面试八股文(上万字面试必备宝典)