概述

前面的章节对 Netty 的整体架构进行简要概述,本篇文章会从引导器 Bootstrap 切入,开始一步一步构建一个 Netty 完整的知识体系,通过图+源码的方式让大家对 Netty 有更深刻的认知,方便后续在使用 Netty 构建底层高性能服务框架时信心十足。

源码版本 4.1.58

引导(Bootstrap)

Bootstrap 最重要的功能是将这些组件有效组织起来,成为一个可实际运行的应用程序。

层次结构

Bootstrap.png
服务端和客户端的引导(Bootstrap)需要区分开:

  • 服务端使用一个父 Channel 来接受来自客户端的连接,并创建了 Channel 以用于它们之间的通信。
  • 客户端将最可能只需要一个单独的、没有父 Channel 的 Channel 来完成所有与网络的交互。

服务端和客户端通用的引导逻辑由 AbstractBootstrap 处理,而特定于客户端或服务端的引导步骤则分别: BootStrap 或 ServerBootstrap 处理。

AbstractBootstrap

AbstractBootstrap 是一个抽象类,它有以下几个重要功能:

  1. 提炼 ServerBootstrap 和 Bootstrap 相同的逻辑代码。
  2. 提供子类一个快速便捷的方式创建 Channel。
  3. 支持链式调用。

核心方法说明如下:

  1. /**
  2. * {@link AbstractBootstrap} 是一个帮助类,它可以让你轻松地引导一个{@link Channel}。
  3. * 仔细观察抽象类的泛型参数,子类 B 是其父类型的一个类型参数,因此可以返回运行时实例的引用以
  4. * 支持方法的链式调用。
  5. * 当不在{@link ServerBootstrap}上下文中使用时,{@link #bind()} 方法对于无连接的传输(如数据报(UDP))是很有用的。
  6. *
  7. */
  8. public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
  9. /**
  10. * 「EventLoopGroup」实例对象用于处理即将创建的通道的所有I/O事件以及任务(Task)
  11. */
  12. public B group(EventLoopGroup group) {}
  13. /**
  14. * 指定了「Channel」的实现类的具体类型。如果该实现类没有提供默认的无参构造器,可以通过调用
  15. * channelFactory() 方法指定一个初始化工厂类。
  16. * 如果提供无参构造器,将会通过反射的方式实例化对象。
  17. */
  18. public B channel(Class<? extends C> channelClass) {}
  19. /**
  20. * 指定「Channel」应绑定到的本地地址。如果没有指定,将由操作由操作系统分配一个随机端口号。
  21. */
  22. public B localAddress(SocketAddress localAddress) {}
  23. /**
  24. * 设置「ChannelOption」,这些key-value将会被应用到每个新创建的「Channel」的「ChannelConfig」中。
  25. * 在 bind() 或 connect() 方法中会被设置到底层的「Channel」。
  26. * 如果在 bind() 或 connect() 方法后面设置「ChannelOption」是无效的。
  27. * 具体支持哪些「ChannelOption」取决于「Channel」类型
  28. */
  29. public <T> B option(ChannelOption<T> option, T value) {}
  30. /**
  31. * 指定新创建「Channel」的属性值。这些属性值是通过 bind() 或 connect() 方法设置到「Channel」中。
  32. * 具体取决于谁先被调用。
  33. * 在「Channel」被创建后调用此方法是无效的。
  34. */
  35. public <T> B attr(AttributeKey<T> key, T value) {}
  36. /**
  37. * 创建一个新的「Channel」并把它注册到「EventLoop」对象中
  38. */
  39. public ChannelFuture register() {}
  40. /**
  41. * 创建一个新的「Channel」并绑定本地端口
  42. */
  43. public ChannelFuture bind() {}
  44. /**
  45. * 「ChannelHandler」将会被添加到「ChannelPipeline」,用于处理I/O请求
  46. */
  47. public B handler(ChannelHandler handler) {}
  48. /**
  49. * 你有时候可能会需要创建多个具有类似配置或完全相同配置的「Channel」。
  50. * clone() 就是为了支持这种模式,不需要你每次都重写配置。
  51. * 这种方式只会创建引导类实例的EventLoopGroup的一份浅拷贝,所以 EventLoopGroup 会被
  52. * 所有克隆的「Channel」共享。
  53. */
  54. public abstract B clone();
  55. /**
  56. * 返回抽象类「AbstractBootstrapConfig」的实例对象,该对象可以获取「Bootstrap」中的配置信息,
  57. * 比如 localAddress、ChannelHandler、options、attrs 以及 group 等。
  58. */
  59. public abstract AbstractBootstrapConfig<B, C> config();
  60. }

