Flink's Keyed State supports the following data types:
- ValueState[T]: holds a single value of type T.
- ListState[T]: holds a list whose elements are of type T.
- MapState[K, V]: holds key-value pairs.
- ReducingState[T]: holds a single value that is the result of reducing every added element with a ReduceFunction.
- AggregatingState[I, O]: like ReducingState, but the input type I and the output type O may differ (an AggregateFunction does the merging).
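The alert example below only exercises ValueState. As a minimal sketch of how the remaining types are obtained (the class name StateDemo, the state names and the (id, value) tuple input are illustrative, not part of the example that follows), each type is requested from the runtime context through its descriptor, and the function must run on a keyed stream:

  import org.apache.flink.api.common.functions.{ReduceFunction, RichFlatMapFunction}
  import org.apache.flink.api.common.state._
  import org.apache.flink.configuration.Configuration
  import org.apache.flink.util.Collector

  // Sketch: declares ListState, MapState and ReducingState; apply it after keyBy(_._1).
  class StateDemo extends RichFlatMapFunction[(String, Double), String] {
    private var listState: ListState[Double] = _
    private var mapState: MapState[String, Double] = _
    private var reducingState: ReducingState[Double] = _

    override def open(parameters: Configuration): Unit = {
      // ListState[T]: a list of elements per key
      listState = getRuntimeContext.getListState(
        new ListStateDescriptor[Double]("recentValues", classOf[Double]))
      // MapState[K, V]: a map per key
      mapState = getRuntimeContext.getMapState(
        new MapStateDescriptor[String, Double]("valueByTag", classOf[String], classOf[Double]))
      // ReducingState[T]: one value per key, merged with a ReduceFunction on every add()
      reducingState = getRuntimeContext.getReducingState(
        new ReducingStateDescriptor[Double]("maxValue",
          new ReduceFunction[Double] { override def reduce(a: Double, b: Double): Double = math.max(a, b) },
          classOf[Double]))
    }

    override def flatMap(in: (String, Double), out: Collector[String]): Unit = {
      listState.add(in._2)        // append to the per-key list
      mapState.put(in._1, in._2)  // store under a key inside the map state
      reducingState.add(in._2)    // keeps the running maximum
      out.collect(s"${in._1} max so far: ${reducingState.get()}")
    }
  }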
Use case: raise an alert when the water level rises continuously for 5 seconds.
Declare the state variables; there are two ways to initialize them (both are shown in the KeyedProcessFunction below):
- initialize the variables in the open() method
- use lazy val at the point of declaration
package com.ylb.time
import com.ylb.myCluss.WaterSensor
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.{AssignerWithPunctuatedWatermarks, KeyedProcessFunction}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.util.Collector
/**
* @author yanglibin
* @create 2020-03-07 8:56
*/
object watermark_5 {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// set the time characteristic to event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// process the data
// requirement: alert when the water level rises continuously for 5 s
// initialize variables
// current water level
// var currentHigh = 0L
// continuous-alarm timer value
// var alarmTimer = 0L
// markDS
val markDS: DataStream[WaterSensor] = env.socketTextStream("hadoop-master", 9999)
// wrap each input line into a WaterSensor object
.map(
line => {
val datas = line.split(",")
WaterSensor(datas(0), datas(1).toLong, datas(2).toDouble)
}
)
// assign timestamps and generate watermarks (punctuated: one watermark per element)
.assignTimestampsAndWatermarks(
new AssignerWithPunctuatedWatermarks[WaterSensor] {
override def checkAndGetNextWatermark(lastElement: WaterSensor, extractedTimestamp: Long): Watermark = {
new Watermark(extractedTimestamp)
}
override def extractTimestamp(element: WaterSensor, previousElementTimestamp: Long): Long = {
element.ts * 1000 // ts is in seconds, Flink expects milliseconds
}
}
)
// process
val processDS: DataStream[String] = markDS
.keyBy(_.id)
.process(
new KeyedProcessFunction[String, WaterSensor, String] {
// Option 1: initialize the state variables in the open() method
// private var currentHigh:ValueState[Long] = _
// private var alarmTimer:ValueState[Long] = _
//
// override def open(parameters: Configuration): Unit = {
// currentHigh = getRuntimeContext.getState(
// new ValueStateDescriptor[Long]("currentHigh",classOf[Long])
// )
// alarmTimer = getRuntimeContext.getState(
// new ValueStateDescriptor[Long]("alarmTimer",classOf[Long])
// )
// }
// Option 2: use lazy val at the point of declaration
lazy val currentHigh:ValueState[Long] = getRuntimeContext.getState(
new ValueStateDescriptor[Long]("currentHigh",classOf[Long])
)
lazy val alarmTimer:ValueState[Long] = getRuntimeContext.getState(
new ValueStateDescriptor[Long]("alarmTimer", classOf[Long])
)
override def onTimer(
timestamp: Long,
ctx: KeyedProcessFunction[String, WaterSensor, String]#OnTimerContext,
out: Collector[String]): Unit = {
out.collect(s"id:${ctx.getCurrentKey}, current watermark: ${ctx.timerService().currentWatermark()}, water level has been rising for 5 consecutive seconds")
}
override def processElement(
value: WaterSensor,
ctx: KeyedProcessFunction[String, WaterSensor, String]#Context,
out: Collector[String]): Unit = {
if (value.vc > currentHigh.value()) {
// the incoming value is higher than the last one: register an alarm timer if none is pending
if (alarmTimer.value() == 0) {
alarmTimer.update(value.ts * 1000 + 5000)
ctx.timerService().registerEventTimeTimer(alarmTimer.value())
}
} else {
// otherwise delete the pending timer and register a new one
ctx.timerService().deleteEventTimeTimer(alarmTimer.value())
alarmTimer.update(value.ts * 1000 + 5000)
ctx.timerService().registerEventTimeTimer(alarmTimer.value())
}
currentHigh.update(value.vc.toLong) // remember the latest water level (vc is truncated to Long)
}
}
)
markDS.print("mark>>")
processDS.print("process>>")
// execute the job
env.execute()
}
}
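To try the job, feed comma-separated id,ts,vc records into port 9999, for example with nc -lk 9999 on hadoop-master (assuming nc is available there). A sequence such as sensor_1,1,1.0, then sensor_1,2,2.0, then sensor_1,7,3.0 rises on every record: the first record registers a timer at 6000 ms, and the third pushes the watermark to 7000 ms, so the timer fires and the alert appears on the process>> output.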
State backends
- MemoryStateBackend: working state on the TaskManager heap, checkpoints stored in the JobManager's memory (local testing)
- FsStateBackend: working state on the TaskManager heap, checkpoints written to a file system
- RocksDBStateBackend: working state in an embedded RocksDB instance on local disk, checkpoints written to a file system
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.7.2</version>
</dependency>
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.streaming.api.CheckpointingMode

// the checkpoint directory must be given as a file system URI
val backend: StateBackend = new RocksDBStateBackend("file:///c:/tmp/output/flink")
// set the state backend
env.setStateBackend(backend)
// enable checkpointing
// take a checkpoint every 1000 ms, with exactly-once processing guarantees
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE)
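The other two backends are set the same way. A minimal sketch, assuming an HDFS namenode at hadoop-master:9000 (the checkpoint URI is illustrative, pick one backend):

  import org.apache.flink.runtime.state.filesystem.FsStateBackend
  import org.apache.flink.runtime.state.memory.MemoryStateBackend

  // MemoryStateBackend: checkpoints kept in the JobManager's memory, for local testing only
  env.setStateBackend(new MemoryStateBackend())
  // FsStateBackend: checkpoints written to a file system (URI below is an example)
  env.setStateBackend(new FsStateBackend("hdfs://hadoop-master:9000/flink/checkpoints"))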