从collector到buffer

下面我们从数据源出开始分析数据是如何写入到Flink缓存中的。
NonTimestampContext.collect方法。该方法位于数据源(SourceFunction)中。

  1. @Override
  2. public void collect(T element) {
  3. synchronized (lock) {
  4. output.collect(reuse.replace(element));
  5. }
  6. }

这里调用的是output对象的collect方法。Output对象是Output>类型。经过debug我们发现这里的output真实类型为CountingOutput类型。
CountingOutput仅仅是一个包装类型,包装了一个Output。相比于其他Output而言多出了收集元素数量的监控。CountingOutput维护了一个计数器类型监控变量:

  1. private final Counter numRecordsOut;


在collect元素的时候调用了numRecordsOut.inc()方法,实现了对收集元素数量的监控。

NoTimestampContext的CountingOuput封装的output是什么类型的呢?我们通过debug查看发现内层的类型为RecordWriterOutput。

RecordWriterOutput的collect方法如下所示:

  1. @Override
  2. public void collect(StreamRecord<OUT> record) {
  3. if (this.outputTag != null) {
  4. // we are not responsible for emitting to the main output.
  5. return;
  6. }
  7. pushToRecordWriter(record);
  8. }

pushToRecordWriter方法使用序列化代理,将record传递给recordWriter。代码如下:

  1. private <X> void pushToRecordWriter(StreamRecord<X> record) {
  2. serializationDelegate.setInstance(record);
  3. try {
  4. recordWriter.emit(serializationDelegate);
  5. }
  6. catch (Exception e) {
  7. throw new RuntimeException(e.getMessage(), e);
  8. }
  9. }

RecordWriter负责把数据序列化,然后写入到缓存中。它有两个实现类:

  • BroadcastRecordWriter: 维护了多个下游channel,发送数据到下游所有的channel中。
  • ChannelSelectorRecordWriter: 通过channelSelector对象判断数据需要发往下游的哪个channel。keyBy算子用的正是这个RecordWriter。

这里我们分析下ChannelSelectorRecordWriter的emit方法:

  1. public void emit(T record) throws IOException {
  2. emit(record, channelSelector.selectChannel(record));
  3. }

很明显这里使用了channelSelector.selectChannel方法。该方法为record和对应下游channel id的函数关系。
接下来我们又回到了父类RecordWriter

  1. protected void emit(T record, int targetSubpartition) throws IOException {
  2. checkErroneous();
  3. // 序列化record,并向指定的目标分区发送
  4. targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition);
  5. // 判断是否进行刷新操作
  6. if (flushAlways) {
  7. targetPartition.flush(targetSubpartition);
  8. }
  9. }

关键的逻辑在emitRecord方法内,进一步分析该方法。
ResultPartition用于保存单个task产生的数据。它有四个实现类。
BufferWritingResultPartition:负责将buffers直接ResultSubpartition。BufferWritingResultPartition有两个实现类:
BoundedBlockingResultPartition:保存单个任务的输出数据结果。用于批量数据。
PipelinedResultPartition:用于流式数据。
SortMergeResultPartition:用于排序合并结果数据。

主要查看一下BufferWritingResultPartition的emitRecord方法

  1. @Override
  2. public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
  3. // 将record追加到缓存中
  4. BufferBuilder buffer = appendUnicastDataForNewRecord(record, targetSubpartition);
  5. while (record.hasRemaining()) {
  6. // full buffer, partial record
  7. finishUnicastBufferBuilder(targetSubpartition);
  8. buffer = appendUnicastDataForRecordContinuation(record, targetSubpartition);
  9. }
  10. if (buffer.isFull()) {
  11. // full buffer, full record
  12. finishUnicastBufferBuilder(targetSubpartition);
  13. }
  14. // partial buffer, full record
  15. }

进一步分析appendUnicastDataForNewRecord方法。