1. Flink四大基石

Flink之所以能这么流行,离不开它最重要的四个基石:Checkpoint、State、Time、Window。

  • Checkpoint

这是Flink最重要的一个特性。
Flink基于Chandy-Lamport(分布式快照算法)算法实现了一个分布式的一致性的快照,从而提供了一致性的语义。
Chandy-Lamport算法实际上在1985年的时候已经被提出来,但并没有被很广泛的应用,而Flink则把这个算法发扬光大了。
Spark最近在实现Continue streaming,Continue streaming的目的是为了降低处理的延时,其也需要提供这种一致性的语义,最终也采用了Chandy-Lamport这个算法,说明Chandy-Lamport算法在业界得到了一定的肯定。
https://zhuanlan.zhihu.com/p/53482103

  • State

提供了一致性的语义之后,Flink为了让用户在编程时能够更轻松、更容易地去管理状态,还提供了一套非常简单明了的State API,包括ValueState、ListState、MapState,BroadcastState。

  • Time

除此之外,Flink还实现了Watermark的机制,能够支持基于事件的时间的处理,能够容忍迟到/乱序的数据。

  • Window

另外流计算中一般在对流数据进行操作之前都会先进行开窗,即基于一个什么样的窗口上做这个计算。Flink提供了开箱即用的各种窗口,比如滑动窗口、滚动窗口、会话窗口以及非常灵活的自定义的窗口。

2. Flink-Window操作

2.1 为什么需要Window

在流处理应用中,数据是连续不断的,有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。
在这种情况下,我们必须定义一个窗口(window),用来收集最近1分钟内的数据,并对这个窗口内的数据进行计算。

2.1 Window的分类

2.2.1 按照time和count分类

time-window:时间窗口:根据时间划分窗口,如:每xx分钟统计最近xx分钟的数据
count-window:数量窗口:根据数量划分窗口,如:每xx个数据统计最近xx个数据image.png

2.2.2 按照slide和size分类

窗口有两个重要的属性: 窗口大小size和滑动间隔slide,根据它们的大小关系可分为:

tumbling-window:滚动窗口:size=slide,如:每隔10s统计最近10s的数据
image.png

sliding-window:滑动窗口:size>slide,如:每隔5s统计最近10s的数据
image.png
注意:当size<slide的时候,如每隔15s统计最近10s的数据,那么中间5s的数据会丢失,所有开发中不用。

2.2.3 总结

按照上面窗口的分类方式进行组合,可以得出如下的窗口:
1.基于时间的滚动窗口tumbling-time-window—用的较多
2.基于时间的滑动窗口sliding-time-window—用的较多
3.基于数量的滚动窗口tumbling-count-window—用的较少
4.基于数量的滑动窗口sliding-count-window—用的较少
注意:Flink还支持一个特殊的窗口:Session会话窗口,需要设置一个会话超时时间,如30s,则表示30s内没有数据到来,则触发上个窗口的计算

2.3 Window的API

2.3.1 window和windowAll

image.png

  • 使用keyby的流,应该使用window方法
  • 未使用keyby的流,应该调用windowAll方法

    2.3.2 WindowAssigner

    window/windowAll 方法接收的输入是一个 WindowAssigner, WindowAssigner 负责将每条输入的数据分发到正确的 window 中,Flink提供了很多各种场景用的WindowAssigner:
    image.png
    如果需要自己定制数据分发策略,则可以实现一个 class,继承自 WindowAssigner。

    2.3.5 API调用示例

    image.png
    source.keyBy(0).window(TumblingProcessingTimeWindows.of(Time.seconds(5)));

    source.keyBy(0)..timeWindow(Time.seconds(5))

2.4 案例演示-基于时间的滚动和滑动窗口

2.4.1 需求

nc -lk 9999
有如下数据表示:
信号灯编号和通过该信号灯的车的数量
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4
需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量—基于时间的滚动窗口
需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量—基于时间的滑动窗口

