Window

Window概述

streaming流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上的本质上无限的数据集,而window是一种切割无限数据为有限块进行处理的手段。
window是无限数据流处理的核心,window将一个无限的stream拆分成有限大小的“buckets”桶,我们可以在这些桶上做计算操作。

Window类型

Window可以分成两类:

  • CountWindow:按照制定的数据条数生成一个Window,与时间无关。
  • TimeWindow:按照时间生成Window。

对于TimeWindow,可以根据窗口实现原理的不同分为三类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)。

滚动窗口(Tumbling Windows)

  • 将数据依据固定的窗口长度对数据进行切片。
  • 特点:时间对齐,窗口长度固定,没有重叠。
  • 滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。
  • 适用场景:适合做BI统计等(做每个时间段的聚合计算)。

滑动窗口(Sliding Windows)

  • 滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。
  • 特点:时间对齐,窗口时间固定,可以由重叠。
  • 滑动窗口分配器将元素分配器到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口的大小的话,窗口是可以重叠的。这种情况下元素会被分配到多个窗口中。
  • 适用场景:对最近一个时间段内的统计(求某接口最近5min的失败率来决定是否要报警)。

会话窗口(Session Windows)

  • 由一系列事件组合一个制定是时间长度的tiemout间隙组成,类似于web盈余公的session,也就是一段时间没有接收到新数据就会生成新的窗口。
  • 特点:时间无对齐。
  • session窗口分配器通过session活动来对元素进行分组,session窗口跟滚动窗口和滑动窗口相比,不会由重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个session窗口通过一个sesion间隔来设置,这个session间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的session将关闭并且后续的元素将被分配到新的session窗口当中去。

image.png

Window API

TimeWindow

