ChannelOutboundInvoker

方法名称 描述
bind 当 Channel 绑定到一个本地地址后,将调用此方法:在 ChannelPipeline 中传播 bind 事件(回调 ChannelPipeline 中的下一个 ChannelOutboundHandler 的 bind() 方法)
connect 当 Channel 连接到一个远程地址后,将调用此方法:在管道中传播 connect 事件
disconnect 当 Channel 断开连接后,将调用此方法:在管道中传播 disconnect 事件
deregister 当 Channel 注销后,将调用此方法:在管道中传播 deregister 事件
close 当 Channel 关闭连接后,将调用此方法:在管道中传播 close 事件
read 请求从 Channel 中读取更多数据
write 当消息被写入 Channel 后,将调用此方法:在管道中传播 write 事件
flush 冲刷 Channel 所有挂起的数据。在管道中传播 flush 事件
writeAndFlush 先调用 write() 方法,再调用 flush 方法
newPromise 返回一个新的 ChannelPromise 对象
newProgressivePromise 返回一个新的 ChannelProgressivePromise 对象
newSucceededFuture 创建一个带有成功标志的 ChannelFuture,ChannelFuture#isSucceed() 总是返回 true
newFailedFuture 创建一个带有失败标志的 ChannelFuture,ChannelFuture#isSucceed() 总是返回 false
voidPromise 特殊的 ChannelPromise,可以重复利用

ChannelOutboundInvoker 接口是和管道强相关,相关方法实现是调用特定的 ChannelPipeline 的方法实现。但是我们也清楚 ChannelHandler 也实现了此接口(同时还实现了 ChannelInboundInvoker),那这两个实现有什么区别呢?主要区别如下:

  • 无论 Channel 实现类还是 ChannelHandler 实现类,它们都是和 ChannelPipeline 打交道。
  • ChannelHandler 的实现类会从管道的下一个节点开始传播相关事件。
  • Channel 的实现类会从管道的起始位置(可能从头或尾部)开始传播相关事件。

    Channel 结构层次图

    NioChannel_层次结构.png
    Netty 并没有没用 JDK 那一套 Channel 体系,而是自己构建了一套新的。上图是 NIO 的层次结构。下面我们通过 API 剖析内部设计思路。

    Channel

    Channel 是最顶层接口,但是它还继承了 ChannelOutboundInvoker ,ChannelOutboundInvoker 内部的接口本来都定义在 Channel 接口中,但是后面发现 ChannelHandlerContext, ChannelPipeline 和 Channel 共享某些方法,于是就抽象出这么一个接口。至于为何叫 OutboundInvoker 我也理解不了。下面看看 Channel 的所定义的 API:
方法名称 描述
id 唯一 ID 标识
eventLoop 返回已注册的 EventLoop
parent Channel 的父类(通道有父子级关系)
config Channel 的配置类(ChannelConfig)
isOpen Channel 是否处于打开状态
isRegistered Channel 是否成功注册到 EventLoop 对象上
isActive Channel 是不处于活跃状态。为 true 表示可以开始接收连接/数据
metada 描述此 Channel 性质的元数据
localAddress Channel 绑定的本地地址信息
remoteAddress Channel 对端地址信息
closeFuture 返回一个 ChannelFuture 实例,在 Channel 关闭后收到通知
isWritable 当且仅当 I/O 线程将立即执行请求的写操作时,才返回 true。
当此方法返回 false 时发出的所有写请求都将排队,直到 I/O 线程准备好处理排队的写请求为止。
bytesBeforeUnwritable 获取直到 isWritable() 返回 false 为止可以写入的字节数。 此数量将始终为非负数。 如果 isWritable() 为false,则为0。
bytesBeforeWritable 获取必须从基础缓冲区中耗尽多少字节,直到 isWritable()
返回true。 此数量将始终为非负数。 如果 isWritable() 为 true,则为 0。
unsafe 返回 Unsafe 对象
pipeline 返回 Channel 绑定的 ChannelPipeline
alloc 返回 ByteBuf 分配器
read 重写 ChannelOutboundInvoke#read 方法,从 Channel 中读取数据
flush 重写 ChannelOutboundInvoke#flush 方法,将数据写入 Socket

我们最关注的应该是读和写相关的 API,上面可以看到, Channel 接口重写了父类并修改了返回值(返回 Channel),其它的基本都是判断或取某个对象,大家有个印象即可。

