ProcessFunction 是一个偏底层的处理操作,它类似于带有 Keyed State 键状态和 Timer 定时机制的 FlatMapFunction ,作用于每一个输入进来的元素。然而,并不是所有的流都能使用 Keyed StateTimer 机制的:

  • Keyed State ,获取键状态功能,只能在 Keyed Stream 中有效底层不支持
  • Timer ,注册定时回调功能,只能在 Keyed Stream 中有效底层不支持
  • Event ,数据元素,只要是数据流就都能获取到

有兴趣的小伙伴可以自己跑一下👉WrongProcessingFunctionExample.java

If you want to access keyed state and timers you have to apply the ProcessFunction on a keyed stream: stream.keyBy(...).process(new MyProcessFunction())

通常情况下, ProcessFunction (包括 KeyedProcessFunction )都包含以下这几个方法:

  1. // 处理元素的地方。每次有元素输入,就回调该方法
  2. public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
  3. // 当registerProcessingTimeTimer()了时间,到点了就会回调;这个到点取决于使用啥时间语义奥
  4. // 这里默认使用的ProcessingTime
  5. public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

然后因为 ProcessFunction 这货都是继承的 AbstractRichFunction (前面讲过 RichFunction ),所以就不多说那些带有生命周期色彩的方法了。

  1. public void open(Configuration parameters) throws Exception {}
  2. public void close() throws Exception {}

ConnectedStream的ProcessFunction

两个留连接在一起之后,可以对连接后的流 ConnectedStream 执行 CoProcessFunction (不能使用 Keyed StateTimer )和 KeyedCoProcessFunction (使用前需要对 ConnectedStream 执行 keyBy() ),这两个 Function 都要事先 processElement1()processElement2() 接口。
害,官方还提了一个实践建议:

  • Create a state object for one input (or both)
    • 只需要为一个流创建一个状态对象(当然,也可以每个流都创建)
  • Update the state upon receiving elements from its input
    • 从一侧Input接收到数据后更新状态
  • Upon receiving elements from the other input, probe the state and produce the joined result
    • 从另一侧input接受到数据后,探测状态并产生合并结果

笑着笑着就哭了.jpeg
我太菜了,并没能领悟它的建议。

Timer

ProcessingTimeEventTime 都由 Context 内部的 TimerService 维护。 TimerService 会删除每个键时间戳重复的计时器,即一个键和一个时间戳的组合最多只能存在一个。比如 “A,1597760565000 ”组合只会存在一个(也就是说对应时间戳的 onTimer() 回调也只有一次),但允许和“A,1597760565001 ”共存。
另外onTimer()回调是线程安全的:

Flink synchronizes invocations of onTimer() and processElement(). Hence, users do not have to worry about concurrent modification of state.

Fault Tolerance

Timers are fault tolerant and checkpointed along with the state of the application. In case of a failure recovery or when starting an application from a savepoint, the timers are restored. Note Checkpointed processing-time timers that were supposed to fire before their restoration, will fire immediately. This might happen when an application recovers from a failure or when it is started from a savepoint.

Timers are always asynchronously checkpointed, except for the combination of RocksDB backend / with incremental snapshots / with heap-based timers (will be resolved with FLINK-10026). Notice that large

numbers of timers can increase the checkpointing time because timers are part of the checkpointed state. See the “Timer Coalescing” section for advice on how to reduce the number of timers.

这一块等学了状态恢复的时候回头来处理

Timer Coalescing

这部分是关于时间合并的概念,前面有说过 Timer一个key+一个时间戳 作为一个元素的,前面也举过一个例子,相同的key不同时间戳的定时器就能共存。虽然唯一性是一个必不可少的特性,但是一定程度上还是带来了一些问题:

  • 要求1秒钟只需要回调一次,但是连续注册了两个间隔1毫秒的定时任务

遇到这种情况,我们可以采用时间合并。害,其实就是让时间不要这么精确,模糊掉一些:

  1. // 正常情况下是这样的,直接使用毫秒;但是太精确了,会在同一秒上注册多个定时回调
  2. // long coalescedTime = ctx.timestamp() + timeout;
  3. // 将时间戳取整,精确到秒,不要到毫秒。降低精确度,这样就能让保证每一秒只会有一次回调
  4. long coalescedTime = ((ctx.timestamp() + timeout) / 1000) * 1000;
  5. ctx.timerService().registerProcessingTimeTimer(coalescedTime);
  • 在EventTime语义下,因为 onTimer() 只有在水位线越过注册时间时才触发的,所以注册的时候不要基于事件时间注册!而且不能用当前水位线注册

    1. // 错误示范1(假设f1是Event的时间戳)。onTimer()只会被水位线触发,即便EventTime到了value.f1+timeout
    2. // 也不会触发(除非水位线到达了)
    3. // long coalescedTime = value.f1 + timeout;
    4. // 错误示范2,获取到的是当前的水位线时间戳!这个水位线已经被人家触发过了,还注册呢?
    5. // long coalescedTime = ctx.timerService().currentWatermark();
    6. // 正确示范,给当前的水位线+1。只要未来的水位线越过 currentWatermark() + 1 就会触发onTimer()
    7. long coalescedTime = ctx.timerService().currentWatermark() + 1;
    8. ctx.timerService().registerEventTimeTimer(coalescedTime);

    Cancel

    取消 onTimer() 的办法就是把指定要取消的时间戳告诉 timerService 。比如你注册了一个回调:1597760565000,那么你取消的时候也得传 1597760565000必须得在一个键空间里哦~就和删除Map里面得 key 一样( key 由键和时间戳组成)。

  • ProcessingTIme 语义下:

    1. long timestampOfTimerToStop = ...
    2. ctx.timerService().deleteProcessingTimeTimer(timestampOfTimerToStop);
  • EventTime 语义下:

    1. long timestampOfTimerToStop = ...
    2. ctx.timerService().deleteEventTimeTimer(timestampOfTimerToStop);

    实战篇

    大家直接去看官方文档吧,因为它写的挺清楚了——《Flink Example》