Bootstrap

Bootstrap 是客户端所需要用到的引导类,作为客户端,一般是主动发起连接,所以需要远端地址和连接方法,相关 API 如下:

/**
 * 客户端引导类,方便用户创建「Channel」。
 * bind()方法在与无连接传输(如数据报(UDP))相结合时非常有用。
 * 对于常规的TCP连接,请使用提供的connect()方法。
 */
public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
    /**
     * 设置远程地址。也可以通过connect()方法指定
     */
    public Bootstrap remoteAddress(SocketAddress remoteAddress) {}

    /**
     * 连接到远程节点并返回一个「ChannelFuture」,说明此操作是异步的。
     * 会在连接完成后(可能成功,也可能失败)收到通知。
     */
    public ChannelFuture connect() {}

    // ... 
}

ServerBootstrap

服务器引导相对比较复杂,因为服务端需要承载海量的连接请求,所以一般采用主从 Reactor 设计模型,因此,通过这个角度就能方便理解以 child 开头的相关 API 了。

/**
 * 服务端引导API
 */
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    /**
     * 为父(Acceptor)和子(client)设置「EventLoopGroup」,这些「EventLoopGroup」用于处理
     * ServerChannel 和 Channel 的所有 I/O 事件
     */
    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {}

    /**
     * 一旦通道实例被创建(Acceptor处理I/O连接事件,一旦连接成功则创建新的通道),这些
     * ChannelOption将被用于新创建的通道实例。使用 null 值移除之前设置的「ChannelOption」
     */
    public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {}

    /**
     * 为每个子通道上设置特定的「AttributeKey」和「value」,如果该值为空,则「AttributeKey」将会被移除
     */
    public <T> ServerBootstrap childAttr(AttributeKey<T> childKey, T value) {}

    /**
     * 为子通道设置「ChannelHandler」,用来服务Channel的请求
     */
    public ServerBootstrap childHandler(ChannelHandler childHandler) {}

}

服务端启动源码解析

① io.netty.bootstrap.AbstractBootstrap#doBind

