Flink 最终执行的时候形成了一个分布式 Dataflow , 子任务 Task 分布在不同的物理机服务器上, Task 之间不可避免地要交换数据

数据传递模式

PULL / PUSH 两种模式

  • Flink Batch 模式

Batch 的计算模型采用 PULL 模式,将计算结果分成多份阶段,上游完全计算完毕之后,下游从上游拉取数据开始下一阶段计算,直到最终所有的阶段都计算完毕,输出结果, Batch Job 结束退出

  • Flink Stream 模式

Stream 的计算模型采用 PUSH 模式,上游主动向下游推送数据,上下游之间采用生产者-消费者 模式

  • 下游收到数据触发计算,没有数据则进入等待状态

PUSH 模式的数据处理过程也叫 Pipeline ,提到 Pipeline 或者流水线的时候,一般是指 PUSH 模式的数据处理过程

对比点 PULL PUSH
延迟 延迟高(需要等待上游所有计算完毕) 延迟低 (上游边计算边向下游输出)
下游状态 有状态,需要知道何时拉取 无状态
上游状态 无状态 有状态,需要了解每个下游的推送点
连接状态 短连接 长连接

关键组件

RecordWriter

RecordWriter 负责将 Task 处理的数据输出,然后下游 Task 就可以继续处理了.

  • RecordWriter 面向的是 StreamRecord, 直接处理算子的输出结果.
  • ResultPartitionWriter 面向的是 Buffer ,起到承上启下的作用.

RecordWriterResultPartitionWriter 的层级要高,底层依赖于 ResultPartitionWriter .

数据分区,其中最核心的抽象是 ChannelSelector,在 RecordWriter 中实现了数据分区语义,将开发时对数据分区的 API 调用转换成了实际的物理操作 (DataStream.shuffle())等

最底层内存抽象是 MemorySegment ,用于数据传输的是 Buffer,那么,承上启下对接从 Java 对象转为 Buffer 的中间对象是什么呢? (StreamRecord)

RecordWriter 类负责将 StreamRecord 进行序列化,调用 SpanningRecordSerializer (新版本找不到),
再调用 BufferBuilder 写入 MemorySegment中 (每个 Task 都有自己的 LocalBufferPool ,LocalBufferPool 中包含了多个 MemorySegment)
image.png

  • 哪些数据会被序列化?
  • 数据元素 StreamElement

📘 <数据交换> - 图2

  • 事件 Event ; Flink 内部的系统事件 (CheckpointBarrier等)

image.png

单播

根据 ChannelSelector,对数据流中每一条数据记录都进行选择,有效地写入一个输出通道的 ResultSubPartition 中 , 适用于非 BroadcastPartitioin .

如果在开发的时候没有使用 Partition ,默认会使用 BoundRobinChannelSelector ,使用 RounRobin 算法选择输出通道循环写入本地输出通道对应的 ResultPartition,发送到下游 Task
image.png


广播

广播就是向下游所有的 Task 发送相同的数据,在所有的 ResultSubPartition 中写入 N 份相同数据.

在实际实现时,同时写入 N 份重复的数据是资源浪费,所以对于广播类型的输出,只会写入编号为 0 的 ResultSubPartition 中,下游 Task 对于广播类型的数据,都会从编号为 0 的 ResultSubPartition 中获取数据.
image.png


数据记录序列化器 <SpanningRecordSerializer>

数据记录序列化器 RecordSerializer , 负责数据的序列化 (SpanningRecordSerializer 是唯一实现).
SpanningRecordSerializer 是一种支持跨内存段的序列化器

  • 其实现借助于中间缓存区来缓存序列化后的数据,
  • 然后再往真正的目标 Buffer 里写,在写的时候会维护两个 “指针”.
  1. 一个是表示目标 Buffer 内存段长度的 limit
  2. 一个是表示其当前写入位置的 position
  • 因为一个 Buffer 对应着一个内存块,当将数据序列化并存入内存段时,其空间可能有剩余也可能不够.
  • 因此 RecordSerializer 定义了一个表示序列化结果的 SerializationResult

  • [x] 在序列化数据写入内存段的过程中,存在 3 种可能的结果:

  • PARTIAL_RECORD_MEMORY_SEGMENT_FULL
    • 内存段已满但记录的数据只写入了一部分,没有完全写完,需要申请新的内存段继续写入

