构建事件驱动应用
- ProcessFunction结合了
- 有状态的,一次一个事件的处理
- 计时器
- 这个模式有多种形式
- keyed streams
- connected streams
- windowed and broadcast streams
从某种意义上说,API 的其余部分主要是为了方便应用。 ProcessFunction 是更多的通用构建块。ProcessFunction同样可以处理non-keyed streams,只是不能用计时器。
简单有效的API
// Process one element from the input stream.
void processElement(I value, Context ctx, Collector<O> out) throws Exception;
// Called when a timer fires.
void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception;
- ProcessFunctions是Rich Functions;他们可以使用state
- 两个回调都有一个collector,可以用来输出结果
- 两个回调都传递了某种可以访问 TimerService 的上下文
TimerService可以提供
- 当前系统时间
- 当前watermark
创建processing-time和event-time的计时器
Context / OnTimerContext API
public abstract class Context {
// Timestamp of the element currently being processed or timestamp of a firing timer.
public abstract Long timestamp();
// TimerService for querying time and registering timers.
public abstract TimerService timerService();
// ONLY OnTimerContext: The TimeDomain of the firing timer.
public abstract TimeDomain timeDomain();
// Emits a record to the side output identified by the OutputTag.
public abstract <X> void output(OutputTag<X> outputTag, X value);
// ONLY w/ KeyedProcessFunctions: Key of the element being processed.
public abstract K getCurrentKey();
}
这些上下文对象包括相当多的功能。下面首先看看timerService()。
timerService
public interface TimerService {
// returns the current processing time.
long currentProcessingTime();
// returns the current event-time watermark.
long currentWatermark();
// registers a timer to be fired when processing time passes the given time.
void registerProcessingTimeTimer(long time);
// registers a timer to be fired when the event time watermark
// passes the given time.
void registerEventTimeTimer(long time);
}
TimerService 在处理时间和事件时间域中提供对当前时间的访问。它还提供了创建两种模式的计时器的方法。
ProcessFunction案例:心跳监测
背景
一组物联网设备,至少每100ms都会发送一条消息出来。 需要:
- 为每个传入的 deviceId(key)维护一个事件计数器。
- 如果该设备在过去 100 毫秒(处理时间内)没有发生事件,则以 (deviceId, count) 对的形式发出警报
实现思路
- 存储ValueState(keyed/scoped by deviceId)
- count
- last mod timestamp
- 对每个事件
- 更新计数、最后修改时间
- 删除当前计时器
- 从此刻(processing time)开始,注册一个100ms的新计时器
- 每当计时器触发时
- 发送(deviceId, count)
关键类
A POJO for holding state
// the data type stored in the state
public class CountWithTimestamp {
public long count;
public long lastModified;
}
Our application
DataStream<Event> stream = ...
// apply the process function to a keyed stream
DataStream<Tuple2<String, Long>> result = stream
.keyBy(e -> e.deviceId)
.process(new CountWithTimeoutFunction());
ValueState
通过KeyedProcessFunction来处理当前KeyedStream(keyed by deviceId)。
核心方法实现
public class CountWithTimeoutFunction extends
KeyedProcessFunction<String, Event, Tuple2<String, Long>> {
@Override
public void open(Configuration parameters) throws Exception {
// register our state with the state backend
}
@Override
public void processElement(Event event, Context ctx,
Collector<Tuple2<String, Long>> out) throws Exception {
// update our state and timer
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx,
Collector<Tuple2<String, Long>> out) throws Exception {
// emit alert
}
}
从上面类可以看出一共需要实现3个方法:open(), processElement(), and onTimer()。注意上面KeyedProcessFunction内接的泛型参数
private ValueState<CountWithTimestamp> state;
@Override
public void open(Configuration parameters) throws Exception {
state = getRuntimeContext().getState(
new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
}
open的写法和前面举的例子差不多。
static final int TIMEOUT = 100;
@Override
public void processElement(Event event, Context ctx,
Collector<Tuple2<String, Long>> out) throws Exception {
CountWithTimestamp current = state.value();
if (current == null) {
current = new CountWithTimestamp();
} else {
ctx.timerService().deleteProcessingTimeTimer(current.lastModified + TIMEOUT);
}
current.count++;
current.lastModified = ctx.timerService().currentProcessingTime();
state.update(current);
ctx.timerService().registerProcessingTimeTimer(current.lastModified + TIMEOUT);
}
定义了一个静态变量TIMEOUT。每个事件到达的时候,都会调用processElement方法来处理,processElement一共做了几件事:
- 获取当前event的key对应的状态,判断状态中是否有该key。
- 如果有,删除原来该key老的定时器;如果没有,生成一个新CountWithTimestamp对象
- 更新CountWithTimestamp值,count和lastModified;并更新state
- 重新注册ProcessingTime计时器。
注意,所有的定时器操作都是通过context的timerService接口下的具体方法操作的。方法里定义的输出Collector对象out没有使用。
@Override
public void onTimer(long timestamp, OnTimerContext ctx,
Collector<Tuple2<String, Long>> out) throws Exception {
CountWithTimestamp result = state.value();
String deviceId = ctx.getCurrentKey();
out.collect(new Tuple2<String, Long>(deviceId, result.count));
}
onTimer方法中处理,当计时器触发时,业务逻辑如何走:
- 根据题目背景,输出是一个Tuple对象
。 - deviceId可以通过OnTimerContext中的key来获取。
- count可以从state中获取,表示当前设备在第几次出现后,100ms内没有发送下一个心跳事件。
思考:是否需要清理状态?
答案是:不需要。因为本题中的设备是有限的,并且每个设备只维护1条状态。
ProcessFunction案例: 无序事件流排序
背景
注:Sorting an O-o-o Event Stream,其中 O-o-o代表Out-of-order。有些场景,比如模式识别或者是时序分析,是需要保证元素的顺序的。
待排序的乱序流如下,跟我们前面讲watermark的例子一样
实现思路
需要缓存一段流数据,因为我们无法保证看到的第1个元素,就是真实排序数据中的第1个元素。如本案例中的11,就不是。
思考:将这些元素缓存在哪里?
PriorityQueue是比较方便的一种选项,会自动排序。
采用一个有界无序watermark,设置5的延迟。watermark(10)的到来标志着10以前的数据完整了,因此10之前的7和9作为有序流的第一部分可以被释放掉。
关键类:排序
DataStream<Event> input = ...;
DataStream<Event> sorted = input
.keyBy(...)
.process(new SortFunction());
public static class SortFunction extends KeyedProcessFunction<KeyType, Event, Event> {
private ValueState<PriorityQueue<Event>> queueState =
getRuntimeContext().getState(…);
@Override
public void processElement(Event event, Context context, Collector<Event> out) {
...
}
@Override
public void onTimer(long timestamp, OnTimerContext context, Collector<Event> out) {
...
}
}
这不是一个全局排序,但是是对应key内的排序。注意ValueState中的类型。
TODO
没有open()方法?
实现方法
processElement
public void processElement(Event event, Context context, Collector<Event> out) {
TimerService timerService = context.timerService();
// skip late events
if (context.timestamp() > timerService.currentWatermark()) {
PriorityQueue<Event> queue = queueState.value();
if (queue == null) {
queue = new PriorityQueue<>(10, new CompareByTimestamp());
}
queue.add(event);
queueState.update(queue);
timerService.registerEventTimeTimer(context.timestamp());
}
}
context.timestamp()是event的timestamp。这个案例中假设数据不会late(注意,有乱序但是不late)。
然后我们将元素加入到PriorityQueue中。这里有一个可能引发困惑的地方是何时触发定时器。在context.timestamp()设置定时器,期望watermark追赶上当前的事件时间戳时,onTimer()函数被调用。
onTimer
public void onTimer(long timestamp, OnTimerContext context, Collector<Event> out) {
PriorityQueue<Event> queue = queueState.value();
Long watermark = context.timerService().currentWatermark();
Event event = queue.peek();
while (event != null && event.timestamp <= watermark) {
out.collect(event);
queue.remove(event);
event = queue.peek();
}
queueState.update(queue);
}
预警:
- 在某些情形下上面这种实现会有性能问题
- 不要使用上面的这种实现方式作为参考方案/上线方案
后面我们讨论状态后端时,会更多的讨论这个预警提示。简单来讲,就是需要在每次访问一些状态后端时,需要反序列化状态字节对象,然后再更新时再序列化。采用PriorityQueue这种实现方案,性能消耗比较大。我们可以采用MapState,而不是ValueState
TODO
为什么不直接用event.timestamp?这里的timerService.currentWatermark()是怎么生成的?
晚到元素如何处理?
在这个例子中,元素18是晚到数据,在上面processElement()实现方法中,我们直接丢弃了晚到的数据。
当一个事件晚到了之后,它错过了在排序输出流中给的卡槽,但是我们可以把晚到的数据单独放到一个输出流中。
Side outputs旁路输出
- 允许从一个用户函数发出多个流
- 发出的流可以有不同的数据类型
- 旁路输出是split()/select()的更高级的替代
- 与 ProcessFunctions、Windows(延迟事件)和 CEP 一起使用
我们没有在本系列文章中没有介绍 split()/select(),因为它已被弃用,并且存在一些重大缺陷。
Context / OnTimerContext API
public abstract class Context {
// Timestamp of the element currently being processed or timestamp of a firing timer.
public abstract Long timestamp();
// TimerService for querying time and registering timers.
public abstract TimerService timerService();
// ONLY OnTimerContext: The TimeDomain of the firing timer.
public abstract TimeDomain timeDomain();
// Emits a record to the side output identified by the OutputTag.
public abstract <X> void output(OutputTag<X> outputTag, X value);
// ONLY w/ KeyedProcessFunctions: Key of the element being processed.
public abstract K getCurrentKey();
}
旁路输出API
static final OutputTag<Event> lateEventsTag = new
OutputTag<Event>("lateEvents") {};
DataStream<Event> input = ...;
SingleOutputStreamOperator<Event> sorted = input
.keyBy(...)
.process(new SortFunction());
DataStream<Event> lateEvents = sorted.getSideOutput(lateEventsTag);
这还是我们前面看到的排序例子,加入了旁路输出。旁路输出和ProcessFunctions一起使用。旁路输出通过对一个具有名称和类型的OutputTag来引用。两个OutputTag对象如果有相同的名字,会被认为是同一个旁路输出。在一个流上应用process()方法的结果是SingleOutputStreamOperator。从这个Operator可以获得跟他相关的旁路输出。
public void processElement(Event event, Context context, Collector<Event> out) {
TimerService timerService = context.timerService();
if (context.timestamp() > timerService.currentWatermark()) {
PriorityQueue<Event> queue = queueState.value();
if (queue == null) {
queue = new PriorityQueue<>(10, new CompareByTimestamp());
}
queue.add(event);
queueState.update(queue);
timerService.registerEventTimeTimer(event.timestamp);
} else { // event is late
context.output(lateEventsTag, event);
}
}
对比前面的实现,我们在 processElement() 方法中所要做的只是添加一个处理延迟事件的 else 子句。context中有一个输出方法用来发送事件到output tag。
旁路输出用于异常报告
final OutputTag<String> errors = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<Integer> task1 = ...;
SingleOutputStreamOperator<Integer> task2 = ...;
SingleOutputStreamOperator<Integer> task3 = ...;
DataStream<String> exceptions1 = task1.getSideOutput(errors);
DataStream<String> exceptions2 = task2.getSideOutput(errors);
DataStream<String> exceptions3 = task3.getSideOutput(errors);
DataStream<String> exceptions = exceptions1.union(exceptions2, exceptions3);
exceptions.addSink(new FlinkKafkaProducer(...));
在这里,我们将在一系列流程函数中捕获的所有异常收集在一起,并将所有这些发送到一个 kafka 主题。然后使用union函数将他们组合起来。
一些关于Timers的事实
onTimer()什么时候被调用
- Processing time timer
- 当机器的时钟时间到达定时器的时间戳时
- Event time timer
- 当算子的watermark达到或超过计时器的时间戳时
- 对 onTimer() 和 processElement() 方法的调用是同步的
可以用很多定时器
- 定时器成本不高,你可以用很多
- 每个key有个定时器,有数百万个key是相当正常的;但是要避免这数百万个定时器同时触发。
- 定时器是可以删除的
使用案例:投资银行希望每天早上生成一份报告,其中包含前一个工作日的所有未结算交易;在收到交易订单后,他们会创建一个处理时间计时器,设置为第二天上午9点触发;当计时器触发时,他们会检查交易是否未结算。这将导致大量计时器在上午 9 点全部触发。
解决方案1:将计时器分散在一个时间间隔内,例如上午 8 点到 10 点
解决方案2:结算后立即删除交易计时器
定时器是去重的
- 对于任意给定的 key和timestamp,只能有1个定时器
- registerEventTimeTimer(currentWatermark + 1)
- 保证每当当前watermark前进时都会调用 onTimer()
- 由于计时器是去重的,因此在每个事件上执行此操作并且计时器触发消耗并不大;
- 可以通过四舍五入到最接近的秒/分钟/小时来减少正在使用的计时器数量
这种 currentWatermark + 1 习惯用语并不经常需要,但有时应用可能希望在currentWatermark前进时收到通知,这是使用标准的公共 API 实现这一目标的唯一方法。TODO没看懂?long coalescedTime = ((ctx.timestamp() + timeout) / 1000) * 1000; ctx.timerService().registerProcessingTimeTimer(coalescedTime);
定时器去重的事实更经常被利用来减少定时器的数量,方法是将定时器向下舍入到整分钟或整小时(或秒,如示例中)。在本书中将此称为计时器合并。注意,在每个key有多个计时器的情况下才讨论这个。
计时器在checkpoints中
- 故障恢复后,事件时间计时器将继续等待触发watermark
- 故障恢复后,所有应该在中断期间触发的处理时间计时器将立即触发
计时器只在keyed streams有用
但是你总是可以将unkeyed stream转化为keyed streams
- 为每条记录添加一个包含随机数的字段
- 使用它作为key
- keyBy(random.nextLong()) TODO
- keys并不随消息一并传播,他们是按需重新计算的
- 因此,keys必须是确定的
可选解决方案:
- keyBy()一个常量
- 暗含并行度为1
总结
- ProcessFunction是构建Flink事件驱动应用的基础,包括:
- 事件处理
- state
- time
- 旁路输出是一种方便的、类型安全的拆分流的机制,可用于处理
- 晚到事件
- 非常规事件/异常事件
- 错误
- 计时器是分析和事件驱动pipeline的重要且灵活的组成部分
参考
- 另一个例子:
- 官网文档: