一、问题

关于Watermark,很容易产生几个问题

  • Flink 流处理应用中,常见的处理需求/应对方案是什么?
  • Watermark究竟应该翻译成水印还是水位线?
  • Watermark本质是什么?
  • Watermark是如何解决问题?

下面我们就来简要解答这些问题以给大家一个大致概念,在后文中,会再深入描述。

问题1. Flink 流处理应用中常见的需求/方案是什么

聚合类的处理 Flink可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。所以Flink引入了窗口概念。
窗口 窗口的作用为了周期性的获取数据。就是把传入的原始数据流切分成多个buckets,所有计算都在单一的buckets中进行。窗口(window)就是从 Streaming 到 Batch 的一个桥梁。
带来的问题:聚合类处理带来了新的问题,比如乱序/延迟。其解决方案就是 Watermark / allowLateNess / sideOutPut 这一组合拳。
Watermark 的作用是防止 数据乱序 / 指定时间内获取不到全部数据。
allowLateNess 是将窗口关闭时间再延迟一段时间。
sideOutPut 是最后兜底操作,当指定窗口已经彻底关闭后,就会把所有过期延迟数据放到侧输出流,让用户决定如何处理。
总结起来就是说

  1. Windows -----> Watermark -----> allowLateNess -----> sideOutPut
  2. Windows把流数据分块处理,用Watermark确定什么时候不再等待更早的数据/触发窗口进行计算,
  3. allowLateNess 将窗口关闭时间再延迟一段时间。用sideOutPut 最后兜底把数据导出到其他地方。

问题2. Watermark本质是什么

Watermarks是基于已经收集的消息来估算是否还有消息未到达,本质上是一个时间戳。时间戳反映的是事件发生的时间,而不是事件处理的时间。
这个从Flink的源码就能看出来,唯一有意义的成员变量就是 timestamp。

  1. public final class Watermark extends StreamElement {
  2. /*The watermark that signifies end-of-event-time. */
  3. public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);
  4. /* The timestamp of the watermark in milliseconds. */
  5. private final long timestamp;
  6. /* Creates a new watermark with the given timestamp in milliseconds.*/
  7. public Watermarklong timestamp) {
  8. this.timestamp = timestamp;
  9. }
  10. /*Returns the timestamp associated with this {@link Watermark} in milliseconds.**/
  11. public long getTimestamp() {
  12. return timestamp;
  13. }
  14. }

问题3. Watermark如何解决问题

Watermark是一种告诉Flink一个消息延迟多少的方式。它定义了什么时候不再等待更早的数据
可以把Watermarks理解为一个水位线,这个Watermarks在不断的变化。Watermark实际上作为数据流的一部分随数据流流动
当Flink中的运算符接收到Watermarks时,它明白早于该时间的消息已经完全抵达计算引擎,即假设不会再有时间小于水位线的事件到达
这个假设是触发窗口计算的基础,只有水位线越过窗口对应的结束时间,窗口才会关闭和进行计算

二、开始

Watermark是Apache Flink为了处理EventTime 窗口计算提出的一种机制,本质上也是一种时间戳。watermark是用于处理乱序事件或延迟数据的,这通常用watermark机制结合window来实现(Watermarks用来触发window窗口计算)。
比如对于late element,我们不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。 可以把Watermark看作是一种告诉Flink一个消息延迟多少的方式。定义了什么时候不再等待更早的数据。

1. 窗口触发条件

上面谈到了对数据乱序问题的处理机制是watermark+window,那么window什么时候该被触发呢?
基于Event Time的事件处理,Flink默认的事件触发条件为:
对于out-of-order及正常的数据而言

  • watermark的时间戳 > = window endTime
  • 在 [window_start_time,window_end_time] 中有数据存在。

对于late element太多的数据而言

  • Event Time > watermark的时间戳

