ChannelOutboundInvoker
方法名称 | 描述 |
---|---|
bind | 当 Channel 绑定到一个本地地址后,将调用此方法:在 ChannelPipeline 中传播 bind 事件(回调 ChannelPipeline 中的下一个 ChannelOutboundHandler 的 bind() 方法) |
connect | 当 Channel 连接到一个远程地址后,将调用此方法:在管道中传播 connect 事件 |
disconnect | 当 Channel 断开连接后,将调用此方法:在管道中传播 disconnect 事件 |
deregister | 当 Channel 注销后,将调用此方法:在管道中传播 deregister 事件 |
close | 当 Channel 关闭连接后,将调用此方法:在管道中传播 close 事件 |
read | 请求从 Channel 中读取更多数据 |
write | 当消息被写入 Channel 后,将调用此方法:在管道中传播 write 事件 |
flush | 冲刷 Channel 所有挂起的数据。在管道中传播 flush 事件 |
writeAndFlush | 先调用 write() 方法,再调用 flush 方法 |
newPromise | 返回一个新的 ChannelPromise 对象 |
newProgressivePromise | 返回一个新的 ChannelProgressivePromise 对象 |
newSucceededFuture | 创建一个带有成功标志的 ChannelFuture,ChannelFuture#isSucceed() 总是返回 true |
newFailedFuture | 创建一个带有失败标志的 ChannelFuture,ChannelFuture#isSucceed() 总是返回 false |
voidPromise | 特殊的 ChannelPromise,可以重复利用 |
ChannelOutboundInvoker 接口是和管道强相关,相关方法实现是调用特定的 ChannelPipeline 的方法实现。但是我们也清楚 ChannelHandler 也实现了此接口(同时还实现了 ChannelInboundInvoker),那这两个实现有什么区别呢?主要区别如下:
- 无论 Channel 实现类还是 ChannelHandler 实现类,它们都是和 ChannelPipeline 打交道。
- ChannelHandler 的实现类会从管道的下一个节点开始传播相关事件。
- Channel 的实现类会从管道的起始位置(可能从头或尾部)开始传播相关事件。
Channel 结构层次图
Netty 并没有没用 JDK 那一套 Channel 体系,而是自己构建了一套新的。上图是 NIO 的层次结构。下面我们通过 API 剖析内部设计思路。Channel
Channel 是最顶层接口,但是它还继承了 ChannelOutboundInvoker ,ChannelOutboundInvoker 内部的接口本来都定义在 Channel 接口中,但是后面发现 ChannelHandlerContext, ChannelPipeline 和 Channel 共享某些方法,于是就抽象出这么一个接口。至于为何叫 OutboundInvoker 我也理解不了。下面看看 Channel 的所定义的 API:
方法名称 | 描述 |
---|---|
id | 唯一 ID 标识 |
eventLoop | 返回已注册的 EventLoop |
parent | Channel 的父类(通道有父子级关系) |
config | Channel 的配置类(ChannelConfig) |
isOpen | Channel 是否处于打开状态 |
isRegistered | Channel 是否成功注册到 EventLoop 对象上 |
isActive | Channel 是不处于活跃状态。为 true 表示可以开始接收连接/数据 |
metada | 描述此 Channel 性质的元数据 |
localAddress | Channel 绑定的本地地址信息 |
remoteAddress | Channel 对端地址信息 |
closeFuture | 返回一个 ChannelFuture 实例,在 Channel 关闭后收到通知 |
isWritable | 当且仅当 I/O 线程将立即执行请求的写操作时,才返回 true。 当此方法返回 false 时发出的所有写请求都将排队,直到 I/O 线程准备好处理排队的写请求为止。 |
bytesBeforeUnwritable | 获取直到 isWritable() 返回 false 为止可以写入的字节数。 此数量将始终为非负数。 如果 isWritable() 为false,则为0。 |
bytesBeforeWritable | 获取必须从基础缓冲区中耗尽多少字节,直到 isWritable() 返回true。 此数量将始终为非负数。 如果 isWritable() 为 true,则为 0。 |
unsafe | 返回 Unsafe 对象 |
pipeline | 返回 Channel 绑定的 ChannelPipeline |
alloc | 返回 ByteBuf 分配器 |
read | 重写 ChannelOutboundInvoke#read 方法,从 Channel 中读取数据 |
flush | 重写 ChannelOutboundInvoke#flush 方法,将数据写入 Socket |
我们最关注的应该是读和写相关的 API,上面可以看到, Channel 接口重写了父类并修改了返回值(返回 Channel),其它的基本都是判断或取某个对象,大家有个印象即可。
AbstractChannel
AbstractChannel 抽象类是 Channel 的基本实现骨架。定义了十分重要的变量:
/**
* {@link Channel} 实现骨架
*/
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);
private final Channel parent;
// Channel 唯一标识
private final ChannelId id;
// 每个Channel都有自己的 Unsafe 对象
// NioSocketChannel=>NioByteUnsafe
// NioServerSOcketChannel=>NioMessageUnsafe
private final Unsafe unsafe;
// Channel 绑定的管道对象
private final DefaultChannelPipeline pipeline;
// 连接相关信息
private volatile SocketAddress localAddress;
private volatile SocketAddress remoteAddress;
// 每个 Channel 绑定一个执行器,它负责 Channel 的所有生命周期
private volatile EventLoop eventLoop;
// 是否被注册
private volatile boolean registered;
// 关闭操作是否已经被初始化
private boolean closeInitiated;
private Throwable initialCloseCause;
// 缓存此通道的字符串表示形式
private boolean strValActive;
private String strVal;
private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
private final CloseFuture closeFuture = new CloseFuture(this);
// ...
}
AbstractChannel 定义了 Channel 的共性变量,包含有
- 父类 Channel。对于 NioSocketChannel 来说,它的父类通道就是 NioServerSocketChannel。
- Channel 全局唯一标识 ID。这可用于比较两个通道是否一样。
- Unsafe 实例对象。根据子类类型实例化对应的 Unsafe 实例。
- Channel 绑定的 ChannelPipeline。
还有其他的变量,这里等到用到的时候再描述吧。
除了变量之外,当然还有 API 实现啦。AbstractChannel 是 Channel 接口的基本实现骨架,大多数接口都在抽象类实现了,实现细节如下:
方法名称 | 描述 |
---|---|
id | 实现 |
eventLoop | 实现,会被子类重写 |
parent | 实现,会被子类重写 |
config | 未实现 |
isOpen | 未实现 |
isRegistered | 实现 |
isActive | 未实现 |
metada | 未实现 |
localAddress | 实现 |
remoteAddress | 实现 |
closeFuture | 实现 |
isWritable | 实现 |
bytesBeforeUnwritable | 实现 |
bytesBeforeWritable | 实现 |
unsafe | 实现 |
pipeline | 实现 |
alloc | 实现 |
read | 实现 |
flush | 实现 |
以下抽象类是需要子类实现的:
方法名称 | 描述 |
---|---|
isCompatible | EventLoop 是否兼容 |
localAddress0 | 返回本地地址详情 |
remoteAddress0 | 返回绑定的对端地址详情 |
doBind | 将 Channel 绑定到 SocketAddress |
doDisconnect | 断开和对端的连接 |
doClose | 关闭 Channel |
doBeginRead | 从 EventLoop 中取消注册 |
doWrite | 将输出缓冲区的内容向 Socket 写入 |
AbstractChannel#read 读操作
源码实现非常简单,就是委托 ChannelPipeline 来完成。
// io.netty.channel.AbstractChannel#read
@Override
public Channel read() {
pipeline.read();
return this;
}
// io.netty.channel.DefaultChannelPipeline#read
@Override
public final ChannelPipeline read() {
tail.read();
return this;
}
这里简单说一下 ChannelPipeline 是如何实现 read()
的:
- 首先从尾部处理器(TailContext)在管道中向前传播事件。
- 最后会到头部处理器(HeadContext)处理:
- 头部处理器是调用
Unsafe#beginRead()
方法。Unsafe#beginRead()
的整体细节我们在 TODO 讲过,它主要是向 Selector 添加 OP_READ 事件。这样,一旦通道有新数据到达,上层就能感知到就可以从 Channel 中读取数据。AbstractChannel#write 写操作
write 方法的源码实现也是非常简单,还是委托 ChannelPipeline 来完成。 ```java // io.netty.channel.AbstractChannel#read @Override public ChannelFuture write(Object msg) { return pipeline.write(msg); }
// io.netty.channel.DefaultChannelPipeline#write(java.lang.Object) @Override public final ChannelFuture write(Object msg) { return tail.write(msg); }
这里简单说一下 ChannelPipeline 是如何实现 `write()` 的:
- 首先从**尾部处理器**(TailContext)在管道中向前传播事件。
- 最后会到**头部处理器**(HeadContext)处理:
- **头部处理器**是调用 `Unsafe#write()` 方法。
- `Unsafe#write()` 的整体细节我们在 TODO 讲过,它主要是把消息添加到输出缓冲区中。<br />
对于注册、绑定等操作就不再详细讨论了,原理和上面的一样,都是从尾部处理器向前传播事件(可能这就是实现 ChannelOutboundInvoker 接口的原因吧)。最后,还是通过 HeadContext 委托 Unsafe 实例对象完成。
<a name="k5NKt"></a>
## ServerChannel
用作标记接口,没有定义任何接口。
<a name="HKgVl"></a>
## ServerSocketChannel
定义只适用于 TCP/IP 协议的 ServerChannel。相关接口定义如下:
```java
/**
* 只适用于 TCP/IP 协议 的 {@link ServerChannel}
*/
public interface ServerSocketChannel extends ServerChannel {
/**
* 返回 {@link ServerChannel} 配置信息
* 重写父类方法,更改返回值为 {@link ServerSocketChannelConfig}
* @return
*/
@Override
ServerSocketChannelConfig config();
/**
* 返回 TCP/IP 相关的本地地址信息
* @return
*/
@Override
InetSocketAddress localAddress();
/**
* 返回 TCP/IP 相关的对端地址信息
* @return
*/
@Override
InetSocketAddress remoteAddress();
}
就是重写了相关方法,并修改返回值。NioServerSocketChannel 会继承此接口并实现相关方法。
AbstractNioChannel
抽象类,是 NIO 的基本实现骨架。抽象类 AbstractChannel 内部变量是各种不同类型的 Channel 的共性抽象,而 AbstractNioChannel 是 NIO 类型共性抽象。内部定义了以下几个核心变量:
- SelectableChannel。JDK 底层的 Channel 对象。这也是底层实际打交道的类。
- readInterestOp。其实值就是 OP_READ(16)。
- SelectionKey。一个表示 SelectableChannel 注册到 Selector 的标记。用来设置相关的感兴趣事件。
我们看到,主要定义的还是和 JDK 底层的类,对熟悉 NIO 编程的人来说,这也是他们所熟知的类。相关源码:
// io.netty.channel.nio.AbstractNioChannel
/**
* 抽象类,是 {@link Channel} 接口的基本实现,内部使用 Selector。
*
*/
public abstract class AbstractNioChannel extends AbstractChannel {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(AbstractNioChannel.class);
// JDK Channel
private final SelectableChannel ch;
// 读兴趣事件
protected final int readInterestOp;
// JDK,一个代表 SelectableChannel 注册到 Selector 的标记
// 使用「volatile」主要是保证引用的可见性:当一个Selector被触发重建时,
// 需要保证selectionKey的可见性,否则会抛出「CancelledKeyException」异常
volatile SelectionKey selectionKey;
// 当前通道是否有读操作在等待
boolean readPending;
// 用于异步执行的任务
private final Runnable clearReadPendingRunnable = new Runnable() {
@Override
public void run() {
clearReadPending0();
}
};
/**
*
* The future of the current connection attempt. If not null, subsequent
* connection attempts will fail.
*/
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;
// ...
}
对于 AbstractNioChannel 抽象类来说,其构造方法也是特别重要的一环,主要完成以下两件事情:
- 初始化相关属性:全局唯一标识id、魔法类unsafe、实例化 DefaultChannelPipeline。
- 把 JDK 底层的 SelectableChannel 配置为非阻塞模式。
至于相关 API 实现与抽象,请看下面列表:// io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { // #1 初始化相关属性:全局唯一标识id、魔法类unsafe、实例化 DefaultChannelPipeline super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { // #2 把JDK底层的Channel配置为非阻塞模式 ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { logger.warn("Failed to close a partially initialized socket.", e2); } throw new ChannelException("Failed to enter non-blocking mode.", e); } }
方法名称 | 描述 |
---|---|
isOpen | 实现 |
unsafe | 实现,重写 AbstractChannel#unsafe 方法,修改返回值 |
javaChannel | 默认实现,会被子类重写 |
eventLoop | 实现,重写 AbstractChannel#eventLoop,修改返回值 |
isCompatible | 实现(抽象方法) |
doRegister | 实现,重写 AbstractChannel#doRegister |
doDeregister | 实现,重写 AbstractChannel#doDeregister |
doBeginRead | 实现(抽象方法),会被 AbstractNioMessageChannel 子类重写 |
doClose | 实现(抽象方法),会被子类重写 |
doRegister 算是此类的核心方法,它底层是调用 java.nio.channels.SelectableChannel#register(java.nio.channels.Selector, int, java.lang.Object)
完成 Channel 向 Selector 注册。源码如下:
// io.netty.channel.nio.AbstractNioChannel#doRegister
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
同时,该抽象类还提供以下抽象方法待子类实现,如下表所示:
方法名称 | 描述 |
---|---|
doConnect | 连接。抽象方法,待子类实现。 |
doFinishConnect | 抽象方法,待子类实现。 |
doFinishConnect 会被 NioSocketChannel 实现,它主要是判断当前的非阻塞通道是否已经完成连接。
- true:已完成连接。
- false:仍未完成连接。
- 连接失败,调用此方法会抛出 IOException。
AbstractNioMessageChannel
这是一个抽象类,继承 AbstractNioChannel,从名字可以看出,它是用于处理 Message 消息,一般适用于像 UDP 等协议。当然,对于 NIO 来说,Netty 把刚成功建立连接的 SocketChannel 对象也当成一个消息传递。相关 API 实现情况如下表所示:
方法名称 | 描述 |
---|---|
newUnsafe | 实现。返回 NioMessageUnsafe 类型实现对象。 |
doBeginRead | 重写 AbstractNioChannel 方法,核心逻辑并没有改变,主要是添加一个输入通道是否忆关闭的判断 |
doWrite | 实现 AbstractChannel 抽象方法,将给定的缓冲区写入底层 Socket |
continueOnWriteError | 写操作异常时是否仍继续,默认 false |
closeOnReadError | 读操作异常时是否关闭通道 |
其实,两个重要的读和写方法都定义为抽象方法:
方法名称 | 描述 |
---|---|
doReadMessage | 从底层 Channel 读取消息并放入 List 集合,返回总共读取数量 |
doWriteMessage | 向底层 Channel 写入消息 |
因为我们主要是针对 TCP 的 NIO 来讲解 Channel 的层次结构,所以对于 NIO 而言,主要是 Main Reactor 通过 doReadMessage
方法用以接收新的连接。
NioServerSocketChannel
服务端 Nio Channel 实现类,它只专注一个任务:接收新连接。核心方法是调用 java.nio.channels.ServerSocketChannel#accept
并返回 java.nio.channels.SocketChannel
对象。先看看所实现的 API 情况:
方法名称 | 描述 |
---|---|
localAddress | 重写 AbstractChannel 实现类,重写目的修改返回值类型 |
metadata | 实现 Channel 接口,获得通道元数据信息 |
config | 实现 ServerSocketChannel 接口,获取 Channel 配置信息 |
isActive | 实现 Channel 接口,判断当前 Channel 是否处于活跃状态 |
remoteAddress | 实现 ServerSocketChannel 接口,返回 null。因为 ServerSocketChannel 只接收连接,所以不需要和任何一个对端 Channel 进行绑定,也就没有对端地址详情 |
javaChannel | 重写 AbstractNioChannel,重写目的是修改返回值类型 |
localAddress0 | 实现 AbstractChannel 抽象方法,获取本地地址详情 |
doBind | 实现 AbstractChannel 抽象方法,绑定本地地址 |
doClose | 重写 AbstractNioChannel,关闭连接 |
doReadMessage | 实现 AbstractNioMessageChannel 抽象方法,接收一个新的连接并使用 NioSocketChannel 包装后放入 List 集合中 |
doConnect | 抛出异常,不支持该操作 |
doFinishConnect | 抛出异常,不支持该操作 |
remoteAddress0 | 返回 null |
doDisconnect | 抛出异常,不支持该操作 |
doWriteMessage | 抛出异常,不支持该操作 |
filterOutboundMessage | 抛出异常,不支持该操作 |
NioServerSocketChannel 的核心方法是 doReadMessage,它会完成以下步骤:
- 调用
java.nio.channels.ServerSocketChannel#accept
接收与此通道建立的连接。它会返回一个java.nio.channels.SocketChannel
对象。对ServerSocketChannel#accept
解释如下:- 非阻塞模式,当前没有待定连接(pending connections)即已完成的连接队列为空,这个方法将会立即返回 null。ServerSocketChannel 默认是非阻塞。
- 阻塞模式下,当前没有待定连接,它将会无限期阻塞直到一个新的连接可用或出现 I/O 错误。
- 使用
io.netty.channel.socket.nio.NioSocketChannel
包装原始的 SocketChannel 并放入 List 集合中。
相关源码如下:
// io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages
/**
* 从通道中接收新连接并存入到 {@param buf} List集合中
* @param buf
* @return
* @throws Exception
*/
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
// #1 从底层中接收新的连接,获取「SocketChannel」对象
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
// #2 使用「NioSocketChannel」包装新连接对象
// 别忘记,「NioSocketChannel」构造器做了很多重要的事情
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
// #3 抛出异常则关闭通道
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
还有一个方法需要值得注意,就是 isActive,目的是判断 Channel 是否处于活跃状态,需要同时满足下面的两个条件:
- 当前通道处于打开(open)状态。
- ServerSocket 已绑定。
// io.netty.channel.socket.nio.NioServerSocketChannel#isActive /** * 判断当前通道是否处于活跃状态,通道活跃必须同时满足以下两个条件 * ① 当前通道处于打开(open)状态 * ② ServerSocket已绑定 * 注意:{@link ServerSocket#isBound()} 在套接字关闭之后仍然会返回true, * 所以需要和 {@link #isOpen()}一起判断。 * @return */ @Override public boolean isActive() { return isOpen() && javaChannel().socket().isBound(); }
AbstractNioByteChannel
它是 AbstractNioChannel 的基类,从名字上可以看出,该抽象类是处理字节数据(虽然消息也是由字节组成的)。
相关 API 实现情况:
方法名称 | 描述 |
---|---|
doWrite | 重写 AbstractChannel,将给定的缓冲区的内容通过 Socket 发送到对端。会被子类重写 |
其实 AbstractNioByteChannel 并非只实现一个方法,只不过我把个人认为重要的方法罗列出来,下面是此抽象类所定义的抽象方法:
方法名称 | 描述 |
---|---|
shutdownInput | 关闭 Channel 的输入端 |
doWriteFileRegion | 零拷贝 |
doReadBytes | 从 Channel 中读取数据并返回本次读取到的字节数量 |
doWriteBytes | 交给定的 ByteBuf 写入到底层的 Channel |
NioSocketChannel
NioSokcetChannel 是非常重要的一环,因为它实现从底层 Channel连接、读取和写入。
连接 connect
doConnect 是 NioSocketChannel 实现的核心方法,解析如下:
- 如果入参 localAddress 不为空,则绑定本地地址。
- 调用
java.nio.channels.SocketChannel#connect
方法连接对端。- 若 Channel 为非阻塞模式,会立即返回。
- 若 Channel 为阻塞模式,会阻塞直到连接成功或出现 IO 异常。
- 判断连接是否成功,如果连接失败,则将 OP_CONNECT 事件添加到感兴趣事件集合中。
- 若在中途出现异常,则立即关闭 Channel。
相关源码如下:
// io.netty.channel.socket.nio.NioSocketChannel#doConnect
/**
* 连接对端
* @param remoteAddress
* @param localAddress
* @return
* @throws Exception
*/
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
// #1 如果「localAddress」不为空则绑定到本地端口,否则系统会随机分配一个端口与通道绑定
if (localAddress != null) {
doBind0(localAddress);
}
boolean success = false;
try {
// #2 连接对端
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
if (!connected) {
// #3 将 OP_CONNECT 事件添加到感兴趣事件集合中,
// 待下次循环有I/O连接成功则会接收并做后续处理
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
// #4 连接中途出现异常,立即关闭 Channel
if (!success) {
doClose();
}
}
}
读取数据 doReadBytes
读数据也是比较简单的,向入参 ByteBuf 对象写入数据即可,这有现成的 API 可以使用。步骤解析:
- 获取自适应分配器以更新本次估计获取值。该值就是 ByteBuf 可写入字节数。
- 调用 ByteBuf API 从底层的 Channel 读取数据并填充 ByteBuf 对象
相关源码如下:
/**
* 从 {@link SocketChannel} 中读取数据
* @param byteBuf
* @return
* @throws Exception
*/
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
// #1 获取自适应分配器以更新本次估计获取值
// 这个值是ByteBuf的可写空间大小,它的含义是如果本次从 Channel 中读取的数据填满此 ByteBuf 的话,
// 那么它可能还有数据,因此就是循环读下去,直到不填满 ByteBuf 可写空间为止。第一次默认的 ByteBuf 空间大小为 2048
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
// #2 更新预估值
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
// #3 从「SocketChannel」中读取数据并填充「ByteBuf」
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}
这里会获取一个自适应处理器(RecvByteBufAllocator.Handler),这里记录预估读取的字节数,其值是 ByteBuf 可写字节数量,这个值会用来判断是否会继续读下去:
- 如果实际字节读取数等于预估值,则会继续读。因为本次填充满了,虽然存在刚好的情况,但是一般概率比较小,所以还是会尝试继续读。
写数据 doWrite
写数据是 NioSocketChannel 的核心方法,源码步骤相对来讲比较清晰明了,解析如下:
- 获取写循环次数(默认值:16),意味着每调用一次{@link #doWrite(ChannelOutboundBuffer)} 最多可循环向底层 Channel 写16次数据
- 判断输出缓冲区对象是否为空,如果为空对象,表示当前 Channel 已经关闭,需要做一些清理动作。
- 从配置文件中获取每次写最大合并字节数。默认值:Integer.MAX_VALUE。
- 合并输出缓冲区数据。得到 ByteBuffer[] 数组。
- 根据数组长度进行调用合适的 API 向底层 Channel 写入数据。
- 调整「maxBytesPerGatheringWrite」的值以适应OS对「SO_SNDBUF」的改变。
- 根据实际写入字节数处理输出缓冲区。
- 判断本次写操作是否将「输出缓冲区」内部数据全部写入 Channel,如果没有,则注册「OP_WRITE」事件,等待下次继续执行写操作。
源码如下:
// io.netty.channel.socket.nio.NioSocketChannel#doWrite
/**
* 向「Socket」写入数据
* ① 获取写循环次数(默认值:16),意味着每调用一次{@link #doWrite(ChannelOutboundBuffer)} 最多可循环向底层 Channel 写16次数据
* ② 判断输出缓冲区对象是否为空,如果为空对象,表示当前 Channel 已经关闭,需要做一些清理动作。
* ③ 从配置文件中获取每次写最大合并字节数。默认值:Integer.MAX_VALUE。
* ④ 合并输出缓冲区数据。得到 ByteBuffer[] 数组
* ⑤ 根据数组长度进行调用合适的 API 向底层 Channel 写入数据
* ⑥ 调整「maxBytesPerGatheringWrite」的值以适应OS对「SO_SNDBUF」的改变
* ⑦ 根据实际写入字节数处理输出缓冲区
* ⑧ 判断本次写操作是否将「输出缓冲区」内部数据全部写入 Channel,如果没有,则注册「OP_WRITE」事件,等待下次继续执行写操作
* @param in
* @throws Exception
*/
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
// #1 获取底层通道对象
SocketChannel ch = javaChannel();
// #2 获取写循环次数,默认值:16
int writeSpinCount = config().getWriteSpinCount();
do {
// #3 输出缓冲区为空:
// ① 清除OP_WRITE兴趣事件
// ② 直接返回(incompleteWrite(...) 方法不会被调用)
if (in.isEmpty()) {
clearOpWrite();
return;
}
// 确保所有待写入数据都是由「ByteBuf」组成
// #4 输出缓冲区不为空
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
// Buffer总数量
int nioBufferCnt = in.nioBufferCount();
// 总是使用「nioBuffers()」来解决数据损坏的问题
// See https://github.com/netty/netty/issues/2761
switch (nioBufferCnt) {
case 0:
// 除了「ByteBuffer」对象之外,还有其他可写的数据(比如 FileRegion),所以我们回退到正常写模式
writeSpinCount -= doWrite0(in);
break;
case 1: {
// 只有一个可写的「ByteBuffer」对象所以使用「非聚集写入」
// 0长度的buffers是不会被添加到ChannelOutboundBuffer的,所以这里不需要重复检查所有的buffers的大小
ByteBuffer buffer = nioBuffers[0];
// 尝试写入字节数
int attemptedBytes = buffer.remaining();
// 调用「java.nio.channels.SocketChannel.write(java.nio.ByteBuffer)」向Socket写入数据
// localWrittenBytes:实际写入字节数(对于非阻塞模式的通道来说,可能本次不会写入任务数据,即可能返回0)
final int localWrittenBytes = ch.write(buffer);
if (localWrittenBytes <= 0) {
// 如果本次写入数据量为0,则表明底层Socket缓冲区满了,无空闲空间容纳新的数据
// 那就添加OP_WRITE事件,待下次写入吧
incompleteWrite(true);
// 直接返回
return;
}
// 调整「maxBytesPerGatheringWrite」的值
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
// 处理Entry对象及内部消息
in.removeBytes(localWrittenBytes);
//
--writeSpinCount;
// 跳出
break;
}
default: {
// ChannelOutboundBuffer不会将零长度的缓冲区添加到nioBuffers中,因此无需检查所有缓冲区的总大小是否为非零。
// 我们将最大数量限制为int以上,因此强制转换是安全的
long attemptedBytes = in.nioBufferSize();
// 一次性向Socket中写入多个ByteBuffer对象数据
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
// 强制转换为int是安全的,因为我们将nioBuffers中的数据总量限制为Integer.MAX_VALUE
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes, maxBytesPerGatheringWrite);
// 处理Entry对象及内部消息
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
}
} while (writeSpinCount > 0);
// 如果在默认的16次循环中写完所有数据,则移除写事件(OP_WRITE)。
// 否则,需要添加(OP_WRITE)待下次IO操作时继续向Socket写入数据
incompleteWrite(writeSpinCount < 0);
}
简单总结,就是将输出缓冲区的数据写入 Channel。但是这个过程并非简单,有以下几点值得我们关注:
- 我们不能一味向底层 Channel 写入数据。所以 Netty 定义一个
writeSpinCount
参数限制本次的循环写操作次数。如果输出缓冲区过大,没有这个参数限制的话就会”阻塞”(并非真正意义上的阻塞)当前 EventLoop 处理其他 IO 事件和任务,导致整体吞吐量下降。 - 流控。如果网络状况良好,SO_SNDBUF 空间足够容纳当前输出缓冲区的所有内部,则根据条件判断是否增长合并字节数。由
adjustMaxBytesPerGatheringWrite
方法体现。如果现在出现拥塞,SO_SNDBUF 并非能完全容纳,那就需要减少合并字节数。 - 每次向底层写入数据后,都需要清理输出缓冲区,这样有利于内存回收。
- 当 while 循环结束后,需要根据实际情况判断是否还需要注册 OP_WRITE 事件。
- 如果循环 16 次都还没有向 Channel 写完所有输出缓冲区内的数据,那就需要注册。否则,清除 OP_WRITE。
总结
这篇文章主要讲述了 Channel 的层次结构和对相关的 API 进行分解。我的本意是希望各位看官能从点及面,深刻透彻理解 Netty 的层次设计。