window
countWindow
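- Always creates a global window (GlobalWindows); the overload you call (size only, or size and slide) determines which trigger and evictor are attached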
- countWindow(long size)
```java
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
    // Use a CountTrigger (wrapped in a PurgingTrigger) as the firing condition
    return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}
```
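As you can see, countWindow creates a global window (GlobalWindows) and sets PurgingTrigger as its trigger. A global window must be given a trigger explicitly, because its default trigger never fires.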
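```java
public class PurgingTrigger<T, W extends Window> extends Trigger<T, W> {
    private static final long serialVersionUID = 1L;

    private Trigger<T, W> nestedTrigger;

    private PurgingTrigger(Trigger<T, W> nestedTrigger) {
        this.nestedTrigger = nestedTrigger;
    }

    // ... onElement(), onEventTime() and onProcessingTime() delegate to nestedTrigger
    // and return FIRE_AND_PURGE whenever the nested trigger returns FIRE
}
```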
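PurgingTrigger wraps the CountTrigger and implements Trigger's abstract methods by delegating to it. Whenever the nested trigger returns FIRE, PurgingTrigger returns FIRE_AND_PURGE instead, so the window contents are cleared after the window function has run.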
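To make the FIRE to FIRE_AND_PURGE rewrite concrete, here is a plain-Java model that does not depend on Flink. The `TriggerResult` enum, the `SimpleTrigger` interface and the `purging`/`countTrigger` helpers are hypothetical stand-ins that only model the `onElement()` callback; they are not Flink's API.

```java
import java.util.ArrayList;
import java.util.List;

public class PurgingTriggerSketch {

    // Simplified copy of the four outcomes a trigger can return.
    enum TriggerResult {
        CONTINUE(false, false), FIRE(true, false), PURGE(false, true), FIRE_AND_PURGE(true, true);

        final boolean fire;
        final boolean purge;

        TriggerResult(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }
    }

    // Hypothetical minimal trigger: only the per-element callback is modeled.
    interface SimpleTrigger {
        TriggerResult onElement(long element);
    }

    // Wrapper that upgrades FIRE to FIRE_AND_PURGE and passes everything else through.
    static SimpleTrigger purging(SimpleTrigger nested) {
        return element -> {
            TriggerResult r = nested.onElement(element);
            return r.fire ? TriggerResult.FIRE_AND_PURGE : r;
        };
    }

    // Stateful count trigger: FIRE every maxCount elements (and reset), otherwise CONTINUE.
    static SimpleTrigger countTrigger(long maxCount) {
        long[] count = {0};
        return element -> {
            count[0]++;
            if (count[0] >= maxCount) {
                count[0] = 0;
                return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        };
    }

    // Feeds the elements 1..n into the trigger and records each result.
    static List<TriggerResult> run(SimpleTrigger trigger, int n) {
        List<TriggerResult> results = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            results.add(trigger.onElement(i));
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run(countTrigger(2), 4));          // [CONTINUE, FIRE, CONTINUE, FIRE]
        System.out.println(run(purging(countTrigger(2)), 4)); // [CONTINUE, FIRE_AND_PURGE, CONTINUE, FIRE_AND_PURGE]
    }
}
```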
```java
public class CountTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;

    // number of elements after which the window fires
    private final long maxCount;

    // per-window element counter, combined with Sum
    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    private CountTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            // enough elements: reset the counter and fire
            count.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    // onEventTime() and onProcessingTime() (omitted) both return TriggerResult.CONTINUE

    public static <W extends Window> CountTrigger<W> of(long maxCount) {
        return new CountTrigger<>(maxCount);
    }

    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }
}
```
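onElement() uses a ReducingState (declared through a ReducingStateDescriptor with a Sum reducer) to accumulate the number of elements in the window. When the count reaches the configured window size, it clears the state and returns FIRE, which triggers the window function.
Both onEventTime() and onProcessingTime() return TriggerResult.CONTINUE, i.e. the passage of time never makes this trigger fire.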
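Combining CountTrigger with PurgingTrigger gives a tumbling count window per key. The sketch below simulates `keyBy(...).countWindow(3)` in plain Java under simplifying assumptions: records are hypothetical `{key, value}` string pairs, the "window function" just records the buffered values, and Flink's keyed state is replaced by a `HashMap` of per-key buffers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TumblingCountWindowSketch {

    // Models keyBy(key).countWindow(size): one buffer per key, fire and purge every `size` elements.
    static List<String> simulate(String[][] input, int size) {
        Map<String, List<Integer>> buffers = new HashMap<>(); // window contents per key
        List<String> emitted = new ArrayList<>();
        for (String[] record : input) {
            String key = record[0];
            int value = Integer.parseInt(record[1]);
            List<Integer> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
            buffer.add(value);
            // CountTrigger: the count equals the buffer size here because the buffer is purged on every fire
            if (buffer.size() >= size) {
                emitted.add(key + "=" + buffer); // "window function": record the window contents
                buffer.clear();                  // PurgingTrigger: FIRE_AND_PURGE empties the window
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        String[][] input = {{"a", "1"}, {"b", "2"}, {"a", "3"}, {"a", "5"}, {"b", "4"}, {"b", "6"}, {"a", "7"}};
        System.out.println(simulate(input, 3)); // [a=[1, 3, 5], b=[2, 4, 6]]
    }
}
```

In this model the trailing `a=7` stays buffered because its count never reaches 3; CountTrigger has no time-based path that would flush it.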
- countWindow(long size, long slide)
```java
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
    // the evictor keeps at most `size` elements; the trigger fires every `slide` elements
    return window(GlobalWindows.create())
            .evictor(CountEvictor.of(size))
            .trigger(CountTrigger.of(slide));
}
```
CountEvictor.evict():
```java
private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
    if (size <= maxCount) {
        // the window holds no more than maxCount elements: nothing to evict
        return;
    } else {
        // remove the first (size - maxCount) elements, i.e. the oldest ones
        int evictedCount = 0;
        for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {
            iterator.next();
            evictedCount++;
            if (evictedCount > size - maxCount) {
                break;
            } else {
                iterator.remove();
            }
        }
    }
}
```
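The evictor runs every time the trigger fires, before the window function is evaluated. It only removes elements when the window holds more than size (maxCount) elements; in that case it drops the oldest ones from the head until exactly size remain. Because CountTrigger is not wrapped in a PurgingTrigger here, the window is never purged, so the result is a sliding count window: it fires every slide elements over the most recent size elements.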
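The interplay of CountTrigger.of(slide) and CountEvictor.of(size) can be traced with a single-key plain-Java model. The `evict` method below copies the loop shown above, with a `List<Integer>` standing in for the `Iterable<TimestampedValue<Object>>`; the class name and the `simulate` helper are hypothetical, and the model assumes, as in Flink's evicting window operator, that evicted elements are removed from the window state for good.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class SlidingCountWindowSketch {

    // Same loop as CountEvictor.evict(): drop elements from the head until only maxCount remain.
    static void evict(List<Integer> elements, int maxCount) {
        int size = elements.size();
        if (size <= maxCount) {
            return;
        }
        int evictedCount = 0;
        for (Iterator<Integer> it = elements.iterator(); it.hasNext(); ) {
            it.next();
            evictedCount++;
            if (evictedCount > size - maxCount) {
                break;
            }
            it.remove();
        }
    }

    // Models countWindow(size, slide) for one key: CountTrigger.of(slide) fires without purging,
    // and CountEvictor.of(size) trims the buffer before the window function sees it.
    static List<List<Integer>> simulate(int[] input, int size, int slide) {
        List<Integer> buffer = new ArrayList<>();
        List<List<Integer>> emitted = new ArrayList<>();
        int count = 0; // stands in for CountTrigger's ReducingState
        for (int value : input) {
            buffer.add(value);
            count++;
            if (count >= slide) {
                count = 0;
                evict(buffer, size);                  // evictor trims the buffer in place
                emitted.add(new ArrayList<>(buffer)); // window function sees at most `size` elements
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(simulate(new int[]{1, 2, 3, 4, 5, 6, 7}, 3, 2)); // [[1, 2], [2, 3, 4], [4, 5, 6]]
    }
}
```

Note that the first firing only sees two elements: the window has not yet filled up to size when the trigger fires for the first time.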
timeWindow
- Chooses the processing-time or event-time window assigner according to the stream's time characteristic
- timeWindow(Time size, Time slide)
```java
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
    if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
        return window(SlidingProcessingTimeWindows.of(size, slide));
    } else {
        return window(SlidingEventTimeWindows.of(size, slide));
    }
}

// SlidingProcessingTimeWindows / SlidingEventTimeWindows factories (last argument: offset = 0)
public static SlidingProcessingTimeWindows of(Time size, Time slide) {
    return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
}

public static SlidingEventTimeWindows of(Time size, Time slide) {
    return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
}
```
- timeWindow(Time size)
```java
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
    if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
        return window(TumblingProcessingTimeWindows.of(size));
    } else {
        return window(TumblingEventTimeWindows.of(size));
    }
}

// TumblingProcessingTimeWindows / TumblingEventTimeWindows factories (last argument: offset = 0)
public static TumblingProcessingTimeWindows of(Time size) {
    return new TumblingProcessingTimeWindows(size.toMilliseconds(), 0);
}

public static TumblingEventTimeWindows of(Time size) {
    return new TumblingEventTimeWindows(size.toMilliseconds(), 0);
}
```
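```java
// KeyedStream.window(): wraps the assigner in a WindowedStream
@PublicEvolving
public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
    return new WindowedStream<>(this, assigner);
}

// WindowedStream constructor: the trigger defaults to the assigner's default trigger
public WindowedStream(KeyedStream<T, K> input,
        WindowAssigner<? super T, W> windowAssigner) {
    this.input = input;
    this.windowAssigner = windowAssigner;
    this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
}

// Default trigger of TumblingProcessingTimeWindows; the event-time assigners return EventTimeTrigger.create()
@Override
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
    return ProcessingTimeTrigger.create();
}
```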
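- Create the window assigner that matches the time characteristic:
  - TumblingProcessingTimeWindows
  - TumblingEventTimeWindows
  - SlidingProcessingTimeWindows
  - SlidingEventTimeWindows
- Wrap the assigner in a WindowedStream object
- Obtain the assigner's default trigger from the environment via getDefaultTrigger(env)
- Return the WindowedStream object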
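For the event-time assigners, the default trigger obtained in the steps above is EventTimeTrigger. The plain-Java model below captures when it fires, based on our reading of Flink's `EventTimeTrigger`: a `TimeWindow` covering `[start, end)` has `maxTimestamp() == end - 1`, and the window fires once the watermark reaches that value. The class, enum and helper names are hypothetical; timers are reduced to a simple "watermark >= timer time" check.

```java
public class EventTimeTriggerSketch {

    enum TriggerResult { CONTINUE, FIRE }

    // A TimeWindow covering [start, end) has maxTimestamp() == end - 1.
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    // Models EventTimeTrigger.onElement(): fire at once if the watermark is already past maxTimestamp
    // (in Flink this path is reached for elements within the allowed lateness); otherwise an
    // event-time timer is registered for maxTimestamp and the result is CONTINUE.
    static TriggerResult onElement(long windowEnd, long currentWatermark) {
        return maxTimestamp(windowEnd) <= currentWatermark ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    // Models EventTimeTrigger.onEventTime(): only the timer registered for maxTimestamp fires the window.
    static TriggerResult onEventTime(long timerTime, long windowEnd) {
        return timerTime == maxTimestamp(windowEnd) ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    // First watermark in the sequence that makes the timer go off, or Long.MIN_VALUE if none does.
    static long firstFiringWatermark(long windowEnd, long[] watermarks) {
        for (long wm : watermarks) {
            if (wm >= maxTimestamp(windowEnd)) {
                return wm;
            }
        }
        return Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        long windowEnd = 10_000; // window [5000, 10000)
        System.out.println(onElement(windowEnd, 8_000)); // CONTINUE
        System.out.println(firstFiringWatermark(windowEnd, new long[]{3_000, 9_000, 9_999, 11_000})); // 9999
    }
}
```

Without watermarks the condition above is never met, which is why event-time windows need timestamps and watermarks while the processing-time trigger only looks at the machine clock.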
window
- Pass the WindowAssigner you need explicitly
- The built-in assigners include:
```java
.window(TumblingEventTimeWindows.of(Time.seconds(10), Time.seconds(1))) // size 10 s, offset 1 s (|offset| must be smaller than size)
.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.window(SlidingEventTimeWindows.of(Time.seconds(1), Time.seconds(1)))
.window(SlidingProcessingTimeWindows.of(Time.seconds(1), Time.seconds(1)))
.window(EventTimeSessionWindows.withGap(Time.seconds(1)))
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(1)))
.window(GlobalWindows.create())
.window(DynamicProcessingTimeSessionWindows.withDynamicGap(null)) // null is a placeholder for a SessionWindowTimeGapExtractor
.window(DynamicEventTimeSessionWindows.withDynamicGap(null))
```
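Session windows differ from the other assigners in that windows are merged. Each element first gets its own window `[timestamp, timestamp + gap)`, and windows that overlap or touch are merged into one session (Flink's `TimeWindow.intersects` treats `start <= other.end && end >= other.start` as overlapping). The sketch below approximates this by sorting the timestamps and merging in one pass; Flink instead merges incrementally as elements arrive, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SessionWindowSketch {

    // Each element starts a window [ts, ts + gap); overlapping or touching windows are merged.
    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted); // process in timestamp order so only the last session can overlap
        List<long[]> merged = new ArrayList<>();
        for (long ts : sorted) {
            long start = ts;
            long end = ts + gap;
            if (!merged.isEmpty()) {
                long[] last = merged.get(merged.size() - 1);
                if (start <= last[1]) {
                    // overlaps the current session: extend its end
                    last[1] = Math.max(last[1], end);
                    continue;
                }
            }
            merged.add(new long[]{start, end});
        }
        return merged;
    }

    public static void main(String[] args) {
        // gap of 2 s; 9000 starts after the first session ends at 5000, so it opens a new session
        for (long[] w : sessions(new long[]{1_000, 2_500, 9_000, 3_000}, 2_000)) {
            System.out.println(Arrays.toString(w)); // [1000, 5000] then [9000, 11000]
        }
    }
}
```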
AllWindow
```java
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(1), Time.seconds(1)))
.windowAll(SlidingEventTimeWindows.of(Time.seconds(1), Time.seconds(1)))
.windowAll(DynamicProcessingTimeSessionWindows.withDynamicGap(null))
.windowAll(DynamicEventTimeSessionWindows.withDynamicGap(null))
.windowAll(GlobalWindows.create())
```
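- Time characteristic set globally to ProcessingTime, timestamps and watermarks assigned, event-time windows: works
- Time characteristic set globally to ProcessingTime, no watermarks assigned, event-time windows: fails with an error
- Time characteristic set globally to EventTime, no watermarks assigned, event-time windows: fails with an error
- Time characteristic set globally to EventTime, watermarks assigned, processing-time windows: works (the watermarks are simply not used)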
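The errors in the list above come from the event-time assigners themselves: Flink marks a record without a timestamp with `Long.MIN_VALUE`, and `TumblingEventTimeWindows.assignWindows` throws a `RuntimeException` for such records, whereas the processing-time assigners compute the window from the current machine time and never look at the element timestamp. The plain-Java model below mirrors that difference; the class, method names and exception type/message are our own, and the window start uses offset 0 with non-negative times.

```java
import java.util.Arrays;

public class TimestampCheckSketch {

    // Flink uses Long.MIN_VALUE internally to mean "this record has no timestamp".
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    // Event-time assigner: the window depends on the element's own timestamp, so a missing one is rejected.
    static long[] eventTimeWindow(long elementTimestamp, long size) {
        if (elementTimestamp == NO_TIMESTAMP) {
            throw new IllegalStateException("record has no timestamp: assign timestamps and watermarks first");
        }
        long start = elementTimestamp - (elementTimestamp + size) % size; // offset 0
        return new long[]{start, start + size};
    }

    // Processing-time assigner: the element timestamp is ignored; only the current machine time counts.
    static long[] processingTimeWindow(long elementTimestamp, long now, long size) {
        long start = now - (now + size) % size; // offset 0
        return new long[]{start, start + size};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(processingTimeWindow(NO_TIMESTAMP, 7_000, 5_000))); // [5000, 10000]
        try {
            eventTimeWindow(NO_TIMESTAMP, 5_000);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```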