参考:https://www.cnblogs.com/shengyang17/p/12549199.html

可以把状态理解为某个算子子任务在其当前实例上的一个变量,变量记录了数据流的历史信息。当新数据流入时,我们可以结合历史信息来进行计算。实际上,Flink的状态是由算子的子任务来创建和管理的。一个状态更新和获取的流程如上图所示,一个算子子任务接收输入流,获取对应的状态,根据新的计算结果更新状态。
在 Flink 中,状态始终与特定算子相关联。总的来说,有两种类型的状态 :算子状态与键控状态
算子状态(operator state)
算子状态的作用范围限定为算子任务。这意味着由同一并行任务所处理的所有 数据都可以访问到相同的状态,状态对于同一任务而言是共享的。算子状态不能由 相同或不同算子的另一个任务访问。 Flink 为算子状态提供三种基本数据结构:
列表状态(List state)
联合列表状态(Union list state)
也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。
广播状态(Broadcast state)
如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应 用广播状态。
键控状态(keyed state)-更常用
键控状态是根据输入数据流中定义的键(key)来维护和访问的。Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护 和处理这个 key 对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同 key 的所有数据都会访问相同的状态。Keyed State 很类似于 一个分布式的 key-value map 数据结构,只能用于 KeyedStream(keyBy 算子处理之后)。 Flink 为键控状态提供三种基本数据结构:
值状态(ValueState)
将状态表示为单个值
将状态表示为一组数据的列表(存多个状态)
- ListState.add(T value)
- ListState.addAll(List
values) - ListState.get()返回Iterable
- ListState.update(List
values) 映射状态(MapState
)
将状态表示为一组Key-Value对
- MapState.get(UK key)
- MapState.put(UK key, UV value)
- MapState.contains(UK key)
- MapState.remove(UK key)
聚合状态(ReducingState
将状态表示为一个用于聚合操作的列表& AggregatingState) 状态的定义
Flink通过StateDescriptor来定义一个状态。这是一个抽象类,内部定义了状态名称、类型、序列化器等基础信息,与上面的状态类型对应。如下:
ValueStateDescriptorListStateDescriptorReducingStateDescriptorFoldingStateDescriptorAggregatingStateDescriptorMapStateDescriptor
状态的使用(以下是键控状态的示例)
getRuntimeContext: 这个函数是获取运行时的上下文。因为flink是面向流的,所以数据要在里面因为flink是面向流的,要给流动的数据进行缓存,这里flink有一个机制叫做获取上下文getRunTime他的入参是一个状态描述符,这个类本身是继承于Serializable接口去存储状态,需要声明类型,如下getRuntimeContext().getState(new valueStateDescriptor);
示例代码为值状态示例
//1.声明一个键控状态
lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState(
new ValueStateDescriptor[Double]("lastTemp", classOf[Double])
)
//2.读取状态:
val prevTemp = lastTemp.value()
//3.对状态赋值:
lastTemp.update(value.temperature)
代码讲解:(1)第一步申明一个键控状态时,lastTemp为键控状态的变量名;ValueState为状态类型(此处为值状态);[Double]是泛型,说明前面的值是什么类型,以便flink做序列化与反序列化;getRuntimeContext说明是在运行时上下文中去定义,getState方法,通过ValueStateDescriptor参数来获取ValueState;( new ValueStateDescriptor[Double] )表示传递的参数是state的描述器,当前是一个valuestate,所以传递一个ValueStateDescriptor并指定泛型为double,(“lastTemp”, classOf[Double])为ValueStateDescriptor的参数,第一个参数lastTemp表示当前状态的名称,classOf[Double]表示当前状态的类型(2)lastTemp.value()表示当前的valuestate有一个方法.value用来读取状态(3).update为状态赋值
状态编程
代码示例以及理解
状态编程必须有一个RichFunction(富函数:https://www.codenong.com/cs106286917/),因为需要运行时上下文
输入数据的样例类(样例类为flink的数据类型)
//定义样例类,温度传感器
case class SensorReading(id: String, timestamp: Long, temperature: Double)
// 将数据转化为样例类类型
val dataStream = inputStream
.map(data => {
val arr = data.split(",")
SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
})
//根据数据的id对数据进行分组
val alertStream = dataStream
.keyBy(_.id)
.flatMap(new TempChangeAlert(10.0))
首先对于输入的数据会根据,对数据的数据进行切分,arr(0)对应id,以此类推,转化为对应的三元组。keyBy会根据样例类类型的id进行分组,选择合适的算子并需要实现相应的方法(这里实现TempChangeAlert方法,该方法继承RichMapFunction)
- 编写一个自定义RichFunction(富函数),此处使用RichMapFunction
(1)class MyRichMapper extends RichMapFunction[SensorReading,String]{ override def map(value: SensorReading): String = ??? }class MyRichMapper extends RichMapFunction[SensorReading, String] {}自定义一个名为MyRichMapper的函数,继承RichMapFunction,传递两个泛型,第一个为输入数据的泛型,第二个输出数据的泛型,此函数必须重写一个map方法,其中map(value: SensorReading)为传入的值的变量名以及类型。
(2)如何在这个函数定义一些状态?class TestRichMapper extends RichMapFunction[SensorReading,String]{ val valueState:ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("valuestate",classOf[Double])) override def map(value: SensorReading): String = ??? }val valueState:ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("valuestate",classOf[Double]))为键控状态的定义,可参考键控状态的使用中的说明。
(3)此时如果在类外面调用上下是拿不到的,因为此时类还没创建,必须等到类实例化后才能调用,即至少在open生命周期里面再调用运行时上写文(因为open方法是富函数的初始化方法,它在每个任务首次调用转换方法前调用一次,后续不在调用,在富函数中我们常用到一个方法getRuntimeContext()方法,该方法会获取到RuntimeContext对象,从运行时上下文对象中我们可以获取到一些信息:函数并行度、函数所在子任务编号以及执行函数的名称,同时他还提供访问分区状态的方法。)
(4)此时又产生一个问题:如果将状态定义在生命周期open中,那么如果后面的map函数中如果想使用这个状态时,是使用不了的,因为当前的状态的作用域被限定在open的生命周期里面class TestRichMapper extends RichMapFunction[SensorReading,String]{ override def open(parameters: Configuration): Unit = { val valueState:ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("valuestate",classOf[Double])) } override def map(value: SensorReading): String = ??? }
那如何解决这个问题?
方法一
需要将状态声明在open方法外面,状态类型要为可变类型(Scala中val是不可变类型,var是可变类型),这样就可以在open方法外面(map方法里面)使用这个状态了-valueState.value() ```scala class TestRichMapper extends RichMapFunction[SensorReading,String]{ var valueState:ValueState[Double] = _ override def open(parameters: Configuration): Unit = { valueState = getRuntimeContext.getState(new ValueStateDescriptorDouble) }
override def map(value: SensorReading): String = { val myV = valueState.value() value.id } }
**方法二:**使用在**lazy**关键字定义状态
> Scala中使用关键字lazy来定义惰性变量,实现延迟加载(懒加载)。惰性变量只能是不可变变量(val),并且只有在调用惰性变量时,才会去实例化这个变量。当不想将变量定义在open方法里面的时候,可以使用惰性变量。
(此时不需要再使用open方法进行富函数的初始化,使用lazy关键字修饰变量后,只有在使用该变量时,才会调用其实例化方法即open方法),此时即可在map方法中调用状态-valueState2.value()
```scala
class TestRichMapper extends RichMapFunction[SensorReading,String]{
lazy val valueState2:ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("valuestate2",classOf[Double]))
override def map(value: SensorReading): String = {
valueState2.value()
value.id
}
}
状态中数据传递与调用理解
有个疑惑:valueState2.value()是如何拿到上下文中的数据的?在重写的map方法中,数据流中每来一个数据,都会调用一次map方法,map方法中要想获取状态中的值,前提是需要定义运行时上下文方法getRuntimeContext(),getRuntimeContext方法中的getState()方法可以获取状态的值(状态只能通过RuntimeContext获取),进而在map方法中可以获取
案例:温度传感器温度报警
案例需求:对于输入的同一个温度传感器的温度值,要求连续来的两个相邻温度值相差不能超过十度,超过十度进行报警
案例分析:需要记录当前温度的上一个温度,即使用状态编程
代码编写:
定义温度传感器的样例类
//定义样例类,温度传感器 case class SensorReading(id: String, timestamp: Long, temperature: Double)将数据转换为样例类类型
val inputStream = env.socketTextStream("192.168.188.8", 7777) // 先转换为为样例类类型 val dataStream = inputStream .map(data => { val arr = data.split(",") SensorReading(arr(0), arr(1).toLong, arr(2).toDouble) })将输入的数据按照
,进行切分,切分成三元组(三元组中分别数分别对应的字段类型为id,timestamp,temperature)处理逻辑
针对同一个温度传感器进行温度差值计算,首先根据温度传感器的id进行分组,使用keyBy
val alertStream = dataStream
.keyBy(_.id)
.flatMap(new TempChangeAlert(10.0))
.keyBy(_.id) :按照三元组的第一个元素(id)分组,输入数据类型为SensorReading,输出数据类型为string,数据类型不同,所以使用map/flatmap算子(可查阅flink常见的算子操作:https://cloud.tencent.com/developer/article/1559993),这里使用flatmap,flatmap方法需要自己定义实现一个flatmap方法,这里定义一个名为TempChangeAlert的flatmap方法,并将温度差值10.0作为参数传入自定义方法中
实现自定义的flatmap方法-TempChangeAlert(注意:这里使用了状态编程,所以不仅仅是flatmap方法,更是一个RichFunction方法)
//实现自定义RichFlatmapFunction class TempChangeAlert(threshold: Double) extends RichFlatMapFunction[SensorReading, (String, Double, Double)] { // 定义状态,保存上一次的温度值 lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("last-time", classOf[Double])) override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit = { // 获取上一次的温度值 val lastTemp = lastTempState.value() // 跟最新的温度值求差值作比较 val diff = (value.temperature - lastTemp).abs if (diff > threshold) { out.collect((value.id, lastTemp, value.temperature)) } //更新状态 lastTempState.update(value.temperature) } }(1)自定义的TempChangeAlert方法继承RichFlatMapFunction方法,TempChangeAlert(threshold: Double)传入变量名为threshold的,类型为Double的数据(即温度差值)
(2)RichFlatMapFunction[SensorReading, (String, Double, Double)],表示输入数据类型为SensorReading,输出数据为一个三元组,类型分别为String、Double、Double
(3)必须重写flatMap方法(只要是继承了RichFunction方法,就必须重写函数,具体重写是哪个函数,要看RichFunction,如继承RichMapFunction,就必须重写map方法)
(4)lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("last-time", classOf[Double])),定义一个变量名为lastTempState,类型为ValueState,泛型是Double的状态。
(5)override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit,注意:flatmap输出的是空类型(与map不一样,map是:override def map(value: SensorReading): String),那flatmap什么输出呢?out: Collector,为收集器输出,输出类型为三元组
(6)剩下的为差值计算,与threshold(10.0)作比较
(7)out.collect((value.id, lastTemp, value.temperature))为输出类型为三元组输入数据格式
sensor_1,1547718202,6.7 sensor_1,1547718205,38.1 sensor_1,1547718129,29.8输入(前两次温度差值为10以上,第三次与第二次输入差值没有10以上)