方法 doBind() 是一切的起点,也是我们源码分析的重点。在看源码之前,我们先梳理一下 doBind() 具体做了哪些事情:

  1. 调用 initAndRegister() 初始化并注册 Channel,同时返回一个 ChannelFuture 实例,这是一个异步过程。
  2. 通过 ChannelFuture#cause() 判断注册过程是否有异常,如果发生异常,直接返回。
  3. 通过 ChannelFuture#isDone() 判断注册过程是否已执行完毕:

    1. 已完成,调用 doBind0() 进行 Socket 绑定。
    2. 未完成,向 ChannelFuture 注册一个回调监听,待 initAndRegister() 方法执行完后回调,也是调用 doBind0() 进行 Socket 绑定。 ```java // io.netty.bootstrap.AbstractBootstrap#doBind /**
    • 目标对象:通道
    • 流程: 创建通道->初始化通道->注册通道->通道绑定
    • ① 创建并初始化通道(异步)
    • ② 调用JDK底层方法进行Socket绑定。将通道与端口绑定
    • @param localAddress
    • @return */ private ChannelFuture doBind(final SocketAddress localAddress) {

      // #1 异步创建并初始化通道 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { // 发生异常,直接返回 return regFuture; }

      // #2 判断 initAndRegister() 过程是否已执行完毕 if (regFuture.isDone()) { // #2-1 成功完成Channel初始化相关工作,则可进行Socket绑定 // 一般不会这么快就完成 ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // 注册异步回调处理函数 // #2-2 相关Channel还未完成初始化,则向ChannelFuture添加一个「ChannelFutureListener」回调监听 // 当「initAndRegister()」方法完成后会调用「operationComplete()」,这个方法同样也是进行Socket绑定 final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() {

         @Override
         public void operationComplete(ChannelFuture future) throws Exception {
             Throwable cause = future.cause();
             if (cause != null) {
                 // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                 // IllegalStateException once we try to access the EventLoop of the Channel.
                 promise.setFailure(cause);
             } else {
                 // Registration was successful, so set the correct executor to use.
                 // See https://github.com/netty/netty/issues/2586
                 promise.registered();
                 // #3 绑定端口
                 doBind0(regFuture, channel, localAddress, promise);
             }
         }
      

      }); return promise; } } ```

      ② 创建并注册服务端 Channel

      这里主要是说明 initAndRegister() 方法,它主要的作用:

  4. 通过 ChannelFactory 工厂模式实例化服务端 Channel

  5. 初始化 Channel
  6. 注册 Channel

    步骤一:通过 ChannelFactory 工厂模式实例化服务端 Channel

    相关源码如下:

    // io.netty.bootstrap.AbstractBootstrap#initAndRegister
    /**
    * 初始化并注册服务端Channel
    * ① 通过反射创建对应的{@link io.netty.channel.ServerChannel}实例,一般使用{@link io.netty.channel.socket.nio.NioServerSocketChannel}
    * ② 初始化{@link io.netty.channel.ServerChannel}。Netty的Channel包含JDK底层的 {@link java.nio.channels.ServerSocketChannel}
    * ③ 把已完成初始化工作的Channel注册到EventLoopGroup其中一个EventLoop对象中
    */
    final ChannelFuture initAndRegister() {
     Channel channel = null;
     try {
         // #1 创建「Channel」。
         // 通过反射创建对应的Channel实例,一般使用NioServerSocketChannel.class
         channel = channelFactory.newChannel();
    
         // #2 初始化「Channel」
         init(channel);
     } catch (Throwable t) {
         if (channel != null) {
             // channel can be null if newChannel crashed (eg SocketException("too many open files"))
             channel.unsafe().closeForcibly();
             // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
             return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
         }
         // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
         return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
     }
    
     // #3 注册「Channel」
     // 把通道注册到EventLoopGroup对象中
     ChannelFuture regFuture = config().group().register(channel);
     if (regFuture.cause() != null) {
         if (channel.isRegistered()) {
             channel.close();
         } else {
             channel.unsafe().closeForcibly();
         }
     }
    
     return regFuture;
    }
    

    ChannelFactory

    通过源码可以看到使用 ChannelFactory 创建服务端 Channel。ChannelFactory 接口非常简洁:

    /**
    * 工厂接口,用于创建「Channel」对象
    */
    @SuppressWarnings({ "ClassNameSameAsAncestorName", "deprecation" })
    public interface ChannelFactory<T extends Channel> extends io.netty.bootstrap.ChannelFactory<T> {
     /**
      * Creates a new channel.
      */
     @Override
     T newChannel();
    }
    

    就提供一个工厂方法 T newChannel() 用来实例化 Channel 对象。Netty 提供了一个默认的实例类:
    ReflectiveChannelFactory.png
    从名字也可以看出,它是通过反射来实例化对象。一般来说:

  • 如果 Channel 提供无参构造器,可以直接使用 Netty 提供的 ReflectiveChannelFactory 实例化对象。
  • 如果 Channel 不能提供无参构造器,用户可以实现 ChannelFactory 接口,在 newChannel() 方法中使用 new 关键字实例化一个对象并返回。那怎么和 Bootstrap 关联起来呢? 抽象类 AbstractBootstrap 抽象了一个方法可以传入 ChannelFactory,随后在 Netty 引导启动过程中会使用该 ChannelFactory 实例化一个 Channel 对象。

具体 ReflectiveChannelFactory 源码就不贴出来了,内部使用 java.lang.reflect.Constructor#newInstance反射获得实例化对象。

NioServerSocketChannel

