Process Function (Low-level Operations)

过程功能(低级操作)

The ProcessFunction

过程功能

The ProcessFunction is a low-level stream processing operation, giving access to the basic building blocks of all (acyclic) streaming applications: “ProcessFunction”是低级的流处理操作,提供对所有(非循环)流应用的基本构建块的访问:

  • events (stream elements)
  • 事件(流元素)
  • state (fault-tolerant, consistent, only on keyed stream)
  • 状态(容错、一致,仅在键控流中)
  • 定时器(事件时间和处理时间,仅在键控流中)

The ProcessFunction can be thought of as a FlatMapFunction with access to keyed state and timers. It handles events by being invoked for each event received in the input stream(s). “ProcessFunction”可以被认为是具有对键控状态和定时器的访问的“FlatMapFunction”。它通过为输入流中接收的每个事件调用来处理事件。

For fault-tolerant state, the ProcessFunction gives access to Flink’s keyed state, accessible via the RuntimeContext, similar to the way other stateful functions can access keyed state. 对于容错状态,“ProcessFunction”允许访问Flink的通过‘RuntimeContext’访问的键控state,‘,类似于其他有状态函数访问键控状态的方式。

The timers allow applications to react to changes in processing time and in event time. Every call to the function processElement(...) gets a Context object which gives access to the element’s event time timestamp, and to the TimerService. The TimerService can be used to register callbacks for future event-/processing-time instants. When a timer’s particular time is reached, the onTimer(...) method is called. During that call, all states are again scoped to the key with which the timer was created, allowing timers to manipulate keyed state. 定时器允许应用程序对处理时间和事件时间中的更改作出反应。对函数“ProcessElement(…)”的每个调用都会获得一个“上下文”对象,该对象将访问元素的事件时间戳和timerService。“定时器服务”可用于为将来的事件-/处理-时间瞬间注册回调。当达到定时器的特定时间时,调用“ontimer(…)”方法。在该调用过程中,所有状态再次范围为创建定时器的密钥,允许定时器操纵键状态。

Note If you want to access keyed state and timers you have to apply the ProcessFunction on a keyed stream: 注如果要访问键控状态和定时器,则必须在键控流上应用“ProcessFunction”:

  1. stream.keyBy(...).process(new MyProcessFunction())

Low-level Joins

低层连接

To realize low-level operations on two inputs, applications can use CoProcessFunction. This function is bound to two different inputs and gets individual calls to processElement1(...) and processElement2(...) for records from the two different inputs. 为了在两个输入上实现低级操作,应用程序可以使用“CoProcessFunction”。此函数绑定到两个不同的输入,并从两个不同的输入获取“ProcessElement1(…)”和“ProcessElement2(…)”的单独调用。

Implementing a low level join typically follows this pattern: 实现低级别连接通常遵循以下模式:

  • Create a state object for one input (or both)
  • 为一个输入创建状态对象(或两者)
  • Update the state upon receiving elements from its input
  • 从其输入接收元素时更新状态
  • Upon receiving elements from the other input, probe the state and produce the joined result
  • 接收来自其他输入的元素后,探测状态并生成连接结果。

For example, you might be joining customer data to financial trades, while keeping state for the customer data. If you care about having complete and deterministic joins in the face of out-of-order events, you can use a timer to evaluate and emit the join for a trade when the watermark for the customer data stream has passed the time of that trade. 例如,您可能会将客户数据加入金融行业,同时保持客户数据的状态。如果您关注在无序事件的情况下具有完整和确定性的连接,则可以使用计时器来评估和发出交易的加入,以便当客户数据流的水印已经通过该交易的时间时进行交易。

Example

例子

The following example maintains counts per key, and emits a key/count pair whenever a minute passes (in event time) without an update for that key: 下面的示例维护每个键的计数,并且在没有更新该键的情况下,每当一分钟过去(在事件发生时)时,就会发出一个键/计数对:

  • The count, key, and last-modification-timestamp are stored in a ValueState, which is implicitly scoped by key.
  • 计数、密钥和最后修改时间戳存储在一个“Value State”中,该状态由密钥隐式范围。
  • For each record, the ProcessFunction increments the counter and sets the last-modification timestamp
  • 对于每个记录,“ProcessFunction”将递增计数器并设置最后修改时间戳
  • The function also schedules a callback one minute into the future (in event time)
  • 该函数还将回调调度为将来(在事件时间内)
  • Upon each callback, it checks the callback’s event time timestamp against the last-modification time of the stored count and emits the key/count if they match (i.e., no further update occurred during that minute)
  • 在每次回调时,它会检查回调的事件时间戳和存储计数的最后修改时间,如果它们匹配,则发出键/计数(也就是说,在这一分钟内没有发生进一步的更新)。

