所谓的“窗口”,一 般就是划定的一段时间范围,也就是“时间窗”;对在这范围内的数据进行处理,就是所谓的窗口计算

时间语义

在流式数据处理的过程中,在事件发生之后,生成的数据被 收集起来,首先进入分布式消息队列,然后被 Flink 系统中的 Source 算子读取消费,进而向下游的转换算子(窗口算子)传递,最终由窗口算子进行计算处理。

时间和窗口 - 图1

时间点分为两种:

  • 事件时间:数据产生的时间
  • 处理时间:数据被真正处理的时间

数据一旦产生,这个时间自然就确定了,所以它可以作为一个属性嵌入到数据中。这其实 就是这条数据记录的“时间戳”(Timestamp)。由于流处理中数据是源源不断产生的,一般来说,先产生的数据也 会先被处理,所以当任务不停地接到数据时,它们的时间戳也基本上是不断增长的,就可以代表时间的推进。是由于分布式系统中网络传输延迟的不确定性,实际应用中我们要面对的 数据流往往是乱序的。在这种情况下,就不能简单地把数据自带的时间戳当作时钟了,而需要用另外的标志来表示事件时间进展,在 Flink 中把它叫作事件时间的“水位线”(Watermarks)。

窗口操作,到底是以那种时间作为衡量标准,就是所谓的“时间语义”(Notions of Time)。

两种时间语义的对比

实际应用中,数据产生的时间和处理的时间可能是完全不同的。很长时间收集起来的数据, 处理或许只要一瞬间;也有可能数据量过大、处理能力不足,短时间堆了大量数据处理不完, 产生“背压”(back pressure)。

通常来说,处理时间是我们计算效率的衡量标准,而事件时间会更符合我们的业务计算逻辑。所以更多时候我们使用事件时间;不过处理时间也不是一无是处。对于处理时间而言,由于没有任何附加考虑,数据一来就直接处理,因此这种方式可以让我们的流处理延迟降到最低,效率达到最高。

除了事件时间和处理时间,Flink 还有一个“摄入时间”(Ingestion Time)的概念, 它是指数据进入 Flink 数据流的时间,也就是 Source 算子读入数据的时间。摄入时间相当于是 事件时间和处理时间的一个中和,它是把 Source 任务的处理时间,当作了数据的产生时间添 加到数据里。这样一来,水位线(watermark)也就基于这个时间直接生成,不需要单独指定 了。这种时间语义可以保证比较好的正确性,同时又不会引入太大的延迟。它的具体行为跟事 件时间非常像,可以当作特殊的事件时间来处理。

水位线(Watermark)

在数据流中加入一个时钟标记,记录当前的事件时间;这个标记可以直接广播到下游,当下游任务收到这个标记,就 可以更新自己的时钟了。由于类似于水流中用来做标志的记号,在 Flink 中,这种用来衡量事件时间(Event Time)进展的标记,就被称作“水位线”(Watermark)。

具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。

时间和窗口 - 图2

有序流中的水位线

在理想状态下,数据应该按照它们生成的先后顺序、排好队进入流中;也就是说,它们处 理的过程会保持原先的顺序不变,遵守先来后到的原则。这样的话我们从每个数据中提取时间戳,就可以保证总是从小到大增长的,从而插入的水位线也会不断增长、事件时钟不断向前推进。

实际应用中,如果当前数据量非常大,可能会有很多数据的时间戳是相同的,这时每来一 条数据就提取时间戳、插入水位线就做了大量的无用功。而且即使时间戳不同,同时涌来的数据时间差会非常小(比如几毫秒),往往对处理计算也没什么影响。所以为了提高效率,一般会每隔一段时间生成一个水位线,这个水位线的时间戳,就是当前最新数据的时间戳。

时间和窗口 - 图3

水位线插入的“周期”,本身也是一个时间概念, 对于水位线的周期性生成,周期时间是指处理时间(系统时间),而不是事件时间。

乱序流中的水位线

在分布式系统中,数据在节点间传输,会因为网络传输延迟的不确定性, 导致顺序发生改变,这就是所谓的“乱序数据”。

这里所说的“乱序”(out-of-order),是指数据的先后顺序不一致,主要就是基于数据的产 生时间而言的。

解决方法:

插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线

时间和窗口 - 图4