一般我们在配置服务端的 ServerBootstrap 引导器时传入 NioServerSocketChannel 类型的服务器 Channel,可以简单理解是 Netty 对 Java 底层 java.nio.channels.SelectableChannel 通道的包装类 那它的构造器会完成什么事情呢? 总结起来大概只有三件:

  1. 初始化 io.netty.channel.Channel 属性。包括:
    1. 全局唯一通道标识 ID。
    2. 实例化魔法类 io.netty.channel.Channel.Unsafe。Channel 类型与 Unsafe 实例类型一一对应。
    3. 创建 DefaultChannelPipeline 管道实例。
  2. 将 JDK 底层的 Channel 设置为非阻塞模式。
  3. 创建 ChannelConfig 实例对象。Channel 类型与 ChannelConfig 实例类型一一对应。 ```java // io.netty.channel.socket.nio.NioServerSocketChannel#NioServerSocketChannel() /**
    • NioServerSocketChannel 构造器 */ public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); }

/**

  • 根据给定的「java.nio.channels.ServerSocketChannel」实例化对象
  • 「java.nio.channels.ServerSocketChannel」是依据底层操作系统创建 */ public NioServerSocketChannel(ServerSocketChannel channel) { // #1 初始化相关属性,将通道设置为非阻塞模式 super(null, channel, SelectionKey.OP_ACCEPT);

    // #2 创建配置类 config = new NioServerSocketChannelConfig(this, javaChannel().socket()); }

// io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel /**

  • super() 方法 */ protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { // #1 初始化相关属性:全局唯一标识id、魔法类unsafe、实例化 DefaultChannelPipeline super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try {
     // #2 把JDK底层的Channel配置为非阻塞模式
     ch.configureBlocking(false);
    
    } catch (IOException e) {
     try {
         // #3 异常,关闭「Channel」
         ch.close();
     } catch (IOException e2) {
         logger.warn("Failed to close a partially initialized socket.", e2);
     }
     throw new ChannelException("Failed to enter non-blocking mode.", e);
    
    } }

// io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel) protected AbstractChannel(Channel parent) { this.parent = parent; // #1 全局唯一ID id = newId(); // #2 仅给内部使用操作网络相关的魔法类 unsafe = newUnsafe(); // #3 创建DefaultChannelPipeline管道 pipeline = newChannelPipeline(); }

// io.netty.channel.DefaultChannelPipeline#DefaultChannelPipeline protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, “channel”); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true);

tail = new TailContext(this);
head = new HeadContext(this);

head.next = tail;
tail.prev = head;

}

/**

  • 根据操作系统底层获取对应的「java.nio.channels.ServerSocketChannel」
  • 对于Linux版本大于等于2.6默认采用EPollSelectorProvider
  • @param provider
  • @return */ private static ServerSocketChannel newSocket(SelectorProvider provider) {

    try {

     return provider.openServerSocketChannel();
    

    } catch (IOException e) {

     throw new ChannelException("Failed to open a server socket.", e);
    

    } }

public class DefaultSelectorProvider {

private DefaultSelectorProvider() { }

public static SelectorProvider create() {
    String osname = AccessController.doPrivileged(
        new GetPropertyAction("os.name"));
    if ("SunOS".equals(osname)) {
        return new sun.nio.ch.DevPollSelectorProvider();
    }

    // Linux内核2.6及以上版本,采用EPollSelectorProvider,
    // 低版本内核使用PollSelectorProvider
    if ("Linux".equals(osname)) {
        String osversion = AccessController.doPrivileged(new GetPropertyAction("os.version"));
        String[] vers = osversion.split("\\.", 0);
        if (vers.length >= 2) {
            try {
                int major = Integer.parseInt(vers[0]);
                int minor = Integer.parseInt(vers[1]);
                if (major > 2 || (major == 2 && minor >= 6)) {
                    return new sun.nio.ch.EPollSelectorProvider();
                }
            } catch (NumberFormatException x) {
            }
        }
    }

    return new sun.nio.ch.PollSelectorProvider();
}

}

值得一提的是在 AbstractChannel(Channel) 构造器中实例 DefaultChannelPipeline,它用于管理 ChannelHandler。在构造器中,它会创建两个非常重要的对象:HeadContext 和 TailContext。两者之间构造双向链表结构。<br />![#1 DefaultChannelPipeline初始化.png](https://cdn.nlark.com/yuque/0/2021/png/105848/1615422590758-1e4f81b3-acff-4a5c-b2fa-d8ef9064e404.png#height=155&id=Fbz6U&margin=%5Bobject%20Object%5D&name=%231%20DefaultChannelPipeline%E5%88%9D%E5%A7%8B%E5%8C%96.png&originHeight=309&originWidth=717&originalType=binary&ratio=1&size=23808&status=done&style=none&width=359)<br /> 至于 HeadContext 和 TailContext 在 Netty 中扮演什么角色,待我们讲到 ChannelPipeline 章节时细说。创建服务端 NioServerChannel 我们已经讲完了,它所初始化的步骤也在最前面总结。
<a name="x2rWW"></a>
### 步骤二:初始化 Channel
初始化 Channel 的核心方法是 `io.netty.bootstrap.ServerBootstrap#init` 它完成以下事件:

