继承
// Declaration of WindowOperator (body elided here as {...}).
// The class is generic over K, IN, ACC, OUT and W, where W is bounded by Window.
// It extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
// and implements both OneInputStreamOperator<IN, OUT> and Triggerable<K, W>.
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {...}
WindowAssigner
assignWindows
分配窗口
TumblingWindow
TumblingEventTimeWindows
/**
 * Assigns an element to exactly one window derived from the element's timestamp.
 *
 * <p>If {@code staggerOffset} is still {@code null} it is initialised from {@code windowStagger}
 * using the current processing time and {@code size}. The window start is then obtained from
 * {@code TimeWindow.getWindowStartWithOffset} with the offset
 * {@code (globalOffset + staggerOffset) % size}, and the window end is {@code start + size}.
 *
 * @param element the element being assigned; not inspected by this method
 * @param timestamp the element's timestamp; {@code Long.MIN_VALUE} means no timestamp is present
 * @param context source of the current processing time used for the stagger offset
 * @return a singleton collection containing the computed {@code TimeWindow}
 * @throws RuntimeException if {@code timestamp} equals {@code Long.MIN_VALUE}
 */
@Override
public Collection<TimeWindow> assignWindows(
        Object element, long timestamp, WindowAssignerContext context) {
    // Long.MIN_VALUE is currently assigned when no timestamp is present; reject it up front.
    if (timestamp == Long.MIN_VALUE) {
        throw new RuntimeException(
                "Record has Long.MIN_VALUE timestamp (= no timestamp marker). "
                        + "Is the time characteristic set to 'ProcessingTime', or did you forget to call "
                        + "'DataStream.assignTimestampsAndWatermarks(...)'?");
    }

    // Lazily initialise the stagger offset the first time it is needed.
    if (staggerOffset == null) {
        staggerOffset =
                windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);
    }

    final long effectiveOffset = (globalOffset + staggerOffset) % size;
    final long windowStart =
            TimeWindow.getWindowStartWithOffset(timestamp, effectiveOffset, size);
    final long windowEnd = windowStart + size;
    return Collections.singletonList(new TimeWindow(windowStart, windowEnd));
}
TumblingProcessingTimeWindows
/**
 * Assigns an element to exactly one window derived from the current processing time.
 *
 * <p>The {@code timestamp} argument is not used; the time is read from {@code context}. If
 * {@code staggerOffset} is still {@code null} it is initialised from {@code windowStagger} using
 * a fresh read of the current processing time and {@code size}.
 *
 * @param element the element being assigned; not inspected by this method
 * @param timestamp ignored by this assigner
 * @param context source of the current processing time
 * @return a singleton collection containing the computed {@code TimeWindow}
 */
@Override
public Collection<TimeWindow> assignWindows(
        Object element, long timestamp, WindowAssignerContext context) {
    final long processingTime = context.getCurrentProcessingTime();

    // Lazily initialise the stagger offset; this reads the processing time a second time.
    if (staggerOffset == null) {
        final long staggerReferenceTime = context.getCurrentProcessingTime();
        staggerOffset = windowStagger.getStaggerOffset(staggerReferenceTime, size);
    }

    final long effectiveOffset = (globalOffset + staggerOffset) % size;
    final long windowStart =
            TimeWindow.getWindowStartWithOffset(processingTime, effectiveOffset, size);
    final long windowEnd = windowStart + size;
    return Collections.singletonList(new TimeWindow(windowStart, windowEnd));
}
SlidingWindows
SlidingEventTimeWindows
/**
 * Assigns an element to every sliding window selected for the element's timestamp.
 *
 * <p>Starting from the value returned by
 * {@code TimeWindow.getWindowStartWithOffset(timestamp, offset, slide)}, the start is moved back
 * by {@code slide} for as long as it stays strictly greater than {@code timestamp - size}. Each
 * visited start yields a window ending at {@code start + size}.
 *
 * @param element the element being assigned; not inspected by this method
 * @param timestamp the element's timestamp; {@code Long.MIN_VALUE} means no timestamp is present
 * @param context unused by this assigner
 * @return the list of windows, ordered from the latest start to the earliest
 * @throws RuntimeException if {@code timestamp} equals {@code Long.MIN_VALUE}
 */
@Override
public Collection<TimeWindow> assignWindows(
        Object element, long timestamp, WindowAssignerContext context) {
    // Long.MIN_VALUE marks a record without a timestamp; reject it up front.
    if (timestamp == Long.MIN_VALUE) {
        throw new RuntimeException(
                "Record has Long.MIN_VALUE timestamp (= no timestamp marker). "
                        + "Is the time characteristic set to 'ProcessingTime', or did you forget to call "
                        + "'DataStream.assignTimestampsAndWatermarks(...)'?");
    }

    // Pre-size the list with size / slide as the initial capacity.
    final int initialCapacity = (int) (size / slide);
    final List<TimeWindow> assigned = new ArrayList<>(initialCapacity);

    final long exclusiveLowerBound = timestamp - size;
    long windowStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
    while (windowStart > exclusiveLowerBound) {
        assigned.add(new TimeWindow(windowStart, windowStart + size));
        windowStart -= slide;
    }
    return assigned;
}
SlidingProcessingTimeWindows
/**
 * Assigns an element to every sliding window selected for the current processing time.
 *
 * <p>The incoming {@code timestamp} is disregarded; the time is read from {@code context}.
 * Starting from the value returned by
 * {@code TimeWindow.getWindowStartWithOffset(now, offset, slide)}, the start is moved back by
 * {@code slide} for as long as it stays strictly greater than {@code now - size}. Each visited
 * start yields a window ending at {@code start + size}.
 *
 * @param element the element being assigned; not inspected by this method
 * @param timestamp ignored by this assigner
 * @param context source of the current processing time
 * @return the list of windows, ordered from the latest start to the earliest
 */
