- 获取数据,交给 `Operator` 处理
继承
public final class StreamTaskNetworkInput<T> implements StreamTaskInput<T> { ... }
核心方法
获取数据
/**
 * Drives one step of input consumption and reports what the caller may expect next.
 *
 * <p>Each loop iteration first tries the current record deserializer (if one is set) and
 * then, when no full record came out of it, polls {@code checkpointedInputGate} for the next
 * buffer or event.
 *
 * @param output sink passed on to {@code processElement} for every fully deserialized element
 * @return {@code MORE_AVAILABLE} after a full record has been processed or after a non-buffer
 *     event has been handled; {@code END_OF_INPUT} when the poll returned nothing and the gate
 *     reports it is finished; {@code NOTHING_AVAILABLE} when the poll returned nothing and the
 *     gate is not finished
 * @throws IOException when the deserializer fails with an {@code IOException}; the original
 *     is kept as the cause and the message names {@code lastChannel}
 * @throws Exception anything else raised by the helpers called here is propagated unchanged
 */
@Override
public InputStatus emitNext(DataOutput<T> output) throws Exception {
    for (;;) {
        // Step 1: try to read the next stream element from the active deserializer.
        if (currentRecordDeserializer != null) {
            final DeserializationResult outcome;
            try {
                outcome = currentRecordDeserializer.getNextRecord(deserializationDelegate);
            } catch (IOException cause) {
                throw new IOException(
                        String.format("Can't get next record for channel %s", lastChannel), cause);
            }

            // The buffer is recycled and the deserializer dropped BEFORE the element is
            // emitted; this ordering is kept exactly as in the original.
            if (outcome.isBufferConsumed()) {
                currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                currentRecordDeserializer = null;
            }

            if (outcome.isFullRecord()) {
                processElement(deserializationDelegate.getInstance(), output);
                return InputStatus.MORE_AVAILABLE;
            }
        }

        // Step 2: no full record yet, so ask the input gate for the next buffer or event.
        final Optional<BufferOrEvent> polled = checkpointedInputGate.pollNext();

        if (!polled.isPresent()) {
            if (!checkpointedInputGate.isFinished()) {
                return InputStatus.NOTHING_AVAILABLE;
            }
            checkState(
                    checkpointedInputGate.getAvailableFuture().isDone(),
                    "Finished BarrierHandler should be available");
            return InputStatus.END_OF_INPUT;
        }

        final BufferOrEvent next = polled.get();
        if (!next.isBuffer()) {
            // return to the mailbox after receiving a checkpoint barrier to avoid processing of
            // data after the barrier before checkpoint is performed for unaligned checkpoint
            // mode
            processEvent(next);
            return InputStatus.MORE_AVAILABLE;
        }

        // A data buffer: hand it to processBuffer and loop again to deserialize from it.
        processBuffer(next);
    }
}
不同类型的数据走不同的处理模式
/**
 * Routes a deserialized stream element to the handler that matches its kind.
 *
 * <ul>
 *   <li>record: forwarded to {@code output.emitRecord}
 *   <li>watermark: forwarded to {@code statusWatermarkValve.inputWatermark}, together with the
 *       entry of {@code flattenedChannelIndices} for {@code lastChannel} and the output
 *   <li>latency marker: forwarded to {@code output.emitLatencyMarker}
 *   <li>stream status: forwarded to {@code statusWatermarkValve.inputStreamStatus}, together
 *       with the entry of {@code flattenedChannelIndices} for {@code lastChannel} and the output
 * </ul>
 *
 * <p>The kinds are tested in the order listed above; the first match wins.
 *
 * @param recordOrMark the element to dispatch
 * @param output the sink that receives records and latency markers directly and is passed
 *     along to the valve for watermarks and stream statuses
 * @throws UnsupportedOperationException if the element matches none of the four kinds
 * @throws Exception anything raised by the invoked handler is propagated unchanged
 */
private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
    // Plain data record: emit it straight to the output.
    if (recordOrMark.isRecord()) {
        output.emitRecord(recordOrMark.asRecord());
        return;
    }

    // Watermark: goes through the valve rather than directly to the output.
    if (recordOrMark.isWatermark()) {
        statusWatermarkValve.inputWatermark(
                recordOrMark.asWatermark(), flattenedChannelIndices.get(lastChannel), output);
        return;
    }

    // Latency marker: emitted directly, like a record.
    if (recordOrMark.isLatencyMarker()) {
        output.emitLatencyMarker(recordOrMark.asLatencyMarker());
        return;
    }

    // Stream status: also goes through the valve.
    if (recordOrMark.isStreamStatus()) {
        statusWatermarkValve.inputStreamStatus(
                recordOrMark.asStreamStatus(),
                flattenedChannelIndices.get(lastChannel),
                output);
        return;
    }

    throw new UnsupportedOperationException("Unknown type of StreamElement");
}
output 根据 Watermark 发送数据?需要交给 `DataOutput` 的 `emitRecord` 方法
interface DataOutput<T> {
/**
 * Receives a data record. In the snippets shown in this file, {@code processElement} calls
 * this for elements whose {@code isRecord()} is true.
 */
void emitRecord(StreamRecord<T> streamRecord) throws Exception;
/**
 * Receives a watermark. No direct call appears in the visible code; presumably it is reached
 * through {@code statusWatermarkValve}, which is handed the output — TODO confirm.
 */
void emitWatermark(Watermark watermark) throws Exception;
/**
 * Receives a stream status. No direct call appears in the visible code; presumably it is
 * reached through {@code statusWatermarkValve}, which is handed the output — TODO confirm.
 */
void emitStreamStatus(StreamStatus streamStatus) throws Exception;
/**
 * Receives a latency marker. In the snippets shown in this file, {@code processElement} calls
 * this for elements whose {@code isLatencyMarker()} is true.
 */
void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception;
}
通过子类的具体实现触发 `emitRecord` 方法 → `OneInputStreamTask` 的 `emitRecord` 方法
// TODO 一个流的 处理逻辑
/**
 * Hands one incoming record to the operator.
 *
 * <p>Three calls are made in a fixed order: {@code numRecordsIn.inc()}, then
 * {@code operator.setKeyContextElement1(record)}, then {@code operator.processElement(record)}.
 *
 * @param record the record to pass to the operator
 * @throws Exception nothing is caught here, so any failure from the calls above propagates
 */
@Override
public void emitRecord(StreamRecord<IN> record) throws Exception {
// Bump the numRecordsIn counter before any operator code runs.
numRecordsIn.inc();
// NOTE(review): presumably sets the key context used by keyed state for this record —
// confirm against the operator implementation.
operator.setKeyContextElement1(record);
// Actual element processing is delegated to the operator.
operator.processElement(record);
}
这里的 `operator.processElement(record);` 通过 `Input` 的子类触发了 `WindowOperator` 的 `processElement` 计算
// TODO 处理元素
@Override
public void processElement(StreamRecord<IN> element) throws Exception { ...}