TimeWindow是将制定时间范围内的所有数据组成一个window,一次对一个window里面的数据进行计算。

  1. package com.zh.apitest.window;
  2. import com.zh.apitest.beans.SensorReading;
  3. import org.apache.commons.collections.IteratorUtils;
  4. import org.apache.flink.api.common.functions.AggregateFunction;
  5. import org.apache.flink.api.java.tuple.Tuple;
  6. import org.apache.flink.api.java.tuple.Tuple3;
  7. import org.apache.flink.streaming.api.datastream.DataStream;
  8. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  9. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  10. import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
  11. import org.apache.flink.streaming.api.windowing.time.Time;
  12. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  13. import org.apache.flink.util.Collector;
  14. import org.apache.flink.util.OutputTag;
  15. /**
  16. * author: zhanghui
  17. * project: big-data-learning
  18. * package: com.zh.apitest.window
  19. * filename: WindowTest1_TimeWindow
  20. * date: 2021/12/6 4:25 下午
  21. * description: 时间窗口 timeWindow()
  22. * 窗口分配器(window assigner):
  23. * 滚动窗口(tumbling window)
  24. * 滑动窗口(sliding window)
  25. * 会话窗口(session window)
  26. * 全局窗口(global window)
  27. * 创建不同类型窗口:
  28. * 滚动时间窗口(tumbling time window): .timeWindow(Time.seconds(15))
  29. * 滑动时间窗口(sliding time window): .timeWindow(Time.seconds(15), Time.seconds(5))
  30. * 会话窗口(session window): .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
  31. * 滚动计数窗口(tumbling count window): .countWindow(5)
  32. * 滑动计数窗口(sliding count window): .countWindow(10, 2)
  33. * 窗口函数:
  34. * 增量聚合函数:每条数据到来就进行计算,保持一个简单的状态(ReduceFunction, AggregateFunction)
  35. * 全窗口函数:先把窗口所有数据收集起来,等到计算的时候遍历所有数据(ProcessWindowFunction, WindowFunction 注:ProcessWindowFunction的输入为上下文,包含了上下文,比WindowFunction的内容更加丰富)
  36. * 其他可选API:(注:trigger,evitor在顶层API中已经实现,大都不需要写)
  37. * .trigger() -- 触发器(定义window什么时候关闭,触发计算并输出结果)
  38. * .evitor() -- 移除器(定义移除某些数据的逻辑)
  39. * .allowedLateness() -- 允许处理迟到的数据
  40. * .sideOutputLateData() -- 将迟到的数据放入侧输出流
  41. * .getSideOutput() -- 获取侧输出流,侧输出流只能使用SingleOutputStreamOperator
  42. * 数据流类型变换:
  43. * Keyed Windows: DataStream -keyBy-> KeyedStream -window-> WindowedStream -聚合-> DataStream
  44. * Non-Keyed Windows: DataStream -windowAll-> WindowedStream -聚合-> DataStream
  45. *
  46. */
  47. public class WindowTest1_TimeWindow {
  48. public static void main(String[] args) throws Exception {
  49. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  50. env.setParallelism(1);
  51. // 从文件读取数据
  52. // DataStream<String> inputStream = env.readTextFile("flink-FlinkTutorial/src/main/resources/sensor.txt");
  53. // socket文本流
  54. DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
  55. // 转换成SensorReading类型
  56. DataStream<SensorReading> dataStream = inputStream.map(line -> {
  57. String[] fields = line.split(",");
  58. return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
  59. });
  60. // 开窗测试
  61. // 1.增量聚合测试
  62. DataStream<Integer> resultStream = dataStream.keyBy("id")
  63. // .countWindow(10, 2) // 滑动计数窗口
  64. // .window(EventTimeSessionWindows.withGap(Time.minutes(1))) // 会话窗口
  65. .timeWindow(Time.seconds(15)) // 简写的程序时间开窗
  66. // .window(TumblingProcessingTimeWindows.of(Time.seconds(15))) // 程序时间开窗
  67. .aggregate(new AggregateFunction<SensorReading, Integer, Integer>() {
  68. @Override
  69. public Integer createAccumulator() {
  70. return 0;
  71. }
  72. @Override
  73. public Integer add(SensorReading value, Integer accumulator) {
  74. return accumulator + 1;
  75. }
  76. @Override
  77. public Integer getResult(Integer accumulator) {
  78. return accumulator;
  79. }
  80. @Override
  81. public Integer merge(Integer a, Integer b) {
  82. return a + b;
  83. }
  84. });
  85. // 2.全窗口函数测试
  86. DataStream<Tuple3<String, Long, Integer>> resultStream2 = dataStream.keyBy("id")
  87. .timeWindow(Time.seconds(15))
  88. // .process(new ProcessWindowFunction<SensorReading, Object, Tuple, TimeWindow>() {
  89. // })
  90. .apply(new WindowFunction<SensorReading, Tuple3<String, Long, Integer>, Tuple, TimeWindow>() {
  91. @Override
  92. public void apply(Tuple tuple, TimeWindow window, Iterable<SensorReading> input, Collector<Tuple3<String, Long, Integer>> out) throws Exception {
  93. String id = tuple.getField(0);
  94. Long windowEnd = window.getEnd();
  95. Integer count = IteratorUtils.toList(input.iterator()).size();
  96. out.collect(new Tuple3<>(id, windowEnd, count));
  97. }
  98. });
  99. // 3. 其他可选API
  100. // 定义侧输出流
  101. OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {
  102. };
  103. SingleOutputStreamOperator<SensorReading> sumStream = dataStream.keyBy("id")
  104. .timeWindow(Time.seconds(15))
  105. // .trigger() // 触发器
  106. // .evictor() // 移除器
  107. .allowedLateness(Time.minutes(1)) // 允许处理迟到的数据
  108. .sideOutputLateData(outputTag) // 侧输出流,将迟到的数据放入侧输出流
  109. .sum("temperature");
  110. sumStream.getSideOutput(outputTag).print("late"); // 获取侧输出流,获取侧输出流getSideOutput的API只能在SingleOutputStreamOperator中获取到
  111. resultStream2.print();
  112. env.execute();
  113. }
  114. }

滚动窗口

Flink默认的时间窗口根据Processing Time进行窗口划分,将Flink获取到的数据进入Flink的时间划分到不同的窗口中。
时间间隔可以通过Time.milliseconds(x)Time.seconds(x)Time.minutes(x)等其中一个来指定。

滑动窗口(SlidingEventTimeWindows)

