时间语义
- Event Time ( 一般实际业务中使用 )
- Ingestion Time
- Window Processing Time ( 默认 )
Watermark
在 Flink 中使用 Watermark 来处理 迟到数据
,使用window 来处理 乱序数据
对迟到数据处理的机制,但是watermark 达到窗口结束的时候,触发窗口计算
当窗口时间到达时, 本身应该触发计算,但是为了能够对迟到的数据进行正确的处理,需要将计算的时间点推迟,推迟到 watermark 标记到达时。
设置 watermark
连续性 watermark ( AssignerWithPeriodicWatermarks)
.assignTimestampsAndWatermarks(
new AssignerWithPeriodicWatermarks[WaterSensor] {
override def getCurrentWatermark: Watermark = {
// 水位线推迟 3s
println("getCurrentWatermart .... ")
new Watermark(currentTS - 3000)
}
override def extractTimestamp(element: WaterSensor, previousElementTimestamp: Long): Long = {
currentTS = currentTS.max(element.ts * 1000)
element.ts * 1000
}
}
)
间歇性 watermark ( AssignerWithPunctuatedWatermarks )
.assignTimestampsAndWatermarks(
new AssignerWithPunctuatedWatermarks[WaterSensor] {
override def checkAndGetNextWatermark(lastElement: WaterSensor, extractedTimestamp: Long): Watermark = {
println("checkAndGetNextWatermark ..... ")
// 间歇性水位线设置
new Watermark(extractedTimestamp)
}
override def extractTimestamp(element: WaterSensor, previousElementTimestamp: Long): Long = {
element.ts * 1000
}
}
)
预期有序的 watermark ( assignAscendingTimestamps )
// 如果我们事先得知数据流的时间戳是单调递增的,也就是说没有乱序,
// 那我们可以使用assignAscendingTimestamps,这个方法会直接使用数据的时间戳生成watermark。
val tsDS = mapDS.assignAscendingTimestamps(e => e._2)
准备数据
sensor_1,1549044122,1
sensor_1,1549044123,2
sensor_1,1549044124,3
sensor_1,1549044125,4
sensor_1,1549044126,5
sensor_1,1549044127,6
sensor_1,1549044128,7
sensor_1,1549044129,8
sensor_1,1549044130,9
// 定义样例类:水位传感器:用于接收空高数据
// id:传感器编号
// ts:时间戳
// vc:空高
case class WaterSensor(id:String, ts:Long, vc:Double)
代码
1. 设置时间语义
2. 设置 watermark
.map()
// 设置 watermark
.assignTimestampsAndWatermarks()
// 设置window time
.keyBy(_.id)
.timeWindow(Time.seconds(5))
分析:
- 时间窗口如何划分
timestamp - (timestamp - offset(0) + windowSize) % windowSize
[00: 00 - 00:05)
[00: 05 - 00:10)
….
- 标记何时触发窗口计算
当标记( 事件时间+推迟时间 ) 大于窗口结束的时候就会触发
- 如果窗口计算完毕后,还想对迟到的数据进行处理,可以让窗口的计算再晚一些
.allowedLateness(Time.seconds(2))
- 如果指定的窗口已经计算完毕,不再接收新的 数据,原则上 不再接收的数据就会丢弃
如果必须要统计, 可是窗口又不再接收,那么可以将数据放在一个特殊的流中, —— 侧输出流
.sideOutputLateData(outputTag)
package com.ylb.time
import java.text.SimpleDateFormat
import com.ylb.myCluss.WaterSensor
import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
/**
* @author yanglibin
* @create 2020-03-04 17:41
*/
object TimeTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 设置时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 侧输出流
val outputTag: OutputTag[WaterSensor] = new OutputTag[WaterSensor]("lateData")
// val markDS: DataStream[WaterSensor] = env.readTextFile("c:/tmp/input/sensor-data.log")
val markDS: DataStream[WaterSensor] = env.socketTextStream("hadoop-master",9999)
.map(
line => {
val datas = line.split(",")
WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
}
)
// 从数据中抽取数据作为事件时间
// 设定水位标记, 这个标记一般比当前数据事件时间要推迟
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[WaterSensor](Time.seconds(3)) {
override def extractTimestamp(element: WaterSensor): Long = {
element.ts * 1000L
}
}
)
val applyDS: DataStream[String] = markDS
.keyBy(_.id)
.timeWindow(Time.seconds(5))
.allowedLateness(Time.seconds(2))
.sideOutputLateData(outputTag)
.apply(
(key: String, window: TimeWindow, datas: Iterable[WaterSensor], out: Collector[String]) => {
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
out.collect(s"window:[${sdf.format(window.getStart)}-${sdf.format(window.getEnd)}]:{ ${datas.mkString(",")} }")
}
)
markDS.print("mark >>>")
applyDS.print("window>>>")
applyDS.getSideOutput(outputTag).print("side>>>")
env.execute()
}
}
注意:
如果数据源为 file , 那么在读取数据后进行计算时,即使数据窗口没有提交,那么在文件读取结束后也会自动计算,因为flink会在文件读完后将 watermark 设置为 Long 的最大值,需要将所有未计算的窗口全部进行计算 。
Event Time的使用一定要指定数据源中的时间戳。否则程序无法知道事件的事件时间是什么(数据源里的数据没有时间戳的话,就只能使用Processing Time了)。
watermark 并行度
newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
EvnetTime在window中的使用
滚动窗口(TumblingEventTimeWindows)
· ` .window(TumblingEventTimeWindows.of(Time.seconds(5)))
`
滑动窗口(SlidingEventTimeWindows)
` .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
会话窗口(EventTimeSessionWindows)
` .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
`
时间间隔应该大于 session 间隔数据,会触发窗口的计算,如:
sensor_1,1549044120,1
sensor_1,1549044126,1
( 两个数据之间的时间间隔大于5 )