过程函数(低级 算子操作)

译者:flink.sojb.cn

ProcessFunction

ProcessFunction是一个低级流处理 算子操作,可以访问所有(非循环)流应用程序的基本构建块:

  • 事件(流数据元)
  • state(容错,一致,仅在被Key化的数据流上)
  • 定时器(事件时间和处理时间,仅限被Key化的数据流)

ProcessFunction可被认为是一个FlatMapFunction可以访问Keys状态和定时器。它通过为输入流中接收的每个事件调用来处理事件。

对于容错状态,ProcessFunction可以访问Flink的被Keys化状态,可以通过其访问 RuntimeContext,类似于其他有状态函数可以访问被Keys化状态的方式。

定时器允许应用程序对处理时间和事件时间的变化作出反应。每次调用该函数processElement(...)都会获得一个Context对象,该对象可以访问数据元的事件时间戳和TimerService。的TimerService可用于注册为将来事件- /处理-时刻回调。达到计时器的特定时间时,将onTimer(...)调用该方法。在该调用期间,所有状态再次限定为创建计时器的键,允许计时器操纵被Keys化状态。

注意如果要访问被Keys化状态和计时器,则必须应用ProcessFunction被Key化的数据流:

  1. stream.keyBy(...).process(new MyProcessFunction())

低级联接

要在两个输入上实现低级 算子操作,应用程序可以使用CoProcessFunction。此函数绑定到两个不同的输入,并从两个不同的输入获取单个调用processElement1(...)processElement2(...)记录。

实现低级别连接通常遵循以下模式:

  • 为一个输入(或两者)创建状态对象
  • 从输入接收数据元时更新状态
  • 从其他输入接收数据元后,探测状态并生成连接结果

例如,您可能会将客户数据关联金融交易,同时保持客户数据的状态。如果您在面对乱序事件时需要完全和确定性的连接,那么当客户数据流的水印已经超过该交易时,您可以使用计时器来评估和发出交易的连接。

以下示例维护每个键的计数,并在每分钟通过(事件时间)时发出键/计数对,而不更新该键:

  • count,key和last-modification-timestamp存储在a中ValueState,它由key隐式定义。
  • 对于每个记录,ProcessFunction递增计数器并设置最后修改时间戳
  • 该函数还会在未来一分钟内调度回调(在事件时间内)
  • 在每次回调时,它会根据存储计数的最后修改时间检查回调的事件时间时间戳,如果匹配则发出键/计数(即,在该分钟内没有进一步更新)

注意这个简单的例子可以用会话窗口实现。我们ProcessFunction在这里用它来说明它提供的基本模式。

  1. import org.apache.flink.api.common.state.ValueState;
  2. import org.apache.flink.api.common.state.ValueStateDescriptor;
  3. import org.apache.flink.api.java.tuple.Tuple2;
  4. import org.apache.flink.configuration.Configuration;
  5. import org.apache.flink.streaming.api.functions.ProcessFunction;
  6. import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
  7. import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
  8. import org.apache.flink.util.Collector;
  9. // the source data stream
  10. DataStream<Tuple2<String, String>> stream = ...;
  11. // apply the process function onto a keyed stream
  12. DataStream<Tuple2<String, Long>> result = stream
  13. .keyBy(0)
  14. .process(new CountWithTimeoutFunction());
  15. /**
  16. * The data type stored in the state
  17. */
  18. public class CountWithTimestamp {
  19. public String key;
  20. public long count;
  21. public long lastModified;
  22. }
  23. /**
  24. * The implementation of the ProcessFunction that maintains the count and timeouts
  25. */
  26. public class CountWithTimeoutFunction extends ProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {
  27. /** The state that is maintained by this process function */
  28. private ValueState<CountWithTimestamp> state;
  29. @Override
  30. public void open(Configuration parameters) throws Exception {
  31. state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
  32. }
  33. @Override
  34. public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out)
  35. throws Exception {
  36. // retrieve the current count
  37. CountWithTimestamp current = state.value();
  38. if (current == null) {
  39. current = new CountWithTimestamp();
  40. current.key = value.f0;
  41. }
  42. // update the state's count
  43. current.count++;
  44. // set the state's timestamp to the record's assigned event time timestamp
  45. current.lastModified = ctx.timestamp();
  46. // write the state back
  47. state.update(current);
  48. // schedule the next timer 60 seconds from the current event time
  49. ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
  50. }
  51. @Override
  52. public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
  53. throws Exception {
  54. // get the state for the key that scheduled the timer
  55. CountWithTimestamp result = state.value();
  56. // check if this is an outdated timer or the latest timer
  57. if (timestamp == result.lastModified + 60000) {
  58. // emit the state on timeout
  59. out.collect(new Tuple2<String, Long>(result.key, result.count));
  60. }
  61. }
  62. }
  1. import org.apache.flink.api.common.state.ValueState
  2. import org.apache.flink.api.common.state.ValueStateDescriptor
  3. import org.apache.flink.streaming.api.functions.ProcessFunction
  4. import org.apache.flink.streaming.api.functions.ProcessFunction.Context
  5. import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
  6. import org.apache.flink.util.Collector
  7. // the source data stream val stream: DataStream[Tuple2[String, String]] = ...
  8. // apply the process function onto a keyed stream val result: DataStream[Tuple2[String, Long]] = stream
  9. .keyBy(0)
  10. .process(new CountWithTimeoutFunction())
  11. /**
  12. * The data type stored in the state
  13. */
  14. case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
  15. /**
  16. * The implementation of the ProcessFunction that maintains the count and timeouts
  17. */
  18. class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String, Long)] {
  19. /** The state that is maintained by this process function */
  20. lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
  21. .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))
  22. override def processElement(value: (String, String), ctx: Context, out: Collector[(String, Long)]): Unit = {
  23. // initialize or retrieve/update the state
  24. val current: CountWithTimestamp = state.value match {
  25. case null =>
  26. CountWithTimestamp(value._1, 1, ctx.timestamp)
  27. case CountWithTimestamp(key, count, lastModified) =>
  28. CountWithTimestamp(key, count + 1, ctx.timestamp)
  29. }
  30. // write the state back
  31. state.update(current)
  32. // schedule the next timer 60 seconds from the current event time
  33. ctx.timerService.registerEventTimeTimer(current.lastModified + 60000)
  34. }
  35. override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {
  36. state.value match {
  37. case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 60000) =>
  38. out.collect((key, count))
  39. case _ =>
  40. }
  41. }
  42. }