@Override
public Collection<TimeWindow> assignWindows(
        Object element, long timestamp, WindowAssignerContext context) {
    final long now = context.getCurrentProcessingTime();

    // Pre-size the list with size / slide as the initial capacity.
    final int initialCapacity = (int) (size / slide);
    final List<TimeWindow> assigned = new ArrayList<>(initialCapacity);

    final long exclusiveLowerBound = now - size;
    long windowStart = TimeWindow.getWindowStartWithOffset(now, offset, slide);
    while (windowStart > exclusiveLowerBound) {
        assigned.add(new TimeWindow(windowStart, windowStart + size));
        windowStart -= slide;
    }
    return assigned;
}
SessionWindow assigners (all extend the abstract class MergingWindowAssigner)
EventTimeSessionWindows
/**
 * Assigns an element to a single window that starts at the element's timestamp and ends
 * {@code sessionTimeout} later.
 *
 * @param element the element being assigned; not inspected by this method
 * @param timestamp the element's timestamp, used as the window start
 * @param context unused by this assigner
 * @return a singleton collection containing the new {@code TimeWindow}
 */
@Override
public Collection<TimeWindow> assignWindows(
        Object element, long timestamp, WindowAssignerContext context) {
    final long windowEnd = timestamp + sessionTimeout;
    final TimeWindow window = new TimeWindow(timestamp, windowEnd);
    return Collections.singletonList(window);
}
ProcessingTimeSessionWindows
/**
 * Assigns an element to a single window that starts at the current processing time and ends
 * {@code sessionTimeout} later.
 *
 * @param element the element being assigned; not inspected by this method
 * @param timestamp ignored by this assigner
 * @param context source of the current processing time
 * @return a singleton collection containing the new {@code TimeWindow}
 */
@Override
public Collection<TimeWindow> assignWindows(
        Object element, long timestamp, WindowAssignerContext context) {
    // The window spans [current processing time, current processing time + session timeout].
    final long windowStart = context.getCurrentProcessingTime();
    final long windowEnd = windowStart + sessionTimeout;
    final TimeWindow window = new TimeWindow(windowStart, windowEnd);
    return Collections.singletonList(window);
}
DynamicEventTimeSessionWindows
/**
 * Assigns an element to a single window that starts at the element's timestamp and whose length
 * is the gap extracted from the element by {@code sessionWindowTimeGapExtractor}.
 *
 * @param element the element being assigned; passed to the gap extractor
 * @param timestamp the element's timestamp, used as the window start
 * @param context unused by this assigner
 * @return a singleton collection containing the new {@code TimeWindow}
 * @throws IllegalArgumentException if the extracted gap is zero or negative
 */
@Override
public Collection<TimeWindow> assignWindows(
        T element, long timestamp, WindowAssignerContext context) {
    final long gap = sessionWindowTimeGapExtractor.extract(element);
    if (gap > 0) {
        final long windowEnd = timestamp + gap;
        return Collections.singletonList(new TimeWindow(timestamp, windowEnd));
    }
    // Only strictly positive gaps are accepted.
    throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
}
DynamicProcessingTimeSessionWindows
/**
 * Assigns an element to a single window that starts at the current processing time and whose
 * length is the gap extracted from the element by {@code sessionWindowTimeGapExtractor}.
 *
 * @param element the element being assigned; passed to the gap extractor
 * @param timestamp ignored by this assigner
 * @param context source of the current processing time
 * @return a singleton collection containing the new {@code TimeWindow}
 * @throws IllegalArgumentException if the extracted gap is zero or negative
 */
@Override
public Collection<TimeWindow> assignWindows(
        T element, long timestamp, WindowAssignerContext context) {
    // Read the clock first, then extract the gap, keeping the original call order.
    final long windowStart = context.getCurrentProcessingTime();
    final long gap = sessionWindowTimeGapExtractor.extract(element);
    if (gap > 0) {
        final long windowEnd = windowStart + gap;
        return Collections.singletonList(new TimeWindow(windowStart, windowEnd));
    }
    // Only strictly positive gaps are accepted.
    throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
}
GlobalWindows
/**
 * Assigns every element to the window returned by {@code GlobalWindow.get()}.
 *
 * @param element the element being assigned; not inspected by this method
 * @param timestamp ignored by this assigner
 * @param context unused by this assigner
 * @return a singleton collection containing the {@code GlobalWindow}
 */
@Override
public Collection<GlobalWindow> assignWindows(
        Object element, long timestamp, WindowAssignerContext context) {
    final GlobalWindow window = GlobalWindow.get();
    return Collections.singletonList(window);
}