**

  • FULL_RECORD_MEMORY_SEGMENT_FULL

    • 内存段写满,记录的数据已全部写入
  • FULL_RECORD

    • 记录的数据全部写入,但内存段并没有满

数据记录反序列化器

数据记录反序列化器 RecordDeserializer , 负责数据的反序列化, SpillingAdaptiveSpanningRecordDeserializer 是唯一的实现

RecordSerializer 类似,考虑到数据的数据大小以及 Buffer 对应的内存段的容量大小

  • 在反序列化时也存在不同的反序列化结果,以枚举 DeserializationResult 表示
  • PARTIAL_RECORD

    • 记录并未完全被读取,但缓存中的数据已被消费完成 (3)
  • INTERMEDIATE_RECORD_FROM_BUFFER

    • 记录的数据已被完全读取,但缓存中的数据并未完全消费 (1)
  • LAST_RECORD_FROM_BUFFER

    • 记录被完全读取,且缓存中的数据也正好被完全消费 (2)

SpillingAdaptiveSpanningRecordDeserializer 适用于数据相对较大且跨多个内存段的数据元素的反序列化,支持将溢出的数据写入临时文件中.序列化与反序列化的过程相反


结果子分区视图

结果子分区视图 ResultSubPartitionView ,其定义了从 ResultSubPartition 中读取数据、释放资源等抽象行为,其中 getNextBuffer 是最重要的方法,用来获取 Buffer

Flink 中设计了两种不同类型的结果子分区,其存储机制不同,对应于结果子分区的不同类型,定义了两个结果子分区视图
image.png

  • BoundedBlockingSubpartitionReader : 用来读取 BoundedBlockingSubpartition 中的数据
  • PipelinedSubpartitionView : 用来读取 PipelinedSubpartition 中的数据

数据输出

数据输出 (Output) 是算子向下游传递的数据抽象,定义了向下游发送 StreamRecordWatermarkLetencyMark 的行为,对于 StreamRecord 多了一个 SideOutPut 的行为定义.
image.png

  • WatermarkGaugeExposingOutput
  • 统计 Watermark 监控指标计算行为,将最后一次发送到下游的 Watermark 作为其指标值.
  • 其实现类负责计算指标值,在Fink WebUI 中,通过可视化 StreamGraph 看到的 Watermark 监控信息即来自于此

  • [x] RecordWriterOutput

  • 包装了 RecordWriter,使用 RecordWriter 把数据交给数据交换层.
  • RecordWriter 主要用来在线程间、网络间实现数据序列化、写入

  • [x] ChainingOutput & CopyingChainingOutput

  • 这两个类是在 OperatorChain 内部的算子之间传递数据用的,并不会有序列化的过程,直接在 Output 中调用下游算子的 processElement() 方法.
  • 在同一个线程内的算子直接传递数据,跟普通的 Java 方法调用一样,这样就直接省略了线程间的数据传送和网络间的数据传送的开销

  • [x] CopyingDirectedOutput & DirectedOutput

  • 包装类,基于一组 OutputSelector 选择发送给下游哪些 Task
  • DirectedOutput 为共享对象模式
  • CopyDirectedOutput 为费共享对象模式

  • [x] BroadcastingOutputCollector & CopyBroadcastingOutputCollector

  • 包装类,内部包含了一组 Output
  • 向所有的下游 Task 广播数据
  • Copying 和 非 Copying 区别在于是否重用对象

  • [x] CountingOutput

  • CountingOutputOutput 实现类的包装类,该类没有任何业务逻辑属性,只是用来标记其他 Output 实现类向下游发送的数据元素个数,并作为监控指标反馈给 Flink 集群

