时间语义

image.png

  • Event Time ( 一般实际业务中使用 )
  • Ingestion Time
  • Window Processing Time ( 默认 )

Watermark

在 Flink 中使用 Watermark 来处理 迟到数据 ,使用window 来处理 乱序数据
对迟到数据处理的机制,但是watermark 达到窗口结束的时候,触发窗口计算
当窗口时间到达时, 本身应该触发计算,但是为了能够对迟到的数据进行正确的处理,需要将计算的时间点推迟,推迟到 watermark 标记到达时。

image.png

image.png

设置 watermark

连续性 watermark ( AssignerWithPeriodicWatermarks)

  1. .assignTimestampsAndWatermarks(
  2. new AssignerWithPeriodicWatermarks[WaterSensor] {
  3. override def getCurrentWatermark: Watermark = {
  4. // 水位线推迟 3s
  5. println("getCurrentWatermart .... ")
  6. new Watermark(currentTS - 3000)
  7. }
  8. override def extractTimestamp(element: WaterSensor, previousElementTimestamp: Long): Long = {
  9. currentTS = currentTS.max(element.ts * 1000)
  10. element.ts * 1000
  11. }
  12. }
  13. )

间歇性 watermark ( AssignerWithPunctuatedWatermarks )

  1. .assignTimestampsAndWatermarks(
  2. new AssignerWithPunctuatedWatermarks[WaterSensor] {
  3. override def checkAndGetNextWatermark(lastElement: WaterSensor, extractedTimestamp: Long): Watermark = {
  4. println("checkAndGetNextWatermark ..... ")
  5. // 间歇性水位线设置
  6. new Watermark(extractedTimestamp)
  7. }
  8. override def extractTimestamp(element: WaterSensor, previousElementTimestamp: Long): Long = {
  9. element.ts * 1000
  10. }
  11. }
  12. )

预期有序的 watermark ( assignAscendingTimestamps )

  1. // 如果我们事先得知数据流的时间戳是单调递增的,也就是说没有乱序,
  2. // 那我们可以使用assignAscendingTimestamps,这个方法会直接使用数据的时间戳生成watermark。
  3. val tsDS = mapDS.assignAscendingTimestamps(e => e._2)

准备数据

  1. sensor_1,1549044122,1
  2. sensor_1,1549044123,2
  3. sensor_1,1549044124,3
  4. sensor_1,1549044125,4
  5. sensor_1,1549044126,5
  6. sensor_1,1549044127,6
  7. sensor_1,1549044128,7
  8. sensor_1,1549044129,8
  9. sensor_1,1549044130,9
  1. // 定义样例类:水位传感器:用于接收空高数据
  2. // id:传感器编号
  3. // ts:时间戳
  4. // vc:空高
  5. case class WaterSensor(id:String, ts:Long, vc:Double)

代码

  1. 1. 设置时间语义
  2. 2. 设置 watermark
  3. .map()
  4. // 设置 watermark
  5. .assignTimestampsAndWatermarks()
  6. // 设置window time
  7. .keyBy(_.id)
  8. .timeWindow(Time.seconds(5))

分析:

  1. 时间窗口如何划分

timestamp - (timestamp - offset(0) + windowSize) % windowSize
[00: 00 - 00:05)
[00: 05 - 00:10)
….

  1. 标记何时触发窗口计算

当标记( 事件时间+推迟时间 ) 大于窗口结束的时候就会触发

  1. 如果窗口计算完毕后,还想对迟到的数据进行处理,可以让窗口的计算再晚一些 .allowedLateness(Time.seconds(2))
  2. 如果指定的窗口已经计算完毕,不再接收新的 数据,原则上 不再接收的数据就会丢弃

如果必须要统计, 可是窗口又不再接收,那么可以将数据放在一个特殊的流中, —— 侧输出流
.sideOutputLateData(outputTag)

  1. package com.ylb.time
  2. import java.text.SimpleDateFormat
  3. import com.ylb.myCluss.WaterSensor
  4. import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
  5. import org.apache.flink.api.scala._
  6. import org.apache.flink.streaming.api.TimeCharacteristic
  7. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
  8. import org.apache.flink.streaming.api.windowing.time.Time
  9. import org.apache.flink.streaming.api.windowing.windows.TimeWindow
  10. import org.apache.flink.util.Collector
  11. /**
  12. * @author yanglibin
  13. * @create 2020-03-04 17:41
  14. */
  15. object TimeTest {
  16. def main(args: Array[String]): Unit = {
  17. val env = StreamExecutionEnvironment.getExecutionEnvironment
  18. env.setParallelism(1)
  19. // 设置时间语义
  20. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  21. // 侧输出流
  22. val outputTag: OutputTag[WaterSensor] = new OutputTag[WaterSensor]("lateData")
  23. // val markDS: DataStream[WaterSensor] = env.readTextFile("c:/tmp/input/sensor-data.log")
  24. val markDS: DataStream[WaterSensor] = env.socketTextStream("hadoop-master",9999)
  25. .map(
  26. line => {
  27. val datas = line.split(",")
  28. WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
  29. }
  30. )
  31. // 从数据中抽取数据作为事件时间
  32. // 设定水位标记, 这个标记一般比当前数据事件时间要推迟
  33. .assignTimestampsAndWatermarks(
  34. new BoundedOutOfOrdernessTimestampExtractor[WaterSensor](Time.seconds(3)) {
  35. override def extractTimestamp(element: WaterSensor): Long = {
  36. element.ts * 1000L
  37. }
  38. }
  39. )
  40. val applyDS: DataStream[String] = markDS
  41. .keyBy(_.id)
  42. .timeWindow(Time.seconds(5))
  43. .allowedLateness(Time.seconds(2))
  44. .sideOutputLateData(outputTag)
  45. .apply(
  46. (key: String, window: TimeWindow, datas: Iterable[WaterSensor], out: Collector[String]) => {
  47. val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  48. out.collect(s"window:[${sdf.format(window.getStart)}-${sdf.format(window.getEnd)}]:{ ${datas.mkString(",")} }")
  49. }
  50. )
  51. markDS.print("mark >>>")
  52. applyDS.print("window>>>")
  53. applyDS.getSideOutput(outputTag).print("side>>>")
  54. env.execute()
  55. }
  56. }

注意:
如果数据源为 file , 那么在读取数据后进行计算时,即使数据窗口没有提交,那么在文件读取结束后也会自动计算,因为flink会在文件读完后将 watermark 设置为 Long 的最大值,需要将所有未计算的窗口全部进行计算 。
Event Time的使用一定要指定数据源中的时间戳。否则程序无法知道事件的事件时间是什么(数据源里的数据没有时间戳的话,就只能使用Processing Time了)。

watermark 并行度

newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);

EvnetTime在window中的使用

滚动窗口(TumblingEventTimeWindows)

· ` .window(TumblingEventTimeWindows.of(Time.seconds(5)))

`

滑动窗口(SlidingEventTimeWindows)

` .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))

`

会话窗口(EventTimeSessionWindows)

` .window(EventTimeSessionWindows.withGap(Time.seconds(5)))

`

时间间隔应该大于 session 间隔数据,会触发窗口的计算,如:
sensor_1,1549044120,1
sensor_1,1549044126,1
( 两个数据之间的时间间隔大于5 )