代码解析

定义输入输出以及构造函数

  /** Source of START/END taxi ride events consumed by the job. */
  private final SourceFunction<TaxiRide> source;
  /** Sink that receives the rideId of every ride flagged by the job. */
  private final SinkFunction<Long> sink;

  /**
   * Creates the job with the given input and output.
   *
   * @param source source of taxi ride events; must not be null
   * @param sink sink for the rideIds emitted by the job; must not be null
   * @throws NullPointerException if {@code source} or {@code sink} is null (fail fast here rather
   *     than later inside {@code execute()})
   */
  public LongRidesExercise(SourceFunction<TaxiRide> source, SinkFunction<Long> sink) {
      this.source = java.util.Objects.requireNonNull(source, "source");
      this.sink = java.util.Objects.requireNonNull(sink, "sink");
  }

execute()

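  • 获取流执行环境,并添加数据源作为输入
  • 定义 watermarkStrategy:从每条行程事件中提取事件时间作为时间戳,并生成允许 60 秒乱序的水位线(watermark);后续的事件时间计时器要靠水位线推进才能触发
  • 按 rideId 进行 keyBy,使同一行程的开始/结束事件落到同一个 key 上,再调用 process(new AlertFunction()) 处理
  • 通过 addSink 输出超时行程的 rideId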

    1. public JobExecutionResult execute() throws Exception {
    2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    3. DataStream<TaxiRide> rides = env.addSource(source);
    4. // TODO,自行定义与处理watermarkStrategy,此处用途是什么?
    5. WatermarkStrategy<TaxiRide> watermarkStrategy =
    6. WatermarkStrategy.<TaxiRide>forBoundedOutOfOrderness(Duration.ofSeconds(60))
    7. .withTimestampAssigner((ride,streamRecordTimestamp) -> ride.getEventTimeMillis());
    8. rides.assignTimestampsAndWatermarks(watermarkStrategy)
    9. .keyBy(ride -> ride.rideId)
    10. .process(new AlertFunction())
    11. .addSink(sink);
    12. return env.execute();
    13. }

    AlertFunction

  • 继承抽象类KeyedProcessFunction

  • 分别实现open、processElement、onTimer 3个方法
  • open 函数中通过 ValueStateDescriptor 注册并获取键控状态 rideState
  • processElement 函数处理业务逻辑,包括判断状态、注册/删除计时器、输出结果以及清理状态等
  • onTimer 函数处理计时器触发后的逻辑
  • getTimerTime 和 rideTooLong 是辅助函数:前者计算开始事件对应的计时器时间(开始时间 + 2 小时),后者判断行程是否超过 2 小时
  1. public static class AlertFunction extends KeyedProcessFunction<Long,TaxiRide,Long>{
  2. private ValueState<TaxiRide> rideState;
  3. @Override
  4. public void open(Configuration config){
  5. ValueStateDescriptor<TaxiRide> rideStateDescriptor =
  6. new ValueStateDescriptor<>("ride event", TaxiRide.class);
  7. rideState = getRuntimeContext().getState(rideStateDescriptor);
  8. }
  9. @Override
  10. public void processElement(TaxiRide taxiRide, Context context, Collector<Long> out) throws Exception {
  11. TaxiRide firstRideEvent = rideState.value();
  12. if(firstRideEvent == null){
  13. //无论是开始事件和结束事件谁先到来,都记录到状态中
  14. rideState.update(taxiRide);
  15. // 如果是开始事件,生成一个计时器
  16. if(taxiRide.isStart){
  17. context.timerService().registerEventTimeTimer(getTimerTime(taxiRide));
  18. }
  19. }else{
  20. if(taxiRide.isStart){
  21. if (rideTooLong(taxiRide, firstRideEvent)) {
  22. out.collect(taxiRide.rideId);
  23. }
  24. }else{
  25. //状态中存储了start事件,计时器仍在运行,除非它被触发了。此处删除计时器
  26. context.timerService().deleteEventTimeTimer(getTimerTime(firstRideEvent));
  27. // 有可能没有触发计时器报警,再次判断下输出
  28. if(rideTooLong(firstRideEvent,taxiRide)){
  29. out.collect(taxiRide.rideId);
  30. }
  31. }
  32. //因为两个事件都出现了,所以可以清空状态
  33. //当一个事件丢失时,本处理方法可能会导致内存泄漏
  34. // TODO,后面我们更多讨论
  35. rideState.clear();
  36. }
  37. }
  38. @Override
  39. public void onTimer(long timestamp, OnTimerContext context, Collector<Long> out) throws IOException {
  40. // 超时行程,触发计时器
  41. out.collect(rideState.value().rideId);
  42. // 清空状态,防止再次触发。但是会引发一个新问题,当end事件到达时,会漏掉之前的开始状态
  43. rideState.clear();
  44. }
  45. private long getTimerTime(TaxiRide ride) throws RuntimeException {
  46. if (ride.isStart) {
  47. return ride.eventTime.plusSeconds(120 * 60).toEpochMilli();
  48. } else {
  49. throw new RuntimeException("Can not get start time from END event.");
  50. }
  51. }
  52. private boolean rideTooLong(TaxiRide startEvent, TaxiRide endEvent) {
  53. return Duration.between(startEvent.eventTime, endEvent.eventTime)
  54. .compareTo(Duration.ofHours(2))
  55. > 0;
  56. }
  57. }