代码仓
https://github.com/ververica/flink-training/tree/master/rides-and-fares
背景
- 目标
- 将TaxiRide的start事件和TaxiFare事件做Join,通过RichCoFlatMapFunction函数来实现
- 期望输出
- DataStream
输出到控制台
- DataStream
- 涉及到的类
- RidesAndFaresExercise, RidesAndFaresSolution
- RidesAndFaresUnitTest, RidesAndFaresIntegrationTest
- 其他
- 对每个rideId,我们是无法控制Ride还是Fare事件的到来顺序
- 在这个练习题中,可以假设每个TaxiRide仅有一个start event和一个end event,以及一个TaxiFare事件
- 需要清理掉创建的state
代码解析
定义输入输出以及构造函数
private final SourceFunction<TaxiRide> rideSource;private final SourceFunction<TaxiFare> fareSource;private final SinkFunction<RideAndFare> sink;/** Creates a job using the sources and sink provided. */public RidesAndFaresExercise(SourceFunction<TaxiRide> rideSource,SourceFunction<TaxiFare> fareSource,SinkFunction<RideAndFare> sink) {this.rideSource = rideSource;this.fareSource = fareSource;this.sink = sink;}
execute
public JobExecutionResult execute() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// A stream of taxi ride START events, keyed by rideId.DataStream<TaxiRide> rides =env.addSource(rideSource).filter(ride -> ride.isStart).keyBy(ride -> ride.rideId);// A stream of taxi fare events, also keyed by rideId.DataStream<TaxiFare> fares = env.addSource(fareSource).keyBy(fare -> fare.rideId);// Create the pipeline.rides.connect(fares).flatMap(new EnrichmentFunction()).addSink(sink);// Execute the pipeline and return the result.return env.execute("Join Rides with Fares");}
- 定义了2个stream,一个rides,一个fares,并且都keyBy rideId。
- 注意rides流对于TaxiRide进行了过滤,仅保留start事件
-
EnrichmentFunction实现
public static class EnrichmentFunctionextends RichCoFlatMapFunction<TaxiRide, TaxiFare, RideAndFare> {@Overridepublic void open(Configuration config) throws Exception {throw new MissingSolutionException();}@Overridepublic void flatMap1(TaxiRide ride, Collector<RideAndFare> out) throws Exception {throw new MissingSolutionException();}@Overridepublic void flatMap2(TaxiFare fare, Collector<RideAndFare> out) throws Exception {throw new MissingSolutionException();}}
RichCoFlatMapFunction运行在一个单独的线程里面,用户不需要过多的考虑这些方法的同步问题。
对于rideId而言,有3个事件:ride-start, ride-end, and fare events。其中ride-end已经被过滤掉了,因此flatMap1和flatMap2对于每个key对应的event只会处理1次(前提假设:数据没有重复)。
考虑你需要什么state,且需要关注的是,对于每个rideId,你可能先接收到TaxiRide或者TaxiFare其中之一,然后在一段时间后,接收到对应的另外一个事件。特别还需要注意的每个函数的返回值,接收的入参分别是什么。
TODO代码实现详情请参考 github地址
public static class EnrichmentFunctionextends RichCoFlatMapFunction<TaxiRide,TaxiFare,RideAndFare> {ValueState<TaxiRide> rideVs;ValueState<TaxiFare> fareVs;@Overridepublic void open(Configuration config){ValueStateDescriptor rideDes = new ValueStateDescriptor<TaxiRide>("saved ride",TaxiRide.class);ValueStateDescriptor fareDes = new ValueStateDescriptor<TaxiFare>("saved fare",TaxiFare.class);rideVs = getRuntimeContext().getState(rideDes);fareVs = getRuntimeContext().getState(fareDes);}/*** flatMap函数返回类型是void,实际上输出类型都在Collector中体现* @param ride* @param out* @throws IOException*/@Overridepublic void flatMap1(TaxiRide ride, Collector<RideAndFare> out) throws IOException {rideVs.update(ride);if(fareVs.value() != null){out.collect(new RideAndFare(ride,fareVs.value()));}}/*** flatMap函数返回类型是void,实际上输出类型都在Collector中体现* @param fare* @param out* @throws IOException*/@Overridepublic void flatMap2(TaxiFare fare, Collector<RideAndFare> out) throws IOException {fareVs.update(fare);if(rideVs.value() != null){out.collect(new RideAndFare(rideVs.value(),fare));}}}
讨论
state的存储是有成本的,我们应该在何时清除状态?
我们可以在处理每个事件,如ride时,看看fareVs中是否有该key;
- 如果有,那么这两个事件可以做关联输出,将fareVs中的状态清除;
如果没有,更新rideVs即可
public static class EnrichmentFunctionextends RichCoFlatMapFunction<TaxiRide, TaxiFare, RideAndFare> {private ValueState<TaxiRide> rideState;private ValueState<TaxiFare> fareState;@Overridepublic void open(Configuration config) {rideState =getRuntimeContext().getState(new ValueStateDescriptor<>("saved ride", TaxiRide.class));fareState =getRuntimeContext().getState(new ValueStateDescriptor<>("saved fare", TaxiFare.class));}@Overridepublic void flatMap1(TaxiRide ride, Collector<RideAndFare> out) throws Exception {TaxiFare fare = fareState.value();if (fare != null) {fareState.clear();out.collect(new RideAndFare(ride, fare));} else {rideState.update(ride);}}@Overridepublic void flatMap2(TaxiFare fare, Collector<RideAndFare> out) throws Exception {TaxiRide ride = rideState.value();if (ride != null) {rideState.clear();out.collect(new RideAndFare(ride, fare));} else {fareState.update(fare);}}
上面这种写法,把中间过程分步写了,我们很容易能看到对ValueState取value()方法后,返回什么。注意上面代码中任意的state的处理,都是没有指定key的,这是因为框架底层在处理上下文时,已经将key信息对齐了。
key-partitioned state
ValueState:存储,对于每个rideId而言,一个TaxiRide和一个TaxiFare
- keyBy(rideId)
- ValueState
- ValueState
- 有效的创建了2个分布式map,keyed by rideId
- rideId → TaxiRide
- rideId → TaxiFare
这可能是 DataStream API 中最常被误解的部分,所以理解这一点很重要。
尽管看起来很有必要,但是在这个exercise中我们不需要使用以rideId 作为键的 MapState。当使用keyed state时,每个 ValueState 对象已经代表了一个分布式的keyed map。那么 MapState 什么时候有用呢?当需要为原始流的每个不同key存储整个映射时。举个例子
- MapState:存储,对每个taxiId而言,开过这辆出租车的每个司机的最近一趟行程
- keyBy(taxiId)
- MapState
- driverId → timestamp, driverId → timestamp, …
- 有效的创建了1个分布式map
- taxiId → (driverId → timestamp, driverId → timestamp, …)
可以用一个Tuple状态类型来代替2个state吗?
我们可以看到在示例代码中
private ValueState<TaxiRide> rideState;private ValueState<TaxiFare> fareState;
那么我们是否可以用一种复合结构来代替上面的2种状态?例如,ValueState
答案是:在state存储采用RocksDB情况下,不支持。因为TupleSerializer不能处理null fields,会抛异常。
