https://blog.csdn.net/boling_cavalry/article/details/106299167
https://blog.csdn.net/winterking3/article/details/113616018
https://blog.51cto.com/u_15320818/5137549
一、前言
KeyedProcessFunction是用来处理KeyedStream的。每有一个数据进入算子,则会触发一次processElement()的处理。它还提供了计时器的功能,在特定场景下,非常适合。
二、结构
KeyedProcessFunction继承AbstractRichFunction,它和ProcessFunction类似,都有processElement()、onTimer(),且都是富函数,自然有open()和close()方法。
1. processElement
先看下processElement的参数,依次是输入值、上下文、输出值。
:::info
public abstract void processElement(I value, Context ctx, Collector
(2)侧输出流
OutputTag
lowTempTag = new OutputTag<>(“lowTemp”); if (value.getRecord()< 10){ ctx.output(lowTempTag, value); }
(3)时间服务
事件时间
注意Watermark是整个数据流的,和在KeyBy之前还是之后没有关系,Watermark和source的并行度有关,如果在自己测试调试功能时,可以先暂时设置并行度为1,方便测试。
//获取当前数据流的水位线 long currentWatermark = ctx.timerService().currentWatermark(); //设置定时器的时间为当前水位线+10秒 long ts = currentWatermark + 10000L;
//注册事件时间定时器 ctx.timerService().registerEventTimeTimer(ts);
//删除事件时间定时器 ctx.timerService().deleteEventTimeTimer(ts);
处理时间
//获取当前数据处理时间 long currentProcessTime = ctx.timerService().currentProcessingTime();
//设置定时器的时间为当前水位线+10秒 long ts = currentProcessTime + 10000L;
//注册处理时间定时器 ctx.timerService().registerProcessingTimeTimer(ts);
//删除处理时间定时器 ctx.timerService().deleteProcessingTimeTimer(ts);
3. onTimer
在定时器满足时间条件时,会触发onTimer,可以用out输出返回值。
public void onTimer(long timestamp, OnTimerContext ctx, Collector
out)