Window
Window概述
streaming流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上的本质上无限的数据集,而window是一种切割无限数据为有限块进行处理的手段。
window是无限数据流处理的核心,window将一个无限的stream拆分成有限大小的“buckets”桶,我们可以在这些桶上做计算操作。
Window类型
Window可以分成两类:
- CountWindow:按照制定的数据条数生成一个Window,与时间无关。
- TimeWindow:按照时间生成Window。
对于TimeWindow,可以根据窗口实现原理的不同分为三类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)。
滚动窗口(Tumbling Windows)
- 将数据依据固定的窗口长度对数据进行切片。
- 特点:时间对齐,窗口长度固定,没有重叠。
- 滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。
- 适用场景:适合做BI统计等(做每个时间段的聚合计算)。
滑动窗口(Sliding Windows)
- 滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。
- 特点:时间对齐,窗口时间固定,可以由重叠。
- 滑动窗口分配器将元素分配器到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口的大小的话,窗口是可以重叠的。这种情况下元素会被分配到多个窗口中。
- 适用场景:对最近一个时间段内的统计(求某接口最近5min的失败率来决定是否要报警)。
会话窗口(Session Windows)
- 由一系列事件组合一个制定是时间长度的tiemout间隙组成,类似于web盈余公的session,也就是一段时间没有接收到新数据就会生成新的窗口。
- 特点:时间无对齐。
- session窗口分配器通过session活动来对元素进行分组,session窗口跟滚动窗口和滑动窗口相比,不会由重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个session窗口通过一个sesion间隔来设置,这个session间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的session将关闭并且后续的元素将被分配到新的session窗口当中去。

