参考:https://www.cnblogs.com/shengyang17/p/12549199.html


xgz0ztsiqz.png
可以把状态理解为某个算子子任务在其当前实例上的一个变量,变量记录了数据流的历史信息。当新数据流入时,我们可以结合历史信息来进行计算。实际上,Flink的状态是由算子的子任务来创建和管理的。一个状态更新和获取的流程如上图所示,一个算子子任务接收输入流,获取对应的状态,根据新的计算结果更新状态。

在 Flink 中,状态始终与特定算子相关联。总的来说,有两种类型的状态 :算子状态键控状态


算子状态(operator state)

算子状态的作用范围限定为算子任务。这意味着由同一并行任务所处理的所有 数据都可以访问到相同的状态,状态对于同一任务而言是共享的。算子状态不能由 相同或不同算子的另一个任务访问。 Flink 为算子状态提供三种基本数据结构:


列表状态(List state)

将状态表示为一组数据的列表。

联合列表状态(Union list state)

也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。

广播状态(Broadcast state)

如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应 用广播状态。

键控状态(keyed state)-更常用

键控状态是根据输入数据流中定义的键(key)来维护和访问的。Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护 和处理这个 key 对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同 key 的所有数据都会访问相同的状态。Keyed State 很类似于 一个分布式的 key-value map 数据结构,只能用于 KeyedStream(keyBy 算子处理之后)。 Flink 为键控状态提供三种基本数据结构:


