一、Flink中的状态

  • 无状态的计算观察每个独立事件,并根据最后一个事件输出结果。
  • 有状态的计算则会基于多个事件输出结果。

下图展示了无状态流处理和有状态流处理的主要区别:

  1. 无状态流处理分别接收每条数据记录(图中的黑条),然后根据最新输入的数据生成输出数据(白条)。
  2. 有状态流处理会维护状态(根据每条输入记录进行更新),并基于最新输入的记录和当前的状态值生成输出记录(灰条)。

图中输入数据由黑条表示。无状态流处理每次只转换一条输入记录,并且仅根据最新的输入记录输出结果(白条)**有状态流处理维护所有已处理记录的状态值,并根据每条新输入的记录更新状态,因此输出记录(灰条)反映的是综合考虑多个事件之后的结果。
image.png
状态的主要概念:**

  • 由一个任务维护,并且用来计算某个结果的所有数据,都属于这个任务的状态
  • 可以认为状态就是一个本地变量,可以被任务的业务逻辑访问
  • Flink会进行状态管理,包括状态一致性故障处理以及高效存储和访问,以便开发人员可以专注于应用程序的逻辑
  • 在Flink中,状态始终与特定的算子相关连在一起
  • 为了使运行时的Flink了解算子的状态,算子需要预先注册其状态

状态分类:
1、ManagerState—推荐使用,Flink自动管理/优化,支持多种数据结构
算子的状态,总的有两种:

  • 算子状态(Operator State):算子状态的作用范围限定为算子任务,**一个任务一个状态(**即是状态对于同一个任务而言是共享的,每一个并行的子任务共享一个状态;算子状态不能由相同或不同算子的另一个任务访问,相同算子的不同任务之间也不能访问)

image.pngimage.png

  • 键控状态(Keyed State):根据输入数据流中定义的键(Key,在Keyby之后)来维护和访问(**不同的key也是独立访问的,一个key只能访问它自己的状态,不同key之间也不能互相访问**)

2、Raw State—完全由用户自己管理,只支持byte[],只能在自定义Operator上使用

  • OperatorState



算子状态提供三种数据结构:
① 列表状态(List state),将状态表示为一组数据的列表;(会根据并行度的调整把之前的状态重新分组重新分配
② 联合列表状态(Union list state),也将状态表示为数据的列表,它常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复(把之前的每一个状态广播到对应的每个算子中)。
③ 广播状态(Broadcast state),如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态(把同一个状态广播给所有算子子任务)。

键控状态Keyed State 数据结构:
① 值状态(ValueState),将状态表示为单个值;(直接.value获取,Set操作是.update)

  • get操作: ValueState.value()
  • set操作: ValueState.update(T value)

② 列表状态(ListState),将状态表示为一组数据的列表(存多个状态);(.get,.update,.add)

  • ListState.add(T value)
  • ListState.addAll(List values)
  • ListState.get()返回Iterable
  • ListState.update(List values)

③ 映射状态(MapState),将状态表示为一组Key-Value对;(.get,.put ,类似HashMap)

  • MapState.get(UK key)
  • MapState.put(UK key, UV value)
  • MapState.contains(UK key)
  • MapState.remove(UK key)

④ 聚合状态(ReducingState & AggregatingState),将状态表示为一个用于聚合操作的列表;(.add不像之前添加到列表,它是直接聚合到之前的结果中)

二、Demo

  1. package window
  2. import java.{lang, util}
  3. import datasource.SensorReading
  4. import org.apache.flink.api.common.functions.{ReduceFunction, RichMapFunction, RichReduceFunction}
  5. import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor, ReducingState, ReducingStateDescriptor, ValueState, ValueStateDescriptor}
  6. import org.apache.flink.configuration.Configuration
  7. object StateTest {
  8. def main(args: Array[String]): Unit = {
  9. }
  10. }
  11. //状态必须定义在RichFunction中,因为需要运行时上下文
  12. class MyRichMapper extends RichMapFunction[SensorReading,String]{
  13. //有两种方法定义一个状态
  14. //第一种,通过生命周期方法去调用
  15. var valueState: ValueState[Double] = _
  16. override def open(parameters: Configuration): Unit = {
  17. valueState = getRuntimeContext.getState(new ValueStateDescriptor[Double]("valuestate"))
  18. }
  19. //第二种,通过lazy标识只有在使用到才调用
  20. lazy val valueState2: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("valuestate",classOf[Double]))
  21. //ListState的声明
  22. lazy val listState: ListState[Int] = getRuntimeContext.getListState(new ListStateDescriptor[Int]("liststate",classOf[Int]))
  23. //MapState的声明
  24. lazy val mapState: MapState[String,Double] = getRuntimeContext.getMapState(new MapStateDescriptor[String,Double]("mapstate",classOf[String],classOf[Double]))
  25. //ReduceState的声明,参数比其他状态多一个,需要传入一个Reduce函数
  26. lazy val reduceState: ReducingState[SensorReading] = getRuntimeContext.getReducingState(new ReducingStateDescriptor[SensorReading]("reducestate",new MyReduce,classOf[SensorReading]))
  27. override def map(value: SensorReading): String = {
  28. //状态的读写操作
  29. val myVal: Double = valueState.value()
  30. valueState.update(value.temperature)
  31. listState.add(1)
  32. listState.update(new util.ArrayList[Int]()(1,2,3))
  33. listState.addAll(new util.ArrayList[Int]()(1,2,3))
  34. val listIterable: lang.Iterable[Int] = listState.get()
  35. mapState.contains("sensor1")
  36. mapState.get("sensor1")
  37. mapState.put("key",12.33)
  38. reduceState.get()//获取聚合完成的值
  39. reduceState.add(_)//添加新的值进去reduce
  40. value.id
  41. }
  42. }
  43. class MyReduce extends ReduceFunction[SensorReading]{
  44. override def reduce(value1: SensorReading, value2: SensorReading): SensorReading = ???
  45. }

