概述
前面的章节对 Netty 的整体架构进行简要概述,本篇文章会从引导器 Bootstrap 切入,开始一步一步构建一个 Netty 完整的知识体系,通过图+源码的方式让大家对 Netty 有更深刻的认知,方便后续在使用 Netty 构建底层高性能服务框架时信心十足。
源码版本 4.1.58
引导(Bootstrap)
Bootstrap 最重要的功能是将这些组件有效组织起来,成为一个可实际运行的应用程序。
层次结构
服务端和客户端的引导(Bootstrap)需要区分开:
- 服务端使用一个父 Channel 来接受来自客户端的连接,并创建了 Channel 以用于它们之间的通信。
- 客户端将最可能只需要一个单独的、没有父 Channel 的 Channel 来完成所有与网络的交互。
服务端和客户端通用的引导逻辑由 AbstractBootstrap 处理,而特定于客户端或服务端的引导步骤则分别: BootStrap 或 ServerBootstrap 处理。
AbstractBootstrap
AbstractBootstrap 是一个抽象类,它有以下几个重要功能:
- 提炼 ServerBootstrap 和 Bootstrap 相同的逻辑代码。
- 提供子类一个快速便捷的方式创建 Channel。
- 支持链式调用。
核心方法说明如下:
/**
* {@link AbstractBootstrap} 是一个帮助类,它可以让你轻松地引导一个{@link Channel}。
* 仔细观察抽象类的泛型参数,子类 B 是其父类型的一个类型参数,因此可以返回运行时实例的引用以
* 支持方法的链式调用。
* 当不在{@link ServerBootstrap}上下文中使用时,{@link #bind()} 方法对于无连接的传输(如数据报(UDP))是很有用的。
*
*/
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
/**
* 「EventLoopGroup」实例对象用于处理即将创建的通道的所有I/O事件以及任务(Task)
*/
public B group(EventLoopGroup group) {}
/**
* 指定了「Channel」的实现类的具体类型。如果该实现类没有提供默认的无参构造器,可以通过调用
* channelFactory() 方法指定一个初始化工厂类。
* 如果提供无参构造器,将会通过反射的方式实例化对象。
*/
public B channel(Class<? extends C> channelClass) {}
/**
* 指定「Channel」应绑定到的本地地址。如果没有指定,将由操作由操作系统分配一个随机端口号。
*/
public B localAddress(SocketAddress localAddress) {}
/**
* 设置「ChannelOption」,这些key-value将会被应用到每个新创建的「Channel」的「ChannelConfig」中。
* 在 bind() 或 connect() 方法中会被设置到底层的「Channel」。
* 如果在 bind() 或 connect() 方法后面设置「ChannelOption」是无效的。
* 具体支持哪些「ChannelOption」取决于「Channel」类型
*/
public <T> B option(ChannelOption<T> option, T value) {}
/**
* 指定新创建「Channel」的属性值。这些属性值是通过 bind() 或 connect() 方法设置到「Channel」中。
* 具体取决于谁先被调用。
* 在「Channel」被创建后调用此方法是无效的。
*/
public <T> B attr(AttributeKey<T> key, T value) {}
/**
* 创建一个新的「Channel」并把它注册到「EventLoop」对象中
*/
public ChannelFuture register() {}
/**
* 创建一个新的「Channel」并绑定本地端口
*/
public ChannelFuture bind() {}
/**
* 「ChannelHandler」将会被添加到「ChannelPipeline」,用于处理I/O请求
*/
public B handler(ChannelHandler handler) {}
/**
* 你有时候可能会需要创建多个具有类似配置或完全相同配置的「Channel」。
* clone() 就是为了支持这种模式,不需要你每次都重写配置。
* 这种方式只会创建引导类实例的EventLoopGroup的一份浅拷贝,所以 EventLoopGroup 会被
* 所有克隆的「Channel」共享。
*/
public abstract B clone();
/**
* 返回抽象类「AbstractBootstrapConfig」的实例对象,该对象可以获取「Bootstrap」中的配置信息,
* 比如 localAddress、ChannelHandler、options、attrs 以及 group 等。
*/
public abstract AbstractBootstrapConfig<B, C> config();
}
Bootstrap
Bootstrap 是客户端所需要用到的引导类,作为客户端,一般是主动发起连接,所以需要远端地址和连接方法,相关 API 如下:
/**
* 客户端引导类,方便用户创建「Channel」。
* bind()方法在与无连接传输(如数据报(UDP))相结合时非常有用。
* 对于常规的TCP连接,请使用提供的connect()方法。
*/
public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
/**
* 设置远程地址。也可以通过connect()方法指定
*/
public Bootstrap remoteAddress(SocketAddress remoteAddress) {}
/**
* 连接到远程节点并返回一个「ChannelFuture」,说明此操作是异步的。
* 会在连接完成后(可能成功,也可能失败)收到通知。
*/
public ChannelFuture connect() {}
// ...
}
ServerBootstrap
服务器引导相对比较复杂,因为服务端需要承载海量的连接请求,所以一般采用主从 Reactor 设计模型,因此,通过这个角度就能方便理解以 child 开头的相关 API 了。
/**
* 服务端引导API
*/
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
/**
* 为父(Acceptor)和子(client)设置「EventLoopGroup」,这些「EventLoopGroup」用于处理
* ServerChannel 和 Channel 的所有 I/O 事件
*/
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {}
/**
* 一旦通道实例被创建(Acceptor处理I/O连接事件,一旦连接成功则创建新的通道),这些
* ChannelOption将被用于新创建的通道实例。使用 null 值移除之前设置的「ChannelOption」
*/
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {}
/**
* 为每个子通道上设置特定的「AttributeKey」和「value」,如果该值为空,则「AttributeKey」将会被移除
*/
public <T> ServerBootstrap childAttr(AttributeKey<T> childKey, T value) {}
/**
* 为子通道设置「ChannelHandler」,用来服务Channel的请求
*/
public ServerBootstrap childHandler(ChannelHandler childHandler) {}
}
服务端启动源码解析
① io.netty.bootstrap.AbstractBootstrap#doBind
方法 doBind()
是一切的起点,也是我们源码分析的重点。在看源码之前,我们先梳理一下 doBind() 具体做了哪些事情:
- 调用
initAndRegister()
初始化并注册 Channel,同时返回一个 ChannelFuture 实例,这是一个异步过程。 - 通过
ChannelFuture#cause()
判断注册过程是否有异常,如果发生异常,直接返回。 通过
ChannelFuture#isDone()
判断注册过程是否已执行完毕:- 已完成,调用
doBind0()
进行 Socket 绑定。 - 未完成,向 ChannelFuture 注册一个回调监听,待
initAndRegister()
方法执行完后回调,也是调用doBind0()
进行 Socket 绑定。 ```java // io.netty.bootstrap.AbstractBootstrap#doBind /**
- 目标对象:通道
- 流程: 创建通道->初始化通道->注册通道->通道绑定
- ① 创建并初始化通道(异步)
- ② 调用JDK底层方法进行Socket绑定。将通道与端口绑定
- @param localAddress
@return */ private ChannelFuture doBind(final SocketAddress localAddress) {
// #1 异步创建并初始化通道 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { // 发生异常,直接返回 return regFuture; }
// #2 判断 initAndRegister() 过程是否已执行完毕 if (regFuture.isDone()) { // #2-1 成功完成Channel初始化相关工作,则可进行Socket绑定 // 一般不会这么快就完成 ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // 注册异步回调处理函数 // #2-2 相关Channel还未完成初始化,则向ChannelFuture添加一个「ChannelFutureListener」回调监听 // 当「initAndRegister()」方法完成后会调用「operationComplete()」,这个方法同样也是进行Socket绑定 final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() {
@Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); // #3 绑定端口 doBind0(regFuture, channel, localAddress, promise); } }
② 创建并注册服务端 Channel
这里主要是说明
initAndRegister()
方法,它主要的作用:
- 已完成,调用
通过
ChannelFactory
工厂模式实例化服务端Channel
。- 初始化
Channel
。 -
步骤一:通过 ChannelFactory 工厂模式实例化服务端 Channel
相关源码如下:
// io.netty.bootstrap.AbstractBootstrap#initAndRegister /** * 初始化并注册服务端Channel * ① 通过反射创建对应的{@link io.netty.channel.ServerChannel}实例,一般使用{@link io.netty.channel.socket.nio.NioServerSocketChannel} * ② 初始化{@link io.netty.channel.ServerChannel}。Netty的Channel包含JDK底层的 {@link java.nio.channels.ServerSocketChannel} * ③ 把已完成初始化工作的Channel注册到EventLoopGroup其中一个EventLoop对象中 */ final ChannelFuture initAndRegister() { Channel channel = null; try { // #1 创建「Channel」。 // 通过反射创建对应的Channel实例,一般使用NioServerSocketChannel.class channel = channelFactory.newChannel(); // #2 初始化「Channel」 init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } // #3 注册「Channel」 // 把通道注册到EventLoopGroup对象中 ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
ChannelFactory
通过源码可以看到使用 ChannelFactory 创建服务端 Channel。ChannelFactory 接口非常简洁:
/** * 工厂接口,用于创建「Channel」对象 */ @SuppressWarnings({ "ClassNameSameAsAncestorName", "deprecation" }) public interface ChannelFactory<T extends Channel> extends io.netty.bootstrap.ChannelFactory<T> { /** * Creates a new channel. */ @Override T newChannel(); }
就提供一个工厂方法
T newChannel()
用来实例化 Channel 对象。Netty 提供了一个默认的实例类:
从名字也可以看出,它是通过反射来实例化对象。一般来说:
- 如果 Channel 提供无参构造器,可以直接使用 Netty 提供的
ReflectiveChannelFactory
实例化对象。 - 如果 Channel 不能提供无参构造器,用户可以实现 ChannelFactory 接口,在
newChannel()
方法中使用 new 关键字实例化一个对象并返回。那怎么和 Bootstrap 关联起来呢? 抽象类 AbstractBootstrap 抽象了一个方法可以传入 ChannelFactory,随后在 Netty 引导启动过程中会使用该 ChannelFactory 实例化一个 Channel 对象。
具体 ReflectiveChannelFactory 源码就不贴出来了,内部使用 java.lang.reflect.Constructor#newInstance
反射获得实例化对象。
NioServerSocketChannel
一般我们在配置服务端的 ServerBootstrap 引导器时传入 NioServerSocketChannel 类型的服务器 Channel,可以简单理解是 Netty 对 Java 底层 java.nio.channels.SelectableChannel
通道的包装类 那它的构造器会完成什么事情呢? 总结起来大概只有三件:
- 初始化
io.netty.channel.Channel
属性。包括:- 全局唯一通道标识 ID。
- 实例化魔法类
io.netty.channel.Channel.Unsafe
。Channel 类型与 Unsafe 实例类型一一对应。 - 创建 DefaultChannelPipeline 管道实例。
- 将 JDK 底层的 Channel 设置为非阻塞模式。
- 创建 ChannelConfig 实例对象。Channel 类型与 ChannelConfig 实例类型一一对应。
```java
// io.netty.channel.socket.nio.NioServerSocketChannel#NioServerSocketChannel()
/**
- NioServerSocketChannel 构造器 */ public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); }
/**
- 根据给定的「java.nio.channels.ServerSocketChannel」实例化对象
「java.nio.channels.ServerSocketChannel」是依据底层操作系统创建 */ public NioServerSocketChannel(ServerSocketChannel channel) { // #1 初始化相关属性,将通道设置为非阻塞模式 super(null, channel, SelectionKey.OP_ACCEPT);
// #2 创建配置类 config = new NioServerSocketChannelConfig(this, javaChannel().socket()); }
// io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel /**
- super() 方法
*/
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
// #1 初始化相关属性:全局唯一标识id、魔法类unsafe、实例化 DefaultChannelPipeline
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
} catch (IOException e) {// #2 把JDK底层的Channel配置为非阻塞模式 ch.configureBlocking(false);
} }try { // #3 异常,关闭「Channel」 ch.close(); } catch (IOException e2) { logger.warn("Failed to close a partially initialized socket.", e2); } throw new ChannelException("Failed to enter non-blocking mode.", e);
// io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel) protected AbstractChannel(Channel parent) { this.parent = parent; // #1 全局唯一ID id = newId(); // #2 仅给内部使用操作网络相关的魔法类 unsafe = newUnsafe(); // #3 创建DefaultChannelPipeline管道 pipeline = newChannelPipeline(); }
// io.netty.channel.DefaultChannelPipeline#DefaultChannelPipeline protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, “channel”); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
/**
- 根据操作系统底层获取对应的「java.nio.channels.ServerSocketChannel」
- 对于Linux版本大于等于2.6默认采用EPollSelectorProvider
- @param provider
@return */ private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException("Failed to open a server socket.", e);
} }
public class DefaultSelectorProvider {
private DefaultSelectorProvider() { }
public static SelectorProvider create() {
String osname = AccessController.doPrivileged(
new GetPropertyAction("os.name"));
if ("SunOS".equals(osname)) {
return new sun.nio.ch.DevPollSelectorProvider();
}
// Linux内核2.6及以上版本,采用EPollSelectorProvider,
// 低版本内核使用PollSelectorProvider
if ("Linux".equals(osname)) {
String osversion = AccessController.doPrivileged(new GetPropertyAction("os.version"));
String[] vers = osversion.split("\\.", 0);
if (vers.length >= 2) {
try {
int major = Integer.parseInt(vers[0]);
int minor = Integer.parseInt(vers[1]);
if (major > 2 || (major == 2 && minor >= 6)) {
return new sun.nio.ch.EPollSelectorProvider();
}
} catch (NumberFormatException x) {
}
}
}
return new sun.nio.ch.PollSelectorProvider();
}
}
值得一提的是在 AbstractChannel(Channel) 构造器中实例 DefaultChannelPipeline,它用于管理 ChannelHandler。在构造器中,它会创建两个非常重要的对象:HeadContext 和 TailContext。两者之间构造双向链表结构。<br /><br /> 至于 HeadContext 和 TailContext 在 Netty 中扮演什么角色,待我们讲到 ChannelPipeline 章节时细说。创建服务端 NioServerChannel 我们已经讲完了,它所初始化的步骤也在最前面总结。
<a name="x2rWW"></a>
### 步骤二:初始化 Channel
初始化 Channel 的核心方法是 `io.netty.bootstrap.ServerBootstrap#init` 它完成以下事件:
1. 设置 Socket 参数。常见有:SO_RCVBUF、SO_BACKLOG 等等。
1. 设置自定义属性。可以从 Channel 中获得。
1. 向管道中添加 ChannelInitializer 处理器,帮助我们编排 ChannelPipeline。这个 ChannelInitializer 完成以下事件(**异步**):
1. 向管道中添加引导配置时的 ChannelHandler。
1. 异步添加 ServerBootstrapAcceptor对象。
此时 ChannelPipeline 内部结构如下图所示:<br /><br />我们重点说明一个步骤三,为什么在这里添加一个 ChannelInitialzer 帮助类呢?
1. ChannelInitializer 实现 ChanneHandler 接口,所以可以被添加到管道中,它是一个特殊的 ChannelHandler。主要功能是提供一种便捷的方式设置 Channel 的 ChannelPiepline。通常被用于 Bootstrap#handler()、ServerBootstrap#handler、ServerBootstrap#childHandler() 方法中。
1. 一旦 Channel 完成注册之后就立即触发相应的事件,并回调 `ChannelInitializer#initChannel()` 方法完成管道的初始化工作。并在完成初始化工作后从 ChannelPiepline 管道中移除。
1. 可以把 ChannelInitializer 想象成延迟初始化的一个占位符。
这里简单说明一下 ServerBootstrapAcceptor,大伙还记得这幅图么?<br /><br />没错,它就是图中 acceptor 的化身(每种 Reactor 模型都有 acceptor,但为了简单起见,只讨论主从 Reactor 模型)。它专注的事情有:
1. 一旦连接完成 TCP 三次握手,就为它创建一个新的 Channel。
1. 把 Channel 注册到 subReactor。
和 ServerBootstrapAcceptor 相关的源码会在 ChannelPiepline 讲到。
```java
// io.netty.bootstrap.ServerBootstrap#init
/**
* 初始化「Channel」
* ① 设置Channel属性
* ② 保存用户自定义属性
* ③ 向管道中添加 ChannelInitializer 处理器
* @param channel
*/
@Override
void init(Channel channel) {
// #1 设置「Channel」参数
// 在创建服务端「Channel」时,此时与Channel相关的配置存储在「ServerSocketChannelConfig」相关的子类中。
// 在初始化「Channel」的过程中,会将这些参数设置到JDK底层的ServerSocketChannel上,并把自定义属性绑定到Channel(#2)。
setChannelOptions(channel, newOptionsArray(), logger);
// #2 保存用户自定义属性
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
}
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
// #3 ServerBootstrap启动时内部添加一个ChannelInitializer处理器,该处理器有以下功能:
// ① 向管道中添加用户服务器配置的ChannelHandler(AbstractBootstrap.handler(ChannelHandler))
// ② 异步添加特殊的 ServerBootstrapAcceptor 处理器,该处理器用于向Worker_EventLoop分发(注册)已连接的通道
// ③ 此时调用ChannelPipeline#addLast()方法有:
// #3-1 将处理器插入链表中
// #3-2 把处理器(被HandlerContext引用)封装成「PendingHandlerAddedTask」异步任务提交到「任务队列」
// 虽然此时已经「物理」上添加到管道中,但是由于相关通道(Channel)还未完成注册,所以「HandleAdded」事件不会被触发,
// 而PendingHandlerAddedTask异步任务就是等待合适时机触发「HandleAdded」事件
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
// #1 把配置到ServerBootStrap的ChannelHandler添加到管道中
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// #2 向ServerBootstrap添加一个非常重要的ChannelHandler--「ServerBootstrapAcceptor」。
// 该处理器主要功能是将已连接的Channel注册到WorkerGroup。
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
步骤三:注册 Channel
回到 initAndRegister()
主流程,创建并初始化 Channel 后,我们就可以把 Channel 注册到 EventLoop 对象里了。
注册 Channel 流程相对来说稍显复杂,但我们一步步拆解,思路就清晰了。大致流程总结如下:
- 从 EventLoopGroup 中轮询一个 EventLoop 与 Channel 绑定。注意,此时这个 EventLoop 就等于 acceptor,它所完成的工作前面已经说明了。
- 将
java.nio.channels.SelectableChannel
注册到java.nio.channels.Selector
对象中。interestOps 默认值为 0。 - 触发 handlerAdded 事件。
- 回调步骤二在管道中添加的
ChannelInitializer#initChannel()
方法。此时管道内部结构图如下图所示:
- 回调步骤二在管道中添加的
- 触发 channelRegistered 事件。
注意,调用 ChannelPipline#addLast() 会触发 handlerAdded 事件并在管道中传播。但是事件触发的前提是 Channel 需要完成注册。
相关源码:
// io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
/**
* 调用 next() 方法从「EventLoopGroup」中轮询获得「EventLoop」对象,
* 随后把「Channel」注册到「EventLoop」对象中
*/
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
// io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
// io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
// 底层调用Unsafe魔法类实例完成注册,最后还是调用 register0(promise)方法
promise.channel().unsafe().register(this, promise);
return promise;
}
// io.netty.channel.AbstractChannel.AbstractUnsafe#register0
/**
* 通道注册
* @param promise
*/
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// #1 将JDK的Channel注册到Selector
doRegister();
neverRegistered = false;
// 此时通道已完成注册,接下来就需要触发各种事件了。
// 很多任务都必须在通道注册完成后才能开展
registered = true;
// 确保我们在通知promise之前调用handlerAdded()方法,这是必要的。
// 因为用户可能已经通过在ChannelFutureListener中的管道触发了事件。
// #2 触发「handlerAdded」事件
// 触发内部待添加的「ChannelHandler#handlerAdded()」回调方法
// 对于正常的Handler来说,一般不会做些什么,而对于特殊的channelinitializer
// 会把自定义的ChannelHandler添加到链表中
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
// #3 触发「channelRegistered」事件
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered.
// This prevents firing multiple channel actives if the channel is deregistered and re-registered.
// #4 根据活跃状态触发相应事件
if (isActive()) {
if (firstRegistration) {
// 触发「channelActive」事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
AbstractUnsafe#register0()
方法是步骤三的核心方法,在阅读源码时我们需要舍弃掉一些细枝未节,只关注核心的步骤,至于如何触发 handlerAdded 事件这得等到讲解 ChannePipeline 时才会详细说明。
方法 pipeline.invokeHandlerAddedIfNeeded();
也是一个重要的方法,前面也说过,主要回调 ChannelInitializer``#initChannel()
方法初始化 Channel 的管道。但对于 mainReactor 和 subReactor 还是有所区别:
- 对于 mainReactor 来说,它有一个独立的 ChannelPiepline,内部结构为:
- 而对于 subReactor 来说,每一个连接(来自 mainReactor)都会创建一个新的 ChannelPiepline,而它每次都会回调我们在
ServerBootstrap.childChannel()
所添加的ChannelInitializer#initChannel()
方法。比如对于我们的 HTTP 服务器而言,管道内部结构为:
③ Channel 和 SocketAddress 进行绑定
Channel 和 SocketAddress 进行绑定的核心方法是 AbstractUnsafe#bind
,逻辑也十分清晰:
- 调用底层 JDK 接口绑定 SocketAddress。
- 绑定成功触发「channelActive」事件(由通道绑定的 EventLoop 执行)。
更新「ChannelPromise」状态,触发相应的监听事件。
// io.netty.channel.AbstractChannel.AbstractUnsafe#bind @Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { boolean wasActive = isActive(); try { // #1 调用底层JDK方法绑定端口 doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { // #2 绑定成功触发「channelActive」事件 pipeline.fireChannelActive(); } }); } // #3 更新「ChannelPromise」状态,触发相应的监听事件 safeSetSuccess(promise); }
虽然步骤 1, 3 十分重要,但最重要的还是步骤 2,因为它完成了一件重要的事件就是向 SelectionKey 中添加
SelectionKey.OP_ACCEPT
兴趣事件。到这里,Netty 才可以说大致完成服务端启动,可以正式接收客户端的连接请求。相关源码如下:// io.netty.channel.nio.AbstractNioChannel#doBeginRead @Override protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { // 注册 OP_READ 事件到服务端Channel的事件集合中 selectionKey.interestOps(interestOps | readInterestOp); } }
服务端启动源码解析讲解结束了,我这里只是挑选部分的重要的源码进行解读,有些源码可能跳得比较快,那就需要朋友们通过 Debug 一步一步来了。总的来说,个人认为 Netty 启动服务有以下几个点需要值得关注:
异步事件模型。这是 Netty 提供给开发者强大的扩展功能。Netty 启动服务时部分步骤也是通过异步来完成的。
ChannelInitializer#initChannel()
调用时机。 对于服务端启动,我们最后要的是做好 acceptor 的初始化工作(在 Netty 中由 ServerBootstrapAcceptor 具象化)服务端如何处理客户端新建连接
通过上面的学习,回答这个问题就变得十分简单了。对于已完成 TCP 三次握手的连接,它们最终会被 io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor 处理。
大致总结为以下几个步骤:Boss NioEventLoop 线程在
for(;;)
不断循环新的 I/O 连接事件。- 客户端连接成功建立,构造 Netty 客户端 NioSocketChannel 对象。
- 把 NioSocketChannel 对象注册到 Worker EventLoopGroup 中。
- 添加 OP_READ 事件到 NioSocketChannel 感兴趣的事件集合中。
关于 NioEventLoop 相关逻辑这里不展开讲解,我们直接看最核心的方法:
① 获取 SocketChannel
// io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
// #1 从「ServerSocketChannel」接受新的连接并返回「SocketChannel」
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
// #2 构造「NioSocketChannel」并添加到「ByteBuf」对象中
buf.add(new NioSocketChannel(this, ch));
// #3 返回
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
// #4 发生异常关闭通道连接
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
② 注册到 Worker EventLoopGroup
// io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
/**
* 将新连接「NioSocketChannel」注册到 Worker EventLoopGroup对象中
*/
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
// 向 Worker EventLoopGroup 注册客户端新连接的通道
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
总结
本文我们通过源码深入分析了 Netty 服务端启动的全流程,对其中的部分组件有了基本的认知和使用。最后也顺便简述了 Netty 处理新建立的客户端连接。有些地方没有讲得太详细,限于篇幅,只讲了大概,希望看到这篇文章的朋友们能动动手 Debug 一下,很多问题都能迎刃而解。