代码仓
https://github.com/ververica/flink-training/tree/master/rides-and-fares

背景

  • 目标
    • 将TaxiRide的start事件和TaxiFare事件做Join,通过RichCoFlatMapFunction函数来实现
  • 期望输出
    • DataStream输出到控制台
  • 涉及到的类
    • RidesAndFaresExercise, RidesAndFaresSolution
    • RidesAndFaresUnitTest, RidesAndFaresIntegrationTest
  • 其他
    • 对每个rideId,我们是无法控制Ride还是Fare事件的到来顺序
    • 在这个练习题中,可以假设每个TaxiRide仅有一个start event和一个end event,以及一个TaxiFare事件
    • 需要清理掉创建的state

代码解析

定义输入输出以及构造函数

  1. private final SourceFunction<TaxiRide> rideSource;
  2. private final SourceFunction<TaxiFare> fareSource;
  3. private final SinkFunction<RideAndFare> sink;
  4. /** Creates a job using the sources and sink provided. */
  5. public RidesAndFaresExercise(
  6. SourceFunction<TaxiRide> rideSource,
  7. SourceFunction<TaxiFare> fareSource,
  8. SinkFunction<RideAndFare> sink) {
  9. this.rideSource = rideSource;
  10. this.fareSource = fareSource;
  11. this.sink = sink;
  12. }

execute

  1. public JobExecutionResult execute() throws Exception {
  2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3. // A stream of taxi ride START events, keyed by rideId.
  4. DataStream<TaxiRide> rides =
  5. env.addSource(rideSource).filter(ride -> ride.isStart).keyBy(ride -> ride.rideId);
  6. // A stream of taxi fare events, also keyed by rideId.
  7. DataStream<TaxiFare> fares = env.addSource(fareSource).keyBy(fare -> fare.rideId);
  8. // Create the pipeline.
  9. rides.connect(fares).flatMap(new EnrichmentFunction()).addSink(sink);
  10. // Execute the pipeline and return the result.
  11. return env.execute("Join Rides with Fares");
  12. }
  • 定义了2个stream,一个rides,一个fares,并且都keyBy rideId。
  • 注意rides流对于TaxiRide进行了过滤,仅保留start事件
  • 此处join采用的是connect方法

    EnrichmentFunction实现

    1. public static class EnrichmentFunction
    2. extends RichCoFlatMapFunction<TaxiRide, TaxiFare, RideAndFare> {
    3. @Override
    4. public void open(Configuration config) throws Exception {
    5. throw new MissingSolutionException();
    6. }
    7. @Override
    8. public void flatMap1(TaxiRide ride, Collector<RideAndFare> out) throws Exception {
    9. throw new MissingSolutionException();
    10. }
    11. @Override
    12. public void flatMap2(TaxiFare fare, Collector<RideAndFare> out) throws Exception {
    13. throw new MissingSolutionException();
    14. }
    15. }

    RichCoFlatMapFunction运行在一个单独的线程里面,用户不需要过多的考虑这些方法的同步问题。

对于rideId而言,有3个事件:ride-start, ride-end, and fare events。其中ride-end已经被过滤掉了,因此flatMap1和flatMap2对于每个key对应的event只会处理1次(前提假设:数据没有重复)。

考虑你需要什么state,且需要关注的是,对于每个rideId,你可能先接收到TaxiRide或者TaxiFare其中之一,然后在一段时间后,接收到对应的另外一个事件。特别还需要注意的每个函数的返回值,接收的入参分别是什么。

