需求:计算每辆出租车每小时的出站次数

1653791312(1).png

flink window的三种操作

  • 最简单的:flink sql
  • 更多控件:DataStream窗口API
  • 全部控制:过程功能

Flink SQL

  1. Flink SQL> describe Rides;
  2. root
  3. |-- rideId: BIGINT
  4. |-- taxiId: BIGINT
  5. |-- isStart: BOOLEAN
  6. |-- lon: FLOAT
  7. |-- lat: FLOAT
  8. |-- rideTime: TIMESTAMP(3) *ROWTIME*
  9. |-- psgCnt: INT
  1. SELECT
  2. taxiId, window_end, count(*) AS cnt
  3. FROM
  4. TABLE(TUMBLE(TABLE Rides, DESCRIPTOR(rideTime), INTERVAL '1' HOUR))
  5. WHERE
  6. isStart
  7. GROUP BY
  8. taxiId, window_start, window_end;

TUMBLE是一个内置的窗口表值函数,由Flink SQL提供,以提高窗口的效率。从概念上讲,这个函数的作用是向输入表的每一行添加3个虚拟列,表示窗口的开始、结束和每一行被分配的窗口的窗口时间。

Window API

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
  3. rides
  4. .filter(ride -> ride.isStart)
  5. .keyBy(ride -> ride.taxiId)
  6. .window(TumblingEventTimeWindows.of(Time.hours(1)))
  7. .aggregate(new CountTaxiRides(), new WrapWindowResult())
  8. .print();
  9. env.execute();

Q: 为什么在keyby前要用filter?
A:因为这将减少一半的流元素被序列化和发送到窗口
有一种更简单的方法来使用Window API,但这需要缓冲分配给每个窗口的所有事件,然后在触发窗口时计数它们。该实现递增地计算计数,这是首选的方法,因为

  • 避免在窗户关闭那一刻集中需要大量计算
  • 需要更少的状态

我们可以通过将每个TaxiRide映射到单个整数来进一步优化这一点——为了计算次数,我们所需要的就是taxiId。我们还需要时间戳,但它们在包装每个事件的StreamRecord的元数据中。(ps: 还需要isStart)

  1. private static class CountTaxiRides implements AggregateFunction<TaxiRide, Integer, Integer> {
  2. @Override
  3. public Integer createAccumulator() {
  4. return 0;
  5. }
  6. @Override
  7. public Integer add(TaxiRide ride, Integer accumulator) {
  8. return 1 + accumulator;
  9. }
  10. @Override
  11. public Integer getResult(Integer accumulator) {
  12. return accumulator;
  13. }
  14. @Override
  15. public Integer merge(Integer a, Integer b) {
  16. return a + b;
  17. }
  18. }

merge方法用于当会话窗口通过一个事件的到来合并它们时,该事件将两个会话之间的间隙分开,从而使它们现在应该统一为单个会话。

  1. private static class WrapWindowResult
  2. extends ProcessWindowFunction<Integer, Tuple3<Long, Long, Integer>, Long, TimeWindow> {
  3. @Override
  4. public void process(
  5. Long taxiId,
  6. Context context,
  7. Iterable<Integer> iterableContainingPreaggregatedCount,
  8. Collector<Tuple3<Long, Long, Integer>> out) throws Exception {
  9. Tuple3<Long, Long, Integer> result = new Tuple3<>(
  10. taxiId,
  11. context.window().getEnd(),
  12. iterableContainingPreaggregatedCount.iterator().next());
  13. out.collect(result);
  14. }
  15. }

Process Function

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
  3. rides
  4. .filter(ride -> ride.isStart)
  5. .keyBy(ride -> taxiId)
  6. .process(new CountingWindow(Time.hours(1)))
  7. .print();
  8. env.execute();
  1. public static class CountingWindow
  2. extends KeyedProcessFunction<Long, TaxiRide, Tuple3<Long, Long, Integer>> {
  3. private final long durationMsec;
  4. public CountingWindow(Time duration) {
  5. durationMsec = duration.toMilliseconds();
  6. }
  7. // Keyed, managed state, with an entry for each window, keyed by the window's end time.
  8. private transient MapState<Long, Integer> counter;
  9. @Override
  10. public void open(Configuration conf) {
  11. MapStateDescriptor<Long, Integer> desc =
  12. new MapStateDescriptor<>("hourlyRides", Long.class, Integer.class);
  13. counter = getRuntimeContext().getMapState(desc);
  14. }
  15. @Override
  16. public void processElement(...)
  17. @Override
  18. public void onTimer(...)
  19. }

这个过程函数有以下三种方法

  • open : 设置我们将使用的状态
  • processElement : 用于处理每个传入事件
  • onTimer : 当计时器触发时调用

这里使用的中心数据结构是从窗口时间戳到计数器的映射,我们将在其中跟踪每个窗口看到了多少事件.

  1. private transient MapState<Long, Integer> counter;
  1. public void processElement(
  2. TaxiRide ride,
  3. Context ctx,
  4. Collector<Tuple3<Long, Long, Integer>> out) throws Exception {
  5. long eventTime = ride.getEventTime();
  6. TimerService timerService = ctx.timerService();
  7. if (eventTime > timerService.currentWatermark()) {
  8. // Round up eventTime to the end of the window containing this event.
  9. long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1);
  10. // Count this ride.
  11. Integer count = counter.get(endOfWindow);
  12. count = (count == null) ? 1 : count + 1;
  13. counter.put(endOfWindow, count);
  14. // Schedule a callback for when the window has been completed.
  15. timerService.registerEventTimeTimer(endOfWindow);
  16. } else {
  17. // Process late events?
  18. }
  19. }

对于每个事件,我们需要找出

  • 它在哪个窗口中递增,
  • 该窗口的计数器
  • 确保当该窗口结束时我们有一个计时器

我们重新生成了与window API相同的时间窗口赋值语义。这是我们希望基于过程函数的窗口如何工作的一个方法,我们可以有其他方法。

  1. public void onTimer(
  2. long timestamp,
  3. OnTimerContext context,
  4. Collector<Tuple3<Long, Long, Integer>> out) throws Exception {
  5. // Look up the result for the hour that just ended.
  6. Integer count = counter.get(timestamp);
  7. out.collect(new Tuple3<>(context.getCurrentKey(), timestamp, count));
  8. // Clear the state for this window.
  9. counter.remove(timestamp);
  10. }

当窗口要关闭时,我们发出结果并清除窗口的状态。

总结

  • SQL使许多涉及窗口分析的用例变得容易

    • 窗口tvf: TUMBLE, HOP和累积
    • 窗口分组:TUMBLE, HOP,会话
    • 窗口: 滚动聚合,例如,过去一小时的运行计数
      1. SELECT
      2. rideTime,
      3. taxiId,
      4. COUNT(*) OVER last_hour AS rides_last_hour
      5. FROM Rides
      6. WINDOW last_hour AS (
      7. PARTITION BY taxiId
      8. ORDER BY rideTime
      9. RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
      10. )
      在计算过去一个小时的运行计数的情况下,这在SQL中很容易完成,而在windows API中是不可能的。
  • Window API也很强大,提供了一些SQL中没有的特性

    • 允许延迟、由于延迟而被删除的事件的旁路输出、自定义触发器
  • 对于真正的自定义窗口,KeyedProcessFunction通常会更简单