AbstractChannel

AbstractChannel 抽象类是 Channel 的基本实现骨架。定义了十分重要的变量:

  1. /**
  2. * {@link Channel} 实现骨架
  3. */
  4. public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
  5. private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);
  6. private final Channel parent;
  7. // Channel 唯一标识
  8. private final ChannelId id;
  9. // 每个Channel都有自己的 Unsafe 对象
  10. // NioSocketChannel=>NioByteUnsafe
  11. // NioServerSOcketChannel=>NioMessageUnsafe
  12. private final Unsafe unsafe;
  13. // Channel 绑定的管道对象
  14. private final DefaultChannelPipeline pipeline;
  15. // 连接相关信息
  16. private volatile SocketAddress localAddress;
  17. private volatile SocketAddress remoteAddress;
  18. // 每个 Channel 绑定一个执行器,它负责 Channel 的所有生命周期
  19. private volatile EventLoop eventLoop;
  20. // 是否被注册
  21. private volatile boolean registered;
  22. // 关闭操作是否已经被初始化
  23. private boolean closeInitiated;
  24. private Throwable initialCloseCause;
  25. // 缓存此通道的字符串表示形式
  26. private boolean strValActive;
  27. private String strVal;
  28. private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
  29. private final CloseFuture closeFuture = new CloseFuture(this);
  30. // ...
  31. }

AbstractChannel 定义了 Channel 的共性变量,包含有

  • 父类 Channel。对于 NioSocketChannel 来说,它的父类通道就是 NioServerSocketChannel。
  • Channel 全局唯一标识 ID。这可用于比较两个通道是否一样。
  • Unsafe 实例对象。根据子类类型实例化对应的 Unsafe 实例。
  • Channel 绑定的 ChannelPipeline。

还有其他的变量,这里等到用到的时候再描述吧。
除了变量之外,当然还有 API 实现啦。AbstractChannel 是 Channel 接口的基本实现骨架,大多数接口都在抽象类实现了,实现细节如下:

方法名称 描述
id 实现
eventLoop 实现,会被子类重写
parent 实现,会被子类重写
config 未实现
isOpen 未实现
isRegistered 实现
isActive 未实现
metada 未实现
localAddress 实现
remoteAddress 实现
closeFuture 实现
isWritable 实现
bytesBeforeUnwritable 实现
bytesBeforeWritable 实现
unsafe 实现
pipeline 实现
alloc 实现
read 实现
flush 实现

以下抽象类是需要子类实现的:

方法名称 描述
isCompatible EventLoop 是否兼容
localAddress0 返回本地地址详情
remoteAddress0 返回绑定的对端地址详情
doBind 将 Channel 绑定到 SocketAddress
doDisconnect 断开和对端的连接
doClose 关闭 Channel
doBeginRead 从 EventLoop 中取消注册
doWrite 将输出缓冲区的内容向 Socket 写入

而我们重点关注的是读/写方法,看看源码是如何实现:

AbstractChannel#read 读操作

源码实现非常简单,就是委托 ChannelPipeline 来完成。

  1. // io.netty.channel.AbstractChannel#read
  2. @Override
  3. public Channel read() {
  4. pipeline.read();
  5. return this;
  6. }
  7. // io.netty.channel.DefaultChannelPipeline#read
  8. @Override
  9. public final ChannelPipeline read() {
  10. tail.read();
  11. return this;
  12. }

