一、基本概念

窗口(window)就是将无限流切割为有限流的一种方式,它会将流数据分发到有限大小的桶(bucket)中进行分析。
**

  • 时间窗口(Time Window):将窗口划分长度固定的大小,每个元素只能划分到一个窗口
    • 滚动时间窗口
    • 滑动时间窗口
    • 会话窗口
  • 计数窗口(Count Window):由窗口大小和滑动步长组成,所以会存在窗口重合的情况
    • 滚动计数窗口
    • 滑动计数窗口

image.png
滚动窗口:将数据依据固定的长度对数据进行划分,时间对齐,窗口长度固定,没有重叠。窗口是前闭后开,如8:00-9:00的窗口是会包括8:00的数据,但是不包括9:00的数据。

image.png
滑动窗口:是固定窗口的更广义一种形式,滑动窗口由固定的长度滑动间隔组成,窗口长度固定可以有重叠。滚动窗口可以看作是一种滑动步长=窗口长度**的特殊滑动窗口。

image.png
会话窗口:有一系列事件组合一个指定时间长度的timeout间隙组成,也就是一段时间没有接收到新数据就会生成新的窗口,也没有一个固定的开始时间和结束时间,特点是时间没有对齐。
**

二、API使用

  • 窗口分配器—window()方法
  • 可以使用.window()来定义一个窗口,然后基于这个window去做一些聚合或者其他的处理操作。注意window()方法必须在keyBy()之后才能使用
  • Flink提供了更加简单的.timeWindow() 和 .countWindow() 方法,用于定义事件窗口和计数窗口。 ```scala package window import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows import org.apache.flink.streaming.api.windowing.time.Time

object WindowSocketDemo { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val socket = env.socketTextStream(“localhost”,7777)

  1. val words = socket.flatMap(_.split(" ")).map((_,1)).keyBy(_._1)
  2. /**
  3. * 开窗操作都分为 窗口分配 和 函数操作 两部分
  4. */
  5. //flink提供了window函数作为窗口分配器,window操作必须在KeyBy操作之后
  6. //同时也提供了两个函数 .timeWindow() 和 .countWindow() 用于定义时间窗口和计数窗口
  7. //这里定义了一个 15秒的窗口函数,即每15秒统计一次,每次只会统计该窗口内的函数
  8. val window = words.timeWindow(Time.seconds(15)).sum(1)
  9. //timeWindow(Time.seconds(15)) 滚动时间窗口 或者 words.window(TumblingEventTimeWindows.of(Time.seconds(15)))
  10. //timeWindow(Time.seconds(15),Time.seconds(5)) 滑动时间窗口 words.window(SlidingProcessingTimeWindows.of(Time.seconds(15),Time.seconds(3)))
  11. //.window(EventTimeSessionWindows.withGap(Time.seconds(10))) 会话窗口
  12. //words.countWindow() //全局窗口的简写,此api传一个参数表示滚动窗口,传两个参数表示滑动窗口
  13. /**
  14. * 窗口函数:定义了要对窗口函数中收集的数据做的计算操作
  15. * 可以分为两类 =>
  16. * 增量聚合函数:每一条数据来到计算一次,ReduceFunction、AggregateFunction
  17. * 全窗口函数:先把窗口所有数据收集起来,等到计算的时间会遍历所有的数据,ProcessWindowFunction
  18. */
  19. /**
  20. * 其他API:
  21. * 1、trigger():定义window什么时间关闭,触发计算并输出
  22. * 2、evitor():移除器,定义移除某些数据的逻辑
  23. * 3、allowedLateness():允许处理迟到的数据,传入一个时间表示可以延迟多久进行处理
  24. * 4、sideOutputLateData() : 将迟到的数据放入侧输出流,在window Function前使用
  25. * 5、getSideOutput():获取侧输出流,在window Function后使用
  26. */
  27. window.print("words").setParallelism(1)
  28. env.execute("window Stream")

} } ```

三、窗口函数

窗口函数(window function)定义了要对窗口中收集的数据做的计算操作
可以分为两大类

  • 增量聚合函数
    • 每条数据来到就进行计算,保持一个简单的状态
    • ReduceFunction,AggregateFunction
  • 全窗口函数
    • 先把窗口所有数据收集起来,等到计算的时候会遍历所有数据,比如要排序、求中位数的统计
    • ProcessWindowFunction