窗口介绍
什么是窗口:
开始和结束的一个时间范围,左开右闭。
窗口的作用:
对一段时间内的数据进行聚合。
所有窗口逻辑上只能在一个task中,也就是一个并行度中
相关算子
开启窗口的算子
每个语义的算子也是一样的,所以算子个数为24=8
窗口一般用在keyby后
没有分组的窗口只能统计全局,windowAll,
所以窗口一般可以人为是为了*分组后将流式数据按照窗口来做聚合统计的工具
基于时间:
window方法用在keyby后,需要传入一个窗口分配器windowAssigner来指定窗口类型,其中有处理时间(processing)和事件时间(eventTime)语义的窗口
window方法后可以使用聚合函数(sum、max等),process,reduce,aggregate
Tumble_Time
Slide_Time
Session_Time
会话窗口,流中每个元素启动一个窗口,在设置的时间之内持续发出会话,则将两个窗口合并merge,
也就是说:两个窗口之间一定存在一个间隙withGap,小于则合并,大于则分窗
例如:对于某个key,设置了withGap时间为3秒,那么一直输入这个key,直到有3秒中没输入,则窗口关闭触发计算
Global
基于个数:
TumbleAndSlide_Count
countWindow
.countWindow(5) // 滚动
.countWindow(_2, 1) _// 滑动
没有分组的情况
NoKey
方法:
windowAll:时间
countWindowAll:个数
没有分组的窗口没有意义,并行度是1,因为不分组就是说有数据都是一组,类比mysql,你要对所有窗口的数据进行聚合那么这些数据必须在一个分区中,否则会跨分区(跨并行度),就算设置的并行度不是1也会强制用一个并行度来执行
窗口处理函数
Reduce_Process
类型不会改变,只能进行一个因素的聚合,什么类型进来什么类型出去
Aggregate_Process
类型会发生改变,可以对多个因素进行聚合,例如求平均值(个数,值数)
重写4个方法:
createAccumulator :可以是tuple、内部类
add(Tuple2两个数据各自累加的逻辑)
getResult(是Tuple2两个数据聚合的逻辑)
merge: session才用
全量
AggregateFunction
一般排名(topN),排序的业务情况才会用
语义及水印
语义和水印是配套使用,只有存在窗口、定时器、等涉及时间的,语义才有意义
相关算子
Watermark 水印
Watermark_Custom 自定义水印
MyWMG 自定义水印生成器
Window_Old 1.12老版flink窗口使用
语义semantic
时间语义种类:**事件时间 :eventime**
时间戳在数据之中**处理时间 :processtime**
系统时间 **采集时间:Ingestion Time**
在数据源操作处(进入 Flink source 时),每个事件将进入 Flink 时当时的时间作为时间戳
1.12开始默认语义是时间语义eventime,旧版本需要额外指定
或者
水印watermark
这种在Flink中去测量事件时间的进度的机制就是watermark(水印). watermark作为数据流的一部分在流动, 并且携带一个时间戳t.
一个Watermark(t)表示在这个流里面事件时间已经到了时间t, 意味着此时, 流中不应该存在小于t的数据
类似于调慢的时钟
要点
0、水印位置
随着数据进行流动,到map算子,水印就在map生效,到flatmap就在flatmap生效。。。<br /> 为了让水印作用在更多的地方,尽量在流的前方加水印<br /> 所以最好是source位置加水印,但是source位置的数据未加工,一般难以处理<br /> 所以选择在第一个map处进行处理
1、计算逻辑
水印生成:
①周期生成(系统默认)
②watermark = maxTs - 乱序程度 - 1ms
水印时间是动态变化的
水印从第一条数据的时间戳开始计算,减去‘乱序程度’
如第一个数据的时间戳是1000,允许延迟时间是3000,那么水印=1000-3000-1=-2001
第二个数据的时间戳是3000,允许延迟时间是3000,那么水印=3000-3000-1=-1
此时第三个数据时间戳是2000,允许延迟时间是3000,此时水印还是上一条水印,水印=-1
2、窗口关闭时机:
窗口根据水印时间来关闭,水印时间代替了原本的系统时间
当水印时间(原本是系统)大于窗口结束时间,窗口关闭,但是水印又是由下一条数据决定的,所以下一条数据-乱序时间-1ms触发关窗
假设窗口宽度是5000,而窗口左开右闭的值是4999,那么水印时间只要小于等于这个值就能上车,
此后有大于这个水印时间的数据出现,窗口就关门走人。
例如
当第四条数据是8000,那么此时水印=8000-3000-1=4999,刚好可以上车。
如果此时又来了一条7000(<8000)的数据,则还能上车
如果此时又来了一条9000(>8000)的数据,则窗口关闭,9000的数据会去到侧输出流中。
面试:
有界流如果关闭窗口:
程序结束的时候,系统会给窗口加一个大水印,用来关闭最后一个窗口
(窗口关闭原则:新水印激活)
3、水印相当于调慢时间的司机,水印不能倒流,只能根据新来的数据进行更新。
4、水印有序和乱序是一样的原理,继承了乱序,只是把乱序的“允许延迟时间”设置为0
5、水印在多并行度中传递:
多并行度下,以多个并行度中的:最小的水印为准(木桶原理,漏水的木桶)
原因:
多并行度下,如果以水印大的数据为准,那么数据传递到下游的时候,触发下游窗口关闭,其他水印小的并行度就都是迟到数据了
如果出现数据倾斜(有热点数据),很容易造成数据阻塞
比如一个并行度一直没数据,所以水印也是不存在的,这会导致其他有数据的并行度的窗口无法关闭,数据一直进入这个窗口,造成阻塞
避免数据倾斜的方法
减少并行度(不可取),计算能力变差了,不符合分布式思想
- flink强制水印更新:withIdleness,允许空闲时间,一般设置几分钟
水印源码
自定义水印(工作一般不会用)
侧输出流:
获取方式:
两个作用:
1、对流的切分,例如放入预警流。(多)
如果获取到侧输出流: 需要在process中, context
2、作为数据丢失的第三层保障 (少)
放在窗口后面, 承载窗口关闭之后真正迟到的数据
侧输出流参数泛型问题
sideOutputLateDate(new OutputTag<>("late")) 报错<br /> sideOutputLateDate(new OutputTag<WaterSensor>("late"){}) 使用匿名内部类的方式才能知道泛型
flink保障数据不丢失的三层保障
①保证数据不丢失的第一层保障
水印
调慢3秒
②保证数据不丢失的第二层保障
允许迟到allowedLateness ,这个值是相对于水印的时间
先对窗口中的数据进行计算,但是窗口没关闭
调慢5秒,之后关闭
③保证数据不丢失的第三次保障
将数据放入 :侧输出流sideOutputLateDate
将窗口外的数据丢进这里
以上3层保证数据必不会丢
举例:
窗口大小是5秒,
允许延迟时间是3秒,
允许迟到时间现在是5秒,
那么当数据时间满足:t-3>=5(>4.999)的时候,窗口开始计算,但不会关闭
那么当数据时间满足:t-3-5>=5(>4.999)的时候,才会触发窗口关闭
如果:
此时来了一条9秒的数据,那么数据进入5-10秒的窗口,并且0-5的窗口开始计算
又来了一条5秒的数据,此时这条数据进入0-5秒的窗口,继续计算
如果:
此时来了一条14秒的数据,那么0-5秒的窗口关闭,
再来一条3秒的数据则会进入侧输出流中。
但在来一条6秒的数据则会进入5-10秒的窗口
定时器
定时器
满足某种触发条件后,注册定时器,注册了定时器就已经属于报警了,可通过设置一下时间定义多久开始报警
通过process算子的上下文获得
ctx.timerService().registerProcessingTimeTimer(now + 5000);
注册定时器后(也就是触发报警后),需要重写触发器方法onTimer
种类:<br /> 非事件时间定时器<br />不需要分配水印<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/21361442/1642148091215-1f191f0d-d577-41a0-8dc3-a066fb64c8ea.png#clientId=u3aec0b34-8e83-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=781&id=ufcfbdb1b&margin=%5Bobject%20Object%5D&name=image.png&originHeight=781&originWidth=939&originalType=binary&ratio=1&rotation=0&showTitle=false&size=116406&status=done&style=none&taskId=uc20bd9f5-189c-408d-9092-53e101daeb1&title=&width=939)<br /> 事件定时器<br />需要分配水印<br />应用例子:com.atguigu.chapter07_state_timer_window.timer.Flink02_Timeer_Project<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/21361442/1642148051360-d6c4088f-f16a-48ef-b3a3-8b5e4c3bc9aa.png#clientId=u3aec0b34-8e83-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=817&id=u20954973&margin=%5Bobject%20Object%5D&name=image.png&originHeight=817&originWidth=970&originalType=binary&ratio=1&rotation=0&showTitle=false&size=113862&status=done&style=none&taskId=u5a585b15-b852-4b86-8ef4-3059003adc8&title=&width=970)<br />应用例子:com.atguigu.chapter07_state_timer_window.timer.Flink02_Timeer_Project<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/21361442/1626748401546-d75e7836-105f-4b80-9160-e22b934f5b86.png#crop=0&crop=0&crop=1&crop=1&height=410&id=f8E2f&margin=%5Bobject%20Object%5D&name=image.png&originHeight=410&originWidth=963&originalType=binary&ratio=1&rotation=0&showTitle=false&size=288210&status=done&style=none&title=&width=963)
注册定时器后需要重写触发器方法
结果展示
注意
报这个错,是因为打开的还是之前的网页,当停止进程的时候,需要重新打开flink指定的那个端口