数据传递

数据处理的业务逻辑位于 UDFprocessElement() 方法中,算子调用 UDF 处理数据完毕之后,需要将数据交给下一个算子, Flink 的算子使用 Collector 接口进行数据传递

  • Flink 有 3 种数据传递的方式
  • 本地线程内的数据交换
  • 本地线程之间的数据传递
  • 跨网络的数据交换

本地线程内的数据交换

本地线程内的数据交换是最简单、效率最高的传递形式,其本质是属于同一个 OperatorChain 的算子之间的数据传递
image.png

  • 3 个算子属于同一个 OperatorChain , 在执行的时候,会被调度到同一个 Task .
  • 上游的算子处理数据,然后通过 Collector 接口直接调用下游算子的 processElement() 方法
  • 在同一个线程内执行普通的 Java 方法
  • 没有将数据序列化写入共享内存、下游读取数据再反序列化的过程
  • 线程切换的开销也省掉了.

本地线程之间的数据传递

位于同一个 TaskManager 的不同 Task 的算子之间,不会通过算子间的直接调用方法传输数据,而是通过本地内存进行数据传递.

  1. Source 算子所在线程与下游的 FlatMap 算子所在线程间的通信为例,这两个 Task 线程共享同一个 BufferPool,通过 wait()/notifyAll() 来同步.
  2. BufferNetty 中的 ByteBuf 功能类似,可以看作是一块共享的内存.
  3. InputGate 负责读取 BufferEvent.
  • 本地线程间的数据交换经历 5 个步骤
  1. FlatMap 所在线程首先会从 InputGateLocalInputChannel 中消费数据,如果没有数据则通过 InputGate 中的 InputChannelWithData.wait() 方法阻塞等待数据.

  2. Source 算子持续地从外部数据源 (如 Kafka)写入 ResultSubPartition 中.

  3. ResultSubPartition 将数据刷新写入 LocalBufferPool 中,然后通过 InputChannelWithData.notifyAll() 方法唤醒 FlatMap 线程.

  4. 唤醒 FlatMap 所在的线程 (通过 InputChannelWithData.notifyAll() 方法唤醒) .

  5. FlatMap 线程首先调用 LocalInputChannelLocalBuffer 中读取数据,然后进行数据的反序列化.

FlatMap 将反序列化之后的数据交给算子中的用户代码进行业务处理
image.png


跨网络的数据交换

跨网络数据传递,运行在不同 TaskManager JVM 中的 Task 之间的数据传递,与本地线程间的数据传递类似.
不同点在于,当没有 Buffer 可以消费时,会通过 PartitionRequestClientFlatMap Task 所在的进程发起 RPC 请求,远程的 PartitionRequestServerHandler 接收到请求后,读取 ResultPartition 所管理的 Buffer,并返回给 Client.

  • 跨网络的 FlatMap 算子 和 KeyedAgg/Sink 算子数据交换 如下

image.png

  • 跨网络的数据交换比本地线程间的数据传递要复杂一些
  1. Keyed/Agg 所在线程从 InputGateRemoteInputChannel 中消费数据,如果没有数据则阻塞在 RemoteInputChannel 中的 receviedBuffers 上等待数据.

  2. FlatMap 持续处理数据,并将数据写入 ResultSubPartition 中.

  3. ResultSubPartition 通知 PartitionRequestQueue 有新的数据.

  4. PartitionRequestQueueResultSubPartition 读取数据.

  5. ResultSubPartition 将数据通过 PartitionRequestServerHandler 写入 Netty Channel ,准备写入下游 Netty .

  6. Netty 将数据封装到 Response 消息中,推送到下游,此处需要下游对上游的 request 请求,用来建立数据从上游到下游的通道,此请求是对 ResultSubPartition 的数据请求,创建了 PartitionRequestQueue.

  7. 下游 Netty 收到 Response 消息,进行解码.

  8. CreditBasedPartitionRequestClientHandler 将解码后的数据写入 RemoteInputChannelBuffer 缓冲队列中,然后唤醒 Keyed/Agg 所在线程消费数据.

  9. Keyed/Agg 所在线程 RemoteInputChannel 中读取 Buffer 缓冲队列中的数据,然后进行数据的反序列化,交给算子中的用户代码进行业务处理.