滑动窗口和滚动擦护给你扣的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。
下面代码中的sliding_size设置为了5s,也就是说,每5s就计算输出结果一次,每一次计算的window范围是15s内的所有元素。

CountWindow

  1. package com.zh.apitest.window;
  2. import com.zh.apitest.beans.SensorReading;
  3. import org.apache.flink.api.common.functions.AggregateFunction;
  4. import org.apache.flink.api.java.tuple.Tuple2;
  5. import org.apache.flink.streaming.api.datastream.DataStream;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. /**
  8. * author: zhanghui
  9. * project: big-data-learning
  10. * package: com.zh.apitest.window
  11. * filename: WindowTest2_CountWindow
  12. * date: 2021/12/6 5:37 下午
  13. * description: 计数窗口 countWindow()
  14. */
  15. public class WindowTest2_CountWindow {
  16. public static void main(String[] args) throws Exception {
  17. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  18. env.setParallelism(1);
  19. // socket文本流
  20. DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
  21. // 转换成SensorReading类型
  22. DataStream<SensorReading> dataStream = inputStream.map(line -> {
  23. String[] fields = line.split(",");
  24. return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
  25. });
  26. // 开计数窗口测试
  27. DataStream<Double> avgTempResultStream = dataStream.keyBy("id")
  28. .countWindow(10, 2)
  29. .aggregate(new MyAvgTemp());
  30. avgTempResultStream.print();
  31. env.execute();
  32. }
  33. public static class MyAvgTemp implements AggregateFunction<SensorReading, Tuple2<Double, Integer>, Double> {
  34. @Override
  35. public Tuple2<Double, Integer> createAccumulator() {
  36. return new Tuple2<>(0.0, 0);
  37. }
  38. @Override
  39. public Tuple2<Double, Integer> add(SensorReading value, Tuple2<Double, Integer> accumulator) {
  40. return new Tuple2<>(accumulator.f0 + value.getTemperature(), accumulator.f1 + 1);
  41. }
  42. @Override
  43. public Double getResult(Tuple2<Double, Integer> accumulator) {
  44. return accumulator.f0 / accumulator.f1;
  45. }
  46. @Override
  47. public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
  48. return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  49. }
  50. }
  51. }

CountWindow根据窗口中相同key元素的数量来触发执行,执行时只计算元素数量到达窗口大小的key对应的结果。
注意:CountWindow的window_size指的是相同key的元素的个数,不是输入的所有元素的总数。

滚动窗口

默认的CountWindow是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。

滑动窗口

滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。

Window Function

window function定义了要对窗口中收集的数据做的计算操作,主要可以分为两类:

  • 增量聚合函数(incremental aggregation functions)
    • 每条数据到来就进行计算,保持一个简单的状态。典型的增量聚合函数有ReduceFunction,AggregateFunction。
  • 全窗口函数(full window functions)
    • 先把窗口所有数据收集起来,等到计算的时候遍历所有数据。
    • ProcessWindowFunction就是一个全窗口函数。

其他可选API

  • .trigger() — 触发器(定义window什么时候关闭,触发计算并输出结果)
  • .evitor() — 移除器(定义移除某些数据的逻辑)
  • .allowedLateness() — 允许处理迟到的数据
  • .sideOutputLateData() — 将迟到的数据放入侧输出流
  • .getSideOutput() — 获取侧输出流,getSideOutput的API只能在SingleOutputStreamOperator中获取到 ```java

      // 3. 其他可选API
    
      // 定义侧输出流
      OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {
      };
    
      SingleOutputStreamOperator<SensorReading> sumStream = dataStream.keyBy("id")
              .timeWindow(Time.seconds(15))
    

    // .trigger() // 触发器 // .evictor() // 移除器

              .allowedLateness(Time.minutes(1)) // 允许处理迟到的数据
              .sideOutputLateData(outputTag) // 侧输出流,将迟到的数据放入侧输出流
              .sum("temperature");
    
      sumStream.getSideOutput(outputTag).print("late"); // 获取侧输出流,获取侧输出流getSideOutput的API只能在SingleOutputStreamOperator中获取到
    

``` image.png