构建事件驱动应用

  • ProcessFunction结合了
    • 有状态的,一次一个事件的处理
    • 计时器
  • 这个模式有多种形式
    • keyed streams
    • connected streams
    • windowed and broadcast streams

从某种意义上说,API 的其余部分主要是为了方便应用。 ProcessFunction 是更多的通用构建块。ProcessFunction同样可以处理non-keyed streams,只是不能用计时器。

简单有效的API

  1. // Process one element from the input stream.
  2. void processElement(I value, Context ctx, Collector<O> out) throws Exception;
  3. // Called when a timer fires.
  4. void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception;
  • ProcessFunctions是Rich Functions;他们可以使用state
  • 两个回调都有一个collector,可以用来输出结果
  • 两个回调都传递了某种可以访问 TimerService 的上下文
  • TimerService可以提供

    • 当前系统时间
    • 当前watermark
    • 创建processing-time和event-time的计时器

      Context / OnTimerContext API

      1. public abstract class Context {
      2. // Timestamp of the element currently being processed or timestamp of a firing timer.
      3. public abstract Long timestamp();
      4. // TimerService for querying time and registering timers.
      5. public abstract TimerService timerService();
      6. // ONLY OnTimerContext: The TimeDomain of the firing timer.
      7. public abstract TimeDomain timeDomain();
      8. // Emits a record to the side output identified by the OutputTag.
      9. public abstract <X> void output(OutputTag<X> outputTag, X value);
      10. // ONLY w/ KeyedProcessFunctions: Key of the element being processed.
      11. public abstract K getCurrentKey();
      12. }

      这些上下文对象包括相当多的功能。下面首先看看timerService()。

      timerService

      1. public interface TimerService {
      2. // returns the current processing time.
      3. long currentProcessingTime();
      4. // returns the current event-time watermark.
      5. long currentWatermark();
      6. // registers a timer to be fired when processing time passes the given time.
      7. void registerProcessingTimeTimer(long time);
      8. // registers a timer to be fired when the event time watermark
      9. // passes the given time.
      10. void registerEventTimeTimer(long time);
      11. }

      TimerService 在处理时间和事件时间域中提供对当前时间的访问。它还提供了创建两种模式的计时器的方法。

ProcessFunction案例:心跳监测

背景

一组物联网设备,至少每100ms都会发送一条消息出来。 需要:

  • 为每个传入的 deviceId(key)维护一个事件计数器。
  • 如果该设备在过去 100 毫秒(处理时间内)没有发生事件,则以 (deviceId, count) 对的形式发出警报

实现思路

  • 存储ValueState(keyed/scoped by deviceId)
    • count
    • last mod timestamp
  • 对每个事件
    • 更新计数、最后修改时间
    • 删除当前计时器
    • 从此刻(processing time)开始,注册一个100ms的新计时器
  • 每当计时器触发时
    • 发送(deviceId, count)

关键类

A POJO for holding state

  1. // the data type stored in the state
  2. public class CountWithTimestamp {
  3. public long count;
  4. public long lastModified;
  5. }

Our application

  1. DataStream<Event> stream = ...
  2. // apply the process function to a keyed stream
  3. DataStream<Tuple2<String, Long>> result = stream
  4. .keyBy(e -> e.deviceId)
  5. .process(new CountWithTimeoutFunction());

ValueState是一个 POJO类型,我们会在ValueState中使用。我们也可以将deviceId存储在CountWithTimestamp中,但是不太有必要,因为在context中key会被传递给onTimer()函数。

通过KeyedProcessFunction来处理当前KeyedStream(keyed by deviceId)。

核心方法实现

  1. public class CountWithTimeoutFunction extends
  2. KeyedProcessFunction<String, Event, Tuple2<String, Long>> {
  3. @Override
  4. public void open(Configuration parameters) throws Exception {
  5. // register our state with the state backend
  6. }
  7. @Override
  8. public void processElement(Event event, Context ctx,
  9. Collector<Tuple2<String, Long>> out) throws Exception {
  10. // update our state and timer
  11. }
  12. @Override
  13. public void onTimer(long timestamp, OnTimerContext ctx,
  14. Collector<Tuple2<String, Long>> out) throws Exception {
  15. // emit alert
  16. }
  17. }

