项目背景

目标

“每小时小费”练习的任务是确定每小时赚取最多小费的司机。最简单的方法是通过两个步骤来解决这个问题:首先使用一个小时长的窗口来计算每个司机在一小时内的总小费,然后从该窗口结果流中找到每小时总小费最多的司机。 请注意,该程序应在事件时间运行。

输入

本练习的输入数据是由 Taxi Fare Stream Generator 生成的 TaxiFare 事件流。TaxiFareGenerator 使用时间戳和watermark注释生成的 DataStream。因此,无需提供自定义时间戳和watermark分配器即可正确使用事件时间。

输出

本练习的结果是 Tuple3 记录的数据流,每小时一个。每个小时记录应包含该小时结束时的时间戳、该小时内获得小费最多的司机的 driverId 以及他们的实际小费总数。 结果流应打印到标准输出。

代码分析

类的大体结构和前面几道题类似,这里剖析下核心方法。

execute

  1. public JobExecutionResult execute() throws Exception {
  2. // set up streaming execution environment
  3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4. // start the data generator and arrange for watermarking
  5. DataStream<TaxiFare> fares =
  6. env.addSource(source)
  7. .assignTimestampsAndWatermarks(
  8. // taxi fares are in order
  9. WatermarkStrategy.<TaxiFare>forMonotonousTimestamps()
  10. .withTimestampAssigner(((fare, t) -> fare.getEventTimeMillis()))
  11. );
  12. // compute tips per hour for each driver
  13. DataStream<Tuple3<Long,Long,Float>> hourlyTips =
  14. fares.keyBy((TaxiFare fare) -> fare.driverId)
  15. .window(TumblingEventTimeWindows.of(Time.hours(1)))
  16. .process(new AddTips());
  17. // find the driver with the highest sum of tips for each hour
  18. DataStream<Tuple3<Long,Long,Float>> hourlyMax =
  19. hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);
  20. /* You should explore how this alternative (commented out below) behaves.
  21. * In what ways is the same as, and different from, the solution above (using a windowAll)?
  22. */
  23. // DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips.keyBy(t -> t.f0).maxBy(2);
  24. hourlyMax.addSink(sink);
  25. // execute the transformation pipeline
  26. return env.execute("Hourly Tips");
  27. }

一共分为关键5步:

  • add(source)
  • 定义WatermarkStrategy,并加到source后
  • keyBy+window+process,在process中处理每个小时窗口内的每个司机的小费加和运算
  • windowAll+maxBy, 求每个窗口的最大小费值

process

