- Fetches data and hands it to the Operator for processing
Inheritance
public final class StreamTaskNetworkInput<T> implements StreamTaskInput<T> { ... }
Core methods
Fetching data
    @Override
    public InputStatus emitNext(DataOutput<T> output) throws Exception {
        while (true) {
            // get the stream element from the deserializer
            if (currentRecordDeserializer != null) {
                DeserializationResult result;
                try {
                    // NOTE: the current record deserializer fetches the next record
                    result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
                } catch (IOException e) {
                    throw new IOException(
                            String.format("Can't get next record for channel %s", lastChannel), e);
                }

                // NOTE: the buffer has been fully consumed
                if (result.isBufferConsumed()) {
                    currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    currentRecordDeserializer = null;
                }

                // NOTE: a full record is available
                if (result.isFullRecord()) {
                    processElement(deserializationDelegate.getInstance(), output);
                    return InputStatus.MORE_AVAILABLE;
                }
            }

            // NOTE: fetch the next Buffer from the InputGate
            Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
            // If it is a Buffer, set up the RecordDeserializer to process the next record;
            // otherwise release the RecordDeserializer
            if (bufferOrEvent.isPresent()) {
                // return to the mailbox after receiving a checkpoint barrier to avoid processing of
                // data after the barrier before checkpoint is performed for unaligned checkpoint
                // mode
                if (bufferOrEvent.get().isBuffer()) {
                    processBuffer(bufferOrEvent.get());
                } else {
                    processEvent(bufferOrEvent.get());
                    return InputStatus.MORE_AVAILABLE;
                }
            } else {
                if (checkpointedInputGate.isFinished()) {
                    checkState(
                            checkpointedInputGate.getAvailableFuture().isDone(),
                            "Finished BarrierHandler should be available");
                    return InputStatus.END_OF_INPUT;
                }
                return InputStatus.NOTHING_AVAILABLE;
            }
        }
    }
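The control flow of the loop above can be sketched without any Flink dependencies. The following is a minimal model under stated assumptions, not Flink's implementation: `EmitNextSketch`, `Status`, `addBuffer`, `finish`, and `drain` are hypothetical names, a "buffer" is simply a batch of strings, and events such as checkpoint barriers are left out. It shows the two points that matter: each call emits at most one record, and the return value tells the caller whether to call again.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, simplified model of the emitNext loop; not Flink's real classes. */
final class EmitNextSketch {

    enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    // Stand-in for the input gate: each queued "buffer" is just a batch of records.
    private final Deque<Deque<String>> gate = new ArrayDeque<>();
    // Stand-in for currentRecordDeserializer: the buffer currently being drained.
    private Deque<String> currentBuffer;
    private boolean finished;

    void addBuffer(String... records) {
        gate.add(new ArrayDeque<>(Arrays.asList(records)));
    }

    void finish() {
        finished = true;
    }

    Status emitNext(Consumer<String> output) {
        while (true) {
            if (currentBuffer != null) {
                String record = currentBuffer.poll();
                if (currentBuffer.isEmpty()) {
                    currentBuffer = null; // buffer consumed, release it
                }
                if (record != null) {
                    output.accept(record); // emit exactly one record per call
                    return Status.MORE_AVAILABLE;
                }
            }
            Deque<String> next = gate.poll();
            if (next != null) {
                currentBuffer = next; // loop again and start draining it
            } else {
                return finished ? Status.END_OF_INPUT : Status.NOTHING_AVAILABLE;
            }
        }
    }

    /** Calls emitNext until it stops reporting MORE_AVAILABLE and returns that final status. */
    static Status drain(EmitNextSketch input, List<String> sink) {
        Status status;
        do {
            status = input.emitNext(sink::add);
        } while (status == Status.MORE_AVAILABLE);
        return status;
    }

    public static void main(String[] args) {
        EmitNextSketch input = new EmitNextSketch();
        List<String> sink = new ArrayList<>();
        input.addBuffer("a", "b");
        input.addBuffer("c");
        System.out.println(drain(input, sink) + " " + sink);
        input.finish();
        System.out.println(drain(input, sink));
    }
}
```

Returning after every record, rather than draining the whole buffer in one call, gives the caller a chance to do other work between records. That is the same reason the original code returns to the mailbox after processing an event.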
Different element types take different processing paths
    // NOTE: different element types take different processing paths
    private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
        // A record takes this branch
        if (recordOrMark.isRecord()) {
            // output emits the record
            output.emitRecord(recordOrMark.asRecord());
        } else if (recordOrMark.isWatermark()) {
            statusWatermarkValve.inputWatermark(
                    recordOrMark.asWatermark(), flattenedChannelIndices.get(lastChannel), output);
        } else if (recordOrMark.isLatencyMarker()) {
            output.emitLatencyMarker(recordOrMark.asLatencyMarker());
        } else if (recordOrMark.isStreamStatus()) {
            statusWatermarkValve.inputStreamStatus(
                    recordOrMark.asStreamStatus(),
                    flattenedChannelIndices.get(lastChannel),
                    output);
        } else {
            throw new UnsupportedOperationException("Unknown type of StreamElement");
        }
    }
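Records go straight to the output, while watermarks are first routed through a valve that combines the watermarks of all input channels. The sketch below illustrates that split under simplifying assumptions: `DispatchSketch`, `Element`, and `WatermarkValve` are hypothetical stand-ins, the output is a plain list of strings, and stream status (idle channels) and latency markers are ignored. The valve forwards the minimum watermark across channels, and only when that minimum advances.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified model of per-type dispatch; not Flink's real classes. */
final class DispatchSketch {

    /** Minimal stand-in for a stream element: either a record or a watermark. */
    static final class Element {
        final String record;  // non-null for records
        final long watermark; // only meaningful when record == null

        private Element(String record, long watermark) {
            this.record = record;
            this.watermark = watermark;
        }

        static Element record(String value) {
            return new Element(value, Long.MIN_VALUE);
        }

        static Element watermark(long timestamp) {
            return new Element(null, timestamp);
        }

        boolean isRecord() {
            return record != null;
        }
    }

    /** Tracks the latest watermark per channel and forwards the minimum when it advances. */
    static final class WatermarkValve {
        private final long[] channelWatermarks;
        private long lastEmitted = Long.MIN_VALUE;

        WatermarkValve(int numChannels) {
            channelWatermarks = new long[numChannels];
            Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        }

        void inputWatermark(long timestamp, int channel, List<String> output) {
            channelWatermarks[channel] = Math.max(channelWatermarks[channel], timestamp);
            long min = Arrays.stream(channelWatermarks).min().getAsLong();
            if (min > lastEmitted) {
                lastEmitted = min;
                output.add("WM(" + min + ")");
            }
        }
    }

    static void processElement(
            Element element, int channel, WatermarkValve valve, List<String> output) {
        if (element.isRecord()) {
            output.add(element.record); // records bypass the valve
        } else {
            valve.inputWatermark(element.watermark, channel, output);
        }
    }

    public static void main(String[] args) {
        WatermarkValve valve = new WatermarkValve(2);
        List<String> output = new ArrayList<>();
        processElement(Element.record("a"), 0, valve, output);
        processElement(Element.watermark(5), 0, valve, output); // channel 1 has no watermark yet
        processElement(Element.watermark(3), 1, valve, output);
        processElement(Element.record("b"), 1, valve, output);
        processElement(Element.watermark(10), 1, valve, output);
        System.out.println(output); // prints [a, WM(3), b, WM(5)]
    }
}
```

The first watermark on channel 0 produces no output because channel 1 has not reported one yet, which is why the channel index is passed alongside the watermark in the original code.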
Does output emit the data based on the Watermark? For a record, the element is handed to the emitRecord method of DataOutput:

    interface DataOutput<T> {

        void emitRecord(StreamRecord<T> streamRecord) throws Exception;

        void emitWatermark(Watermark watermark) throws Exception;

        void emitStreamStatus(StreamStatus streamStatus) throws Exception;

        void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception;
    }
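The call is dispatched to a concrete implementation of emitRecord -> the emitRecord method in OneInputStreamTask (in the Flink source, on its nested StreamTaskNetworkOutput class):

    // NOTE: processing logic for a single input stream
    @Override
    public void emitRecord(StreamRecord<IN> record) throws Exception {
        numRecordsIn.inc();
        operator.setKeyContextElement1(record);
        operator.processElement(record);
    }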
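Here, the call operator.processElement(record); is resolved through the Input subclass, and when the operator is a WindowOperator it triggers WindowOperator's processElement computation:

    // NOTE: process the element
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        ...
    }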
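The hand-off from the output to the operator follows a fixed order: count the record, set the key context, then process. The sketch below models that order with hypothetical types; `OutputToOperatorSketch`, `CountingOperator`, and `OperatorOutput` are illustrations only, and the operator merely counts elements per key rather than doing any window logic. It is meant to show why the key context has to be set before `processElement` runs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical, simplified model of the output-to-operator hand-off; not Flink's real classes. */
final class OutputToOperatorSketch {

    interface DataOutput<T> {
        void emitRecord(T record) throws Exception;
    }

    /** Hypothetical keyed operator: counts elements under whatever key is currently set. */
    static final class CountingOperator<T> {
        private final Function<T, String> keySelector;
        private final Map<String, Integer> countsByKey = new HashMap<>();
        private String currentKey;

        CountingOperator(Function<T, String> keySelector) {
            this.keySelector = keySelector;
        }

        void setKeyContextElement1(T record) {
            currentKey = keySelector.apply(record);
        }

        void processElement(T record) {
            // State is scoped to currentKey, so the key context must be set first.
            countsByKey.merge(currentKey, 1, Integer::sum);
        }

        int countFor(String key) {
            return countsByKey.getOrDefault(key, 0);
        }
    }

    /** Mirrors the three steps of emitRecord shown above. */
    static final class OperatorOutput<T> implements DataOutput<T> {
        private final CountingOperator<T> operator;
        private long numRecordsIn;

        OperatorOutput(CountingOperator<T> operator) {
            this.operator = operator;
        }

        @Override
        public void emitRecord(T record) {
            numRecordsIn++;                         // 1. metrics
            operator.setKeyContextElement1(record); // 2. select the key for keyed state
            operator.processElement(record);        // 3. run the operator logic
        }

        long getNumRecordsIn() {
            return numRecordsIn;
        }
    }

    public static void main(String[] args) {
        // Assumption for the demo: the key is the first character of the record.
        CountingOperator<String> operator = new CountingOperator<>(s -> s.substring(0, 1));
        OperatorOutput<String> output = new OperatorOutput<>(operator);
        output.emitRecord("a1");
        output.emitRecord("a2");
        output.emitRecord("b1");
        System.out.println(operator.countFor("a") + " " + operator.countFor("b"));
    }
}
```

If steps 2 and 3 were swapped, each element would be counted under the key of the previous record, which is the kind of bug the fixed ordering in emitRecord prevents.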