2.5 案例演示-基于数量的滚动和滑动窗口

2.5.1 需求

需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计—基于数量的滚动窗口
需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计—基于数量的滑动窗口

2.6 案例演示-会话窗口

2.6.1 需求

设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算

3. Flink-Time与Watermaker

3.1 Time分类

在Flink的流式处理中,会涉及到时间的不同概念,如下图所示:
image.png
事件时间EventTime: 事件真真正正发生产生的时间
摄入时间IngestionTime: 事件到达Flink的时间
处理时间ProcessingTime: 事件真正被处理/计算的时间

问题: 上面的三个时间,我们更关注哪一个?
答案: 更关注事件时间 !
因为: 事件时间更能反映事件的本质! 只要事件时间一产生就不会变化

3.2 EventTime的重要性

3.2.1 示例1

假设,你正在去往地下停车场的路上,并且打算用手机点一份外卖。选好了外卖后,你就用在线支付功能付款了,这个时候是11点59分。恰好这时,你走进了地下停车库,而这里并没有手机信号。因此外卖的在线支付并没有立刻成功,而支付系统一直在Retry重试“支付”这个操作。
当你找到自己的车并且开出地下停车场的时候,已经是12点01分了。这个时候手机重新有了信号,手机上的支付数据成功发到了外卖在线支付系统,支付完成。
在上面这个场景中你可以看到,支付数据的事件时间是11点59分,而支付数据的处理时间是12点01分。
问题:
如果要统计12之前的订单金额,那么这笔交易是否应被统计?
答案:
应该被统计,因为该数据的真真正正的产生时间为11点59分,即该数据的事件时间为11点59分,
事件时间能够真正反映/代表事件的本质! 所以一般在实际开发中会以事件时间作为计算标准。

3.2.2 示例2

一条错误日志的内容为:
2020-11:11 22:59:00 error NullPointExcep —事件时间
进入Flink的时间为2020-11:11 23:00:00 —摄入时间
到达Window的时间为2020-11:11 23:00:10 —处理时间
问题:
对于业务来说,要统计1h内的故障日志个数,哪个时间是最有意义的?
答案:
EventTime事件时间,因为bug真真正正产生的时间就是事件时间,只有事件时间才能真正反映/代表事件的本质!

3.2.3 示例3

某 App 会记录用户的所有点击行为,并回传日志(在网络不好的情况下,先保存在本地,延后回传)。
A用户在 11:01:00 对 App 进行操作,B用户在 11:02:00 操作了 App,
但是A用户的网络不太稳定,回传日志延迟了,导致我们在服务端先接受到B用户的消息,然后再接受到A用户的消息,消息乱序了。
问题:
如果这个是一个根据用户操作先后顺序,进行抢购的业务,那么是A用户成功还是B用户成功?
答案:
应该算A成功,因为A确实比B操作的早,但是实际中考虑到实现难度,可能直接按B成功算
也就是说,实际开发中希望基于事件时间来处理数据,但因为数据可能因为网络延迟等原因,出现了乱序,按照事件时间处理起来有难度!

3.2.4 示例4

在实际环境中,经常会出现,因为网络原因,数据有可能会延迟一会才到达Flink实时处理系统。我们先来设想一下下面这个场景:
原本应该被该窗口计算的数据因为网络延迟等原因晚到了,就有可能丢失了。
image.png

3.2.5 总结

实际开发中我们希望基于事件时间来处理数据,但因为数据可能因为网络延迟等原因,出现了乱序或延迟到达,那么可能处理的结果不是我们想要的甚至出现数据丢失的情况,所以需要一种机制来解决一定程度上的数据乱序或延迟到底的问题!也就是我们接下来要学习的Watermaker水印机制/水位线机制。

3.3 Watermaker水印机制/水位线机制

3.3.1 什么是Watermaker?

Watermaker就是给数据再额外的加的一个时间列 也就是Watermaker是个时间戳!

