Storm core 支持处理落在窗口内的一组元组。窗口操作指定了一下两个参数

  1. 1.窗口的长度 - 窗口的长度或持续时间
  2. 2.滑动间隔 - 窗口滑动的时间间隔

滑动窗口

元组被分组在窗口和每个滑动间隔窗口中。 一个元组可以属于多个窗口。

例如一个持续时间长度为 10 秒和滑动间隔 5 秒的滑动窗口。 ........| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |... -5 0 5 10 15 -> time |<------- w1 -->| |<---------- w2 ----->| |<-------------- w3 ---->| 窗口每5秒进行一次评估,第一个窗口中的某些元组与第二个窗口重叠。

注意:窗口第一次滑动在 t = 5s,并且将包含在前 5 秒钟内收到的事件。

Tumbling Window

元组根据时间或数量被分组在一个窗口中。任何元组只属于其中一个窗口。

例如一个持续时间长度为 5s 的 tumbling window。

  1. | e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
  2. 0 5 10 15 -> time
  3. w1 w2 w3

窗口每五秒进行一次评估,并且没有窗口重叠。

Storm 支持指定窗口长度和滑动间隔作为元组数的计数或持续时间。 bolt 接口 IWindowedBolt 需要由窗口支持的bolts来实现。

  1. public interface IWindowedBolt extends IComponent {
  2. void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
  3. /**
  4. * Process tuples falling within the window and optionally emit
  5. * new tuples based on the tuples in the input window.
  6. */
  7. void execute(TupleWindow inputWindow);
  8. void cleanup();
  9. }

每次窗口激活时,都会调用 execute 方法。TupleWindow 的参数允许访问窗口中的当前元组,过期的元组以及自上次窗口计算后添加的新元组,这对于高效的窗口计算将是有用的。

需要窗口支持的 Bolts 一般会扩展为 BaseWindowedBolt,它有用来指定窗口长度和滑动间隔的apis.

例如

  1. public class SlidingWindowBolt extends BaseWindowedBolt {
  2. private OutputCollector collector;
  3. @Override
  4. public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
  5. this.collector = collector;
  6. }
  7. @Override
  8. public void execute(TupleWindow inputWindow) {
  9. for(Tuple tuple: inputWindow.get()) {
  10. // do the windowing computation
  11. ...
  12. }
  13. // emit the results
  14. collector.emit(new Values(computedValue));
  15. }
  16. }
  17. public static void main(String[] args) {
  18. TopologyBuilder builder = new TopologyBuilder();
  19. builder.setSpout("spout", new RandomSentenceSpout(), 1);
  20. builder.setBolt("slidingwindowbolt",
  21. new SlidingWindowBolt().withWindow(new Count(30), new Count(10)),
  22. 1).shuffleGrouping("spout");
  23. Config conf = new Config();
  24. conf.setDebug(true);
  25. conf.setNumWorkers(1);
  26. StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
  27. }

支持以下窗口配置

  1. withWindow(Count windowLength, Count slidingInterval)
  2. 基于元组计数的滑动窗口,在多个tuples进行 `slidingInterval`滑动之后。
  3. withWindow(Count windowLength)
  4. 基于元组计数的窗口,它与每个传入的元组一起滑动。
  5. withWindow(Count windowLength, Duration slidingInterval)
  6. 基于元组计数的滑动窗口,在`slidingInterval`持续时间滑动之后。
  7. withWindow(Duration windowLength, Duration slidingInterval)
  8. 基于持续时间的滑动窗口,在`slidingInterval`持续时间滑动之后。
  9. withWindow(Duration windowLength)
  10. 基于持续时间的窗口,它与每个传入的元组一起滑动。
  11. withWindow(Duration windowLength, Count slidingInterval)
  12. 基于时间的滑动窗口配置在`slidingInterval`多个元组之后滑动。
  13. withTumblingWindow(BaseWindowedBolt.Count count)
  14. 计数的tumbling窗口在指定的元组数之后tumbles.
  15. withTumblingWindow(BaseWindowedBolt.Duration duration)
  16. 基于持续时间的tumbling窗口在指定的持续时间后tumbles

元组时间戳和乱序元组

默认情况下,在窗口中追踪的时间戳是 bolt 处理元组的时间。窗口计算是根据正在处理的时间戳进行的。 Storm 支持基于源生成的时间戳的追踪窗口。

  1. /**
  2. * Specify a field in the tuple that represents the timestamp as a long value. If this
  3. * field is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown.
  4. *
  5. * @param fieldName the name of the field that contains the timestamp
  6. */
  7. public BaseWindowedBolt withTimestampField(String fieldName)

上述fieldName的值将从传入的元组中查找并考虑进行窗口计算。如果该元组中不存在该字段,将抛出异常。或者,TimestampExtractor可以用于从元组导出时间戳值(例如,从元组中的嵌套字段提取时间戳)。

  1. /**
  2. * Specify the timestamp extractor implementation.
  3. *
  4. * @param timestampExtractor the {@link TimestampExtractor} implementation
  5. */
  6. public BaseWindowedBolt withTimestampExtractor(TimestampExtractor timestampExtractor)

