flink window 概述
window是一种 将无限的流式数据 切割为有限块的手段
Window可以分成两类:
- CountWindow 按数据分
- TimeWindow 按时间分
- 滚动窗口(Tumbling Window)
将数据依据固定的窗口长度对数据进行切片
- 滑动窗口(Sliding Window)
- 会话窗口(Session Window )
由一系列事件组合一个指定时间长度的timeout间隙组成,类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。
window API
TimeWindow
// Word count over a sliding time window: window size 3 seconds, slide 2 seconds.
// Reads space-separated words from the socket, sums the count per word within
// each window, prints the result and runs the job.
def timeWindow() = {
  val wordPairs = env
    .socketTextStream("hadoop-master",9999)
    .flatMap(line => line.split(" "))
    .map(word => (word, 1))

  val slidingWindows = wordPairs
    .keyBy(0)
    .timeWindow(Time.seconds(3),Time.seconds(2))

  // Keep the word of the first tuple and add up the counts.
  val summed = slidingWindows.reduce((acc, next) => (acc._1, acc._2 + next._2))

  summed.print()
  env.execute()
}
CountWindow
// Word count over a sliding count window: window size 3 elements, slide 2 elements
// (counted per key). Sums the count per word, prints the result and runs the job.
def countWindow() = {
  val wordPairs = env
    .socketTextStream("hadoop-master", 9999)
    .flatMap(line => line.split(" "))
    .map(word => (word, 1))

  val slidingCounts = wordPairs
    .keyBy(0)
    .countWindow(3,2)

  // Keep the word of the first tuple and add up the counts.
  val summed = slidingCounts.reduce((acc, next) => (acc._1, acc._2 + next._2))

  summed.print()
  env.execute()
}
window function ( 窗口函数 )
reduce Function
// Same sliding count-window word count as above, but the reduce step is written
// as an explicit ReduceFunction that prints a marker each time it is invoked.
def reduceFunc() = {
  val summing = new ReduceFunction[(String, Int)] {
    // The first element of a window is not passed through reduce on its own;
    // from the second element on, reduce is called once per arriving element.
    override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {
      print(" ---- ")
      val total = value1._2 + value2._2
      (value1._1, total)
    }
  }

  val windowed = env
    .socketTextStream("hadoop-master", 9999)
    .flatMap(line => line.split(" "))
    .map(word => (word, 1))
    .keyBy(0)
    .countWindow(3,2)

  windowed.reduce(summing).print()
  env.execute()
}
aggregate Function
// Word count over a tumbling count window of 3 elements, aggregated incrementally
// with MyAggregate. Prints the aggregated values and runs the job.
def aggregateFunc() = {
  val wordPairs = env
    .socketTextStream("hadoop-master", 9999)
    .flatMap(line => line.split(" "))
    .map(word => (word, 1))

  val aggregated = wordPairs
    .keyBy(0)
    .countWindow(3)
    .aggregate(new MyAggregate)

  aggregated.print()
  env.execute()
}
MyAggregate class
package com.ylb.myCluss
import org.apache.flink.api.common.functions.AggregateFunction
/**
* @author yanglibin
* @create 2020-03-04 12:12
*/
class MyAggregate extends AggregateFunction[(String,Int),Int,Int] {
  // Type parameters, in order: input element, accumulator (intermediate value), output.

  // A fresh accumulator starts at zero.
  override def createAccumulator(): Int = 0

  // Folds one incoming element into the accumulator by adding its count;
  // prints a marker on every call.
  override def add(value: (String, Int), accumulator: Int): Int = {
    println(" add ...")
    val (_, count) = value
    accumulator + count
  }

  // The result is the accumulated sum itself.
  override def getResult(accumulator: Int): Int = {
    accumulator
  }

