项目背景
目标
- “长途驾驶警报”练习的目标是在出租车持续超过两个小时时发出警告。
- 应该使用数据流中提供的event timestamp和watermark来完成。
- 流是无序的,并且可能会在其 START 事件之前处理骑行的 END 事件。
- END 事件可能会丢失,但可以假设没有重复的事件,也没有丢失的 START 事件
- 仅仅等待 END 事件并计算持续时间是不够的,因为我们希望尽快收到关于长途驾驶的警报。
- 应用最终应该清除其创建的任何状态。
输入
本练习的输入数据是出租车乘车事件的 DataStream。输出
练习的结果应该是一个 DataStream,其中包含持续时间超过两小时的行程的rideId。
结果流应打印到标准输出。
代码解析
定义输入输出以及构造函数
private final SourceFunction<TaxiRide> source;
private final SinkFunction<Long> sink;
public LongRidesExercise(SourceFunction<TaxiRide> source, SinkFunction<Long> sink){
this.source = source;
this.sink = sink;
}
execute()
- 定义流环境以及输入
- 定义watermarkStrategy
- keyBy处理,process
addSink
public JobExecutionResult execute() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<TaxiRide> rides = env.addSource(source);
// TODO,自行定义与处理watermarkStrategy,此处用途是什么?
WatermarkStrategy<TaxiRide> watermarkStrategy =
WatermarkStrategy.<TaxiRide>forBoundedOutOfOrderness(Duration.ofSeconds(60))
.withTimestampAssigner((ride,streamRecordTimestamp) -> ride.getEventTimeMillis());
rides.assignTimestampsAndWatermarks(watermarkStrategy)
.keyBy(ride -> ride.rideId)
.process(new AlertFunction())
.addSink(sink);
return env.execute();
}
AlertFunction
继承抽象类KeyedProcessFunction
- 分别实现open、processElement、onTimer 3个方法
- open函数处理状态
- processElement函数处理业务逻辑,包括判断状态、生成计时器,处理输出流、清理状态等
- onTimer函数处理触发到计时器后的逻辑
- getTimerTime和RideTooLong是辅助函数
public static class AlertFunction extends KeyedProcessFunction<Long,TaxiRide,Long>{
private ValueState<TaxiRide> rideState;
@Override
public void open(Configuration config){
ValueStateDescriptor<TaxiRide> rideStateDescriptor =
new ValueStateDescriptor<>("ride event", TaxiRide.class);
rideState = getRuntimeContext().getState(rideStateDescriptor);
}
@Override
public void processElement(TaxiRide taxiRide, Context context, Collector<Long> out) throws Exception {
TaxiRide firstRideEvent = rideState.value();
if(firstRideEvent == null){
//无论是开始事件和结束事件谁先到来,都记录到状态中
rideState.update(taxiRide);
// 如果是开始事件,生成一个计时器
if(taxiRide.isStart){
context.timerService().registerEventTimeTimer(getTimerTime(taxiRide));
}
}else{
if(taxiRide.isStart){
if (rideTooLong(taxiRide, firstRideEvent)) {
out.collect(taxiRide.rideId);
}
}else{
//状态中存储了start事件,计时器仍在运行,除非它被触发了。此处删除计时器
context.timerService().deleteEventTimeTimer(getTimerTime(firstRideEvent));
// 有可能没有触发计时器报警,再次判断下输出
if(rideTooLong(firstRideEvent,taxiRide)){
out.collect(taxiRide.rideId);
}
}
//因为两个事件都出现了,所以可以清空状态
//当一个事件丢失时,本处理方法可能会导致内存泄漏
// TODO,后面我们更多讨论
rideState.clear();
}
}
@Override
public void onTimer(long timestamp, OnTimerContext context, Collector<Long> out) throws IOException {
// 超时行程,触发计时器
out.collect(rideState.value().rideId);
// 清空状态,防止再次触发。但是会引发一个新问题,当end事件到达时,会漏掉之前的开始状态
rideState.clear();
}
private long getTimerTime(TaxiRide ride) throws RuntimeException {
if (ride.isStart) {
return ride.eventTime.plusSeconds(120 * 60).toEpochMilli();
} else {
throw new RuntimeException("Can not get start time from END event.");
}
}
private boolean rideTooLong(TaxiRide startEvent, TaxiRide endEvent) {
return Duration.between(startEvent.eventTime, endEvent.eventTime)
.compareTo(Duration.ofHours(2))
> 0;
}
}
进一步讨论
分析
- 开始事件丢失,那么结束事件就会一直在状态中(会导致内存泄漏)。
- 结束事件丢失,计时器会被触发并且状态会被清理(it’s ok)
- 计时器被触发且状态被清理掉之后结束事件才到达,这种情况下结束事件也会一直存在状态中(会导致内存泄漏)。
这些泄漏可以采用state TTL或者另一个计时器(timer)来解决,最终会清理掉这些逗留的事件状态。
基本原则
不管我们怎么聪明的设计我们保存状态的机制,以及我们选择保持多长时间,我们最终都应该清除它——否则我们的状态将以无限的方式增长。当然如果丢失了这些信息,我们将面临延迟事件导致错误或重复结果的风险。
这种在无限期地保持状态与在事件延迟时偶尔出错之间的权衡是有状态流处理固有的挑战。
如果你想更近一步
对于其中的每一种情况,添加测试以检查所需的行为。
- 扩展解决方案,使其永远不会泄漏状态。
- 定义事件丢失的含义,检测丢失的 START 和 END 事件,并将他们作为消息通知发送到旁路输出。