Data flow between Flink operators falls into three cases:
- Inside an operator chain: passing a record downstream is effectively a direct method call.
- Between different tasks inside the same TM: via LocalInputChannel.
- Between different TMs: via RemoteInputChannel.
Because there are many variants of data exchange, this article focuses on the third case, using OneInputStreamTask as the example.
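To make the first case concrete: a chained downstream operator is invoked directly, with no serialization or network in between. Below is a minimal, hypothetical sketch of that direct call (these are not Flink's real classes):
```java
// Hypothetical sketch: within an operator chain, "sending" a record downstream
// is just a method call on the next operator in the same thread and the same TM.
interface SimpleOperator<T> {
    void processElement(T element);
}

final class ChainedOutput<T> {
    private final SimpleOperator<T> next;

    ChainedOutput(SimpleOperator<T> next) {
        this.next = next;
    }

    // collect() never touches a buffer or a socket: it simply invokes the
    // downstream operator directly.
    void collect(T element) {
        next.processElement(element);
    }
}
```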
# Shuffle
First we need a basic understanding of Flink's shuffle implementation; for details see FLIP-31: Pluggable Shuffle Service (Apache Flink, Apache Software Foundation).
The current implementation consists of NettyShuffleMaster and NettyShuffleEnvironment. Their responsibilities include:

- Lifecycle management.
- Shuffle Input/Output management:
  - Result partition management.
  - Input gate management: createInputGates, updatePartitionInfo.
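Roughly, those responsibilities can be grouped into one interface. The sketch below is a hypothetical simplification for orientation only; the real contracts are ShuffleServiceFactory, ShuffleMaster and ShuffleEnvironment in org.apache.flink.runtime.shuffle, and their actual signatures differ.
```java
import java.util.List;

// Hypothetical, simplified grouping of the responsibilities above -- NOT the real
// Flink interfaces, whose signatures are different.
interface SimpleShuffleEnvironment<W, G> extends AutoCloseable {

    // lifecycle management
    int start();                       // e.g. returns the data port the shuffle service listens on
    void close();

    // result partition management (producer side)
    List<W> createResultPartitionWriters(String ownerName, int numberOfPartitions);

    // input gate management (consumer side)
    List<G> createInputGates(String ownerName, int numberOfGates);
    boolean updatePartitionInfo(String consumerId, Object partitionInfo);
}
```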
# Data input side
## StreamTaskStreamInputProcessor
- The top-level interface StreamTaskStreamInputProcessor is driven by the StreamTask mailbox loop (the runMailboxLoop introduced in the TM article); here we analyze StreamOneInputProcessor.
- StreamOneInputProcessor has two member variables: `private StreamTaskInput<IN> input;` and `private DataOutput<IN> output;`.
## StreamTaskInput
- The StreamTaskInput interface extends PushingAsyncDataInput and represents the input of an operator. Its main method is `DataInputStatus emitNext(DataOutput<T> output) throws Exception;`.
- The DataOutput interface receives the elements emitted by StreamTaskInput, such as Records and Watermarks. If the element is a Record, `operator.processElement(record);` is called.
- The main implementations of StreamTaskInput are StreamTaskSourceInput and StreamTaskNetworkInput, used for source input and for the input of downstream tasks respectively.
Here we analyze StreamTaskNetworkInput, which holds a CheckpointedInputGate member. CheckpointedInputGate is a wrapper around InputGate that handles the CheckpointBarriers coming from the InputGate; the data ultimately originates from the InputGate. The call chain is: emitNext invokes InputGate#pollNext.
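As a rough illustration of the push side, a DataOutput implementation mostly just dispatches whatever the StreamTaskInput emits to the operator. The sketch below uses hypothetical types; the real DataOutput interface covers more element types and its implementations are richer.
```java
// Hypothetical sketch of a DataOutput: it only forwards emitted elements to the operator.
interface SimplePushOperator<T> {
    void processElement(T record) throws Exception;
    void processWatermark(long timestamp) throws Exception;
}

final class OperatorDataOutput<T> {
    private final SimplePushOperator<T> operator;

    OperatorDataOutput(SimplePushOperator<T> operator) {
        this.operator = operator;
    }

    void emitRecord(T record) throws Exception {
        operator.processElement(record);      // records go straight to the operator
    }

    void emitWatermark(long timestamp) throws Exception {
        operator.processWatermark(timestamp); // other element types get their own callbacks
    }
}
```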
## InputGate
- The design of InputGate is described clearly in its class Javadoc.
- InputGate implements the PullingAsyncDataInput interface, from which it inherits `Optional<T> pollNext() throws Exception;`. The difference between PullingAsyncDataInput and PushingAsyncDataInput is explained in the PushingAsyncDataInput Javadoc: "The variant of PullingAsyncDataInput that is defined for handling both network input and source input in a unified way via emitNext(PushingAsyncDataInput.DataOutput) instead of returning Optional.empty() via PullingAsyncDataInput.pollNext()." In other words, source input is usually pull-based and network input is usually push-based; adding the PushingAsyncDataInput layer unifies how the two kinds of underlying input are consumed.
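A minimal sketch of that unification, with hypothetical names: a push-style emitNext can be layered on top of a pull-style pollNext, so the code that drives the input no longer needs to know which kind of input sits underneath.
```java
import java.util.Optional;

// Hypothetical sketch: adapting a pull-style input (pollNext) to the push-style
// contract (emitNext), so both kinds of input look the same to the caller.
interface PullInput<T> {
    Optional<T> pollNext() throws Exception;
}

interface PushOutput<T> {
    void emit(T element) throws Exception;
}

final class PullToPushAdapter<T> {
    private final PullInput<T> pullInput;

    PullToPushAdapter(PullInput<T> pullInput) {
        this.pullInput = pullInput;
    }

    /** Emits at most one element into the output; returns false if nothing was available. */
    boolean emitNext(PushOutput<T> output) throws Exception {
        Optional<T> next = pullInput.pollNext();
        if (!next.isPresent()) {
            return false;                 // instead of handing Optional.empty() to the caller
        }
        output.emit(next.get());
        return true;
    }
}
```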
InputGate has two main subclasses: SingleInputGate and UnionInputGate. In streaming scenarios usually only SingleInputGate is used.
- The main member variables of SingleInputGate are `private final Map<SubpartitionInfo, InputChannel> inputChannels;` and `private final PrioritizedDeque<InputChannel> inputChannelsWithData = new PrioritizedDeque<>();`.
- The `pollNext()` method polls a channel that has data from inputChannelsWithData and then obtains the next element from that channel's InputChannel.BufferAndAvailability.
- Channels are added to the inputChannelsWithData queue when InputChannel#notifyPriorityEvent calls back into SingleInputGate#notifyPriorityEvent.
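The pattern is easier to see stripped of details. The sketch below is a reduced, hypothetical model; the real SingleInputGate additionally handles priority events, availability futures, buffer recycling and more elaborate locking.
```java
import java.util.ArrayDeque;
import java.util.Optional;

// Reduced sketch of the SingleInputGate pattern (not the real class): channels
// announce "I have data" via a callback, and pollNext() serves the next such channel.
final class SimpleInputGate {

    interface Channel {
        Optional<byte[]> getNextBuffer();
        boolean hasData();
    }

    private final ArrayDeque<Channel> channelsWithData = new ArrayDeque<>();

    /** Callback invoked by a channel when it receives a buffer from the network. */
    synchronized void notifyChannelNonEmpty(Channel channel) {
        if (!channelsWithData.contains(channel)) {
            channelsWithData.add(channel);
        }
    }

    /** Called from the task thread: take the next buffer from some non-empty channel. */
    synchronized Optional<byte[]> pollNext() {
        Channel channel = channelsWithData.poll();
        if (channel == null) {
            return Optional.empty();
        }
        Optional<byte[]> buffer = channel.getNextBuffer();
        if (channel.hasData()) {
            channelsWithData.add(channel);   // keep serving this channel later
        }
        return buffer;
    }
}
```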
The InputChannel implementation we analyze here is RemoteInputChannel, i.e. the cross-TM data flow mentioned at the beginning of this article.
## InputChannel
- From the InputChannel Javadoc: "An input channel consumes a single ResultSubpartitionView. For each channel, the consumption life cycle is as follows: requestSubpartition(), getNextBuffer(), releaseAllResources()".
- Each InputChannel corresponds to one ResultSubpartitionView of an upstream task.
- requestSubpartition() requests the corresponding subpartition. Before any data can be fetched, the current task has to tell the upstream TM which subpartition it needs; this happens while the task is initialized in TaskInvokable#restore, which here is StreamTask#restore.
- getNextBuffer() fetches the next Buffer; a Buffer wraps a MemorySegment (details are covered in the memory-management analysis).
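That life cycle reads naturally as an interface plus a consumption loop. The sketch below uses hypothetical types purely to make the three phases explicit:
```java
import java.util.Optional;

// Hypothetical sketch of the consumption life cycle described above.
interface SimpleInputChannel {
    void requestSubpartition() throws Exception;         // 1. ask the producer for the subpartition
    Optional<byte[]> getNextBuffer() throws Exception;    // 2. consume buffers one by one
    void releaseAllResources();                           // 3. clean up when done
}

final class ChannelConsumer {
    static void consume(SimpleInputChannel channel) throws Exception {
        channel.requestSubpartition();
        try {
            Optional<byte[]> buffer;
            while ((buffer = channel.getNextBuffer()).isPresent()) {
                // hand the buffer to deserialization / the operator
                System.out.println("got " + buffer.get().length + " bytes");
            }
        } finally {
            channel.releaseAllResources();
        }
    }
}
```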
### requestSubpartition()
1. Create a NettyPartitionRequestClient, which ultimately connects to the upstream NettyServer through NettyClient. The handling of data received on the SocketChannel lives in CreditBasedPartitionRequestClientHandler. (A simplified plain-netty sketch of the client side follows after step 3.)
```java
private NettyPartitionRequestClient connect(ConnectionID connectionId)
        throws RemoteTransportException, InterruptedException {
    ...
    Channel channel = nettyClient.connect(connectionId.getAddress()).sync().channel();
    NetworkClientHandler clientHandler = channel.pipeline().get(NetworkClientHandler.class);
    return new NettyPartitionRequestClient(channel, clientHandler, connectionId, this);
}
```
NettyClient#connect sets up the client channel pipeline:
```java
ChannelFuture connect(final InetSocketAddress serverSocketAddress) {
    ...
    bootstrap.handler(
            new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    // SSL handler should be added first in the pipeline
                    if (clientSSLFactory != null) {
                        SslHandler sslHandler =
                                clientSSLFactory.createNettySSLHandler(
                                        channel.alloc(),
                                        serverSocketAddress.getAddress().getCanonicalHostName(),
                                        serverSocketAddress.getPort());
                        channel.pipeline().addLast("ssl", sslHandler);
                    }
                    // channel handlers: the actual data-processing logic
                    channel.pipeline().addLast(protocol.getClientChannelHandlers());
                }
            });
    try {
        return bootstrap.connect(serverSocketAddress);
    } catch (ChannelException e) {
        ...
    }
}
```
The client ChannelHandlers come from NettyProtocol#getClientChannelHandlers:
```java
public ChannelHandler[] getClientChannelHandlers() {
    NetworkClientHandler networkClientHandler = new CreditBasedPartitionRequestClientHandler();
    // Three handlers are added to the ChannelPipeline:
    // - messageEncoder encodes outgoing messages such as PartitionRequest
    // - NettyMessageClientDecoderDelegate decodes the netty ByteBuf received from the server
    // - networkClientHandler contains the business logic
    return new ChannelHandler[] {
        messageEncoder,
        new NettyMessageClientDecoderDelegate(networkClientHandler),
        networkClientHandler
    };
}
```
2. NettyPartitionRequestClient sends a PartitionRequest message to the upstream NettyServer over the netty channel.
```java
public void requestSubpartition(
        final ResultPartitionID partitionId,
        final int subpartitionIndex,
        final RemoteInputChannel inputChannel,
        int delayMs)
        throws IOException {
    final PartitionRequest request =
            new PartitionRequest(
                    partitionId,
                    subpartitionIndex,
                    inputChannel.getInputChannelId(),
                    inputChannel.getInitialCredit());
    ...
    if (delayMs == 0) {
        ChannelFuture f = tcpChannel.writeAndFlush(request);
        f.addListener(listener);
    } else {
        ...
    }
}
```
3. CreditBasedPartitionRequestClientHandler. When netty receives data, the ChannelHandler is called back: channelRead() -> decodeMsg -> decodeBufferOrEvent -> inputChannel.onBuffer. This ends up in RemoteInputChannel#onBuffer, which in turn calls InputChannel#notifyPriorityEvent -> SingleInputGate#notifyPriorityEvent, adding this InputChannel to the inputChannelsWithData queue (see the SingleInputGate discussion in the InputGate section above).
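For comparison, here is a self-contained plain-netty client that mirrors those three steps outside of Flink. This is not Flink code; the host, port and the string-based "protocol" are placeholders. It connects with a ChannelInitializer, installs an encoder, a decoder and a business handler, sends a toy "partition request", and prints whatever the server answers.
```java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

// Standalone plain-netty client sketch mirroring the three steps above.
public final class MiniPartitionRequestClient {

    public static void main(String[] args) throws Exception {
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline()
                                    .addLast("encoder", new StringEncoder())   // encodes outgoing requests
                                    .addLast("decoder", new StringDecoder())   // decodes incoming bytes
                                    .addLast("handler", new SimpleChannelInboundHandler<String>() {
                                        @Override
                                        protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                                            // in Flink this is where onBuffer()/the input gate gets notified
                                            System.out.println("received: " + msg);
                                        }
                                    });
                        }
                    });

            Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
            // step 2: send the (toy) partition request once connected
            channel.writeAndFlush("PartitionRequest(partition=0, subpartition=1, credit=2)\n")
                    .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            channel.closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}
```
It assumes some server is listening on localhost:8080, for example the server sketch in the data output section below.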
<a name="kB2Di"></a>
## Netty
The netty-related logic lives in the org.apache.flink.runtime.io.network.netty package. For an introduction to netty itself, see [Introduction to netty](https://www.yuque.com/u22594583/ydf48t/rnkhlz).
<a name="gKOLp"></a>
# Data output side
The data output side also breaks down into several layers:
1. NettyServer receives the PartitionRequest sent by the client (by the [InputChannel](#ai8IZ) described on the input side) and creates the corresponding NetworkSequenceViewReader and ResultSubpartitionView.
1. The record is written into a Buffer.
1. NettyServer is triggered to send the data.
<a name="q3K4q"></a>
## NettyServer
1. NettyServer and NettyClient are both initialized in NettyConnectionManager, which itself is created by NettyShuffleEnvironment.
1. From NettyServer's init method we can see that the server-side ChannelHandlers are defined in NettyProtocol#getServerChannelHandlers, mainly PartitionRequestServerHandler and PartitionRequestQueue (a plain-netty sketch of this wiring follows below).
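As with the client, the server wiring is easier to see in a self-contained plain-netty sketch. This is not Flink code; the port and the string "protocol" are placeholders. The pipeline only mirrors the idea of getServerChannelHandlers: a decoder, an encoder and a request handler that answers each incoming "partition request".
```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

// Standalone plain-netty server sketch (not Flink code).
public final class MiniPartitionRequestServer {

    public static void main(String[] args) throws Exception {
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        NioEventLoopGroup workers = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap()
                    .group(boss, workers)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline()
                                    .addLast(new StringDecoder())   // decode the incoming request
                                    .addLast(new StringEncoder())   // encode outgoing responses
                                    .addLast(new SimpleChannelInboundHandler<String>() {
                                        @Override
                                        protected void channelRead0(ChannelHandlerContext ctx, String request) {
                                            // analogous to PartitionRequestServerHandler: a request arrives,
                                            // a "view" is created and data for it is written back
                                            ctx.writeAndFlush("BufferResponse for " + request + "\n");
                                        }
                                    });
                        }
                    });
            Channel serverChannel = bootstrap.bind(8080).sync().channel();
            serverChannel.closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            workers.shutdownGracefully();
        }
    }
}
```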
<a name="mOIlJ"></a>
### PartitionRequestServerHandler
```java
reader =
        new CreditBasedSequenceNumberingViewReader(
                request.receiverId, request.credit, outboundQueue);

reader.requestSubpartitionView(
        partitionProvider, request.partitionId, request.queueIndex);

outboundQueue.notifyReaderCreated(reader);
```
It handles the PartitionRequest sent by the client in three steps:
- Create a CreditBasedSequenceNumberingViewReader. This object plays two roles:
  - BufferAvailabilityListener: triggers the sending of data.
  - NetworkSequenceViewReader: reads data from the subpartition view.
- reader.requestSubpartitionView creates the ResultSubpartitionView corresponding to the PartitionRequest; the subpartition view is a member of both the subpartition and the view reader.
- Notify PartitionRequestQueue of the newly created reader; the queue keeps track of all created readers.
### PartitionRequestQueue
The handler that actually sends the data; the main logic is in writeAndFlushNextMessageIfPossible:
- Poll a reader that has data from `private final ArrayDeque<NetworkSequenceViewReader> availableReaders = new ArrayDeque<>();`.
- Read the next buffer from that reader.
- Write and flush it to the netty channel.
How readers get added to availableReaders is described below in "flush: adding readers to availableReaders and sending data".
```java
private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
    ...
    NetworkSequenceViewReader reader = pollAvailableReader();
    ...
    next = reader.getNextBuffer();
    if (next == null) {
        ...
    } else {
        BufferResponse msg =
                new BufferResponse(
                        next.buffer(),
                        next.getSequenceNumber(),
                        reader.getReceiverId(),
                        next.buffersInBacklog());

        // Write and flush and wait until this is done before
        // trying to continue with the next buffer.
        channel.writeAndFlush(msg).addListener(writeListener);
    }
    ...
}
```
## Writing data into Buffers
The entry point for writing data is usually the Output#collect call made from Input#processElement.
Here we look at StreamTask, where the Output implementation is normally RecordWriterOutput.
The RecordWriterOutput instances are held by the OperatorChain member of StreamTask: `protected final RecordWriterOutput<?>[] streamOutputs;`.
### RecordWriterOutput
RecordWriterOutput#collect() eventually ends up in RecordWriter#emit(T).
### RecordWriter
RecordWriter has two subclasses:
- BroadcastRecordWriter, used in broadcast scenarios.
- ChannelSelectorRecordWriter, whose emit path is the following (a sketch follows after this list):
  - The ChannelSelector computes which subpartition the record belongs to.
  - The record is serialized into a ByteBuffer via `serializeRecord(serializer, record)`.
  - ResultPartitionWriter#emitRecord(ByteBuffer record, int targetSubpartition) is called.
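A hypothetical sketch of that emit path (not Flink's ChannelSelectorRecordWriter, and with toy serialization instead of a TypeSerializer):
```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: pick the target subpartition, serialize the record,
// hand the bytes to the partition writer.
interface SimpleChannelSelector<T> {
    int selectChannel(T record, int numberOfChannels);
}

interface SimplePartitionWriter {
    void emitRecord(ByteBuffer record, int targetSubpartition);
}

final class SimpleChannelSelectorWriter<T> {
    private final SimpleChannelSelector<T> selector;
    private final SimplePartitionWriter partition;
    private final int numberOfChannels;

    SimpleChannelSelectorWriter(
            SimpleChannelSelector<T> selector, SimplePartitionWriter partition, int numberOfChannels) {
        this.selector = selector;
        this.partition = partition;
        this.numberOfChannels = numberOfChannels;
    }

    void emit(T record) {
        int target = selector.selectChannel(record, numberOfChannels); // 1. pick the subpartition
        ByteBuffer serialized = serializeRecord(record);               // 2. serialize the record
        partition.emitRecord(serialized, target);                      // 3. append to that subpartition
    }

    // toy serialization; Flink uses a TypeSerializer / DataOutputSerializer here
    private ByteBuffer serializeRecord(T record) {
        return ByteBuffer.wrap(String.valueOf(record).getBytes(StandardCharsets.UTF_8));
    }
}
```
A keyed selector such as `(record, n) -> Math.floorMod(record.hashCode(), n)` would stand in for a keyed partitioner.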
### ResultPartitionWriter
Here we only look at the PipelinedResultPartition implementation.
- emitRecord: the core of BufferWritingResultPartition#emitRecord is appending the serialized record to a BufferBuilder (see the sketch after this list).
- flush: triggers adding the reader to the PartitionRequestQueue member availableReaders.
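The append-then-flush idea can be sketched with plain ByteBuffers. These are hypothetical classes; the real path goes through BufferBuilder/BufferConsumer, a buffer pool and backpressure handling.
```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

// Reduced sketch of "append into a buffer, flush notifies the consumer".
final class SimpleSubpartition {

    interface DataAvailabilityListener {
        void notifyDataAvailable();
    }

    private final ArrayDeque<ByteBuffer> finishedBuffers = new ArrayDeque<>();
    private ByteBuffer currentBuffer = ByteBuffer.allocate(32 * 1024);
    private final DataAvailabilityListener listener;

    SimpleSubpartition(DataAvailabilityListener listener) {
        this.listener = listener;
    }

    /** emitRecord: append the serialized record, rolling over to a new buffer when full. */
    void emitRecord(ByteBuffer record) {
        // records larger than one buffer are not handled in this sketch
        if (record.remaining() > currentBuffer.remaining()) {
            finishCurrentBuffer();
        }
        currentBuffer.put(record);
    }

    /** flush: make the current (possibly partial) buffer visible and wake up the consumer. */
    void flush() {
        finishCurrentBuffer();
        listener.notifyDataAvailable();   // in Flink this ends up in PartitionRequestQueue
    }

    private void finishCurrentBuffer() {
        if (currentBuffer.position() > 0) {
            currentBuffer.flip();
            finishedBuffers.add(currentBuffer);
            currentBuffer = ByteBuffer.allocate(32 * 1024);
        }
    }
}
```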
## flush: adding readers to availableReaders and sending data
The flush at the end of the write path goes PipelinedResultPartition#flush -> PipelinedSubpartition#flush.
The subpartition flush triggers PipelinedSubpartitionView#notifyDataAvailable -> BufferAvailabilityListener (CreditBasedSequenceNumberingViewReader)#notifyDataAvailable -> and finally PartitionRequestQueue#notifyReaderNonEmpty.
```java
void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) {
    // The notification might come from the same thread. For the initial writes this
    // might happen before the reader has set its reference to the view, because
    // creating the queue and the initial notification happen in the same method call.
    // This can be resolved by separating the creation of the view and allowing
    // notifications.

    // TODO This could potentially have a bad performance impact as in the
    // worst case (network consumes faster than the producer) each buffer
    // will trigger a separate event loop task being scheduled.
    ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(reader));
}
```
notifyReaderNonEmpty hands the work to ctx.executor() and fires fireUserEventTriggered from there, which keeps everything on the netty event loop and therefore thread safe.
The event is handled in userEventTriggered, which calls enqueueAvailableReader -> registerAvailableReader:
```java
// Queue an available reader for consumption. If the queue is empty,
// we try trigger the actual write. Otherwise this will be handled by
// the writeAndFlushNextMessageIfPossible calls.
boolean triggerWrite = availableReaders.isEmpty();
registerAvailableReader(reader);

if (triggerWrite) {
    writeAndFlushNextMessageIfPossible(ctx.channel());
}
```
registerAvailableReader adds the current reader to availableReaders, which is exactly the queue that the send step of PartitionRequestQueue polls from (see above).
If availableReaders was empty before this reader was added, the NettyServer send is triggered immediately; otherwise the ongoing writeAndFlushNextMessageIfPossible loop will pick the reader up.
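Putting the last two snippets together, the whole "enqueue reader, trigger a write only when the queue was empty, keep draining on the event loop" pattern can be sketched independently of netty and Flink. These are hypothetical classes; a single-thread executor stands in for the netty event loop.
```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Reduced sketch of the availableReaders pattern: a single "event loop" thread owns
// the queue, and a write loop is started only when the queue goes from empty to non-empty.
interface SimpleMessageWriter<R> {
    /** Writes one message for the reader; returns true if the reader still has data. */
    boolean writeOneMessage(R reader);
}

final class SimpleWriteQueue<R> {
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();
    private final Queue<R> availableReaders = new ArrayDeque<>();
    private final SimpleMessageWriter<R> writer;

    SimpleWriteQueue(SimpleMessageWriter<R> writer) {
        this.writer = writer;
    }

    /** Counterpart of notifyReaderNonEmpty: always hop onto the event loop first. */
    void notifyReaderNonEmpty(R reader) {
        eventLoop.execute(() -> {
            boolean triggerWrite = availableReaders.isEmpty();
            availableReaders.add(reader);
            if (triggerWrite) {
                writeNextMessageIfPossible();
            }
        });
    }

    /** Counterpart of writeAndFlushNextMessageIfPossible; only ever runs on the event loop. */
    private void writeNextMessageIfPossible() {
        R reader = availableReaders.poll();
        if (reader == null) {
            return;
        }
        if (writer.writeOneMessage(reader)) {
            availableReaders.add(reader); // reader still has data: keep it queued
        }
        if (!availableReaders.isEmpty()) {
            eventLoop.execute(this::writeNextMessageIfPossible); // keep draining asynchronously
        }
    }

    void shutdown() {
        eventLoop.shutdown();
    }
}
```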