如果考虑到大量数据同时到来的处理效率,我们同样可以周期性地生成水位线。这时只需 要保存一下之前所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新的水位线 。

时间和窗口 - 图5

对于迟到的数据, 为了让窗口能够正确收集到迟到的数据,我们也可以等上 2 秒;也就是用当前已有数据的最大时间戳减去 2 秒,就是要插入的水位线的时间戳

时间和窗口 - 图6

水位线的特性

水位线就代表了当前的事件时间时钟,而且可以在数据的时间戳基础 上加一些延迟来保证不丢数据,这一点对于乱序流的正确处理非常重要。

  • 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
  • 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
  • 水位线是基于数据的时间戳生成的
  • 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
  • 水位线可以通过设置延迟,来保证正确处理乱序数据
  • 一个水位线 Watermark(t),表示在当前流中事件时间已经达到了时间戳 t, 这代表 t 之 前的所有数据都到齐了,之后流中不会出现时间戳 t’ ≤ t 的数据

水位线是 Flink 流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对乱序数据的正确处理。

生成水位线的总体原则

Flink 中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,并控制的权力交给了程序员,可以在代码中自定义水位线的生成策略。

水位线生成策略(Watermark Strategies)

DataStream API 中 ,有单独生成水位线的方法:.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间:

  1. public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy)
  1. DataStream<Event> stream = env.addSource(new ClickSource());
  2. DataStream<Event> withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(<watermark strategy>);

assignTimestampsAndWatermarks()方法需要传入一个 WatermarkStrategy 作为参数,这就 是 所 谓 的 “ 水位线生成策略 ” 。 WatermarkStrategy 中包含了一个“ 时间戳分配器”TimestampAssigner 和一个“水位线生成器”WatermarkGenerator

  1. public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>,WatermarkGeneratorSupplier<T>{
  2. @Override
  3. TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);
  4. @Override
  5. WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
  6. }
  • TimestampAssigner:主要负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。
  • WatermarkGenerator:主要负责按照既定的方式,基于时间戳生成水位线。在 WatermarkGenerator 接口中,主要又有两个方法:onEvent()和 onPeriodicEmit()。
  • onEvent:每个事件(数据)到来都会调用的方法,它的参数有当前事件、时间戳, 以及允许发出水位线的一个 WatermarkOutput,可以基于事件做各种操作
  • onPeriodicEmit:周期性调用的方法,可以由 WatermarkOutput 发出水位线。周期时间 为处理时间,可以调用环境配置的.setAutoWatermarkInterval()方法来设置,默认为 200ms。
  1. env.getConfig().setAutoWatermarkInterval(60 * 1000L);

Flink内置水位线生成器

TimestampAssigner WatermarkGenerator 可以通过调用 WatermarkStrategy 的静态辅助方法来创建。它们都是周期性生成水位线的,分别对应着处理有序流和乱序流的场景。

(1)有序流

有序流的主要特点是时间单调递增, 所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用 WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。

  1. stream.assignTimestampsAndWatermarks(
  2. WatermarkStrategy.<Event>forMonotonousTimestamps()
  3. .withTimestampAssigner(new SerializableTimestampAssigner<Event>(){
  4. @Override
  5. public long extractTimestamp(Event element, long recordTimestamp)
  6. {
  7. return element.timestamp;
  8. }
  9. })
  10. );
  1. **withTimestampAssigner**()方法,将数据中的 **timestamp **字段提取出来, 作为时间戳分配给数据元素;然后用内置的有序流水位线生成器构造出了生成策略。这样,提取出的数据时间戳,就是处理计算的事件时间。

(2)乱序流

乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间(Fixed Amount of Lateness)。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。

调用 WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个 maxOutOfOrderness 参 数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。

  1. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  3. import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
  4. import java.time.Duration;
  5. public class WatermarkTest {
  6. public static void main(String[] args) throws Exception {
  7. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  8. env.setParallelism(1);
  9. env.addSource(new ClickSource())
  10. // 插入水位线的逻辑
  11. .assignTimestampsAndWatermarks(
  12. // 针对乱序流插入水位线,延迟时间设置为 5s
  13. WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
  14. .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
  15. // 抽取时间戳的逻辑
  16. @Override
  17. public long extractTimestamp(Event element, long recordTimestamp) {
  18. return element.timestamp;
  19. }
  20. })
  21. )
  22. .print();
  23. env.execute();
  24. }
  25. }
  1. 有序流的水位线生成器本质上和乱序流是一样的,相当于延迟设为0的乱序流水位线生成器,两者等同:
  1. WatermarkStrategy.forMonotonousTimestamps()
  2. WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0))