WaterMark相当于一个EndLine,一旦Watermarks大于了某个window的end_time,就意味着windows_end_time时间和WaterMark时间相同的窗口开始计算执行了。
就是说,我们根据一定规则,计算出Watermarks,并且设置一些延迟,给迟到的数据一些机会,也就是说正常来讲,对于迟到的数据,我只等你一段时间,再不来就没有机会了。
WaterMark时间可以用Flink系统现实时间,也可以用处理数据所携带的Event time。
使用Flink系统现实时间,在并行和多线程中需要注意的问题较少,因为都是以现实时间为标准。
如果使用处理数据所携带的Event time作为WaterMark时间,需要注意两点:

  • 因为数据到达并不是循序的,注意保存一个当前最大时间戳作为WaterMark时间
  • 并行同步问题

    2. WaterMark设定方法

    标点水位线(Punctuated Watermark)

    标点水位线(Punctuated Watermark)通过数据流中某些特殊标记事件来触发新水位线的生成。这种方式下窗口的触发与时间无关,而是决定于何时收到标记事件。
    在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。

    定期水位线(Periodic Watermark)

    周期性的(允许一定时间间隔或者达到一定的记录条数)产生一个Watermark。水位线提升的时间间隔是由用户设置的,在两次水位线提升时隔内会有一部分消息流入,用户可以根据这部分数据来计算出新的水位线。
    在实际的生产中Periodic的方式必须结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时。
    举个例子,最简单的水位线算法就是取目前为止最大的事件时间,然而这种方式比较暴力,对乱序事件的容忍程度比较低,容易出现大量迟到事件。

    3. 迟到事件

    虽说水位线表明着早于它的事件不应该再出现,但是上如上文所讲,接收到水位线以前的的消息是不可避免的,这就是所谓的迟到事件。实际上迟到事件是乱序事件的特例,和一般乱序事件不同的是它们的乱序程度超出了水位线的预计,导致窗口在它们到达之前已经关闭。
    迟到事件出现时窗口已经关闭并产出了计算结果,因此处理的方法有3种:

  • 重新激活已经关闭的窗口并重新计算以修正结果。

  • 将迟到事件收集起来另外处理。
  • 将迟到事件视为错误消息并丢弃。

Flink 默认的处理方式是第3种直接丢弃,其他两种方式分别使用Side OutputAllowed Lateness
Side Output机制可以将迟到事件单独放入一个数据流分支,这会作为 window 计算结果的副产品,以便用户获取并对其进行特殊处理。
Allowed Lateness机制允许用户设置一个允许的最大迟到时长。Flink 会在窗口关闭后一直保存窗口的状态直至超过允许迟到时长,这期间的迟到事件不会被丢弃,而是默认会触发窗口重新计算。因为保存窗口状态需要额外内存,并且如果窗口计算使用了 ProcessWindowFunction API 还可能使得每个迟到事件触发一次窗口的全量计算,代价比较大,所以允许迟到时长不宜设得太长,迟到事件也不宜过多,否则应该考虑降低水位线提高的速度或者调整算法。
这里总结机制为:

  • 窗口window 的作用是为了周期性的获取数据。
  • watermark的作用是防止数据出现乱序(经常),事件时间内获取不到指定的全部数据,而做的一种保险方法。
  • allowLateNess是将窗口关闭时间再延迟一段时间。
  • sideOutPut是最后兜底操作,所有过期延迟数据,指定窗口已经彻底关闭了,就会把数据放到侧输出流。

看看如何触发窗口

我们明白了窗口的触发机制,这里我们添加了水位线,到底是个怎么个情况?我们来看下面

假如我们设置10s的时间窗口(window),那么020s都是一个窗口,以0~10s为例,0为start-time,10为end-time。假如有4个数据的event-time分别是8(A),12.5(B),9(C),13.5(D),我们设置Watermarks为当前所有到达数据event-time的最大值减去延迟值3.5秒 当A到达的时候,Watermarks为max{8}-3.5=8-3.5 = 4.5 < 10,不会触发计算 当B到达的时候,Watermarks为max(12.5,8)-3.5=12.5-3.5 = 9 < 10,不会触发计算 当C到达的时候,Watermarks为max(12.5,8,9)-3.5=12.5-3.5 = 9 < 10,不会触发计算 当D到达的时候,Watermarks为max(13.5,12.5,8,9)-3.5=13.5-3.5 = 10 = 10,触发计算 触发计算的时候,会将A,C(因为他们都小于10)都计算进去,其中C是迟到的。 max这个很关键,就是当前窗口内,所有事件的最大事件。 这里的延迟3.5s是我们假设一个数据到达的时候,比他早3.5s的数据肯定也都到达了,这个是需要根据经验推算。假设加入D到达以后有到达了一个E,event-time=6,但是由于0~10的时间窗口已经开始计算了,所以E就丢了。 从这里上面E的丢失说明,水位线也不是万能的,但是如果根据我们自己的生产经验+侧道输出等方案,可以做到数据不丢失。

参考文档:https://www.cnblogs.com/rossiXYZ/articles/12286407.html

三、Demo延迟数据处理watermark allowedLateness() sideOutputLateData()

  1. import org.apache.flink.streaming.api.TimeCharacteristic
  2. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
  3. import org.apache.flink.streaming.api.scala._
  4. import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
  5. import org.apache.flink.streaming.api.windowing.time.Time
  6. object demo1 {
  7. def main(args: Array[String]): Unit = {
  8. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  9. env.setParallelism(1) //这里最好设置并行度为1,否则可能看不到最终效果
  10. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime )
  11. val inputStream: DataStream[String] = env.socketTextStream("hadoop102",7777)
  12. val outputTag = new OutputTag[SensorReading]("side")
  13. val dataStream = inputStream
  14. .map(data => {
  15. val dataArray = data.split(",")
  16. SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
  17. }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(2)) {
  18. override def extractTimestamp(element: SensorReading): Long = {
  19. element.timestamp*1000 //我的测试时间戳是s,flink要求ms
  20. }
  21. })
  22. val minStream: DataStream[SensorReading] = dataStream.keyBy(_.id)
  23. // .window( SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(2)))
  24. .timeWindow(Time.seconds(10))
  25. .allowedLateness(Time.seconds(4))
  26. .sideOutputLateData(outputTag)
  27. .minBy("temperature")
  28. dataStream.print("data")
  29. minStream.print("min")
  30. minStream.getSideOutput(outputTag).print("slide")
  31. env.execute("demo1")
  32. }
  33. }
  34. case class SensorReading(id: String, timestamp: Long, temperature: Double)