3.3.2 如何计算Watermaker?

Watermaker = 数据的事件时间 - 最大允许的延迟时间或乱序时间
注意:后面通过源码会发现,准确来说:
Watermaker = 当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间,
这样可以保证Watermaker水位线会一直上升(变大),不会下降

3.3.3 Watermaker有什么用?

之前的窗口都是按照系统时间来触发计算的,如: [10:00:00 ~ 10:00:10) 的窗口,
一但系统时间到了10:00:10就会触发计算,那么可能会导致延迟到达的数据丢失!
那么现在有了Watermaker,窗口就可以按照Watermaker来触发计算!
也就是说Watermaker是用来触发窗口计算的!

3.3.4 Watermaker如何触发窗口计算的?

窗口计算的触发条件为:
1. 窗口中有数据
2. Watermaker >= 窗口的结束时间
因为前面说到
Watermaker = 当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间
也就是说只要不断有数据来,就可以保证Watermaker水位线是会一直上升/变大的,不会下降/减小的
所以最终一定是会触发窗口计算的
注意:
上面的触发公式进行如下变形:
Watermaker >= 窗口的结束时间
Watermaker = 当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间
当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间 >= 窗口的结束时间
当前窗口的最大的事件时间 >= 窗口的结束时间 + 最大允许的延迟时间或乱序时间

3.4 Watermaker案例演示

3.4.1 需求

有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)
要求每隔5s,计算5秒内,每个用户的订单总金额
并添加Watermaker来解决一定程度上的数据延迟和数据乱序问题。

3.4.2 API

image.png
注意:一般我们都是直接使用Flink提供好的BoundedOutOfOrdernessTimestampExtractor

3.5 Allowed Lateness案例演示

3.5.1 需求 (解决迟到严重的数据)

有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)
要求每隔5s,计算5秒内,每个用户的订单总金额
并添加Watermaker来解决一定程度上的数据延迟和数据乱序问题。
并使用OutputTag+allowedLateness解决数据丢失问题

image.png

4. Flink-状态管理

4.1 Flink中的有状态计算

注意:
Flink中已经对需要进行有状态计算的API做了封装,底层已经维护好了状态!
例如,之前下面代码,直接使用即可,不需要像SparkStreaming那样还得自己写updateStateByKey
也就是说我们今天学习的State只需要掌握原理,实际开发中一般都是使用Flink底层维护好的状态或第三方维护好的状态(如Flink整合Kafka的offset维护底层就是使用的State,但是人家已经写好了的)

4.2 无状态计算和有状态计算

4.2.1 无状态计算

不需要考虑历史数据
相同的输入得到相同的输出就是无状态计算, 如map/flatMap/filter….
image.png
首先举一个无状态计算的例子:消费延迟计算。
假设现在有一个消息队列,消息队列中有一个生产者持续往消费队列写入消息,多个消费者分别从消息队列中读取消息。
从图上可以看出,生产者已经写入16 条消息,Offset 停留在 15 ;有 3 个消费者,有的消费快,而有的消费慢。消费快的已经消费了 13 条数据,消费者慢的才消费了 7、8 条数据。
如何实时统计每个消费者落后多少条数据,如图给出了输入输出的示例。可以了解到输入的时间点有一个时间戳,生产者将消息写到了某个时间点的位置,每个消费者同一时间点分别读到了什么位置。刚才也提到了生产者写入了15 条,消费者分别读取了 10、7、12 条。那么问题来了,怎么将生产者、消费者的进度转换为右侧示意图信息呢?
consumer 0 落后了 5 条,consumer 1 落后了 8 条,consumer 2 落后了 3 条,根据 Flink 的原理,此处需进行 Map 操作。Map 首先把消息读取进来,然后分别相减,即可知道每个 consumer 分别落后了几条。Map 一直往下发,则会得出最终结果。
大家会发现,在这种模式的计算中,无论这条输入进来多少次,输出的结果都是一样的,因为单条输入中已经包含了所需的所有信息。消费落后等于生产者减去消费者。生产者的消费在单条数据中可以得到,消费者的数据也可以在单条数据中得到,所以相同输入可以得到相同输出,这就是一个无状态的计算。