自定义水位线策略

WatermarkStrategy 中,时间戳分配器 TimestampAssigner 都是大同小异的,指定字段提取时间戳就可以了;而不同策略的关键就在于 WatermarkGenerator 的实现。整体说来,Flink 有两种不同的生成水位线的方式:一种是周期性的(Periodic),另一种是断点式的(Punctuated)。

(1)周期性水位线生成器(Periodic Generator)

周期性生成器一般是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线。

  1. import org.apache.flink.api.common.eventtime.*;
  2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  3. // 自定义水位线的产生
  4. public class CustomWatermarkTest {
  5. public static void main(String[] args) throws Exception {
  6. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  7. env.setParallelism(1);
  8. env.addSource(new ClickSource())
  9. .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
  10. .print();
  11. env.execute();
  12. }
  13. public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> {
  14. @Override
  15. public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
  16. return new SerializableTimestampAssigner<Event>() {
  17. @Override
  18. public long extractTimestamp(Event element, long recordTimestamp) {
  19. return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段
  20. }
  21. };
  22. }
  23. @Override
  24. public WatermarkGenerator<Event>
  25. createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
  26. return new CustomPeriodicGenerator();
  27. }
  28. }
  29. public static class CustomPeriodicGenerator implements WatermarkGenerator<Event> {
  30. private Long delayTime = 5000L; // 延迟时间
  31. private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 观察到的最大时间戳
  32. @Override
  33. public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
  34. // 每来一条数据就调用一次
  35. maxTs = Math.max(event.timestamp, maxTs); // 更新最大时间戳
  36. }
  37. @Override
  38. public void onPeriodicEmit(WatermarkOutput output) {
  39. // 发射水位线,默认 200ms 调用一次
  40. output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
  41. }
  42. }
  43. }
  1. **onPeriodicEmit**()里调用 **output**.**emitWatermark**(),就可以发出水位线了;这个方法由系统框架周期性地调用,默认 200ms 一次。所以水位线的时间戳是依赖当前已有数据的最 大时间戳的(这里的实现与内置生成器类似,也是减去延迟时间再减 1),但具体什么时候生成与数据无关。

(2)断点式水位线生成器(Punctuated Generator)

断点式生成器会不停地检测 onEvent()中的事件,当发现带有水位线信息的特殊事件时, 就立即发出水位线。一般来说,断点式生成器不会通过 onPeriodicEmit()发出水位线。

  1. public class CustomPunctuatedGenerator implements WatermarkGenerator<Event> {
  2. @Override
  3. public void onEvent(Event r, long eventTimestamp, WatermarkOutput output) {
  4. // 只有在遇到特定的 itemId 时,才发出水位线
  5. if (r.user.equals("Mary")) {
  6. output.emitWatermark(new Watermark(r.timestamp - 1));
  7. }
  8. }
  9. @Override
  10. public void onPeriodicEmit(WatermarkOutput output) {
  11. // 不需要做任何事情,因为我们在 onEvent 方法中发射了水位线
  12. }
  13. }

自定义数据源中发送水位线

我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用 assignTimestampsAndWatermarks 方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。

  1. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  2. import org.apache.flink.streaming.api.functions.source.SourceFunction;
  3. import org.apache.flink.streaming.api.watermark.Watermark;
  4. import java.util.Calendar;
  5. import java.util.Random;
  6. public class EmitWatermarkInSourceFunction {
  7. public static void main(String[] args) throws Exception {
  8. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  9. env.setParallelism(1);
  10. env.addSource(new ClickSourceWithWatermark()).print();
  11. env.execute();
  12. }
  13. // 泛型是数据源中的类型
  14. public static class ClickSourceWithWatermark implements SourceFunction<Event>{
  15. private boolean running = true;
  16. @Override
  17. public void run(SourceContext<Event> sourceContext) throws Exception {
  18. Random random = new Random();
  19. String[] userArr = {"Mary", "Bob", "Alice"};
  20. String[] urlArr = {"./home", "./cart", "./prod?id=1"};
  21. while (running) {
  22. long currTs = Calendar.getInstance().getTimeInMillis(); // 毫秒时间戳
  23. String username = userArr[random.nextInt(userArr.length)];
  24. String url = urlArr[random.nextInt(urlArr.length)];
  25. Event event = new Event(username, url, currTs);
  26. // 使用 collectWithTimestamp 方法将数据发送出去,并指明数据中的时间戳的字段
  27. sourceContext.collectWithTimestamp(event, event.timestamp);
  28. // 发送水位线
  29. sourceContext.emitWatermark(new Watermark(event.timestamp - 1L));
  30. Thread.sleep(1000L);
  31. }
  32. }
  33. @Override
  34. public void cancel() {
  35. running = false;
  36. }
  37. }
  38. }