  // Two partial accumulators are combined by adding them.
  override def merge(a: Int, b: Int): Int = {
    a + b
  }
}
全窗口函数(full window functions) — process
先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。
// Full-window processing: groups words into 5-second tumbling time windows and
// hands every window to MyProcess. Prints the emitted records and runs the job.
def processFunc() = {
  val wordPairs = env
    .socketTextStream("hadoop-master", 9999)
    .flatMap(line => line.split(" "))
    .map(word => (word, 1))

  val windowed = wordPairs
    .keyBy(pair => pair._1) // note: do not use the Java-style positional key 0 here
    .timeWindow(Time.seconds(5))

  windowed.process(new MyProcess).print()
  env.execute()
}
MyProcess class
package com.ylb.myCluss
import java.text.SimpleDateFormat
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
/**
* @author yanglibin
* @create 2020-03-04 13:36
*/
class MyProcess extends ProcessWindowFunction[(String,Int),String,String,TimeWindow]{
  // For each window, emits three records: a line with the formatted window start,
  // a line listing all elements of the window, and a line with the formatted window end.
  override def process(
      key: String,
      context: Context,
      elements: Iterable[(String, Int)],
      out: Collector[String]): Unit = {
    val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val window = context.window

    val startLine = s"window start time : ${formatter.format(window.getStart)} ========"
    val messageLine = s"message: ${elements.toList}"
    val endLine = s"window end time: ${formatter.format(window.getEnd)} ========"

    // Emit in the fixed order start / content / end.
    Seq(startLine, messageLine, endLine).foreach(line => out.collect(line))
  }
}
KeyedProcessFunction
// Data processing: parse each "id,ts,vc" line into a WaterSensor, key the stream
// by sensor id and run a KeyedProcessFunction that demonstrates timers.
env.socketTextStream("hadoop-master",9999)
  .map { line =>
    val fields = line.split(",")
    WaterSensor(fields(0), fields(1).toLong, fields(2).toDouble)
  }
  .keyBy(sensor => sensor.id)
  .process(
    new KeyedProcessFunction[String,WaterSensor,String] {
      // Called for every incoming element: emits a marker record and registers
      // a processing-time timer set to the current processing time.
      override def processElement(
          value: WaterSensor,
          ctx: KeyedProcessFunction[String, WaterSensor, String]#Context,
          out: Collector[String]): Unit = {
        // What the context offers:
        // ctx.getCurrentKey // the current key
        // ctx.output() // emit to a side output (OutputTag)
        // ctx.timestamp() // the element's timestamp
        // ctx.timerService() // time-related services (timers)
        // ctx.timerService().currentProcessingTime()
        // ctx.timerService().currentWatermark()
        // ctx.timerService().registerEventTimeTimer(3) // register a timer
        // ctx.timerService().registerProcessingTimeTimer()
        // ctx.timerService().deleteEventTimeTimer(3)
        // ctx.timerService().deleteProcessingTimeTimer()
        out.collect("KeyProcess ...")
        // Register the timer.
        val timers = ctx.timerService()
        val now = timers.currentProcessingTime()
        timers.registerProcessingTimeTimer(now)
      }

      // Invoked when a registered timer reaches its time.
      override def onTimer(
          timestamp: Long,
          ctx: KeyedProcessFunction[String, WaterSensor, String]#OnTimerContext,
          out: Collector[String]): Unit = {
        out.collect(" timer execute ... ")
      }
    }
  )
  .print(" process>> ")
CoProcessFunction
对于两条输入流,DataStream API提供了CoProcessFunction这样的low-level操作。
使用参考 KeyedProcessFunction
其它API
.trigger() —— 触发器
定义 window 什么时候关闭,触发计算并输出结果
.evictor() —— 移除器
定义移除某些数据的逻辑
.allowedLateness() —— 允许处理迟到的数据
.sideOutputLateData() —— 将迟到的数据放入侧输出流
.getSideOutput() —— 获取侧输出流
案例实操 ( 水位报警 )
需求:监控水位传感器的水位值,如果水位值在五分钟之内(processing time)连续上升,则报警;同时将水位大于 10 的数据通过侧输出流输出。(注意:下面的示例代码为了便于演示,实际使用的是 event time,并把时间间隔缩短为 5 秒。)
// Core code of the water-level alarm example.
//
// Each socket line "id,ts,vc" becomes a WaterSensor. Per sensor id, an alarm is
// emitted when the level has not dropped for 5 seconds of event time, and every
// reading above 10 is additionally written to the "high" side output.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// Use event-time semantics; the timers below are event-time timers.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// NOTE(review): the argument is presumably in milliseconds, which would make 3 a
// very short cancellation timeout -- confirm this value is intended.
env.getConfig.setTaskCancellationTimeout(3)