这里简单说一下 ChannelPipeline 是如何实现 read() 的:

  • 首先从尾部处理器(TailContext)在管道中向前传播事件。
  • 最后会到头部处理器(HeadContext)处理:
  • 头部处理器是调用 Unsafe#beginRead() 方法。
    • Unsafe#beginRead() 的整体细节我们在 TODO 讲过,它主要是向 Selector 添加 OP_READ 事件。这样,一旦通道有新数据到达,上层就能感知到就可以从 Channel 中读取数据。

      AbstractChannel#write 写操作

      write 方法的源码实现也是非常简单,还是委托 ChannelPipeline 来完成。 ```java // io.netty.channel.AbstractChannel#read @Override public ChannelFuture write(Object msg) { return pipeline.write(msg); }

// io.netty.channel.DefaultChannelPipeline#write(java.lang.Object) @Override public final ChannelFuture write(Object msg) { return tail.write(msg); }

  1. 这里简单说一下 ChannelPipeline 是如何实现 `write()` 的:
  2. - 首先从**尾部处理器**(TailContext)在管道中向前传播事件。
  3. - 最后会到**头部处理器**(HeadContext)处理:
  4. - **头部处理器**是调用 `Unsafe#write()` 方法。
  5. - `Unsafe#write()` 的整体细节我们在 TODO 讲过,它主要是把消息添加到输出缓冲区中。<br />
  6. 对于注册、绑定等操作就不再详细讨论了,原理和上面的一样,都是从尾部处理器向前传播事件(可能这就是实现 ChannelOutboundInvoker 接口的原因吧)。最后,还是通过 HeadContext 委托 Unsafe 实例对象完成。
  7. <a name="k5NKt"></a>
  8. ## ServerChannel
  9. 用作标记接口,没有定义任何接口。
  10. <a name="HKgVl"></a>
  11. ## ServerSocketChannel
  12. 定义只适用于 TCP/IP 协议的 ServerChannel。相关接口定义如下:
  13. ```java
  14. /**
  15. * 只适用于 TCP/IP 协议 的 {@link ServerChannel}
  16. */
  17. public interface ServerSocketChannel extends ServerChannel {
  18. /**
  19. * 返回 {@link ServerChannel} 配置信息
  20. * 重写父类方法,更改返回值为 {@link ServerSocketChannelConfig}
  21. * @return
  22. */
  23. @Override
  24. ServerSocketChannelConfig config();
  25. /**
  26. * 返回 TCP/IP 相关的本地地址信息
  27. * @return
  28. */
  29. @Override
  30. InetSocketAddress localAddress();
  31. /**
  32. * 返回 TCP/IP 相关的对端地址信息
  33. * @return
  34. */
  35. @Override
  36. InetSocketAddress remoteAddress();
  37. }

就是重写了相关方法,并修改返回值。NioServerSocketChannel 会继承此接口并实现相关方法。

AbstractNioChannel

抽象类,是 NIO 的基本实现骨架。抽象类 AbstractChannel 内部变量是各种不同类型的 Channel 的共性抽象,而 AbstractNioChannel 是 NIO 类型共性抽象。内部定义了以下几个核心变量:

  • SelectableChannel。JDK 底层的 Channel 对象。这也是底层实际打交道的类。
  • readInterestOp。其实值就是 OP_READ(16)。
  • SelectionKey。一个表示 SelectableChannel 注册到 Selector 的标记。用来设置相关的感兴趣事件。

我们看到,主要定义的还是和 JDK 底层的类,对熟悉 NIO 编程的人来说,这也是他们所熟知的类。相关源码:

// io.netty.channel.nio.AbstractNioChannel
/**
 * 抽象类,是 {@link Channel} 接口的基本实现,内部使用 Selector。
 *
 */
public abstract class AbstractNioChannel extends AbstractChannel {

    private static final InternalLogger logger =
            InternalLoggerFactory.getInstance(AbstractNioChannel.class);

    // JDK Channel
    private final SelectableChannel ch;
    // 读兴趣事件
    protected final int readInterestOp;
    // JDK,一个代表 SelectableChannel 注册到 Selector 的标记
    // 使用「volatile」主要是保证引用的可见性:当一个Selector被触发重建时,
    // 需要保证selectionKey的可见性,否则会抛出「CancelledKeyException」异常
    volatile SelectionKey selectionKey;
    // 当前通道是否有读操作在等待
    boolean readPending;
    // 用于异步执行的任务
    private final Runnable clearReadPendingRunnable = new Runnable() {
        @Override
        public void run() {
            clearReadPending0();
        }
    };

    /**
     *
     * The future of the current connection attempt.  If not null, subsequent
     * connection attempts will fail.
     */
    private ChannelPromise connectPromise;
    private ScheduledFuture<?> connectTimeoutFuture;
    private SocketAddress requestedRemoteAddress;

    // ...
}

对于 AbstractNioChannel 抽象类来说,其构造方法也是特别重要的一环,主要完成以下两件事情:

  1. 初始化相关属性:全局唯一标识id、魔法类unsafe、实例化 DefaultChannelPipeline。
  2. 把 JDK 底层的 SelectableChannel 配置为非阻塞模式。
    // io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
     // #1 初始化相关属性:全局唯一标识id、魔法类unsafe、实例化 DefaultChannelPipeline
     super(parent);
     this.ch = ch;
     this.readInterestOp = readInterestOp;
     try {
         // #2 把JDK底层的Channel配置为非阻塞模式
         ch.configureBlocking(false);
     } catch (IOException e) {
         try {
             ch.close();
         } catch (IOException e2) {
             logger.warn("Failed to close a partially initialized socket.", e2);
         }
         throw new ChannelException("Failed to enter non-blocking mode.", e);
     }
    }
    
    至于相关 API 实现与抽象,请看下面列表:
方法名称 描述
isOpen 实现
unsafe 实现,重写 AbstractChannel#unsafe 方法,修改返回值
javaChannel 默认实现,会被子类重写
eventLoop 实现,重写 AbstractChannel#eventLoop,修改返回值
isCompatible 实现(抽象方法)
doRegister 实现,重写 AbstractChannel#doRegister
doDeregister 实现,重写 AbstractChannel#doDeregister
doBeginRead 实现(抽象方法),会被 AbstractNioMessageChannel 子类重写
doClose 实现(抽象方法),会被子类重写

doRegister 算是此类的核心方法,它底层是调用 java.nio.channels.SelectableChannel#register(java.nio.channels.Selector, int, java.lang.Object) 完成 Channel 向 Selector 注册。源码如下:

// io.netty.channel.nio.AbstractNioChannel#doRegister
@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

同时,该抽象类还提供以下抽象方法待子类实现,如下表所示:

方法名称 描述
doConnect 连接。抽象方法,待子类实现。
doFinishConnect 抽象方法,待子类实现。

doFinishConnect 会被 NioSocketChannel 实现,它主要是判断当前的非阻塞通道是否已经完成连接。

  • true:已完成连接。
  • false:仍未完成连接。
  • 连接失败,调用此方法会抛出 IOException。

    AbstractNioMessageChannel

    这是一个抽象类,继承 AbstractNioChannel,从名字可以看出,它是用于处理 Message 消息,一般适用于像 UDP 等协议。当然,对于 NIO 来说,Netty 把刚成功建立连接的 SocketChannel 对象也当成一个消息传递。相关 API 实现情况如下表所示:
方法名称 描述
newUnsafe 实现。返回 NioMessageUnsafe 类型实现对象。
doBeginRead 重写 AbstractNioChannel 方法,核心逻辑并没有改变,主要是添加一个输入通道是否忆关闭的判断
doWrite 实现 AbstractChannel 抽象方法,将给定的缓冲区写入底层 Socket
continueOnWriteError 写操作异常时是否仍继续,默认 false
closeOnReadError 读操作异常时是否关闭通道

其实,两个重要的读和写方法都定义为抽象方法:

方法名称 描述
doReadMessage 从底层 Channel 读取消息并放入 List 集合,返回总共读取数量
doWriteMessage 向底层 Channel 写入消息

因为我们主要是针对 TCP 的 NIO 来讲解 Channel 的层次结构,所以对于 NIO 而言,主要是 Main Reactor 通过 doReadMessage 方法用以接收新的连接。

NioServerSocketChannel

服务端 Nio Channel 实现类,它只专注一个任务:接收新连接。核心方法是调用 java.nio.channels.ServerSocketChannel#accept 并返回 java.nio.channels.SocketChannel 对象。先看看所实现的 API 情况:

方法名称 描述
localAddress 重写 AbstractChannel 实现类,重写目的修改返回值类型
metadata 实现 Channel 接口,获得通道元数据信息
config 实现 ServerSocketChannel 接口,获取 Channel 配置信息
isActive 实现 Channel 接口,判断当前 Channel 是否处于活跃状态
remoteAddress 实现 ServerSocketChannel 接口,返回 null。因为 ServerSocketChannel 只接收连接,所以不需要和任何一个对端 Channel 进行绑定,也就没有对端地址详情
javaChannel 重写 AbstractNioChannel,重写目的是修改返回值类型
localAddress0 实现 AbstractChannel 抽象方法,获取本地地址详情
doBind 实现 AbstractChannel 抽象方法,绑定本地地址
doClose 重写 AbstractNioChannel,关闭连接
doReadMessage 实现 AbstractNioMessageChannel 抽象方法,接收一个新的连接并使用 NioSocketChannel 包装后放入 List 集合中
doConnect 抛出异常,不支持该操作
doFinishConnect 抛出异常,不支持该操作
remoteAddress0 返回 null
doDisconnect 抛出异常,不支持该操作
doWriteMessage 抛出异常,不支持该操作
filterOutboundMessage 抛出异常,不支持该操作

NioServerSocketChannel 的核心方法是 doReadMessage,它会完成以下步骤:

  1. 调用 java.nio.channels.ServerSocketChannel#accept 接收与此通道建立的连接。它会返回一个java.nio.channels.SocketChannel 对象。对 ServerSocketChannel#accept 解释如下:
    1. 非阻塞模式,当前没有待定连接(pending connections)即已完成的连接队列为空,这个方法将会立即返回 null。ServerSocketChannel 默认是非阻塞。
    2. 阻塞模式下,当前没有待定连接,它将会无限期阻塞直到一个新的连接可用或出现 I/O 错误。
  2. 使用 io.netty.channel.socket.nio.NioSocketChannel 包装原始的 SocketChannel 并放入 List 集合中。

相关源码如下:

// io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages
/**
 * 从通道中接收新连接并存入到 {@param buf} List集合中
 * @param buf
 * @return
 * @throws Exception
 */
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    // #1 从底层中接收新的连接,获取「SocketChannel」对象
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {
            // #2 使用「NioSocketChannel」包装新连接对象
            // 别忘记,「NioSocketChannel」构造器做了很多重要的事情
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            // #3 抛出异常则关闭通道
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}

还有一个方法需要值得注意,就是 isActive,目的是判断 Channel 是否处于活跃状态,需要同时满足下面的两个条件:

  1. 当前通道处于打开(open)状态。
  2. ServerSocket 已绑定。
    // io.netty.channel.socket.nio.NioServerSocketChannel#isActive
    /**
    * 判断当前通道是否处于活跃状态,通道活跃必须同时满足以下两个条件
    * ① 当前通道处于打开(open)状态
    * ② ServerSocket已绑定
    * 注意:{@link ServerSocket#isBound()} 在套接字关闭之后仍然会返回true,
    * 所以需要和 {@link #isOpen()}一起判断。
    * @return
    */
    @Override
    public boolean isActive() {
     return isOpen()
             && javaChannel().socket().isBound();
    }
    

    AbstractNioByteChannel

    它是 AbstractNioChannel 的基类,从名字上可以看出,该抽象类是处理字节数据(虽然消息也是由字节组成的)。
    相关 API 实现情况:
方法名称 描述
doWrite 重写 AbstractChannel,将给定的缓冲区的内容通过 Socket 发送到对端。会被子类重写

其实 AbstractNioByteChannel 并非只实现一个方法,只不过我把个人认为重要的方法罗列出来,下面是此抽象类所定义的抽象方法:

方法名称 描述
shutdownInput 关闭 Channel 的输入端
doWriteFileRegion 零拷贝
doReadBytes 从 Channel 中读取数据并返回本次读取到的字节数量
doWriteBytes 交给定的 ByteBuf 写入到底层的 Channel

我们看到,重要的读写操作还需要子类实现。

NioSocketChannel

NioSokcetChannel 是非常重要的一环,因为它实现从底层 Channel连接、读取和写入。

连接 connect

doConnect 是 NioSocketChannel 实现的核心方法,解析如下:

  1. 如果入参 localAddress 不为空,则绑定本地地址。
  2. 调用 java.nio.channels.SocketChannel#connect 方法连接对端。
    1. 若 Channel 为非阻塞模式,会立即返回。
    2. 若 Channel 为阻塞模式,会阻塞直到连接成功或出现 IO 异常。
  3. 判断连接是否成功,如果连接失败,则将 OP_CONNECT 事件添加到感兴趣事件集合中。
  4. 若在中途出现异常,则立即关闭 Channel。

相关源码如下:

// io.netty.channel.socket.nio.NioSocketChannel#doConnect
/**
 * 连接对端
 * @param remoteAddress
 * @param localAddress
 * @return
 * @throws Exception
 */
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    // #1 如果「localAddress」不为空则绑定到本地端口,否则系统会随机分配一个端口与通道绑定
    if (localAddress != null) {
        doBind0(localAddress);
    }

    boolean success = false;
    try {
        // #2 连接对端
        boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
        if (!connected) {
            // #3 将 OP_CONNECT 事件添加到感兴趣事件集合中,
            // 待下次循环有I/O连接成功则会接收并做后续处理
            selectionKey().interestOps(SelectionKey.OP_CONNECT);
        }
        success = true;
        return connected;
    } finally {
        // #4 连接中途出现异常,立即关闭 Channel
        if (!success) {
            doClose();
        }
    }
}

读取数据 doReadBytes

读数据也是比较简单的,向入参 ByteBuf 对象写入数据即可,这有现成的 API 可以使用。步骤解析:

  1. 获取自适应分配器以更新本次估计获取值。该值就是 ByteBuf 可写入字节数。
  2. 调用 ByteBuf API 从底层的 Channel 读取数据并填充 ByteBuf 对象

相关源码如下:

/**
 * 从 {@link SocketChannel} 中读取数据
 * @param byteBuf
 * @return
 * @throws Exception
 */
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    // #1 获取自适应分配器以更新本次估计获取值
    // 这个值是ByteBuf的可写空间大小,它的含义是如果本次从 Channel 中读取的数据填满此 ByteBuf 的话,
    // 那么它可能还有数据,因此就是循环读下去,直到不填满 ByteBuf 可写空间为止。第一次默认的 ByteBuf 空间大小为 2048
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();

    // #2 更新预估值
    allocHandle.attemptedBytesRead(byteBuf.writableBytes());

    // #3 从「SocketChannel」中读取数据并填充「ByteBuf」
    return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}

这里会获取一个自适应处理器(RecvByteBufAllocator.Handler),这里记录预估读取的字节数,其值是 ByteBuf 可写字节数量,这个值会用来判断是否会继续读下去:

  • 如果实际字节读取数等于预估值,则会继续读。因为本次填充满了,虽然存在刚好的情况,但是一般概率比较小,所以还是会尝试继续读。

    写数据 doWrite

    写数据是 NioSocketChannel 的核心方法,源码步骤相对来讲比较清晰明了,解析如下:
  1. 获取写循环次数(默认值:16),意味着每调用一次{@link #doWrite(ChannelOutboundBuffer)} 最多可循环向底层 Channel 写16次数据
  2. 判断输出缓冲区对象是否为空,如果为空对象,表示当前 Channel 已经关闭,需要做一些清理动作。
  3. 从配置文件中获取每次写最大合并字节数。默认值:Integer.MAX_VALUE。
  4. 合并输出缓冲区数据。得到 ByteBuffer[] 数组。
  5. 根据数组长度进行调用合适的 API 向底层 Channel 写入数据。
  6. 调整「maxBytesPerGatheringWrite」的值以适应OS对「SO_SNDBUF」的改变。
  7. 根据实际写入字节数处理输出缓冲区。
  8. 判断本次写操作是否将「输出缓冲区」内部数据全部写入 Channel,如果没有,则注册「OP_WRITE」事件,等待下次继续执行写操作。

源码如下:

// io.netty.channel.socket.nio.NioSocketChannel#doWrite
/**
 * 向「Socket」写入数据
 * ① 获取写循环次数(默认值:16),意味着每调用一次{@link #doWrite(ChannelOutboundBuffer)} 最多可循环向底层 Channel 写16次数据
 * ② 判断输出缓冲区对象是否为空,如果为空对象,表示当前 Channel 已经关闭,需要做一些清理动作。
 * ③ 从配置文件中获取每次写最大合并字节数。默认值:Integer.MAX_VALUE。
 * ④ 合并输出缓冲区数据。得到 ByteBuffer[] 数组
 * ⑤ 根据数组长度进行调用合适的 API 向底层 Channel 写入数据
 * ⑥ 调整「maxBytesPerGatheringWrite」的值以适应OS对「SO_SNDBUF」的改变
 * ⑦ 根据实际写入字节数处理输出缓冲区
 * ⑧ 判断本次写操作是否将「输出缓冲区」内部数据全部写入 Channel,如果没有,则注册「OP_WRITE」事件,等待下次继续执行写操作
 * @param in
 * @throws Exception
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    // #1 获取底层通道对象
    SocketChannel ch = javaChannel();

    // #2 获取写循环次数,默认值:16
    int writeSpinCount = config().getWriteSpinCount();
    do {
        // #3 输出缓冲区为空:
        // ① 清除OP_WRITE兴趣事件
        // ② 直接返回(incompleteWrite(...) 方法不会被调用)
        if (in.isEmpty()) {
            clearOpWrite();
            return;
        }

        // 确保所有待写入数据都是由「ByteBuf」组成
        // #4 输出缓冲区不为空
        int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
        ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
        // Buffer总数量
        int nioBufferCnt = in.nioBufferCount();

        // 总是使用「nioBuffers()」来解决数据损坏的问题
        // See https://github.com/netty/netty/issues/2761
        switch (nioBufferCnt) {
            case 0:
                // 除了「ByteBuffer」对象之外,还有其他可写的数据(比如 FileRegion),所以我们回退到正常写模式
                writeSpinCount -= doWrite0(in);
                break;
            case 1: {
                // 只有一个可写的「ByteBuffer」对象所以使用「非聚集写入」
                // 0长度的buffers是不会被添加到ChannelOutboundBuffer的,所以这里不需要重复检查所有的buffers的大小
                ByteBuffer buffer = nioBuffers[0];

                // 尝试写入字节数
                int attemptedBytes = buffer.remaining();

                // 调用「java.nio.channels.SocketChannel.write(java.nio.ByteBuffer)」向Socket写入数据
                // localWrittenBytes:实际写入字节数(对于非阻塞模式的通道来说,可能本次不会写入任务数据,即可能返回0)
                final int localWrittenBytes = ch.write(buffer);

                if (localWrittenBytes <= 0) {
                    // 如果本次写入数据量为0,则表明底层Socket缓冲区满了,无空闲空间容纳新的数据
                    // 那就添加OP_WRITE事件,待下次写入吧
                    incompleteWrite(true);
                    // 直接返回
                    return;
                }

                // 调整「maxBytesPerGatheringWrite」的值
                adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);

                // 处理Entry对象及内部消息
                in.removeBytes(localWrittenBytes);
                //
                --writeSpinCount;
                // 跳出
                break;
            }
            default: {
                // ChannelOutboundBuffer不会将零长度的缓冲区添加到nioBuffers中,因此无需检查所有缓冲区的总大小是否为非零。
                // 我们将最大数量限制为int以上,因此强制转换是安全的
                long attemptedBytes = in.nioBufferSize();

                // 一次性向Socket中写入多个ByteBuffer对象数据
                final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                if (localWrittenBytes <= 0) {
                    incompleteWrite(true);
                    return;
                }
                // 强制转换为int是安全的,因为我们将nioBuffers中的数据总量限制为Integer.MAX_VALUE
                adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes, maxBytesPerGatheringWrite);
                // 处理Entry对象及内部消息
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
        }
    } while (writeSpinCount > 0);

    // 如果在默认的16次循环中写完所有数据,则移除写事件(OP_WRITE)。
    // 否则,需要添加(OP_WRITE)待下次IO操作时继续向Socket写入数据
    incompleteWrite(writeSpinCount < 0);
}

简单总结,就是将输出缓冲区的数据写入 Channel。但是这个过程并非简单,有以下几点值得我们关注:

  • 我们不能一味向底层 Channel 写入数据。所以 Netty 定义一个 writeSpinCount 参数限制本次的循环写操作次数。如果输出缓冲区过大,没有这个参数限制的话就会”阻塞”(并非真正意义上的阻塞)当前 EventLoop 处理其他 IO 事件和任务,导致整体吞吐量下降。
  • 流控。如果网络状况良好,SO_SNDBUF 空间足够容纳当前输出缓冲区的所有内部,则根据条件判断是否增长合并字节数。由 adjustMaxBytesPerGatheringWrite 方法体现。如果现在出现拥塞,SO_SNDBUF 并非能完全容纳,那就需要减少合并字节数。
  • 每次向底层写入数据后,都需要清理输出缓冲区,这样有利于内存回收。
  • 当 while 循环结束后,需要根据实际情况判断是否还需要注册 OP_WRITE 事件。
    • 如果循环 16 次都还没有向 Channel 写完所有输出缓冲区内的数据,那就需要注册。否则,清除 OP_WRITE。

总结

这篇文章主要讲述了 Channel 的层次结构和对相关的 API 进行分解。我的本意是希望各位看官能从点及面,深刻透彻理解 Netty 的层次设计。