image.png

窗口介绍

什么是窗口:
开始和结束的一个时间范围,左开右闭。
窗口的作用:
对一段时间内的数据进行聚合。

所有窗口逻辑上只能在一个task中,也就是一个并行度中

相关算子

开启窗口的算子

每个语义的算子也是一样的,所以算子个数为24=8
窗口一般用在keyby后
没有分组的窗口只能统计全局,windowAll,
所以窗口一般可以人为是为了*分组后将流式数据按照窗口来做聚合统计
的工具

基于时间:

window方法用在keyby后,需要传入一个窗口分配器windowAssigner来指定窗口类型,其中有处理时间(processing)和事件时间(eventTime)语义的窗口
window方法后可以使用聚合函数(sum、max等),process,reduce,aggregate
image.png

Tumble_Time

滚动窗口

Slide_Time

滑动窗口,size,slide

Session_Time

会话窗口,流中每个元素启动一个窗口,在设置的时间之内持续发出会话,则将两个窗口合并merge,
也就是说:两个窗口之间一定存在一个间隙withGap,小于则合并,大于则分窗
例如:对于某个key,设置了withGap时间为3秒,那么一直输入这个key,直到有3秒中没输入,则窗口关闭触发计算

Global

全局窗口,没有触发器trigger,需要自定义触发器
image.png
image.png

基于个数:

TumbleAndSlide_Count

countWindow
.countWindow(5) // 滚动
.countWindow(_2, 1) _// 滑动

没有分组的情况

NoKey

方法:
windowAll:时间
countWindowAll:个数
没有分组的窗口没有意义,并行度是1,因为不分组就是说有数据都是一组,类比mysql,你要对所有窗口的数据进行聚合那么这些数据必须在一个分区中,否则会跨分区(跨并行度),就算设置的并行度不是1也会强制用一个并行度来执行

窗口处理函数

增量

Reduce_Process

类型不会改变,只能进行一个因素的聚合,什么类型进来什么类型出去

Aggregate_Process

类型会发生改变,可以对多个因素进行聚合,例如求平均值(个数,值数)
重写4个方法:
createAccumulator :可以是tuple、内部类
add(Tuple2两个数据各自累加的逻辑)
getResult(是Tuple2两个数据聚合的逻辑)
merge: session才用
全量

AggregateFunction

一般排名(topN),排序的业务情况才会用

语义及水印

语义和水印是配套使用,只有存在窗口、定时器、等涉及时间的,语义才有意义
相关算子
Watermark 水印
Watermark_Custom 自定义水印
MyWMG 自定义水印生成器
Window_Old 1.12老版flink窗口使用

语义semantic

时间语义种类:
**事件时间 :eventime**
时间戳在数据之中
**处理时间 :processtime**
系统时间
**采集时间:Ingestion Time**
在数据源操作处(进入 Flink source 时),每个事件将进入 Flink 时当时的时间作为时间戳

1.12开始默认语义是时间语义eventime,旧版本需要额外指定
image.png
或者
image.png

水印watermark

这种在Flink中去测量事件时间的进度的机制就是watermark(水印). watermark作为数据流的一部分在流动, 并且携带一个时间戳t.
一个Watermark(t)表示在这个流里面事件时间已经到了时间t, 意味着此时, 流中不应该存在小于t的数据

类似于调慢的时钟

看水印的地方:
image.png

要点

0、水印位置

  1. 随着数据进行流动,到map算子,水印就在map生效,到flatmap就在flatmap生效。。。<br /> 为了让水印作用在更多的地方,尽量在流的前方加水印<br /> 所以最好是source位置加水印,但是source位置的数据未加工,一般难以处理<br /> 所以选择在第一个map处进行处理

1、计算逻辑

水印生成:
①周期生成(系统默认)
②watermark = maxTs - 乱序程度 - 1ms

水印时间是动态变化的
水印从第一条数据的时间戳开始计算,减去‘乱序程度’
如第一个数据的时间戳是1000,允许延迟时间是3000,那么水印=1000-3000-1=-2001
第二个数据的时间戳是3000,允许延迟时间是3000,那么水印=3000-3000-1=-1
此时第三个数据时间戳是2000,允许延迟时间是3000,此时水印还是上一条水印,水印=-1

2、窗口关闭时机:

窗口根据水印时间来关闭,水印时间代替了原本的系统时间
水印时间(原本是系统)大于窗口结束时间,窗口关闭,但是水印又是由下一条数据决定的,所以下一条数据-乱序时间-1ms触发关窗
假设窗口宽度是5000,而窗口左开右闭的值是4999,那么水印时间只要小于等于这个值就能上车,
此后有大于这个水印时间的数据出现,窗口就关门走人。
例如
当第四条数据是8000,那么此时水印=8000-3000-1=4999,刚好可以上车。
如果此时又来了一条7000(<8000)的数据,则还能上车
如果此时又来了一条9000(>8000)的数据,则窗口关闭,9000的数据会去到侧输出流中。