TODO代码实现详情请参考 github地址

  1. public static class EnrichmentFunction
  2. extends RichCoFlatMapFunction<TaxiRide,TaxiFare,RideAndFare> {
  3. ValueState<TaxiRide> rideVs;
  4. ValueState<TaxiFare> fareVs;
  5. @Override
  6. public void open(Configuration config){
  7. ValueStateDescriptor rideDes = new ValueStateDescriptor<TaxiRide>("saved ride",TaxiRide.class);
  8. ValueStateDescriptor fareDes = new ValueStateDescriptor<TaxiFare>("saved fare",TaxiFare.class);
  9. rideVs = getRuntimeContext().getState(rideDes);
  10. fareVs = getRuntimeContext().getState(fareDes);
  11. }
  12. /**
  13. * flatMap函数返回类型是void,实际上输出类型都在Collector中体现
  14. * @param ride
  15. * @param out
  16. * @throws IOException
  17. */
  18. @Override
  19. public void flatMap1(TaxiRide ride, Collector<RideAndFare> out) throws IOException {
  20. rideVs.update(ride);
  21. if(fareVs.value() != null){
  22. out.collect(new RideAndFare(ride,fareVs.value()));
  23. }
  24. }
  25. /**
  26. * flatMap函数返回类型是void,实际上输出类型都在Collector中体现
  27. * @param fare
  28. * @param out
  29. * @throws IOException
  30. */
  31. @Override
  32. public void flatMap2(TaxiFare fare, Collector<RideAndFare> out) throws IOException {
  33. fareVs.update(fare);
  34. if(rideVs.value() != null){
  35. out.collect(new RideAndFare(rideVs.value(),fare));
  36. }
  37. }
  38. }

讨论

state的存储是有成本的,我们应该在何时清除状态?

我们可以在处理每个事件,如ride时,看看fareVs中是否有该key;

  • 如果有,那么这两个事件可以做关联输出,将fareVs中的状态清除;
  • 如果没有,更新rideVs即可

    1. public static class EnrichmentFunction
    2. extends RichCoFlatMapFunction<TaxiRide, TaxiFare, RideAndFare> {
    3. private ValueState<TaxiRide> rideState;
    4. private ValueState<TaxiFare> fareState;
    5. @Override
    6. public void open(Configuration config) {
    7. rideState =
    8. getRuntimeContext()
    9. .getState(new ValueStateDescriptor<>("saved ride", TaxiRide.class));
    10. fareState =
    11. getRuntimeContext()
    12. .getState(new ValueStateDescriptor<>("saved fare", TaxiFare.class));
    13. }
    14. @Override
    15. public void flatMap1(TaxiRide ride, Collector<RideAndFare> out) throws Exception {
    16. TaxiFare fare = fareState.value();
    17. if (fare != null) {
    18. fareState.clear();
    19. out.collect(new RideAndFare(ride, fare));
    20. } else {
    21. rideState.update(ride);
    22. }
    23. }
    24. @Override
    25. public void flatMap2(TaxiFare fare, Collector<RideAndFare> out) throws Exception {
    26. TaxiRide ride = rideState.value();
    27. if (ride != null) {
    28. rideState.clear();
    29. out.collect(new RideAndFare(ride, fare));
    30. } else {
    31. fareState.update(fare);
    32. }
    33. }

    上面这种写法,把中间过程分步写了,我们很容易能看到对ValueState取value()方法后,返回什么。注意上面代码中任意的state的处理,都是没有指定key的,这是因为框架底层在处理上下文时,已经将key信息对齐了。

    key-partitioned state

  • ValueState:存储,对于每个rideId而言,一个TaxiRide和一个TaxiFare

    • keyBy(rideId)
    • ValueState
    • ValueState
    • 有效的创建了2个分布式map,keyed by rideId
      • rideId → TaxiRide
      • rideId → TaxiFare

这可能是 DataStream API 中最常被误解的部分,所以理解这一点很重要。

尽管看起来很有必要,但是在这个exercise中我们不需要使用以rideId 作为键的 MapState。当使用keyed state时,每个 ValueState 对象已经代表了一个分布式的keyed map。那么 MapState 什么时候有用呢?当需要为原始流的每个不同key存储整个映射时。举个例子

  • MapState:存储,对每个taxiId而言,开过这辆出租车的每个司机的最近一趟行程
    • keyBy(taxiId)
    • MapState
      • driverId → timestamp, driverId → timestamp, …
    • 有效的创建了1个分布式map
      • taxiId → (driverId → timestamp, driverId → timestamp, …)

可以用一个Tuple状态类型来代替2个state吗?

我们可以看到在示例代码中

  1. private ValueState<TaxiRide> rideState;
  2. private ValueState<TaxiFare> fareState;

那么我们是否可以用一种复合结构来代替上面的2种状态?例如,ValueState> state.

答案是:在state存储采用RocksDB情况下,不支持。因为TupleSerializer不能处理null fields,会抛异常。