值状态(ValueState


将状态表示为单个值

  • get操作: ValueState.value()
  • set操作: ValueState.update(T value)

    列表状态(ListState


将状态表示为一组数据的列表(存多个状态)

  • ListState.add(T value)
  • ListState.addAll(List values)
  • ListState.get()返回Iterable
  • ListState.update(List values)

    映射状态(MapState


将状态表示为一组Key-Value对

  • MapState.get(UK key)
  • MapState.put(UK key, UV value)
  • MapState.contains(UK key)
  • MapState.remove(UK key)

    聚合状态(ReducingState & AggregatingState

    将状态表示为一个用于聚合操作的列表

    状态的定义

Flink通过StateDescriptor来定义一个状态。这是一个抽象类,内部定义了状态名称、类型、序列化器等基础信息,与上面的状态类型对应。如下:

  1. ValueStateDescriptor
  2. ListStateDescriptor
  3. ReducingStateDescriptor
  4. FoldingStateDescriptor
  5. AggregatingStateDescriptor
  6. MapStateDescriptor

状态的使用(以下是键控状态的示例)

getRuntimeContext: 这个函数是获取运行时的上下文。因为flink是面向流的,所以数据要在里面因为flink是面向流的,要给流动的数据进行缓存,这里flink有一个机制叫做获取上下文getRunTime他的入参是一个状态描述符,这个类本身是继承于Serializable接口去存储状态,需要声明类型,如下getRuntimeContext().getState(new valueStateDescriptor);

示例代码为值状态示例

//1.声明一个键控状态
lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState(
  new ValueStateDescriptor[Double]("lastTemp", classOf[Double])
)

//2.读取状态:
val prevTemp = lastTemp.value()
//3.对状态赋值:
lastTemp.update(value.temperature)

代码讲解:(1)第一步申明一个键控状态时,lastTemp为键控状态的变量名;ValueState为状态类型(此处为值状态);[Double]是泛型,说明前面的值是什么类型,以便flink做序列化与反序列化;getRuntimeContext说明是在运行时上下文中去定义,getState方法,通过ValueStateDescriptor参数来获取ValueState;( new ValueStateDescriptor[Double] )表示传递的参数是state的描述器,当前是一个valuestate,所以传递一个ValueStateDescriptor并指定泛型为double,(“lastTemp”, classOf[Double])为ValueStateDescriptor的参数,第一个参数lastTemp表示当前状态的名称,classOf[Double]表示当前状态的类型(2)lastTemp.value()表示当前的valuestate有一个方法.value用来读取状态(3).update为状态赋值


状态编程

代码示例以及理解

状态编程必须有一个RichFunction(富函数:https://www.codenong.com/cs106286917/),因为需要运行时上下文
输入数据的样例类(样例类为flink的数据类型)

//定义样例类,温度传感器
case class SensorReading(id: String, timestamp: Long, temperature: Double)

//  将数据转化为样例类类型
val dataStream = inputStream
    .map(data => {
    val arr = data.split(",")
    SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
 })

//根据数据的id对数据进行分组
val alertStream = dataStream
    .keyBy(_.id)
    .flatMap(new TempChangeAlert(10.0))

首先对于输入的数据会根据对数据的数据进行切分,arr(0)对应id,以此类推,转化为对应的三元组。keyBy会根据样例类类型的id进行分组,选择合适的算子并需要实现相应的方法(这里实现TempChangeAlert方法,该方法继承RichMapFunction)

  1. 编写一个自定义RichFunction(富函数),此处使用RichMapFunction
    class MyRichMapper extends RichMapFunction[SensorReading,String]{
    override def map(value: SensorReading): String = ???
    }
    
    (1)class MyRichMapper extends RichMapFunction[SensorReading, String] {}自定义一个名为MyRichMapper的函数,继承RichMapFunction,传递两个泛型,第一个为输入数据的泛型,第二个输出数据的泛型,此函数必须重写一个map方法,其中map(value: SensorReading)为传入的值的变量名以及类型。
    (2)如何在这个函数定义一些状态?
    class TestRichMapper extends RichMapFunction[SensorReading,String]{
    val valueState:ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("valuestate",classOf[Double]))
    override def map(value: SensorReading): String = ???
    }
    
    val valueState:ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("valuestate",classOf[Double]))为键控状态的定义,可参考键控状态的使用中的说明。
    (3)此时如果在类外面调用上下是拿不到的,因为此时类还没创建,必须等到类实例化后才能调用,即至少在open生命周期里面再调用运行时上写文(因为open方法是富函数的初始化方法,它在每个任务首次调用转换方法前调用一次,后续不在调用,在富函数中我们常用到一个方法getRuntimeContext()方法,该方法会获取到RuntimeContext对象,从运行时上下文对象中我们可以获取到一些信息:函数并行度、函数所在子任务编号以及执行函数的名称,同时他还提供访问分区状态的方法。)
    class TestRichMapper extends RichMapFunction[SensorReading,String]{
    override def open(parameters: Configuration): Unit = {
     val valueState:ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("valuestate",classOf[Double]))
    }
    override def map(value: SensorReading): String = ???
    }
    
    (4)此时又产生一个问题:如果将状态定义在生命周期open中,那么如果后面的map函数中如果想使用这个状态时,是使用不了的,因为当前的状态的作用域被限定在open的生命周期里面
    image.png
    那如何解决这个问题?
    方法一
    需要将状态声明在open方法外面,状态类型要为可变类型(Scala中val是不可变类型,var是可变类型),这样就可以在open方法外面(map方法里面)使用这个状态了-valueState.value() ```scala class TestRichMapper extends RichMapFunction[SensorReading,String]{ var valueState:ValueState[Double] = _ override def open(parameters: Configuration): Unit = { valueState = getRuntimeContext.getState(new ValueStateDescriptorDouble) }

override def map(value: SensorReading): String = { val myV = valueState.value() value.id } }

**方法二:**使用在**lazy**关键字定义状态
> Scala中使用关键字lazy来定义惰性变量,实现延迟加载(懒加载)。惰性变量只能是不可变变量(val),并且只有在调用惰性变量时,才会去实例化这个变量。当不想将变量定义在open方法里面的时候,可以使用惰性变量。

(此时不需要再使用open方法进行富函数的初始化,使用lazy关键字修饰变量后,只有在使用该变量时,才会调用其实例化方法即open方法),此时即可在map方法中调用状态-valueState2.value()
```scala
class TestRichMapper extends RichMapFunction[SensorReading,String]{
  lazy val valueState2:ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("valuestate2",classOf[Double]))

  override def map(value: SensorReading): String = {
    valueState2.value()
    value.id
  }
}

状态中数据传递与调用理解
有个疑惑:valueState2.value()是如何拿到上下文中的数据的?在重写的map方法中,数据流中每来一个数据,都会调用一次map方法,map方法中要想获取状态中的值,前提是需要定义运行时上下文方法getRuntimeContext(),getRuntimeContext方法中的getState()方法可以获取状态的值(状态只能通过RuntimeContext获取),进而在map方法中可以获取
image.png


案例:温度传感器温度报警

案例需求:对于输入的同一个温度传感器的温度值,要求连续来的两个相邻温度值相差不能超过十度,超过十度进行报警

案例分析:需要记录当前温度的上一个温度,即使用状态编程

代码编写:

  1. 定义温度传感器的样例类

    //定义样例类,温度传感器
    case class SensorReading(id: String, timestamp: Long, temperature: Double)
    
  2. 将数据转换为样例类类型

     val inputStream = env.socketTextStream("192.168.188.8", 7777)
     //  先转换为为样例类类型
     val dataStream = inputStream
       .map(data => {
         val arr = data.split(",")
         SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
       })
    

    将输入的数据按照,进行切分,切分成三元组(三元组中分别数分别对应的字段类型为id,timestamp,temperature)

  3. 处理逻辑

针对同一个温度传感器进行温度差值计算,首先根据温度传感器的id进行分组,使用keyBy

val alertStream = dataStream
    .keyBy(_.id)
    .flatMap(new TempChangeAlert(10.0))

.keyBy(_.id) :按照三元组的第一个元素(id)分组,输入数据类型为SensorReading,输出数据类型为string,数据类型不同,所以使用map/flatmap算子(可查阅flink常见的算子操作:https://cloud.tencent.com/developer/article/1559993),这里使用flatmap,flatmap方法需要自己定义实现一个flatmap方法,这里定义一个名为TempChangeAlert的flatmap方法,并将温度差值10.0作为参数传入自定义方法中

  1. 实现自定义的flatmap方法-TempChangeAlert(注意:这里使用了状态编程,所以不仅仅是flatmap方法,更是一个RichFunction方法)

    //实现自定义RichFlatmapFunction
    class TempChangeAlert(threshold: Double) extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {
    //  定义状态,保存上一次的温度值
    lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("last-time", classOf[Double]))
    
    override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit = {
     //    获取上一次的温度值
     val lastTemp = lastTempState.value()
     //    跟最新的温度值求差值作比较
     val diff = (value.temperature - lastTemp).abs
     if (diff > threshold) {
       out.collect((value.id, lastTemp, value.temperature))
     }
    
     //更新状态
     lastTempState.update(value.temperature)
    }
    }
    

    (1)自定义的TempChangeAlert方法继承RichFlatMapFunction方法,TempChangeAlert(threshold: Double)传入变量名为threshold的,类型为Double的数据(即温度差值)
    (2)RichFlatMapFunction[SensorReading, (String, Double, Double)],表示输入数据类型为SensorReading,输出数据为一个三元组,类型分别为String、Double、Double
    (3)必须重写flatMap方法(只要是继承了RichFunction方法,就必须重写函数,具体重写是哪个函数,要看RichFunction,如继承RichMapFunction,就必须重写map方法)
    (4)lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("last-time", classOf[Double])),定义一个变量名为lastTempState,类型为ValueState,泛型是Double的状态。
    (5)override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit,注意:flatmap输出的是空类型(与map不一样,map是:override def map(value: SensorReading): String),那flatmap什么输出呢?out: Collector,为收集器输出,输出类型为三元组
    (6)剩下的为差值计算,与threshold(10.0)作比较
    (7)out.collect((value.id, lastTemp, value.temperature))为输出类型为三元组

  2. 输入数据格式

    sensor_1,1547718202,6.7
    sensor_1,1547718205,38.1
    sensor_1,1547718129,29.8
    

    输入(前两次温度差值为10以上,第三次与第二次输入差值没有10以上)
    image.png
    输出
    image.png

    状态编程总结

  3. 状态编程必须有一个RichFunction(富函数),因为需要运行时上下文,有如下几个

  • RichMapFunction
  • RichFlatMapFunction
  • RichFilterFunction
  1. 使用自定义函数继承RichFunction时,需要重写对应的方法(如自定义函数TestRichMapper 继承RichMapFunction,需要重写RichMapFunction中的map方法)
  2. 在类中定义状态时,有两种方法:一种在open方法中定义,另一种使用lazy关键字进行修饰(此时就不需要open方法)

