• Fetches data and hands it to the Operator for processing

Inheritance

    public final class StreamTaskNetworkInput<T> implements StreamTaskInput<T> { ... }

Core methods

Fetching data

    @Override
    public InputStatus emitNext(DataOutput<T> output) throws Exception {
        while (true) {
            // get the stream element from the deserializer
            if (currentRecordDeserializer != null) {
                DeserializationResult result;
                try {
                    // The current record deserializer fetches the next record
                    result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
                } catch (IOException e) {
                    throw new IOException(
                            String.format("Can't get next record for channel %s", lastChannel), e);
                }
                // The buffer has been fully consumed: recycle it and drop the deserializer
                if (result.isBufferConsumed()) {
                    currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    currentRecordDeserializer = null;
                }
                // A full record is available: process it and return
                if (result.isFullRecord()) {
                    processElement(deserializationDelegate.getInstance(), output);
                    return InputStatus.MORE_AVAILABLE;
                }
            }
            // Fetch the next Buffer (or event) from the InputGate
            Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
            // If it is a Buffer, set up the RecordDeserializer to process the next record;
            // otherwise release the RecordDeserializer
            if (bufferOrEvent.isPresent()) {
                // return to the mailbox after receiving a checkpoint barrier to avoid processing of
                // data after the barrier before checkpoint is performed for unaligned checkpoint
                // mode
                if (bufferOrEvent.get().isBuffer()) {
                    processBuffer(bufferOrEvent.get());
                } else {
                    processEvent(bufferOrEvent.get());
                    return InputStatus.MORE_AVAILABLE;
                }
            } else {
                if (checkpointedInputGate.isFinished()) {
                    checkState(
                            checkpointedInputGate.getAvailableFuture().isDone(),
                            "Finished BarrierHandler should be available");
                    return InputStatus.END_OF_INPUT;
                }
                return InputStatus.NOTHING_AVAILABLE;
            }
        }
    }
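The control flow of emitNext can be easier to follow in a stripped-down model. The sketch below is not Flink code: EmitNextSketch, Status, addBuffer and finish are hypothetical names, a queue of string lists stands in for the input gate, and a plain queue stands in for the record deserializer. It only illustrates the loop shape: emit at most one record per call, pull a new buffer when the current one is exhausted, and report whether more data may follow.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model of the emitNext loop. All names are illustrative, not Flink classes.
class EmitNextSketch {
    enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    private final Queue<List<String>> gate = new ArrayDeque<>(); // stands in for the input gate
    private Queue<String> currentBuffer;                         // stands in for the deserializer
    private boolean finished;

    void addBuffer(List<String> records) { gate.add(records); }

    void finish() { finished = true; }

    // Emits at most one record per call, like the real method.
    Status emitNext(List<String> output) {
        while (true) {
            if (currentBuffer != null) {
                String record = currentBuffer.poll();
                if (currentBuffer.isEmpty()) {
                    currentBuffer = null; // buffer consumed: release it
                }
                if (record != null) {
                    output.add(record);
                    return Status.MORE_AVAILABLE;
                }
            }
            List<String> next = gate.poll();
            if (next != null) {
                currentBuffer = new ArrayDeque<>(next); // loop again to read from it
            } else {
                return finished ? Status.END_OF_INPUT : Status.NOTHING_AVAILABLE;
            }
        }
    }

    public static void main(String[] args) {
        EmitNextSketch input = new EmitNextSketch();
        List<String> out = new ArrayList<>();
        input.addBuffer(List.of("a", "b"));
        System.out.println(input.emitNext(out)); // MORE_AVAILABLE
        System.out.println(input.emitNext(out)); // MORE_AVAILABLE
        System.out.println(input.emitNext(out)); // NOTHING_AVAILABLE
        input.finish();
        System.out.println(input.emitNext(out)); // END_OF_INPUT
    }
}
```

Returning after every record, rather than draining the whole buffer, is what lets the caller regain control between records; in this toy model the caller is simply main.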

Different element types take different processing paths

    // Different element types take different processing paths
    private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
        // A regular record takes this branch
        if (recordOrMark.isRecord()) {
            // output emits the record
            output.emitRecord(recordOrMark.asRecord());
        } else if (recordOrMark.isWatermark()) {
            // A Watermark takes this branch and goes through the valve
            statusWatermarkValve.inputWatermark(
                    recordOrMark.asWatermark(), flattenedChannelIndices.get(lastChannel), output);
        } else if (recordOrMark.isLatencyMarker()) {
            output.emitLatencyMarker(recordOrMark.asLatencyMarker());
        } else if (recordOrMark.isStreamStatus()) {
            statusWatermarkValve.inputStreamStatus(
                    recordOrMark.asStreamStatus(),
                    flattenedChannelIndices.get(lastChannel),
                    output);
        } else {
            throw new UnsupportedOperationException("Unknown type of StreamElement");
        }
    }
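The branching in processElement is a plain type dispatch. As a minimal sketch, assuming hypothetical stand-in types (Element, DataRecord and Mark are not Flink classes, and the log list replaces the real DataOutput and watermark valve), the same shape looks like this:

```java
import java.util.ArrayList;
import java.util.List;

// Toy type dispatch. Element, DataRecord and Mark are hypothetical stand-ins.
class DispatchSketch {
    interface Element {}

    static final class DataRecord implements Element {
        final String value;
        DataRecord(String value) { this.value = value; }
    }

    static final class Mark implements Element {
        final long timestamp;
        Mark(long timestamp) { this.timestamp = timestamp; }
    }

    // Records what happened, in place of a real output and watermark valve.
    final List<String> log = new ArrayList<>();

    void processElement(Element element, int channel) {
        if (element instanceof DataRecord) {
            // Records go straight to the output.
            log.add("record:" + ((DataRecord) element).value);
        } else if (element instanceof Mark) {
            // Watermarks are routed together with the channel they arrived on.
            log.add("watermark:" + ((Mark) element).timestamp + "@channel" + channel);
        } else {
            throw new UnsupportedOperationException("Unknown type of element");
        }
    }

    public static void main(String[] args) {
        DispatchSketch sketch = new DispatchSketch();
        sketch.processElement(new DataRecord("a"), 0);
        sketch.processElement(new Mark(5L), 1);
        System.out.println(sketch.log); // [record:a, watermark:5@channel1]
    }
}
```

The channel index is passed along only for watermarks in this sketch, mirroring how the original code hands the channel to the valve but not to emitRecord.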
  • How does output emit the data? Records are handed to the emitRecord method of DataOutput

    interface DataOutput<T> {
        void emitRecord(StreamRecord<T> streamRecord) throws Exception;
        void emitWatermark(Watermark watermark) throws Exception;
        void emitStreamStatus(StreamStatus streamStatus) throws Exception;
        void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception;
    }
  • The call is resolved to a concrete implementation of emitRecord -> the emitRecord method in OneInputStreamTask

    // Processing logic for a single input stream
    @Override
    public void emitRecord(StreamRecord<IN> record) throws Exception {
        numRecordsIn.inc();
        operator.setKeyContextElement1(record);
        operator.processElement(record);
    }
  • Here, operator.processElement(record); is dispatched through an implementation of Input, which triggers the processElement computation in WindowOperator

    // Process the element
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception { ... }
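The hand-off described above (the input calls the output's emitRecord, which counts the record and forwards it to the operator) can be modeled in a few lines. This is a sketch under stated assumptions: Operator, Output and CountingOutput are hypothetical names, a long field replaces the numRecordsIn counter, and the key-context step is omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the output-to-operator hand-off. All names are illustrative.
class OutputChainSketch {
    interface Operator<T> {
        void processElement(T record);
    }

    interface Output<T> {
        void emitRecord(T record);
    }

    // Counts each record, then forwards it to the operator.
    static final class CountingOutput<T> implements Output<T> {
        private final Operator<T> operator;
        long numRecordsIn;

        CountingOutput(Operator<T> operator) {
            this.operator = operator;
        }

        @Override
        public void emitRecord(T record) {
            numRecordsIn++;
            operator.processElement(record);
        }
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        // The lambda plays the role of the operator; a window operator would sit here.
        CountingOutput<String> output = new CountingOutput<>(record -> seen.add(record));
        output.emitRecord("a");
        output.emitRecord("b");
        System.out.println(output.numRecordsIn + " " + seen); // 2 [a, b]
    }
}
```

Because the output only depends on the Operator interface, which concrete operator runs is decided by whatever instance was passed in, which is the same mechanism that lets operator.processElement(record) end up in WindowOperator.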