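Requirement: for each taxi, count how many rides it starts in each hour
Three ways to do windowing in Flink
- Easiest: Flink SQL
- More control: the DataStream window API
- Full control: process functions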
Flink SQL
Flink SQL> describe Rides;
root
|-- rideId: BIGINT
|-- taxiId: BIGINT
|-- isStart: BOOLEAN
|-- lon: FLOAT
|-- lat: FLOAT
|-- rideTime: TIMESTAMP(3) *ROWTIME*
|-- psgCnt: INT
SELECT
  taxiId, window_end, count(*) AS cnt
FROM
  TABLE(TUMBLE(TABLE Rides, DESCRIPTOR(rideTime), INTERVAL '1' HOUR))
WHERE
  isStart
GROUP BY
  taxiId, window_start, window_end;
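TUMBLE is a built-in windowing table-valued function (TVF) that Flink SQL provides to make windowing efficient. Conceptually, it adds 3 virtual columns to every row of the input table: window_start, window_end and window_time, which describe the window that row has been assigned to.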
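To make the three columns concrete, here is a minimal plain-Java sketch (not Flink code) of how they could be computed for a one-hour tumbling window. It assumes epoch-aligned windows with no offset. The class and method names are illustrative only.

```java
import java.time.Duration;
import java.time.Instant;

// Plain-Java sketch of the three columns TUMBLE conceptually adds to each row,
// assuming epoch-aligned tumbling windows with no offset.
final class TumbleColumns {
    // window_start: the timestamp rounded down to a multiple of the window size
    static long windowStart(long ts, long sizeMs) {
        return ts - Math.floorMod(ts, sizeMs);
    }

    // window_end: exclusive upper bound, window_start + size
    static long windowEnd(long ts, long sizeMs) {
        return windowStart(ts, sizeMs) + sizeMs;
    }

    // window_time: window_end - 1 ms, the last instant that still belongs to the window
    static long windowTime(long ts, long sizeMs) {
        return windowEnd(ts, sizeMs) - 1;
    }

    public static void main(String[] args) {
        long hour = Duration.ofHours(1).toMillis();
        long ts = Instant.parse("2020-01-01T12:34:56Z").toEpochMilli();
        System.out.println(Instant.ofEpochMilli(windowStart(ts, hour))); // 2020-01-01T12:00:00Z
        System.out.println(Instant.ofEpochMilli(windowEnd(ts, hour)));   // 2020-01-01T13:00:00Z
        System.out.println(Instant.ofEpochMilli(windowTime(ts, hour)));  // 2020-01-01T12:59:59.999Z
    }
}
```

A row stamped exactly on an hour boundary belongs to the window that starts there, not the one that ends there.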
Window API
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
rides
    .filter(ride -> ride.isStart)                             // keep only ride-start events
    .keyBy(ride -> ride.taxiId)                               // partition the stream by taxi
    .window(TumblingEventTimeWindows.of(Time.hours(1)))       // one-hour event-time tumbling windows
    .aggregate(new CountTaxiRides(), new WrapWindowResult())  // incremental count + window metadata
    .print();
env.execute();
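As a mental model of what this job produces, the sketch below applies the same steps in plain Java to an in-memory list: filter, key by taxi, assign each ride to a one-hour tumbling window, and count. It does not use the Flink API. The `Ride` and `Key` records are hypothetical stand-ins for `TaxiRide` and the `(taxiId, windowEnd, count)` result. Watermarks and late data are ignored.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the windowed job above, run over a list instead of a stream.
final class HourlyRideCount {
    // Hypothetical, trimmed-down stand-in for TaxiRide.
    record Ride(long taxiId, boolean isStart, long eventTimeMs) {}

    // Output key: (taxiId, window end), like the first two fields of the Tuple3 result.
    record Key(long taxiId, long windowEnd) {}

    static final long HOUR_MS = 3_600_000L;

    static Map<Key, Integer> countPerTaxiPerHour(List<Ride> rides) {
        Map<Key, Integer> counts = new TreeMap<>(
                Comparator.comparingLong(Key::taxiId).thenComparingLong(Key::windowEnd));
        for (Ride ride : rides) {
            if (!ride.isStart()) continue;                      // filter(ride -> ride.isStart)
            long start = ride.eventTimeMs() - Math.floorMod(ride.eventTimeMs(), HOUR_MS);
            Key key = new Key(ride.taxiId(), start + HOUR_MS);  // keyBy(taxiId) + 1-hour tumbling window
            counts.merge(key, 1, Integer::sum);                 // incremental count, like CountTaxiRides
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Ride> rides = List.of(
                new Ride(1, true, 600_000),     // taxi 1 at 00:10
                new Ride(1, false, 1_200_000),  // end event: filtered out
                new Ride(1, true, 3_000_000),   // taxi 1 at 00:50
                new Ride(2, true, 4_000_000));  // taxi 2 at 01:06:40
        System.out.println(countPerTaxiPerHour(rides));
        // {Key[taxiId=1, windowEnd=3600000]=2, Key[taxiId=2, windowEnd=7200000]=1}
    }
}
```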
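Q: Why filter before keyBy?
A: Every ride produces both a start event and an end event. Filtering first therefore halves the number of stream elements that have to be serialized and sent to the windows.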
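Here is a quick plain-Java check of that arithmetic. It assumes, as with the taxi ride data, that every ride contributes exactly one start event and one end event. `RideEvent` is a hypothetical stand-in type.

```java
import java.util.ArrayList;
import java.util.List;

// Counts how many records would reach keyBy (and so be serialized and shuffled)
// with and without filtering on isStart first.
final class FilterBeforeKeyBy {
    record RideEvent(long rideId, boolean isStart) {}

    static List<RideEvent> eventsFor(int rides) {
        List<RideEvent> events = new ArrayList<>();
        for (long id = 0; id < rides; id++) {
            events.add(new RideEvent(id, true));   // start event
            events.add(new RideEvent(id, false));  // end event
        }
        return events;
    }

    static long recordsShuffled(List<RideEvent> events, boolean filterFirst) {
        return events.stream().filter(e -> !filterFirst || e.isStart()).count();
    }

    public static void main(String[] args) {
        List<RideEvent> events = eventsFor(1_000);
        System.out.println(recordsShuffled(events, false)); // 2000
        System.out.println(recordsShuffled(events, true));  // 1000
    }
}
```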
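There is a simpler way to use the Window API, but it requires buffering all the events assigned to each window and then counting them when the window is triggered. The implementation here (CountTaxiRides, below) computes the count incrementally. This is the preferred approach because it:
- avoids a burst of computation at the moment the window closes
- needs less state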
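The two strategies can be sketched in plain Java for a single window. These classes are illustrative, not Flink's. The buffered version stores every event and does all the counting when the window fires. The incremental version folds each event into a single `int`, in the way `AggregateFunction.add` does.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java contrast of buffered vs incremental counting for one window.
final class IncrementalVsBuffered {
    // Buffering: keep every event until the window fires, then count them all at once.
    static final class BufferedCount<T> {
        private final List<T> buffer = new ArrayList<>();  // state grows with every event
        void add(T event) { buffer.add(event); }
        int stateSize() { return buffer.size(); }
        int fire() {
            int n = 0;
            for (T ignored : buffer) n++;                   // all the counting work happens here
            buffer.clear();
            return n;
        }
    }

    // Incremental: fold each event into one accumulator as it arrives.
    static final class IncrementalCount<T> {
        private int accumulator = 0;                        // the only state kept for the window
        void add(T event) { accumulator += 1; }             // work is spread across arrivals
        int stateSize() { return 1; }
        int fire() {
            int n = accumulator;
            accumulator = 0;
            return n;
        }
    }

    public static void main(String[] args) {
        BufferedCount<String> buffered = new BufferedCount<>();
        IncrementalCount<String> incremental = new IncrementalCount<>();
        for (int i = 0; i < 10_000; i++) {
            buffered.add("ride-" + i);
            incremental.add("ride-" + i);
        }
        System.out.println(buffered.stateSize() + " vs " + incremental.stateSize()); // 10000 vs 1
        System.out.println(buffered.fire() + " " + incremental.fire());              // 10000 10000
    }
}
```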
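We could optimize this further by mapping each TaxiRide to a single number. To count rides, the taxiId is all we need. We also need the timestamps, but those are carried in the metadata of the StreamRecord that wraps each event. (PS: isStart is also needed, but only by the filter, so the mapping can happen right after filtering.)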
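A plain-Java sketch of that idea follows. Here `Timestamped` is a hypothetical stand-in for the record wrapper, not Flink's `StreamRecord` class. Because the timestamp sits outside the payload, filtering on `isStart` and then mapping to the `taxiId` shrinks each element to one number without losing its time.

```java
import java.util.List;
import java.util.function.Function;

// Plain-Java sketch: the timestamp lives in the wrapper, so map() can replace the payload.
final class MapToTaxiId {
    // Hypothetical, trimmed-down TaxiRide with the fields from the Rides table.
    record Ride(long rideId, long taxiId, boolean isStart, float lon, float lat, int psgCnt) {}

    record Timestamped<T>(T value, long timestamp) {
        // Transform the payload; the timestamp is carried over unchanged.
        <R> Timestamped<R> map(Function<? super T, ? extends R> f) {
            return new Timestamped<>(f.apply(value), timestamp);
        }
    }

    // Filter on isStart first, then keep only the taxiId: one Long per event.
    static List<Timestamped<Long>> startsAsTaxiIds(List<Timestamped<Ride>> events) {
        return events.stream()
                .filter(e -> e.value().isStart())
                .map(e -> e.map(Ride::taxiId))
                .toList();
    }

    public static void main(String[] args) {
        List<Timestamped<Ride>> events = List.of(
                new Timestamped<>(new Ride(7, 42, true, -73.98f, 40.75f, 1), 1_000L),
                new Timestamped<>(new Ride(7, 42, false, -73.95f, 40.78f, 1), 2_000L));
        System.out.println(startsAsTaxiIds(events)); // [Timestamped[value=42, timestamp=1000]]
    }
}
```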
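import org.apache.flink.api.common.functions.AggregateFunction;

// Counts rides incrementally: input TaxiRide, accumulator Integer, result Integer.
private static class CountTaxiRides implements AggregateFunction<TaxiRide, Integer, Integer> {
    @Override
    public Integer createAccumulator() {
        return 0;                 // a new window starts with a count of zero
    }
    @Override
    public Integer add(TaxiRide ride, Integer accumulator) {
        return 1 + accumulator;   // one more ride in this window
    }
    @Override
    public Integer getResult(Integer accumulator) {
        return accumulator;
    }
    @Override
    public Integer merge(Integer a, Integer b) {
        return a + b;             // combine partial counts when windows are merged
    }
}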
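The merge method is used with session windows. Sometimes an event arrives that bridges the gap separating two sessions. The two sessions must then be unified into a single session, and merge combines their accumulators.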
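The plain-Java model below is simplified and is not Flink's merging logic. Each event opens a session `[t, t + gap)`, and sessions that strictly overlap it are folded together. Their counts are combined with the same `a + b` as `CountTaxiRides.merge`. Flink's exact rules at window boundaries may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of session-window merging with pre-aggregated counts.
final class SessionMerge {
    record Session(long start, long end, int count) {}

    static int merge(int a, int b) { return a + b; }  // as in CountTaxiRides.merge

    static List<Session> addEvent(List<Session> sessions, long t, long gap) {
        long start = t;
        long end = t + gap;
        int count = 1;                                 // accumulator after add() for this one event
        List<Session> result = new ArrayList<>();
        for (Session s : sessions) {
            if (s.start() < end && start < s.end()) {  // overlaps: absorb s into the new session
                start = Math.min(start, s.start());
                end = Math.max(end, s.end());
                count = merge(count, s.count());
            } else {
                result.add(s);                         // untouched session
            }
        }
        result.add(new Session(start, end, count));
        return result;
    }

    public static void main(String[] args) {
        long gap = 10;                                 // hypothetical gap, in minutes
        List<Session> sessions = new ArrayList<>();
        sessions = addEvent(sessions, 0, gap);         // [0, 10)  count 1
        sessions = addEvent(sessions, 15, gap);        // [15, 25) count 1, a separate session
        sessions = addEvent(sessions, 8, gap);         // [8, 18) bridges the gap
        System.out.println(sessions);                  // [Session[start=0, end=25, count=3]]
    }
}
```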
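import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Wraps the pre-aggregated count with the key (taxiId) and the window's end time.
private static class WrapWindowResult
        extends ProcessWindowFunction<Integer, Tuple3<Long, Long, Integer>, Long, TimeWindow> {
    @Override
    public void process(
            Long taxiId,
            Context context,
            Iterable<Integer> iterableContainingPreaggregatedCount,
            Collector<Tuple3<Long, Long, Integer>> out) throws Exception {
        // With aggregate(), the iterable holds exactly one element: the result of CountTaxiRides.
        Tuple3<Long, Long, Integer> result = new Tuple3<>(
                taxiId,
                context.window().getEnd(),
                iterableContainingPreaggregatedCount.iterator().next());
        out.collect(result);
    }
}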
Process Function
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
rides
    .filter(ride -> ride.isStart)                 // keep only ride-start events
    .keyBy(ride -> ride.taxiId)                   // key by the ride's taxiId
    .process(new CountingWindow(Time.hours(1)))   // hand-rolled one-hour windows
    .print();
env.execute();
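import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public static class CountingWindow
        extends KeyedProcessFunction<Long, TaxiRide, Tuple3<Long, Long, Integer>> {
    private final long durationMsec;
    public CountingWindow(Time duration) {
        durationMsec = duration.toMilliseconds();
    }
    // Keyed, managed state, with an entry for each window,
    // keyed by the window's last timestamp (end time - 1 ms).
    private transient MapState<Long, Integer> counter;
    @Override
    public void open(Configuration conf) {
        MapStateDescriptor<Long, Integer> desc =
                new MapStateDescriptor<>("hourlyRides", Long.class, Integer.class);
        counter = getRuntimeContext().getMapState(desc);
    }
    // processElement(...) and onTimer(...) are shown individually below.
}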
This process function has three methods:
- open: sets up the state we will use
- processElement: called for each incoming event
- onTimer: called when a timer fires
The central data structure here is a map from window timestamp to counter. It tracks how many events we have seen for each window.
private transient MapState<Long, Integer> counter;
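The plain-Java stand-in below shows why a map is needed. It models that state for one taxi, with a `TreeMap` in place of `MapState` (an assumption for illustration). When events arrive out of order, several hourly windows can be open for the same key at once.

```java
import java.util.Map;
import java.util.TreeMap;

// Stand-in for the MapState of ONE key (one taxi): window timestamp -> count.
final class WindowCounterMap {
    static final long HOUR_MS = 3_600_000L;
    private final Map<Long, Integer> counter = new TreeMap<>();

    void count(long eventTime) {
        // Same rounding as processElement: the last millisecond of the event's hour.
        long endOfWindow = eventTime - (eventTime % HOUR_MS) + HOUR_MS - 1;
        counter.merge(endOfWindow, 1, Integer::sum);
    }

    Map<Long, Integer> snapshot() { return new TreeMap<>(counter); }

    public static void main(String[] args) {
        WindowCounterMap state = new WindowCounterMap();
        state.count(10_000);     // hour 0
        state.count(3_700_000);  // hour 1
        state.count(20_000);     // hour 0 again, arriving after an hour-1 event
        System.out.println(state.snapshot()); // {3599999=2, 7199999=1}
    }
}
```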
@Override
public void processElement(
        TaxiRide ride,
        Context ctx,
        Collector<Tuple3<Long, Long, Integer>> out) throws Exception {
    long eventTime = ride.getEventTime();
    TimerService timerService = ctx.timerService();
    if (eventTime > timerService.currentWatermark()) {
        // Round eventTime up to the last millisecond of the window containing this event
        // (window end - 1 ms); this is also the timestamp used for the timer below.
        long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1);
        // Count this ride.
        Integer count = counter.get(endOfWindow);
        count = (count == null) ? 1 : count + 1;
        counter.put(endOfWindow, count);
        // Schedule a callback for when the window has been completed.
        timerService.registerEventTimeTimer(endOfWindow);
    } else {
        // Process late events?
    }
}
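For each event, we need to:
- figure out which window it belongs to,
- increment that window's counter,
- make sure there is a timer that fires when that window ends.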
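Those three steps, plus the timer callback, can be simulated for a single key in plain Java. This sketch does not use Flink. A `TreeMap` stands in for `MapState`, and a `TreeSet` stands in for the event-time timers. The set also mirrors the fact that registering the same timer twice yields only one timer. The watermark is advanced by hand through a hypothetical `advanceWatermark` method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java simulation of CountingWindow for ONE key (one taxi).
final class CountingWindowSim {
    record Result(long taxiId, long endOfWindow, int count) {}

    private final long taxiId;
    private final long durationMsec;
    private final Map<Long, Integer> counter = new TreeMap<>();  // window -> count
    private final TreeSet<Long> timers = new TreeSet<>();        // pending event-time timers
    private long currentWatermark = Long.MIN_VALUE;
    final List<Result> out = new ArrayList<>();
    int droppedLateEvents = 0;

    CountingWindowSim(long taxiId, long durationMsec) {
        this.taxiId = taxiId;
        this.durationMsec = durationMsec;
    }

    void processElement(long eventTime) {
        if (eventTime > currentWatermark) {
            long endOfWindow = eventTime - (eventTime % durationMsec) + durationMsec - 1; // which window
            counter.merge(endOfWindow, 1, Integer::sum);                                   // increment its counter
            timers.add(endOfWindow);                                                        // timer for its end
        } else {
            droppedLateEvents++;  // the original leaves late events open; here they are only counted
        }
    }

    // Event-time timers fire once the watermark reaches their timestamp.
    void advanceWatermark(long watermark) {
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    private void onTimer(long timestamp) {
        int count = counter.remove(timestamp);  // read the result and clear the state
        out.add(new Result(taxiId, timestamp, count));
    }

    public static void main(String[] args) {
        CountingWindowSim sim = new CountingWindowSim(42L, 3_600_000L);
        sim.processElement(1_000);
        sim.processElement(2_000);
        sim.processElement(3_700_000);
        sim.advanceWatermark(3_599_999);  // first hour complete: its timer fires
        sim.processElement(500);          // now behind the watermark, so it is late
        sim.advanceWatermark(7_199_999);
        System.out.println(sim.out);
        // [Result[taxiId=42, endOfWindow=3599999, count=2], Result[taxiId=42, endOfWindow=7199999, count=1]]
        System.out.println(sim.droppedLateEvents); // 1
    }
}
```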
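This reproduces the same time-window assignment semantics as the window API. That is one way we might want a process-function-based window to work, but we are free to choose others.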
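One way to check this claim is to compare the process function's `endOfWindow` with the last millisecond (`start + size - 1`) of a zero-offset tumbling window. The plain-Java sketch below compares the two formulas. It also shows one alternative assignment: hourly windows shifted by a 15-minute offset. The names are illustrative.

```java
// Plain-Java comparison of the process function's window assignment with
// tumbling windows, plus an offset variant as one possible alternative.
final class WindowAssignmentCheck {
    static final long HOUR_MS = 3_600_000L;

    // From processElement: the last millisecond of the window containing eventTime.
    static long endOfWindow(long eventTime, long size) {
        return eventTime - (eventTime % size) + size - 1;
    }

    // Tumbling assignment with an offset: windows are [start, start + size) shifted by `offset`.
    static long windowStart(long ts, long size, long offset) {
        return ts - Math.floorMod(ts - offset, size);
    }

    public static void main(String[] args) {
        long[] samples = {0, 1, 3_599_999, 3_600_000, 123_456_789};
        for (long ts : samples) {
            boolean same = endOfWindow(ts, HOUR_MS) == windowStart(ts, HOUR_MS, 0) + HOUR_MS - 1;
            System.out.println(ts + " -> " + same); // true for every sample
        }
        // Another possible choice: hourly windows starting at :15 past the hour.
        System.out.println(windowStart(4_200_000, HOUR_MS, 900_000)); // 900000, i.e. 00:15
    }
}
```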
@Override
public void onTimer(
        long timestamp,
        OnTimerContext context,
        Collector<Tuple3<Long, Long, Integer>> out) throws Exception {
    // Look up the result for the hour that just ended.
    Integer count = counter.get(timestamp);
    out.collect(new Tuple3<>(context.getCurrentKey(), timestamp, count));
    // Clear the state for this window.
    counter.remove(timestamp);
}
When the timer fires, the window is complete, so we emit its result and clear that window's state.
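Summary
SQL makes many use cases involving windowed analytics easy:
- window TVFs: TUMBLE, HOP and CUMULATE
- group windows: TUMBLE, HOP, SESSION
- OVER windows: rolling aggregations, e.g., a running count over the past hour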
Computing a running count over the past hour is easy in SQL, but it is not possible with the window API:
SELECT
  rideTime,
  taxiId,
  COUNT(*) OVER last_hour AS rides_last_hour
FROM Rides
WINDOW last_hour AS (
  PARTITION BY taxiId
  ORDER BY rideTime
  RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
)
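To pin down what this query returns, here is a deliberately naive plain-Java model of the OVER window. For each ride, it counts that taxi's rides in the preceding hour, including the current row. It is O(n²) and is not how Flink evaluates the query. Frame bounds are treated inclusively, as in standard SQL; Flink's exact boundary handling is not verified here.

```java
import java.util.ArrayList;
import java.util.List;

// Naive model of:
//   COUNT(*) OVER (PARTITION BY taxiId ORDER BY rideTime
//                  RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)
final class RollingCount {
    record Ride(long taxiId, long rideTime) {}
    record Row(long rideTime, long taxiId, long ridesLastHour) {}

    static final long HOUR_MS = 3_600_000L;

    static List<Row> ridesLastHour(List<Ride> rides) {
        List<Row> out = new ArrayList<>();
        for (Ride current : rides) {
            long n = rides.stream()
                    .filter(r -> r.taxiId() == current.taxiId())               // PARTITION BY taxiId
                    .filter(r -> r.rideTime() >= current.rideTime() - HOUR_MS  // 1 hour preceding ...
                              && r.rideTime() <= current.rideTime())           // ... through the current row
                    .count();
            out.add(new Row(current.rideTime(), current.taxiId(), n));         // one output row per input row
        }
        return out;
    }

    public static void main(String[] args) {
        List<Ride> rides = List.of(
                new Ride(1, 0),           // 00:00
                new Ride(2, 600_000),     // 00:10
                new Ride(1, 1_800_000),   // 00:30
                new Ride(1, 5_400_000));  // 01:30, the 00:00 ride has left the frame
        ridesLastHour(rides).forEach(System.out::println);
    }
}
```

Unlike a tumbling window, this emits one row per input ride, each with its own trailing one-hour count.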
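The window API is also powerful, and it offers some features not available in SQL:
- allowed lateness, side outputs for events dropped for being late, and custom triggers
- for truly custom windowing, however, a KeyedProcessFunction is often simpler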