一、概念
处理函数(最底层的API),可以访问时间戳,水位线、KeyState以及注册定时时间,还可以输出特定事件(超时事件),如FlinkSQL就是使用Processing Function实现的。Flink主要提供了8个ProcessFunction:
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- ProcessJoinFunction
- BroadcastProcessFunction
- KeyedBroadcastProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
常用特性如下所示:
- 处理单个元素;
- 访问时间戳;
- 旁路输出;
比较重要的几个API:
- Non-keyed-strema(没有分流的)
- ProcessFunction[In,Out]
- processElemnt:来一条数据处理一次
- onTimer:定时器
- KeyedProcedssFunction[Key,In,Out]
- processElement:来一条数据处理一次
- onTimer:定时器
- WindowedStream(分流开窗)
- ProcessWindowFunction[In,Out,Key,TimeWindow]
- process
- ConnectStream(两条流的合并)
- CoprocerssFunction
- processelements1 处理第一条流
- procesElements2 处理第二条流
- onTimer 定时器
processElemnt:该方法会对流中的每条记录都调用一次,输出0个或者多个元素,类似于FlatMap的功能,通过Collector将结果发出。除此之外,该函数有一个Context 参数,用户可以通过Context 访问时间戳、当前记录的key值以及TimerService(关于TimerService,下面会详细解释)。另外还可以使用output方法将数据发送到side output,实现分流或者处理迟到数据的功能。
onTimer:该方法是一个回调函数,当在TimerService中注册的计时器(timers)被触发时,会回调该函数。其中@param timestamp参数表示触发计时器(timers)的时间戳,Collector可以将记录发出。细心的你可能会发现,这两个方法都有一个上下文参数,上面的方法传递的是Context 参数,onTimer方法传递的是OnTimerContext参数,这两个参数对象可以实现相似的功能。OnTimerContext还可以返回触发计时器的时间域(EVENT_TIME与PROCESSING_TIME)。
TimerService提供了以下几种方法:
currentProcessingTime()
返回当前的处理时间
currentWatermark()
返回当前event-time水位线(watermark)时间戳
registerProcessingTimeTimer(long time)
针对当前key,注册一个processing time计时器(timers),当processing time的时间等于该计时器时钟时会被调用
registerEventTimeTimer(long time)
针对当前key,注册一个event time计时器(timers),当水位线时间戳大于等于该计时器时钟时会被调用
deleteProcessingTimeTimer(long time)
针对当前key,删除一个之前注册过的processing time计时器(timers),如果这个timer不存在,那么该方法不会起作用
deleteEventTimeTimer(long time)
针对当前key,删除一个之前注册过的event time计时器(timers),如果这个timer不存在,那么该方法不会起作用
当计时器触发时,会回调onTimer()函数,系统对于ProcessElement()方法和onTimer()方法的调用是同步的
二、示例
KeyedProcessFunction的Demo,和温度报警定时器Demo
package processfunction
import java.text.SimpleDateFormat
import java.util.Date
import datasource.SensorReading
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object ProcessFunctionTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val inputStream: DataStream[String] = env.socketTextStream("127.0.0.1",7777)
val dataStream : DataStream[SensorReading]= inputStream.map(data=>{
val arr: Array[String] = data.split(",")
SensorReading(arr(0),arr(1).trim.toLong,arr(2).trim.toDouble)
})
val resultStream = dataStream.keyBy(_.id).process(new TemperatureWarningFunction(10000L)).print()
env.execute("Process Function Test")
}
}
//当温度在interval(单位:毫秒)的区间内连续上升时候对数据进行报警
class TemperatureWarningFunction(interval:Long) extends KeyedProcessFunction[String,SensorReading,String]{
lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTempState",classOf[Double]))
//引入状态,用于记录定时器的时间戳
lazy val timerState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timetsState",classOf[Long]))
private val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = {
//获取状态值
val lastTempareture: Double = lastTempState.value()
val timestamp: Long = timerState.value()
//温度连续上升,且没有定时器(即在温度开始上升时设定一个定时器)
if (value.temperature > lastTempareture && timestamp==0){
val ts = ctx.timerService().currentProcessingTime() + interval
ctx.timerService().registerProcessingTimeTimer(ts) // 注册定时器在 当前时间+interval 毫秒后启动
lastTempState.update(value.temperature)
timerState.update(ts)
}else if (value.temperature < lastTempareture ){
//温度下降,需要把定时器删除
ctx.timerService().deleteProcessingTimeTimer(timestamp)
timerState.clear()
lastTempState.clear()
}else{
//如果温度持续上升,且定时器存在,则不进行任务操作直到onTimer启动
}
}
// 当触发器触发时执行的逻辑
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
val curDate = format.format(new Date(timerState.value()-interval))
val processDate = format.format(new Date(ctx.timerService().currentProcessingTime()))
val times = curDate+" - "+processDate
out.collect("传感器:"+ctx.getCurrentKey+" 的温度在区间"+times+" : "+interval/1000+"秒内连续上升")
timerState.clear()
}
}
//KeyedProcessFunction的示例
class MyKeyedProcessFunctionDemo extends KeyedProcessFunction[String,SensorReading,String]{
override def open(parameters: Configuration): Unit = {}
//处理逻辑
override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = {
ctx.getCurrentKey //当前的数据的key
ctx.timestamp() // 当前数据的时间戳
ctx.timerService().currentWatermark() //当前的watermark
ctx.timerService().registerEventTimeTimer(System.currentTimeMillis()+60000L) //注册一个定时器,当前时间6秒后触发
ctx.timerService().registerProcessingTimeTimer(System.currentTimeMillis()) //注册一个定时器
ctx.timerService().deleteEventTimeTimer(1000L) //删除一个定时器,根据传入的时间戳判断是哪一个定时器
}
//定时器触发时执行的逻辑
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
}
}
//侧输出流的分流操作
class SlideOutPutFunction(threshold: Double) extends ProcessFunction[SensorReading,SensorReading]{
override def processElement(value: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = {
if (value.temperature > threshold){
out.collect(value)
}else{
ctx.output(new OutputTag[SensorReading]("lowTemperature"),value)
}
}
}