4.2.1 有状态计算

需要考虑历史数据
相同的输入得到不同的输出/不一定得到相同的输出,就是有状态计算,如:sum/reduce
image.png
以访问日志统计量的例子进行说明,比如当前拿到一个Nginx 访问日志,一条日志表示一个请求,记录该请求从哪里来,访问的哪个地址,需要实时统计每个地址总共被访问了多少次,也即每个 API 被调用了多少次。可以看到下面简化的输入和输出,输入第一条是在某个时间点请求 GET 了 /api/a;第二条日志记录了某个时间点 Post /api/b ;第三条是在某个时间点 GET了一个 /api/a,总共有 3 个 Nginx 日志。
从这3 条 Nginx 日志可以看出,第一条进来输出 /api/a 被访问了一次,第二条进来输出 /api/b 被访问了一次,紧接着又进来一条访问 api/a,所以 api/a 被访问了 2 次。不同的是,两条 /api/a 的 Nginx 日志进来的数据是一样的,但输出的时候结果可能不同,第一次输出 count=1 ,第二次输出 count=2,说明相同输入可能得到不同输出。输出的结果取决于当前请求的 API 地址之前累计被访问过多少次。第一条过来累计是 0 次,count = 1,第二条过来 API 的访问已经有一次了,所以 /api/a 访问累计次数 count=2。单条数据其实仅包含当前这次访问的信息,而不包含所有的信息。要得到这个结果,还需要依赖 API 累计访问的量,即状态。
这个计算模式是将数据输入算子中,用来进行各种复杂的计算并输出数据。这个过程中算子会去访问之前存储在里面的状态。另外一方面,它还会把现在的数据对状态的影响实时更新,如果输入200 条数据,最后输出就是 200 条结果。

4.3 有状态计算的场景

image.png
什么场景会用到状态呢?下面列举了常见的4 种:
1.去重:比如上游的系统数据可能会有重复,落到下游系统时希望把重复的数据都去掉。去重需要先了解哪些数据来过,哪些数据还没有来,也就是把所有的主键都记录下来,当一条数据到来后,能够看到在主键当中是否存在。
2.窗口计算:比如统计每分钟 Nginx 日志 API 被访问了多少次。窗口是一分钟计算一次,在窗口触发前,如 08:00 ~ 08:01 这个窗口,前59秒的数据来了需要先放入内存,即需要把这个窗口之内的数据先保留下来,等到 8:01 时一分钟后,再将整个窗口内触发的数据输出。未触发的窗口数据也是一种状态。
3.机器学习/深度学习:如训练的模型以及当前模型的参数也是一种状态,机器学习可能每次都用有一个数据集,需要在数据集上进行学习,对模型进行一个反馈。
4.访问历史数据:比如与昨天的数据进行对比,需要访问一些历史数据。如果每次从外部去读,对资源的消耗可能比较大,所以也希望把这些历史数据也放入状态中做对比。

4.4 状态的分类

4.4.1 Managed State & Raw State

image.png
从Flink是否接管角度:可以分为
ManagedState(托管状态)
RawState(原始状态)
两者的区别如下:
1. 从状态管理方式的方式来说,Managed State 由 Flink Runtime 管理,自动存储,自动恢复,在内存管理上有优化;而 Raw State 需要用户自己管理,需要自己序列化,Flink 不知道 State 中存入的数据是什么结构,只有用户自己知道,需要最终序列化为可存储的数据结构。
2. 从状态数据结构来说,Managed State 支持已知的数据结构,如 Value、List、Map 等。而 Raw State只支持字节数组 ,所有状态都要转换为二进制字节数组才可以。
3. 从推荐使用场景来说,Managed State 大多数情况下均可使用,而 Raw State 是当 Managed State 不够用时,比如需要自定义 Operator 时,才会使用 Raw State。
在实际生产中,都只推荐使用ManagedState

