从collector到buffer
下面我们从数据源出开始分析数据是如何写入到Flink缓存中的。
NonTimestampContext.collect方法。该方法位于数据源(SourceFunction)中。
@Override
public void collect(T element) {
synchronized (lock) {
output.collect(reuse.replace(element));
}
}
这里调用的是output对象的collect方法。Output对象是Output
CountingOutput仅仅是一个包装类型,包装了一个Output。相比于其他Output而言多出了收集元素数量的监控。CountingOutput维护了一个计数器类型监控变量:
private final Counter numRecordsOut;
在collect元素的时候调用了numRecordsOut.inc()方法,实现了对收集元素数量的监控。
NoTimestampContext的CountingOuput封装的output是什么类型的呢?我们通过debug查看发现内层的类型为RecordWriterOutput。
RecordWriterOutput的collect方法如下所示:
@Override
public void collect(StreamRecord<OUT> record) {
if (this.outputTag != null) {
// we are not responsible for emitting to the main output.
return;
}
pushToRecordWriter(record);
}
pushToRecordWriter方法使用序列化代理,将record传递给recordWriter。代码如下:
private <X> void pushToRecordWriter(StreamRecord<X> record) {
serializationDelegate.setInstance(record);
try {
recordWriter.emit(serializationDelegate);
}
catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
RecordWriter负责把数据序列化,然后写入到缓存中。它有两个实现类:
- BroadcastRecordWriter: 维护了多个下游channel,发送数据到下游所有的channel中。
- ChannelSelectorRecordWriter: 通过channelSelector对象判断数据需要发往下游的哪个channel。keyBy算子用的正是这个RecordWriter。
这里我们分析下ChannelSelectorRecordWriter的emit方法:
public void emit(T record) throws IOException {
emit(record, channelSelector.selectChannel(record));
}
很明显这里使用了channelSelector.selectChannel方法。该方法为record和对应下游channel id的函数关系。
接下来我们又回到了父类RecordWriter。
protected void emit(T record, int targetSubpartition) throws IOException {
checkErroneous();
// 序列化record,并向指定的目标分区发送
targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition);
// 判断是否进行刷新操作
if (flushAlways) {
targetPartition.flush(targetSubpartition);
}
}
关键的逻辑在emitRecord方法内,进一步分析该方法。
ResultPartition用于保存单个task产生的数据。它有四个实现类。
�BufferWritingResultPartition:负责将buffers直接ResultSubpartition。BufferWritingResultPartition有两个实现类:
BoundedBlockingResultPartition:保存单个任务的输出数据结果。用于批量数据。
PipelinedResultPartition:用于流式数据。
SortMergeResultPartition:用于排序合并结果数据。
主要查看一下BufferWritingResultPartition的emitRecord方法
@Override
public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
// 将record追加到缓存中
BufferBuilder buffer = appendUnicastDataForNewRecord(record, targetSubpartition);
while (record.hasRemaining()) {
// full buffer, partial record
finishUnicastBufferBuilder(targetSubpartition);
buffer = appendUnicastDataForRecordContinuation(record, targetSubpartition);
}
if (buffer.isFull()) {
// full buffer, full record
finishUnicastBufferBuilder(targetSubpartition);
}
// partial buffer, full record
}
进一步分析appendUnicastDataForNewRecord方法。