// Data processing: parse the raw lines and assign timestamps / watermarks.
val markDS: DataStream[WaterSensor] = env.socketTextStream("hadoop-master", 9999)
  .map(
    line => {
      val datas = line.split(",")
      WaterSensor(datas(0), datas(1).toLong, datas(2).toDouble)
    }
  )
  // Watermark assignment.
  // .assignAscendingTimestamps(_.ts * 1000) // alternative; its watermark is 1 ms behind
  .assignTimestampsAndWatermarks(
    new AssignerWithPunctuatedWatermarks[WaterSensor] {
      // Emit a watermark equal to the element's own timestamp after every element.
      override def checkAndGetNextWatermark(lastElement: WaterSensor, extractedTimestamp: Long): Watermark = {
        new Watermark(extractedTimestamp)
      }

      // ts is multiplied by 1000 to obtain the millisecond timestamp.
      override def extractTimestamp(element: WaterSensor, previousElementTimestamp: Long): Long = {
        element.ts * 1000
      }
    }
  )

val processDS: DataStream[String] = markDS
  .keyBy(_.id)
  .process(
    new KeyedProcessFunction[String, WaterSensor, String] {
      // Length of the observation period, in the same unit as WaterSensor.ts.
      private val alarmWindowSeconds = 5L

      // Keyed state, so every sensor id gets its own values. Plain instance
      // fields would be shared by all keys handled by the same subtask.
      // Unset state reads as 0 / 0.0, matching the original initial values.
      // Fully qualified names are used because the state classes are not
      // imported in this snippet.

      // Last water level seen for the current key (kept as Double so that
      // fractional levels are compared without truncation).
      private lazy val lastLevel: org.apache.flink.api.common.state.ValueState[Double] =
        getRuntimeContext.getState(
          new org.apache.flink.api.common.state.ValueStateDescriptor[Double]("last-level", classOf[Double]))

      // Timestamp of the pending alarm timer for the current key; 0 means none.
      private lazy val alarmTime: org.apache.flink.api.common.state.ValueState[Long] =
        getRuntimeContext.getState(
          new org.apache.flink.api.common.state.ValueStateDescriptor[Long]("alarm-time", classOf[Long]))

      val outputTag: OutputTag[Double] = new OutputTag[Double]("high")

      // Fires when the pending alarm timer is reached: emit the alarm and clear
      // the pending-timer marker so that a later rise can arm a new alarm.
      override def onTimer(
          timestamp: Long,
          ctx: KeyedProcessFunction[String, WaterSensor, String]#OnTimerContext,
          out: Collector[String]): Unit = {
        out.collect(s"id:${ctx.getCurrentKey}, ${ctx.timerService().currentWatermark()}, 连续5s水位上涨")
        alarmTime.clear()
      }

      override def processElement(
          value: WaterSensor,
          ctx: KeyedProcessFunction[String, WaterSensor, String]#Context,
          out: Collector[String]): Unit = {
        val previousLevel = lastLevel.value()
        val pendingAlarm = alarmTime.value()
        val rising = value.vc > previousLevel

        // The level did not rise: cancel the pending alarm, a new period starts below.
        if (!rising && pendingAlarm != 0L) {
          ctx.timerService().deleteEventTimeTimer(pendingAlarm)
        }

        // Arm a timer when none is pending (rising) or the old one was just cancelled.
        if (!rising || pendingAlarm == 0L) {
          val newAlarm = (value.ts + alarmWindowSeconds) * 1000
          ctx.timerService().registerEventTimeTimer(newAlarm)
          alarmTime.update(newAlarm)
        }

        lastLevel.update(value.vc)

        // Readings above 10 go to the side output.
        if (value.vc > 10) {
          ctx.output(outputTag, value.vc)
        }
      }
    }
  )

markDS.print("mark>>")
processDS.print("process>> ")
// Same id ("high") as the tag used inside the process function above.
val outputTag: OutputTag[Double] = new OutputTag[Double]("high")
processDS.getSideOutput(outputTag).print("side>>")
env.execute()