4.4.2 Keyed State & Operator State

image.png
Managed State 分为两种,Keyed State 和 Operator State
(Raw State都是Operator State)

  • Keyed State

image.png
在Flink Stream模型中,Datastream 经过 keyBy 的操作可以变为 KeyedStream。
Keyed State是基于KeyedStream上的状态。这个状态是跟特定的key绑定的,对KeyedStream流上的每一个key,都对应一个state,如stream.keyBy(…)
KeyBy之后的State,可以理解为分区过的State,每个并行keyed Operator的每个实例的每个key都有一个Keyed State,即就是一个唯一的状态,由于每个key属于一个keyed Operator的并行实例,因此我们将其简单的理解为

  • Operator State

image.png
这里的fromElements会调用FromElementsFunction的类,其中就使用了类型为 list state 的 operator state
Operator State又称为 non-keyed state,与Key无关的State,每一个 operator state 都仅与一个 operator 的实例绑定。
Operator State 可以用于所有算子,但一般常用于 Source。

4.4 存储State的数据结构/API介绍

前面说过有状态计算其实就是需要考虑历史数据
而历史数据需要搞个地方存储起来
Flink为了方便不同分类的State的存储和管理,提供了如下的API/数据结构来存储State!
image.png
Keyed State 通过 RuntimeContext 访问,这需要 Operator 是一个RichFunction。
保存Keyed state的数据结构:
ValueState:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值,如求按用户id统计用户交易总额
ListState:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable来遍历状态值,如统计按用户id统计用户经常登录的Ip
ReducingState:这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值
MapState:即状态值为一个map。用户通过put或putAll方法添加元素
需要注意的是,以上所述的State对象,仅仅用于与状态进行交互(更新、删除、清空等),而真正的状态值,有可能是存在内存、磁盘、或者其他分布式存储系统中。相当于我们只是持有了这个状态的句柄
Operator State 需要自己实现 CheckpointedFunction 或 ListCheckpointed 接口。
保存Operator state的数据结构:
ListState
BroadcastState
举例来说,Flink中的FlinkKafkaConsumer,就使用了operator state。它会在每个connector实例中,保存该实例中消费topic的所有(partition, offset)映射
image.png

4.6 State代码示例

4.6.1 Keyed State

