时间语义

- Event Time ( 一般实际业务中使用 )
 - Ingestion Time
 - Window Processing Time ( 默认 )
 
Watermark
在 Flink 中使用 Watermark 来处理 迟到数据 ,使用window 来处理 乱序数据
对迟到数据处理的机制,但是watermark 达到窗口结束的时候,触发窗口计算
当窗口时间到达时, 本身应该触发计算,但是为了能够对迟到的数据进行正确的处理,需要将计算的时间点推迟,推迟到 watermark 标记到达时。


设置 watermark
连续性 watermark ( AssignerWithPeriodicWatermarks)
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[WaterSensor] {override def getCurrentWatermark: Watermark = {// 水位线推迟 3sprintln("getCurrentWatermart .... ")new Watermark(currentTS - 3000)}override def extractTimestamp(element: WaterSensor, previousElementTimestamp: Long): Long = {currentTS = currentTS.max(element.ts * 1000)element.ts * 1000}})
间歇性 watermark ( AssignerWithPunctuatedWatermarks )
.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[WaterSensor] {override def checkAndGetNextWatermark(lastElement: WaterSensor, extractedTimestamp: Long): Watermark = {println("checkAndGetNextWatermark ..... ")// 间歇性水位线设置new Watermark(extractedTimestamp)}override def extractTimestamp(element: WaterSensor, previousElementTimestamp: Long): Long = {element.ts * 1000}})
预期有序的 watermark ( assignAscendingTimestamps )
// 如果我们事先得知数据流的时间戳是单调递增的,也就是说没有乱序,// 那我们可以使用assignAscendingTimestamps,这个方法会直接使用数据的时间戳生成watermark。val tsDS = mapDS.assignAscendingTimestamps(e => e._2)
准备数据
sensor_1,1549044122,1sensor_1,1549044123,2sensor_1,1549044124,3sensor_1,1549044125,4sensor_1,1549044126,5sensor_1,1549044127,6sensor_1,1549044128,7sensor_1,1549044129,8sensor_1,1549044130,9
// 定义样例类:水位传感器:用于接收空高数据// id:传感器编号// ts:时间戳// vc:空高case class WaterSensor(id:String, ts:Long, vc:Double)
代码
1. 设置时间语义2. 设置 watermark.map()// 设置 watermark.assignTimestampsAndWatermarks()// 设置window time.keyBy(_.id).timeWindow(Time.seconds(5))
分析:
- 时间窗口如何划分
 
timestamp - (timestamp - offset(0) + windowSize) % windowSize 
[00: 00 - 00:05)
[00: 05 - 00:10)
….
- 标记何时触发窗口计算
 
当标记( 事件时间+推迟时间 ) 大于窗口结束的时候就会触发
- 如果窗口计算完毕后,还想对迟到的数据进行处理,可以让窗口的计算再晚一些 
.allowedLateness(Time.seconds(2)) - 如果指定的窗口已经计算完毕,不再接收新的 数据,原则上 不再接收的数据就会丢弃
 
如果必须要统计, 可是窗口又不再接收,那么可以将数据放在一个特殊的流中, —— 侧输出流.sideOutputLateData(outputTag)  
package com.ylb.timeimport java.text.SimpleDateFormatimport com.ylb.myCluss.WaterSensorimport org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}import org.apache.flink.api.scala._import org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractorimport org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.streaming.api.windowing.windows.TimeWindowimport org.apache.flink.util.Collector/*** @author yanglibin* @create 2020-03-04 17:41*/object TimeTest {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)// 设置时间语义env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)// 侧输出流val outputTag: OutputTag[WaterSensor] = new OutputTag[WaterSensor]("lateData")// val markDS: DataStream[WaterSensor] = env.readTextFile("c:/tmp/input/sensor-data.log")val markDS: DataStream[WaterSensor] = env.socketTextStream("hadoop-master",9999).map(line => {val datas = line.split(",")WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)})// 从数据中抽取数据作为事件时间// 设定水位标记, 这个标记一般比当前数据事件时间要推迟.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[WaterSensor](Time.seconds(3)) {override def extractTimestamp(element: WaterSensor): Long = {element.ts * 1000L}})val applyDS: DataStream[String] = markDS.keyBy(_.id).timeWindow(Time.seconds(5)).allowedLateness(Time.seconds(2)).sideOutputLateData(outputTag).apply((key: String, window: TimeWindow, datas: Iterable[WaterSensor], out: Collector[String]) => {val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")out.collect(s"window:[${sdf.format(window.getStart)}-${sdf.format(window.getEnd)}]:{ ${datas.mkString(",")} }")})markDS.print("mark >>>")applyDS.print("window>>>")applyDS.getSideOutput(outputTag).print("side>>>")env.execute()}}
注意:
     如果数据源为 file , 那么在读取数据后进行计算时,即使数据窗口没有提交,那么在文件读取结束后也会自动计算,因为flink会在文件读完后将 watermark 设置为 Long 的最大值,需要将所有未计算的窗口全部进行计算  。
Event Time的使用一定要指定数据源中的时间戳。否则程序无法知道事件的事件时间是什么(数据源里的数据没有时间戳的话,就只能使用Processing Time了)。
watermark 并行度
newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
EvnetTime在window中的使用
滚动窗口(TumblingEventTimeWindows)
· ` .window(TumblingEventTimeWindows.of(Time.seconds(5)))
`
滑动窗口(SlidingEventTimeWindows)
` .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
会话窗口(EventTimeSessionWindows)
` .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
`
时间间隔应该大于 session 间隔数据,会触发窗口的计算,如:
sensor_1,1549044120,1
sensor_1,1549044126,1
( 两个数据之间的时间间隔大于5 )