数据传递过程

  • 从总体上来说,数据在 Task 之间传递分为如下几个大的步骤
  1. 数据在本算子处理完后,交给 RecordWriter .每条记录都要选择下游节点,所以要经过 ChannelSelector ,找到对应的结果子分区.

  2. 每个结果子分区都有一个独有的序列化器 (避免多线程竞争),把这条记录序列化为二进制数据.

  3. 数据被写入结果分区下的各个子分区中,此时该数据已经存入 DirectBuffer (MemorySegment).

  4. 单独的线程控制数据的 flush 速度,一旦触发 flush ,则通过 Nettynio 通道向对端写入.

  5. 对端的 Netty Client 接收到数据,解码出来,把数据复制到 Buffer 中,然后通知 InputChanel.

  6. 有可用的数据时,下游算子从阻塞醒来,从 InputChannel 取出 Buffer ,再反序列化成数据记录,交给算子执行用户代码 (UDF).


数据读取

ResultSubPartitionView 接收到数据可以通知之后,有两类对象会接收到该通知
image.png

  • LocalInputChannel 其实是本地 JVM 线程之间的数据传递
  • CreditBasedSequenceNumberingViewReader 用来对本机跨 JVM 或者跨网络的数据传递

无论是本地线程间数据交换还是跨网络的数据交换,对于数据消费端而言,其数据消费的线程会一直阻塞在 InputGate 上,等待可用的数据,并将可用的数据转换成 StreamRecord 交给算子进行处理.

其基本过程为 :
StreamTask.processInput()
StreamOneInputStreamOperator.processInput()
StreamTaskNetworkInput.emitNext()
SpillingAdaptiveSpanningRecordDeserializer

