一、概念

处理函数(最底层的API),可以访问时间戳,水位线、KeyState以及注册定时时间,还可以输出特定事件(超时事件),如FlinkSQL就是使用Processing Function实现的。Flink主要提供了8个ProcessFunction:

  • ProcessFunction
  • KeyedProcessFunction
  • CoProcessFunction
  • ProcessJoinFunction
  • BroadcastProcessFunction
  • KeyedBroadcastProcessFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction

常用特性如下所示:

  1. 处理单个元素;
  2. 访问时间戳;
  3. 旁路输出;

比较重要的几个API:

  1. Non-keyed-strema(没有分流的)
  • ProcessFunction[In,Out]
    • processElemnt:来一条数据处理一次
    • onTimer:定时器
  • KeyedProcedssFunction[Key,In,Out]
    • processElement:来一条数据处理一次
    • onTimer:定时器
  1. WindowedStream(分流开窗)
  • ProcessWindowFunction[In,Out,Key,TimeWindow]
    • process
  1. ConnectStream(两条流的合并)
  • CoprocerssFunction
    • processelements1 处理第一条流
    • procesElements2 处理第二条流
    • onTimer 定时器

processElemnt:该方法会对流中的每条记录都调用一次,输出0个或者多个元素,类似于FlatMap的功能,通过Collector将结果发出。除此之外,该函数有一个Context 参数,用户可以通过Context 访问时间戳、当前记录的key值以及TimerService(关于TimerService,下面会详细解释)。另外还可以使用output方法将数据发送到side output,实现分流或者处理迟到数据的功能。

onTimer:该方法是一个回调函数,当在TimerService中注册的计时器(timers)被触发时,会回调该函数。其中@param timestamp参数表示触发计时器(timers)的时间戳,Collector可以将记录发出。细心的你可能会发现,这两个方法都有一个上下文参数,上面的方法传递的是Context 参数,onTimer方法传递的是OnTimerContext参数,这两个参数对象可以实现相似的功能。OnTimerContext还可以返回触发计时器的时间域(EVENT_TIME与PROCESSING_TIME)。

TimerService提供了以下几种方法:

  1. currentProcessingTime()
  2. 返回当前的处理时间
  3. currentWatermark()
  4. 返回当前event-time水位线(watermark)时间戳
  5. registerProcessingTimeTimer(long time)
  6. 针对当前key,注册一个processing time计时器(timers),当processing time的时间等于该计时器时钟时会被调用
  7. registerEventTimeTimer(long time)
  8. 针对当前key,注册一个event time计时器(timers),当水位线时间戳大于等于该计时器时钟时会被调用
  9. deleteProcessingTimeTimer(long time)
  10. 针对当前key,删除一个之前注册过的processing time计时器(timers),如果这个timer不存在,那么该方法不会起作用
  11. deleteEventTimeTimer(long time)
  12. 针对当前key,删除一个之前注册过的event time计时器(timers),如果这个timer不存在,那么该方法不会起作用
  13. 当计时器触发时,会回调onTimer()函数,系统对于ProcessElement()方法和onTimer()方法的调用是同步的

二、示例

KeyedProcessFunction的Demo,和温度报警定时器Demo

  1. package processfunction
  2. import java.text.SimpleDateFormat
  3. import java.util.Date
  4. import datasource.SensorReading
  5. import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
  6. import org.apache.flink.configuration.Configuration
  7. import org.apache.flink.streaming.api.functions.KeyedProcessFunction
  8. import org.apache.flink.streaming.api.scala._
  9. import org.apache.flink.util.Collector
  10. object ProcessFunctionTest {
  11. def main(args: Array[String]): Unit = {
  12. val env = StreamExecutionEnvironment.getExecutionEnvironment
  13. env.setParallelism(1)
  14. val inputStream: DataStream[String] = env.socketTextStream("127.0.0.1",7777)
  15. val dataStream : DataStream[SensorReading]= inputStream.map(data=>{
  16. val arr: Array[String] = data.split(",")
  17. SensorReading(arr(0),arr(1).trim.toLong,arr(2).trim.toDouble)
  18. })
  19. val resultStream = dataStream.keyBy(_.id).process(new TemperatureWarningFunction(10000L)).print()
  20. env.execute("Process Function Test")
  21. }
  22. }
  23. //当温度在interval(单位:毫秒)的区间内连续上升时候对数据进行报警
  24. class TemperatureWarningFunction(interval:Long) extends KeyedProcessFunction[String,SensorReading,String]{
  25. lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTempState",classOf[Double]))
  26. //引入状态,用于记录定时器的时间戳
  27. lazy val timerState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timetsState",classOf[Long]))
  28. private val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  29. override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = {
  30. //获取状态值
  31. val lastTempareture: Double = lastTempState.value()
  32. val timestamp: Long = timerState.value()
  33. //温度连续上升,且没有定时器(即在温度开始上升时设定一个定时器)
  34. if (value.temperature > lastTempareture && timestamp==0){
  35. val ts = ctx.timerService().currentProcessingTime() + interval
  36. ctx.timerService().registerProcessingTimeTimer(ts) // 注册定时器在 当前时间+interval 毫秒后启动
  37. lastTempState.update(value.temperature)
  38. timerState.update(ts)
  39. }else if (value.temperature < lastTempareture ){
  40. //温度下降,需要把定时器删除
  41. ctx.timerService().deleteProcessingTimeTimer(timestamp)
  42. timerState.clear()
  43. lastTempState.clear()
  44. }else{
  45. //如果温度持续上升,且定时器存在,则不进行任务操作直到onTimer启动
  46. }
  47. }
  48. // 当触发器触发时执行的逻辑
  49. override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
  50. val curDate = format.format(new Date(timerState.value()-interval))
  51. val processDate = format.format(new Date(ctx.timerService().currentProcessingTime()))
  52. val times = curDate+" - "+processDate
  53. out.collect("传感器:"+ctx.getCurrentKey+" 的温度在区间"+times+" : "+interval/1000+"秒内连续上升")
  54. timerState.clear()
  55. }
  56. }
  57. //KeyedProcessFunction的示例
  58. class MyKeyedProcessFunctionDemo extends KeyedProcessFunction[String,SensorReading,String]{
  59. override def open(parameters: Configuration): Unit = {}
  60. //处理逻辑
  61. override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = {
  62. ctx.getCurrentKey //当前的数据的key
  63. ctx.timestamp() // 当前数据的时间戳
  64. ctx.timerService().currentWatermark() //当前的watermark
  65. ctx.timerService().registerEventTimeTimer(System.currentTimeMillis()+60000L) //注册一个定时器,当前时间6秒后触发
  66. ctx.timerService().registerProcessingTimeTimer(System.currentTimeMillis()) //注册一个定时器
  67. ctx.timerService().deleteEventTimeTimer(1000L) //删除一个定时器,根据传入的时间戳判断是哪一个定时器
  68. }
  69. //定时器触发时执行的逻辑
  70. override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
  71. }
  72. }
  73. //侧输出流的分流操作
  74. class SlideOutPutFunction(threshold: Double) extends ProcessFunction[SensorReading,SensorReading]{
  75. override def processElement(value: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = {
  76. if (value.temperature > threshold){
  77. out.collect(value)
  78. }else{
  79. ctx.output(new OutputTag[SensorReading]("lowTemperature"),value)
  80. }
  81. }
  82. }