注意参数
1、窗口开窗为10s ,此次采用滚动窗口比较简单点
2、watermark为2s
3、允许延迟为4s
注意事项
1、如果用我的代码进行测试,不要修改测试数据第一条,因为涉及到计算窗口的start
测试数据
sensor_1, 1547718120,20
sensor_1, 1547718130,10
sensor_1, 1547718131,9
sensor_1, 1547718132,8
sensor_1, 1547718120,9
sensor_1, 1547718135,5
sensor_1, 1547718120,9
sensor_1, 1547718136,4
sensor_1, 1547718120,9
打印结果
data> SensorReading(sensor_1,1547718120,20.0)
data> SensorReading(sensor_1,1547718130,10.0)
data> SensorReading(sensor_1,1547718131,9.0)
data> SensorReading(sensor_1,1547718132,8.0)
min> SensorReading(sensor_1,1547718120,20.0)
data> SensorReading(sensor_1,1547718120,9.0)
min> SensorReading(sensor_1,1547718120,9.0)
data> SensorReading(sensor_1,1547718135,5.0)
data> SensorReading(sensor_1,1547718120,9.0)
min> SensorReading(sensor_1,1547718120,9.0)
data> SensorReading(sensor_1,1547718136,4.0)
data> SensorReading(sensor_1,1547718120,9.0)
slide> SensorReading(sensor_1,1547718120,9.0)
说明
1、经过计算窗口的开始时间是1547718120,所以第一个窗口是【20-30),
2、第一个窗口关闭的时间是20+10+2=32,所以当输入32这条数据的时候【20-30)的窗口关闭,此时窗口内的数据只有20,所以算出温度最小值为20
3、当输入 SensorReading(sensor_1,1547718120,9.0)这条数据的时候,allowlateness起作用,认为这条数据也是延迟数据,对原先算出的最小值20进行修正,最后算出min=9.0
4、此时需要计算一个最多延迟时间20+10+2+4=36,所以输入35的时候,这条数据,会进入到第二个窗口,同时第一个窗口还没有彻底关闭,所以再次输入 SensorReading(sensor_1,1547718120,9.0),仍然会进入到【20-30)的窗口,并在此计算最小值
5、输入SensorReading(sensor_1,1547718136,4.0),窗口彻底关闭,再次输入 SensorReading(sensor_1,1547718120,9.0),不再对第一个窗口min进行修正,直接把数据放到测输入流,以后所有的【20-30)的数据在输入都会全部放到侧输出流
总结
1、窗口window 的作用是为了周期性的获取数据
2、watermark的作用是防止数据出现乱序(经常),事件时间内获取不到指定的全部数据,而做的一种保险方法,
3、allowLateNess,是将窗口关闭时间再延迟一段时间,
思考?这里的allowLateNess 感觉就好像window变大了,那么为什么不直接把window设置大一点呢?或者把watermark加大点
业务需要,比如我业务需要统计每个小时内的数据,那么开窗一定是1h,但是数据乱序可能会达到几分钟,一般来说水印设置的都比较小(为什么呢?暂时不知道),所以提出了延迟时间这个概念
4、sideOutPut是最后兜底操作,所有过期延迟数据,指定窗口已经彻底关闭了,就会把数据放到侧输出流

四、Flink窗口的起始点确定

  1. org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
  2. //处理时间processing Time
  3. @Override
  4. public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
  5. final long now = context.getCurrentProcessingTime();
  6. long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
  7. return Collections.singletonList(new TimeWindow(start, start + size));
  8. }
  9. org.apache.flink.streaming.api.windowing.assigners
  10. //时间时间event Time
  11. @Override
  12. public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
  13. if (timestamp > Long.MIN_VALUE) {
  14. // Long.MIN_VALUE is currently assigned when no timestamp is present
  15. long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
  16. return Collections.singletonList(new TimeWindow(start, start + size));
  17. } else {
  18. throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
  19. "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
  20. "'DataStream.assignTimestampsAndWatermarks(...)'?");
  21. }
  22. }
  23. //这个方法最终确定窗口起始时间
  24. public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
  25. return timestamp - (timestamp - offset + windowSize) % windowSize;
  26. }