三、温度报警Demo

当温度跳变超过10时,输出当前的数据

  1. package state
  2. import datasource.SensorReading
  3. import org.apache.flink.api.common.functions.RichFlatMapFunction
  4. import org.apache.flink.api.common.state.ValueStateDescriptor
  5. import org.apache.flink.streaming.api.scala._
  6. import org.apache.flink.util.Collector
  7. object TempAlarmDemo {
  8. def main(args: Array[String]): Unit = {
  9. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  10. env.setParallelism(1)
  11. val inputData: DataStream[String] = env.socketTextStream("localhost",7777)
  12. val dataStream: DataStream[SensorReading] = inputData.map(data=>{
  13. val arr: Array[String] = data.split(",")
  14. SensorReading(arr(0).trim , arr(1).trim.toLong , arr(2).trim.toDouble)
  15. })
  16. dataStream.keyBy(_.id).flatMap(new TempAlarmFunction).print("alarm")
  17. env.execute("TempAlarm")
  18. }
  19. }
  20. class TempAlarmFunction extends RichFlatMapFunction[SensorReading,SensorReading]{
  21. lazy val tempStatevalue = getRuntimeContext.getState(new ValueStateDescriptor[Double]("tempstate",classOf[Double]))
  22. override def flatMap(value: SensorReading, out: Collector[SensorReading]): Unit = {
  23. val nowData: Double = tempStatevalue.value()
  24. if (nowData != null && (nowData - value.temperature).abs > 10){
  25. out.collect(value) // 通过该方法输出数据
  26. }
  27. tempStatevalue.update(value.temperature)
  28. }
  29. }
  1. //方法三: 带状态的flatMap
  2. val processedStream3: DataStream[(String, Double, Double)] = dataStream.keyBy(_.id)
  3. .flatMapWithState[(String, Double, Double), Double]{
  4. //如果没有状态的话,也就是没有数据过来,那么就将当前数据湿度值存入状态
  5. case (input: SensorReading, None) => (List.empty, Some(input.temperature))
  6. //如果有状态,就应该与上次的温度值比较差值,如果大于阈值就输出报警
  7. case(input: SensorReading, lastTemp: Some[Double]) =>
  8. val diff = (input.temperature - lastTemp.get).abs
  9. if (diff > 10.0){
  10. (List((input.id, lastTemp.get, input.temperature)), Some(input.temperature))
  11. }else{
  12. (List.empty, Some(input.temperature))
  13. }
  14. }

四、状态后端

— 状态管理(存储、访问、维护和检查点)
每传入一条数据,有状态的算子任务都会读取和更新状态;
由于有效的状态访问对于处理数据的低效迟至关重要,因此每个并行任务都会在本地维度其状态,以确保快速的状态访问;
状态的存储、访问以及维度,由一个可插入的组件决定,这个组件就叫做状态后端(State Backends)
状态后端主要负责两件事:本地的状态管理,以及将检查点(checkpoint)状态写入远程存储;
状态后端的分类
① MemoryStateBackend: 一般用于开发和测试

  • 内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储在TaskManager的JVM堆上,而将checkpoint存储在JobManager的内存中;
  • 特点快速、低延迟,但不稳定;

