Flink Window Overview
A window is a way of cutting an unbounded data stream into finite chunks.
Windows fall into two categories:
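- CountWindow: windows defined by a number of elements
- TimeWindow: windows defined by time
  - Tumbling Window: slices the data into consecutive windows of a fixed length; the windows do not overlap
  - Sliding Window: windows have a fixed length plus a slide interval; when the slide is shorter than the length, windows overlap and one element can belong to several windows
  - Session Window: formed by a sequence of events followed by a timeout gap of a specified length, similar to a web application's session. If no new data arrives for the length of the gap, the current window closes and the next element starts a new window.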
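To make the three time-window types concrete, here is a plain-Scala sketch (no Flink dependency) of how an element's timestamp maps to windows, such as the 3 s / 2 s sliding window used in the TimeWindow example. It assumes no window offset, so window starts are aligned to multiples of the size or slide as in Flink's default assigners; the object `WindowAssignSketch` and its methods are hypothetical helpers, not Flink API. Times are in milliseconds and windows are half-open [start, end).

```scala
object WindowAssignSketch {
  // A half-open window [start, end), like Flink's TimeWindow
  final case class Win(start: Long, end: Long)

  // Start of the aligned window containing ts (assumes ts >= 0 and no offset)
  def alignedStart(ts: Long, length: Long): Long = ts - (ts % length)

  // Tumbling: exactly one window of the given size per element
  def tumbling(ts: Long, size: Long): Win = {
    val start = alignedStart(ts, size)
    Win(start, start + size)
  }

  // Sliding: every window [start, start + size) whose start is aligned to `slide`
  // and which still contains ts; an element lands in about size / slide windows
  def sliding(ts: Long, size: Long, slide: Long): List[Win] =
    Iterator.iterate(alignedStart(ts, slide))(_ - slide)
      .takeWhile(start => start > ts - size)
      .map(start => Win(start, start + size))
      .toList

  // Session: each element opens [ts, ts + gap); overlapping windows are merged,
  // so a pause longer than `gap` closes the session
  def sessions(timestamps: Seq[Long], gap: Long): List[Win] =
    timestamps.sorted.foldLeft(List.empty[Win]) {
      case (current :: done, ts) if ts < current.end =>
        Win(current.start, math.max(current.end, ts + gap)) :: done
      case (acc, ts) => Win(ts, ts + gap) :: acc
    }.reverse
}
```

For example, an element at 4500 ms falls into the tumbling window [3000, 6000) for a 3 s size, and into the two sliding windows [4000, 7000) and [2000, 5000) for a 3 s size with a 2 s slide.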
Window API
TimeWindow
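```scala
// Sliding time window: 3 s windows that slide by 2 s (`env` is a StreamExecutionEnvironment)
def timeWindow() = {
  env.socketTextStream("hadoop-master", 9999)
    .flatMap(_.split(" "))
    .map((_, 1))
    .keyBy(0)
    .timeWindow(Time.seconds(3), Time.seconds(2))
    .reduce((t1, t2) => (t1._1, t1._2 + t2._2))
    .print()
  env.execute()
}
```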
CountWindow
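```scala
// Sliding count window: per key, fires every 2 elements over the last 3 elements
def countWindow() = {
  env.socketTextStream("hadoop-master", 9999)
    .flatMap(_.split(" "))
    .map((_, 1))
    .keyBy(0)
    .countWindow(3, 2)
    .reduce((t1, t2) => (t1._1, t1._2 + t2._2))
    .print()
  env.execute()
}
```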
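`countWindow(3, 2)` is a sliding count window: per key, it fires every 2 elements and evaluates at most the last 3 elements of that key (Flink builds it from a global window with a count trigger and a count evictor), while `countWindow(3)` fires and purges every 3 elements. The plain-Scala simulation below (no Flink; `CountWindowSketch` is a hypothetical name) reproduces what the word-count job would print for a single key.

```scala
object CountWindowSketch {
  // Emulates countWindow(size, slide) for the elements of ONE key:
  // every `slide` arrivals the window fires and combines the last `size` elements
  def slidingCount[T](elements: Seq[T], size: Int, slide: Int)(combine: (T, T) => T): List[T] =
    (slide to elements.length by slide).toList.map { n =>
      // the evictor keeps at most `size` of the n elements seen so far
      elements.take(n).takeRight(size).reduce(combine)
    }

  // Emulates countWindow(size): fires and purges every `size` elements,
  // so an incomplete final batch never fires
  def tumblingCount[T](elements: Seq[T], size: Int)(combine: (T, T) => T): List[T] =
    elements.grouped(size).filter(_.length == size).map(_.reduce(combine)).toList
}
```

Typing the word `a` five times would therefore print `(a,2)` after the 2nd and `(a,3)` after the 4th occurrence with `countWindow(3, 2)`, but only `(a,3)` once with `countWindow(3)`.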
Window functions
ReduceFunction
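```scala
import org.apache.flink.api.common.functions.ReduceFunction

def reduceFunc() = {
  env.socketTextStream("hadoop-master", 9999)
    .flatMap(_.split(" "))
    .map((_, 1))
    .keyBy(0)
    .countWindow(3, 2)
    .reduce(new ReduceFunction[(String, Int)] {
      override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {
        // The first element is stored as-is; reduce is called once for each further element.
        // countWindow(size, slide) uses an evictor, so here Flink buffers the elements
        // and runs these calls when the window fires rather than as each element arrives.
        print(" ---- ")
        (value1._1, value1._2 + value2._2)
      }
    })
    .print()
  env.execute()
}
```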
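Why the first element is "not processed": a reduce keeps a single running value per key and window, and the first element simply becomes that value. The plain-Scala sketch below (hypothetical `IncrementalReduceSketch`, no Flink) models that incremental update, as Flink does for windows without an evictor, and counts how often the reduce function is invoked.

```scala
object IncrementalReduceSketch {
  // Updates a running value the way a reducing window state does: the first element
  // is stored as-is, every later element is merged into it with `combine`.
  // Returns the final value and the number of `combine` calls.
  def run[T](elements: Seq[T])(combine: (T, T) => T): (Option[T], Int) = {
    var calls = 0
    val result = elements.foldLeft(Option.empty[T]) {
      case (None, e) => Some(e)          // first element: stored, not reduced
      case (Some(acc), e) =>
        calls += 1                       // one call per later element
        Some(combine(acc, e))
    }
    (result, calls)
  }
}
```

Three `("a", 1)` elements therefore produce `("a", 3)` with only two calls, which is why the first ` ---- ` appears with the second element.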
AggregateFunction
```scala
// Tumbling count window of 3 elements per key, aggregated with MyAggregate
def aggregateFunc() = {
  env.socketTextStream("hadoop-master", 9999)
    .flatMap(_.split(" "))
    .map((_, 1))
    .keyBy(0)
    .countWindow(3)
    .aggregate(new MyAggregate)
    .print()
  env.execute()
}
```
MyAggregate class
```scala
package com.ylb.myCluss

import org.apache.flink.api.common.functions.AggregateFunction

/**
 * @author yanglibin
 * @create 2020-03-04 12:12
 */
// Type parameters: input IN, accumulator ACC (intermediate value), output OUT
class MyAggregate extends AggregateFunction[(String, Int), Int, Int] {
  // Create the accumulator with its initial value
  override def createAccumulator(): Int = 0

  // Add an incoming element to the accumulator
  override def add(value: (String, Int), accumulator: Int): Int = {
    println(" add ...")
    accumulator + value._2
  }

  // Produce the result from the accumulator
  override def getResult(accumulator: Int): Int = accumulator

  // Merge two accumulators (needed when windows merge, e.g. session windows)
  override def merge(a: Int, b: Int): Int = a + b
}
```
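In MyAggregate the accumulator and the output are both `Int`, so the three type parameters can look redundant. They matter when the intermediate state differs from the result, for example an average that needs a (sum, count) pair. The sketch below mirrors the four AggregateFunction methods in a small hypothetical trait so it runs without Flink; with Flink one would presumably extend `AggregateFunction[(String, Int), (Long, Long), Double]` with the same method bodies.

```scala
object AverageAggregateSketch {
  // Hypothetical stand-in with the same four methods as Flink's AggregateFunction[IN, ACC, OUT]
  trait Aggregate[IN, ACC, OUT] {
    def createAccumulator(): ACC
    def add(value: IN, accumulator: ACC): ACC
    def getResult(accumulator: ACC): OUT
    def merge(a: ACC, b: ACC): ACC
  }

  // Average of the counts: ACC = (sum, count), OUT = Double
  object AverageCount extends Aggregate[(String, Int), (Long, Long), Double] {
    def createAccumulator(): (Long, Long) = (0L, 0L)
    def add(value: (String, Int), acc: (Long, Long)): (Long, Long) = (acc._1 + value._2, acc._2 + 1)
    def getResult(acc: (Long, Long)): Double = if (acc._2 == 0) 0.0 else acc._1.toDouble / acc._2
    def merge(a: (Long, Long), b: (Long, Long)): (Long, Long) = (a._1 + b._1, a._2 + b._2)
  }

  // How a window drives the function: one accumulator per window,
  // add() for every element, getResult() when the window fires
  def runWindow[IN, ACC, OUT](f: Aggregate[IN, ACC, OUT], elements: Seq[IN]): OUT =
    f.getResult(elements.foldLeft(f.createAccumulator())((acc, e) => f.add(e, acc)))
}
```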
Full window functions: process
All elements of the window are collected first; when the window is evaluated, the function iterates over all of them.
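```scala
def processFunc() = {
  env.socketTextStream("hadoop-master", 9999)
    .flatMap(_.split(" "))
    .map((_, 1))
    // Use a key selector, not the Java-style index 0: keyBy(0) would make the key
    // a Tuple, while MyProcess expects a String key
    .keyBy(_._1)
    .timeWindow(Time.seconds(5))
    .process(new MyProcess)
    .print()
  env.execute()
}
```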
MyProcess class
```scala
package com.ylb.myCluss

import java.text.SimpleDateFormat

import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
 * @author yanglibin
 * @create 2020-03-04 13:36
 */
class MyProcess extends ProcessWindowFunction[(String, Int), String, String, TimeWindow] {
  override def process(key: String,
                       context: Context,
                       elements: Iterable[(String, Int)],
                       out: Collector[String]): Unit = {
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    out.collect("window start time : " + sdf.format(context.window.getStart) + " ========")
    out.collect("message: " + elements.toList)
    out.collect("window end time: " + sdf.format(context.window.getEnd) + " ========")
  }
}
```
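Unlike reduce and aggregate, process sees the whole window at once, which is what lets MyProcess print `elements.toList` together with the window bounds. A plain-Scala sketch of that buffering (hypothetical `FullWindowSketch`, no Flink), assuming tumbling windows keyed by word as in processFunc and inputs given as (timestamp in ms, element) pairs:

```scala
object FullWindowSketch {
  // Buffers (word, count) pairs per key and tumbling window, then hands each complete
  // buffer to `process` in one call, like a ProcessWindowFunction
  def runTumbling[OUT](input: Seq[(Long, (String, Int))], sizeMs: Long)
                      (process: (String, Long, Long, Iterable[(String, Int)]) => OUT): List[OUT] = {
    val buffers = input.groupBy { case (ts, (word, _)) => (word, ts - ts % sizeMs) }
    buffers.toList.sortBy(_._1).map { case ((word, start), tsAndElements) =>
      process(word, start, start + sizeMs, tsAndElements.map(_._2))
    }
  }
}
```

Because every element is kept until the window fires, a full window function costs more state than an incremental reduce or aggregate; in exchange it can see all elements and the window metadata together.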
KeyedProcessFunction
```scala
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// WaterSensor is assumed to be: case class WaterSensor(id: String, ts: Long, vc: Double)
// Data processing
env.socketTextStream("hadoop-master", 9999)
  .map(line => {
    val datas = line.split(",")
    WaterSensor(datas(0), datas(1).toLong, datas(2).toDouble)
  })
  .keyBy(_.id)
  .process(new KeyedProcessFunction[String, WaterSensor, String] {
    // Called when a registered timer fires
    override def onTimer(timestamp: Long,
                         ctx: KeyedProcessFunction[String, WaterSensor, String]#OnTimerContext,
                         out: Collector[String]): Unit = {
      out.collect(" timer execute ... ")
    }

    // Called once for every element
    override def processElement(value: WaterSensor,
                                ctx: KeyedProcessFunction[String, WaterSensor, String]#Context,
                                out: Collector[String]): Unit = {
      // ctx.getCurrentKey                                  // the current key
      // ctx.output()                                       // emit to a side output (OutputTag)
      // ctx.timestamp()                                    // timestamp of the current element
      // ctx.timerService()                                 // time-related services (timers)
      // ctx.timerService().currentProcessingTime()
      // ctx.timerService().currentWatermark()
      // ctx.timerService().registerEventTimeTimer(3)       // register a timer
      // ctx.timerService().registerProcessingTimeTimer()
      // ctx.timerService().deleteEventTimeTimer(3)
      // ctx.timerService().deleteProcessingTimeTimer()
      out.collect("KeyProcess ...")
      // Register a processing-time timer at the current time, so it fires almost immediately
      ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime())
    }
  })
  .print(" process>> ")
```
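Timers registered through `ctx.timerService()` are scoped to the current key and fire once time passes them; an event-time timer fires when the watermark reaches its timestamp, and registering the same timestamp twice for one key still yields a single timer. The plain-Scala sketch below (hypothetical `TimerSketch`, no Flink) models only that event-time behaviour; processing-time timers, which follow the wall clock, are not modelled.

```scala
import scala.collection.mutable

object TimerSketch {
  // Event-time timers per key; a Set deduplicates (key, timestamp) pairs
  final class EventTimeTimers[K] {
    private val timers = mutable.Set.empty[(K, Long)]
    private var watermark = Long.MinValue

    def register(key: K, time: Long): Unit = timers += ((key, time))
    def delete(key: K, time: Long): Unit = timers -= ((key, time))

    // Advancing the watermark fires every timer with timestamp <= watermark,
    // returned as (key, timestamp) pairs in timestamp order
    def advanceWatermark(wm: Long): List[(K, Long)] = {
      watermark = math.max(watermark, wm)
      val due = timers.filter(_._2 <= watermark).toList.sortBy(_._2)
      timers --= due
      due
    }
  }
}
```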
CoProcessFunction
For two input streams, the DataStream API provides low-level operations such as CoProcessFunction.
Its usage is similar to KeyedProcessFunction.
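A CoProcessFunction has one element callback per input, `processElement1` and `processElement2` (plus `onTimer`), and is typically applied to two streams joined with `connect` and keyed the same way. The sketch below models only that two-callback shape in plain Scala to show a common use, matching two streams by key with per-key buffers; the trait, the order/payment streams and all names are hypothetical, not Flink API.

```scala
import scala.collection.mutable

object CoProcessSketch {
  // Hypothetical mirror of CoProcessFunction's two callbacks (context and timers omitted)
  trait CoProcess[IN1, IN2, OUT] {
    def processElement1(value: IN1, out: mutable.Buffer[OUT]): Unit
    def processElement2(value: IN2, out: mutable.Buffer[OUT]): Unit
  }

  // Matches orders (id, amount) with payments (id, channel) by id, whichever arrives first;
  // in Flink the two maps would be keyed state
  final class OrderPaymentMatch extends CoProcess[(String, Double), (String, String), String] {
    private val pendingOrders = mutable.Map.empty[String, Double]
    private val pendingPayments = mutable.Map.empty[String, String]

    def processElement1(order: (String, Double), out: mutable.Buffer[String]): Unit =
      pendingPayments.remove(order._1) match {
        case Some(channel) => out += s"${order._1} paid ${order._2} via $channel"
        case None          => pendingOrders(order._1) = order._2
      }

    def processElement2(payment: (String, String), out: mutable.Buffer[String]): Unit =
      pendingOrders.remove(payment._1) match {
        case Some(amount) => out += s"${payment._1} paid $amount via ${payment._2}"
        case None         => pendingPayments(payment._1) = payment._2
      }
  }
}
```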
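Other APIs
- .trigger(): the trigger defines when a window fires, i.e. when the computation runs and the result is emitted
- .evictor(): the evictor defines logic for removing certain elements from the window
- .allowedLateness(): allows late data to still be processed
- .sideOutputLateData(): puts late data into a side output stream
- .getSideOutput(): retrieves the side output stream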
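How `allowedLateness` and `sideOutputLateData` treat an element can be reasoned about from the window's last timestamp (end - 1) and the current watermark. The plain-Scala sketch below (hypothetical `LatenessSketch`, no Flink) encodes that rule for an event-time window with the default event-time trigger: the window fires when the watermark reaches end - 1; until the watermark reaches end - 1 + allowedLateness, a late element is still added and makes the window fire again; after that the element is dropped, or routed to the side output if sideOutputLateData was set.

```scala
object LatenessSketch {
  sealed trait Fate
  case object OnTime extends Fate     // buffered; the window fires when the watermark reaches end - 1
  case object LateFiring extends Fate // added to the window, which fires again right away
  case object TooLate extends Fate    // window already cleaned up: dropped, or sent to sideOutputLateData

  // Classifies an element that belongs to the event-time window [start, windowEnd)
  def fate(windowEnd: Long, allowedLateness: Long, watermark: Long): Fate = {
    val maxTimestamp = windowEnd - 1
    if (watermark < maxTimestamp) OnTime
    else if (watermark < maxTimestamp + allowedLateness) LateFiring
    else TooLate
  }
}
```

With a window [0, 5000) and 2 s of allowed lateness, an element arriving at watermark 3000 is on time, at 5000 it causes a late firing, and at 7000 it goes to the side output.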
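Hands-on example (water level alarm)
Requirement: monitor the water level reported by each sensor and raise an alarm if the level keeps rising within five minutes (processing time); in addition, output the readings whose water level is greater than 10. Note that the code below uses event time and a 5-second interval (ts + 5 seconds) rather than five minutes of processing time.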
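```scala
// Core code (WaterSensor is assumed to be: case class WaterSensor(id: String, ts: Long, vc: Double), ts in seconds)
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.{AssignerWithPunctuatedWatermarks, KeyedProcessFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// Use event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setTaskCancellationTimeout(3)

// Data processing: parse "id,ts,vc" lines
val markDS: DataStream[WaterSensor] = env.socketTextStream("hadoop-master", 9999)
  .map(line => {
    val datas = line.split(",")
    WaterSensor(datas(0), datas(1).toLong, datas(2).toDouble)
  })
  // Assign timestamps and watermarks
  // .assignAscendingTimestamps(_.ts * 1000) // its watermark trails the timestamp by 1 ms
  .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[WaterSensor] {
    // Emit a watermark equal to each element's timestamp
    override def checkAndGetNextWatermark(lastElement: WaterSensor, extractedTimestamp: Long): Watermark =
      new Watermark(extractedTimestamp)

    // ts is in seconds; Flink timestamps are in milliseconds
    override def extractTimestamp(element: WaterSensor, previousElementTimestamp: Long): Long =
      element.ts * 1000
  })

// Side output for readings whose water level is above 10 (defined once, used twice)
val outputTag: OutputTag[Double] = new OutputTag[Double]("high")

val processDS: DataStream[String] = markDS
  .keyBy(_.id)
  .process(new KeyedProcessFunction[String, WaterSensor, String] {
    // Keyed state instead of plain fields: fields would be shared by every sensor id on this subtask
    // Last water level seen (0.0 while empty)
    lazy val currentHigh: ValueState[Double] =
      getRuntimeContext.getState(new ValueStateDescriptor[Double]("currentHigh", classOf[Double]))
    // Time of the pending alarm timer in ms (0 while no timer is pending)
    lazy val alarmTime: ValueState[Long] =
      getRuntimeContext.getState(new ValueStateDescriptor[Long]("alarmTime", classOf[Long]))

    // Called when a registered timer fires
    override def onTimer(timestamp: Long,
                         ctx: KeyedProcessFunction[String, WaterSensor, String]#OnTimerContext,
                         out: Collector[String]): Unit = {
      out.collect(s"id:${ctx.getCurrentKey}, ${ctx.timerService().currentWatermark()}, water level rose continuously for 5s")
      alarmTime.clear() // allow a new alarm timer to be registered
    }

    override def processElement(value: WaterSensor,
                                ctx: KeyedProcessFunction[String, WaterSensor, String]#Context,
                                out: Collector[String]): Unit = {
      if (value.vc > currentHigh.value()) {
        // Level rose: register a timer 5 s after this reading, unless one is already pending
        if (alarmTime.value() == 0L) {
          alarmTime.update((value.ts + 5) * 1000)
          ctx.timerService().registerEventTimeTimer(alarmTime.value())
        }
      } else {
        // Level did not rise: delete the pending timer and start a new 5 s timer
        ctx.timerService().deleteEventTimeTimer(alarmTime.value())
        alarmTime.update((value.ts + 5) * 1000)
        ctx.timerService().registerEventTimeTimer(alarmTime.value())
      }
      currentHigh.update(value.vc) // keep the Double; converting to Long would drop decimals
      // Readings above 10 go to the side output
      if (value.vc > 10) {
        ctx.output(outputTag, value.vc)
      }
    }
  })

markDS.print("mark>>")
processDS.print("process>> ")
processDS.getSideOutput(outputTag).print("side>>")
env.execute()
```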

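To check the alarm rules without a socket, here is a plain-Scala replay of the same per-sensor logic (hypothetical `WaterAlarmSketch`, no Flink). It assumes readings for one sensor arrive in order, that each reading's timestamp (seconds × 1000) becomes the watermark right after the reading is processed, as with the punctuated assigner above, and that a pending timer fires as soon as the watermark reaches it.

```scala
object WaterAlarmSketch {
  final case class WaterSensor(id: String, ts: Long, vc: Double) // ts in seconds

  // Replays the alarm rules for ONE sensor; returns the timestamps (ms) of fired alarms
  def replay(readings: Seq[WaterSensor]): List[Long] = {
    var currentHigh = 0.0
    var alarmTime = 0L // 0 = no timer pending
    val fired = List.newBuilder[Long]
    for (r <- readings) {
      if (r.vc > currentHigh) {
        if (alarmTime == 0L) alarmTime = (r.ts + 5) * 1000 // start a 5 s timer
      } else {
        alarmTime = (r.ts + 5) * 1000 // level did not rise: restart the timer
      }
      currentHigh = r.vc
      // punctuated watermark = this reading's timestamp; fire the timer if it is due
      val watermark = r.ts * 1000
      if (alarmTime != 0L && alarmTime <= watermark) {
        fired += alarmTime
        alarmTime = 0L // like alarmTime.clear() in onTimer
      }
    }
    fired.result()
  }
}
```

For example, readings at 1, 2, 3 and 6 s with rising levels fire one alarm at 6000 ms, while a drop at 3 s pushes the alarm out to 8000 ms.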