什么是Side Outputs
旁路输出是一种方便的机制,可用于处理
- 迟到的事件
- 异常事件
- 错误
或者用于分割流。
这些旁路输出流可以有不同的数据类型。
仅适用于 ProcessFunctions、Windows(用于延迟事件)和 CEP。
在排序的例子里,我们直接丢掉了迟到的事件。现在我们看另一种处理方案,利用旁路输出。
Side outputs (API)
static final OutputTag<Event> lateEventsTag = new
OutputTag<Event>("lateEvents") {};
DataStream<Event> input = ...;
SingleOutputStreamOperator<Event> sorted = input
.keyBy(...)
.process(new SortFunction());
DataStream<Event> lateEvents = sorted.getSideOutput(lateEventsTag);
这还是我们前面看到的排序例子,加入了旁路输出。旁路输出和ProcessFunctions一起使用。旁路输出通过对一个具有名称和类型的OutputTag来引用。两个OutputTag对象如果有相同的名字,会被认为是同一个旁路输出。在一个流上应用process()方法的结果是SingleOutputStreamOperator。从这个Operator可以获得跟他相关的旁路输出。
Context / OnTimerContext API
public abstract class Context {
// Timestamp of the element currently being processed or timestamp of a firing timer.
public abstract Long timestamp();
// TimerService for querying time and registering timers.
public abstract TimerService timerService();
// ONLY OnTimerContext: The TimeDomain of the firing timer.
public abstract TimeDomain timeDomain();
// Emits a record to the side output identified by the OutputTag.
public abstract <X> void output(OutputTag<X> outputTag, X value);
// ONLY w/ KeyedProcessFunctions: Key of the element being processed.
public abstract K getCurrentKey();
}
传递给 SortFunction 的 Context 有一个 output() 方法,我们可以使用该方法将事件发送到特定的侧输出通道。
Side outputs (API)
public void processElement(Event event, Context context, Collector<Event> out) {
TimerService timerService = context.timerService();
if (context.timestamp() > timerService.currentWatermark()) {
PriorityQueue<Event> queue = queueState.value();
if (queue == null) {
queue = new PriorityQueue<>(10, new CompareByTimestamp());
}
queue.add(event);
queueState.update(queue);
timerService.registerEventTimeTimer(event.timestamp);
} else { // event is late
context.output(lateEventsTag, event);
}
}
对比前面的实现,我们在 processElement() 方法中所要做的只是添加一个处理延迟事件的 else 子句。context中有一个输出方法用来发送事件到output tag。在这里,我们可以使用与 main() 方法中相同或等效的 OutputTag(取决于该 OutputTag 是否在我们的 process 函数的范围内)。