Why :为什么会出现
流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由网络的延迟,反压,资源的不足等原因,导致乱序的产生(out-of-order或者说late element)。 但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算
问题:多种原因导致数据的顺序产生了乱序,我们需要有一个机制 将数据放置到对应处理的窗口中。
What:是什么
本身是一个时间戳。
watermark是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性。通常基于Event Time的数据,自身都包含一个timestamp。
Where : 什么情况?组成部分
How:如何操作
How1. 具体的解决流程
补充
1: WaterMark的三种类型
并行数据流的WaterMark
- 每个Operator子Task中都会独立生成Watermar,并更新下游算子的Wk,找出最小的watermark去跟窗口的结束时间进行比较
顺序事件的WaterMark
- 实践按照顺序进入到系统中,WaterMark会随着事件时间生成,周期的进行标机 设置超时间而导致延迟输出计算结果
乱序时间的WaterMark
6. 引用
7. 代码实现
/*** This generator generates watermarks assuming that elements arrive out of order,* but only to a certain degree. The latest elements for a certain timestamp t will arrive* at most n milliseconds after the earliest elements for timestamp t.*/public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {private final long maxOutOfOrderness = 3500; // 3.5 secondsprivate long currentMaxTimestamp;@Overridepublic long extractTimestamp(MyEvent element, long previousElementTimestamp) {long timestamp = element.getCreationTime();currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);return timestamp;}@Overridepublic Watermark getCurrentWatermark() {// return the watermark as current highest timestamp minus the out-of-orderness boundreturn new Watermark(currentMaxTimestamp - maxOutOfOrderness);}}/*** This generator generates watermarks that are lagging behind processing time by a fixed amount.* It assumes that elements arrive in Flink after a bounded delay.*/public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {private final long maxTimeLag = 5000; // 5 seconds@Overridepublic long extractTimestamp(MyEvent element, long previousElementTimestamp) {return element.getCreationTime();}@Overridepublic Watermark getCurrentWatermark() {// return the watermark as current time minus the maximum time lagreturn new Watermark(System.currentTimeMillis() - maxTimeLag);}}