Note This simple example could have been implemented with session windows. We use ProcessFunction here to illustrate the basic pattern it provides. 注意,此简单示例可以在会话窗口中实现。我们在这里使用“ProcessFunction”来说明它提供的基本模式。

  1. import org.apache.flink.api.common.state.ValueState;
  2. import org.apache.flink.api.common.state.ValueStateDescriptor;
  3. import org.apache.flink.api.java.tuple.Tuple2;
  4. import org.apache.flink.configuration.Configuration;
  5. import org.apache.flink.streaming.api.functions.ProcessFunction;
  6. import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
  7. import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
  8. import org.apache.flink.util.Collector;
  9. // the source data stream
  10. DataStream<Tuple2<String, String>> stream = ...;
  11. // apply the process function onto a keyed stream
  12. DataStream<Tuple2<String, Long>> result = stream
  13. .keyBy(0)
  14. .process(new CountWithTimeoutFunction());
  15. /**
  16. * The data type stored in the state
  17. */
  18. public class CountWithTimestamp {
  19. public String key;
  20. public long count;
  21. public long lastModified;
  22. }
  23. /**
  24. * The implementation of the ProcessFunction that maintains the count and timeouts
  25. */
  26. public class CountWithTimeoutFunction extends ProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {
  27. /** The state that is maintained by this process function */
  28. private ValueState<CountWithTimestamp> state;
  29. @Override
  30. public void open(Configuration parameters) throws Exception {
  31. state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
  32. }
  33. @Override
  34. public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out)
  35. throws Exception {
  36. // retrieve the current count
  37. CountWithTimestamp current = state.value();
  38. if (current == null) {
  39. current = new CountWithTimestamp();
  40. current.key = value.f0;
  41. }
  42. // update the state's count
  43. current.count++;
  44. // set the state's timestamp to the record's assigned event time timestamp
  45. current.lastModified = ctx.timestamp();
  46. // write the state back
  47. state.update(current);
  48. // schedule the next timer 60 seconds from the current event time
  49. ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
  50. }
  51. @Override
  52. public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
  53. throws Exception {
  54. // get the state for the key that scheduled the timer
  55. CountWithTimestamp result = state.value();
  56. // check if this is an outdated timer or the latest timer
  57. if (timestamp == result.lastModified + 60000) {
  58. // emit the state on timeout
  59. out.collect(new Tuple2<String, Long>(result.key, result.count));
  60. }
  61. }
  62. }
  1. import org.apache.flink.api.common.state.ValueState
  2. import org.apache.flink.api.common.state.ValueStateDescriptor
  3. import org.apache.flink.streaming.api.functions.ProcessFunction
  4. import org.apache.flink.streaming.api.functions.ProcessFunction.Context
  5. import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
  6. import org.apache.flink.util.Collector
  7. // the source data stream val stream: DataStream[Tuple2[String, String]] = ...
  8. // apply the process function onto a keyed stream val result: DataStream[Tuple2[String, Long]] = stream
  9. .keyBy(0)
  10. .process(new CountWithTimeoutFunction())
  11. /**
  12. * The data type stored in the state
  13. */
  14. case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
  15. /**
  16. * The implementation of the ProcessFunction that maintains the count and timeouts
  17. */
  18. class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String, Long)] {
  19. /** The state that is maintained by this process function */
  20. lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
  21. .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))
  22. override def processElement(value: (String, String), ctx: Context, out: Collector[(String, Long)]): Unit = {
  23. // initialize or retrieve/update the state
  24. val current: CountWithTimestamp = state.value match {
  25. case null =>
  26. CountWithTimestamp(value._1, 1, ctx.timestamp)
  27. case CountWithTimestamp(key, count, lastModified) =>
  28. CountWithTimestamp(key, count + 1, ctx.timestamp)
  29. }
  30. // write the state back
  31. state.update(current)
  32. // schedule the next timer 60 seconds from the current event time
  33. ctx.timerService.registerEventTimeTimer(current.lastModified + 60000)
  34. }
  35. override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {
  36. state.value match {
  37. case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 60000) =>
  38. out.collect((key, count))
  39. case _ =>
  40. }
  41. }
  42. }

