Data flow between Flink operators falls into three cases:

  1. Within an operator chain, which is effectively a plain method call.
  2. Between different tasks inside the same TM, using LocalInputChannel.
  3. Between different TMs, using RemoteInputChannel.

Since there are many data-exchange scenarios, this article mainly analyzes the third case, using OneInputStreamTask as the example.

Shuffle

First we need to understand Flink's shuffle implementation; for details see FLIP-31: Pluggable Shuffle Service - Apache Flink - Apache Software Foundation.
The current implementation consists of NettyShuffleMaster and NettyShuffleEnvironment.

The main responsibilities (as listed in FLIP-31):

  - Lifecycle management
    - start / close
  - Shuffle Input/Output management
    - Result partition management
      - createResultPartitionWriters
    - Input gate management
      - createInputGates
      - updatePartitionInfo
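
To make this division of responsibilities concrete, below is a minimal sketch of the TM-side shuffle environment shape implied by the list above. All `*Sketch` names are hypothetical and the signatures are simplified; they are not the actual Flink interfaces (which are ShuffleServiceFactory, ShuffleMaster and ShuffleEnvironment, implemented by NettyShuffleMaster / NettyShuffleEnvironment).

```java
import java.util.List;

// Hypothetical sketch of a TM-side shuffle environment, mirroring the responsibility list above.
interface ShuffleEnvironmentSketch extends AutoCloseable {

    // Lifecycle management
    void start();

    @Override
    void close();

    // Result partition management (output side of a task)
    List<ResultPartitionWriterSketch> createResultPartitionWriters(String taskName);

    // Input gate management (input side of a task)
    List<InputGateSketch> createInputGates(String taskName);

    boolean updatePartitionInfo(String taskName, Object partitionInfo);

    // placeholder types, only so the sketch is self-contained
    interface ResultPartitionWriterSketch {}
    interface InputGateSketch {}
}
```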

Data input perspective

Data reading is organized in several layers:

StreamTaskStreamInputProcessor

  1. At the top level, the StreamTaskStreamInputProcessor interface is invoked from the StreamTask mailbox loop (the runMailboxLoop introduced in the TM article). Here we analyze StreamOneInputProcessor.
  2. StreamOneInputProcessor holds two member variables: private StreamTaskInput<IN> input; and private DataOutput<IN> output;.

    StreamTaskInput

  3. The StreamTaskInput interface extends PushingAsyncDataInput and serves as the operator's input. Its main method is DataInputStatus emitNext(DataOutput<T> output) throws Exception;.

  4. The DataOutput interface receives the elements emitted from StreamTaskInput, such as records and watermarks. If the element is a record, operator.processElement(record) is called.
  5. The main StreamTaskInput implementations are StreamTaskSourceInput and StreamTaskNetworkInput, used for source input and for downstream task input respectively.

Here we analyze StreamTaskNetworkInput, which contains a CheckpointedInputGate member. CheckpointedInputGate is a wrapper around InputGate that handles the CheckpointBarriers coming from the InputGate; the data ultimately originates from the InputGate. The call chain is: emitNext internally calls InputGate#pollNext.
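
To see how these layers fit together, here is a heavily simplified sketch of the push/pull adaptation. All Mini* names are hypothetical; the real classes are StreamOneInputProcessor, StreamTaskNetworkInput, CheckpointedInputGate and the operator's DataOutput, with different signatures.

```java
import java.util.Optional;

// Hypothetical mirror of the DataOutput interface: elements are pushed into the operator.
interface MiniDataOutput<T> {
    void emitRecord(T record) throws Exception;     // -> operator.processElement(record)
}

// Hypothetical mirror of StreamTaskInput / PushingAsyncDataInput.
interface MiniStreamTaskInput<T> {
    boolean emitNext(MiniDataOutput<T> output) throws Exception; // true = emitted something
}

// Hypothetical mirror of StreamTaskNetworkInput: pulls from the gate, pushes into the output.
final class MiniNetworkInput implements MiniStreamTaskInput<String> {

    interface MiniInputGate {
        // PullingAsyncDataInput-style: return the next element if one is available.
        Optional<String> pollNext() throws Exception;
    }

    private final MiniInputGate gate; // in Flink this is the CheckpointedInputGate

    MiniNetworkInput(MiniInputGate gate) {
        this.gate = gate;
    }

    @Override
    public boolean emitNext(MiniDataOutput<String> output) throws Exception {
        Optional<String> next = gate.pollNext();   // pull from the gate
        if (next.isPresent()) {
            output.emitRecord(next.get());         // push into the operator
            return true;
        }
        return false;                              // nothing available right now
    }
}

// Hypothetical mirror of StreamOneInputProcessor, driven by the mailbox loop.
final class MiniInputProcessor {
    private final MiniStreamTaskInput<String> input;
    private final MiniDataOutput<String> output;

    MiniInputProcessor(MiniStreamTaskInput<String> input, MiniDataOutput<String> output) {
        this.input = input;
        this.output = output;
    }

    // Called repeatedly from the StreamTask mailbox loop.
    boolean processInput() throws Exception {
        return input.emitNext(output);
    }
}
```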

InputGate

  1. The design of InputGate is described clearly in its class comment.


  2. InputGate implements the PullingAsyncDataInput interface; the method Optional<T> pollNext() throws Exception; is inherited from it.

    The difference between PullingAsyncDataInput and PushingAsyncDataInput is explained in the PushingAsyncDataInput class comment: "The variant of PullingAsyncDataInput that is defined for handling both network input and source input in a unified way via emitNext(PushingAsyncDataInput.DataOutput) instead of returning Optional.empty() via PullingAsyncDataInput.pollNext()." In other words, the PushingAsyncDataInput layer is added on top so that both network input and source input can be handled in a unified way by the layers above, even though the network input gate underneath is pull-based (pollNext).

  3. InputGate has two main subclasses, SingleInputGate and UnionInputGate. In streaming scenarios, usually only SingleInputGate is used.

  4. The main member variables of SingleInputGate are
    private final Map<SubpartitionInfo, InputChannel> inputChannels;
    private final PrioritizedDeque<InputChannel> inputChannelsWithData = new PrioritizedDeque<>();
  5. pollNext() uses inputChannelsWithData.poll() to obtain a channel that currently has data, and then reads the next element via InputChannel.BufferAndAvailability.
  6. Channels are added to the inputChannelsWithData queue through callbacks from the InputChannel back into the SingleInputGate, e.g. InputChannel#notifyPriorityEvent -> SingleInputGate#notifyPriorityEvent.
The InputChannel implementation we analyze here is RemoteInputChannel, i.e. the TM-to-TM data flow mentioned at the beginning of this article.

InputChannel

  1. The InputChannel class comment:

    An input channel consumes a single ResultSubpartitionView. For each channel, the consumption life cycle is as follows: requestSubpartition() getNextBuffer() releaseAllResources()

Each InputChannel corresponds to one ResultSubpartitionView of an upstream task.

  1. requestSubpartition() requests the corresponding subpartition.
    Before any data can be received, the current task must first tell the upstream TM which subpartition it needs. This step is invoked when the task is initialized in TaskInvokable#restore, which here is StreamTask#restore.
  2. getNextBuffer() fetches the next Buffer; a Buffer wraps a MemorySegment (covered in detail in the memory-management article).
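
The consumption life cycle quoted above can be pictured as the following hypothetical consumer loop. This is not Flink code: in reality getNextBuffer is driven by the InputGate and an empty result does not mean end-of-data.

```java
import java.util.Optional;

// Hypothetical channel interface mirroring the three life-cycle calls from the class comment.
interface MiniRemoteChannel {
    void requestSubpartition() throws Exception;          // tell the upstream TM what we need
    Optional<byte[]> getNextBuffer() throws Exception;    // one buffer of subpartition data
    void releaseAllResources() throws Exception;          // close the view / free buffers
}

final class ChannelLifecycleDemo {
    static void consume(MiniRemoteChannel channel) throws Exception {
        channel.requestSubpartition();                    // 1. issued during StreamTask#restore
        try {
            Optional<byte[]> buffer;
            while ((buffer = channel.getNextBuffer()).isPresent()) {
                // 2. hand the buffer over for deserialization into records
                System.out.println("got " + buffer.get().length + " bytes");
            }
        } finally {
            channel.releaseAllResources();                // 3. always release on shutdown
        }
    }
}
```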
    requestSubpartition()

  1. Create a NettyPartitionRequestClient, which eventually connects to the upstream NettyServer through the NettyClient.
    The handling of the data received on the SocketChannel lives in CreditBasedPartitionRequestClientHandler.

```java
private NettyPartitionRequestClient connect(ConnectionID connectionId)
        throws RemoteTransportException, InterruptedException {
    Channel channel = nettyClient.connect(connectionId.getAddress()).sync().channel();
    NetworkClientHandler clientHandler = channel.pipeline().get(NetworkClientHandler.class);
    return new NettyPartitionRequestClient(channel, clientHandler, connectionId, this);
}
```

```java
// NettyClient#connect: sets up the client ChannelPipeline.
ChannelFuture connect(final InetSocketAddress serverSocketAddress) {
    // ...
    bootstrap.handler(
            new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    // SSL handler should be added first in the pipeline
                    if (clientSSLFactory != null) {
                        SslHandler sslHandler =
                                clientSSLFactory.createNettySSLHandler(
                                        channel.alloc(),
                                        serverSocketAddress.getAddress().getCanonicalHostName(),
                                        serverSocketAddress.getPort());
                        channel.pipeline().addLast("ssl", sslHandler);
                    }
                    // channel handlers: the actual data-processing logic
                    channel.pipeline().addLast(protocol.getClientChannelHandlers());
                }
            });
    try {
        return bootstrap.connect(serverSocketAddress);
    } catch (ChannelException e) {
        // ...
    }
}

// client ChannelHandlers
public ChannelHandler[] getClientChannelHandlers() {
    NetworkClientHandler networkClientHandler = new CreditBasedPartitionRequestClientHandler();
    // Three handlers are added to the ChannelPipeline:
    // - messageEncoder: encodes outgoing messages such as PartitionRequest
    // - NettyMessageClientDecoderDelegate: decodes the received netty ByteBuf
    // - networkClientHandler: the business logic
    return new ChannelHandler[] {
        messageEncoder,
        new NettyMessageClientDecoderDelegate(networkClientHandler),
        networkClientHandler
    };
}
```

  2. NettyPartitionRequestClient sends a PartitionRequest message to the upstream NettyServer over the netty channel.

```java
public void requestSubpartition(
        final ResultPartitionID partitionId,
        final int subpartitionIndex,
        final RemoteInputChannel inputChannel,
        int delayMs)
        throws IOException {
    // ...
    final PartitionRequest request =
            new PartitionRequest(
                    partitionId,
                    subpartitionIndex,
                    inputChannel.getInputChannelId(),
                    inputChannel.getInitialCredit());
    // ...
    if (delayMs == 0) {
        ChannelFuture f = tcpChannel.writeAndFlush(request);
        f.addListener(listener);
    } else {
        // ...
    }
}
```
  3. CreditBasedPartitionRequestClientHandler
    When netty receives data, the handler callback chain is channelRead() -> decodeMsg -> decodeBufferOrEvent -> inputChannel.onBuffer.
    This ends up back in RemoteInputChannel#onBuffer, which in turn calls InputChannel#notifyPriorityEvent -> SingleInputGate#notifyPriorityEvent, adding this InputChannel to the inputChannelsWithData queue (item 6 of the InputGate section).
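
The receiving-side dispatch can be sketched roughly as follows. This is a simplified, hypothetical handler: the real CreditBasedPartitionRequestClientHandler works on decoded NettyMessage objects and additionally manages credits, errors and events.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Rough sketch of the client-side dispatch: a decoded buffer response carries the
// receiver's input channel id; the handler routes it to that channel's onBuffer().
final class MiniClientHandler {

    // Stands in for the decoded BufferResponse: payload + which input channel it is for.
    record MiniBufferResponse(String receiverId, int sequenceNumber, byte[] buffer) {}

    interface MiniInputChannel {
        // In Flink this is RemoteInputChannel#onBuffer, which then notifies the SingleInputGate.
        void onBuffer(byte[] buffer, int sequenceNumber);
    }

    private final Map<String, MiniInputChannel> inputChannels = new ConcurrentHashMap<>();

    void addInputChannel(String receiverId, MiniInputChannel channel) {
        inputChannels.put(receiverId, channel);
    }

    // Called for every decoded message (the real handler is driven by channelRead()).
    void onMessage(MiniBufferResponse msg) {
        MiniInputChannel channel = inputChannels.get(msg.receiverId());
        if (channel == null) {
            return; // channel already released; the real handler cancels the request instead
        }
        channel.onBuffer(msg.buffer(), msg.sequenceNumber());
    }
}
```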
Netty

The netty logic lives in the org.apache.flink.runtime.io.network.netty package. See the [introduction to netty](https://www.yuque.com/u22594583/ydf48t/rnkhlz) article for background.
Data output perspective

The output side is also split into several layers:

  1. The NettyServer receives the PartitionRequest sent by the client (sent by the InputChannel described on the input side) and creates the corresponding NetworkSequenceViewReader and ResultSubpartitionView.
  2. Data is written into Buffers.
  3. The NettyServer is triggered to send the data.
NettyServer

  1. Both NettyServer and NettyClient are initialized in NettyConnectionManager, which in turn is created by the NettyShuffleEnvironment.
  2. From NettyServer's init method we can see that the server ChannelHandlers are defined in NettyProtocol#getServerChannelHandlers; the main ones are PartitionRequestServerHandler and PartitionRequestQueue.
PartitionRequestServerHandler

```java
reader =
        new CreditBasedSequenceNumberingViewReader(
                request.receiverId, request.credit, outboundQueue);

reader.requestSubpartitionView(
        partitionProvider, request.partitionId, request.queueIndex);

outboundQueue.notifyReaderCreated(reader);
```

This handler mainly processes the PartitionRequest sent by the client, in three steps:

  1. Create a CreditBasedSequenceNumberingViewReader. This object plays two roles:
    BufferAvailabilityListener: triggers sending the data.
    NetworkSequenceViewReader: reads buffers from the subpartition view in sequence for the network stack.
  2. reader.requestSubpartitionView creates the ResultSubpartitionView corresponding to the PartitionRequest; the subpartition view becomes a member of both the subpartition and the view reader.
  3. The created reader is handed to the PartitionRequestQueue, which keeps track of all created readers.

    PartitionRequestQueue

    The handler that finally sends the data. The main logic is in the writeAndFlushNextMessageIfPossible method:

  1. private final ArrayDeque<NetworkSequenceViewReader> availableReaders = new ArrayDeque<>(); — take a reader that has data available.
  2. Read the data from it.
  3. Send it.

How readers get added to the availableReaders queue is described below in "Flush triggers adding readers to availableReaders and sending data".

```java
private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
    // ...
    NetworkSequenceViewReader reader = pollAvailableReader();
    // ...
    next = reader.getNextBuffer();
    if (next == null) {
        // ...
    } else {
        BufferResponse msg =
                new BufferResponse(
                        next.buffer(),
                        next.getSequenceNumber(),
                        reader.getReceiverId(),
                        next.buffersInBacklog());

        // Write and flush and wait until this is done before
        // trying to continue with the next buffer.
        channel.writeAndFlush(msg).addListener(writeListener);
    }
    // ...
}
```

Writing data into Buffers

The entry point for writing data is usually the Output#collect call made inside Input#processElement.
Here we analyze the StreamTask case, where the Output implementation is typically RecordWriterOutput.
The RecordWriterOutput instances live in the StreamTask's OperatorChain member: protected final RecordWriterOutput<?>[] streamOutputs;.

RecordWriterOutput

RecordWriterOutput#collect() eventually calls RecordWriter#emit(T).

RecordWriter

It has two subclasses:
BroadcastRecordWriter, used in broadcast scenarios.

ChannelSelectorRecordWriter

  1. The ChannelSelector computes which subpartition the record belongs to.
  2. The record is serialized into a ByteBuffer via serializeRecord(serializer, record).
  3. ResultPartitionWriter#emitRecord(ByteBuffer record, int targetSubpartition)
  4. ResultPartitionWriter#flush

    ResultPartitionWriter

    Here we only analyze the PipelinedResultPartition implementation.

  5. emitRecord
    The main logic of BufferWritingResultPartition#emitRecord is to append the record to a BufferBuilder.

  6. flush
    Triggers adding the reader to the PartitionRequestQueue member availableReaders. (A simplified sketch of the whole emit path follows after this list.)
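
Putting the output-side steps together, here is a simplified sketch of the emit path. The Mini* names are hypothetical; the real classes are ChannelSelectorRecordWriter, RecordWriter and BufferWritingResultPartition, with different signatures, and Flink does not flush after every record unless the buffer timeout is 0.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the output path: select the target subpartition, serialize the
// record, append it to that subpartition's buffer, and flush to notify the server side.
final class MiniChannelSelectorWriter {

    interface MiniResultPartitionWriter {
        void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException;
        void flush(int targetSubpartition); // ends up notifying the view reader / PartitionRequestQueue
    }

    private final MiniResultPartitionWriter partitionWriter;
    private final int numberOfSubpartitions;

    MiniChannelSelectorWriter(MiniResultPartitionWriter partitionWriter, int numberOfSubpartitions) {
        this.partitionWriter = partitionWriter;
        this.numberOfSubpartitions = numberOfSubpartitions;
    }

    // 1. ChannelSelector: a simple hash-based selector stands in for Flink's partitioners.
    private int selectSubpartition(String record) {
        return Math.floorMod(record.hashCode(), numberOfSubpartitions);
    }

    // 2-4. serialize, emit and flush, mirroring RecordWriter#emit.
    void emit(String record) throws IOException {
        int target = selectSubpartition(record);
        ByteBuffer serialized = ByteBuffer.wrap(record.getBytes(StandardCharsets.UTF_8));
        partitionWriter.emitRecord(serialized, target);
        // flushing per record only for illustration; Flink flushes based on the buffer timeout
        partitionWriter.flush(target);
    }
}
```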

Flush triggers adding readers to availableReaders and sending data

The previous section ended with a flush: PipelinedResultPartition#flush -> PipelinedSubpartition#flush.
The partition flush triggers PipelinedSubpartitionView#notifyDataAvailable -> BufferAvailabilityListener (i.e. CreditBasedSequenceNumberingViewReader)#notifyDataAvailable -> finally PartitionRequestQueue#notifyReaderNonEmpty.

```java
void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) {
    // The notification might come from the same thread. For the initial writes this
    // might happen before the reader has set its reference to the view, because
    // creating the queue and the initial notification happen in the same method call.
    // This can be resolved by separating the creation of the view and allowing
    // notifications.

    // TODO This could potentially have a bad performance impact as in the
    // worst case (network consumes faster than the producer) each buffer
    // will trigger a separate event loop task being scheduled.
    ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(reader));
}
```

notifyReaderNonEmpty fires the user event through the channel context's executor, which guarantees thread safety (everything runs on the netty event loop).
The event is handled in userEventTriggered, which calls enqueueAvailableReader -> registerAvailableReader.

```java
// Queue an available reader for consumption. If the queue is empty,
// we try trigger the actual write. Otherwise this will be handled by
// the writeAndFlushNextMessageIfPossible calls.
boolean triggerWrite = availableReaders.isEmpty();
registerAvailableReader(reader);

if (triggerWrite) {
    writeAndFlushNextMessageIfPossible(ctx.channel());
}
```

registerAvailableReader adds the current reader to availableReaders, which corresponds to the data-sending steps of PartitionRequestQueue described above.
If availableReaders was empty before the add, the NettyServer's data sending is triggered immediately.