下图就 word count 的 sum 所使用的StreamGroupedReduce类为例讲解了如何在代码中使用 keyed state:
image.png
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/fault-tolerance/state/
需求:
使用KeyState中的ValueState获取数据中的最大值(实际中直接使用maxBy即可)

  1. //-1.定义一个状态用来存放最大值
  2. private transient ValueState<Long> maxValueState;
  3. //-2.创建一个状态描述符对象
  4. ValueStateDescriptor descriptor = new ValueStateDescriptor("maxValueState", Long.class);
  5. //-3.根据状态描述符获取State
  6. maxValueState = getRuntimeContext().getState(maxValueStateDescriptor);
  7. //-4.使用State
  8. Long historyValue = maxValueState.value();
  9. //判断当前值和历史值谁大
  10. if (historyValue == null || currentValue > historyValue)
  11. //-5.更新状态
  12. maxValueState.update(currentValue);
  1. package cn.itcast.state;
  2. import org.apache.flink.api.common.functions.RichMapFunction;
  3. import org.apache.flink.api.common.state.ValueState;
  4. import org.apache.flink.api.common.state.ValueStateDescriptor;
  5. import org.apache.flink.api.java.tuple.Tuple2;
  6. import org.apache.flink.api.java.tuple.Tuple3;
  7. import org.apache.flink.configuration.Configuration;
  8. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  9. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  10. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  11. /**
  12. * Author itcast
  13. * Desc
  14. * 使用KeyState中的ValueState获取流数据中的最大值(实际中直接使用maxBy即可)
  15. */
  16. public class StateDemo01_KeyedState {
  17. public static void main(String[] args) throws Exception {
  18. //1.env
  19. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  20. env.setParallelism(1);//方便观察
  21. //2.Source
  22. DataStreamSource<Tuple2<String, Long>> tupleDS = env.fromElements(
  23. Tuple2.of("北京", 1L),
  24. Tuple2.of("上海", 2L),
  25. Tuple2.of("北京", 6L),
  26. Tuple2.of("上海", 8L),
  27. Tuple2.of("北京", 3L),
  28. Tuple2.of("上海", 4L)
  29. );
  30. //3.Transformation
  31. //使用KeyState中的ValueState获取流数据中的最大值(实际中直接使用maxBy即可)
  32. //实现方式1:直接使用maxBy--开发中使用该方式即可
  33. //min只会求出最小的那个字段,其他的字段不管
  34. //minBy会求出最小的那个字段和对应的其他的字段
  35. //max只会求出最大的那个字段,其他的字段不管
  36. //maxBy会求出最大的那个字段和对应的其他的字段
  37. SingleOutputStreamOperator<Tuple2<String, Long>> result = tupleDS.keyBy(t -> t.f0)
  38. .maxBy(1);
  39. //实现方式2:使用KeyState中的ValueState---学习测试时使用,或者后续项目中/实际开发中遇到复杂的Flink没有实现的逻辑,才用该方式!
  40. SingleOutputStreamOperator<Tuple3<String, Long, Long>> result2 = tupleDS.keyBy(t -> t.f0)
  41. .map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
  42. //-1.定义状态用来存储最大值
  43. private ValueState<Long> maxValueState = null;
  44. @Override
  45. public void open(Configuration parameters) throws Exception {
  46. //-2.定义状态描述符:描述状态的名称和里面的数据类型
  47. ValueStateDescriptor descriptor = new ValueStateDescriptor("maxValueState", Long.class);
  48. //-3.根据状态描述符初始化状态
  49. maxValueState = getRuntimeContext().getState(descriptor);
  50. }
  51. @Override
  52. public Tuple3<String, Long, Long> map(Tuple2<String, Long> value) throws Exception {
  53. //-4.使用State,取出State中的最大值/历史最大值
  54. Long historyMaxValue = maxValueState.value();
  55. Long currentValue = value.f1;
  56. if (historyMaxValue == null || currentValue > historyMaxValue) {
  57. //5-更新状态,把当前的作为新的最大值存到状态中
  58. maxValueState.update(currentValue);
  59. return Tuple3.of(value.f0, currentValue, currentValue);
  60. } else {
  61. return Tuple3.of(value.f0, currentValue, historyMaxValue);
  62. }
  63. }
  64. });
  65. //4.Sink
  66. //result.print();
  67. result2.print();
  68. //5.execute
  69. env.execute();
  70. }
  71. }

1.1.1 Operator State

