window

countWindow

  • Creates a global window; the arguments decide which variant (tumbling or sliding count window) is built
  • countWindow(long size)
```java
    public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
        // Create a CountTrigger (wrapped in a PurgingTrigger) as the firing condition
        return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
    }
```

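    As shown, countWindow creates a global window (GlobalWindows) and sets a PurgingTrigger as its trigger. A global window must be given a trigger explicitly, because its default trigger (GlobalWindows.NeverTrigger) never fires.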

```java
    public class PurgingTrigger<T, W extends Window> extends Trigger<T, W> {
        private static final long serialVersionUID = 1L;

        private Trigger<T, W> nestedTrigger;

        private PurgingTrigger(Trigger<T, W> nestedTrigger) {
            this.nestedTrigger = nestedTrigger;
        }

        // ... of(), onElement(), onEventTime(), onProcessingTime() etc. omitted
    }
```

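PurgingTrigger wraps the CountTrigger and implements the abstract Trigger methods by delegating to it. When the nested trigger returns FIRE, PurgingTrigger returns FIRE_AND_PURGE instead, so the window contents are cleared after every firing.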
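The wrapping is a plain decorator. The following is a minimal stand-alone sketch without Flink dependencies: `Result`, `SimpleTrigger`, `Counting`, `Purging` and `PurgingDemo` are hypothetical stand-ins for TriggerResult, Trigger, CountTrigger and PurgingTrigger, and only onElement() is modelled. The single behaviour it shows is that a FIRE from the nested trigger is turned into FIRE_AND_PURGE.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's TriggerResult
enum Result { CONTINUE, FIRE, FIRE_AND_PURGE }

// Hypothetical, simplified trigger interface (only onElement is modelled)
interface SimpleTrigger {
    Result onElement(Object element);
}

// Counts elements, fires once maxCount is reached, then resets (like CountTrigger)
class Counting implements SimpleTrigger {
    private final long maxCount;
    private long count = 0;

    Counting(long maxCount) { this.maxCount = maxCount; }

    @Override
    public Result onElement(Object element) {
        count++;
        if (count >= maxCount) {
            count = 0;
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }
}

// Delegates to the nested trigger and upgrades FIRE to FIRE_AND_PURGE (like PurgingTrigger)
class Purging implements SimpleTrigger {
    private final SimpleTrigger nested;

    Purging(SimpleTrigger nested) { this.nested = nested; }

    @Override
    public Result onElement(Object element) {
        Result r = nested.onElement(element);
        return r == Result.FIRE ? Result.FIRE_AND_PURGE : r;
    }
}

class PurgingDemo {
    // Feeds n elements and records the trigger result for each one
    static List<Result> run(SimpleTrigger trigger, int n) {
        List<Result> results = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            results.add(trigger.onElement(i));
        }
        return results;
    }
}
```

For example, `PurgingDemo.run(new Purging(new Counting(2)), 4)` yields CONTINUE, FIRE_AND_PURGE, CONTINUE, FIRE_AND_PURGE.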

```java
    public class CountTrigger<W extends Window> extends Trigger<Object, W> {
        private static final long serialVersionUID = 1L;

        private final long maxCount;

        // Per-key, per-window counter, summed by the Sum reducer below
        private final ReducingStateDescriptor<Long> stateDesc =
                new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

        private CountTrigger(long maxCount) {
            this.maxCount = maxCount;
        }

        @Override
        public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
            ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
            count.add(1L);
            if (count.get() >= maxCount) {
                count.clear();
                return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        }

        // ... onEventTime(), onProcessingTime(), clear() etc. omitted

        public static <W extends Window> CountTrigger<W> of(long maxCount) {
            return new CountTrigger<>(maxCount);
        }

        private static class Sum implements ReduceFunction<Long> {
            private static final long serialVersionUID = 1L;

            @Override
            public Long reduce(Long value1, Long value2) throws Exception {
                return value1 + value2;
            }
        }
    }
```

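onElement() keeps a ReducingState, described by a ReducingStateDescriptor with the Sum reducer, that counts the elements in the window. Once the count reaches the configured window size, it clears the state and returns TriggerResult.FIRE, which fires the window function.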

onEventTime() and onProcessingTime() both return TriggerResult.CONTINUE, i.e. time never fires this trigger.
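The combined effect of CountTrigger and PurgingTrigger on one key can be simulated with a plain list. This is a hypothetical sketch (`TumblingCountWindowSim` is not a Flink API): elements are buffered, the counter is cleared on every FIRE, and the buffer is purged right after the window function has seen it, so each element lands in exactly one window.

```java
import java.util.ArrayList;
import java.util.List;

class TumblingCountWindowSim {
    // Returns the contents of every window that fired for countWindow(size) on a single key
    static List<List<Integer>> run(List<Integer> input, long size) {
        List<List<Integer>> fired = new ArrayList<>();
        List<Integer> buffer = new ArrayList<>(); // window contents (the window state)
        long count = 0;                           // models the ReducingState<Long> of CountTrigger

        for (int element : input) {
            buffer.add(element);
            count++;                              // count.add(1L)
            if (count >= size) {
                count = 0;                        // count.clear()
                fired.add(new ArrayList<>(buffer)); // FIRE: the window function sees the buffer
                buffer.clear();                   // PURGE: contents are dropped after firing
            }
        }
        // Leftover elements never fire on their own: the time callbacks only return CONTINUE
        return fired;
    }
}
```

With input 1..7 and size 3, the windows are [1, 2, 3] and [4, 5, 6]; element 7 waits in the buffer until two more elements arrive.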


  • countWindow(long size, long slide)
```java
    public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
        return window(GlobalWindows.create())
                .evictor(CountEvictor.of(size))
                .trigger(CountTrigger.of(slide));
    }
```

    The eviction logic in CountEvictor:

```java
    // maxCount is the CountEvictor field set by CountEvictor.of(size)
    private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
        if (size <= maxCount) {
            return;
        } else {
            int evictedCount = 0;
            // Remove elements from the head until only maxCount remain
            for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();) {
                iterator.next();
                evictedCount++;
                if (evictedCount > size - maxCount) {
                    break;
                } else {
                    iterator.remove();
                }
            }
        }
    }
```

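When the trigger fires, the evictor runs before the window function. It only removes elements if the window holds more than maxCount (the window size) elements; in that case it drops the first size - maxCount elements and keeps the most recent maxCount. Together with CountTrigger.of(slide), which fires every slide elements and does not purge, this yields a sliding count window.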
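The interplay of CountTrigger.of(slide) and CountEvictor.of(size) can be traced with a small single-key simulation. This is a hypothetical sketch (`SlidingCountWindowSim` is not a Flink class); its `evict` method copies the loop above onto a plain list, and the buffer is never purged because this overload does not use PurgingTrigger.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class SlidingCountWindowSim {
    // Returns the contents of every window that fired for countWindow(size, slide) on a single key
    static List<List<Integer>> run(List<Integer> input, int size, long slide) {
        List<List<Integer>> fired = new ArrayList<>();
        List<Integer> buffer = new ArrayList<>(); // never purged: the trigger is a plain CountTrigger
        long count = 0;                           // models the CountTrigger.of(slide) state

        for (int element : input) {
            buffer.add(element);
            count++;
            if (count >= slide) {
                count = 0;
                evict(buffer, size);              // the evictor runs before the window function
                fired.add(new ArrayList<>(buffer));
            }
        }
        return fired;
    }

    // Same logic as CountEvictor.evict: drop the first (buffer.size() - maxCount) elements
    static void evict(List<Integer> buffer, int maxCount) {
        int size = buffer.size();
        if (size <= maxCount) {
            return;
        }
        int evictedCount = 0;
        for (Iterator<Integer> it = buffer.iterator(); it.hasNext(); ) {
            it.next();
            evictedCount++;
            if (evictedCount > size - maxCount) {
                break;
            }
            it.remove();
        }
    }
}
```

With input 1..7, size 3 and slide 2, the windows are [1, 2], [2, 3, 4] and [4, 5, 6]: the first firing happens before the buffer is full, so early windows can be smaller than size.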


timeWindow

  • Uses the time window that matches the stream's time characteristic (processing time or event time)
  • timeWindow(Time size, Time slide)
```java
    public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
        if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
            return window(SlidingProcessingTimeWindows.of(size, slide));
        } else {
            return window(SlidingEventTimeWindows.of(size, slide));
        }
    }

    // SlidingProcessingTimeWindows: size, slide, offset = 0
    public static SlidingProcessingTimeWindows of(Time size, Time slide) {
        return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
    }

    // SlidingEventTimeWindows: size, slide, offset = 0
    public static SlidingEventTimeWindows of(Time size, Time slide) {
        return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
    }
```

  • timeWindow(Time size)
```java
    public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
        if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
            return window(TumblingProcessingTimeWindows.of(size));
        } else {
            return window(TumblingEventTimeWindows.of(size));
        }
    }

    // TumblingProcessingTimeWindows: size, offset = 0
    public static TumblingProcessingTimeWindows of(Time size) {
        return new TumblingProcessingTimeWindows(size.toMilliseconds(), 0);
    }

    // TumblingEventTimeWindows: size, offset = 0
    public static TumblingEventTimeWindows of(Time size) {
        return new TumblingEventTimeWindows(size.toMilliseconds(), 0);
    }
```

  • window(assigner), the WindowedStream constructor, and the default trigger of the processing-time assigners
```java
    @PublicEvolving
    public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
        return new WindowedStream<>(this, assigner);
    }

    public WindowedStream(KeyedStream<T, K> input,
            WindowAssigner<? super T, W> windowAssigner) {
        this.input = input;
        this.windowAssigner = windowAssigner;
        this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
    }

    // Default trigger of the processing-time assigners
    // (the event-time assigners return EventTimeTrigger.create() instead)
    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeTrigger.create();
    }
```
  • Creates the window assigner that matches the time characteristic:
    • TumblingProcessingTimeWindows
    • TumblingEventTimeWindows
    • SlidingProcessingTimeWindows
    • SlidingEventTimeWindows
  • The assigner is wrapped in a WindowedStream object
    • The default Trigger is obtained from the assigner via getDefaultTrigger(env)
    • The WindowedStream object is returned
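Once an assigner such as TumblingEventTimeWindows.of(size) (offset 0) is in place, each record is mapped to windows from its timestamp. The sketch below assumes the start-of-window formula of Flink 1.x's `TimeWindow.getWindowStartWithOffset` and the loop the sliding assigners use to put a record into every overlapping window; `WindowMath` and its methods are hypothetical names, and timestamps are assumed to be non-negative milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

class WindowMath {
    // Start of the window containing timestamp (assumes non-negative timestamps)
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Tumbling: exactly one window [start, start + size)
    static long[] tumbling(long timestamp, long size, long offset) {
        long start = windowStart(timestamp, offset, size);
        return new long[] {start, start + size};
    }

    // Sliding: one window per slide step that still covers the timestamp, newest first
    static List<long[]> sliding(long timestamp, long size, long slide, long offset) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = windowStart(timestamp, offset, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }
}
```

For a record at 12345 ms, a 5 s tumbling window gives [10000, 15000), while a 10 s window sliding by 5 s places it in [10000, 20000) and [5000, 15000), i.e. size / slide windows.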

window

  • Pass in the WindowAssigner you want to use yourself
  • Currently supported assigners:

(Figure 1: Flink Source code)

.window(TumblingEventTimeWindows.of(Time.seconds(9), Time.seconds(1))) // size 9s, offset 1s; |offset| must be smaller than size
.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.window(SlidingEventTimeWindows.of(Time.seconds(1), Time.seconds(1)))
.window(SlidingProcessingTimeWindows.of(Time.seconds(1), Time.seconds(1)))
.window(EventTimeSessionWindows.withGap(Time.seconds(1)))
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(1)))
.window(GlobalWindows.create())
.window(DynamicProcessingTimeSessionWindows.withDynamicGap(null))
.window(DynamicEventTimeSessionWindows.withDynamicGap(null))

AllWindow

.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(1),Time.seconds(1)))
.windowAll(SlidingEventTimeWindows.of(Time.seconds(1),Time.seconds(1)))
.windowAll(DynamicProcessingTimeSessionWindows.withDynamicGap(null))
.windowAll(DynamicEventTimeSessionWindows.withDynamicGap(null))
.windowAll(GlobalWindows.create())
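  • Global time characteristic ProcessingTime, watermarks assigned, EventTimeWindows: works
  • Global time characteristic ProcessingTime, no watermarks assigned, EventTimeWindows: throws an error
  • Global time characteristic EventTime, no watermarks assigned, EventTimeWindows: throws an error
  • Global time characteristic EventTime, watermarks assigned, ProcessingTime windows: works (watermarks are not used)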
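The error cases above come from the event-time assigners. A record that never received a timestamp reaches the assigner with Flink's "no timestamp" marker, Long.MIN_VALUE, and the event-time assigner throws a RuntimeException, whereas processing-time assigners ignore the record timestamp and use the current processing time. The sketch below models only that decision; `AssignerCheck`, `assign` and the exception message are hypothetical, and timestamps are assumed non-negative.

```java
class AssignerCheck {
    // Flink represents "record has no timestamp" as Long.MIN_VALUE
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    // Returns the tumbling window start (offset 0), mimicking event- vs processing-time assigners
    static long assign(long recordTimestamp, long currentProcessingTime, long size, boolean eventTime) {
        long ts;
        if (eventTime) {
            if (recordTimestamp == NO_TIMESTAMP) {
                // Event-time windows cannot place a record that never got a timestamp
                throw new IllegalStateException("record has no timestamp; assign timestamps/watermarks first");
            }
            ts = recordTimestamp;
        } else {
            // Processing-time windows ignore the record timestamp (and any watermarks)
            ts = currentProcessingTime;
        }
        return ts - ts % size;
    }
}
```

This matches the last bullet: with processing-time windows, assigned timestamps and watermarks are present but simply unused.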