NOTE: Before Flink 1.4.0, when called from a processing-time timer, the ProcessFunction.onTimer() method sets the current processing time as event-time timestamp. This behavior is very subtle and might not be noticed by users. Well, it’s harmful because processing-time timestamps are indeterministic and not aligned with watermarks. Besides, user-implemented logic depends on this wrong timestamp highly likely is unintendedly faulty. So we’ve decided to fix it. Upon upgrading to 1.4.0, Flink jobs that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic. 注:在Flink1.4.0之前,当从处理时间计时器调用时,`Process Function.onTimer()方法将当前处理时间设置为事件时间时间戳。这种行为非常微妙,用户可能不会注意到。嗯,这是有害的,因为处理时间戳是不确定的,不与水印对齐。此外,用户实现的逻辑依赖于这个错误的时间戳,很可能是意外的错误。所以我们决定修好它。升级到1.4.0后,使用此不正确事件时间戳的Flink作业将失败,用户应该将其作业调整到正确的逻辑。

The KeyedProcessFunction

键控处理功能

KeyedProcessFunction, as an extension of ProcessFunction, gives access to the key of timers in its onTimer(...) method. “KeyedProcessFunction”作为“ProcessFunction”的一个扩展,可以访问其“OnTimer(…)”方法中的定时器的密钥。

  1. @Override
  2. public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {
  3. K key = ctx.getCurrentKey();
  4. // ...
  5. }
  1. override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]): Unit = {
  2. var key = ctx.getCurrentKey
  3. // ... }

Timers

定时器

Both types of timers (processing-time and event-time) are internally maintained by the TimerService and enqueued for execution. 这两种类型的定时器(处理时间和事件时间)都由‘TimerService’内部维护,并排队等待执行。

The TimerService deduplicates timers per key and timestamp, i.e., there is at most one timer per key and timestamp. If multiple timers are registered for the same timestamp, the onTimer() method will be called just once. “TimerService”对每个键的计时器和时间戳进行重复,即每个键最多有一个定时器和时间戳。如果为同一时间戳注册了多个定时器,则将只调用一次‘onTimer()’方法。

Note Flink synchronizes invocations of onTimer() and processElement(). Hence, users do not have to worry about concurrent modification of state. 注意flink同步“ontimer()”和“Processor()”的调用。因此,用户不必担心并发修改状态。

Fault Tolerance

容错(性)

Timers are fault tolerant and checkpointed along with the state of the application. In case of a failure recovery or when starting an application from a savepoint, the timers are restored. 计时器与应用程序的状态一起具有容错性和检查点。如果出现故障恢复,或者从保存点启动应用程序时,计时器将被还原。

Note Checkpointed processing-time timers that were supposed to fire before their restoration, will fire immediately. This might happen when an application recovers from a failure or when it is started from a savepoint. 注:检查点处理-时间计时器,应该在他们恢复之前开火,将立即开火。当应用程序从故障中恢复或从保存点启动时,可能会发生这种情况。

Note Timers are always asynchronously checkpointed, except for the combination of RocksDB backend / with incremental snapshots / with heap-based timers (will be resolved with FLINK-10026). Notice that large numbers of timers can increase the checkpointing time because timers are part of the checkpointed state. See the “Timer Coalescing” section for advice on how to reduce the number of timers. 注定时器总是异步校验,除了RocksDB后端/与增量快照/与基于堆的定时器的组合(将用‘FLINK-10026’解析)。注意,大量计时器会增加检查点时间,因为定时器是校验点状态的一部分。有关如何减少计时器数量的建议,请参阅“计时器合并”部分。

Timer Coalescing

定时器合并

Since Flink maintains only one timer per key and timestamp, you can reduce the number of timers by reducing the timer resolution to coalesce them. 因为Flink每个键只维护一个计时器和时间戳,所以您可以通过降低定时器分辨率来减少计时器的数量。

For a timer resolution of 1 second (event or processing time), you can round down the target time to full seconds. Timers will fire at most 1 second earlier but not later than requested with millisecond accuracy. As a result, there are at most one timer per key and second. 对于1秒(事件或处理时间)的计时器分辨率,您可以将目标时间向下舍入为满秒。计时器将在较早的1秒内发生火灾,但不迟于以毫秒精度要求的时间。因此,每秒最多有一个计时器。

  1. long coalescedTime = ((ctx.timestamp() + timeout) / 1000) * 1000;
  2. ctx.timerService().registerProcessingTimeTimer(coalescedTime);
  1. val coalescedTime = ((ctx.timestamp + timeout) / 1000) * 1000
  2. ctx.timerService.registerProcessingTimeTimer(coalescedTime)

Since event-time timers only fire with watermarks coming in, you may also schedule and coalesce these timers with the next watermark by using the current one: 由于事件-时间计时器仅使用传入的水印发生火灾,您还可以通过使用当前的一个水印来安排和合并这些计时器和下一个水印:

  1. long coalescedTime = ctx.timerService().currentWatermark() + 1;
  2. ctx.timerService().registerEventTimeTimer(coalescedTime);
  1. val coalescedTime = ctx.timerService.currentWatermark + 1
  2. ctx.timerService.registerEventTimeTimer(coalescedTime)

Timers can also be stopped and removed as follows:

Stopping a processing-time timer:

  1. long timestampOfTimerToStop = ...
  2. ctx.timerService().deleteProcessingTimeTimer(timestampOfTimerToStop);
  1. val timestampOfTimerToStop = ...
  2. ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop)

Stopping an event-time timer:

  1. long timestampOfTimerToStop = ...
  2. ctx.timerService().deleteEventTimeTimer(timestampOfTimerToStop);
  1. val timestampOfTimerToStop = ...
  2. ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop)

Note Stopping a timer has no effect if no such timer with the given timestamp is registered. 注如果没有注册带有给定时间戳的定时器,停止定时器就没有效果。