输出
状态编程总结
状态编程必须有一个RichFunction(富函数),因为需要运行时上下文,有如下几个
- RichMapFunction
- RichFlatMapFunction
- RichFilterFunction
- 使用自定义函数继承RichFunction时,需要重写对应的方法(如自定义函数TestRichMapper 继承RichMapFunction,需要重写RichMapFunction中的map方法)
- 在类中定义状态时,有两种方法:一种在open方法中定义,另一种使用lazy关键字进行修饰(此时就不需要open方法)
状态后端(State Backend)
- 每传入一条数据,有状态的算子任务都会读取和更新状态
- 由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务都会在本地维护其状态,以确保快速的状态访问
- 状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端(State Backend)
- 状态后端主要负责两件事:本地的状态管理,以及将检查点(checkpoint)状态写入远程存储
状态后端的类型
https://jiamaoxiang.top/2019/08/23/Flink%E7%9A%84%E7%8A%B6%E6%80%81%E5%90%8E%E7%AB%AF-State-Backends/MemoryStateBackend
推荐使用的场景:
推荐使用的场景:
- 处理大状态,长窗口,或大键值状态的有状态处理任务, 例如分钟级窗口聚合或 join。
- 适合用于高可用方案(需要开启HA的作业)。
- 可以在生产环境中使用
RocksDBStateBackend
推荐使用的场景:
- 最适合用于处理大状态,长窗口,或大键值状态的有状态处理任务。
- 非常适合用于高可用方案。
- 最好是对状态读写性能要求不高的作业
注意:如果什么都不配置,系统默认的是MemoryStateBackend
flink容错机制
https://www.cnblogs.com/shengyang17/p/12562269.html