与时间戳字段 name/extractor 一起,可以指定一个时间滞后参数,它指示具有无序时间戳的元组的最大时间限制。

  1. /**
  2. * Specify the maximum time lag of the tuple timestamp in milliseconds. It means that the tuple timestamps
  3. * cannot be out of order by more than this amount.
  4. *
  5. * @param duration the max lag duration
  6. */
  7. public BaseWindowedBolt withLag(Duration duration)

例如:如果滞后是5秒,并且元组t1到达时间戳为06:00:05没有元组可能会在早于06:00:00的元组时间戳到达。 如果一个元组在t1之后到达时间戳05:59:59,并且窗口已经移动过t1了,它将被视为迟到的元组。 默认情况下不处理迟到的元组,只需在INFO级别打印到工作日志文件。 ```java /* Specify a stream id on which late tuples are going to be emitted. They are going to be accessible via the {@link org.apache.storm.topology.WindowedBoltExecutor#LATETUPLE_FIELD} field. It must be defined on a per-component basis, and in conjunction with the {@link BaseWindowedBolt#withTimestampField}, otherwise {@link IllegalArgumentException} will be thrown. _ @param streamId the name of the stream used to emit late tuples on / public BaseWindowedBolt withLateTupleStream(String streamId)

  1. 通过指定上述 `streamId` 来更改此行为。 在这种情况下,迟到的元组将在指定的流中发出并可通过`WindowedBoltExecutor.LATE_TUPLE_FIELD` 访问
  2. 字段。
  3. ### Watermarks
  4. 为了处理具有时间戳字段的元组,storm 根据传入的元组时间戳内部计算 watermarksWatermark 是所有输入流中最新的元组时间戳(减去滞后)的最小值。在较高级别,watermark类似于 Flink Google MillWheel 用于跟踪基于事件的时间戳的概念。
  5. 定期的(默认每秒),watermark时间戳被发出,如果基于元组的时间戳被使用,这被认为是窗口计算的 clock tick(时钟勾)。可以用下面的api来改变发出 watermarks 的时间间隔。
  6. ```java
  7. /**
  8. * Specify the watermark event generation interval. For tuple based timestamps, watermark events
  9. * are used to track the progress of time
  10. *
  11. * @param interval the interval at which watermark events are generated
  12. */
  13. public BaseWindowedBolt withWatermarkInterval(Duration interval)

当接收到watermark时,将对所有时间戳记进行评估。

例如,考虑具有以下窗口参数基于元组的时间戳处理,

Window length = 20s, sliding interval = 10s, watermark emit frequency = 1s, max lag = 5s

  1. |-----|-----|-----|-----|-----|-----|-----|
  2. 0 10 20 30 40 50 60 70

当前 ts = 09:00:00

9:00:009:00:01收到的元组e1(6:00:03), e2(6:00:05), e3(6:00:07), e4(6:00:18), e5(6:00:26), e6(6:00:36)

在time t = 09:00:01, watermark w1 = 6:00:31被发出,没有早于6:00:31的元组可以到达。

三个窗口将被评估。通过采取最早的事件时间戳(06:00:03)并基于滑动间隔(10s)计算上限来计算第一个窗口结束在 ts(06:00:10)。

  1. 5:59:50 - 06:00:10 有元组 e1, e2, e3
  2. 6:00:00 - 06:00:20 有元组 e1, e2, e3, e4
  3. 6:00:10 - 06:00:30 有元组 e4, e5

e6未被评估,因为 watermark 时间戳6:00:31比元组 ts6:00:36更旧。

9:00:019:00:02之间,接收到的元组e7(8:00:25), e8(8:00:26), e9(8:00:27), e10(8:00:39)

在 time t = 09:00:02另一个 watermark w2 = 08:00:34被发出,没有元组比8:00:34更早到达。

三个窗口将被评估

  1. 6:00:20 - 06:00:40 有元组 e5, e6 (从早期批次)
  2. 6:00:30 - 06:00:50 有元组 e6 (从早期批次)
  3. 8:00:10 - 08:00:30 有元组 e7, e8, e9

e10 不被评估,因为元组 ts 8:00:39超出了watermark time 8:00:34.

窗口计算考虑时间间隔,并基于元组时间戳计算窗口。

Guarantees

storm core的窗口功能目前提供一致性保证。执行(TupleWindow inputWindow)方法发出的值将自动锁定到 inputWindow 中的所有元组。预计下游 bolts 将确认接收的元组(即从窗口 bolt 发出的元组)以完成元组树。如果不是,元组将重播,并且重新评估窗口计算。

窗口中的元组会在过期后被自动确认,即当它们在windowLength + slidingInterval之后从窗口中滑落出来。请注意,配置topology.message.timeout.secs应该远远超过基于时间窗口的windowLength + slidingInterval; 否则元组将超时并重播,并可能导致重复的评估。对于基于计数的窗口,应该调整配置,使得在超时时间段内可以接收到windowLength + slidingInterval元组。

拓扑示例

示例拓扑滑动窗口拓扑显示了如何使用apis来计算滑动窗口总和和滚动窗口平均值。