从上面类可以看出一共需要实现3个方法:open(), processElement(), and onTimer()。注意上面KeyedProcessFunction内接的泛型参数

  1. private ValueState<CountWithTimestamp> state;
  2. @Override
  3. public void open(Configuration parameters) throws Exception {
  4. state = getRuntimeContext().getState(
  5. new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
  6. }

open的写法和前面举的例子差不多。

  1. static final int TIMEOUT = 100;
  2. @Override
  3. public void processElement(Event event, Context ctx,
  4. Collector<Tuple2<String, Long>> out) throws Exception {
  5. CountWithTimestamp current = state.value();
  6. if (current == null) {
  7. current = new CountWithTimestamp();
  8. } else {
  9. ctx.timerService().deleteProcessingTimeTimer(current.lastModified + TIMEOUT);
  10. }
  11. current.count++;
  12. current.lastModified = ctx.timerService().currentProcessingTime();
  13. state.update(current);
  14. ctx.timerService().registerProcessingTimeTimer(current.lastModified + TIMEOUT);
  15. }

定义了一个静态变量TIMEOUT。每个事件到达的时候,都会调用processElement方法来处理,processElement一共做了几件事:

  1. 获取当前event的key对应的状态,判断状态中是否有该key。
  2. 如果有,删除原来该key老的定时器;如果没有,生成一个新CountWithTimestamp对象
  3. 更新CountWithTimestamp值,count和lastModified;并更新state
  4. 重新注册ProcessingTime计时器。

注意,所有的定时器操作都是通过context的timerService接口下的具体方法操作的。方法里定义的输出Collector对象out没有使用。

  1. @Override
  2. public void onTimer(long timestamp, OnTimerContext ctx,
  3. Collector<Tuple2<String, Long>> out) throws Exception {
  4. CountWithTimestamp result = state.value();
  5. String deviceId = ctx.getCurrentKey();
  6. out.collect(new Tuple2<String, Long>(deviceId, result.count));
  7. }

onTimer方法中处理,当计时器触发时,业务逻辑如何走:

  1. 根据题目背景,输出是一个Tuple对象
  2. deviceId可以通过OnTimerContext中的key来获取。
  3. count可以从state中获取,表示当前设备在第几次出现后,100ms内没有发送下一个心跳事件。

思考:是否需要清理状态?
答案是:不需要。因为本题中的设备是有限的,并且每个设备只维护1条状态。

ProcessFunction案例: 无序事件流排序

背景

注:Sorting an O-o-o Event Stream,其中 O-o-o代表Out-of-order。有些场景,比如模式识别或者是时序分析,是需要保证元素的顺序的。

待排序的乱序流如下,跟我们前面讲watermark的例子一样
image.png

实现思路

需要缓存一段流数据,因为我们无法保证看到的第1个元素,就是真实排序数据中的第1个元素。如本案例中的11,就不是。

思考:将这些元素缓存在哪里?
PriorityQueue是比较方便的一种选项,会自动排序。
image.png
image.png
采用一个有界无序watermark,设置5的延迟。watermark(10)的到来标志着10以前的数据完整了,因此10之前的7和9作为有序流的第一部分可以被释放掉。
image.png
image.png

关键类:排序

  1. DataStream<Event> input = ...;
  2. DataStream<Event> sorted = input
  3. .keyBy(...)
  4. .process(new SortFunction());
  5. public static class SortFunction extends KeyedProcessFunction<KeyType, Event, Event> {
  6. private ValueState<PriorityQueue<Event>> queueState =
  7. getRuntimeContext().getState(…);
  8. @Override
  9. public void processElement(Event event, Context context, Collector<Event> out) {
  10. ...
  11. }
  12. @Override
  13. public void onTimer(long timestamp, OnTimerContext context, Collector<Event> out) {
  14. ...
  15. }
  16. }

这不是一个全局排序,但是是对应key内的排序。注意ValueState中的类型。
TODO
没有open()方法?

实现方法

processElement

  1. public void processElement(Event event, Context context, Collector<Event> out) {
  2. TimerService timerService = context.timerService();
  3. // skip late events
  4. if (context.timestamp() > timerService.currentWatermark()) {
  5. PriorityQueue<Event> queue = queueState.value();
  6. if (queue == null) {
  7. queue = new PriorityQueue<>(10, new CompareByTimestamp());
  8. }
  9. queue.add(event);
  10. queueState.update(queue);
  11. timerService.registerEventTimeTimer(context.timestamp());
  12. }
  13. }

context.timestamp()是event的timestamp。这个案例中假设数据不会late(注意,有乱序但是不late)。
然后我们将元素加入到PriorityQueue中。这里有一个可能引发困惑的地方是何时触发定时器。在context.timestamp()设置定时器,期望watermark追赶上当前的事件时间戳时,onTimer()函数被调用。

onTimer

public void onTimer(long timestamp, OnTimerContext context, Collector<Event> out) {
    PriorityQueue<Event> queue = queueState.value();
    Long watermark = context.timerService().currentWatermark();
    Event event = queue.peek();
    while (event != null && event.timestamp <= watermark) {
        out.collect(event);
        queue.remove(event);
        event = queue.peek();
    }
    queueState.update(queue);
}

预警:

  • 在某些情形下上面这种实现会有性能问题
  • 不要使用上面的这种实现方式作为参考方案/上线方案

后面我们讨论状态后端时,会更多的讨论这个预警提示。简单来讲,就是需要在每次访问一些状态后端时,需要反序列化状态字节对象,然后再更新时再序列化。采用PriorityQueue这种实现方案,性能消耗比较大。我们可以采用MapState,而不是ValueState,但是实现起来更复杂。CEP和Table API就是采用MapState实现的。
TODO
为什么不直接用event.timestamp?这里的timerService.currentWatermark()是怎么生成的?

晚到元素如何处理?

image.png
在这个例子中,元素18是晚到数据,在上面processElement()实现方法中,我们直接丢弃了晚到的数据。

当一个事件晚到了之后,它错过了在排序输出流中给的卡槽,但是我们可以把晚到的数据单独放到一个输出流中。

Side outputs旁路输出

  • 允许从一个用户函数发出多个流
  • 发出的流可以有不同的数据类型
  • 旁路输出是split()/select()的更高级的替代
  • 与 ProcessFunctions、Windows(延迟事件)和 CEP 一起使用

我们没有在本系列文章中没有介绍 split()/select(),因为它已被弃用,并且存在一些重大缺陷。

Context / OnTimerContext API

public abstract class Context {

    // Timestamp of the element currently being processed or timestamp of a firing timer.
    public abstract Long timestamp();

    // TimerService for querying time and registering timers.
    public abstract TimerService timerService();

    // ONLY OnTimerContext: The TimeDomain of the firing timer.
    public abstract TimeDomain timeDomain();

    // Emits a record to the side output identified by the OutputTag.
    public abstract <X> void output(OutputTag<X> outputTag, X value);

    // ONLY w/ KeyedProcessFunctions: Key of the element being processed.
    public abstract K getCurrentKey();
}

旁路输出API

static final OutputTag<Event> lateEventsTag = new
    OutputTag<Event>("lateEvents") {};

DataStream<Event> input = ...;

SingleOutputStreamOperator<Event> sorted = input
    .keyBy(...)
    .process(new SortFunction());

DataStream<Event> lateEvents = sorted.getSideOutput(lateEventsTag);

这还是我们前面看到的排序例子,加入了旁路输出。旁路输出和ProcessFunctions一起使用。旁路输出通过对一个具有名称和类型的OutputTag来引用。两个OutputTag对象如果有相同的名字,会被认为是同一个旁路输出。在一个流上应用process()方法的结果是SingleOutputStreamOperator。从这个Operator可以获得跟他相关的旁路输出。

public void processElement(Event event, Context context, Collector<Event> out) {
    TimerService timerService = context.timerService();

    if (context.timestamp() > timerService.currentWatermark()) {
        PriorityQueue<Event> queue = queueState.value();
        if (queue == null) {
            queue = new PriorityQueue<>(10, new CompareByTimestamp());
        }
        queue.add(event);
        queueState.update(queue);
        timerService.registerEventTimeTimer(event.timestamp);
    } else { // event is late
        context.output(lateEventsTag, event);
    }
}

对比前面的实现,我们在 processElement() 方法中所要做的只是添加一个处理延迟事件的 else 子句。context中有一个输出方法用来发送事件到output tag。

旁路输出用于异常报告

final OutputTag<String> errors = new OutputTag<String>("side-output"){};

SingleOutputStreamOperator<Integer> task1 = ...;
SingleOutputStreamOperator<Integer> task2 = ...;
SingleOutputStreamOperator<Integer> task3 = ...;

DataStream<String> exceptions1 = task1.getSideOutput(errors);
DataStream<String> exceptions2 = task2.getSideOutput(errors);
DataStream<String> exceptions3 = task3.getSideOutput(errors);

DataStream<String> exceptions = exceptions1.union(exceptions2, exceptions3);

exceptions.addSink(new FlinkKafkaProducer(...));

在这里,我们将在一系列流程函数中捕获的所有异常收集在一起,并将所有这些发送到一个 kafka 主题。然后使用union函数将他们组合起来。

一些关于Timers的事实

onTimer()什么时候被调用

  • Processing time timer
    • 当机器的时钟时间到达定时器的时间戳时
  • Event time timer
    • 当算子的watermark达到或超过计时器的时间戳时
  • 对 onTimer() 和 processElement() 方法的调用是同步的

可以用很多定时器

  • 定时器成本不高,你可以用很多
  • 每个key有个定时器,有数百万个key是相当正常的;但是要避免这数百万个定时器同时触发。
  • 定时器是可以删除的

使用案例:投资银行希望每天早上生成一份报告,其中包含前一个工作日的所有未结算交易;在收到交易订单后,他们会创建一个处理时间计时器,设置为第二天上午9点触发;当计时器触发时,他们会检查交易是否未结算。这将导致大量计时器在上午 9 点全部触发。
解决方案1:将计时器分散在一个时间间隔内,例如上午 8 点到 10 点
解决方案2:结算后立即删除交易计时器

定时器是去重的

  • 对于任意给定的 key和timestamp,只能有1个定时器
  • registerEventTimeTimer(currentWatermark + 1)
    • 保证每当当前watermark前进时都会调用 onTimer()
    • 由于计时器是去重的,因此在每个事件上执行此操作并且计时器触发消耗并不大;
  • 可以通过四舍五入到最接近的秒/分钟/小时来减少正在使用的计时器数量
    long coalescedTime = ((ctx.timestamp() + timeout) / 1000) * 1000;
    ctx.timerService().registerProcessingTimeTimer(coalescedTime);
    
    这种 currentWatermark + 1 习惯用语并不经常需要,但有时应用可能希望在currentWatermark前进时收到通知,这是使用标准的公共 API 实现这一目标的唯一方法。TODO没看懂?

定时器去重的事实更经常被利用来减少定时器的数量,方法是将定时器向下舍入到整分钟或整小时(或秒,如示例中)。在本书中将此称为计时器合并。注意,在每个key有多个计时器的情况下才讨论这个。

计时器在checkpoints中

  • 故障恢复后,事件时间计时器将继续等待触发watermark
  • 故障恢复后,所有应该在中断期间触发的处理时间计时器将立即触发

计时器只在keyed streams有用

但是你总是可以将unkeyed stream转化为keyed streams

  • 为每条记录添加一个包含随机数的字段
  • 使用它作为key
  • keyBy(random.nextLong()) TODO
    • keys并不随消息一并传播,他们是按需重新计算的
    • 因此,keys必须是确定的

可选解决方案:

  • keyBy()一个常量
  • 暗含并行度为1

总结

  • ProcessFunction是构建Flink事件驱动应用的基础,包括:
    • 事件处理
    • state
    • time
  • 旁路输出是一种方便的、类型安全的拆分流的机制,可用于处理
    • 晚到事件
    • 非常规事件/异常事件
    • 错误
  • 计时器是分析和事件驱动pipeline的重要且灵活的组成部分

参考