水位线的传递

水位线定义的本质了:它表示的是“当前时间之前的数据,都已经到齐了”。

如果一个任务收到了来自上游并行任务 的不同的水位线,说明上游各个分区处理得有快有慢,进度各不相同比如上游有两个并行子任务都发来了水位线,一个是 5 秒,一个是 7 秒;这代表第一个并行任务已经处理完 5 秒之前的 所有数据,而第二个并行任务处理到了 7 秒。

所有的上游并行任务就像围成木桶的一块块木板,它们中最短的那一块,决定了我们桶中的水位。

时间和窗口 - 图7

水位线的总结

水位线在事件时间的世界里面,承担了时钟的角色。也就是说在事件时间的流中,水位线是唯一的时间尺度。如果想要知道现在几点,就要看水位线的大小。

水位线是一种特殊的事件,由程序员通过编程插入的数据流里面,然后跟随数据流向下游流动。

水位线的默认计算公式:水位线 = 观察到的最大事件时间 – 最大延迟时间 – 1 毫秒。

在数据流开始之前,Flink 会插入一个大小是负无穷大(在 Java 中是-Long.MAX_VALUE) 的水位线,而在数据流结束时,Flink 会插入一个正无穷大(Long.MAX_VALUE)的水位线,保证所有的窗口闭合以及所有的定时器都被触发。

对于离线数据集,Flink 也会将其作为流读入,也就是一条数据一条数据的读取。在这种情况下,Flink 对于离线数据集,只会插入两次水位线,也就是在最开始处插入负无穷大的水位线,在结束位置插入一个正无穷大的水位线。因为只需要插入两次水位线,就可以保证计算 的正确,无需在数据流的中间插入水位线了。

窗口

  1. “**窗口”(Window)聚合操作 :** 把无界流进行切分,每一段数据分别进行聚合,结果只输出一次。相当于将无界流的聚合转化为了有界数据集的聚合。

窗口的概念

将无限数据切割成有限的“数据块”进行处理,这就是“窗口”(Window)。

在 Flink 中, 窗口就是用来处理无界流的核心。把窗口想象成一个固定位置的 “框”,数据源源不断地流过来,到某个时间点窗口该关闭了,就停止收集数据、触发计算并输出结果。例如,定义一个时间窗口,每 10 秒统计一次数据,那么就相当于把窗口放在那里,从 0 秒开始收集数据;到 10 秒时,处理当前窗口内所有数据,输出一个结果,然后清空 窗口继续收集数据;到 20 秒时,再对窗口内所有数据进行计算处理,输出结果;依次类推。

时间和窗口 - 图8 定义窗口都是包含起始时间、不包含结束时间的,用数学符号表示就是一个左闭右开的区间,例如 0~10 秒的窗口可以表示为[0, 10),这里单位为秒。

Flink 中,窗口可以把流切割成有限大小的多个“存储桶”(bucket);每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理。

时间和窗口 - 图9

