• 来源于关系
    1. public abstract class Trigger<T, W extends Window> implements Serializable { ... }

抽象方法

Trigger-1.png

onElement

  • 调用添加到窗格中的每个元素。其结果将确定是否评估窗格以发出结果。

     /**
       * Called for every element that gets added to a pane. The result of this will determine whether
       * the pane is evaluated to emit results.
       * @param element The element that arrived.
       * @param timestamp The timestamp of the element that arrived.
       * @param window The window to which the element is being added.
       * @param ctx A context object that can be used to register timer callbacks.
       */
      public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx)
              throws Exception;
    

    onProcessingTime

  • 当使用触发器上下文设置的处理时间计时器触发时调用 ```java /**

    • Called when a processing-time timer that was set using the trigger context fires. *
    • @param time The timestamp at which the timer fired.
    • @param window The window for which the timer fired.
    • @param ctx A context object that can be used to register timer callbacks. */ public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
       throws Exception;
      
<a name="zUQfs"></a>
#### onEventTime

-  当使用触发器上下文设置的事件时间计时器触发时调用 
```java
    /**
     * Called when an event-time timer that was set using the trigger context fires.
     *
     * @param time The timestamp at which the timer fired.
     * @param window The window for which the timer fired.
     * @param ctx A context object that can be used to register timer callbacks.
     */
    public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx)
            throws Exception;

onMerge

  • 当多个窗口已被合并到一个窗口中时调用 ```java /**
    • Called when several windows have been merged into one window by the {@link
    • org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. *
    • @param window The new window that results from the merge.
    • @param ctx A context object that can be used to register timer callbacks and access state. */ public void onMerge(W window, OnMergeContext ctx) throws Exception { throw new UnsupportedOperationException(“This trigger does not support merging.”); }
<a name="xRDMd"></a>
#### clear

- 清除触发器可能仍在给定窗口中保留的所有状态
```java
    /**
     * Clears any state that the trigger might still hold for the given window. This is called when
     * a window is purged. Timers set using {@link TriggerContext#registerEventTimeTimer(long)} and
     * {@link TriggerContext#registerProcessingTimeTimer(long)} should be deleted here as well as
     * state acquired using {@link TriggerContext#getPartitionedState(StateDescriptor)}.
     */
    public abstract void clear(W window, TriggerContext ctx) throws Exception;

canMerge

  • 如果此触发器支持合并触发器状态,则返回true
      /**
       * Returns true if this trigger supports merging of trigger state and can therefore be used with
       * a {@link org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner}.
       *
       * <p>If this returns {@code true} you must properly implement {@link #onMerge(Window,
       * OnMergeContext)}
       */
      public boolean canMerge() {
          return false;
      }
    

具体子类实现

Trigger.pngCountTrigger

    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)
            throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }


EventTimeTrigger

  • 如果窗口最大时间戳 <= 当前 Watermark 表示应该触发计算 不然就跳过该事件

      @Override
      public TriggerResult onElement(
              Object element, long timestamp, TimeWindow window, TriggerContext ctx)
              throws Exception {
          if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
              // if the watermark is already past the window fire immediately
              return TriggerResult.FIRE;
          } else {
              ctx.registerEventTimeTimer(window.maxTimestamp());
              return TriggerResult.CONTINUE;
          }
      }
    
  • 触发计算

      @Override
      public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
          return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
      }
    

ProcessingTimeTrigger

  • 之间注册 Timer 注册时间为窗口最大时间

      @Override
      public TriggerResult onElement(
              Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
          ctx.registerProcessingTimeTimer(window.maxTimestamp());
          return TriggerResult.CONTINUE;
      }
    
  • 触发计算

    @Override
      public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
          return TriggerResult.FIRE;
      }