概述

Netty 的 Unsafe 并不是指 JDK Unsafe 类,它是 Netty 对 JDK 底层的 Socket API 的抽象,用于底层操作。

层次结构

Unsafe层次结构图(全).png
Unsafe 层次结构很好理解,Netty 主要提供三种不同的底层 I/O 实现方式,分别是 NIO、KQueue、Epoll。它们都对应不同的 Unsafe 实现类。而我们主要关注的还是 NIO,所以下图是与 NIO 相关的 Unsafe 层次结构。
NIOUnsafe层次结构.png

  • Unsafe 接口定义和 Socket 底层相关的 API。比如注册、绑定、连接、写入、刷新等。
  • NioUnsafe 是特殊的 Unsafe 的子类,提供访问底层 SelectableChannel 的 API,还提供重要的 read() 操作。
  • AbstractUnsafe 是 Unsafe 接口的实现骨架。所有的子类实现都必须继承 AbstractNioUnsafe。它基本实现了 Unsafe 接口,但也抽象出以 do 开头的抽象方法待特定类型的子类实现。内部有一个非常重要的被 volatile 修饰的变量 outboundBuffer,它是 Netty 向底层写入的数据缓冲池。我们重点关注它的 write() 方法。
  • AbstractUnsafe 主要实现了 Unsafe#connect()Unsafe#finishConnect() 接口以及 AbstractUnsafe 部分抽象方法。它也提供两个抽象类待子类实现,分别是 doConnect 和 doFinishConnect。
  • NioByteBufUnsafe 和 NioMessageUnsafe 最核心的区别在 read() 方法上,总结如下:
    • NioMessageUnsafe 是从 NioServerSocketChannel 类型通道中读取数据。而该通道只关注 OP_ACCEPT 事件,所以获取的是通过 ServerSocketChannel#accept() 返回的 SocketChannel 对象,并且会使用 NioSocketChannel 包装并写入到 List
    • NioByteUnsafe 则是从 SocketChannel 对象中读取数据并写入 ByteBuf 对象中。

      Unsafe

      Unsafe 接口详情如下:
    • 方法名称 描述
      recvBufAllocHandle 在接收数据时预分配 ByteBuf 对象
      localAddress 绑定到本地的 SocketAddress
      remoteAddress 对端的 SocketAddress,如果尚未绑定成功,则返回 null
      register 向 EventLoop 对象中注册 Channel,一旦完成便会通知 ChannelPromise(异步操作)
      bind 将通道绑定到本地端口,一旦完成便会通知 ChannelPromise(异步操作)
      connect 连接到对端,一旦完成便会通知 ChannelPromise(异步操作)
      disconnect 断开 Channel 连接,一旦完成便会通知 ChannelPromise(异步操作)
      close 关闭 Channel 连接,一旦完成便会通知 ChannelPromise(异步操作)
      closeForcibly 立即关闭 Channel,而不会触发任何事件。可能在尝试注册失败时使用
      deregister 从 EventLoop 中注销 Channel,一旦完成便会通知 ChannelPromise(异步操作)
      beginRead 安排读操作(内部调用 AbstraceChannel.doBeginRead 方法)。此操作会填充 ChannelPipeline 中第一个 ChannelInboundHandler 的入站缓冲区。如果已经有一个等待的读操作,此方法不会做任何事情
      write 安排写操作,一旦完成便会通知 ChannelPromise(异步操作)
      flush 向底层 Socket 写入所有通过 write() 方法添加的消息
      voidPromise 返回一个特殊的 ChannelPromise,它可以重复使用并传递给 Channel.Unsafe 中的操作
      它永远不会接收通知。仅仅直到一个占位符的作用
      outboundBuffer 返回输出缓冲区对象。该对象存储当前 Channel 所有 write 请求的消息

      Unsafe 接口类将一个通道的步骤抽象了出来,比如注册、绑定、连接、断开连接、通道关闭、开始读、写、刷新等操作。它们是底层通道连接必不可少的环节。同时,我们也注意到,和通道操作相关的多数方法都是异步方法,提供 ChannelPromise 通知回调,这也是 Netty 异步编程实现基础之一。

      Unsafe 定义了一个有意义的接口:beginRead()。那它对 NIO 意味着什么呢? 在 AbstractNioChannel#doBeginRead() 中我们可以看到其实就是把 OP_READ 事件添加到 SelectionKey 里面。Unsafe 接口并没有定义 read() API,而在 NioUnsafe 接口中我们就能看到 read() API 了。估计这个 API 对于其他类型的 I/O 可能是无共性。所以才会有 NioUnsafe 单独的接口定义和 NIO 相关的部分 API。

      AbstractUnsafe

      方法名称 描述
      recvBufAllocHandle 实现(会被子类重写)
      localAddress 实现
      remoteAddress 实现
      register 实现
      bind 实现(内部调用 AbstraceChannel#doBind 方法)
      connect 未实现
      disconnect 实现
      close 实现
      closeForcibly 实现
      deregister 实现
      beginRead 实现(内部调用 AbstraceChannel#doBeginRead 方法),此方法会被子类重写
      write 实现(会被子类重写)
      flush 实现(会被子类重写)
      voidPromise 实现
      outboundBuffer 实现

      AbstractUnsafe 是接口 Unsafe 的基础实现骨架,所有的 Unsafe 实现类都需要继承此抽象类。此外,我们也可以从描述中看到:除了 connect 接口没有被实现外,其余所有的接口都有默认实现。

      NioUnafe

      方法名称 描述
      ch 返回底层的 SelectableChannel 对象
      finishConnect 调用 SocketChannel.finishConnect() 方法判断 Channel 是否连接成功,否则会抛出异常
      read 从底层的 SelectableChannel 对象中读取数据
      forceFlush 强制刷新,直接调用 flush0() 方法

      NioUnsafe 是作为对 Java NIO 的补充接口类,它定义了四个重要的接口,其中最重要的就是 read 了。该接口的实现类会从 SelectableChannel 通道中读取数据。此外,我们还可以通过 ch 方法获取 selectableChanel 对象,这样,我们就可以调用底层相关方法进行数据读取和写入操作了。

      AbstractNioUnsafe

      方法名称 描述
      ch 实现
      connect 实现(内部调用AbstractNioChannel#doConnect)
      finishConnect 实现
      flush0 重写抽象类 AbstractUnsafe
      forceFlush 实现

      绝大部分 Unsafe 定义的方法都由抽象类 AbstraceUnsafe 所实现(除了 connect 接口),而对于 NIO 单独增加了 NioUnsafe 接口类,AbstractNioUnsafe 则是对此接口的实现。在这里,也对 Unsafe 定义的 connect 接口做了实现。

      NioByteUnsafe

      方法名称 描述
      read 实现

      NioByteBufUnsafe 是抽象类 AbstractNioUnsafe 的实现类之一。它只实现了 read 接口。

      NioMessageUnsafe

      方法名称 描述
      read 实现

      NioMessageBufUnsafe 是抽象类 AbstractNioUnsafe 的实现类之一。它和 NioByteBufUnsafe 一样也只实现了 read 接口,所以,他们的 read 操作是有区别的,可以这么理解:

      1. NioMessageUnsafe 用于 NioServerSocketChannel。内部它将调用 java.nio.channels.ServerSocketChannel#accept 方法获取 SocketChannel 对象。这也属于一次读操作。
      2. NioByteUnsafe 用于 NioSocketChannel。内部调用 java.nio.channels.ReadableByteChannel#read 方法从 Channel 中读取数据。
      3. 两者的区别是 NioMessageUnsafe 是获取 SocketChannel 对象,而 NioByteUnsafe 则是从 SocketChannel 对象中读取二进制数据。

        NioSocketChannelUnsafe

        | 方法名称 | 描述 | | :—-: | :—-: | | prepareToClose | 重写抽象类 AbstractUnsafe |

      该类是 NioByteUnsafe 的子类,仅重写 prepareToClose 方法,具体就不展开描述了。

      以上重点讲解了 NIO 侧的 Unsafe 结构层次,它也是我们最常使用的底层实现。理清 API 的定义以及实现类,有助于我们更对 Unsafe 的层次有更深入的了解。这样,当我们看源码的时候就不会云里雾里,找不着北了。
      接下来,我们重点讲解 NIO 的 write 和 flush 方法。但是讲解之前,我们先了解 ChannelOutboundBuffer。

      源码解析

      ChannelOutboundBuffer

      ChannelOutboundBuffer 是 Netty 定义的输出缓冲区,当我们程序调用 write(Object msg) 方法时,消息 msg 就会被存放到 ChannelOutboundBuffer 对象里,再次调用 flush() 方法后 Netty 才会把 ChannelOutboundBuffer 暂存的消息写入底层的 Socket 对象内部。这就是 ChannelOutboundBuffer 的作用,但是远远没有这么简单,比如对于非阻塞的 Channel,调用底层的 SocketChannel#write() 方法时可能写入的数据字节数为 0,这里 ChannelOutboundBuffer 需要做一些操作避免数据丢失。再者,ChannelOutboundBuffer 会根据水位线判断是否继续执行写入操作,这就是 Netty 帮助我们做了限流处理。

      内部变量

      我们先对 ChanneOutboundBuff 内部变量有一个简单的认识:
      ChannelOutboundBuffer内部变量.png

      • 保存 Channel 引用,可以调用相关 API 进行数据读取和写入操作(与 Socket 交互)。
      • 三个非常重要的指针变量,用来控制消息的添加和写入。
        • flushedEntry:链表中第一个已刷新节点。表示从当前节点向底层 Socket 写入消息的起始位置。
        • unflushedEntry:链表中第一个未刷新节点。表示还未进行刷新操作的消息起始位置。
        • tailEntry:链表中最后一个未刷新节点。
        • 相关写入操作如下图所示:

      写入刷新操作时指针变量情况.png

      • unwritable:1 表示不可写入。Netty 用做流量管控。当缓冲区内的字节总数超过最高水位线时,unwritable=1,此时不允许向 ChannelOutbound 写入数据。当缓冲区内的字节总数低于最低水位线时, unwritable=0,此时可以向 ChannelOutbound 写入数据。这样,就能避免输出缓冲区无限膨胀,导致系统崩溃。
      • totalPendingSize:现输出缓冲区排队的字节总数。通过此变量和水位线比较,控制 ChannelOutboundBuffer 是否继续接收新的消息。

      相关源码如下:

      1. // io.netty.channel.ChannelOutboundBuffer
      2. /**
      3. * (仅适用于传输层实现)一个内部数据结构,通常被 {@link AbstractChannel} 用来存储它的待处理的出站写请求。
      4. * 所有的方法都必须由 I/O 线程的传输实现调用:
      5. * {@link #size()} and {@link #isEmpty()}
      6. * {@link #isWritable()}
      7. * {@link #getUserDefinedWritability(int)} and {@link #setUserDefinedWritability(int, boolean)}
      8. */
      9. public final class ChannelOutboundBuffer {
      10. // Assuming a 64-bit JVM:
      11. // - 16 bytes object header
      12. // - 6 reference fields
      13. // - 2 long fields
      14. // - 2 int fields
      15. // - 1 boolean field
      16. // - padding
      17. static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
      18. SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);
      19. private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);
      20. // 本地线程缓存,可以从中获取一个长度为 1024 的 ByteBuffer 数组,二进制数据(消息)由 ByteBuffer 具象化,
      21. // ByteBuffer再由 ByteBuf 包装,而当添加到 ChannelOutboundBuffer 对象中时,它会被 Entry 包装。
      22. // 当我们需要向Socket写入数据时,首先需要从Entry单向链表中提取ByteBuffer,然后添加到 NIO_BUFFERS,最后调用
      23. // java.nio.channels.SocketChannel.write(java.nio.ByteBuffer[], int, int) 或
      24. // java.nio.channels.SocketChannel.write(java.nio.ByteBuffer) 向Socket写入数据
      25. private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
      26. @Override
      27. protected ByteBuffer[] initialValue() throws Exception {
      28. return new ByteBuffer[1024];
      29. }
      30. };
      31. // 绑定的通道
      32. private final Channel channel;
      33. // Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
      34. // 链表中第一个「已刷新」的节点
      35. private Entry flushedEntry;
      36. // 链表中第一个「未刷新」的节点
      37. private Entry unflushedEntry;
      38. // 链表最后一个「未刷新」节点,新增的节点将会添加在它的后面
      39. private Entry tailEntry;
      40. // 已刷新(flushed)但未写入的节点数量
      41. private int flushed;
      42. // 现队列中排队刷新的Buffer数量
      43. private int nioBufferCount;
      44. // 现队列中排队刷新的字节总数
      45. private long nioBufferSize;
      46. // 是否失败
      47. private boolean inFail;
      48. @SuppressWarnings("UnusedDeclaration")
      49. private volatile long totalPendingSize;
      50. /**
      51. * 0:可写
      52. * 1: 不可写
      53. */
      54. @SuppressWarnings("UnusedDeclaration")
      55. private volatile int unwritable;
      56. // 异步任务
      57. private volatile Runnable fireChannelWritabilityChangedTask;
      58. // ...
      59. }

      在这里我们还看到了下 Entry 对象,下面是对它的简单理解。

      Entry

      Entry 是 ChannelOutboundBuffer 内部类,主要用来保存消息对象(一般是 ByteBuf 对象)和缓存 ByteBuffer 对象(这里向底层 Socket 写入的数据载体)。重要的变量如下图所示:
      Entry 结构示意图.png
      从上图看,Entry 类还是保存蛮多信息的,其中:

      • next 让 Entry 构成单向链表结构。消息有序。
      • 为了避免 GC,Entry 使用 ObjectPool 对象池技术。缓存 ByteBuffer 对象也是为因为频繁创建 ByteBuffer 对象是比较消耗性能,所以 Entry 就分别使用 ByteBuffer 和 ByteBuffer[] 分别对应单个和多个 ByteBuffer 情况。
      • total 表示可读字节数。如果 msg 为 ByteBuf 类型,它的值是和 size 相等,数值都为 ByteBuf.readableBytes()
      • pendingSize = size + 98。因为 Entry 对象本身的大小不可忽略,尤其是发送大量的小的或空的 ByteBuf 对象时影响尤其明显。

      更多详细说明,请看:

      // io.netty.channel.ChannelOutboundBuffer.Entry
      static final class Entry {
          private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() {
              @Override
              public Entry newObject(Handle<Entry> handle) {
                  return new Entry(handle);
              }
          });
      
          // 回收处理器
          private final Handle<Entry> handle;
          // 后向节点指针,构成单向链表
          Entry next;
          // 原始消息对象
          Object msg;
      
          // 缓存「ByteBuff」对象(主要用于CompositeByteBuf)
          ByteBuffer[] bufs;
          // 对应非组合的「ByteBuf」对象
          // 关于缓存讨论详见 https://github.com/netty/netty/issues/2761
          ByteBuffer buf;
      
          // ByteBuffer数量,默认值为:-1 表示未计数
          int count = -1;
      
          // 通知回调
          ChannelPromise promise;
      
          // 处理进度
          long progress;
          // 可读字节数。对CompositeByteBuf而言,它表示内部所有的ByteBuffer的可读字节数总和
          long total;
      
          // 消息可读字节数+本身对象空间大小(默认值:96)
          // 因为对于空的Entry对象本身也需要消耗内存,一旦量增多以后,这部分的空间开销不可以忽略不计
          int pendingSize;
          // 是否已经被取消,回收时用于判断
          boolean cancelled;
      
          // ...
      }
      

      Entry 内部有个核心方法,即 cancel,它会完成重要的资源清理工作:

      1. 释放 ByteBuf 资源。
      2. 清空缓存的 ByteBuffer。

        // io.netty.channel.ChannelOutboundBuffer.Entry#cancel
        int cancel() {
         if (!cancelled) {
             // 取消操作
             cancelled = true;
             int pSize = pendingSize;
        
             // 释放资源
             ReferenceCountUtil.safeRelease(msg);
             msg = Unpooled.EMPTY_BUFFER;
        
             pendingSize = 0;
             total = 0;
             progress = 0;
             bufs = null;
             buf = null;
             return pSize;
         }
         return 0;
        }
        

        对 Entry 做一个总结:

      • Entry 是 ChannelOutboundBuffer 用来存储消息(多数情况下是 ByteBuf 类型实例)和缓存 ByteBuffer(底层消息数据,能直接和 JDK SocketChannel 打交道)对象(优化一)。
      • 它们之间通过 next 引用构造单向链表。
      • 对 Entry 本身 Netty 使用对象池进行管理,避免频繁 GC(优化二)。
      • Entry#cancel() 方法完成重要的资源释放工作,避免资源泄漏。

        添加消息

        前面我们说过,ChannelOutbound 是输出缓冲区,当我们调用 write(Object msg) 方法时,实际消息会被写入到这个对象中。而里面核心方法是 addMessage(Object msg, int size, ChannelPromise)。对此方法有如下总结:
      1. 将原始消息对象使用内部类 Entry 进行包装。
      2. 更新相应指针变量。
      3. 将本次新增的字节总数和 totalPendingSize 相加并保存。
      4. 根据 totalPendingSize 判断是否需要更新 unwritable 变量。
        1. 如果超过最高水位线,则将 unwritable=1。

      相关源码如下:

      // io.netty.channel.ChannelOutboundBuffer#addMessage
      /**
       * ① 将消息对象「msg」使用「Entry」对象封装
       * ② 添加到内部的单向链表中
       * ③ 增加 {@link #totalPendingSize} 大小,并判断是否超过高水位线(默认值:65536)
       *    如果超出,则将当前「ChannelOutboundBuffer#unwritable」置为 1,意味着不可写。
       *    同时触发「channelWritability」方法回调。
       */
      public void addMessage(Object msg, int size, ChannelPromise promise) {
          // #1 将消息用「Entry」对象保存
          Entry entry = Entry.newInstance(msg, size, total(msg), promise);
      
          // #2 更新「tailEntry」和「flushedEntry」节点
          if (tailEntry == null) {
              flushedEntry = null;
          } else {
              Entry tail = tailEntry;
              tail.next = entry;
          }
      
          // #3 「tailEntry」指向当前「Entry」节点
          tailEntry = entry;
          if (unflushedEntry == null) {
              unflushedEntry = entry;
          }
      
          // 将消息添加到未刷新的数组后,递增待处理字节。
          // See https://github.com/netty/netty/issues/1619
          incrementPendingOutboundBytes(entry.pendingSize, false);
      }
      
      /**
       *
       * @param size
       * @param invokeLater
       */
      private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
          if (size == 0) {
              return;
          }
      
          // #1 更新「totalPendingSize」
          long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
      
          // #2 判断是否超过高水位线
          if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
              // #2-1 超过高水位线,更新「unwritable」的值并触发「channelWritability」事件回调方法
              setUnwritable(invokeLater);
          }
      }
      
      /**
       * 将 {@link #unwritable} 设置为 1
       * @param invokeLater
       */
      private void setUnwritable(boolean invokeLater) {
          for (;;) {
              final int oldValue = unwritable;
              final int newValue = oldValue | 1;
              if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                  // 更新成功,判断旧值状态是否为0
                  if (oldValue == 0) {
                      fireChannelWritabilityChanged(invokeLater);
                  }
                  break;
              }
          }
      }
      

      addMessage() 最主要的是将原始消息封装为 Entry 并添加到链表的末尾。期间附带判断当前输出缓冲的字节数量是否过多, 如果超出了最高水位线,则将 unwritable 置为 1,表示输出缓冲区不可写。注意,这里并非中断向底层 Socket 写入的操作,所以等到 totalPendingSize 低于「低水位线」后,unwritable 将被置为 0,这些,就可以向输出缓冲区写入数据了。这种机制可以在网络拥塞的情况下有效避免输出缓冲区剧烈膨胀,避免 OOME。此外,高低水位线的值可以动态设置,这就给做底层数据流控有一个很好的扩展。

      刷新消息

      向底层 Socket 写数据称为刷新操作,那 ChannelOutbound 是如何响应的呢?主要步骤如下:

      • addFlush():修改指针变量以及对每个 Entry 进行校验。
        • 将 flushedEntry 指向 unflushedEntry 节点。
        • 遍历链表中的所有节点,判断是否已经被取消。如果取消则调用 Entry#cancel() 方法并减少 totalPendingSize 的值。
        • unflushedEntry == null。
      • nioBuffers():以数组的形式返回链表中所有的 ByteBuffer 对象。
      • removeBytes(long):根据本次写入的字节数处于链表。因为不可能一次性向 Socket 写入所有数据,所以需要根据实际写入字节数对链表进行删减和修改操作。修改意思是某个 ByteBuffer 可能只写入了部分数据,所以需要更新其封装的 ByteBuf 的 readerIndex。

      看见了么? 一次刷新操作需要调用 ChannelOutbound 三个 API。其实也比较容易理解。可以总结为:

      1. 数据整理
      2. 获取元素
      3. 移除元素

        1/数据整理

        数据整理逻辑较为简单,前面也已经简要说过了,下面是相关源码:

        // io.netty.channel.ChannelOutboundBuffer#addFlush
        /**
        * ① 更新「flushedEntry」、「unflushedEntry」节点
        *    flushedEntry指向头节点
        * ② 循环遍历「Entry」链表,校验每个「Entry」节点是否已经被取消
        */
        public void addFlush() {
         // 如果之前已经有一个「flush」操作且本次没有新的「Entry」被添加到输出缓冲区中,
         // 那么本次就不需要处理所有的「Entry」实例了
         // See https://github.com/netty/netty/issues/2577
        
         // #1 判断「unflushedEntry」指针是否为空,为空表示没有待添加的新消息
         Entry entry = unflushedEntry;
         if (entry != null) {
             if (flushedEntry == null) {
                 flushedEntry = entry;
             }
        
             // #2 遍历所有的等待刷新的「Entry」节点,需要判断每个节点的状态
             do {
                 flushed ++;
        
                 // #3 将「Entry#promise」设置为不可取消状态
                 if (!entry.promise.setUncancellable()) {
                     // #3-1 设置失败,表明 Entry 已处于「已取消」状态,
                     // 所以需要重置「Entry」对象
                     int pending = entry.cancel();
                     // #3-2 更新「totalPendingSize」且判断是否需要更改「unwritable」恢复写操作
                     decrementPendingOutboundBytes(pending, false, true);
                 }
        
                 // #4 获取下一个节点,继续操作
                 entry = entry.next;
             } while (entry != null);
        
             // #5 所有的待写入「Entry」都已遍历完毕,更新「unflushedEntry=null」节点
             unflushedEntry = null;
         }
        }
        

        其中会通过 entry.promise.setUncancellable() 方法将 Entry 内部的 ChannelPromise 对象设置为不可取消状态,通常会成功设置,如果设置失败,表明 Entry 的消息出现异常。本次将不会被写入到 Socket。在处理完所有的 Entry 之后,会把 unflushedEntry 指针修改为 NULL。

        2/获取元素

        我们知道,Netty 的 ByteBuf 是用来简单 JDK 的 ByteBuffer 使用,但实际和 Socket API 进行数据交互的还是 ByteBuffer。因此,这个方法就是从 ByteBuf 对象中提取出 ByteBuffer 对象。相关源码如下:

        /**
        * 如果当前待处理的消息「仅由」{@link ByteBuf} 组成,则返回直接NIO缓冲区的数组。
        * {@link #nioBufferCount()} 返回数组中的NIO缓冲区数量。
        * {@link #nioBufferSize()}  NIO缓冲区的可读字节总数。
        *
        * 请注意:返回的数组已重用。因此不应该「escape」{@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
        * 例子详见 {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)}
        *
        * @param maxCount 将添加的最大ByteBuffer数量,默认值:Integer.MAX_VALUE
        * @param maxBytes 最大字节数。
        *                可能会超出此值,但是我们会尽最大努力在返回的数组中包含至少1个ByteBuffer,这样能确保写入进度。
        *                默认值:Integer.MAX_VALUE
        */
        public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
         assert maxCount > 0;
         assert maxBytes > 0;
         long nioBufferSize = 0;
         int nioBufferCount = 0;
        
         // #1 从「本地线程缓存」中获取ByteBuffer[]数组
         final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
         ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
        
         // #2 获取链表已刷新的起始节点的指针
         Entry entry = flushedEntry;
        
         // #3
         while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
             // #4 当前节点是否被取消
             if (!entry.cancelled) {
                 ByteBuf buf = (ByteBuf) entry.msg;
                 final int readerIndex = buf.readerIndex();
                 final int readableBytes = buf.writerIndex() - readerIndex;
        
                 // #5 判断「ByteBuf」可读字节数是否>0
                 if (readableBytes > 0) {
                     if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {
                         // 如果 readableBytes+nioBufferSize 超过 maxBytes 并且至少有一个Entry对象,
                         // 我们将停止填充「ByteBuf[]」数组,主要有以下两个原因:
                         // ① bsd/osx 不允许一次「writev(...)」调用就写入多于Integer.MAX_VALUE的字节数,所以它会返回「EINVAL」,
                         //   这会导致IOException。在Linux上,根据架构和内核的不同,它可能会起作用,但为了安全起见,这里我们会执行这个限制。
                         // ② 在数据中放入超过操作系统可接受数据是无任何意义的。
                         // See also:
                         // - https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2
                         // - https://linux.die.net//man/2/writev
                         break;
                     }
                     // 增加字节数量
                     nioBufferSize += readableBytes;
                     int count = entry.count;
                     if (count == -1) {
                         // noinspection ConstantValueVariableUse
                         entry.count = count = buf.nioBufferCount();
                     }
                     // 计算需要空间(对应数组长度)
                     int neededSpace = min(maxCount, nioBufferCount + count);
                     if (neededSpace > nioBuffers.length) {
                         // 超出默认数组长度(1024),需要进行扩容
                         nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
                         // 更新本地线程缓存
                         NIO_BUFFERS.set(threadLocalMap, nioBuffers);
                     }
        
                     if (count == 1) {
                         // ByteBuffer数量=1,直接放入
                         ByteBuffer nioBuf = entry.buf;
                         if (nioBuf == null) {
                             // 缓存ByteBuffer,因为如果它是一个派生缓冲区,它可能需要创建一个新的ByteBuffer实例。
                             entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
                         }
                         nioBuffers[nioBufferCount++] = nioBuf;
                     } else {
                         nioBufferCount = nioBuffers(entry, buf, nioBuffers, nioBufferCount, maxCount);
                     }
                     if (nioBufferCount >= maxCount) {
                         break;
                     }
                 }
             }
        
             // 获取下个节点
             entry = entry.next;
         }
         this.nioBufferCount = nioBufferCount;
         this.nioBufferSize = nioBufferSize;
        
         return nioBuffers;
        }
        

        对方法 nioBuffers 总结如下:

      4. 数组 ByesBuffer[] 是从 FastThreadLocal 对象中获取。每个线程都有独立的数组实例,默认长度为 1024。

      5. 消息对象必须是 ByteBuf 实现类且当前节点没有被取消。
      6. 通过计算默认的数组的容量无法满足,则需要进行扩容处理并存入 FastThreadLocal。
      7. 获取 ByteBuf 对象内部中所有的 ByteBuffer 并写入数组中。因为可能为 CompositeByteBuf 实例对象,内部可能含有 >=1 的 ByteBuffer。所以需要分别判断,进行不同的逻辑处理。这在源码中都有体现。
      8. 记录本次待 flush 的 ByteBuffer 数量和总共字节数。

      获取元素是指获取链表中所有的 ByteBuffer 对象并以数组的形式返回。这样就可以调用 java.nio.channels.SocketChannel#write(java.nio.ByteBuffer)java.nio.channels.SocketChannel#write(java.nio.ByteBuffer[], int, int) 向底层 Socket 写入数据了。

      3/移除元素

      当写完数据后,需要根据实际的写入数据量进行判断,对 Entry 链表进行处理,这就是 removeBytes(long) 的功能了,步骤总结如下:

      1. 获取 flushedEntry 指针对象所存储的消息(msg),并强转为 ByteBuf 对象。
      2. 将当前 ByteBuf 的可读字节数(readableBytes)和实际写入字节数(writtenBytes)相比较:
        • readableBytes <= writtenBytes:意味着当前 ByteBuf 的数据本次全部写入,需要将此节点删除。
        • readableBytes > writtenBytes:意味着当前 ByteBuf 的数据只有部分(或没有)成功写入。,所以需要修改 ByteBuf 的 readerIndex 并跳出循环,等到下次继续写入。
      3. 清空 NIO_BUFFERS 数组,这样可以对其进行 GC 操作。

      相关源码如下:

      // 
      /**
       * 删除已完全写入的条目并更新部分已写入条件的「readerIndex」,
       * 因为对象可能只写入了部分数据
       * 此操作会假设所有的消息都在 {@link ByteBuf} 对象中。
       */
      public void removeBytes(long writtenBytes) {
          for (;;) {
              // #1 获取当前已写入节点消息对象
              Object msg = current();
              if (!(msg instanceof ByteBuf)) {
                  // 非「ByteBuf」对象,成功写入数据量一定为0
                  assert writtenBytes == 0;
                  // 退出
                  break;
              }
      
              final ByteBuf buf = (ByteBuf) msg;
              final int readerIndex = buf.readerIndex();
              final int readableBytes = buf.writerIndex() - readerIndex;
      
              // 可写入字节数 <= 成功写入字节数:可写入数据已全部发送,则移除整个Entry对象
              if (readableBytes <= writtenBytes) {
                  // #2 如果当前ByteBuf对象的可读字节数<=已成功写入字节数
                  if (writtenBytes != 0) {
                      // #2-1 通知当前消息的 {@link ChannelPromise} 关于写进展情况
                      progress(readableBytes);
      
                      // 剩余字节
                      writtenBytes -= readableBytes;
                  }
                  // 移除节点
                  remove();
              } else { // readableBytes > writtenBytes
                  // 可写入字节数>成功写入字节数:只写了部分数据,需要修改「readerIndex」指针
                  if (writtenBytes != 0) {
                      // 修改「readerIndex」
                      buf.readerIndex(readerIndex + (int) writtenBytes);
                      // 通知
                      progress(writtenBytes);
                  }
                  // 跳出循环
                  break;
              }
          }
          // 将 NIO_BUFFERS 置为 NULL,
          clearNioBuffers();
      }
      

      小结

      ChannelOutboundBuffer 是 Netty 的输出缓冲池,所有的 writer() 写入的数据都会先存在这个地方,既然是一个池,他并不能是无限的空间,肯定有容量大小,这 ChannelOutboundBuffer 也做了保证和处理,我们不必担心会出现 OOME。
      关于 ChannelOutboundBuffer 的使用和源码在这里这算讲解完毕了,其他细枝末节这里就不再提及,相信各位聪明的读者朋友们能通过 DEBUG 快速理解和掌握。接下来,回到 Unsafe 流程主线,我们主要还是理解 Unsafe 在接收和发送数据的过程中扮演什么样的角色。一切的一切还得从那个类说起。

      NioMessageUnsafe、NioByteBufUnsafe 傻傻分不清楚

      在前面我们也讲过,NioMessageUnsafe 和 NioByteUnsafe 的区别是:

      • NioMessageUnsafe 服务于 NioServerSOcketChannel。
      • NioByteChannel 服务于 NioSocketChannel。

      不信,请看下图:
      AbstractNioChannel_层次结构图.png
      NioServerSocketChannel 继承 AbstractNioMessageChannel 抽象类,而 NioSocketChannel 继承 AbstractNioByteChannel 抽象类,从名字上就可以看出两者之间的对应关系了。下面从源码看看到底什么时候实例化具体的 Unsafe 对象。

      Unsafe 对象实例化时机

      在抽象类 AbstractChannel 中定义了这么一个抽象方法:

      // io.netty.channel.AbstractChannel#newUnsafe
      protected abstract AbstractUnsafe newUnsafe();
      

      它需要特定子类实现,不用说,这个方法肯定是在下面的两个抽象类实现,继续看:

      // io.netty.channel.nio.AbstractNioMessageChannel#newUnsafe
      @Override
      protected AbstractNioUnsafe newUnsafe() {
          return new NioMessageUnsafe();
      }
      
      
      // io.netty.channel.nio.AbstractNioByteChannel#newUnsafe
      @Override
      protected AbstractNioUnsafe newUnsafe() {
          return new NioByteUnsafe();
      }
      

      看清楚了么? 当 NioServerSokcetChannel 实例化时,就会通过 newUnsaef 创建 NioMessageUnsafe 对象。理清了这两个 Unsafe 对象的实例化时机,再重点看看它们的 read() 方法做了些什么事情?

      Unsafe 读操作

      NioMessageUnsafe

      NioMessageUnsafe 的读操作会做以下几件事情:

      1. 重置分配器中的计数器(allocHandle),该计数器的计数用来估计下次分配ByteBuf对象的大小。
      2. 从底层 Channel 中读取数据。对NIO来说,将调用 NioServerSocketChannel#doReadMessages(List) 方法,而这个方法底层是调用 ServerSocketChannel#accept() 接收新连接(得到 SocketChannel 对象),然后用 io.netty.channel.socket.nio.NioSocketChannel 封装原始的 SocketChannel。
      3. 增加计数
      4. 判断是否还需要继续进行读操作
      5. 遍历消息集合,挨个触发「ChannelRead」事件
      6. 触发「ChannelComplete」事件
      7. 有异常,处理异常(NioSocketChannel 会主动关闭 Channel)
      8. 根据条件判断是否需要移除OP_READ事件。只有在没有读等待且非自动读的情况下才会移除OP_READ事件,否则不进行任务操作。

      相关源码解析如下:

      // io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe
      /**
       * 服务于 {@link java.nio.channels.ServerSocketChannel}
       */
      private final class NioMessageUnsafe extends AbstractNioUnsafe {
      
          // 保存读取消息
          private final List<Object> readBuf = new ArrayList<Object>();
      
          /**
           * 底层调用 {@link java.nio.channels.ServerSocketChannel#accept()}接收一个新连接
           * ① 重置分配器中的计数器(allocHandle),该计数器的计数用来估计下次分配ByteBuf对象的大小。
           * ② 对NIO来说,将调用 {@link io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages(List)}
           *    而这个方法就是调用 {@link ServerSocketChannel#accept()} 接收新连接,
           *    然后用 {@link io.netty.channel.socket.nio.NioSocketChannel} 封装原始的 SocketChannel
           * ③ 增加计数
           * ④ 判断是否还需要继续进行读操作
           * ⑤ 遍历消息集合,挨个触发「ChannelRead」事件
           * ⑥ 触发「ChannelComplete」事件
           * ⑦ 有异常,处理异常(NioSocketChannel 会主动关闭 Channel)
           * ⑧ 根据条件判断是否需要移除OP_READ事件
           */
          @Override
          public void read() {
              assert eventLoop().inEventLoop();
              final ChannelConfig config = config();
              final ChannelPipeline pipeline = pipeline();
              final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
              // #1 重置任何已经累积的计数器,并建议下一个读循环应该读取多少消息/字节。
              allocHandle.reset(config);
      
              boolean closed = false;
              Throwable exception = null;
              try {
                  try {
                      do {
                          // 调用「AbstractNioMessageChannel#doReadMessages」
                          // 该方法会接收一个新的 Socket 连接并存入到「readBuf」列表中
                          int localRead = doReadMessages(readBuf);
                          if (localRead == 0) {
                              break;
                          }
                          if (localRead < 0) {
                              closed = true;
                              break;
                          }
      
                          // 增加当前读循环的信息数量
                          allocHandle.incMessagesRead(localRead);
                      } while (allocHandle.continueReading()); // 判断是否继续进行读操作
                  } catch (Throwable t) {
                      exception = t;
                  }
      
                  // 遍历集合,挨个触发「channelRead」事件
                  int size = readBuf.size();
                  for (int i = 0; i < size; i ++) {
                      // 本次读操作完成,即没有读等待
                      readPending = false;
                      pipeline.fireChannelRead(readBuf.get(i));
                  }
                  // 清除集合
                  readBuf.clear();
                  allocHandle.readComplete();
      
                  // 触发「channelReadComplete」事件
                  pipeline.fireChannelReadComplete();
      
                  if (exception != null) {
                      closed = closeOnReadError(exception);
                      pipeline.fireExceptionCaught(exception);
                  }
      
                  // 关闭通道
                  if (closed) {
                      inputShutdown = true;
                      if (isOpen()) {
                          close(voidPromise());
                      }
                  }
              } finally {
                  // 只有在没有读等待且非自动读的情况下才会移除OP_READ事件,否则不进行任务操作
                  // 添加readPending是为了解决以下问题:
                  // 检查是否还有尚未处理的读请求。这可能有两个原因:
                  // ① 用户在 channelRead(...)方法中调用了 Channel.read()
                  // ② 用户在 channelReadComplete(...)方法中调用了 Channel.read()
                  // See https://github.com/netty/netty/issues/2254
                  if (!readPending && !config.isAutoRead()) {
                      // ① Netty 每次只会处理部分「ByteBuf」中的数据,避免某一通道长时间占用导致性能下降。
                      // ② 这就可能导致通道缓冲区的数据一次性可能不会处理完所有数据。
                      // ③ 移除OP_ACCEPT是因为他工作在LT模式下,即便移除了,只要「fd」中存在数据,下次轮询仍然会把fd选出进行处理。
                      removeReadOp();
                  }
              }
          }
      }
      

      仅针对 ServerSocketChannel 通道类型而言,它仅仅是接收新的连接。

      NioByteUnsafe

      NioByteUnsafe 则适用于NioSocketChannel,它底层一般是调用 java.nio.channels.ReadableByteChannel#read API 从底层 Channel 中读取数据并写入 ByteBuffer 对象中。对 NioByteUnsafe 的读操作步骤总结如下:

      1. 根据配置和当前通道状态判断本次读操作是否应中断(取消),如果确认取消则移除OP_READ事件
      2. 获取「ByteBuf」分配器
      3. 分配器会根据上次读取到的数据大小分配本次的「ByteBuf」空间。
      4. 尝试从底层 SocketChannel 读取数据,一般来说 ByteBuf 刚好容纳通道中所有的数据。
      5. 根据返回实际读取字节数判断是否读取末尾或其他原因,并做出相应处理
      6. 触发「channelRead」事件
      7. 以下条件成立则继续读(默认条件下):
        1. 本轮读次数小于16
        2. 本轮读取字节数>0
        3. 本次读操作填满「ByteBuf」对象
      8. 触发「channelReadComplete」事件

      相关源码解析如下:

         /**
           * 从JDK底层P{@link SelectableChannel} 中读取数据
           * ① 根据配置和当前通道状态判断本次读操作是否应中断(取消),如果确认取消则移除OP_READ事件
           * ② 获取「ByteBuf」分配器
           * ③ 分配器会根据上次读取到的数据大小分配本次的「ByteBuf」空间。
           * ④ 尝试从底层 SocketChannel 读取数据,一般来说 ByteBuf 刚好容纳通道中所有的数据。
           * ⑤ 根据返回实际读取字节数判断是否读取末尾或其他原因,并做出相应处理
           * ⑥ 触发「channelRead」事件
           * ⑦ 以下条件成立则继续读(默认条件下):
           *    1. 本轮读次数小于16
           *    2. 本轮读取字节数>0
           *    3. 本次读操作填满「ByteBuf」对象
           * ⑧ 触发「channelReadComplete」事件
           */
          @Override
          public final void read() {
              // #1 获取通道配置信息
              final ChannelConfig config = config();
      
              // #2 是否应该中断读准备,默认为 false
              if (shouldBreakReadReady(config)) {
                  // #2-1 移除OP_READ事件
                  clearReadPending();
                  return;
              }
      
              // #3 待传播事件的管道
              final ChannelPipeline pipeline = pipeline();
      
              // #4 获取「ByteBuf」分配器。
              // 该分配器比较特殊,称为自适应分配器。会根据上次的分配情况预测本次所需ByteBuf的大小。
              final ByteBufAllocator allocator = config.getAllocator();
              final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
              // #4-1 重置统计数据
              allocHandle.reset(config);
      
              ByteBuf byteBuf = null;
              boolean close = false;
              try {
                  do {
                      // #5 分配「ByteBuf」对象
                      byteBuf = allocHandle.allocate(allocator);
      
                      // #6-1 doReadBytes:通过 java.nio.channels.ReadableByteChannel#read 方法从 Socket 读取数据并写入ByteBuffer对象中
                      // #6-2 将本次读取的字节数更新到「allocHandle」作为下次创建ByteBuf的大小的基准值
                      // 注意:doReadBytes 是 AbstractNioByteChannel 抽象类定义的抽象方法
                      allocHandle.lastBytesRead(doReadBytes(byteBuf));
                      if (allocHandle.lastBytesRead() <= 0) {
                          // #7 本次没有读到任何数据,释放创建的「ByteBuf」对象,避免内存泄漏
                          byteBuf.release();
                          // 加速GC
                          byteBuf = null;
      
                          // #8 如果本次读取的内容为EOF(可能读取文件/通道末尾或对端主动关闭通道)
                          close = allocHandle.lastBytesRead() < 0;
                          if (close) {
                              // 由于已经"结束"了,所以接下来对通道的读操作都作废
                              readPending = false;
                          }
                          // 跳出循环(没有读到任何数据)
                          break;
                      }
      
                      // #9 读操作次数+1
                      // 最多对一个通道循环读16次,超过就会暂停此 Channel 的读取操作
                      // 这样避免在某一个 Channel 消耗太长时间,整体吞吐量下降
                      allocHandle.incMessagesRead(1);
                      // 将readPending=false,这样在管道中如果还调用 read() 操作也可以读取数据
                      readPending = false;
      
                      // #10 触发「channelRead」事件
                      pipeline.fireChannelRead(byteBuf);
      
                      // #11 避免当前类持有 ByteBuf 的引用导致GC不了
                      byteBuf = null;
      
                  } while (allocHandle.continueReading()); // #12 判断是否继续进行读操作
      
                  // #11 默认的「HandleImpl」会记录本次总共读取的次数,预测下次「ByteBuf」的大小
                  allocHandle.readComplete();
      
                  // #12 触发「channelReadComplete」事件
                  pipeline.fireChannelReadComplete();
      
                  // #13 如果读到流的末尾则需要关闭
                  if (close) {
                      // 关闭通道的读侧,使通道处理半连接状态
                      closeOnRead(pipeline);
                  }
              } catch (Throwable t) {
                  handleReadException(pipeline, byteBuf, t, close, allocHandle);
              } finally {
                  // 检测是否有尚未处理的「readPending」
                  // 导致这种情况出现可能的原因有两个ByteToMessageDecoder :
                  // ① 用户在 channelRead(...)方法中调用 Channel.read() 或 ChannelHandlerContext.read()
                  // ② 用户在 channelReadComplete(...)方法中调用 Channel.read() 或 ChannelHandlerContext.read()
                  // 具体见 https://github.com/netty/netty/issues/2254
                  if (!readPending && !config.isAutoRead()) {
                      // 从当前通道的「SelectionKey」的感兴趣事件集中移除「OP_READ」事件
                      removeReadOp();
                  }
              }
          }
      }
      

      其实 NioMessageUnsafe 和 NioByteUnsafe 大体逻辑还是较为相似的,只不过对于 NioByteBuf 来说,对每次读取到的字节数都非常在意,需要记录下做为下次分配 ByteBuf 大小的基础值,而 NioMessageUnsafe 则不一样,它使用 List 存放消息(其实也就是 SocketChannel),这可能是它们最大的区别了吧。

      Unsafe 写操作

      Unsafe 的写操作基本逻辑是在抽象类 AbstractUnsafe 实现的。实现逻辑也比较简单:

      • 安全校验:如果outBoundBuff==null,表明通道已关闭,此时应立即释放资源,并快速通知失败消息。
      • 消息过滤/转换 filterOutboundMessage(Object)。
      • 获取消息可读字节数量。
      • 添加到 ChannelOutboundBuffer 输出缓冲区中。
      • 一旦异常抛出,需要将资源释放,避免资源泄漏。

      相关源码如下:

      // io.netty.channel.AbstractChannel.AbstractUnsafe#write
      /**
       * ① 安全校验:如果outBoundBuff==null,表明通道已关闭,此时应立即释放资源,并快速通知失败消息。
       * ② 消息过滤/转换 {@link #filterOutboundMessage(Object)}
       * ③ 获取消息可读字节数量
       * ④ 添加到 {@link ChannelOutboundBuffer} 输出缓冲区中
       * ⑤ 一旦异常抛出,需要将资源释放,避免资源泄漏
       * @param msg           待发送消息
       * @param promise        通知回调对象
       */
      @Override
      public final void write(Object msg, ChannelPromise promise) {
          assertEventLoop();
      
          ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
          // #1 安全校验
          if (outboundBuffer == null) {
              // 意味着通道已被关闭
              try {
                  // 释放消息以免资源泄漏
                  ReferenceCountUtil.release(msg);
              } finally {
                  // 通知快速失败
                  // See https://github.com/netty/netty/issues/2362
                  safeSetFailure(promise, newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
              }
              return;
          }
      
          int size;
          try {
              // #2 消息过滤/转换(通常子类会重写此方法)
              msg = filterOutboundMessage(msg);
      
              // #3 计算给定消息的大小,获取消息可读字节数
              size = pipeline.estimatorHandle().size(msg);
              if (size < 0) {
                  size = 0;
              }
          } catch (Throwable t) {
              try {
                  // 释放资源,避免资源泄漏
                  ReferenceCountUtil.release(msg);
              } finally {
                  safeSetFailure(promise, t);
              }
              return;
          }
      
          // #4 添加到输出缓冲区(即ChannelOutboundBuffer)
          outboundBuffer.addMessage(msg, size, promise);
      }
      

      通过源码可以看出,最后消息会被添加到 ChannelOutboundBuffer 输出缓冲区中,关闭里面的细节,前面已经有详细的介绍了。

      Unsafe flush 操作

      flush 方法的基本实现也是在 AbstractUnsafe 完成的:

      • 整理输出缓冲区的内部链表(更新相关指针并设置每个 Entry 的不可取消状态)。
      • 调用 flush0() 方法写入 Socket。

      相关源码如下:

      // io.netty.channel.AbstractChannel.AbstractUnsafe#flush
      /**
       * 将输出缓冲区的数据写入 Socket
       * ① 整理输出缓冲区的内部链表(更新相关指针并设置每个 Entry 的不可取消状态)
       * ② 调用 flush0() 方法写入 Socket
       */
      @Override
      public final void flush() {
          assertEventLoop();
      
          ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
          if (outboundBuffer == null) {
              return;
          }
      
          // #1 整理链表,更新「flushedEntry」指针
          outboundBuffer.addFlush();
      
          // #2 写入Socket
          flush0();
      }
      

      核心方法还是 flush0(),主要步骤如下:

      1. 当前通道为「IN_ACTIVE」状态,让所有等待的「请求」都置为失败后退出
      2. 修改 inFlush0=true,表示正在进行写入操作,避免重入
      3. 调用AbstractChannel#doWrite(io.netty.channel.ChannelOutboundBuffer)将数据写入 Socket
      4. 修改 inFlush0=false

      相关源码如下:

      // io.netty.channel.AbstractChannel.AbstractUnsafe#flush0
      /**
       * ① 当前通道为「IN_ACTIVE」状态,让所有等待的「请求」都置为失败后退出
       * ② 修改 inFlush0=true,表示正在进行写入操作,避免重入
       * ③ 调用AbstractChannel#doWrite(io.netty.channel.ChannelOutboundBuffer)将数据写入 Socket
       * ④ 修改 inFlush0=false
       */
      @SuppressWarnings("deprecation")
      protected void flush0() {
          if (inFlush0) {
              // 避免重入
              return;
          }
      
          final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
          if (outboundBuffer == null || outboundBuffer.isEmpty()) {
              return;
          }
      
          inFlush0 = true;
      
          // #1 当前通道为「IN_ACTIVE」状态,让所有等待的「请求」都置为失败
          if (!isActive()) {
              try {
                  // Check if we need to generate the exception at all.
                  if (!outboundBuffer.isEmpty()) {
                      if (isOpen()) {
                          outboundBuffer.failFlushed(new NotYetConnectedException(), true);
                      } else {
                          // Do not trigger channelWritabilityChanged because the channel is closed already.
                          outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
                      }
                  }
              } finally {
                  inFlush0 = false;
              }
              // 完成相关失败后的操作,就直接返回
              return;
          }
      
          try {
              // #2 将数据刷新到底层Socket,直接调用AbstractChannel.doWrite()抽象类的子类实现方法
              // 而对于NIO而言,它的实现在NioSocketChannel.doWrite
              doWrite(outboundBuffer);
          } catch (Throwable t) {
              handleWriteError(t);
          } finally {
              inFlush0 = false;
          }
      }
      

      最后还是会调用 Channel#write() 方法向底层 Sokcet 写入数据,至于里面的细节,这里就不在详细讲述了。

      总结

      本篇文章从 Unsafe 的层次结构讲起,仔细拆解了 Unsafe 的 API 及实现类,也仔细对于相关实现类的不同之处。伴随着相关源码的解析,也对 Unsafe 的实现方法有一个大致的了解。同时,也详解一个特别重要的输出缓冲区的内部结构及实现源码细节,相信大伙能通过 DEBUG 的方式对相关 API 和流程掌握得更加清楚。