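- Start from RideCleansingSolution
- Goal
  - Split the TaxiRide stream into two streams
    - one stream containing only rides that start and end in New York City (as before)
    - another stream containing all other rides (those that start or end elsewhere)
- Expected output
  - One DataStream printed to stdout (rides inside NYC)
  - One DataStream printed to stderr (rides outside NYC)
    - use new PrintSinkFunction<>(true) to create a sink that writes to stderr
- Notes
  - The split can be implemented with FilterFunctions or with side outputs
  - Try both approaches and compare them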
Solution 1: filters
// Flink imports. TaxiRide, TaxiRideGenerator and GeoUtils come from the
// training project's common module and must be imported from there as well.
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class RideSplitFilterSolution {

    private final SourceFunction<TaxiRide> source;
    private final SinkFunction<TaxiRide> sink;
    private final SinkFunction<TaxiRide> sidesink;

    /** Creates a job using the source and sinks provided. */
    public RideSplitFilterSolution(
            SourceFunction<TaxiRide> source,
            SinkFunction<TaxiRide> sink,
            SinkFunction<TaxiRide> sidesink) {
        this.source = source;
        this.sink = sink;
        this.sidesink = sidesink;
    }

    /**
     * Main method.
     *
     * @throws Exception which occurs during job execution.
     */
    public static void main(String[] args) throws Exception {
        RideSplitFilterSolution job =
                new RideSplitFilterSolution(
                        new TaxiRideGenerator(),
                        new PrintSinkFunction<>(),      // prints to stdout
                        new PrintSinkFunction<>(true)); // prints to stderr
        job.execute();
    }

    public JobExecutionResult execute() throws Exception {
        // set up streaming execution environment
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        // attach a stream of TaxiRides
        DataStream<TaxiRide> rides = env.addSource(source);

        // split the stream: each filter receives its own copy of every ride
        rides.filter(ride -> inNYC(ride)).addSink(sink).name("inside NYC");
        rides.filter(ride -> !inNYC(ride)).addSink(sidesink).name("outside NYC");

        // run the pipeline and return the result
        return env.execute("Split with filters");
    }

    /** Return true for rides that both start and end in NYC. */
    private static boolean inNYC(TaxiRide taxiRide) {
        return GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat)
                && GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat);
    }
}
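This is the core of the solution based on a pair of FilterFunctions. We fork the stream of rides: one copy goes through a filter that keeps the rides starting and ending in NYC, and the other copy goes through a filter that does the opposite. We name the sinks so that they are easy to identify in the Flink dashboard.
Note that a complete copy of the whole stream is sent to both filters. This is expensive because the parallelism changes at this point in the pipeline, so each record has to be serialized and shipped to both filters rather than passed along inside a single task. With this solution, each sink is chained to its filter.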
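To make the cost of the two-filter approach concrete, here is a minimal plain-Java sketch that models it without Flink. Everything in it is an assumption made for illustration: `Ride` is a hypothetical stand-in for TaxiRide, the bounding box only approximates whatever GeoUtils.isInNYC really checks, and `splitWithFilters` and `sampleRides` are made-up helper names. It only shows that every ride passes through the predicate twice, once per filter.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Plain-Java model of splitting with two complementary filters (no Flink involved). */
class FilterSplitSketch {

    /** Hypothetical stand-in for TaxiRide, reduced to the fields the split needs. */
    static final class Ride {
        final float startLon, startLat, endLon, endLat;

        Ride(float startLon, float startLat, float endLon, float endLat) {
            this.startLon = startLon;
            this.startLat = startLat;
            this.endLon = endLon;
            this.endLat = endLat;
        }
    }

    /** Result of the split plus the number of predicate evaluations it cost. */
    static final class SplitResult {
        final List<Ride> inside;
        final List<Ride> outside;
        final int evaluations;

        SplitResult(List<Ride> inside, List<Ride> outside, int evaluations) {
            this.inside = inside;
            this.outside = outside;
            this.evaluations = evaluations;
        }
    }

    // Assumed bounding box for illustration; the real limits live in GeoUtils.
    static boolean isInNYC(float lon, float lat) {
        return lon >= -74.05f && lon <= -73.7f && lat >= 40.5f && lat <= 41.0f;
    }

    /** True for rides that both start and end inside the assumed box. */
    static boolean inNYC(Ride r) {
        return isInNYC(r.startLon, r.startLat) && isInNYC(r.endLon, r.endLat);
    }

    /** Runs two independent filter passes, as the filter-based solution does. */
    static SplitResult splitWithFilters(List<Ride> rides) {
        AtomicInteger evaluations = new AtomicInteger();
        Predicate<Ride> counted = r -> {
            evaluations.incrementAndGet();
            return inNYC(r);
        };
        List<Ride> inside = rides.stream().filter(counted).collect(Collectors.toList());
        List<Ride> outside =
                rides.stream().filter(counted.negate()).collect(Collectors.toList());
        return new SplitResult(inside, outside, evaluations.get());
    }

    /** Three made-up rides: one inside the box, one leaving it, one far away. */
    static List<Ride> sampleRides() {
        return Arrays.asList(
                new Ride(-73.99f, 40.75f, -73.95f, 40.78f),
                new Ride(-73.99f, 40.75f, -71.06f, 42.36f),
                new Ride(-87.63f, 41.88f, -87.65f, 41.90f));
    }

    public static void main(String[] args) {
        SplitResult result = splitWithFilters(sampleRides());
        System.out.println(result.inside.size() + " inside, "
                + result.outside.size() + " outside, "
                + result.evaluations + " predicate evaluations");
    }
}
```

For the three sample rides this prints `1 inside, 2 outside, 6 predicate evaluations`: the two result lists partition the input, but the predicate runs twice per ride. In the real job the analogous duplication also covers shipping each record to both filters.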
Solution 2: side outputs
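// Flink imports. TaxiRide, TaxiRideGenerator and GeoUtils come from the
// training project's common module and must be imported from there as well.
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class RideSplitSideOutputSolution {

    private final SourceFunction<TaxiRide> source;
    private final SinkFunction<TaxiRide> sink;
    private final SinkFunction<TaxiRide> sidesink;

    // anonymous subclass ({}) so that the element type is retained at runtime
    private static final OutputTag<TaxiRide> outsideNYC =
            new OutputTag<TaxiRide>("outsideNYC") {};

    /** Creates a job using the source and sinks provided. */
    public RideSplitSideOutputSolution(
            SourceFunction<TaxiRide> source,
            SinkFunction<TaxiRide> sink,
            SinkFunction<TaxiRide> sidesink) {
        this.source = source;
        this.sink = sink;
        this.sidesink = sidesink;
    }

    /**
     * Main method.
     *
     * @throws Exception which occurs during job execution.
     */
    public static void main(String[] args) throws Exception {
        RideSplitSideOutputSolution job =
                new RideSplitSideOutputSolution(
                        new TaxiRideGenerator(),
                        new PrintSinkFunction<>(),      // prints to stdout
                        new PrintSinkFunction<>(true)); // prints to stderr
        job.execute();
    }

    /**
     * Creates and executes the stream-splitting pipeline.
     *
     * @return the JobExecutionResult
     * @throws Exception which occurs during job execution.
     */
    public JobExecutionResult execute() throws Exception {
        // set up streaming execution environment
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        // attach a stream of TaxiRides
        DataStream<TaxiRide> rides = env.addSource(source);

        // split the stream: main output = inside NYC, side output = everything else
        SingleOutputStreamOperator<TaxiRide> splitResult = rides.process(new StreamSplitter());
        splitResult.addSink(sink).name("inside NYC");
        splitResult.getSideOutput(outsideNYC).addSink(sidesink).name("outside NYC");

        // run the pipeline and return the result
        return env.execute("Split with side output");
    }

    /** Routes each ride to the main output or to the outsideNYC side output. */
    public static class StreamSplitter extends ProcessFunction<TaxiRide, TaxiRide> {

        @Override
        public void processElement(TaxiRide taxiRide, Context ctx, Collector<TaxiRide> out)
                throws Exception {
            if (GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat)
                    && GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat)) {
                out.collect(taxiRide);
            } else {
                ctx.output(outsideNYC, taxiRide);
            }
        }
    }
}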
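Because we need neither keyed state nor timers, there is no need for keyBy, so we use a ProcessFunction rather than a KeyedProcessFunction.
The side output is the core of this solution; the split logic lives in StreamSplitter. For an n-way split, this approach would be somewhat more elegant than maintaining a set of n filter functions.
Because the split logic runs inside the process function, the job graph is quite simple. Note that the regular sink and the side-output sink are both in the same task as the process function.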
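The remark about n-way splits can be illustrated with a small plain-Java model of tagged routing. This is only a sketch under stated assumptions, not part of the exercise: the four `Route` categories, the `Ride` class, the bounding box and the `classify`, `split` and `sampleRides` helpers are all hypothetical. In a Flink job, each non-main category would presumably get its own OutputTag and a matching `ctx.output(...)` call inside a single ProcessFunction.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of an n-way split with one routing decision per element. */
class TaggedSplitSketch {

    /** Hypothetical categories; each one plays the role of an output tag. */
    enum Route { INSIDE_NYC, LEAVING_NYC, ENTERING_NYC, OUTSIDE_NYC }

    /** Hypothetical stand-in for TaxiRide, reduced to the fields the split needs. */
    static final class Ride {
        final float startLon, startLat, endLon, endLat;

        Ride(float startLon, float startLat, float endLon, float endLat) {
            this.startLon = startLon;
            this.startLat = startLat;
            this.endLon = endLon;
            this.endLat = endLat;
        }
    }

    // Assumed bounding box for illustration; the real limits live in GeoUtils.
    static boolean isInNYC(float lon, float lat) {
        return lon >= -74.05f && lon <= -73.7f && lat >= 40.5f && lat <= 41.0f;
    }

    /** Single decision point: every ride is classified exactly once. */
    static Route classify(Ride r) {
        boolean startsIn = isInNYC(r.startLon, r.startLat);
        boolean endsIn = isInNYC(r.endLon, r.endLat);
        if (startsIn && endsIn) {
            return Route.INSIDE_NYC;
        } else if (startsIn) {
            return Route.LEAVING_NYC;
        } else if (endsIn) {
            return Route.ENTERING_NYC;
        }
        return Route.OUTSIDE_NYC;
    }

    /** One pass over the input; each ride lands in exactly one output list. */
    static Map<Route, List<Ride>> split(List<Ride> rides) {
        Map<Route, List<Ride>> outputs = new EnumMap<>(Route.class);
        for (Route route : Route.values()) {
            outputs.put(route, new ArrayList<>());
        }
        for (Ride ride : rides) {
            outputs.get(classify(ride)).add(ride);
        }
        return outputs;
    }

    /** Four made-up rides, one per category. */
    static List<Ride> sampleRides() {
        return Arrays.asList(
                new Ride(-73.99f, 40.75f, -73.95f, 40.78f),  // inside
                new Ride(-73.99f, 40.75f, -71.06f, 42.36f),  // leaving
                new Ride(-71.06f, 42.36f, -73.99f, 40.75f),  // entering
                new Ride(-87.63f, 41.88f, -87.65f, 41.90f)); // outside
    }

    public static void main(String[] args) {
        split(sampleRides()).forEach(
                (route, rides) -> System.out.println(route + ": " + rides.size()));
    }
}
```

Adding a fifth category here means one more enum constant and one more branch in `classify`, whereas the filter-based design would need another filter whose predicate must stay mutually exclusive with all the others.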