项目背景
目标
“每小时小费”练习的任务是确定每小时赚取最多小费的司机。最简单的方法是通过两个步骤来解决这个问题:首先使用一个小时长的窗口来计算每个司机在一小时内的总小费,然后从该窗口结果流中找到每小时总小费最多的司机。 请注意,该程序应在事件时间运行。
输入
本练习的输入数据是由 Taxi Fare Stream Generator 生成的 TaxiFare 事件流。TaxiFareGenerator 使用时间戳和watermark注释生成的 DataStream
输出
本练习的结果是 Tuple3
代码分析
execute
public JobExecutionResult execute() throws Exception {
// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start the data generator and arrange for watermarking
DataStream<TaxiFare> fares =
env.addSource(source)
.assignTimestampsAndWatermarks(
// taxi fares are in order
WatermarkStrategy.<TaxiFare>forMonotonousTimestamps()
.withTimestampAssigner(((fare, t) -> fare.getEventTimeMillis()))
);
// compute tips per hour for each driver
DataStream<Tuple3<Long,Long,Float>> hourlyTips =
fares.keyBy((TaxiFare fare) -> fare.driverId)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.process(new AddTips());
// find the driver with the highest sum of tips for each hour
DataStream<Tuple3<Long,Long,Float>> hourlyMax =
hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);
/* You should explore how this alternative (commented out below) behaves.
* In what ways is the same as, and different from, the solution above (using a windowAll)?
*/
// DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips.keyBy(t -> t.f0).maxBy(2);
hourlyMax.addSink(sink);
// execute the transformation pipeline
return env.execute("Hourly Tips");
}
一共分为关键5步:
- add(source)
- 定义WatermarkStrategy,并加到source后
- keyBy+window+process,在process中处理每个小时窗口内的每个司机的小费加和运算
- windowAll+maxBy, 求每个窗口的最大小费值
process
下面是process的具体实现。
public static class AddTips extends ProcessWindowFunction<TaxiFare, Tuple3<Long, Long, Float>,
Long, TimeWindow>{
@Override
public void process(
Long key,
Context context,
Iterable<TaxiFare> fares,
Collector<Tuple3<Long, Long, Float>> out
){
float sumOfTips = 0F;
for (TaxiFare f : fares) {
sumOfTips += f.tip;
}
out.collect(Tuple3.of(context.window().getEnd(), key, sumOfTips));
}
进一步讨论
Java 和 Scala 参考解决方案说明了两种不同的方法,尽管它们有很多相似之处。两者都首先计算每小时每个司机的小费总和。HourlyTipsSolution.java 中,
DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
.keyBy((TaxiFare fare) -> fare.driverId)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.process(new AddTips());
ProcessWindowFunction 中有一个比较重的处理逻辑
public static class AddTips extends ProcessWindowFunction<
TaxiFare, Tuple3<Long, Long, Float>, Long, TimeWindow> {
@Override
public void process(Long key, Context context, Iterable<TaxiFare> fares, Collector<Tuple3<Long, Long, Float>> out) throws Exception {
Float sumOfTips = 0F;
for (TaxiFare f : fares) {
sumOfTips += f.tip;
}
out.collect(Tuple3.of(context.window().getEnd(), key, sumOfTips));
}
}
这样实现很简单,但缺点是它会缓冲窗口中的所有 TaxiFare 对象,直到窗口被触发。这比使用 reduce 或 aggregate 函数增量计算小费的总和效率低。
Scala solution使用reduce函数
val hourlyTips = fares
.map((f: TaxiFare) => (f.driverId, f.tip))
.keyBy(_._1)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.reduce(
(f1: (Long, Float), f2: (Long, Float)) => { (f1._1, f1._2 + f2._2) },
new WrapWithWindowInfo())
ProcessWindowFunction如下
class WrapWithWindowInfo() extends ProcessWindowFunction[(Long, Float), (Long, Long, Float), Long, TimeWindow] {
override def process(key: Long, context: Context, elements: Iterable[(Long, Float)], out: Collector[(Long, Long, Float)]): Unit = {
val sumOfTips = elements.iterator.next()._2
out.collect((context.window.getEnd(), key, sumOfTips))
}
}
在计算了hourlyTips后,可以看看他的输出。hourlyTips.print() 产生如下结果:
2> (1577883600000,2013000185,33.0) 4> (1577883600000,2013000108,14.0) 3> (1577883600000,2013000087,14.0) 1> (1577883600000,2013000036,23.0) 4> (1577883600000,2013000072,13.0) 2> (1577883600000,2013000041,28.0) 3> (1577883600000,2013000123,33.0) 4> (1577883600000,2013000188,18.0) 1> (1577883600000,2013000098,23.0) 2> (1577883600000,2013000047,13.0)
每小时,每个司机,小费总和。那么怎么找到每小时内的最大值?解决方案大体上:
DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips
.windowAll(TumblingEventTimeWindows.of(Time.hours(1)))
.maxBy(2);
产生结果如下
3> (1577883600000,2013000089,76.0) 4> (1577887200000,2013000197,71.0) 1> (1577890800000,2013000118,83.0) 2> (1577894400000,2013000119,81.0) 3> (1577898000000,2013000195,73.0) 4> (1577901600000,2013000072,123.0)
如果我们采用下面这种实现方案呢?
DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips
.keyBy(t -> t.f0)
.maxBy(2);
这表示按时间戳对 hourlyTips 流进行分组,并在每个时间戳内找到小费总和的最大值。看起来这正是我们想要的。虽然这个替代方案确实找到了相同的结果,但它不是一个很好的解决方案。原因如下:
首先,它不是在每个窗口结束时产生单个结果,而是通过这种方法,我们得到一个流,该流持续报告每个key(即每个小时)迄今为止达到的最大值,这是一种尴尬的消费结果的方式,我们想要的只是每小时的单个值。
1> (1577883600000,2013000108,14.0) 1> (1577883600000,2013000108,14.0) 1> (1577883600000,2013000188,18.0) 1> (1577883600000,2013000188,18.0) 1> (1577883600000,2013000188,18.0) 1> (1577883600000,2013000034,36.0) 1> (1577883600000,2013000183,70.0) 1> (1577883600000,2013000183,70.0) … 1> (1577883600000,2013000152,73.0) 1> (1577883600000,2013000152,73.0)
其次,Flink 将永远保持每个key(每小时)迄今为止看到的最大值。 Flink 不知道这些key是事件时间戳,并且watermark可以用作何时可以清除此状态的指示符——为了获得这些语义,我们需要使用 windows。