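• Start from RideCleansingSolution
  • Goal
    • Split the stream of TaxiRide events
      • One stream contains only rides that both start and end in New York City (as before)
      • The other stream contains all remaining rides (those that start or end outside NYC)
  • Expected output
    • One DataStream printed to stdout (rides within NYC)
    • One DataStream printed to stderr (rides outside NYC)
      • Use new PrintSinkFunction<>(true) to create a sink that writes to stderr
  • Notes
    • The split can be implemented either with FilterFunctions or with side outputs
    • Try both approaches and compare them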

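The splitting rule in the goal can be illustrated without Flink. The sketch below is plain Java. It sends a ride to stdout only if both its start and end points are in NYC, and sends every other ride to stderr. This mirrors `PrintSinkFunction<>()` versus `PrintSinkFunction<>(true)`. The `Ride` class and the rectangular bounding box are assumptions standing in for `TaxiRide` and `GeoUtils.isInNYC`. The box values are hypothetical, not taken from the training repository.

```java
import java.util.List;

/**
 * Plain-Java sketch (no Flink) of the splitting rule: a ride goes to the
 * "inside NYC" output only if BOTH its start and its end lie in NYC.
 * ASSUMPTION: the bounding box below is a hypothetical stand-in for
 * GeoUtils.isInNYC and may not match the training repo's exact values.
 */
final class NycSplitSketch {
    // Hypothetical NYC bounding box, in degrees.
    static final double LON_WEST = -74.05, LON_EAST = -73.7;
    static final double LAT_SOUTH = 40.5, LAT_NORTH = 41.0;

    /** Minimal stand-in for TaxiRide: an id plus start/end coordinates. */
    static final class Ride {
        final long id;
        final float startLon, startLat, endLon, endLat;

        Ride(long id, float startLon, float startLat, float endLon, float endLat) {
            this.id = id;
            this.startLon = startLon;
            this.startLat = startLat;
            this.endLon = endLon;
            this.endLat = endLat;
        }

        @Override
        public String toString() {
            return "Ride " + id;
        }
    }

    static boolean isInNYC(double lon, double lat) {
        return lon >= LON_WEST && lon <= LON_EAST && lat >= LAT_SOUTH && lat <= LAT_NORTH;
    }

    /** Start AND end must both be inside the box. */
    static boolean inNYC(Ride r) {
        return isInNYC(r.startLon, r.startLat) && isInNYC(r.endLon, r.endLat);
    }

    public static void main(String[] args) {
        List<Ride> rides = List.of(
                new Ride(1, -73.98f, 40.75f, -73.95f, 40.78f),  // both points in the box
                new Ride(2, -73.98f, 40.75f, -74.17f, 40.69f)); // end point west of the box
        for (Ride r : rides) {
            // stdout plays the role of PrintSinkFunction<>(), stderr of PrintSinkFunction<>(true)
            (inNYC(r) ? System.out : System.err).println(r);
        }
    }
}
```

Running `main` prints `Ride 1` to stdout and `Ride 2` to stderr.
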
Solution 1: filters

```java
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
// TaxiRide, TaxiRideGenerator and GeoUtils come from the flink-training repository
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
import org.apache.flink.training.exercises.common.utils.GeoUtils;

public class RideSplitFilterSolution {
    private final SourceFunction<TaxiRide> source;
    private final SinkFunction<TaxiRide> sink;     // rides inside NYC
    private final SinkFunction<TaxiRide> sidesink; // all other rides

    /** Creates a job using the source and sinks provided. */
    public RideSplitFilterSolution(
            SourceFunction<TaxiRide> source,
            SinkFunction<TaxiRide> sink,
            SinkFunction<TaxiRide> sidesink) {
        this.source = source;
        this.sink = sink;
        this.sidesink = sidesink;
    }

    /**
     * Main method.
     *
     * @throws Exception which occurs during job execution.
     */
    public static void main(String[] args) throws Exception {
        RideSplitFilterSolution job =
                new RideSplitFilterSolution(
                        new TaxiRideGenerator(),
                        new PrintSinkFunction<>(),      // prints to stdout
                        new PrintSinkFunction<>(true)); // prints to stderr
        job.execute();
    }

    public JobExecutionResult execute() throws Exception {
        // set up streaming execution environment (local, with the web UI)
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        // attach a stream of TaxiRides
        DataStream<TaxiRide> rides = env.addSource(source);

        // split the stream: each filter receives a full copy of the rides
        rides.filter(ride -> inNYC(ride)).addSink(sink).name("inside NYC");
        rides.filter(ride -> !inNYC(ride)).addSink(sidesink).name("outside NYC");

        // run the pipeline and return the result
        return env.execute("Split with filters");
    }

    /** Return true for rides that both start and end in NYC. */
    private static boolean inNYC(TaxiRide taxiRide) {
        return GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat)
                && GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat);
    }
}
```

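This is the core of the solution based on a pair of FilterFunctions. We fork the rides stream. One copy goes through a filter that keeps the rides that start and end in NYC, and the other copy goes through a filter that does the opposite. We give each sink a name so that it is easy to identify in the Flink dashboard.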
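To make the "two complementary filters over the same input" idea concrete, here is a plain-Java analogue using `java.util.stream`. This is not Flink code. The `Ride` class with a precomputed `inNYC` flag is a hypothetical simplification. A counter shows that every ride is tested by both filters.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
 * Plain-Java analogue (not Flink) of the two-filter split: the same input is
 * consumed by two complementary filters, so the predicate runs twice per ride.
 * Ride is a hypothetical stand-in for TaxiRide.
 */
final class FilterSplitSketch {
    static final class Ride {
        final long id;
        final boolean inNYC; // precomputed "starts and ends in NYC" flag (assumption)

        Ride(long id, boolean inNYC) {
            this.id = id;
            this.inNYC = inNYC;
        }
    }

    // Counts how often the split predicate is evaluated.
    static final AtomicInteger predicateCalls = new AtomicInteger();

    static final Predicate<Ride> IN_NYC = ride -> {
        predicateCalls.incrementAndGet();
        return ride.inNYC;
    };

    /** Returns the ids of the rides a filter keeps when it sees the full input. */
    static List<Long> filterIds(List<Ride> rides, Predicate<Ride> keep) {
        return rides.stream().filter(keep).map(r -> r.id).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Ride> rides = List.of(new Ride(1, true), new Ride(2, false), new Ride(3, true));
        predicateCalls.set(0);
        List<Long> inside = filterIds(rides, IN_NYC);           // like rides.filter(ride -> inNYC(ride))
        List<Long> outside = filterIds(rides, IN_NYC.negate()); // like rides.filter(ride -> !inNYC(ride))
        System.out.println("inside=" + inside + " outside=" + outside
                + " predicateCalls=" + predicateCalls.get());
        // prints: inside=[1, 3] outside=[2] predicateCalls=6
    }
}
```
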
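We send a complete copy of the whole stream to both filters. This is expensive because the parallelism changes at this point in the pipeline: the generator source runs with parallelism 1, while the filters run with the default parallelism. As a result, every ride is shipped across task boundaries once per filter. In this solution, each sink is chained to its filter.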
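Here is a rough count of the records redistributed out of the source. It assumes, as above, that the source runs with parallelism 1 and that every edge leaving it redistributes records to a higher-parallelism operator. For $N$ rides and a $k$-way split ($k = 2$ in this exercise), let $R$ be the number of records sent over those edges:

$$
R_{\text{filters}} = k \cdot N \quad (\text{every filter receives every ride}), \qquad
R_{\text{side outputs}} = N \quad (\text{one process function receives every ride})
$$

For the two-way split, that is $2N$ records instead of $N$. Because the sinks are chained in both versions, no further transfer is counted.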

Solution 2: side outputs

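```java
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
// TaxiRide, TaxiRideGenerator and GeoUtils come from the flink-training repository
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
import org.apache.flink.training.exercises.common.utils.GeoUtils;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class RideSplitSideOutputSolution {
    private final SourceFunction<TaxiRide> source;
    private final SinkFunction<TaxiRide> sink;     // rides inside NYC (main output)
    private final SinkFunction<TaxiRide> sidesink; // all other rides (side output)

    // Anonymous subclass ({}) so Flink can capture the element type TaxiRide
    private static final OutputTag<TaxiRide> outsideNYC = new OutputTag<TaxiRide>("outsideNYC") {};

    /** Creates a job using the source and sinks provided. */
    public RideSplitSideOutputSolution(
            SourceFunction<TaxiRide> source,
            SinkFunction<TaxiRide> sink,
            SinkFunction<TaxiRide> sidesink) {
        this.source = source;
        this.sink = sink;
        this.sidesink = sidesink;
    }

    /**
     * Main method.
     *
     * @throws Exception which occurs during job execution.
     */
    public static void main(String[] args) throws Exception {
        RideSplitSideOutputSolution job =
                new RideSplitSideOutputSolution(
                        new TaxiRideGenerator(),
                        new PrintSinkFunction<>(),      // prints to stdout
                        new PrintSinkFunction<>(true)); // prints to stderr
        job.execute();
    }

    /**
     * Creates and executes the ride-splitting pipeline.
     *
     * @return the {@link JobExecutionResult} of the job
     * @throws Exception which occurs during job execution.
     */
    public JobExecutionResult execute() throws Exception {
        // set up streaming execution environment (local, with the web UI)
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        // attach a stream of TaxiRides
        DataStream<TaxiRide> rides = env.addSource(source);

        // split the stream in a single pass: main output + one side output
        SingleOutputStreamOperator<TaxiRide> splitResult = rides.process(new StreamSplitter());
        splitResult.addSink(sink).name("inside NYC");
        splitResult.getSideOutput(outsideNYC).addSink(sidesink).name("outside NYC");

        // run the pipeline and return the result
        return env.execute("Split with side output");
    }

    /** Emits NYC-internal rides on the main output and all others on the side output. */
    public static class StreamSplitter extends ProcessFunction<TaxiRide, TaxiRide> {
        @Override
        public void processElement(TaxiRide taxiRide, Context ctx, Collector<TaxiRide> out)
                throws Exception {
            if (GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat)
                    && GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat)) {
                out.collect(taxiRide);               // main output
            } else {
                ctx.output(outsideNYC, taxiRide);    // side output
            }
        }
    }
}
```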

Since we need neither keyed state nor timers, there is no need for keyBy, so we use a ProcessFunction rather than a KeyedProcessFunction.

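Side outputs are the core of this solution, and the splitting logic lives in StreamSplitter. Each ride passes through processElement exactly once. For an n-way split, this approach would therefore be somewhat more elegant than maintaining a set of n filter functions.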
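The n-way case can be illustrated with a plain-Java analogue, not Flink code. Each element is classified once and appended to the bucket named by its tag, much as `processElement` would call `ctx.output(tag, value)` for one of several `OutputTag`s. The `Ride` class and the three tag names are hypothetical and serve only to show a split with more than two outputs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Plain-Java analogue of an n-way split with side outputs: one classification
 * per element, one bucket per tag. Ride and the tag names are hypothetical.
 */
final class NWaySplitSketch {
    static <T> Map<String, List<T>> split(List<T> input, Function<T, String> classifier) {
        Map<String, List<T>> buckets = new LinkedHashMap<>(); // keeps first-seen tag order
        for (T element : input) {
            String tag = classifier.apply(element); // single decision per element
            buckets.computeIfAbsent(tag, k -> new ArrayList<>()).add(element);
        }
        return buckets;
    }

    /** Hypothetical ride summary: whether the start / end point lies in NYC. */
    static final class Ride {
        final long id;
        final boolean startInNYC;
        final boolean endInNYC;

        Ride(long id, boolean startInNYC, boolean endInNYC) {
            this.id = id;
            this.startInNYC = startInNYC;
            this.endInNYC = endInNYC;
        }

        @Override
        public String toString() {
            return "Ride" + id;
        }
    }

    /** Three-way rule, for illustration only. */
    static String classify(Ride r) {
        if (r.startInNYC && r.endInNYC) return "insideNYC";
        if (r.startInNYC || r.endInNYC) return "crossesBoundary";
        return "outsideNYC";
    }

    public static void main(String[] args) {
        List<Ride> rides = List.of(
                new Ride(1, true, true), new Ride(2, true, false),
                new Ride(3, false, false), new Ride(4, true, true));
        System.out.println(split(rides, NWaySplitSketch::classify));
        // prints: {insideNYC=[Ride1, Ride4], crossesBoundary=[Ride2], outsideNYC=[Ride3]}
    }
}
```

Adding another output here means adding one more branch to `classify`. The filter approach would instead need another full pass over the input.
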
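Because the split is performed inside the process function, the job graph is quite simple. Note that the regular sink and the side-output sink both run in the same task as the process function.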