NetworkInput 中读取数据反序列化为 StreamRecord , 然后交给 DataOutput 向下游发送

  • [x] StreamTaskNetworkInput

    1. @Override
    2. public InputStatus emitNext(DataOutput<T> output) throws Exception {
    3. while (true) {
    4. // get the stream element from the deserializer
    5. if (currentRecordDeserializer != null) {
    6. DeserializationResult result;
    7. try {
    8. result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
    9. } catch (IOException e) {
    10. throw new IOException(
    11. String.format("Can't get next record for channel %s", lastChannel), e);
    12. }
    13. if (result.isBufferConsumed()) {
    14. currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
    15. currentRecordDeserializer = null;
    16. }
    17. if (result.isFullRecord()) {
    18. processElement(deserializationDelegate.getInstance(), output);
    19. return InputStatus.MORE_AVAILABLE;
    20. }
    21. }
    22. // 从 InputGate 中获取 下一个 Buffer
    23. Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
    24. // 如果是 Buffer, 则设置 RecordDeserializer 处理下一条记录,否则释放 RecordDeserializer
    25. if (bufferOrEvent.isPresent()) {
    26. // return to the mailbox after receiving a checkpoint barrier to avoid processing of
    27. // data after the barrier before checkpoint is performed for unaligned checkpoint
    28. // mode
    29. if (bufferOrEvent.get().isBuffer()) {
    30. processBuffer(bufferOrEvent.get());
    31. } else {
    32. processEvent(bufferOrEvent.get());
    33. return InputStatus.MORE_AVAILABLE;
    34. }
    35. } else {
    36. if (checkpointedInputGate.isFinished()) {
    37. checkState(
    38. checkpointedInputGate.getAvailableFuture().isDone(),
    39. "Finished BarrierHandler should be available");
    40. return InputStatus.END_OF_INPUT;
    41. }
    42. return InputStatus.NOTHING_AVAILABLE;
    43. }
    44. }
    45. }

    数据反序列化时,使用 DataInputView 从内存中读取二进制数据,根据数据的类型进行不同的反序列化.
    对于数据记录、则使用其类型对应的序列化器,对于其他类型的数据元素 如 Watermark、LatencyMarker``、StreamStatus 等,直接读取其二进制数据转换为数值类型.

  • [x] StreamElementSerializer

      @Override
      public StreamElement deserialize(DataInputView source) throws IOException {
          int tag = source.readByte();
          if (tag == TAG_REC_WITH_TIMESTAMP) {
              long timestamp = source.readLong();
              return new StreamRecord<T>(typeSerializer.deserialize(source), timestamp);
          } else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
              return new StreamRecord<T>(typeSerializer.deserialize(source));
          } else if (tag == TAG_WATERMARK) {
              return new Watermark(source.readLong());
          } else if (tag == TAG_STREAM_STATUS) {
              return new StreamStatus(source.readInt());
          } else if (tag == TAG_LATENCY_MARKER) {
              return new LatencyMarker(
                      source.readLong(),
                      new OperatorID(source.readLong(), source.readLong()),
                      source.readInt());
          } else {
              throw new IOException("Corrupt stream, found tag: " + tag);
          }
      }
    

数据写出

Task 调用算子执行 UDF 之后,需要将数据交给下游进行处理.
RecordWriter 类负责将 StreamRecord 进行序列化,调用 SpaningRecordSerializer ,再调用 BufferBuilder 写入 MemorySegment 中 (每个 Task 都有自己的 LocalBufferPool,LocalBufferPool 中包含了多个 MemorySegment )

  • [x] RecordWriter ```java private void emit(T record, int targetChannel) throws IOException, InterruptedException {

      // 把 StreamRecord 序列化为 二进制数组
      serializer.serializeRecord(record);
    
      // 将数据写入 ResultPartition 的 MemorySegment 中 ???
      if (copyFromSerializerToTargetChannel(targetChannel)) {
          serializer.prune();
      }
    

    }


在数据序列化之后,通过 `Channel` 编号 选择结果子分区,将数据复制到结果子分区中,在写入过程中,序列化器将数据复制到 `BufferBuilder` 中

- 如果数据太大,则需要写入多个 `Buffer` ,即跨内存段写入数据

- [x] `RecordWriter`
```java
    private boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException {
        // We should reset the initial position of the intermediate serialization buffer before
        // copying, so the serialization results can be copied to multiple target buffers.
        serializer.reset();

        boolean pruneTriggered = false;
        // 获取对应 targetChannel 的 BufferBuilder 
        BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
        // 将序列化器中 StreamRecord 的二进制复制到 Buffer中
        SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
        while (result.isFullBuffer()) {
            numBytesOut.inc(bufferBuilder.finish());
            numBuffersOut.inc();

            // If this was a full record, we are done. Not breaking out of the loop at this point
            // will lead to another buffer request before breaking out (that would not be a
            // problem per se, but it can lead to stalls in the pipeline).
            // 数据完全写入到 Buffer 中,写入完毕
            if (result.isFullRecord()) {
                pruneTriggered = true;
                bufferBuilders[targetChannel] = Optional.empty();
                break;
            }

            // 数据没有完全写入,buffer已满,则申请新的 BufferBuilder 继续写入
            bufferBuilder = requestNewBufferBuilder(targetChannel);
            result = serializer.copyToBufferBuilder(bufferBuilder);
        }
        checkState(!serializer.hasSerializedData(), "All data should be written at once");

        // 数据写入之后,通过 flushAlways 控制是立即发送还是延迟发送
        if (flushAlways) {
            targetPartition.flush(targetChannel);
        }
        return pruneTriggered;
    }