状态后端(State Backend)

  • 每传入一条数据,有状态的算子任务都会读取和更新状态
  • 由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务都会在本地维护其状态,以确保快速的状态访问
  • 状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端(State Backend)
  • 状态后端主要负责两件事:本地的状态管理,以及将检查点(checkpoint)状态写入远程存储

    状态后端的类型

    https://jiamaoxiang.top/2019/08/23/Flink%E7%9A%84%E7%8A%B6%E6%80%81%E5%90%8E%E7%AB%AF-State-Backends/

    MemoryStateBackend

推荐使用的场景

  • 本地测试、几乎无状态的作业,比如 ETL、JobManager 不容易挂,或挂掉影响不大的情况。
  • 不推荐在生产场景使用。

    FsStateBackend

推荐使用的场景

  • 处理大状态,长窗口,或大键值状态的有状态处理任务, 例如分钟级窗口聚合或 join。
  • 适合用于高可用方案(需要开启HA的作业)。
  • 可以在生产环境中使用

RocksDBStateBackend

推荐使用的场景

  • 最适合用于处理大状态,长窗口,或大键值状态的有状态处理任务。
  • 非常适合用于高可用方案。
  • 最好是对状态读写性能要求不高的作业

注意:如果什么都不配置,系统默认的是MemoryStateBackend

flink容错机制

https://www.cnblogs.com/shengyang17/p/12562269.html

一致性检查点

检查点算法

保存点

flink状态一致性