flink window 概述

window是一种 将无限的流式数据 切割为有限块的手段

Window可以分成两类:

  1. CountWindow 按数据分
  2. TimeWindow 按时间分
    1. 滚动窗口(Tumbling Window)

将数据依据固定的窗口长度对数据进行切片

  1. 滑动窗口(Sliding Window)
  2. 会话窗口(Session Window )

由一系列事件组合一个指定时间长度的timeout间隙组成,类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。

window API

TimeWindow

  1. def timeWindow() = {
  2. env.socketTextStream("hadoop-master",9999)
  3. .flatMap(_.split(" "))
  4. .map( (_,1) )
  5. .keyBy(0)
  6. .timeWindow(Time.seconds(3),Time.seconds(2))
  7. .reduce(
  8. (t1,t2) => {
  9. (t1._1,t1._2 + t2._2)
  10. }
  11. )
  12. .print()
  13. env.execute()
  14. }


CountWindow

  1. def countWindow() = {
  2. env.socketTextStream("hadoop-master", 9999)
  3. .flatMap(_.split(" "))
  4. .map( (_,1) )
  5. .keyBy(0)
  6. .countWindow(3,2)
  7. .reduce(
  8. (t1,t2) => {
  9. (t1._1, t1._2 + t2._2)
  10. }
  11. )
  12. .print()
  13. env.execute()
  14. }


window function ( 窗口函数 )

在 window 之后

reduce Function

  1. def reduceFunc() = {
  2. env.socketTextStream("hadoop-master", 9999)
  3. .flatMap(_.split(" "))
  4. .map( (_,1) )
  5. .keyBy(0)
  6. .countWindow(3,2)
  7. .reduce(
  8. new ReduceFunction[(String, Int)] {
  9. override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {
  10. // 第一条数据不做处理, 从第二条开始, 每来一条,就会计算一次
  11. print(" ---- ")
  12. (value1._1, value1._2 + value2._2)
  13. }
  14. }
  15. )
  16. .print()
  17. env.execute()
  18. }

aggregate Function

  1. def aggregateFunc() = {
  2. env.socketTextStream("hadoop-master", 9999)
  3. .flatMap(_.split(" "))
  4. .map( (_,1) )
  5. .keyBy(0)
  6. .countWindow(3)
  7. .aggregate(new MyAggregate)
  8. .print()
  9. env.execute()
  10. }

MyAggregate class

  1. package com.ylb.myCluss
  2. import org.apache.flink.api.common.functions.AggregateFunction
  3. /**
  4. * @author yanglibin
  5. * @create 2020-03-04 12:12
  6. */
  7. class MyAggregate extends AggregateFunction[(String,Int),Int,Int] {
  8. // 泛型说明: 输入In, 累加器中间处理值, 输出Out
  9. // 创建累加器, 初始值
  10. override def createAccumulator(): Int = 0
  11. // 对键入值 累加更新
  12. override def add(value: (String, Int), accumulator: Int): Int = {
  13. println(" add ...")
  14. accumulator + value._2
  15. }
  16. // 获取累加器的值
  17. override def getResult(accumulator: Int): Int = accumulator
  18. // 合并累加器
  19. override def merge(a: Int, b: Int): Int = a + b
  20. }

全窗口函数(full window functions) — process

先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。

  1. def processFunc() = {
  2. env.socketTextStream("hadoop-master", 9999)
  3. .flatMap(_.split(" "))
  4. .map( (_,1) )
  5. .keyBy(_._1) // 注意这里不要用 java格式的0
  6. .timeWindow(Time.seconds(5))
  7. .process(new MyProcess)
  8. .print()
  9. env.execute()
  10. }

MyProcess class

  1. package com.ylb.myCluss
  2. import java.text.SimpleDateFormat
  3. import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
  4. import org.apache.flink.streaming.api.windowing.windows.TimeWindow
  5. import org.apache.flink.util.Collector
  6. /**
  7. * @author yanglibin
  8. * @create 2020-03-04 13:36
  9. */
  10. class MyProcess extends ProcessWindowFunction[(String,Int),String,String,TimeWindow]{
  11. override def process(
  12. key: String,
  13. context: Context,
  14. elements: Iterable[(String, Int)],
  15. out: Collector[String]): Unit = {
  16. val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  17. out.collect("window start time : " + sdf.format(context.window.getStart) + " ========")
  18. out.collect("message: " + elements.toList)
  19. out.collect("window end time: " + sdf.format(context.window.getEnd) + " ========")
  20. }
  21. }