1. 设置 Socket 参数。常见有:SO_RCVBUF、SO_BACKLOG 等等。
1. 设置自定义属性。可以从 Channel 中获得。
1. 向管道中添加 ChannelInitializer 处理器,帮助我们编排 ChannelPipeline。这个 ChannelInitializer 完成以下事件(**异步**):
   1. 向管道中添加引导配置时的 ChannelHandler。
   1. 异步添加 ServerBootstrapAcceptor对象。

此时 ChannelPipeline 内部结构如下图所示:<br />![#2 初始化通道.png](https://cdn.nlark.com/yuque/0/2021/png/105848/1615424552018-7c05d130-06b6-4026-87e0-8afc9464a42f.png#height=157&id=Xapat&margin=%5Bobject%20Object%5D&name=%232%20%E5%88%9D%E5%A7%8B%E5%8C%96%E9%80%9A%E9%81%93.png&originHeight=314&originWidth=1236&originalType=binary&ratio=1&size=31140&status=done&style=none&width=618)<br />我们重点说明一个步骤三,为什么在这里添加一个 ChannelInitialzer 帮助类呢?

1. ChannelInitializer 实现 ChanneHandler 接口,所以可以被添加到管道中,它是一个特殊的 ChannelHandler。主要功能是提供一种便捷的方式设置 Channel 的 ChannelPiepline。通常被用于 Bootstrap#handler()、ServerBootstrap#handler、ServerBootstrap#childHandler() 方法中。
1. 一旦 Channel 完成注册之后就立即触发相应的事件,并回调 `ChannelInitializer#initChannel()` 方法完成管道的初始化工作。并在完成初始化工作后从 ChannelPiepline 管道中移除。
1. 可以把 ChannelInitializer 想象成延迟初始化的一个占位符。

这里简单说明一下 ServerBootstrapAcceptor,大伙还记得这幅图么?<br />![主从Reactor模型.png](https://cdn.nlark.com/yuque/0/2021/png/105848/1615425661746-e9fa527e-82bf-405d-a572-3c3e8fd08fd1.png#height=925&id=Me6Dw&margin=%5Bobject%20Object%5D&name=%E4%B8%BB%E4%BB%8EReactor%E6%A8%A1%E5%9E%8B.png&originHeight=925&originWidth=1506&originalType=binary&ratio=1&size=123612&status=done&style=none&width=1506)<br />没错,它就是图中 acceptor 的化身(每种 Reactor 模型都有 acceptor,但为了简单起见,只讨论主从 Reactor 模型)。它专注的事情有:

1. 一旦连接完成 TCP 三次握手,就为它创建一个新的 Channel。
1. 把 Channel 注册到 subReactor。

和 ServerBootstrapAcceptor 相关的源码会在 ChannelPiepline 讲到。
```java
// io.netty.bootstrap.ServerBootstrap#init
/**
 * 初始化「Channel」
 * ① 设置Channel属性
 * ② 保存用户自定义属性
 * ③ 向管道中添加 ChannelInitializer 处理器
 * @param channel
 */
@Override
void init(Channel channel) {
    // #1 设置「Channel」参数
    // 在创建服务端「Channel」时,此时与Channel相关的配置存储在「ServerSocketChannelConfig」相关的子类中。
    // 在初始化「Channel」的过程中,会将这些参数设置到JDK底层的ServerSocketChannel上,并把自定义属性绑定到Channel(#2)。
    setChannelOptions(channel, newOptionsArray(), logger);


    // #2 保存用户自定义属性
    setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
    }
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);

    // #3 ServerBootstrap启动时内部添加一个ChannelInitializer处理器,该处理器有以下功能:
    // ① 向管道中添加用户服务器配置的ChannelHandler(AbstractBootstrap.handler(ChannelHandler))
    // ② 异步添加特殊的 ServerBootstrapAcceptor 处理器,该处理器用于向Worker_EventLoop分发(注册)已连接的通道
    // ③ 此时调用ChannelPipeline#addLast()方法有:
    //    #3-1 将处理器插入链表中
    //    #3-2 把处理器(被HandlerContext引用)封装成「PendingHandlerAddedTask」异步任务提交到「任务队列」
    // 虽然此时已经「物理」上添加到管道中,但是由于相关通道(Channel)还未完成注册,所以「HandleAdded」事件不会被触发,
    // 而PendingHandlerAddedTask异步任务就是等待合适时机触发「HandleAdded」事件
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {

            // #1 把配置到ServerBootStrap的ChannelHandler添加到管道中
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            // #2 向ServerBootstrap添加一个非常重要的ChannelHandler--「ServerBootstrapAcceptor」。
            // 该处理器主要功能是将已连接的Channel注册到WorkerGroup。
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

步骤三:注册 Channel

回到 initAndRegister() 主流程,创建并初始化 Channel 后,我们就可以把 Channel 注册到 EventLoop 对象里了。
注册 Channel 流程相对来说稍显复杂,但我们一步步拆解,思路就清晰了。大致流程总结如下:

  1. 从 EventLoopGroup 中轮询一个 EventLoop 与 Channel 绑定。注意,此时这个 EventLoop 就等于 acceptor,它所完成的工作前面已经说明了。
  2. java.nio.channels.SelectableChannel 注册到 java.nio.channels.Selector 对象中。interestOps 默认值为 0。
  3. 触发 handlerAdded 事件。
    1. 回调步骤二在管道中添加的 ChannelInitializer#initChannel() 方法。此时管道内部结构图如下图所示:

#3 invokeHandlerAddedIfNeeded.png

  1. 触发 channelRegistered 事件。

    注意,调用 ChannelPipline#addLast() 会触发 handlerAdded 事件并在管道中传播。但是事件触发的前提是 Channel 需要完成注册。

相关源码:

// io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
/**
 * 调用 next() 方法从「EventLoopGroup」中轮询获得「EventLoop」对象,
 * 随后把「Channel」注册到「EventLoop」对象中
 */
@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

// io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)
@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

// io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)
@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    // 底层调用Unsafe魔法类实例完成注册,最后还是调用 register0(promise)方法
    promise.channel().unsafe().register(this, promise);
    return promise;
}

// io.netty.channel.AbstractChannel.AbstractUnsafe#register0
/**
 * 通道注册
 * @param promise
 */
private void register0(ChannelPromise promise) {
    try {
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;

        // #1 将JDK的Channel注册到Selector
        doRegister();
        neverRegistered = false;
        // 此时通道已完成注册,接下来就需要触发各种事件了。
        // 很多任务都必须在通道注册完成后才能开展
        registered = true;

        // 确保我们在通知promise之前调用handlerAdded()方法,这是必要的。
        // 因为用户可能已经通过在ChannelFutureListener中的管道触发了事件。

        // #2 触发「handlerAdded」事件
        // 触发内部待添加的「ChannelHandler#handlerAdded()」回调方法
        // 对于正常的Handler来说,一般不会做些什么,而对于特殊的channelinitializer
        // 会把自定义的ChannelHandler添加到链表中
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);

        // #3 触发「channelRegistered」事件
        pipeline.fireChannelRegistered();

        // Only fire a channelActive if the channel has never been registered.
        // This prevents firing multiple channel actives if the channel is deregistered and re-registered.
        // #4 根据活跃状态触发相应事件
        if (isActive()) {
            if (firstRegistration) {
                // 触发「channelActive」事件
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

AbstractUnsafe#register0() 方法是步骤三的核心方法,在阅读源码时我们需要舍弃掉一些细枝未节,只关注核心的步骤,至于如何触发 handlerAdded 事件这得等到讲解 ChannePipeline 时才会详细说明。
方法 pipeline.invokeHandlerAddedIfNeeded(); 也是一个重要的方法,前面也说过,主要回调 ChannelInitializer``#initChannel() 方法初始化 Channel 的管道。但对于 mainReactor 和 subReactor 还是有所区别:

  1. 对于 mainReactor 来说,它有一个独立的 ChannelPiepline,内部结构为:

#3 invokeHandlerAddedIfNeeded.png

  1. 而对于 subReactor 来说,每一个连接(来自 mainReactor)都会创建一个新的 ChannelPiepline,而它每次都会回调我们在 ServerBootstrap.childChannel() 所添加的 ChannelInitializer#initChannel() 方法。比如对于我们的 HTTP 服务器而言,管道内部结构为:

#4 subreactor piepline.png
不要把这两个步骤给混淆了。

③ Channel 和 SocketAddress 进行绑定

Channel 和 SocketAddress 进行绑定的核心方法是 AbstractUnsafe#bind,逻辑也十分清晰:

  1. 调用底层 JDK 接口绑定 SocketAddress。
  2. 绑定成功触发「channelActive」事件(由通道绑定的 EventLoop 执行)。
  3. 更新「ChannelPromise」状态,触发相应的监听事件。

    // io.netty.channel.AbstractChannel.AbstractUnsafe#bind
    @Override
    public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    
     boolean wasActive = isActive();
     try {
         // #1 调用底层JDK方法绑定端口
         doBind(localAddress);
     } catch (Throwable t) {
         safeSetFailure(promise, t);
         closeIfClosed();
         return;
     }
    
     if (!wasActive && isActive()) {
         invokeLater(new Runnable() {
             @Override
             public void run() {
                 // #2 绑定成功触发「channelActive」事件
                 pipeline.fireChannelActive();
             }
         });
     }
    
     // #3 更新「ChannelPromise」状态,触发相应的监听事件
     safeSetSuccess(promise);
    }
    

    虽然步骤 1, 3 十分重要,但最重要的还是步骤 2,因为它完成了一件重要的事件就是向 SelectionKey 中添加 SelectionKey.OP_ACCEPT 兴趣事件。到这里,Netty 才可以说大致完成服务端启动,可以正式接收客户端的连接请求。相关源码如下:

    // io.netty.channel.nio.AbstractNioChannel#doBeginRead
    @Override
    protected void doBeginRead() throws Exception {
     // Channel.read() or ChannelHandlerContext.read() was called
     final SelectionKey selectionKey = this.selectionKey;
     if (!selectionKey.isValid()) {
         return;
     }
    
     readPending = true;
    
     final int interestOps = selectionKey.interestOps();
     if ((interestOps & readInterestOp) == 0) {
         // 注册 OP_READ 事件到服务端Channel的事件集合中
         selectionKey.interestOps(interestOps | readInterestOp);
     }
    }
    

    服务端启动源码解析讲解结束了,我这里只是挑选部分的重要的源码进行解读,有些源码可能跳得比较快,那就需要朋友们通过 Debug 一步一步来了。总的来说,个人认为 Netty 启动服务有以下几个点需要值得关注:

  4. 异步事件模型。这是 Netty 提供给开发者强大的扩展功能。Netty 启动服务时部分步骤也是通过异步来完成的。

  5. ChannelInitializer#initChannel() 调用时机。 对于服务端启动,我们最后要的是做好 acceptor 的初始化工作(在 Netty 中由 ServerBootstrapAcceptor 具象化)

    服务端如何处理客户端新建连接

    通过上面的学习,回答这个问题就变得十分简单了。对于已完成 TCP 三次握手的连接,它们最终会被 io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor 处理。
    大致总结为以下几个步骤:

  6. Boss NioEventLoop 线程在 for(;;) 不断循环新的 I/O 连接事件。

  7. 客户端连接成功建立,构造 Netty 客户端 NioSocketChannel 对象。
  8. 把 NioSocketChannel 对象注册到 Worker EventLoopGroup 中。
  9. 添加 OP_READ 事件到 NioSocketChannel 感兴趣的事件集合中。

关于 NioEventLoop 相关逻辑这里不展开讲解,我们直接看最核心的方法:

① 获取 SocketChannel

// io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    // #1 从「ServerSocketChannel」接受新的连接并返回「SocketChannel」
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {

            // #2 构造「NioSocketChannel」并添加到「ByteBuf」对象中
            buf.add(new NioSocketChannel(this, ch));

            // #3 返回
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            // #4 发生异常关闭通道连接
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}

② 注册到 Worker EventLoopGroup

// io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
/**
 * 将新连接「NioSocketChannel」注册到 Worker EventLoopGroup对象中
 */
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);

    try {
        // 向 Worker EventLoopGroup 注册客户端新连接的通道
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

总结

本文我们通过源码深入分析了 Netty 服务端启动的全流程,对其中的部分组件有了基本的认知和使用。最后也顺便简述了 Netty 处理新建立的客户端连接。有些地方没有讲得太详细,限于篇幅,只讲了大概,希望看到这篇文章的朋友们能动动手 Debug 一下,很多问题都能迎刃而解。