② FsStateBackend(文件系统状态后端):生产环境

  • 将checkpoint存到远程的持久化文件系统(FileSystem),HDFS上,而对于本地状态,跟MemoryStateBackend一样,也会存到TaskManager的JVM堆上。
  • 同时拥有内存级的本地访问速度,和更好的容错保证;(如果是超大规模的需要保存还是无法解决,存到本地状态就可能OOM)

③ RocksDBStateBackend:

  • 将所有状态序列化后,存入本地的RocksDB(本地数据库硬盘空间,序列化到磁盘)中存储,全部序列化存储到本地。
    1. <!-- 需要引入第三方的依赖 -->
    2. <dependency>
    3. <groupId>org.apache.flink</groupId>
    4. <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
    5. <version>1.10.1</version>
    6. </dependency>
    设置状态后端为FsStateBackend,并配置检查点和重启策略: ```scala StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);

// 1. 状态后端配置 //env.setStateBackend(new MemoryStateBackend()); //默认的状态后端策略 env.setStateBackend(new FsStateBackend(“路径”, true)) //env.setStateBackend(new RocksDBStateBackend(“路径”,true)) 第二个参数表示增量是否存盘,开启了就表示之前的checkpoint都不能丢

// 2. 检查点配置 开启checkpoint,设置checkpoint生成周期,单位毫秒 env.enableCheckpointing(1000);//等同于env.getCheckpointConfig.setCheckpointInterval(1000)

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointTimeout(60000);//设置checkpoint超时时间,超时了直接丢弃 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);//设置两次checkpoint最小相隔时间 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//设置checkpoint并行度 env.getCheckpointConfig().setPreferCheckpointForRecovery(true);//是否更倾向使用checkpoint来进行故障恢复,也可以使用savepoint env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);//设置容忍checkpoints失败次数,如果cp失败也认为是任务失败 env.getCheckpointConfig().setCheckpointInterval(10000L)

// 3. 重启策略配置 // 固定延迟重启(隔一段时间尝试重启一次) env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, // 尝试重启次数 100000L // 尝试重启的时间间隔,也可org.apache.flink.api.common.time.Time )); env.setRestartStrategy(RestartStrategies.failureRateRestart(5, Time.minutes(5), Time.seconds(10))) ```

五、Flink的容错机制

5.1、一致性检查点(Checkpoints)

image.png
简单来说就是对所有任务进行一个存盘,在某一个时间点把所有任务的状态进行一份拷贝(快照),等到出现故障再把所有状态逐个读出来。当分布式计系统中引入状态计算时,就无可避免一致性的问题(比如故障发生在状态更新前还是更新后,恢复故障时需要恢复到哪一个时间点)。为什么了?因为若是计算过程中出现故障,中间数据咋办了?若是不保存,那就只能重新从头计算了,不然怎么保证计算结果的正确性。这就是要求系统具有容错性了。
难点:不同的任务在同一时间点怎么确定
这个时间点应该是所有任务恰好处理完一个相同的输入数据的时候

5.2、从检查点恢复状态

1、在执行流应用程序期间,Flink会定期保存状态的一致性检查点,不同的任务都会保存一份(如上图三个任务保存的5、6、9)
2、如果发生故障,Flink将会使用最近的检查点来一致恢复应用程序的状态,并重新启动处理流程
3、重启应用,从checkpoint中读取状态,将状态重置;从检查点重新启动应用程序后,其内部状态与检查点完成时的状态完全相同
4、开始消费并处理检查点到发生故障之间的所有数据,称之为exactly-one的一致性(对状态来说)

5.3、Flink检查点算法

Flink基于检查点算法的优化实现–基于Chandy-Lamport算法的分布式快照,将检查点的保存和数据分开处理,不需要暂停整个应用。
检查点分界线(Checkpoint Barrier):

  • Flink检查点算法用到一种称为分界线(barrier)的特殊数据形式用来把一条流上数据按照不同的检查点分开(类似watermark)
  • 分界线之前来到的数据导致的状态更改,都会被包含在当前分界线所属的检查点中;而基于分界线之后的数据导致的所有更改,就会被包含在之后的检查点中
  • 通过jobmanager进行管理,只需在source插入就行,数据往下游传递就会收到该checkpoint barrier;但是在多个并行任务的时候,下游的任务需要等待所有上游的checkpoint barrier,等待所有的checkpoint barrier到达才进行checkpoint

