什么是反压问题?

  1. 反压通常产生于这样的场景:短时负载高峰导致系统接收数据的速率远高于它处理数据的速率。许多日常问题都会导致反压,例如,垃圾回收停顿可能会导致流入的数据快速堆积,或者遇到大促或秒杀活动导致流量陡增。反压如果不能得到正确的处理,可能会导致资源耗尽甚至系统崩溃

Flink中的反压

  1. Flink在运行时主要由operatorsstreams构成,每个operator会消费中间态的流并在流上转换,形成新的流。
  2. Flink的实现反压的机制类似于Java中的BlockingQueue队列,通过LocalBufferPool来实现。

网络传输中的内存管理

  1. 网络上传输的数据会首先写到该Task(任务链)的**InputGate**(IG)中有Task处理后再写到**ResultPartition**中(RS),输入和输出的数据存在**LocalBufferPool**中,该Buffer是**MemorySegment**的包装类

0101反压补充 - 图1

初始化步骤

初始化NetworkEnvironment

  1. **TaskManager**启动时会先初始化**NetworkEnvironment**对象,TM中所有与网络相关的类由该类来管理,其中就包括**NetworkBufferPool**。
  2. 会在NetworkBufferPool中生成一定数量的内存块**MemorySegment**默认是2048 * 32 Kb,内存块的总数量代表了网络传输所有可以用的内存,NetworkEnvironment NetworkBufferPool Task之间共享,每个TM只会实例化一个。

Task申请网络缓冲资源

  1. Task 线程启动时,会向 NetworkEnvironment 注册,NetworkEnvironment 会为 Task InputGateIG)和 ResultPartitionRP 分别创建一个 **LocalBufferPool**(缓冲池)并设置可申请的 MemorySegment(内存块)数量。IG 对应的缓冲池初始的内存块数量与 IG **InputChannel** 数量一致,RP 对应的缓冲池初始的内存块数量与 RP 中的 **ResultSubpartition** 数量一致。

InputGate为一个Task中的,InputChannel为一个SubTask中的;RP与Rsp的意义同理

  1. 不过,每当创建或销毁缓冲池时,NetworkBufferPool 会计算剩余空闲的内存块数量,并平均分配给已创建的缓冲池。
  2. **注意,这个过程只是指定了缓冲池所能使用的内存块数量,并没有真正分配内存块,只有当需要时才分配。**
  3. 为什么要动态地为缓冲池扩容呢?因为内存越多,意味着系统可以更轻松地应对瞬时压力(如GC),不会频繁地进入反压状态,所以我们要利用起那部分闲置的内存块。

Task线程接收到数据时

  1. 此时Netty端接收到数据会向本地池申请,如果本地池达到上限则会向NetworkBufferPool申请,如果均载满,则会停止读取

Buffer回收

  1. 当一个内存块被消费完之后,即输入端的字节被序列化成对象,输出端的字节写入到了netty channel中,则会调用buffer.recycle()方法还给LocalBufferPool
  2. 如果当前申请的数量超过了池子的容量,则会还给NetworkBufferPool,如果没有则会继续留在池子中,减少反复开销。

反压步骤

  1. 记录“A”进入了 Flink 并且被 Task 1 处理。(这里省略了 Netty 接收、反序列化等过程)
  2. 记录被序列化到 buffer 中。
  3. 该 buffer 被发送到 Task 2,然后 Task 2 从这个 buffer 中读出记录。

0101反压补充 - 图2

总结

  1. Task 1 在输出端有一个相关联的 LocalBufferPool(称缓冲池1),Task 2 在输入端也有一个相关联的 LocalBufferPool(称缓冲池2)。如果缓冲池1中有空闲可用的 buffer 来序列化记录 A”,我们就序列化并发送该 buffer

同节点反压和跨节点反压

Netty水位机制:跨节点反压

NettyServer初始化

  1. // 默认高水位值为2个buffer大小, 当接收端消费速度跟不上,发送端会立即感知到
  2. //低水位,一个buffer大小+1
  3. bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, config.getMemorySegmentSize() + 1);
  4. //高水位,2个buffer大小
  5. bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 2 * config.getMemorySegmentSize());

发送代码

当输出缓冲中的字节数超过了高水位值, 则 Channel.isWritable() 会返回false。当输出缓存中的字节数又掉到了低水位值以下, 则 Channel.isWritable() 会重新返回true。Flink 中发送数据的核心代码在 PartitionRequestQueue 中,该类是 server channel pipeline 的最后一层。发送数据关键代码如下所示

  1. private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
  2. if (fatalError) {
  3. return;
  4. }
  5. Buffer buffer = null;
  6. try {
  7. // channel.isWritable() 配合 WRITE_BUFFER_LOW_WATER_MARK
  8. // 和 WRITE_BUFFER_HIGH_WATER_MARK 实现发送端的流量控制
  9. if (channel.isWritable()) {
  10. // 注意: 一个while循环也就最多只发送一个BufferResponse, 连续发送BufferResponse是通过writeListener回调实现的
  11. while (true) {
  12. if (currentPartitionQueue == null && (currentPartitionQueue = queue.poll()) == null) {
  13. return;
  14. }
  15. buffer = currentPartitionQueue.getNextBuffer();
  16. if (buffer == null) {
  17. // 跳过这部分代码
  18. ...
  19. }
  20. else {
  21. // 构造一个response返回给客户端
  22. BufferResponse resp = new BufferResponse(buffer, currentPartitionQueue.getSequenceNumber(), currentPartitionQueue.getReceiverId());
  23. if (!buffer.isBuffer() &&
  24. EventSerializer.fromBuffer(buffer, getClass().getClassLoader()).getClass() == EndOfPartitionEvent.class) {
  25. // 跳过这部分代码。batch 模式中 subpartition 的数据准备就绪,通知下游消费者。
  26. ...
  27. }
  28. // 将该response发到netty channel, 当写成功后,
  29. // 通过注册的writeListener又会回调进来, 从而不断地消费 queue 中的请求
  30. channel.writeAndFlush(resp).addListener(writeListener);
  31. return;
  32. }
  33. }
  34. }
  35. }
  36. catch (Throwable t) {
  37. if (buffer != null) {
  38. buffer.recycle();
  39. }
  40. throw new IOException(t.getMessage(), t);
  41. }
  42. }
  43. // 当水位值降下来后(channel 再次可写),会重新触发发送函数
  44. @Override
  45. public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
  46. writeAndFlushNextMessageIfPossible(ctx.channel());
  47. }