- Inheritance hierarchy
```java
public abstract class Trigger<T, W extends Window> implements Serializable { ... }
```
### Abstract methods
#### onElement
- Called for every element that is added to a pane. The return value determines whether the pane is evaluated to emit results.
```java
/**
 * Called for every element that gets added to a pane. The result of this will determine whether
 * the pane is evaluated to emit results.
 *
 * @param element The element that arrived.
 * @param timestamp The timestamp of the element that arrived.
 * @param window The window to which the element is being added.
 * @param ctx A context object that can be used to register timer callbacks.
 */
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx)
        throws Exception;
```
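The return value described above is a `TriggerResult`. In Flink this enum has four values: `CONTINUE`, `FIRE`, `PURGE` and `FIRE_AND_PURGE`. The following is a minimal, self-contained sketch of how a window operator might react to each of them. `SketchTriggerResult` and `SketchPane` are hypothetical stand-ins written for illustration, not Flink classes, and the pane evaluation is simplified to a sum.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for Flink's TriggerResult (hypothetical, not the library class). */
enum SketchTriggerResult {
    CONTINUE(false, false),
    FIRE_AND_PURGE(true, true),
    FIRE(true, false),
    PURGE(false, true);

    private final boolean fire;
    private final boolean purge;

    SketchTriggerResult(boolean fire, boolean purge) {
        this.fire = fire;
        this.purge = purge;
    }

    boolean isFire() { return fire; }

    boolean isPurge() { return purge; }
}

/** Hypothetical pane: buffers elements and reacts to a trigger result. */
final class SketchPane {
    final List<Integer> buffer = new ArrayList<>();
    final List<Integer> emitted = new ArrayList<>();

    void add(int element) {
        buffer.add(element);
    }

    void handle(SketchTriggerResult result) {
        if (result.isFire()) {
            // Evaluate the pane; the "window function" here is just a sum.
            int sum = 0;
            for (int v : buffer) {
                sum += v;
            }
            emitted.add(sum);
        }
        if (result.isPurge()) {
            // Drop the buffered elements of the pane.
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        SketchPane pane = new SketchPane();
        pane.add(1);
        pane.add(2);
        pane.handle(SketchTriggerResult.FIRE);           // emits 3, keeps the buffer
        pane.add(4);
        pane.handle(SketchTriggerResult.FIRE_AND_PURGE); // emits 7, clears the buffer
        System.out.println(pane.emitted); // prints [3, 7]
        System.out.println(pane.buffer);  // prints []
    }
}
```

In this sketch `FIRE` keeps the buffered elements, so a later firing sees them again, while `FIRE_AND_PURGE` starts the pane from scratch.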
#### onProcessingTime
- Called when a processing-time timer that was set using the trigger context fires.
```java
/**
 * Called when a processing-time timer that was set using the trigger context fires.
 *
 * @param time The timestamp at which the timer fired.
 * @param window The window for which the timer fired.
 * @param ctx A context object that can be used to register timer callbacks.
 */
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
        throws Exception;
```
<a name="zUQfs"></a>
#### onEventTime
- Called when an event-time timer that was set using the trigger context fires.
```java
/**
 * Called when an event-time timer that was set using the trigger context fires.
 *
 * @param time The timestamp at which the timer fired.
 * @param window The window for which the timer fired.
 * @param ctx A context object that can be used to register timer callbacks.
 */
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx)
        throws Exception;
```
#### onMerge
- Called when several windows have been merged into one window.
```java
/**
 * Called when several windows have been merged into one window by the {@link
 * org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}.
 *
 * @param window The new window that results from the merge.
 * @param ctx A context object that can be used to register timer callbacks and access state.
 */
public void onMerge(W window, OnMergeContext ctx) throws Exception {
    throw new UnsupportedOperationException("This trigger does not support merging.");
}
```
<a name="xRDMd"></a>
#### clear
- Clears any state that the trigger might still hold for the given window.
```java
/**
 * Clears any state that the trigger might still hold for the given window. This is called when
 * a window is purged. Timers set using {@link TriggerContext#registerEventTimeTimer(long)} and
 * {@link TriggerContext#registerProcessingTimeTimer(long)} should be deleted here as well as
 * state acquired using {@link TriggerContext#getPartitionedState(StateDescriptor)}.
 */
public abstract void clear(W window, TriggerContext ctx) throws Exception;
```
#### canMerge
- Returns true if this trigger supports merging of trigger state.
```java
/**
 * Returns true if this trigger supports merging of trigger state and can therefore be used with
 * a {@link org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner}.
 *
 * <p>If this returns {@code true} you must properly implement {@link #onMerge(Window,
 * OnMergeContext)}
 */
public boolean canMerge() {
    return false;
}
```
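When `canMerge()` returns true, `onMerge` has to combine whatever state the trigger kept for the windows that were merged. The following is a minimal sketch of that idea, assuming a count-style trigger whose only state is a per-window counter. `MergingCountStateSketch`, the string window ids and the explicit `sources` list are hypothetical simplifications; in Flink the state would be accessed and merged through the `OnMergeContext`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of mergeable trigger state; not a Flink class. */
final class MergingCountStateSketch {
    // Per-window element counts, keyed by a made-up window id.
    private final Map<String, Long> counts = new HashMap<>();

    void onElement(String window) {
        counts.merge(window, 1L, Long::sum);
    }

    boolean canMerge() {
        return true;
    }

    /** Folds the counts of the merged source windows into the target window. */
    void onMerge(String target, List<String> sources) {
        long total = counts.getOrDefault(target, 0L);
        for (String source : sources) {
            if (source.equals(target)) {
                continue; // the target's own count is already included
            }
            Long c = counts.remove(source);
            if (c != null) {
                total += c;
            }
        }
        counts.put(target, total);
    }

    long count(String window) {
        return counts.getOrDefault(window, 0L);
    }

    public static void main(String[] args) {
        MergingCountStateSketch state = new MergingCountStateSketch();
        state.onElement("a");
        state.onElement("a");
        state.onElement("b");
        state.onMerge("ab", Arrays.asList("a", "b"));
        System.out.println(state.count("ab")); // prints 3
        System.out.println(state.count("a"));  // prints 0
    }
}
```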
### Concrete subclass implementations
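#### CountTrigger
- Keeps a running element count in partitioned state and fires once the count reaches `maxCount`.
```java
// Per-key, per-window counter; Sum is the ReduceFunction that adds two counts.
private final ReducingStateDescriptor<Long> stateDesc =
        new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)
        throws Exception {
    ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
    count.add(1L);
    if (count.get() >= maxCount) {
        // Reset the counter so the next batch of maxCount elements fires again.
        count.clear();
        return TriggerResult.FIRE;
    }
    return TriggerResult.CONTINUE;
}
```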
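To make the firing pattern concrete, here is a minimal, self-contained sketch of the same counting logic. It models a single key and window only, and replaces the `ReducingState<Long>` with a plain field; `CountTriggerSketch`, its nested `Result` enum and the `run` helper are hypothetical names, not Flink classes.

```java
/** Hypothetical model of count-based firing; Flink state is replaced by a plain field. */
final class CountTriggerSketch {
    enum Result { CONTINUE, FIRE }

    private final long maxCount;
    private long count; // stands in for the ReducingState<Long> named "count"

    CountTriggerSketch(long maxCount) {
        this.maxCount = maxCount;
    }

    Result onElement() {
        count += 1;
        if (count >= maxCount) {
            count = 0; // mirrors count.clear()
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }

    /** Feeds `elements` elements and returns the 1-based positions at which the trigger fired. */
    static String run(long maxCount, int elements) {
        CountTriggerSketch trigger = new CountTriggerSketch(maxCount);
        StringBuilder fired = new StringBuilder();
        for (int i = 1; i <= elements; i++) {
            if (trigger.onElement() == Result.FIRE) {
                if (fired.length() > 0) {
                    fired.append(',');
                }
                fired.append(i);
            }
        }
        return fired.toString();
    }

    public static void main(String[] args) {
        System.out.println(run(3, 7)); // prints 3,6
    }
}
```

With `maxCount = 3` and seven elements, the sketch fires on the third and sixth element; the seventh only brings the counter to 1.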
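#### EventTimeTrigger
- If the window's maximum timestamp is <= the current watermark, the computation fires immediately. Otherwise an event-time timer is registered for the window's maximum timestamp and the trigger continues without firing.
```java
@Override
public TriggerResult onElement(
        Object element, long timestamp, TimeWindow window, TriggerContext ctx)
        throws Exception {
    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        // if the watermark is already past the window fire immediately
        return TriggerResult.FIRE;
    } else {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }
}
```
- When the event-time timer fires, the computation is triggered only if the timer's timestamp equals the window's maximum timestamp.
```java
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
    return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
}
```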
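The interplay between elements, the watermark and the registered timer can be sketched in a self-contained way. This is a simplified model under stated assumptions: a single window whose maximum timestamp is `end - 1` (as for Flink's `TimeWindow`), a sorted set standing in for the timer service, and a hypothetical `advanceWatermark` method that plays the role of the runtime. `EventTimeTriggerSketch` and its members are illustrative names, not Flink classes.

```java
import java.util.TreeSet;

/** Hypothetical single-window model of the event-time trigger logic. */
final class EventTimeTriggerSketch {
    enum Result { CONTINUE, FIRE }

    private final long windowEnd; // exclusive end of the window
    private long currentWatermark = Long.MIN_VALUE;
    private final TreeSet<Long> timers = new TreeSet<>(); // a set, so duplicate timers collapse

    EventTimeTriggerSketch(long windowEnd) {
        this.windowEnd = windowEnd;
    }

    long maxTimestamp() {
        return windowEnd - 1;
    }

    int timerCount() {
        return timers.size();
    }

    Result onElement() {
        if (maxTimestamp() <= currentWatermark) {
            // The watermark is already past the window: fire immediately.
            return Result.FIRE;
        }
        timers.add(maxTimestamp());
        return Result.CONTINUE;
    }

    Result onEventTime(long time) {
        return time == maxTimestamp() ? Result.FIRE : Result.CONTINUE;
    }

    /** Advances the watermark, runs all due timers and returns how many of them fired. */
    int advanceWatermark(long watermark) {
        currentWatermark = watermark;
        int fires = 0;
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long time = timers.pollFirst();
            if (onEventTime(time) == Result.FIRE) {
                fires++;
            }
        }
        return fires;
    }

    public static void main(String[] args) {
        EventTimeTriggerSketch trigger = new EventTimeTriggerSketch(1000L); // window [0, 1000)
        System.out.println(trigger.onElement());            // prints CONTINUE
        System.out.println(trigger.advanceWatermark(500L)); // prints 0
        System.out.println(trigger.advanceWatermark(999L)); // prints 1
        System.out.println(trigger.onElement());            // prints FIRE (late element)
    }
}
```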
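#### ProcessingTimeTrigger
- Directly registers a processing-time timer for every element; the timer's timestamp is the window's maximum timestamp.
```java
@Override
public TriggerResult onElement(
        Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
    ctx.registerProcessingTimeTimer(window.maxTimestamp());
    return TriggerResult.CONTINUE;
}
```
- When the processing-time timer fires, the computation is triggered unconditionally.
```java
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
    return TriggerResult.FIRE;
}
```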
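A compact model of this behaviour, assuming a manually advanced clock in place of real wall-clock time and a sorted set in place of the timer service. `ProcessingTimeTriggerSketch` and `advanceClock` are hypothetical names used only for illustration.

```java
import java.util.TreeSet;

/** Hypothetical single-window model of the processing-time trigger logic. */
final class ProcessingTimeTriggerSketch {
    enum Result { CONTINUE, FIRE }

    private final long windowEnd; // exclusive end of the window
    private final TreeSet<Long> timers = new TreeSet<>();

    ProcessingTimeTriggerSketch(long windowEnd) {
        this.windowEnd = windowEnd;
    }

    long maxTimestamp() {
        return windowEnd - 1;
    }

    Result onElement() {
        // Every element registers the same timestamp; the set keeps a single timer.
        timers.add(maxTimestamp());
        return Result.CONTINUE;
    }

    Result onProcessingTime(long time) {
        return Result.FIRE; // fires unconditionally
    }

    /** Moves the simulated clock to `now` and returns how many timers fired. */
    int advanceClock(long now) {
        int fires = 0;
        while (!timers.isEmpty() && timers.first() <= now) {
            long time = timers.pollFirst();
            if (onProcessingTime(time) == Result.FIRE) {
                fires++;
            }
        }
        return fires;
    }

    public static void main(String[] args) {
        ProcessingTimeTriggerSketch trigger = new ProcessingTimeTriggerSketch(1000L);
        trigger.onElement();
        trigger.onElement();
        System.out.println(trigger.advanceClock(998L)); // prints 0
        System.out.println(trigger.advanceClock(999L)); // prints 1
    }
}
```

Unlike the event-time variant, elements never fire the window directly here; only the timer does.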