Window API
TimeWindow
TimeWindow是将制定时间范围内的所有数据组成一个window,一次对一个window里面的数据进行计算。
package com.zh.apitest.window;import com.zh.apitest.beans.SensorReading;import org.apache.commons.collections.IteratorUtils;import org.apache.flink.api.common.functions.AggregateFunction;import org.apache.flink.api.java.tuple.Tuple;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.windowing.WindowFunction;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import org.apache.flink.util.Collector;import org.apache.flink.util.OutputTag;/*** author: zhanghui* project: big-data-learning* package: com.zh.apitest.window* filename: WindowTest1_TimeWindow* date: 2021/12/6 4:25 下午* description: 时间窗口 timeWindow()* 窗口分配器(window assigner):* 滚动窗口(tumbling window)* 滑动窗口(sliding window)* 会话窗口(session window)* 全局窗口(global window)* 创建不同类型窗口:* 滚动时间窗口(tumbling time window): .timeWindow(Time.seconds(15))* 滑动时间窗口(sliding time window): .timeWindow(Time.seconds(15), Time.seconds(5))* 会话窗口(session window): .window(EventTimeSessionWindows.withGap(Time.minutes(10)))* 滚动计数窗口(tumbling count window): .countWindow(5)* 滑动计数窗口(sliding count window): .countWindow(10, 2)* 窗口函数:* 增量聚合函数:每条数据到来就进行计算,保持一个简单的状态(ReduceFunction, AggregateFunction)* 全窗口函数:先把窗口所有数据收集起来,等到计算的时候遍历所有数据(ProcessWindowFunction, WindowFunction 注:ProcessWindowFunction的输入为上下文,包含了上下文,比WindowFunction的内容更加丰富)* 其他可选API:(注:trigger,evitor在顶层API中已经实现,大都不需要写)* .trigger() -- 触发器(定义window什么时候关闭,触发计算并输出结果)* .evitor() -- 移除器(定义移除某些数据的逻辑)* .allowedLateness() -- 允许处理迟到的数据* .sideOutputLateData() -- 将迟到的数据放入侧输出流* .getSideOutput() -- 获取侧输出流,侧输出流只能使用SingleOutputStreamOperator* 数据流类型变换:* Keyed Windows: DataStream -keyBy-> KeyedStream -window-> WindowedStream -聚合-> DataStream* Non-Keyed Windows: DataStream -windowAll-> WindowedStream -聚合-> DataStream**/public class WindowTest1_TimeWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从文件读取数据// DataStream<String> inputStream = env.readTextFile("flink-FlinkTutorial/src/main/resources/sensor.txt");// socket文本流DataStream<String> inputStream = env.socketTextStream("localhost", 7777);// 转换成SensorReading类型DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});// 开窗测试// 1.增量聚合测试DataStream<Integer> resultStream = dataStream.keyBy("id")// .countWindow(10, 2) // 滑动计数窗口// .window(EventTimeSessionWindows.withGap(Time.minutes(1))) // 会话窗口.timeWindow(Time.seconds(15)) // 简写的程序时间开窗// .window(TumblingProcessingTimeWindows.of(Time.seconds(15))) // 程序时间开窗.aggregate(new AggregateFunction<SensorReading, Integer, Integer>() {@Overridepublic Integer createAccumulator() {return 0;}@Overridepublic Integer add(SensorReading value, Integer accumulator) {return accumulator + 1;}@Overridepublic Integer getResult(Integer accumulator) {return accumulator;}@Overridepublic Integer merge(Integer a, Integer b) {return a + b;}});// 2.全窗口函数测试DataStream<Tuple3<String, Long, Integer>> resultStream2 = dataStream.keyBy("id").timeWindow(Time.seconds(15))// .process(new ProcessWindowFunction<SensorReading, Object, Tuple, TimeWindow>() {// }).apply(new WindowFunction<SensorReading, Tuple3<String, Long, Integer>, Tuple, TimeWindow>() {@Overridepublic void apply(Tuple tuple, TimeWindow window, Iterable<SensorReading> input, Collector<Tuple3<String, Long, Integer>> out) throws Exception {String id = tuple.getField(0);Long windowEnd = window.getEnd();Integer count = IteratorUtils.toList(input.iterator()).size();out.collect(new Tuple3<>(id, windowEnd, count));}});// 3. 其他可选API// 定义侧输出流OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {};SingleOutputStreamOperator<SensorReading> sumStream = dataStream.keyBy("id").timeWindow(Time.seconds(15))// .trigger() // 触发器// .evictor() // 移除器.allowedLateness(Time.minutes(1)) // 允许处理迟到的数据.sideOutputLateData(outputTag) // 侧输出流,将迟到的数据放入侧输出流.sum("temperature");sumStream.getSideOutput(outputTag).print("late"); // 获取侧输出流,获取侧输出流getSideOutput的API只能在SingleOutputStreamOperator中获取到resultStream2.print();env.execute();}}
滚动窗口
Flink默认的时间窗口根据Processing Time进行窗口划分,将Flink获取到的数据进入Flink的时间划分到不同的窗口中。
时间间隔可以通过Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中一个来指定。
滑动窗口(SlidingEventTimeWindows)
滑动窗口和滚动擦护给你扣的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。
下面代码中的sliding_size设置为了5s,也就是说,每5s就计算输出结果一次,每一次计算的window范围是15s内的所有元素。
CountWindow
package com.zh.apitest.window;import com.zh.apitest.beans.SensorReading;import org.apache.flink.api.common.functions.AggregateFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** author: zhanghui* project: big-data-learning* package: com.zh.apitest.window* filename: WindowTest2_CountWindow* date: 2021/12/6 5:37 下午* description: 计数窗口 countWindow()*/public class WindowTest2_CountWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// socket文本流DataStream<String> inputStream = env.socketTextStream("localhost", 7777);// 转换成SensorReading类型DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});// 开计数窗口测试DataStream<Double> avgTempResultStream = dataStream.keyBy("id").countWindow(10, 2).aggregate(new MyAvgTemp());avgTempResultStream.print();env.execute();}public static class MyAvgTemp implements AggregateFunction<SensorReading, Tuple2<Double, Integer>, Double> {@Overridepublic Tuple2<Double, Integer> createAccumulator() {return new Tuple2<>(0.0, 0);}@Overridepublic Tuple2<Double, Integer> add(SensorReading value, Tuple2<Double, Integer> accumulator) {return new Tuple2<>(accumulator.f0 + value.getTemperature(), accumulator.f1 + 1);}@Overridepublic Double getResult(Tuple2<Double, Integer> accumulator) {return accumulator.f0 / accumulator.f1;}@Overridepublic Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);}}}
CountWindow根据窗口中相同key元素的数量来触发执行,执行时只计算元素数量到达窗口大小的key对应的结果。
注意:CountWindow的window_size指的是相同key的元素的个数,不是输入的所有元素的总数。
滚动窗口
默认的CountWindow是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。
滑动窗口
滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。
Window Function
window function定义了要对窗口中收集的数据做的计算操作,主要可以分为两类:
- 增量聚合函数(incremental aggregation functions)
- 每条数据到来就进行计算,保持一个简单的状态。典型的增量聚合函数有ReduceFunction,AggregateFunction。
- 全窗口函数(full window functions)
- 先把窗口所有数据收集起来,等到计算的时候遍历所有数据。
- ProcessWindowFunction就是一个全窗口函数。
其他可选API
.trigger()— 触发器(定义window什么时候关闭,触发计算并输出结果).evitor()— 移除器(定义移除某些数据的逻辑).allowedLateness()— 允许处理迟到的数据.sideOutputLateData()— 将迟到的数据放入侧输出流.getSideOutput()— 获取侧输出流,getSideOutput的API只能在SingleOutputStreamOperator中获取到 ```java// 3. 其他可选API // 定义侧输出流 OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") { }; SingleOutputStreamOperator<SensorReading> sumStream = dataStream.keyBy("id") .timeWindow(Time.seconds(15))// .trigger() // 触发器 // .evictor() // 移除器
.allowedLateness(Time.minutes(1)) // 允许处理迟到的数据 .sideOutputLateData(outputTag) // 侧输出流,将迟到的数据放入侧输出流 .sum("temperature"); sumStream.getSideOutput(outputTag).print("late"); // 获取侧输出流,获取侧输出流getSideOutput的API只能在SingleOutputStreamOperator中获取到
```

