代码解析
定义输入输出以及构造函数
/** Source of taxi ride events; added to the stream environment in execute(). */
private final SourceFunction<TaxiRide> source;
/** Sink attached at the end of the pipeline; receives the Long values emitted by AlertFunction. */
private final SinkFunction<Long> sink;
/**
 * Creates the exercise with the given input and output.
 *
 * @param source source of {@code TaxiRide} events
 * @param sink   sink that receives the emitted ride ids
 */
public LongRidesExercise(SourceFunction<TaxiRide> source, SinkFunction<Long> sink){
this.source = source;
this.sink = sink;
}
execute()
- 定义流环境以及输入
- 定义watermarkStrategy
- keyBy处理,process
- addSink
/**
 * Assembles the pipeline (source, timestamps/watermarks, keyBy ride id,
 * {@code AlertFunction}, sink) and runs it.
 *
 * @return the result returned by the stream environment's {@code execute()}
 * @throws Exception if the job fails
 */
public JobExecutionResult execute() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // TODO (carried over from the original notes): define and handle the
    // watermark strategy yourself; what is its purpose here?
    // Timestamps are taken from each ride's own event time, and watermarks allow
    // events to arrive up to 60 seconds out of order. AlertFunction registers
    // event-time timers, which presumably depend on these watermarks to fire.
    WatermarkStrategy<TaxiRide> boundedDisorder =
            WatermarkStrategy.<TaxiRide>forBoundedOutOfOrderness(Duration.ofSeconds(60))
                    .withTimestampAssigner(
                            (event, recordTimestamp) -> event.getEventTimeMillis());

    DataStream<TaxiRide> timestampedRides =
            env.addSource(source).assignTimestampsAndWatermarks(boundedDisorder);

    // Partition by ride id so the START and END of one ride reach the same keyed state.
    timestampedRides
            .keyBy(event -> event.rideId)
            .process(new AlertFunction())
            .addSink(sink);

    return env.execute();
}
AlertFunction
继承抽象类KeyedProcessFunction
- 分别实现open、processElement、onTimer 3个方法
- open函数处理状态
- processElement函数处理业务逻辑,包括判断状态、生成计时器,处理输出流、清理状态等
- onTimer函数处理触发到计时器后的逻辑
- getTimerTime和rideTooLong是辅助函数
/**
 * Keyed process function that emits the id of every ride lasting longer than
 * {@link #MAX_RIDE_DURATION}.
 *
 * <p>For each key, the first event seen (START or END) is kept in state. A START
 * event additionally registers an event-time timer at start time plus the maximum
 * duration. When the second event arrives, the pair is compared directly and the
 * state is cleared. If the timer fires first, the ride id is emitted from the timer.
 */
public static class AlertFunction extends KeyedProcessFunction<Long, TaxiRide, Long> {

    /** Longest ride duration that does not raise an alert; used for both the timer and the check. */
    private static final Duration MAX_RIDE_DURATION = Duration.ofHours(2);

    /** First event seen for the current key, or empty when nothing is pending. */
    private ValueState<TaxiRide> rideState;

    /** Obtains the per-key state handle holding the first event of a ride. */
    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<TaxiRide> rideStateDescriptor =
                new ValueStateDescriptor<>("ride event", TaxiRide.class);
        rideState = getRuntimeContext().getState(rideStateDescriptor);
    }

    /**
     * Handles one ride event for the current key.
     *
     * @param taxiRide the incoming START or END event
     * @param context  gives access to the timer service
     * @param out      collector for ids of rides that lasted too long
     * @throws Exception if state access fails
     */
    @Override
    public void processElement(TaxiRide taxiRide, Context context, Collector<Long> out)
            throws Exception {
        TaxiRide firstRideEvent = rideState.value();

        if (firstRideEvent == null) {
            // Whichever event arrives first (START or END), remember it.
            rideState.update(taxiRide);

            // A START event gets a timer so that a ride whose END has not arrived
            // in time is still reported.
            if (taxiRide.isStart) {
                context.timerService().registerEventTimeTimer(getTimerTime(taxiRide));
            }
            return;
        }

        if (taxiRide.isStart) {
            // The stored event is treated as the END that arrived before this START.
            if (rideTooLong(taxiRide, firstRideEvent)) {
                out.collect(taxiRide.rideId);
            }
        } else {
            // The stored event is the START, so its timer is still pending unless it
            // has already fired; remove it. getTimerTime throws if the stored event
            // is not a START.
            context.timerService().deleteEventTimeTimer(getTimerTime(firstRideEvent));

            // The ride may have been too long even though the timer has not fired yet.
            if (rideTooLong(firstRideEvent, taxiRide)) {
                out.collect(taxiRide.rideId);
            }
        }

        // Both events have been seen, so the state can be cleared.
        // Note: when one of the two events never arrives, this approach can leak state.
        // TODO: discussed in more detail later.
        rideState.clear();
    }

    /**
     * Emits the ride id when the timer registered for a START event fires.
     *
     * @param timestamp the time the timer was registered for
     * @param context   timer context (unused)
     * @param out       collector for ids of rides that lasted too long
     * @throws IOException if state access fails
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext context, Collector<Long> out)
            throws IOException {
        TaxiRide pendingStart = rideState.value();

        // processElement clears the state on any second event but only deletes the
        // timer when that event is an END. A repeated START therefore leaves a stale
        // timer behind; without this guard it would fail with a NullPointerException.
        if (pendingStart == null) {
            return;
        }

        // The ride exceeded the limit: raise the alert.
        out.collect(pendingStart.rideId);

        // Clearing prevents a second alert, but it means that when the END event
        // arrives later the matching START is no longer in state.
        rideState.clear();
    }

    /**
     * Computes when the timer for a START event should fire.
     *
     * @param ride a START event
     * @return start time plus {@link #MAX_RIDE_DURATION}, in epoch milliseconds
     * @throws RuntimeException if {@code ride} is not a START event
     */
    private long getTimerTime(TaxiRide ride) throws RuntimeException {
        if (!ride.isStart) {
            throw new RuntimeException("Can not get start time from END event.");
        }
        return ride.eventTime.plus(MAX_RIDE_DURATION).toEpochMilli();
    }

    /**
     * Tells whether the time from the start event to the end event is strictly
     * greater than {@link #MAX_RIDE_DURATION}.
     */
    private boolean rideTooLong(TaxiRide startEvent, TaxiRide endEvent) {
        Duration elapsed = Duration.between(startEvent.eventTime, endEvent.eventTime);
        return elapsed.compareTo(MAX_RIDE_DURATION) > 0;
    }
}