

The ProcessFunction is a low-level stream processing operation that gives access to the basic building blocks of all (acyclic) streaming applications:

  • events (stream elements)
  • state (fault-tolerant, consistent, only on keyed streams)
  • timers (event time and processing time, only on keyed streams)


For fault-tolerant state, the ProcessFunction gives access to Flink's keyed state through the RuntimeContext, in the same way that other stateful functions access keyed state.

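Timers allow applications to react to changes in processing time and event time. Every call to processElement(...) receives a Context object that gives access to the element's event-time timestamp and to the TimerService. The TimerService can be used to register callbacks for future event-time or processing-time instants. When a timer's time is reached, the onTimer(...) method is called. During that call, all state is again scoped to the key with which the timer was created, so timers can manipulate keyed state.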
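The timer semantics described above can be illustrated without a Flink dependency. The following is a minimal plain-Java model, not Flink's implementation: the class `ToyTimerService`, its `OnTimer` interface, and the methods `registerEventTimeTimer` and `advanceWatermark` are hypothetical names chosen for this sketch. The model assumes that timers are kept per (key, timestamp) pair and that they fire once the watermark reaches their timestamp, with the callback receiving the key that registered the timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical toy model of keyed event-time timers; not part of Flink.
final class ToyTimerService {
    /** Callback invoked when a timer fires, scoped to the key that registered it. */
    interface OnTimer {
        void onTimer(String key, long timestamp);
    }

    // timestamp -> keys that registered a timer for that timestamp
    private final TreeMap<Long, TreeSet<String>> timers = new TreeMap<>();

    /** Registers a timer for the given key; registering the same pair twice has no extra effect. */
    void registerEventTimeTimer(String key, long timestamp) {
        timers.computeIfAbsent(timestamp, t -> new TreeSet<>()).add(key);
    }

    /** Fires, in timestamp order, every timer whose timestamp is <= the new watermark. */
    void advanceWatermark(long watermark, OnTimer callback) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, TreeSet<String>> entry = timers.pollFirstEntry();
            for (String key : entry.getValue()) {
                callback.onTimer(key, entry.getKey());
            }
        }
    }

    /** Small demonstration: returns the fired timers as "key@timestamp" strings. */
    static List<String> demo() {
        ToyTimerService service = new ToyTimerService();
        service.registerEventTimeTimer("a", 100L);
        service.registerEventTimeTimer("b", 50L);
        service.registerEventTimeTimer("a", 100L); // same key and timestamp, no extra timer
        service.registerEventTimeTimer("a", 200L); // beyond the watermark used below

        List<String> fired = new ArrayList<>();
        service.advanceWatermark(150L, (key, ts) -> fired.add(key + "@" + ts));
        return fired;
    }
}
```

In `demo()`, advancing the watermark to 150 fires the timer of key "b" at 50 and then the timer of key "a" at 100; the timer at 200 stays pending until a later watermark.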


    stream.keyBy(...).process(new MyProcessFunction())


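To implement low-level operations on two inputs, applications can use CoProcessFunction. This function is bound to two different inputs and receives individual calls to processElement1(...) and processElement2(...) for records from the two inputs.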


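A low-level join on two inputs typically follows this pattern:

  • Create a state object for one input (or both)
  • Update the state when an element arrives from that input
  • When an element arrives from the other input, probe the state and produce the joined result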
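The pattern above can be sketched in plain Java without Flink. `ToyJoin`, its string records, and the `HashMap` that stands in for keyed state are all hypothetical; in a real job the two methods would live in a CoProcessFunction and the map would be replaced by Flink keyed state. The sketch assumes that only the latest value of the first input is kept per key and that elements of the second input with no matching state are dropped.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a two-input function; the HashMap plays the role of keyed state.
final class ToyJoin {
    private final Map<String, String> latestFromFirstInput = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    /** Called for each element of the first input: update the state for that key. */
    void processElement1(String key, String value) {
        latestFromFirstInput.put(key, value);
    }

    /** Called for each element of the second input: probe the state and emit a joined result. */
    void processElement2(String key, String value) {
        String match = latestFromFirstInput.get(key);
        if (match != null) {
            output.add(key + ":" + match + "+" + value);
        }
        // No state for this key yet: this sketch simply drops the element.
    }

    List<String> output() {
        return output;
    }

    /** Small demonstration of the update/probe sequence. */
    static List<String> demo() {
        ToyJoin join = new ToyJoin();
        join.processElement2("EUR", "trade-1");   // no rate seen yet, dropped
        join.processElement1("EUR", "rate=1.10"); // state created
        join.processElement2("EUR", "trade-2");   // joined with rate=1.10
        join.processElement1("EUR", "rate=1.12"); // state updated
        join.processElement2("EUR", "trade-3");   // joined with rate=1.12
        join.processElement2("USD", "trade-4");   // different key, no state
        return join.output();
    }
}
```

Whether unmatched elements should be dropped, buffered in a second state object, or emitted with a default is a design choice that depends on the application; this sketch picks the simplest option.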



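  • The count, key, and last-modification timestamp are stored in a ValueState, which is implicitly scoped by key.
  • For each record, the ProcessFunction increments the counter and sets the last-modification timestamp.
  • The function also schedules a callback one minute into the future (in event time).
  • On each callback, it compares the callback's event-time timestamp with the last-modification time of the stored count and emits the key/count pair if they match (that is, if no further update occurred during that minute).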


    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
    import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
    import org.apache.flink.util.Collector;

    // the source data stream
    DataStream<Tuple2<String, String>> stream = ...;

    // apply the process function onto a keyed stream
    DataStream<Tuple2<String, Long>> result = stream
        .keyBy(0)
        .process(new CountWithTimeoutFunction());

    /**
     * The data type stored in the state
     */
    public class CountWithTimestamp {
        public String key;
        public long count;
        public long lastModified;
    }

    /**
     * The implementation of the ProcessFunction that maintains the count and timeouts
     */
    public class CountWithTimeoutFunction extends ProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {

        /** The state that is maintained by this process function */
        private ValueState<CountWithTimestamp> state;

        @Override
        public void open(Configuration parameters) throws Exception {
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
        }

        @Override
        public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out)
                throws Exception {
            // retrieve the current count
            CountWithTimestamp current = state.value();
            if (current == null) {
                current = new CountWithTimestamp();
                current.key = value.f0;
            }
            // update the state's count
            current.count++;
            // set the state's timestamp to the record's assigned event time timestamp
            current.lastModified = ctx.timestamp();
            // write the state back
            state.update(current);
            // schedule the next timer 60 seconds from the current event time
            ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
                throws Exception {
            // get the state for the key that scheduled the timer
            CountWithTimestamp result = state.value();
            // check if this is an outdated timer or the latest timer
            if (timestamp == result.lastModified + 60000) {
                // emit the state on timeout
                out.collect(new Tuple2<String, Long>(result.key, result.count));
            }
        }
    }

    import org.apache.flink.api.common.state.ValueState
    import org.apache.flink.api.common.state.ValueStateDescriptor
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.streaming.api.functions.ProcessFunction.Context
    import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
    import org.apache.flink.util.Collector

    // the source data stream
    val stream: DataStream[Tuple2[String, String]] = ...

    // apply the process function onto a keyed stream
    val result: DataStream[Tuple2[String, Long]] = stream
      .keyBy(0)
      .process(new CountWithTimeoutFunction())

    /**
     * The data type stored in the state
     */
    case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

    /**
     * The implementation of the ProcessFunction that maintains the count and timeouts
     */
    class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String, Long)] {

      /** The state that is maintained by this process function */
      lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
        .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))

      override def processElement(value: (String, String), ctx: Context, out: Collector[(String, Long)]): Unit = {
        // initialize or retrieve/update the state
        val current: CountWithTimestamp = state.value match {
          case null =>
            CountWithTimestamp(value._1, 1, ctx.timestamp)
          case CountWithTimestamp(key, count, lastModified) =>
            CountWithTimestamp(key, count + 1, ctx.timestamp)
        }
        // write the state back
        state.update(current)
        // schedule the next timer 60 seconds from the current event time
        ctx.timerService.registerEventTimeTimer(current.lastModified + 60000)
      }

      override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {
        state.value match {
          // emit the state only if this is the latest timer for the key
          case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 60000) =>
            out.collect((key, count))
          case _ =>
        }
      }
    }

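Note: Before Flink 1.4.0, when called from a processing-time timer, the ProcessFunction.onTimer() method set the current processing time as the event-time timestamp. This behavior is very subtle and users might not notice it. It is harmful, however, because processing-time timestamps are non-deterministic and not aligned with watermarks. Moreover, user logic that depends on this wrong timestamp is very likely to be unintentionally faulty, so the behavior was fixed. After upgrading to 1.4.0, Flink jobs that use this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic.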


KeyedProcessFunction, as an extension of ProcessFunction, gives access to the key of the timer in its onTimer(...) method.

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {
        K key = ctx.getCurrentKey();
        // ...
    }

    override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]): Unit = {
      val key = ctx.getCurrentKey
      // ...
    }




Note: Flink synchronizes invocations of onTimer() and processElement(). Hence, users do not have to worry about concurrent modification of state.








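    // Java: round the target time down to a full second, so that at most one timer per key and second is registered
    long coalescedTime = ((ctx.timestamp() + timeout) / 1000) * 1000;
    ctx.timerService().registerProcessingTimeTimer(coalescedTime);

    // Scala: round the target time down to a full second, so that at most one timer per key and second is registered
    val coalescedTime = ((ctx.timestamp + timeout) / 1000) * 1000
    ctx.timerService.registerProcessingTimeTimer(coalescedTime)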
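To see what the rounding in the snippet above does, here is a small Java sketch that uses only the standard library. The class `CoalesceDemo`, its methods, and the sample values are illustrative assumptions; the arithmetic is the same integer division as in the snippet. For non-negative inputs, all target times that fall within the same second collapse to one timestamp, so only one distinct timer time results for them.

```java
import java.util.Set;
import java.util.TreeSet;

// Illustrative only: shows how integer division rounds timer targets down to full seconds.
final class CoalesceDemo {
    /** Rounds (timestamp + timeout) down to a multiple of 1000 ms (for non-negative values). */
    static long coalesce(long timestamp, long timeout) {
        return ((timestamp + timeout) / 1000) * 1000;
    }

    /** Returns the distinct timer timestamps that result for the given element timestamps. */
    static Set<Long> distinctTimers(long[] timestamps, long timeout) {
        Set<Long> timers = new TreeSet<>();
        for (long ts : timestamps) {
            timers.add(coalesce(ts, timeout));
        }
        return timers;
    }
}
```

For example, with a timeout of 60000 ms, the element timestamps 1000, 1250, and 1999 all map to 61000, while 2000 maps to 62000, so four elements yield two distinct timer times.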


    // Java: align the event-time timer with the next watermark
    long coalescedTime = ctx.timerService().currentWatermark() + 1;
    ctx.timerService().registerEventTimeTimer(coalescedTime);

    // Scala: align the event-time timer with the next watermark
    val coalescedTime = ctx.timerService.currentWatermark + 1
    ctx.timerService.registerEventTimeTimer(coalescedTime)



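    // Java: stop a previously registered processing-time timer
    long timestampOfTimerToStop = ...;
    ctx.timerService().deleteProcessingTimeTimer(timestampOfTimerToStop);

    // Scala: stop a previously registered processing-time timer
    val timestampOfTimerToStop = ...
    ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop)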


    // Java: stop a previously registered event-time timer
    long timestampOfTimerToStop = ...;
    ctx.timerService().deleteEventTimeTimer(timestampOfTimerToStop);

    // Scala: stop a previously registered event-time timer
    val timestampOfTimerToStop = ...
    ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop)