数据清理

image.png
RecordWriterStreamRecord 序列化完成之后,会根据 flushAlways 参数决定是否立即将数据进行推送,相当于每条记录发送一次,这样做延迟最低,但是吞吐量会下降,Flink 默认的做法是单独启动一个线程,每隔一个固定时间刷新 (flush) 一次所有的 Channel ,本质上是一种批量处理 .

当所有数据都写入完成之后需要调用 Flush 方法将可能残留在序列化器 Buffer 中的数据都强制输出.
Flush 方法会遍历每个 ResultSubPartition 然后依次取出该 ResultSubPartition 对应的序列化器,如果其中还有残留的数据,则将数据全部输出. (这也是每个 ResultSubPartition 都对应一个序列化器的原因)

数据写入之后,无论是即时 Flush 还是定时 Flush , 根据结果子分区的类型的不同,行为都会有所不同

  • PipelinedSubpartition

如果是立即刷新,则相当于一条记录向下游推送一次,延迟最低,但是吞吐量会下降,Flink 默认的做法是单独启动一个线程,默认 100ms 刷新一次,本质上是一种 mini-batch ,这种 mini-batch 只是为了增大吞吐量

ResultPartition 遍历自身的 PipeliedSubPartition,逐一进行 Flush , Flush 之后 通知 ResultSubPartitionView 有可用数据,可以进行数据的读取了.

    private void notifyDataAvailable() {
        if (readView != null) {
            readView.notifyDataAvailable();
        }
    }
  • BoundedBlockingSubpartition

如果是立即刷新,则相当于 1 条记录向文件中写入1次,否则认为是延迟刷新,每隔一定时间周期将该事件周期内的数据进行批量处理,默认是 100ms 刷新一次, 数据刷新的时候根据 ResultPartitionResultSubPartition 的类型的不同有不同的刷新行为.

ResultPartition 遍历自身的 BoundedBlockingSubPartition , 逐一进行 Flush , 写入之后回收 Buffer .
该类型的 SubPartition 并不会触发数据可用通知

    private void writeAndCloseBufferConsumer(BufferConsumer bufferConsumer) throws IOException {
        try {
            final Buffer buffer = bufferConsumer.build();
            try {
                if (parent.canBeCompressed(buffer)) {
                    final Buffer compressedBuffer =
                            parent.bufferCompressor.compressToIntermediateBuffer(buffer);
                    data.writeBuffer(compressedBuffer);
                    if (compressedBuffer != buffer) {
                        compressedBuffer.recycleBuffer();
                    }
                } else {
                    data.writeBuffer(buffer);
                }

                numBuffersAndEventsWritten++;
                if (buffer.isBuffer()) {
                    numDataBuffersWritten++;
                }
            } finally {
                buffer.recycleBuffer();
            }
        } finally {
            bufferConsumer.close();
        }
    }

网络通信

一般的流计算系统德消息传递,都是基于推送机制的

  • 数据从上游处理节点推送给下游处理节点

在这个过程中如果下游的处理能力无法应对上游的数据发送速度,那么就会导致数据在下游处理节点累积,一旦超过了处理限度,就可能会发生数据丢失、进程错误、内存空间不足、CPU 使用率过高等 各种难以预测的情况,最终导致资源耗尽甚至系统崩溃

  • 为了保持整个流计算系统的稳定性,需要对上游节点发送数据的速度进行流量控制,这种控制机制 叫做反压 (BackPressurs)

很多种情况都会导致反压

  • JVM 垃圾回收的停顿导致数据的积压
  • 秒杀等高并发情况导致的瞬间流量爆发
  • Task 中业务逻辑复杂导致数据记录处理比较慢