窗口的处理过程:

  1. 第一个数据时间戳为 2,判断之后创建第一个窗口[0, 10),并将 2 秒数据保存进去;
  2. 后续数据依次到来,时间戳均在 [0, 10)范围内,所以全部保存进第一个窗口;
  3. 11 秒数据到来,判断它不属于[0, 10)窗口,所以创建第二个窗口[10, 20),并将 11 秒的数据保存进去。由于水位线设置延迟时间为 2 秒,所以现在的时钟是 9 秒,第一个窗口也 没有到关闭时间;
  4. 之后又有 9 秒数据到来,同样进入[0, 10)窗口中;
  5. 12 秒数据到来,判断属于[10, 20)窗口,保存进去。这时产生的水位线推进到了 10 秒,所以 [0, 10)窗口应该关闭了。第一个窗口收集到了所有的 7 个数据,进行处理计算后输 出结果,并将窗口关闭销毁;
  6. 同样的,之后的数据依次进入第二个窗口,遇到 20 秒的数据时会创建第三个窗口[20, 30)并将数据保存进去;遇到 22 秒数据时,水位线达到了 20 秒,第二个窗口触发计算,输出结果并关闭。

    Flink 中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。另外,这里我们认为到达窗口结束时间时, 窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开。

窗口的分类

按照驱动类型分类

  1. 是以什么标准来开始和结束数据的截取,叫作窗口的“驱动类型”。

按时间段去截取数据就叫做 “时间窗口”(Time Window)。

按照固定的个数,来截取一段数据集,这种窗口叫作“计数窗口”(Count Window)。

时间和窗口 - 图10

(1)时间窗口(Time Window)

时间窗口以时间点来定义窗口的开始(start)和结束(end),所以截取出的就是某一时间段的数据。到达结束时间时,窗口不再收集数据,触发计算输出结果,并将窗口关闭销毁。

用结束时间减去开始时间,得到这段时间的长度,就是窗口的大小(window size)。

Flink 中有一个专门的类来表示时间窗口,名称就叫作 TimeWindow。这个类只有两个私有属性:start end,表示窗口的开始和结束的时间戳,单位为毫秒。

  1. private final long start;
  2. private final long end;

可以调用公有的 getStart()和 getEnd()方法直接获取这两个时间戳。另外,TimeWindow 还提供了一个 maxTimestamp()方法,用来获取窗口中能够包含数据的最大时间戳。

  1. public long maxTimestamp() {
  2. return end - 1;
  3. }

(2)计数窗口(Count Window)

计数窗口基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口。

只需指定窗口大小,就可以把数据分配到对应的窗口中了。在 Flink 内部也并没有对应的类来表示计数窗口,底层是通过“全局窗口”(Global Window)来实现的。

按照窗口分配数据的规则分类

根据分配数据的规则,窗口的具体实现可以分为 4 类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window),以及全局窗口(Global Window)。 下面我们来做具体介绍。

(1)滚动窗口(Tumbling Windows)

滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠, 也不会有间隔,是“首尾相接”的状态。

滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有一个,就是窗口的大小(window size)。

:::danger 可以定义一个长度为 1 小时的滚动时间窗口,那么每个小时就会进行一次统计;或者定义一个长度为 10 的滚动计数窗口,就会每 10 个数进行一次统计。

:::

时间和窗口 - 图11

小圆点表示流中的数据,我们对数据按照 userId 做了分区。当固定了窗口大小之后,所有分区的窗口划分都是一致的;窗口没有重叠,每个数据只属于一个窗口。

(2)滑动窗口(Sliding Windows)

与滚动窗口类似,滑动窗口的大小也是固定的。区别在于,窗口之间并不是首尾相接的, 而是可以“错开”一定的位置。

定义滑动窗口的参数有两 个:除去窗口大小(window size)之外,还有一个“滑动步长”(window slide),它其实就代 表了窗口计算的频率。滑动的距离代表了下个窗口开始的时间间隔,而窗口大小是固定的,所 以也就是两个窗口结束时间的间隔;窗口在结束时间触发计算输出结果,那么滑动步长就代表了计算频率。

:::danger 定义一个长度为 1 小时、滑动步长为 5 分钟的滑动窗口,那么就会统计 1 小时内的数据,每 5 分钟统计一次。同样,滑动窗口可以基于时间定义,也可以基于数据个数定义。

:::

时间和窗口 - 图12

当滑动步长小于窗口大小时,滑动窗口就会出现重叠,这时数据也可能会 被同时分配到多个窗口中。而具体的个数,就由窗口大小和滑动步长的比值(size/slide)来决定。

(3)会话窗口(Session Windows)

是基于“会话”(session)来对数据进行分组, 数据来了之后就开启一个会话窗口,如果接下来还有数据陆续到来, 那么就一直保持会话;如果一段时间一直没收到数据,那就认为会话超时失效,窗口自动关闭。

