一、基本概念
窗口(window)就是将无限流切割为有限流的一种方式,它会将流数据分发到有限大小的桶(bucket)中进行分析。
**
- 时间窗口(Time Window):将窗口划分长度固定的大小,每个元素只能划分到一个窗口
- 滚动时间窗口
- 滑动时间窗口
- 会话窗口
- 计数窗口(Count Window):由窗口大小和滑动步长组成,所以会存在窗口重合的情况
- 滚动计数窗口
- 滑动计数窗口
滚动窗口:将数据依据固定的长度对数据进行划分,时间对齐,窗口长度固定,没有重叠。窗口是前闭后开,如8:00-9:00的窗口是会包括8:00的数据,但是不包括9:00的数据。
滑动窗口:是固定窗口的更广义一种形式,滑动窗口由固定的长度和滑动间隔组成,窗口长度固定,可以有重叠。滚动窗口可以看作是一种滑动步长=窗口长度**的特殊滑动窗口。
会话窗口:有一系列事件组合一个指定时间长度的timeout间隙组成,也就是一段时间没有接收到新数据就会生成新的窗口,也没有一个固定的开始时间和结束时间,特点是时间没有对齐。
**
二、API使用
- 窗口分配器—window()方法
- 可以使用.window()来定义一个窗口,然后基于这个window去做一些聚合或者其他的处理操作。注意window()方法必须在keyBy()之后才能使用。
- Flink提供了更加简单的.timeWindow() 和 .countWindow() 方法,用于定义事件窗口和计数窗口。 ```scala package window import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows import org.apache.flink.streaming.api.windowing.time.Time
object WindowSocketDemo { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val socket = env.socketTextStream(“localhost”,7777)
val words = socket.flatMap(_.split(" ")).map((_,1)).keyBy(_._1)
/**
* 开窗操作都分为 窗口分配 和 函数操作 两部分
*/
//flink提供了window函数作为窗口分配器,window操作必须在KeyBy操作之后
//同时也提供了两个函数 .timeWindow() 和 .countWindow() 用于定义时间窗口和计数窗口
//这里定义了一个 15秒的窗口函数,即每15秒统计一次,每次只会统计该窗口内的函数
val window = words.timeWindow(Time.seconds(15)).sum(1)
//timeWindow(Time.seconds(15)) 滚动时间窗口 或者 words.window(TumblingEventTimeWindows.of(Time.seconds(15)))
//timeWindow(Time.seconds(15),Time.seconds(5)) 滑动时间窗口 words.window(SlidingProcessingTimeWindows.of(Time.seconds(15),Time.seconds(3)))
//.window(EventTimeSessionWindows.withGap(Time.seconds(10))) 会话窗口
//words.countWindow() //全局窗口的简写,此api传一个参数表示滚动窗口,传两个参数表示滑动窗口
/**
* 窗口函数:定义了要对窗口函数中收集的数据做的计算操作
* 可以分为两类 =>
* 增量聚合函数:每一条数据来到计算一次,ReduceFunction、AggregateFunction
* 全窗口函数:先把窗口所有数据收集起来,等到计算的时间会遍历所有的数据,ProcessWindowFunction
*/
/**
* 其他API:
* 1、trigger():定义window什么时间关闭,触发计算并输出
* 2、evitor():移除器,定义移除某些数据的逻辑
* 3、allowedLateness():允许处理迟到的数据,传入一个时间表示可以延迟多久进行处理
* 4、sideOutputLateData() : 将迟到的数据放入侧输出流,在window Function前使用
* 5、getSideOutput():获取侧输出流,在window Function后使用
*/
window.print("words").setParallelism(1)
env.execute("window Stream")
} } ```
三、窗口函数
窗口函数(window function)定义了要对窗口中收集的数据做的计算操作
可以分为两大类
- 增量聚合函数
- 每条数据来到就进行计算,保持一个简单的状态
- ReduceFunction,AggregateFunction
- 全窗口函数
- 先把窗口所有数据收集起来,等到计算的时候会遍历所有数据,比如要排序、求中位数的统计
- ProcessWindowFunction