网络连接

Flink 的 Task 在物理层上时基于 TCP 连接,且 TaskManager 上所有 Task 共享一个 TCP 连接

  • 所以 Flink 1.5 版本之前的流控机制是基于连接的
  • 跨网络的数据交换 [上]
  • 经过 ResultPartitionNettyBufferTCPInputGate

无流控

Task 的发送缓冲池耗尽时,也就是结果子分区的 Buffer 队列中或者更底层的基于 Netty 的网络栈的网络缓冲区满时,生产者 Task 就被阻塞,无法继续处理数据,开始产生背压.
Task 的接收缓冲区耗尽时,也是类似的效果,较低层网络栈中传入的 Netty Buffer 需要通过网络缓冲区提供给 Flink.
如果相应 Task 的缓冲池中没有可用的 Buffer,Flink 停止从该 InputChannel 读取,直到在缓冲区中可用 Buffer 时才会开始读取数据并交给算子进行处理.
这将对使用该 TCP 连接的所有 Task 造成背压,影响范围是整个 TaskManager 上运行的 Task.
image.png


基于信用的流控

Flink 原有流控机智的基础上,拓展出了新的流控机制 — 基于信用的流量控制,用来解决同一个 TaskManagerTask 之间相互影响的问题.
基于信用的流控机制可以确保下游总是有足够的内存接受上游的数据,不会出现下游无法接收上游数据的情况,

该流控机制作用于 Flink 的数据传输层,在结果子分区 (ResultSubPartition) 和输入通道 (InputChannel) 引入了信用机制,每个远端的 InputChannel 现在都有自己的一组 独占缓冲区,不再使用共享的本地缓冲池.
LocalBufferPool 中的 Buffer 称为 浮动缓存 (Float Buffer),因为 LocalBufferPool 的大小是浮动的,并可以用于所有 InputChannel.

  1. 下游接收端将可用的 Buffer 数量作为信用值 (1 Buffer = 1 信用) 通知给上游
  2. 每个结果子分区 (ResultSubPartition) 将跟踪其对应的 InputChannel 的信用值
  3. 如果信用可用,则缓存仅转发到较低层的网络栈,每发送一个 Buffer 都会对 InputChannel 的信用值 减 1
  4. 在发送 Buffer 的同时,还会发送前结果子分区 (ResultSubPartition) 队列中的积压数据量 (Backlog Size)
  5. 下游的接收端会根据积压数据量从浮动缓冲区申请适当数量的 Buffer , 以便更快的处理上游积压等待发送的数据
  6. 下游接收端首先会尝试获取与 Backlog 大小一样多的 Buffer,但浮动缓冲区的可用 Buffer 数量可能不够,只能尝试一部分甚至获取不到 Buffer
    • 保证每次上游发送的数据都是下游 InputChannelBuffer 可以承受的数据量
  7. 下游接收端会充分利用获取到的 Buffer ,并且会持续等待新的可用 Buffer

基于流控的流量控制使用配置参数 buffer-per-channel 参数来设置独占的缓冲池大小,使用配置参数 floating-buffers-per-gate 设置 InputGate 输入网关的缓冲池大小,输入网关的缓冲池由属于该网关的 InputChannel 共享,其缓冲区上限与基于连接的流控机制相同.

一般情况下,使用这两个参数的默认值,理论上可以达到与不采用流控机制的吞吐量一样高.

流量控制的最大吞吐量至少于没有流量控制时一样高,前提是网络的延迟比较低.

相比没有流量控制的接收器的背压机制,信用机制提供了更直接的控制逻辑:

  • 如果接收缓存不足,其可用信用值会降到 0 ,发送方会停止发送.
  • 这样只在这个 InputChannel 上存在背压,而不会影响其他 Task, 从而避免一个Task 的处理能力不足导致其所在的 TaskManager 上所有的 Task 都无法接收数据的问题.