什么是Side Outputs

旁路输出是一种方便的机制,可用于处理

  • 迟到的事件
  • 异常事件
  • 错误

或者用于分割流。

这些旁路输出流可以有不同的数据类型。

仅适用于 ProcessFunctions、Windows(用于延迟事件)和 CEP。

在排序的例子里,我们直接丢掉了迟到的事件。现在我们看另一种处理方案,利用旁路输出。
image.png

Side outputs (API)

  1. static final OutputTag<Event> lateEventsTag = new
  2. OutputTag<Event>("lateEvents") {};
  3. DataStream<Event> input = ...;
  4. SingleOutputStreamOperator<Event> sorted = input
  5. .keyBy(...)
  6. .process(new SortFunction());
  7. DataStream<Event> lateEvents = sorted.getSideOutput(lateEventsTag);

这还是我们前面看到的排序例子,加入了旁路输出。旁路输出和ProcessFunctions一起使用。旁路输出通过对一个具有名称和类型的OutputTag来引用。两个OutputTag对象如果有相同的名字,会被认为是同一个旁路输出。在一个流上应用process()方法的结果是SingleOutputStreamOperator。从这个Operator可以获得跟他相关的旁路输出。

Context / OnTimerContext API

  1. public abstract class Context {
  2. // Timestamp of the element currently being processed or timestamp of a firing timer.
  3. public abstract Long timestamp();
  4. // TimerService for querying time and registering timers.
  5. public abstract TimerService timerService();
  6. // ONLY OnTimerContext: The TimeDomain of the firing timer.
  7. public abstract TimeDomain timeDomain();
  8. // Emits a record to the side output identified by the OutputTag.
  9. public abstract <X> void output(OutputTag<X> outputTag, X value);
  10. // ONLY w/ KeyedProcessFunctions: Key of the element being processed.
  11. public abstract K getCurrentKey();
  12. }

传递给 SortFunction 的 Context 有一个 output() 方法,我们可以使用该方法将事件发送到特定的侧输出通道。

Side outputs (API)

  1. public void processElement(Event event, Context context, Collector<Event> out) {
  2. TimerService timerService = context.timerService();
  3. if (context.timestamp() > timerService.currentWatermark()) {
  4. PriorityQueue<Event> queue = queueState.value();
  5. if (queue == null) {
  6. queue = new PriorityQueue<>(10, new CompareByTimestamp());
  7. }
  8. queue.add(event);
  9. queueState.update(queue);
  10. timerService.registerEventTimeTimer(event.timestamp);
  11. } else { // event is late
  12. context.output(lateEventsTag, event);
  13. }
  14. }

对比前面的实现,我们在 processElement() 方法中所要做的只是添加一个处理延迟事件的 else 子句。context中有一个输出方法用来发送事件到output tag。在这里,我们可以使用与 main() 方法中相同或等效的 OutputTag(取决于该 OutputTag 是否在我们的 process 函数的范围内)。