代码仓
https://github.com/ververica/flink-training/tree/master/rides-and-fares
背景
- 目标
- 将TaxiRide的start事件和TaxiFare事件做Join,通过RichCoFlatMapFunction函数来实现
- 期望输出
- DataStream
输出到控制台
- DataStream
- 涉及到的类
- RidesAndFaresExercise, RidesAndFaresSolution
- RidesAndFaresUnitTest, RidesAndFaresIntegrationTest
- 其他
- 对每个rideId,我们是无法控制Ride还是Fare事件的到来顺序
- 在这个练习题中,可以假设每个TaxiRide仅有一个start event和一个end event,以及一个TaxiFare事件
- 需要清理掉创建的state
代码解析
定义输入输出以及构造函数
private final SourceFunction<TaxiRide> rideSource;
private final SourceFunction<TaxiFare> fareSource;
private final SinkFunction<RideAndFare> sink;
/** Creates a job using the sources and sink provided. */
public RidesAndFaresExercise(
SourceFunction<TaxiRide> rideSource,
SourceFunction<TaxiFare> fareSource,
SinkFunction<RideAndFare> sink) {
this.rideSource = rideSource;
this.fareSource = fareSource;
this.sink = sink;
}
execute
public JobExecutionResult execute() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// A stream of taxi ride START events, keyed by rideId.
DataStream<TaxiRide> rides =
env.addSource(rideSource).filter(ride -> ride.isStart).keyBy(ride -> ride.rideId);
// A stream of taxi fare events, also keyed by rideId.
DataStream<TaxiFare> fares = env.addSource(fareSource).keyBy(fare -> fare.rideId);
// Create the pipeline.
rides.connect(fares).flatMap(new EnrichmentFunction()).addSink(sink);
// Execute the pipeline and return the result.
return env.execute("Join Rides with Fares");
}
- 定义了2个stream,一个rides,一个fares,并且都keyBy rideId。
- 注意rides流对于TaxiRide进行了过滤,仅保留start事件
-
EnrichmentFunction实现
public static class EnrichmentFunction
extends RichCoFlatMapFunction<TaxiRide, TaxiFare, RideAndFare> {
@Override
public void open(Configuration config) throws Exception {
throw new MissingSolutionException();
}
@Override
public void flatMap1(TaxiRide ride, Collector<RideAndFare> out) throws Exception {
throw new MissingSolutionException();
}
@Override
public void flatMap2(TaxiFare fare, Collector<RideAndFare> out) throws Exception {
throw new MissingSolutionException();
}
}
RichCoFlatMapFunction运行在一个单独的线程里面,用户不需要过多的考虑这些方法的同步问题。
对于rideId而言,有3个事件:ride-start, ride-end, and fare events。其中ride-end已经被过滤掉了,因此flatMap1和flatMap2对于每个key对应的event只会处理1次(前提假设:数据没有重复)。
考虑你需要什么state,且需要关注的是,对于每个rideId,你可能先接收到TaxiRide或者TaxiFare其中之一,然后在一段时间后,接收到对应的另外一个事件。特别还需要注意的每个函数的返回值,接收的入参分别是什么。
TODO代码实现详情请参考 github地址
public static class EnrichmentFunction
extends RichCoFlatMapFunction<TaxiRide,TaxiFare,RideAndFare> {
ValueState<TaxiRide> rideVs;
ValueState<TaxiFare> fareVs;
@Override
public void open(Configuration config){
ValueStateDescriptor rideDes = new ValueStateDescriptor<TaxiRide>("saved ride",TaxiRide.class);
ValueStateDescriptor fareDes = new ValueStateDescriptor<TaxiFare>("saved fare",TaxiFare.class);
rideVs = getRuntimeContext().getState(rideDes);
fareVs = getRuntimeContext().getState(fareDes);
}
/**
* flatMap函数返回类型是void,实际上输出类型都在Collector中体现
* @param ride
* @param out
* @throws IOException
*/
@Override
public void flatMap1(TaxiRide ride, Collector<RideAndFare> out) throws IOException {
rideVs.update(ride);
if(fareVs.value() != null){
out.collect(new RideAndFare(ride,fareVs.value()));
}
}
/**
* flatMap函数返回类型是void,实际上输出类型都在Collector中体现
* @param fare
* @param out
* @throws IOException
*/
@Override
public void flatMap2(TaxiFare fare, Collector<RideAndFare> out) throws IOException {
fareVs.update(fare);
if(rideVs.value() != null){
out.collect(new RideAndFare(rideVs.value(),fare));
}
}
}
讨论
state的存储是有成本的,我们应该在何时清除状态?
我们可以在处理每个事件,如ride时,看看fareVs中是否有该key;
- 如果有,那么这两个事件可以做关联输出,将fareVs中的状态清除;
如果没有,更新rideVs即可
public static class EnrichmentFunction
extends RichCoFlatMapFunction<TaxiRide, TaxiFare, RideAndFare> {
private ValueState<TaxiRide> rideState;
private ValueState<TaxiFare> fareState;
@Override
public void open(Configuration config) {
rideState =
getRuntimeContext()
.getState(new ValueStateDescriptor<>("saved ride", TaxiRide.class));
fareState =
getRuntimeContext()
.getState(new ValueStateDescriptor<>("saved fare", TaxiFare.class));
}
@Override
public void flatMap1(TaxiRide ride, Collector<RideAndFare> out) throws Exception {
TaxiFare fare = fareState.value();
if (fare != null) {
fareState.clear();
out.collect(new RideAndFare(ride, fare));
} else {
rideState.update(ride);
}
}
@Override
public void flatMap2(TaxiFare fare, Collector<RideAndFare> out) throws Exception {
TaxiRide ride = rideState.value();
if (ride != null) {
rideState.clear();
out.collect(new RideAndFare(ride, fare));
} else {
fareState.update(fare);
}
}
上面这种写法,把中间过程分步写了,我们很容易能看到对ValueState取value()方法后,返回什么。注意上面代码中任意的state的处理,都是没有指定key的,这是因为框架底层在处理上下文时,已经将key信息对齐了。
key-partitioned state
ValueState:存储,对于每个rideId而言,一个TaxiRide和一个TaxiFare
- keyBy(rideId)
- ValueState
- ValueState
- 有效的创建了2个分布式map,keyed by rideId
- rideId → TaxiRide
- rideId → TaxiFare
这可能是 DataStream API 中最常被误解的部分,所以理解这一点很重要。
尽管看起来很有必要,但是在这个exercise中我们不需要使用以rideId 作为键的 MapState。当使用keyed state时,每个 ValueState 对象已经代表了一个分布式的keyed map。那么 MapState 什么时候有用呢?当需要为原始流的每个不同key存储整个映射时。举个例子
- MapState:存储,对每个taxiId而言,开过这辆出租车的每个司机的最近一趟行程
- keyBy(taxiId)
- MapState
- driverId → timestamp, driverId → timestamp, …
- 有效的创建了1个分布式map
- taxiId → (driverId → timestamp, driverId → timestamp, …)
可以用一个Tuple状态类型来代替2个state吗?
我们可以看到在示例代码中
private ValueState<TaxiRide> rideState;
private ValueState<TaxiFare> fareState;
那么我们是否可以用一种复合结构来代替上面的2种状态?例如,ValueState
答案是:在state存储采用RocksDB情况下,不支持。因为TupleSerializer不能处理null fields,会抛异常。