对于会话窗口而言,最重要的参数就是这段时间的长度(size),它表示会话的超时时间,也就是两个会话窗口之间的最小距离。如果相邻两个数据到来的时间间隔(Gap)小于指定的大小 (size),那说明还在保持会话,它们就属于同一个窗口;如果 gap 大于 size,那么新来的数据就应该属于新的会话窗口,而前一个窗口就应该关闭了。

在具体实现上,可以设置静态固定的大小(size),也可以通过一个自定义的提取器(gap extractor)动态提取最小间隔 gap 的值。

Flink 底层,对会话窗口的处理会比较特殊:每来一个新的数据,都会创建一个新的会话窗口;然后判断已有窗口之间的距离,如果小于给定的 size,就对它们进行合并(merge) 操作。在 Window 算子中,对会话窗口会有单独的处理逻辑。

时间和窗口 - 图13

(4)全局窗口(Global Windows)

全局窗口全局有效,会把相同 key 的所有数据都分配到同一个窗口中;说直白一点,就跟没分窗口一样。无界流的数据永无止尽,所以 这种窗口也没有结束的时候,默认是不会做触发计算的。如果希望它能对数据进行计算处理, 还需要自定义“触发器”(Trigger)。

时间和窗口 - 图14

窗口 API 概览

按键分区(Keyed)和非按键分区(Non-Keyed)

在定义窗口操作之前,首先需要确定,到底是基于按键分区(Keyed)的数据流 KeyedStream 来开窗,还是直接在没有按键分区的 DataStream 上开窗。也就是说,在调用窗口算子之前, 是否有 keyBy 操作。

(1)按键分区窗口(Keyed Windows)

经过按键分区 keyBy 操作后,数据流会按照 key 被分为多条逻辑流(logical streams),这就是 KeyedStream

基于 KeyedStream 进行窗口操作时, 窗口计算会在多个并行子任务上同时执行。相同 key 的数据会被发送到同一个并行子任务,而窗口操作会基于每个 key 进行单独的处理。所以可以认为,每个 key 上都定义了一组窗口,各自独立地进行统计计算。

要先对 DataStream 调用.keyBy()进行按键分区,然后再调 用.window()定义窗口。

  1. stream.keyBy(...).window(...)

(2)非按键分区(Non-Keyed Windows)

如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只 能在一个任务(task)上执行,就相当于并行度变成了 1。在实际应用中一般不推荐使用这种方式。

在代码中,直接基于 DataStream 调用.windowAll()定义窗口。

  1. stream.windowAll(...)

代码中窗口 API 的调用

窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。

  1. stream.keyBy(<key selector>)
  2. .window(<window assigner>)
  3. .aggregate(<window function>)

.window()方法需要传入一个窗口分配器,它指明了窗口的类型;而后面的.aggregate() 方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。窗口分配器有各种形式, 而窗口函数的调用方法也不只.aggregate()一种,

窗口分配器(Window Assigners)

定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口。窗口分配器其实就是在指定窗口的类型。

窗口分配器最通用的定义方式,就是调用.window()方法。这个方法需要传入一个 WindowAssigner 作为参数,返回 WindowedStream。如果是非按键分区窗口,那么直接调 用.windowAll()方法,同样传入一个 WindowAssigner,返回的是 AllWindowedStream

时间窗口

时间窗口是最常用的窗口类型,又可以细分为滚动、滑动和会话三种。

(1)滚动处理时间窗口

窗口分配器由类 TumblingProcessingTimeWindows 提供,需要调用它的静态方法.of()。

  1. stream.keyBy(...)
  2. .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  3. .aggregate(...)

.of()方法需要传入一个 Time 类型的参数 size,表示滚动窗口的大小,这里创建了一个长度为 5 秒的滚动窗口。

.of()还有一个重载方法,可以传入两个 Time 类型的参数:size offset。第一个参 数当然还是窗口大小,第二个参数则表示窗口起始点的偏移量。

我们定义 1 天滚动窗口时,如果用默认的起始点,那么得到就是伦敦时间每天 0 点开启窗口,这时是北京时间早上 8 点。为了得到北京时间每天 0 点开启的滚动窗口, 只要设置-8 小时的偏移量就可以了:

  1. .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))

(2)滑动处理时间窗口

窗口分配器由类 SlidingProcessingTimeWindows 提供,同样需要调用它的静态方法.of()。

  1. stream.keyBy(...)
  2. .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
  3. .aggregate(...)

