Flink's keyed state supports the following data types (a declaration sketch follows the list):

  • ValueState[T] holds a single value of type T.
  • ListState[T] holds a list whose elements are of type T.
  • MapState[K, V] holds key-value pairs.
  • ReducingState[T] holds a single value that is the reduced (pre-aggregated) result of all elements added to it.
  • AggregatingState[I, O] holds a single aggregated value; elements of type I are added and the result is read as type O, so unlike ReducingState the input and output types may differ.
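A minimal sketch of how each state type is declared through its descriptor, assuming the Flink 1.7.x Scala API used in the example further down; the class and field names (StateDemo, lastValue, history, counts, maxSoFar) are illustrative only, and the function has to run on a keyed stream:

  import org.apache.flink.api.common.functions.{ReduceFunction, RichFlatMapFunction}
  import org.apache.flink.api.common.state._
  import org.apache.flink.configuration.Configuration
  import org.apache.flink.util.Collector

  // applied after keyBy, e.g. stream.keyBy(_._1).flatMap(new StateDemo)
  class StateDemo extends RichFlatMapFunction[(String, Double), String] {
    private var lastValue: ValueState[Double] = _   // ValueState[T]: one value per key
    private var history: ListState[Double] = _      // ListState[T]: a list of T per key
    private var counts: MapState[String, Long] = _  // MapState[K, V]: key-value pairs per key
    private var maxSoFar: ReducingState[Double] = _ // ReducingState[T]: single pre-aggregated value per key

    override def open(parameters: Configuration): Unit = {
      val rc = getRuntimeContext
      lastValue = rc.getState(new ValueStateDescriptor[Double]("last", classOf[Double]))
      history = rc.getListState(new ListStateDescriptor[Double]("history", classOf[Double]))
      counts = rc.getMapState(new MapStateDescriptor[String, Long]("counts", classOf[String], classOf[Long]))
      maxSoFar = rc.getReducingState(new ReducingStateDescriptor[Double](
        "max",
        new ReduceFunction[Double] { override def reduce(a: Double, b: Double): Double = math.max(a, b) },
        classOf[Double]))
    }

    override def flatMap(in: (String, Double), out: Collector[String]): Unit = {
      lastValue.update(in._2)  // overwrite the single value
      history.add(in._2)       // append to the list
      val seen = if (counts.contains(in._1)) counts.get(in._1) else 0L
      counts.put(in._1, seen + 1)  // per-key counter kept in the map
      maxSoFar.add(in._2)          // fold the element into the running maximum
      out.collect(s"${in._1}: last=${lastValue.value()}, max=${maxSoFar.get()}")
    }
  }

AggregatingState[I, O] is declared the same way, using an AggregatingStateDescriptor and an AggregateFunction[I, ACC, O].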

Example use case: if the water level keeps rising for 5 consecutive seconds, raise an alert.

Declaring the state variables: there are two ways to initialize them, both shown in the example below:

  1. Initialize the variables inside the open() method.
  2. Declare the variables as lazy val.


  package com.ylb.time

  import com.ylb.myCluss.WaterSensor
  import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
  import org.apache.flink.streaming.api.TimeCharacteristic
  import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
  import org.apache.flink.api.scala._
  import org.apache.flink.configuration.Configuration
  import org.apache.flink.streaming.api.functions.{AssignerWithPunctuatedWatermarks, KeyedProcessFunction}
  import org.apache.flink.streaming.api.watermark.Watermark
  import org.apache.flink.util.Collector

  /**
   * @author yanglibin
   * @create 2020-03-07 8:56
   */
  object watermark_5 {
    def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      env.setParallelism(1)
      // use event-time semantics
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

      // Task: alert when the water level rises for 5 consecutive seconds.
      // Keyed state replaces the plain local variables one might otherwise reach for:
      //   var currentHigh = 0L // last observed water level
      //   var alarmTimer  = 0L // timestamp of the registered alert timer

      // read "id,ts,vc" lines from a socket and assign timestamps/watermarks
      val markDS: DataStream[WaterSensor] = env.socketTextStream("hadoop-master", 9999)
        // wrap each line in a WaterSensor case class
        .map(
          line => {
            val datas = line.split(",")
            WaterSensor(datas(0), datas(1).toLong, datas(2).toDouble)
          }
        )
        // emit a punctuated watermark for every element
        .assignTimestampsAndWatermarks(
          new AssignerWithPunctuatedWatermarks[WaterSensor] {
            override def checkAndGetNextWatermark(lastElement: WaterSensor, extractedTimestamp: Long): Watermark = {
              new Watermark(extractedTimestamp)
            }

            override def extractTimestamp(element: WaterSensor, previousElementTimestamp: Long): Long = {
              element.ts * 1000 // seconds -> milliseconds
            }
          }
        )

      // keyed processing with state and timers
      val processDS: DataStream[String] = markDS
        .keyBy(_.id)
        .process(
          new KeyedProcessFunction[String, WaterSensor, String] {
            // Method 1: initialize the state variables in open()
            // private var currentHigh: ValueState[Long] = _
            // private var alarmTimer: ValueState[Long] = _
            //
            // override def open(parameters: Configuration): Unit = {
            //   currentHigh = getRuntimeContext.getState(
            //     new ValueStateDescriptor[Long]("currentHigh", classOf[Long])
            //   )
            //   alarmTimer = getRuntimeContext.getState(
            //     new ValueStateDescriptor[Long]("alarmTimer", classOf[Long])
            //   )
            // }

            // Method 2: declare the state variables as lazy val
            lazy val currentHigh: ValueState[Long] = getRuntimeContext.getState(
              new ValueStateDescriptor[Long]("currentHigh", classOf[Long])
            )
            lazy val alarmTimer: ValueState[Long] = getRuntimeContext.getState(
              new ValueStateDescriptor[Long]("alarmTimer", classOf[Long])
            )

            override def onTimer(
                timestamp: Long,
                ctx: KeyedProcessFunction[String, WaterSensor, String]#OnTimerContext,
                out: Collector[String]): Unit = {
              out.collect(s"id: ${ctx.getCurrentKey}, current watermark: ${ctx.timerService().currentWatermark()}, water level rose for 5 consecutive seconds")
            }

            override def processElement(
                value: WaterSensor,
                ctx: KeyedProcessFunction[String, WaterSensor, String]#Context,
                out: Collector[String]): Unit = {
              if (value.vc > currentHigh.value()) {
                // level rose: register a timer 5 s ahead, but only if none is pending
                if (alarmTimer.value() == 0) {
                  alarmTimer.update(value.ts * 1000 + 5000)
                  ctx.timerService().registerEventTimeTimer(alarmTimer.value())
                }
              } else {
                // level did not rise: cancel the pending timer and restart the 5 s window
                ctx.timerService().deleteEventTimeTimer(alarmTimer.value())
                alarmTimer.update(value.ts * 1000 + 5000)
                ctx.timerService().registerEventTimeTimer(alarmTimer.value())
              }
              currentHigh.update(value.vc.toLong)
            }
          }
        )

      // print both streams
      markDS.print("mark>>")
      processDS.print("process>>")

      // run the job
      env.execute()
    }
  }
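To see how the state and the timer interact, here is a hypothetical session (run `nc -lk 9999` on hadoop-master and type the lines below; the readings are made up for illustration):

  sensor_1,1,10
  sensor_1,2,11
  sensor_1,3,12
  sensor_1,6,13

The first reading is higher than the initial currentHigh of 0, so an event-time timer is registered at 1 * 1000 + 5000 = 6000 ms. Every later reading also rises, so the pending timer is left alone, and when the reading at ts = 6 pushes the watermark to 6000 ms the timer fires and onTimer emits the alert. Had any reading failed to rise, the else branch would have cancelled the pending timer and restarted the 5-second window from that reading's timestamp.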

State backends

  • MemoryStateBackend: working state on the TaskManager heap, checkpoints in the JobManager's memory; meant for local development and testing (a configuration sketch for this and FsStateBackend follows the RocksDB example below).
  • FsStateBackend: working state on the TaskManager heap, checkpoints written to a file system such as HDFS.
  • RocksDBStateBackend: working state in an embedded RocksDB instance on local disk, checkpoints written to a file system; it requires an extra dependency:
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
  import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
  import org.apache.flink.runtime.state.StateBackend
  import org.apache.flink.streaming.api.CheckpointingMode

  val backend: StateBackend = new RocksDBStateBackend("c://tmp/output/flink")
  // set the state backend
  env.setStateBackend(backend)
  // enable checkpointing: one checkpoint every 1000 ms, with exactly-once guarantees
  env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE)
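For comparison, a minimal sketch of switching the same env to the other two backends, assuming the Flink 1.7.x API used above; the HDFS host, port, and path are placeholders:

  import org.apache.flink.runtime.state.filesystem.FsStateBackend
  import org.apache.flink.runtime.state.memory.MemoryStateBackend

  // state and checkpoints kept in memory: local development and testing only
  env.setStateBackend(new MemoryStateBackend())

  // working state on the TaskManager heap, checkpoints written to a (distributed) file system
  env.setStateBackend(new FsStateBackend("hdfs://hadoop-master:9000/flink/checkpoints"))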