KeyedProcessFunction

  1. // 数据处理
  2. env.socketTextStream("hadoop-master",9999)
  3. .map(
  4. line => {
  5. val datas = line.split(",")
  6. WaterSensor(datas(0), datas(1).toLong, datas(2).toDouble)
  7. }
  8. )
  9. .keyBy(_.id)
  10. .process(
  11. new KeyedProcessFunction[String,WaterSensor,String] {
  12. // 定时器在指定的时间点触发该方法的执行
  13. override def onTimer(
  14. timestamp: Long,
  15. ctx: KeyedProcessFunction[String, WaterSensor, String]#OnTimerContext,
  16. out: Collector[String]): Unit = {
  17. out.collect(" timer execute ... ")
  18. }
  19. //
  20. override def processElement(
  21. value: WaterSensor,
  22. ctx: KeyedProcessFunction[String, WaterSensor, String]#Context,
  23. out: Collector[String]): Unit = {
  24. // ctx.getCurrentKey // 当前的key
  25. // ctx.output() // outputTag
  26. // ctx.timestamp() // 时间戳
  27. // ctx.timerService() // 时间相关的服务( 定时器 )
  28. // ctx.timerService().currentProcessingTime()
  29. // ctx.timerService().currentWatermark()
  30. // ctx.timerService().registerEventTimeTimer(3) // 注册定时器
  31. // ctx.timerService().registerProcessingTimeTimer()
  32. // ctx.timerService().deleteEventTimeTimer(3)
  33. // ctx.timerService().deleteProcessingTimeTimer()
  34. out.collect("KeyProcess ...")
  35. // 注册定时器
  36. ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime())
  37. }
  38. }
  39. )
  40. .print(" process>> ")

CoProcessFunction

对于两条输入流,DataStream API提供了CoProcessFunction这样的low-level操作。
使用参考 KeyedProcessFunction

其它API

  1. .trigger() —— 触发器
  2. 定义 window 什么时候关闭,触发计算并输出结果
  3. .evitor() —— 移除器
  4. 定义移除某些数据的逻辑
  5. .allowedLateness() —— 允许处理迟到的数据
  6. .sideOutputLateData() —— 将迟到的数据放入侧输出流
  7. .getSideOutput() —— 获取侧输出流

案例实操 ( 水位报警 )

需求:监控水位传感器的水位值,如果水位值在五分钟之内(processing time)连续上升,则报警。 并输出水位大于10 的


  1. // 核心代码如下:
  2. val env = StreamExecutionEnvironment.getExecutionEnvironment
  3. env.setParallelism(1)
  4. // 设置时间语义
  5. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  6. env.getConfig.setTaskCancellationTimeout(3)
  7. // 数据处理
  8. val markDS: DataStream[WaterSensor] = env.socketTextStream("hadoop-master", 9999)
  9. .map(
  10. line => {
  11. val datas = line.split(",")
  12. WaterSensor(datas(0), datas(1).toLong, datas(2).toDouble)
  13. }
  14. )
  15. // 设置水位线
  16. // .assignAscendingTimestamps(_.ts * 1000) // 会少 1ms 的时间
  17. .assignTimestampsAndWatermarks(
  18. new AssignerWithPunctuatedWatermarks[WaterSensor] {
  19. override def checkAndGetNextWatermark(lastElement: WaterSensor, extractedTimestamp: Long): Watermark = {
  20. new Watermark(extractedTimestamp)
  21. }
  22. override def extractTimestamp(element: WaterSensor, previousElementTimestamp: Long): Long = {
  23. element.ts * 1000
  24. }
  25. }
  26. )
  27. val processDS: DataStream[String] = markDS
  28. .keyBy(_.id)
  29. .process(
  30. new KeyedProcessFunction[String, WaterSensor, String] {
  31. // 初始化数据
  32. // 初始化水位线
  33. private var currentHigh = 0L
  34. // 初始化告警定时器
  35. private var alarmTime = 0L
  36. val outputTag: OutputTag[Double] = new OutputTag[Double]("high")
  37. // 定时器在指定的时间点触发该方法的执行
  38. override def onTimer(
  39. timestamp: Long,
  40. ctx: KeyedProcessFunction[String, WaterSensor, String]#OnTimerContext,
  41. out: Collector[String]): Unit = {
  42. // out.collect(" timer execute ... ")
  43. out.collect(s"id:${ctx.getCurrentKey}, ${ctx.timerService().currentWatermark()}, 连续5s水位上涨")
  44. }
  45. override def processElement(
  46. value: WaterSensor,
  47. ctx: KeyedProcessFunction[String, WaterSensor, String]#Context,
  48. out: Collector[String]): Unit = {
  49. if (value.vc > currentHigh) {
  50. // 如果传入的值 大于当前值, 则注册定时器
  51. if (alarmTime == 0) {
  52. // 更新告警的初始时间
  53. alarmTime = (value.ts + 5) * 1000
  54. // 注册定时器
  55. ctx.timerService().registerEventTimeTimer(alarmTime)
  56. }
  57. } else {
  58. // 否则,清除定时器,重新注册
  59. ctx.timerService().deleteEventTimeTimer(alarmTime)
  60. // 更新告警的初始时间
  61. alarmTime = (value.ts + 5) * 1000
  62. // 注册定时器
  63. ctx.timerService().registerEventTimeTimer(alarmTime)
  64. }
  65. currentHigh = value.vc.toLong
  66. // 输出水位大于10 的, 使用侧输出流
  67. if (value.vc > 10) {
  68. ctx.output(outputTag, value.vc)
  69. }
  70. }
  71. }
  72. )
  73. markDS.print("mark>>")
  74. processDS.print("process>> ")
  75. val outputTag: OutputTag[Double] = new OutputTag[Double]("high")
  76. processDS.getSideOutput(outputTag).print("side>>")
  77. env.execute()