注意:在Flink 1.4.0之前,当从处理时间计时器调用时,该ProcessFunction.onTimer()方法将当前处理时间设置为事件时间时间戳。此行为非常微妙,用户可能不会注意到。嗯,这是有害的,因为处理时间时间戳是不确定的,不与水印对齐。此外,用户实现的逻辑依赖于这个错误的时间戳,很可能是出乎意料的错误。所以我们决定解决它。升级到1.4.0后,使用此不正确的事件时间戳的Flink作业将失败,用户应将其作业调整为正确的逻辑。

KeyedProcessFunction

KeyedProcessFunction作为其扩展ProcessFunction,可以在其onTimer(...) 方法中访问计时器的键。

  1. @Override
  2. public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {
  3. K key = ctx.getCurrentKey();
  4. // ...
  5. }
  1. override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]): Unit = {
  2. var key = ctx.getCurrentKey
  3. // ... }

计时器

两种类型的计时器(处理时间和事件时间)都由内部维护TimerService并排队执行。

TimerService每个键和时间戳,即删除重复数据计时器,还有每键和时间戳最多一个计时器。如果为同一时间戳注册了多个计时器,则只onTimer()调用一次该方法。

注意 Flink同步onTimer()和的调用processElement()。因此,用户不必担心并发修改状态。

容错

定时器具有容错能力,并且与应用程序的状态一起检查点。如果故障恢复或从保存点启动应用程序,则会恢复计时器。

注意在恢复之前应该点火的检查点处理时间计时器将立即触发。当应用程序从故障中恢复或从保存点启动时,可能会发生这种情况。

注意除了RocksDB后台/增量SNAPSHOT/基于堆的定时器(将与之解决FLINK-10026)的组合之外,定时器始终是异步检查点。请注意,大量的计时器可以增加检查点时间,因为计时器是检查点状态的一部分。有关如何Reduce定时器数量的建议,请参阅“定时器合并”部分。

定时器合并

由于Flink每个键和时间戳只保存一个计时器,因此可以通过降低计时器分辨率来合并它们来Reduce计时器的数量。

对于1秒的定时器分辨率(事件或处理时间),您可以将目标时间向下舍入为完整秒数。定时器最多提前1秒发射,但不迟于要求,精确到毫秒。结果,每个键最多有一个计时器,第二个。

  1. long coalescedTime = ((ctx.timestamp() + timeout) / 1000) * 1000;
  2. ctx.timerService().registerProcessingTimeTimer(coalescedTime);
  1. val coalescedTime = ((ctx.timestamp + timeout) / 1000) * 1000
  2. ctx.timerService.registerProcessingTimeTimer(coalescedTime)

由于事件时间计时器仅在出现水印的情况下触发,因此您还可以使用当前的水印来计划和合并这些计时器和下一个水印:

  1. long coalescedTime = ctx.timerService().currentWatermark() + 1;
  2. ctx.timerService().registerEventTimeTimer(coalescedTime);
  1. val coalescedTime = ctx.timerService.currentWatermark + 1
  2. ctx.timerService.registerEventTimeTimer(coalescedTime)

也可以按如下方式停止和删除计时器:

停止处理时间计时器:

  1. long timestampOfTimerToStop = ...
  2. ctx.timerService().deleteProcessingTimeTimer(timestampOfTimerToStop);
  1. val timestampOfTimerToStop = ...
  2. ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop)

停止事件时间计时器:

  1. long timestampOfTimerToStop = ...
  2. ctx.timerService().deleteEventTimeTimer(timestampOfTimerToStop);
  1. val timestampOfTimerToStop = ...
  2. ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop)

注意如果没有注册具有给定时间戳的此类计时器,则停止计时器无效。