Why :为什么会出现

流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由网络的延迟,反压,资源的不足等原因,导致乱序的产生(out-of-order或者说late element)。 但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算
问题:多种原因导致数据的顺序产生了乱序,我们需要有一个机制 将数据放置到对应处理的窗口中。


What:是什么

本身是一个时间戳。

watermark是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性。通常基于Event Time的数据,自身都包含一个timestamp。

Where : 什么情况?组成部分

How:如何操作

How1. 具体的解决流程

补充

1: WaterMark的三种类型

  • 并行数据流的WaterMark

    • 每个Operator子Task中都会独立生成Watermar,并更新下游算子的Wk,找出最小的watermark去跟窗口的结束时间进行比较
  • 顺序事件的WaterMark

    • 实践按照顺序进入到系统中,WaterMark会随着事件时间生成,周期的进行标机 设置超时间而导致延迟输出计算结果
  • 乱序时间的WaterMark

    • 根据延迟分别计算出对应的WaterMark,当他们到达一个Operator中便立即调整算子基于事件时间与当前WaterMark值继续比较,再触发计算与输出

      2.

6. 引用

Apache Flink中的水印变得简单 Flink事件时间处理和水印 FLink-WaterMark

7. 代码实现

  1. /**
  2. * This generator generates watermarks assuming that elements arrive out of order,
  3. * but only to a certain degree. The latest elements for a certain timestamp t will arrive
  4. * at most n milliseconds after the earliest elements for timestamp t.
  5. */
  6. public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
  7. private final long maxOutOfOrderness = 3500; // 3.5 seconds
  8. private long currentMaxTimestamp;
  9. @Override
  10. public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
  11. long timestamp = element.getCreationTime();
  12. currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
  13. return timestamp;
  14. }
  15. @Override
  16. public Watermark getCurrentWatermark() {
  17. // return the watermark as current highest timestamp minus the out-of-orderness bound
  18. return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
  19. }
  20. }
  21. /**
  22. * This generator generates watermarks that are lagging behind processing time by a fixed amount.
  23. * It assumes that elements arrive in Flink after a bounded delay.
  24. */
  25. public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
  26. private final long maxTimeLag = 5000; // 5 seconds
  27. @Override
  28. public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
  29. return element.getCreationTime();
  30. }
  31. @Override
  32. public Watermark getCurrentWatermark() {
  33. // return the watermark as current time minus the maximum time lag
  34. return new Watermark(System.currentTimeMillis() - maxTimeLag);
  35. }
  36. }