下面是process的具体实现。

  1. public static class AddTips extends ProcessWindowFunction<TaxiFare, Tuple3<Long, Long, Float>,
  2. Long, TimeWindow>{
  3. @Override
  4. public void process(
  5. Long key,
  6. Context context,
  7. Iterable<TaxiFare> fares,
  8. Collector<Tuple3<Long, Long, Float>> out
  9. ){
  10. float sumOfTips = 0F;
  11. for (TaxiFare f : fares) {
  12. sumOfTips += f.tip;
  13. }
  14. out.collect(Tuple3.of(context.window().getEnd(), key, sumOfTips));
  15. }

进一步讨论

Java 和 Scala 参考解决方案说明了两种不同的方法,尽管它们有很多相似之处。两者都首先计算每小时每个司机的小费总和。HourlyTipsSolution.java 中,

  1. DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
  2. .keyBy((TaxiFare fare) -> fare.driverId)
  3. .window(TumblingEventTimeWindows.of(Time.hours(1)))
  4. .process(new AddTips());

ProcessWindowFunction 中有一个比较重的处理逻辑

  1. public static class AddTips extends ProcessWindowFunction<
  2. TaxiFare, Tuple3<Long, Long, Float>, Long, TimeWindow> {
  3. @Override
  4. public void process(Long key, Context context, Iterable<TaxiFare> fares, Collector<Tuple3<Long, Long, Float>> out) throws Exception {
  5. Float sumOfTips = 0F;
  6. for (TaxiFare f : fares) {
  7. sumOfTips += f.tip;
  8. }
  9. out.collect(Tuple3.of(context.window().getEnd(), key, sumOfTips));
  10. }
  11. }

这样实现很简单,但缺点是它会缓冲窗口中的所有 TaxiFare 对象,直到窗口被触发。这比使用 reduce 或 aggregate 函数增量计算小费的总和效率低。

Scala solution使用reduce函数

  1. val hourlyTips = fares
  2. .map((f: TaxiFare) => (f.driverId, f.tip))
  3. .keyBy(_._1)
  4. .window(TumblingEventTimeWindows.of(Time.hours(1)))
  5. .reduce(
  6. (f1: (Long, Float), f2: (Long, Float)) => { (f1._1, f1._2 + f2._2) },
  7. new WrapWithWindowInfo())

ProcessWindowFunction如下

  1. class WrapWithWindowInfo() extends ProcessWindowFunction[(Long, Float), (Long, Long, Float), Long, TimeWindow] {
  2. override def process(key: Long, context: Context, elements: Iterable[(Long, Float)], out: Collector[(Long, Long, Float)]): Unit = {
  3. val sumOfTips = elements.iterator.next()._2
  4. out.collect((context.window.getEnd(), key, sumOfTips))
  5. }
  6. }

在计算了hourlyTips后,可以看看他的输出。hourlyTips.print() 产生如下结果:

2> (1577883600000,2013000185,33.0) 4> (1577883600000,2013000108,14.0) 3> (1577883600000,2013000087,14.0) 1> (1577883600000,2013000036,23.0) 4> (1577883600000,2013000072,13.0) 2> (1577883600000,2013000041,28.0) 3> (1577883600000,2013000123,33.0) 4> (1577883600000,2013000188,18.0) 1> (1577883600000,2013000098,23.0) 2> (1577883600000,2013000047,13.0)

每小时,每个司机,小费总和。那么怎么找到每小时内的最大值?解决方案大体上:

  1. DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips
  2. .windowAll(TumblingEventTimeWindows.of(Time.hours(1)))
  3. .maxBy(2);

产生结果如下

3> (1577883600000,2013000089,76.0) 4> (1577887200000,2013000197,71.0) 1> (1577890800000,2013000118,83.0) 2> (1577894400000,2013000119,81.0) 3> (1577898000000,2013000195,73.0) 4> (1577901600000,2013000072,123.0)

如果我们采用下面这种实现方案呢?

  1. DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips
  2. .keyBy(t -> t.f0)
  3. .maxBy(2);

这表示按时间戳对 hourlyTips 流进行分组,并在每个时间戳内找到小费总和的最大值。看起来这正是我们想要的。虽然这个替代方案确实找到了相同的结果,但它不是一个很好的解决方案。原因如下:

  • 首先,它不是在每个窗口结束时产生单个结果,而是通过这种方法,我们得到一个流,该流持续报告每个key(即每个小时)迄今为止达到的最大值,这是一种尴尬的消费结果的方式,我们想要的只是每小时的单个值。

    1> (1577883600000,2013000108,14.0) 1> (1577883600000,2013000108,14.0) 1> (1577883600000,2013000188,18.0) 1> (1577883600000,2013000188,18.0) 1> (1577883600000,2013000188,18.0) 1> (1577883600000,2013000034,36.0) 1> (1577883600000,2013000183,70.0) 1> (1577883600000,2013000183,70.0) … 1> (1577883600000,2013000152,73.0) 1> (1577883600000,2013000152,73.0)

  • 其次,Flink 将永远保持每个key(每小时)迄今为止看到的最大值。 Flink 不知道这些key是事件时间戳,并且watermark可以用作何时可以清除此状态的指示符——为了获得这些语义,我们需要使用 windows。