一、概念
处理函数(最底层的API),可以访问时间戳,水位线、KeyState以及注册定时时间,还可以输出特定事件(超时事件),如FlinkSQL就是使用Processing Function实现的。Flink主要提供了8个ProcessFunction:
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- ProcessJoinFunction
- BroadcastProcessFunction
- KeyedBroadcastProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
常用特性如下所示:
- 处理单个元素;
- 访问时间戳;
- 旁路输出;
比较重要的几个API:
- Non-keyed-strema(没有分流的)
- ProcessFunction[In,Out]
- processElemnt:来一条数据处理一次
- onTimer:定时器
- KeyedProcedssFunction[Key,In,Out]
- processElement:来一条数据处理一次
- onTimer:定时器
- WindowedStream(分流开窗)
- ProcessWindowFunction[In,Out,Key,TimeWindow]
- process
- ConnectStream(两条流的合并)
- CoprocerssFunction
- processelements1 处理第一条流
- procesElements2 处理第二条流
- onTimer 定时器
processElemnt:该方法会对流中的每条记录都调用一次,输出0个或者多个元素,类似于FlatMap的功能,通过Collector将结果发出。除此之外,该函数有一个Context 参数,用户可以通过Context 访问时间戳、当前记录的key值以及TimerService(关于TimerService,下面会详细解释)。另外还可以使用output方法将数据发送到side output,实现分流或者处理迟到数据的功能。
onTimer:该方法是一个回调函数,当在TimerService中注册的计时器(timers)被触发时,会回调该函数。其中@param timestamp参数表示触发计时器(timers)的时间戳,Collector可以将记录发出。细心的你可能会发现,这两个方法都有一个上下文参数,上面的方法传递的是Context 参数,onTimer方法传递的是OnTimerContext参数,这两个参数对象可以实现相似的功能。OnTimerContext还可以返回触发计时器的时间域(EVENT_TIME与PROCESSING_TIME)。
TimerService提供了以下几种方法:
currentProcessingTime()返回当前的处理时间currentWatermark()返回当前event-time水位线(watermark)时间戳registerProcessingTimeTimer(long time)针对当前key,注册一个processing time计时器(timers),当processing time的时间等于该计时器时钟时会被调用registerEventTimeTimer(long time)针对当前key,注册一个event time计时器(timers),当水位线时间戳大于等于该计时器时钟时会被调用deleteProcessingTimeTimer(long time)针对当前key,删除一个之前注册过的processing time计时器(timers),如果这个timer不存在,那么该方法不会起作用deleteEventTimeTimer(long time)针对当前key,删除一个之前注册过的event time计时器(timers),如果这个timer不存在,那么该方法不会起作用当计时器触发时,会回调onTimer()函数,系统对于ProcessElement()方法和onTimer()方法的调用是同步的
二、示例
KeyedProcessFunction的Demo,和温度报警定时器Demo
package processfunctionimport java.text.SimpleDateFormatimport java.util.Dateimport datasource.SensorReadingimport org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}import org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.functions.KeyedProcessFunctionimport org.apache.flink.streaming.api.scala._import org.apache.flink.util.Collectorobject ProcessFunctionTest {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val inputStream: DataStream[String] = env.socketTextStream("127.0.0.1",7777)val dataStream : DataStream[SensorReading]= inputStream.map(data=>{val arr: Array[String] = data.split(",")SensorReading(arr(0),arr(1).trim.toLong,arr(2).trim.toDouble)})val resultStream = dataStream.keyBy(_.id).process(new TemperatureWarningFunction(10000L)).print()env.execute("Process Function Test")}}//当温度在interval(单位:毫秒)的区间内连续上升时候对数据进行报警class TemperatureWarningFunction(interval:Long) extends KeyedProcessFunction[String,SensorReading,String]{lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTempState",classOf[Double]))//引入状态,用于记录定时器的时间戳lazy val timerState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timetsState",classOf[Long]))private val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = {//获取状态值val lastTempareture: Double = lastTempState.value()val timestamp: Long = timerState.value()//温度连续上升,且没有定时器(即在温度开始上升时设定一个定时器)if (value.temperature > lastTempareture && timestamp==0){val ts = ctx.timerService().currentProcessingTime() + intervalctx.timerService().registerProcessingTimeTimer(ts) // 注册定时器在 当前时间+interval 毫秒后启动lastTempState.update(value.temperature)timerState.update(ts)}else if (value.temperature < lastTempareture ){//温度下降,需要把定时器删除ctx.timerService().deleteProcessingTimeTimer(timestamp)timerState.clear()lastTempState.clear()}else{//如果温度持续上升,且定时器存在,则不进行任务操作直到onTimer启动}}// 当触发器触发时执行的逻辑override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {val curDate = format.format(new Date(timerState.value()-interval))val processDate = format.format(new Date(ctx.timerService().currentProcessingTime()))val times = curDate+" - "+processDateout.collect("传感器:"+ctx.getCurrentKey+" 的温度在区间"+times+" : "+interval/1000+"秒内连续上升")timerState.clear()}}//KeyedProcessFunction的示例class MyKeyedProcessFunctionDemo extends KeyedProcessFunction[String,SensorReading,String]{override def open(parameters: Configuration): Unit = {}//处理逻辑override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = {ctx.getCurrentKey //当前的数据的keyctx.timestamp() // 当前数据的时间戳ctx.timerService().currentWatermark() //当前的watermarkctx.timerService().registerEventTimeTimer(System.currentTimeMillis()+60000L) //注册一个定时器,当前时间6秒后触发ctx.timerService().registerProcessingTimeTimer(System.currentTimeMillis()) //注册一个定时器ctx.timerService().deleteEventTimeTimer(1000L) //删除一个定时器,根据传入的时间戳判断是哪一个定时器}//定时器触发时执行的逻辑override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {}}//侧输出流的分流操作class SlideOutPutFunction(threshold: Double) extends ProcessFunction[SensorReading,SensorReading]{override def processElement(value: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = {if (value.temperature > threshold){out.collect(value)}else{ctx.output(new OutputTag[SensorReading]("lowTemperature"),value)}}}