面试:
有界流如果关闭窗口:
程序结束的时候,系统会给窗口加一个大水印,用来关闭最后一个窗口
(窗口关闭原则:新水印激活)
3、水印相当于调慢时间的司机,水印不能倒流,只能根据新来的数据进行更新。

4、水印有序和乱序是一样的原理,继承了乱序,只是把乱序的“允许延迟时间”设置为0

5、水印在多并行度中传递
多并行度下,以多个并行度中的:最小的水印为准(木桶原理,漏水的木桶)
原因:
多并行度下,如果以水印大的数据为准,那么数据传递到下游的时候,触发下游窗口关闭,其他水印小的并行度就都是迟到数据了
image.png

如果出现数据倾斜(有热点数据),很容易造成数据阻塞
比如一个并行度一直没数据,所以水印也是不存在的,这会导致其他有数据的并行度的窗口无法关闭,数据一直进入这个窗口,造成阻塞
避免数据倾斜的方法
减少并行度(不可取),计算能力变差了,不符合分布式思想

  • flink强制水印更新:withIdleness,允许空闲时间,一般设置几分钟

image.png

水印源码

image.png

自定义水印(工作一般不会用)

image.png

侧输出流:

获取方式:

通过process中的上下文获取

两个作用:

1、对流的切分,例如放入预警流。(多)
如果获取到侧输出流: 需要在process中, context
image.png
2、作为数据丢失的第三层保障 (少)
放在窗口后面, 承载窗口关闭之后真正迟到的数据
image.png

侧输出流参数泛型问题

  1. sideOutputLateDate(new OutputTag<>("late")) 报错<br /> sideOutputLateDate(new OutputTag<WaterSensor>("late"){}) 使用匿名内部类的方式才能知道泛型

flink保障数据不丢失的三层保障

①保证数据不丢失的第一层保障
水印
调慢3秒
②保证数据不丢失的第二层保障
允许迟到allowedLateness ,这个值是相对于水印的时间
先对窗口中的数据进行计算,但是窗口没关闭
调慢5秒,之后关闭
③保证数据不丢失的第三次保障
将数据放入 :侧输出流sideOutputLateDate
将窗口外的数据丢进这里
image.png
以上3层保证数据必不会丢

举例:
窗口大小是5秒,
允许延迟时间是3秒,
允许迟到时间现在是5秒,

那么当数据时间满足:t-3>=5(>4.999)的时候,窗口开始计算,但不会关闭
那么当数据时间满足:t-3-5>=5(>4.999)的时候,才会触发窗口关闭

如果:
此时来了一条9秒的数据,那么数据进入5-10秒的窗口,并且0-5的窗口开始计算
又来了一条5秒的数据,此时这条数据进入0-5秒的窗口,继续计算
如果:
此时来了一条14秒的数据,那么0-5秒的窗口关闭,
再来一条3秒的数据则会进入侧输出流中。
但在来一条6秒的数据则会进入5-10秒的窗口

定时器

定时器
满足某种触发条件后,注册定时器,注册了定时器就已经属于报警了,可通过设置一下时间定义多久开始报警
通过process算子的上下文获得
ctx.timerService().registerProcessingTimeTimer(now + 5000);
注册定时器后(也就是触发报警后),需要重写触发器方法onTimer

  1. 种类:<br /> 非事件时间定时器<br />不需要分配水印<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/21361442/1642148091215-1f191f0d-d577-41a0-8dc3-a066fb64c8ea.png#clientId=u3aec0b34-8e83-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=781&id=ufcfbdb1b&margin=%5Bobject%20Object%5D&name=image.png&originHeight=781&originWidth=939&originalType=binary&ratio=1&rotation=0&showTitle=false&size=116406&status=done&style=none&taskId=uc20bd9f5-189c-408d-9092-53e101daeb1&title=&width=939)<br /> 事件定时器<br />需要分配水印<br />应用例子:com.atguigu.chapter07_state_timer_window.timer.Flink02_Timeer_Project<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/21361442/1642148051360-d6c4088f-f16a-48ef-b3a3-8b5e4c3bc9aa.png#clientId=u3aec0b34-8e83-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=817&id=u20954973&margin=%5Bobject%20Object%5D&name=image.png&originHeight=817&originWidth=970&originalType=binary&ratio=1&rotation=0&showTitle=false&size=113862&status=done&style=none&taskId=u5a585b15-b852-4b86-8ef4-3059003adc8&title=&width=970)<br />应用例子:com.atguigu.chapter07_state_timer_window.timer.Flink02_Timeer_Project<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/21361442/1626748401546-d75e7836-105f-4b80-9160-e22b934f5b86.png#crop=0&crop=0&crop=1&crop=1&height=410&id=f8E2f&margin=%5Bobject%20Object%5D&name=image.png&originHeight=410&originWidth=963&originalType=binary&ratio=1&rotation=0&showTitle=false&size=288210&status=done&style=none&title=&width=963)
  2. 注册定时器后需要重写触发器方法

结果展示

image.png
image.png

注意

报这个错,是因为打开的还是之前的网页,当停止进程的时候,需要重新打开flink指定的那个端口

image.png