算法:

  1. 首先JobManager会向每一个source任务发生一条带有新检查点(ID为2)的消息,通过该方式来启动检查点
  2. source将读取的偏移量状态写入检查点为2的状态后端中,并往下游发送一个barrier(检查点的ID对应也为2);状态后端保存成功后通知source,source任务会向jobmanager确定检查点已经完成(此处为异步操作)
  3. ID为2的barrier会向下游传递,每到达一个分区,该分区中的数据就会被缓存到状态后端中;
  4. 其中有一个分界线对齐机制,即是barrier向下游传递,下游任务会等待所有上游分区的barrier到达;对于有barrier已经到达的分区,继续到达的数据会被缓存(缓存起来的数据会等到checkpoint完成后继续按顺序处理);而对于那些barrier尚未到达的的分区,数据会被正常处理。
  5. 当收到所有输入分区的barrier时,任务就将其状态保存到状态后端的checkpoint中,然后继续将barrier下发。
  6. 直到sink向jobmanager确认状态保存到状态后端完成,此次所有任务的checkpoint就正式完成了。

5.4、保存点(save point)

  • Flink提供的可以自定义的镜像保存功能
  • 原则上,创建savepoint的算法和checkpoint的算法完全相同,因为savepoint可以认为就是具有一些额外元数据的checkpoint
  • Flink不会自动创建savepoint,因此用户必须明确地触发savepoint创建操作
  • savepoint处理用于故障恢复,还可以被用于手动备份,更新应用程序,版本迁移,暂停和重启应用等

六、状态一致性

6.1、状态一致性

  • 有状态的流处理,内部每个算子任务都可以由自己的状态
  • 对于流处理器内部来说,所谓的状态一致性,其实就是我们所说的计算结果要保证准确
  • 一条数据不应该被丢失,也不应该重复计算
  • 再遇到故障可以恢复状态,恢复后的重新计算,结果也是完全正确的

状态一致性分类:

  • AT-MOST-ONCE(最多一次)
    • 当任务故障时,最简单的做法就是什么都不做,既不恢复丢失的状态,也不重播丢失的数据;即数据最多只会被计算一次。
  • AT-LEAST-ONCE(至少一次)
    • 在大多数场景,我们希望不丢失事件,意思就是所有的事件都得到了处理,而一些事件还可能会被多次处理,会造成数据重复计算。
  • EXACTLY-ONCE(精确一次)
    • 恰好处理一次是最严格的保证,也是最难实现的,不仅仅是事件不能丢失,还需要针对每一个数据,内部状态仅仅更新一次。

6.2、一致性检查点(checkpoint)

6.3、端到端(end-to-end)状态一致性

  • 目前看到的一致性保证都是在流处理器内部实现的;但是在真实的应用中,流处理应用除了流处理器以外还包含了数据源source和输出到持久化系统sink的一致性
  • 端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终,每一个组件都保证了它自己的一致性
  • 整个端到端的一致性级别取决于所有组件中一致性最弱的组件

6.4、端到端的精确一次(exactly-one)保证

  • 内部保证-checkpoint
  • source端-可重置数据的读取位置
  • sink端-从故障恢复时,数据不会重复写入到外部系统
    • 幂等写入:就是一个操作,可以重复执行多次,但只导致一次结果更改,也就是说后面的重复操作就不会起作用了(例如hash去重)
    • 事务写入:构建事务对应着checkpoint,等到cp真正结束的时候,才把所有对应的结果写入到sink中
      • 预写日志,GenericWriteAheadSink模板类
      • 两阶段提交,TwoPhaseCommitSinkFunction接口

不同source和sink的一致性保证

sink\source 不可重置 可重置
任意(Any) At-most-once At-least-once
幂等 At-most-once Exactly-once(故障恢复时会出现短暂不一致)
预写入日志(WAL) At-most-once At-least-once
两阶段提交(2PC) At-most-once Exactly-once

6.5、Flink+fafka端到端状态一致性的保证

  • 内部—利用checkpoint机制,把状态存盘,发生故障时进行恢复
  • source—可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性
  • sink—kafka作为sink,采用两阶段提交sink,需要实现一个TwoPhaseCommitSinkFunction

步骤:

  1. 第一条数据来了知乎,开启一个kafka的事务,正常写入kafka分区日志但标记为未提交,这就是预提交
  2. jobmanager触发checkpoint操作,barrier从source开始向下传递,遇到barrier的算子将状态存入状态后端,并通知jobmanager
  3. sink连接器收到barrier,保存当前状态,存入checkpoint,通知jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据
  4. jobmanager收到所有任务的通知,发出确认信息,表示checkpoint完成
  5. sink任务收到jobmanager的确认消息,正式提交这个阶段时间的数据
  6. 外部kafka关闭事务,提交的数据可以正常消费了

image.png
image.png