什么是Side Outputs
旁路输出是一种方便的机制,可用于处理
- 迟到的事件
- 异常事件
- 错误
或者用于分割流。
这些旁路输出流可以有不同的数据类型。
仅适用于 ProcessFunctions、Windows(用于延迟事件)和 CEP。
在排序的例子里,我们直接丢掉了迟到的事件。现在我们看另一种处理方案,利用旁路输出。
Side outputs (API)
static final OutputTag<Event> lateEventsTag = newOutputTag<Event>("lateEvents") {};DataStream<Event> input = ...;SingleOutputStreamOperator<Event> sorted = input.keyBy(...).process(new SortFunction());DataStream<Event> lateEvents = sorted.getSideOutput(lateEventsTag);
这还是我们前面看到的排序例子,加入了旁路输出。旁路输出和ProcessFunctions一起使用。旁路输出通过对一个具有名称和类型的OutputTag来引用。两个OutputTag对象如果有相同的名字,会被认为是同一个旁路输出。在一个流上应用process()方法的结果是SingleOutputStreamOperator。从这个Operator可以获得跟他相关的旁路输出。
Context / OnTimerContext API
public abstract class Context {// Timestamp of the element currently being processed or timestamp of a firing timer.public abstract Long timestamp();// TimerService for querying time and registering timers.public abstract TimerService timerService();// ONLY OnTimerContext: The TimeDomain of the firing timer.public abstract TimeDomain timeDomain();// Emits a record to the side output identified by the OutputTag.public abstract <X> void output(OutputTag<X> outputTag, X value);// ONLY w/ KeyedProcessFunctions: Key of the element being processed.public abstract K getCurrentKey();}
传递给 SortFunction 的 Context 有一个 output() 方法,我们可以使用该方法将事件发送到特定的侧输出通道。
Side outputs (API)
public void processElement(Event event, Context context, Collector<Event> out) {TimerService timerService = context.timerService();if (context.timestamp() > timerService.currentWatermark()) {PriorityQueue<Event> queue = queueState.value();if (queue == null) {queue = new PriorityQueue<>(10, new CompareByTimestamp());}queue.add(event);queueState.update(queue);timerService.registerEventTimeTimer(event.timestamp);} else { // event is latecontext.output(lateEventsTag, event);}}
对比前面的实现,我们在 processElement() 方法中所要做的只是添加一个处理延迟事件的 else 子句。context中有一个输出方法用来发送事件到output tag。在这里,我们可以使用与 main() 方法中相同或等效的 OutputTag(取决于该 OutputTag 是否在我们的 process 函数的范围内)。
