https://blog.csdn.net/boling_cavalry/article/details/106299167
https://blog.csdn.net/winterking3/article/details/113616018
https://blog.51cto.com/u_15320818/5137549

一、前言

KeyedProcessFunction是用来处理KeyedStream的。每有一个数据进入算子,则会触发一次processElement()的处理。它还提供了计时器的功能,在特定场景下,非常适合。

二、结构

KeyedProcessFunction继承AbstractRichFunction,它和ProcessFunction类似,都有processElement()、onTimer(),且都是富函数,自然有open()和close()方法。
image.png

1. processElement

先看下processElement的参数,依次是输入值、上下文、输出值。 :::info public abstract void processElement(I value, Context ctx, Collector out) :::

  • 每一个数据进入算子,都会执行processElement处理它
  • 返回0、1、多个输出值

    2. 上下文的使用

    (1)获取当前流的key

    ctx.getCurrentKey()

(2)侧输出流

OutputTag lowTempTag = new OutputTag<>(“lowTemp”); if (value.getRecord()< 10){ ctx.output(lowTempTag, value); }

(3)时间服务
事件时间
注意Watermark是整个数据流的,和在KeyBy之前还是之后没有关系,Watermark和source的并行度有关,如果在自己测试调试功能时,可以先暂时设置并行度为1,方便测试。

//获取当前数据流的水位线 long currentWatermark = ctx.timerService().currentWatermark(); //设置定时器的时间为当前水位线+10秒 long ts = currentWatermark + 10000L;

//注册事件时间定时器 ctx.timerService().registerEventTimeTimer(ts);

//删除事件时间定时器 ctx.timerService().deleteEventTimeTimer(ts);

处理时间

//获取当前数据处理时间 long currentProcessTime = ctx.timerService().currentProcessingTime();

//设置定时器的时间为当前水位线+10秒 long ts = currentProcessTime + 10000L;

//注册处理时间定时器 ctx.timerService().registerProcessingTimeTimer(ts);

//删除处理时间定时器 ctx.timerService().deleteProcessingTimeTimer(ts);

3. onTimer

在定时器满足时间条件时,会触发onTimer,可以用out输出返回值。

public void onTimer(long timestamp, OnTimerContext ctx, Collector out)