下图对 word count 示例中的FromElementsFunction类进行详解并分享如何在代码中使用 operator state:image.png
需求:
使用ListState存储offset模拟Kafka的offset维护

  1. //-1.声明一个OperatorState来记录offset
  2. private ListState<Long> offsetState = null;
  3. private Long offset = 0L;
  4. //-2.创建状态描述器
  5. ListStateDescriptor<Long> descriptor = new ListStateDescriptor<Long>("offsetState", Long.class);
  6. //-3.根据状态描述器获取State
  7. offsetState = context.getOperatorStateStore().getListState(descriptor);
  8. //-4.获取State中的值
  9. Iterator<Long> iterator = offsetState.get().iterator();
  10. if (iterator.hasNext()) {//迭代器中有值
  11. offset = iterator.next();//取出的值就是offset
  12. }
  13. offset += 1L;
  14. ctx.collect("subTaskId:" + getRuntimeContext().getIndexOfThisSubtask() + ",当前的offset为:" + offset);
  15. if (offset % 5 == 0) {//每隔5条消息,模拟一个异常
  16. //-5.保存State到Checkpoint中
  17. offsetState.clear();//清理内存中存储的offset到Checkpoint中
  18. //-6.将offset存入State中
  19. offsetState.add(offset);
  1. package cn.itcast.state;
  2. import org.apache.flink.api.common.RuntimeExecutionMode;
  3. import org.apache.flink.api.common.restartstrategy.RestartStrategies;
  4. import org.apache.flink.api.common.state.ListState;
  5. import org.apache.flink.api.common.state.ListStateDescriptor;
  6. import org.apache.flink.runtime.state.FunctionInitializationContext;
  7. import org.apache.flink.runtime.state.FunctionSnapshotContext;
  8. import org.apache.flink.runtime.state.filesystem.FsStateBackend;
  9. import org.apache.flink.streaming.api.CheckpointingMode;
  10. import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
  11. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  12. import org.apache.flink.streaming.api.environment.CheckpointConfig;
  13. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  14. import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
  15. import java.util.Iterator;
  16. /**
  17. * Author itcast
  18. * Desc 使用OperatorState中的ListState模拟KafkaSource进行offset维护
  19. */
  20. public class StateDemo02_OperatorState {
  21. public static void main(String[] args) throws Exception {
  22. //TODO 0.env
  23. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  24. env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  25. env.setParallelism(1);//并行度设置为1方便观察
  26. //下面的Checkpoint和重启策略配置先直接使用,下次课学
  27. env.enableCheckpointing(1000);//每隔1s执行一次Checkpoint
  28. env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
  29. env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  30. env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  31. //固定延迟重启策略: 程序出现异常的时候,重启2次,每次延迟3秒钟重启,超过2次,程序退出
  32. env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 3000));
  33. //TODO 1.source
  34. DataStreamSource<String> ds = env.addSource(new MyKafkaSource()).setParallelism(1);
  35. //TODO 2.transformation
  36. //TODO 3.sink
  37. ds.print();
  38. //TODO 4.execute
  39. env.execute();
  40. }
  41. //使用OperatorState中的ListState模拟KafkaSource进行offset维护
  42. public static class MyKafkaSource extends RichParallelSourceFunction<String> implements CheckpointedFunction {
  43. private boolean flag = true;
  44. //-1.声明ListState
  45. private ListState<Long> offsetState = null; //用来存放offset
  46. private Long offset = 0L;//用来存放offset的值
  47. //-2.初始化/创建ListState
  48. @Override
  49. public void initializeState(FunctionInitializationContext context) throws Exception {
  50. ListStateDescriptor<Long> stateDescriptor = new ListStateDescriptor<>("offsetState", Long.class);
  51. offsetState = context.getOperatorStateStore().getListState(stateDescriptor);
  52. }
  53. //-3.使用state
  54. @Override
  55. public void run(SourceContext<String> ctx) throws Exception {
  56. while (flag){
  57. Iterator<Long> iterator = offsetState.get().iterator();
  58. if(iterator.hasNext()){
  59. offset = iterator.next();
  60. }
  61. offset += 1;
  62. int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
  63. ctx.collect("subTaskId:"+ subTaskId + ",当前的offset值为:"+offset);
  64. Thread.sleep(1000);
  65. //模拟异常
  66. if(offset % 5 == 0){
  67. System.out.println("bug出现了.....");
  68. throw new Exception("bug出现了.....");
  69. }
  70. }
  71. }
  72. //-4.state持久化
  73. //该方法会定时执行将state状态从内存存入Checkpoint磁盘目录中
  74. @Override
  75. public void snapshotState(FunctionSnapshotContext context) throws Exception {
  76. offsetState.clear();//清理内容数据并存入Checkpoint磁盘目录中
  77. offsetState.add(offset);
  78. }
  79. @Override
  80. public void cancel() {
  81. flag = false;
  82. }
  83. }
  84. }