.of()方法需要传入两个 Time 类型的参数:size slide,前者表示滑动窗口的大小, 后者表示滑动窗口的滑动步长。

这里创建了一个长度为 10 秒、滑动步长为 5 秒的滑动窗 口。

滑动窗口同样可以追加第三个参数,用于指定窗口起始点的偏移量,用法与滚动窗口完全 一致。

(3)处理时间会话窗口

窗口分配器由类 ProcessingTimeSessionWindows 提供,需要调用它的静态方法.withGap() 或者.withDynamicGap()。

  1. stream.keyBy(...)
  2. .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
  3. .aggregate(...)

withGap()方法需要传入一个 Time 类型的参数 size,表示会话的超时时间,也就是最小间隔 session gap。这里创建了静态会话超时时间为 10 秒的会话窗口。

  1. .window(ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<Tuple2<String, Long>>() {
  2. @Override
  3. public long extract(Tuple2<String, Long> element) {
  4. // 提取 session gap 值返回, 单位毫秒
  5. return element.f0.length() * 1000;
  6. }
  7. }))

withDynamicGap()方法需要传入一个 SessionWindowTimeGapExtractor 作为参数,用来定义 session gap 的动态提取逻辑。在这里提取了数据元素的第一个字段,用它的长度乘以 1000 作为会话超时的间隔。

(4)滚动事件时间窗口

窗口分配器由类 TumblingEventTimeWindows 提供,用法与滚动处理事件窗口完全一致。

  1. stream.keyBy(...)
  2. .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  3. .aggregate(...)
  1. .**of**()方法也可以传入第二个参数 **offset**,用于设置窗口起始点的偏移量。

(5)滑动事件时间窗口

窗口分配器由类 SlidingEventTimeWindows 提供,用法与滑动处理事件窗口完全一致。

  1. stream.keyBy(...)
  2. .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
  3. .aggregate(...)

(6)事件时间会话窗口

窗口分配器由类 EventTimeSessionWindows 提供,用法与处理事件会话窗口完全一致。

  1. stream.keyBy(...)
  2. .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
  3. .aggregate(...)

计数窗口

计数窗口本身底层是基于全局窗口(Global Window)实现的。Flink 提供了非常方便的接口:直接调用.countWindow()方法。根据分配规则的不同,又可以分为 滚动计数窗口滑动计数窗口两类。

(1)滚动计数窗口

滚动计数窗口只需要传入一个长整型的参数 size,表示窗口的大小。

  1. stream.keyBy(...)
  2. .countWindow(10)

(2)滑动计数窗口

.countWindow()调用时传入两个参数:size slide,前者表示窗口大小,后者表示滑动步长。

  1. stream.keyBy(...)
  2. .countWindow(103)

全局窗口

全局窗口是计数窗口的底层实现,一般在需要自定义窗口时使用。它的定义同样是直接调用.window(),分配器由 GlobalWindows 类提供。

  1. stream.keyBy(...)
  2. .window(GlobalWindows.create());

需要注意使用全局窗口,必须自行定义触发器才能实现窗口计算,否则起不到任何作用。

窗口函数(Window Functions)

在窗口分配器之后,必须再接上一个定义窗口如何进行计算的操作,这就是“窗口函数”(window functions)。

经窗口分配器处理之后,数据可以分配到对应的窗口中,而数据流经过转换得到的数据类型是 WindowedStream。这个类型并不是 DataStream,所以并不能直接进行其他转换,而必须进一步调用窗口函数,对收集到的数据进行处理计算之后,才能最终再次得到 DataStream 。

时间和窗口 - 图15

窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数全窗口函数

增量聚合函数(incremental aggregation functions)

  1. 窗口将数据收集起来,最基本的处理操作当然就是进行聚合。
  2. 每来一条数据就立即进行计算,中间只要保持一个简单的聚合状态,等到窗口到了结束时间需要输出计算结果的时候,再拿出之前聚合的状态直接输出。
  3. 典型的增量聚合函数有两个:**ReduceFunction **和 **AggregateFunction**。

(1)归约函数(ReduceFunction)

(2)聚合函数(AggregateFunction)

全窗口函数(full window functions)

测试水位线和窗口的使用

其他 API

窗口的生命周期