Code walkthrough
Input, output, and constructor
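```java
// The source and sink are injected through the constructor instead of being hard-coded.
private final SourceFunction<TaxiRide> source;
private final SinkFunction<Long> sink;

public LongRidesExercise(SourceFunction<TaxiRide> source, SinkFunction<Long> sink) {
    this.source = source;
    this.sink = sink;
}
```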
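Because the constructor takes the source and sink as parameters, a test can feed in hand-made rides and collect the output in memory. Production code can pass a real generator and sink instead. The sketch below shows the same pattern in plain Java without Flink. `InjectableJob` is a hypothetical stand-in. `Iterable<Long>` and `Consumer<Long>` are assumed substitutes for `SourceFunction<TaxiRide>` and `SinkFunction<Long>`.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for LongRidesExercise, without Flink: the job does not
// create its own input or output, it receives them through the constructor.
class InjectableJob {
    private final Iterable<Long> source; // plays the role of SourceFunction<TaxiRide>
    private final Consumer<Long> sink;   // plays the role of SinkFunction<Long>

    InjectableJob(Iterable<Long> source, Consumer<Long> sink) {
        this.source = source;
        this.sink = sink;
    }

    // Placeholder pipeline: forwards every ride id from the source to the sink.
    void execute() {
        for (Long rideId : source) {
            sink.accept(rideId);
        }
    }
}
```

In a unit test the sink can be `collected::add` on a `List<Long>`, so assertions run directly on the collected IDs.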
execute()
- Create the stream execution environment and the input stream
- Define the WatermarkStrategy
- keyBy the ride ID, then process with AlertFunction
- addSink
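```java
import java.time.Duration;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public JobExecutionResult execute() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<TaxiRide> rides = env.addSource(source);

    // Watermark strategy: each ride's event time becomes its timestamp, and watermarks
    // tolerate up to 60 s of out-of-orderness. The event-time timers in AlertFunction
    // fire only as these watermarks advance.
    WatermarkStrategy<TaxiRide> watermarkStrategy =
            WatermarkStrategy.<TaxiRide>forBoundedOutOfOrderness(Duration.ofSeconds(60))
                    .withTimestampAssigner((ride, streamRecordTimestamp) -> ride.getEventTimeMillis());

    rides.assignTimestampsAndWatermarks(watermarkStrategy)
            .keyBy(ride -> ride.rideId)
            .process(new AlertFunction())
            .addSink(sink);

    return env.execute();
}
```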
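A small model can show what the watermark strategy does in this job. The sketch below mimics, in plain Java, how a bounded-out-of-orderness strategy derives its watermark: the largest timestamp seen, minus the allowed out-of-orderness, minus 1 ms. It also shows when an event-time timer would fire. This is a simplified model written for illustration, not Flink's own class. In Flink the watermark is emitted periodically rather than computed on demand, and `BoundedOutOfOrdernessModel` is a hypothetical name.

```java
import java.time.Duration;

// Simplified model of the watermark logic behind
// WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(60)).
class BoundedOutOfOrdernessModel {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessModel(Duration maxOutOfOrderness) {
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // Start low enough that the initial watermark is Long.MIN_VALUE without underflow.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // Called for every event with the timestamp produced by the timestamp assigner.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // The watermark trails the largest timestamp seen by the allowed out-of-orderness (minus 1 ms).
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    // An event-time timer registered for timerTimestamp fires once the watermark reaches it.
    boolean timerFires(long timerTimestamp) {
        return currentWatermark() >= timerTimestamp;
    }
}
```

Late events never move the watermark backwards. This is why a START timestamped at time t only triggers its two-hour timer after events with timestamps beyond t + 2 h + 60 s have been seen.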
AlertFunction
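Extends the abstract class KeyedProcessFunction<Long, TaxiRide, Long> (key type, input type, output type)
- Implements three methods: open, processElement, and onTimer
- open obtains the keyed state handle
- processElement holds the business logic: checking the state, registering the timer, emitting output, and clearing the state
- onTimer handles what happens when a timer fires
- getTimerTime and rideTooLong are helper functions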
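The two helpers are small enough to check in isolation. The sketch below restates them in plain Java over `java.time.Instant` values; the class name `RideTiming` and the `Instant` parameters are assumptions for illustration. It also makes the boundary explicit: a ride of exactly two hours is not reported, because the comparison is strictly greater than.

```java
import java.time.Duration;
import java.time.Instant;

// Plain-Java restatement of getTimerTime and rideTooLong, taking Instants instead of TaxiRide objects.
class RideTiming {
    static final Duration MAX_RIDE = Duration.ofHours(2);

    // Mirrors getTimerTime: the timer is set two hours after the START event's time.
    static long timerTime(Instant startTime) {
        return startTime.plus(MAX_RIDE).toEpochMilli();
    }

    // Mirrors rideTooLong: only rides strictly longer than two hours count as too long.
    static boolean rideTooLong(Instant start, Instant end) {
        return Duration.between(start, end).compareTo(MAX_RIDE) > 0;
    }
}
```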
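```java
import java.io.IOException;
import java.time.Duration;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public static class AlertFunction extends KeyedProcessFunction<Long, TaxiRide, Long> {

    private ValueState<TaxiRide> rideState;

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<TaxiRide> rideStateDescriptor =
                new ValueStateDescriptor<>("ride event", TaxiRide.class);
        rideState = getRuntimeContext().getState(rideStateDescriptor);
    }

    @Override
    public void processElement(TaxiRide taxiRide, Context context, Collector<Long> out) throws Exception {
        TaxiRide firstRideEvent = rideState.value();

        if (firstRideEvent == null) {
            // Whichever event arrives first (START or END), remember it in state.
            rideState.update(taxiRide);

            // For a START event, register a timer two hours after its event time.
            if (taxiRide.isStart) {
                context.timerService().registerEventTimeTimer(getTimerTime(taxiRide));
            }
        } else {
            if (taxiRide.isStart) {
                // The END event arrived first and is in state; no timer was registered for it.
                if (rideTooLong(taxiRide, firstRideEvent)) {
                    out.collect(taxiRide.rideId);
                }
            } else {
                // State holds the START event, so its timer has not fired yet
                // (onTimer clears the state). Delete the timer here.
                context.timerService().deleteEventTimeTimer(getTimerTime(firstRideEvent));

                // The ride may exceed two hours even though the timer has not fired
                // (the watermark lags behind), so check the duration again.
                if (rideTooLong(firstRideEvent, taxiRide)) {
                    out.collect(taxiRide.rideId);
                }
            }

            // Both events have been seen, so the state can be cleared.
            // If one of the two events is lost, this approach can leak state (memory).
            // TODO: discussed further later.
            rideState.clear();
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext context, Collector<Long> out) throws IOException {
        // The ride has run too long: the timer fired before the END event arrived.
        out.collect(rideState.value().rideId);

        // Clear the state so the alert is not raised again. This introduces a new problem:
        // when the END event arrives later, its START is gone, so the END is stored and never cleared.
        rideState.clear();
    }

    private long getTimerTime(TaxiRide ride) throws RuntimeException {
        if (ride.isStart) {
            return ride.eventTime.plusSeconds(120 * 60).toEpochMilli();
        } else {
            throw new RuntimeException("Can not get start time from END event.");
        }
    }

    private boolean rideTooLong(TaxiRide startEvent, TaxiRide endEvent) {
        return Duration.between(startEvent.eventTime, endEvent.eventTime)
                .compareTo(Duration.ofHours(2)) > 0;
    }
}
```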
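The sketch below models AlertFunction in plain Java to show the whole per-key lifecycle, including the leak the comments warn about. A `HashMap` stands in for the keyed `ValueState`, and another map holds at most one timer per ride. `advanceWatermark` fires due timers the way event-time timers fire when the watermark reaches them. Everything here (`LongRideModel`, its `Event` record, `advanceWatermark`) is a hypothetical simplification, not Flink API. It assumes Java 16+ for the record.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of AlertFunction: one state slot and at most one timer per rideId.
class LongRideModel {
    record Event(long rideId, boolean isStart, long eventTimeMillis) {}

    private static final long TWO_HOURS = Duration.ofHours(2).toMillis();

    final Map<Long, Event> state = new HashMap<>(); // plays the role of ValueState<TaxiRide>
    final Map<Long, Long> timers = new HashMap<>(); // rideId -> timer timestamp
    final List<Long> alerts = new ArrayList<>();    // plays the role of Collector<Long>

    // Same branching as processElement.
    void processElement(Event e) {
        Event first = state.get(e.rideId());
        if (first == null) {
            state.put(e.rideId(), e);
            if (e.isStart()) {
                timers.put(e.rideId(), e.eventTimeMillis() + TWO_HOURS); // registerEventTimeTimer
            }
            return;
        }
        Event start = e.isStart() ? e : first;
        Event end = e.isStart() ? first : e;
        if (!e.isStart()) {
            timers.remove(e.rideId()); // deleteEventTimeTimer: START was in state
        }
        if (end.eventTimeMillis() - start.eventTimeMillis() > TWO_HOURS) {
            alerts.add(e.rideId());
        }
        state.remove(e.rideId()); // rideState.clear()
    }

    // Fires every timer whose timestamp is <= the watermark, like onTimer.
    void advanceWatermark(long watermark) {
        timers.entrySet().removeIf(t -> {
            if (t.getValue() <= watermark) {
                alerts.add(t.getKey());
                state.remove(t.getKey());
                return true;
            }
            return false;
        });
    }
}
```

The problem described in `onTimer` can be reproduced in three steps. Feed a START, advance the watermark past start + 2 h, then feed the matching END. The alert is emitted once, but the late END is written into empty state and stays there.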
