概述

线程模型指定了操作系统、编程语言、框架或应用程序的上下文中的线程管理的关键方面。在本章中,我们将详细讨论 EventLoop 的方方面面,包括:

  • EventLoop结构层次。
  • 无锁设计。
  • Reactor 线程模型。
  • EventLoop I/O 处理以及任务处理。

    层次结构

    EventExecutor精修版.png
    EventExecutorGroup 继承体系庞大,涉及到的接口、抽象类都比较多,但是我们将它们分门别类就一目了然了:
  1. JDK 接口。Executor 执行器是 Doug Lea 在 Java 1.5 版本提供了多线程任务执行器。ScheduledExecutorService 则是和时间相关的任务执行策略。io.netty.util.concurrent 包构建在 JDK 的 java.utl.concurrent 包上,用来提供线程执行器。
  2. 线程组。这部分是 MultithreadEventLoopGroup 抽象类及实现子类,可以把它们看成不同类型的线程池,比如 Nio 则使用 NioEventLoopGroup。这部分属于管理层,其内部定义了管理 EventLoop 生命周期的相关 API。
  3. 单线程。这部分是 EventLoop 接口实现子类。它们受线程组管理的单线程执行器。类型与线程组一一对应。它是任务处理的执行者,比如处理 I/O 事件。SingleThreadEventLoop 抽象类的子类需要实现最重要的一个方法:SingleThreadEventExecutor.run(),这是根据自身类型处理相关 I/O 事件和任务。比如对应 NioEventLoop 则是使用 NIO API 实现多路复用,而 EpollEventLoop 是通过调用 Linux Epoll API 实现。它的性能比 NIO 更好,因为 NIO 底层是调用 select() 完成,而 epoll 则比 select 性能更好。但是,由于 epoll 是 Linux 2.6 版本以后特有的 API,并不适用于其他操作系统,不属于标准规范。因此,在下面源码的解析中我们还是以 NIOEventLoop 源码进行讲解。

EventExecutorGroup 接口除了复写 JDK 接口外,还定义了管理线程池相关的 API,分别是

  • isShuttingDown()
  • shutdownGracefully()
  • shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)
  • terminationFuture()
  • next()

其中,next() 方法是核心,它的作用是返回线程池中(Thread 被 EventLoop 包装,本质还是线程。因此,EventLoopGroup 就是线程池)下一个线程。Netty 提供默认的分配策略是轮询。
AbstractEventExecutorGroup 抽象类除了上述方法没有实现外,其实都有默认实现:通过 next() 相关 API 调用。

线程组

MultithreadEventExecutorGroup

MultithreadEventExecutorGroup 抽象类则是线程组的骨架基础,内部使用数组保存 EventExecutor 执行器。相关变量和构造器解析如下:

  1. /**
  2. * {@link EventExecutorGroup} 接口的抽象类实现,用于多线程处理任务。
  3. */
  4. public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
  5. // 使用数组保存执行器,数组长度默认为2*CPU cores,一旦初始化后就不允许扩容
  6. private final EventExecutor[] children;
  7. // EventExecutor视图
  8. private final Set<EventExecutor> readonlyChildren;
  9. // EventExecutor已终结数量
  10. private final AtomicInteger terminatedChildren = new AtomicInteger();
  11. private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
  12. // 选择器,Netty 提供的默认方式:轮询
  13. private final EventExecutorChooserFactory.EventExecutorChooser chooser;
  14. /**
  15. * 创建一个新的{@link EventExecutorGroup} 实例对象
  16. *
  17. * @param nThreads 当前实例将会使用到的线程数量
  18. * @param executor 执行器。如果使用默认的话则为{@code null} 值
  19. * @param chooserFactory {@link EventExecutor} 选择器工厂,用来获取{@link EventExecutorChooserFactory#EventExecutorChooser} 选择器
  20. * @param args 每当{@link #newChild(Executor, Object...)}方法调用时传递 args
  21. */
  22. protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
  23. EventExecutorChooserFactory chooserFactory, Object... args) {
  24. if (nThreads <= 0) {
  25. throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
  26. }
  27. if (executor == null) {
  28. // #1 创建一个执行器,特点:每次调用 executor(Runnable) 方法都新建一个线程执行
  29. executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
  30. }
  31. // #2 创建数组对象,长度默认为:NettyRuntime.availableProcessors() * 2
  32. children = new EventExecutor[nThreads];
  33. for (int i = 0; i < nThreads; i ++) {
  34. boolean success = false;
  35. try {
  36. // #2-1 初始化数组中的「EventExecutor」实例对象
  37. children[i] = newChild(executor, args);
  38. success = true;
  39. } catch (Exception e) {
  40. // TODO: Think about if this is a good exception type
  41. throw new IllegalStateException("failed to create a child event loop", e);
  42. } finally {
  43. // #2-2 某个「EventExecutor」创建失败,会有以下处理方式:
  44. // ① 优雅关闭之前已经创建成功的Executor
  45. // ② 再次确认每个Executor是否完全被终结
  46. if (!success) {
  47. for (int j = 0; j < i; j ++) {
  48. // 优雅关闭
  49. children[j].shutdownGracefully();
  50. }
  51. // 再次确认每个Executor是否完全被终结
  52. for (int j = 0; j < i; j ++) {
  53. EventExecutor e = children[j];
  54. try {
  55. // 等待EventExecutor完全关闭才退出while循环
  56. while (!e.isTerminated()) {
  57. e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
  58. }
  59. } catch (InterruptedException interrupted) {
  60. // 在等待关闭的过程中,如遇到中断异常,
  61. // 则立即中断当前线程并退出循环,让调用者处理此中断
  62. Thread.currentThread().interrupt();
  63. break;
  64. }
  65. }
  66. }
  67. }
  68. }
  69. // #3 绑定轮询选择器
  70. // 当调用 next() 方法时就会通过此选择器从数组中轮询得到「EventExecotr」执行器对象
  71. chooser = chooserFactory.newChooser(children);
  72. // #4 创建监听回调
  73. final FutureListener<Object> terminationListener = future -> {
  74. if (terminatedChildren.incrementAndGet() == children.length) {
  75. terminationFuture.setSuccess(null);
  76. }
  77. };
  78. // #5 向所有已创建的「EventExecotor」添加关闭监听回调
  79. for (EventExecutor e: children) {
  80. e.terminationFuture().addListener(terminationListener);
  81. }
  82. // #6 生成「EventExecutor」视图,使用「Set」保存
  83. Set<EventExecutor> childrenSet = new LinkedHashSet<>(children.length);
  84. Collections.addAll(childrenSet, children);
  85. readonlyChildren = Collections.unmodifiableSet(childrenSet);
  86. }
  87. }

总结 MultithreadEventExecutorGroup 构造器做了哪些事情:

  1. 创建 EventExecutor 数组。
  2. 调用抽象方法 newChild() 填充数组对象。
  3. 将数组和轮询器进行绑定。
  4. 创建监听回调并注册到每个 EventExecutor 对象中。
  5. 生成 EventExecutor 视图。

这里会创建一个 executor 对象。它比较有趣:每次调用 executor(Runnable) 方法都会创建新线程执行任务。这个有什么用呢:

  • 延迟创建线程。只有当执行任务时才会通过 Executor 对象创建线程。这样好处是不会创建无用线程。
  • 内部封装了线程工厂,以一定规则对线程命名。

EventExecutor newChild(Executor executor, Object... args) 抽象方法需要子类实现,在层次结构中我们看到,其实也是创建对应的 EventLoop 实现类。比如对于 NioEventLoopGroup 而言,它会创建 NioEventLoop 执行器。

NioEventLoopGroup

Netty 核心实现类,可以看成线程池管理器,管理 NIoEventLoop 类型的线程。实现了 newChild() 抽象方法,源码为:

// io.netty.channel.nio.NioEventLoopGroup#newChild
/**
     * 创建一个单线程的{@link EventLoop}对象
     * @param executor
     * @param args
     * @return
     * @throws Exception
     */
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}

单线程执行器

单线程执行器可看作为真正可执行任务的线程,它们负责处理 I/O 事件和任务队列。

EventExecutor

EventExecutor 接口定义了一个重要的判断,即

// 判断调用此方法的是否为异线程
boolean inEventLoop();

// 判断 thread 是否属于异线程
boolean inEventLoop(Thread thread);

这是无锁异步编程的重要判断方法,事实上它提供了以下编程模型:
异同线程执行任务.png
通过 inEventLoop() 方法判断当前执行线程与 EventLoop 绑定的线程是否是同一个,如果是同线程则直接调用,如果是异线程,则把相关方法使用 Runnalble 匿名内部类包装,再调用 EventLoop#execute() 进行入队操作。待 EventLoop 执行器执行。由于只有一个线程处理所有事情,因此永远不要将一个长时间运行的任务放入到执行队列中,因为它将阻塞需要在同一线程执行的其他任务。Netty 对于这种情况也提供了相应的 API,可以让耗时的任务在单独的 EventLoopGroup 中执行。EventExecutor 还定义了与异步计算相关的接口, Future 章节会讲到这些内部。

SingleThreadEventExecutor

SingleThreadEventExecutor 是一个非常重要的抽象类,它是实现单线程执行器的基本骨架,作用:

  • 定义EventExecutor 执行器状态。
  • 定义任务队列和定时任务队列。

Reactor 模型

Netty源码之知晓网络IO模型 一文中我们详细讲解了什么是 Reactor 模型,那是怎么配置不同的 Reactor 模型呢?

单 Reactor 单线程模型

EventLoopGroup group = new NioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.group(group)

单 Reactor 多线程模型

EventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(group)

主从 Reactor 模型

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)

以上代码非常简洁,这是 Netty 对 Reactor 模型高度抽象后的结果。在服务端开发中,我们通常使用主从 Reactor 模型,以达到最高的网络性能。而在客户端中,需要根据自身业务选择合适的类型,在性能和资源上寻找平衡。

源码解析

前面我们大致讲解了 EventLoopGroup 和 EventLoop 之间的关系,可以想象成老板和员工,EventLoopGroup 负责管理 EventLoop。因此,我们重点是关注 EventLoop 做了些什么。对于 EventLoop 实现类而言,Netty 提供 4 种默认实现类,分别是:

  1. NioEventLoop
  2. EpollEventLoop
  3. KQueueEventLoop
  4. EpollEventLoop

而其中 NioEventLoop 应用最为广泛,EpollEventLoop 只适用于待定版本的 Linux 系统,但是性能优于 NioEventLoop。接下来,我们只分析 NioEventLoop 源码实现。
Netty源码之服务端/客户端启动 一章中我们讲了 Netty 启动流程,这里简单描述一下(主从 Reactor 模型):

  1. 创建两个 EventLoopGroup “线程池”。
  2. 使用 ServerBootstrap 对象引导 Server 创建。
  3. 从 EventLoopGroup 获取一个 EventLoop 不断轮询 I/O 事件和处理任务,对于 Main Reactor 线程来说,它只关注 OP_ACCEPT 事件。
  4. 当一个新连接到达,Main Reactor 线程会创建 NioSocketChannel 对象并将其注册到 Sub Reactor中,由 Sub Reactor 其中一个 EvnetLoop 负责该连接的剩下 I/O 事件和相关处理。

而我们今天所讲的是与步骤 3 相关的源码实现。

① 开始线程

当我们在 Main 方法中创建 NioEventLoopGroup 对象时,此时并没有线程与之绑定,当真正需要执行任务时才会创建新的线程执行业务,而 EventLoop 则与该线程绑定,这就是 Netty 对 EventLoop 的无锁实现。其中也体现了懒加载的思想。
那让我们通过 execute(Runnable) 方法开启 NioEventLoop 的学习吧!
该方法被 SingleThreadEventExecutor 抽象类所实现,SingleThreadEventExecutor 是 EventLoop 的基本实现骨架。

// io.netty.util.concurrent.SingleThreadEventExecutor#execute(java.lang.Runnable)
/**
 * 单线程Executor执行任务
 * 任务默认立即执行
 * @param task
 */
@Override
public void execute(Runnable task) {
    ObjectUtil.checkNotNull(task, "task");
    execute(task,
            !(task instanceof LazyRunnable) &&
                    wakesUpForTask(task));  // 默认为「true」
}
/**
 * 单线程执行任务
 * @param task
 * @param immediate
 */
private void execute(Runnable task, boolean immediate) {
    // #1 判断是否为异线程调用该方法,初始化时,必定为false
    boolean inEventLoop = inEventLoop();

    // #2 添加到「taskQueue」队列中
    addTask(task);

    // #3 异线程调用
    if (!inEventLoop) {
        // #3-1 根据state状态值判断是否已开启线程
        // 如果没有开启,则会通过线程工厂创建一个新的线程用来执行接下来的任务和I/O操作
        startThread();

        // #3-2 线程被关闭
        if (isShutdown()) {
            boolean reject = false;
            try {
                // 移除当前任务
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {}
            if (reject) {            
                // 抛出「RejectedExecutionException」异常
                reject();
            }
        }
    }

    // #4 唤醒Executor执行器执行任务
    // 默认为添加任务后立即唤醒selector
    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}

execute(Runnable) 最终是调用 execute(Runnable,immediate) 方法。参数 immediate 表示是否需要立即执行的意思。设想此时线程阻塞在 Selector.select() 方法上,正好有一个异线程添加一个任务,如果 immediate=true 则表示将会调用 Selector.weakup() 方法唤醒阻塞线程,这样任务也就可以”立即”执行了。当前,如果线程并没有处于阻塞状态,是不需要重复唤醒的。流程示意图如下:
execute(runnable,immediate)调用流程示意图.png
通过了解 execute(Runnable, boolean) 方法,我们了解以下几点:

  1. 无论异线程还是同线程调用 execute() 方法,第一步都是将任务添加到任务队列中。这个添加过程是非阻塞的(java.util.Queue#offer)。
  2. 异线程调用时,需要判断线程是否和当前的 EventLoop 绑定(startThread()),这个方法就是 EventLoop 启动的关键,它会通过 for(;;) 不断轮询 I/O 事件并处理它们。
  3. Netty 默认设置一旦添加任务就立即执行,所以会判断当前的 EventLoop 的状态是否需要异线程手动唤醒。

    ② 无限循环处理I/O事件和任务

    这一步是由 startThread() 方法所触发,在此方法实现中,会根据 state 的值判断线程是否已经开启,如果没有开启则调用 doStartThread() 方法执行。需要注意的是,此刻调用这些方法的是异线程。相关源码如下:

    /**
    * 根据{@link #state}状态判断是否开启新的线程
    */
    private void startThread() {
     if (state == ST_NOT_STARTED) {
         // CAS更改state的值
         if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
             boolean success = false;
             try {
                 // 开启线程
                 doStartThread();
                 success = true;
             } finally {
                 if (!success) {
                     STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                 }
             }
         }
     }
    }
    

    实际是由 doStartThread() 完成。

    /**
    * 开启线程并在「for(;;)」不断处理I/O事件和任务
    */
    private void doStartThread() {
     assert thread == null;
     // 由「ThreadPerTaskExecutor」执行器执行任务(新开一个线程执行)
     executor.execute(() -> {
         // #1 保存线程变量(inEventLoop()方法就是通过此变量进行判断)
         thread = Thread.currentThread();
    
         // #2 响应中断
         if (interrupted) {
             thread.interrupt();
         }
    
         boolean success = false;
         // #3 更新「lastExecutionTime」,以便精通地进行静默期检查
         updateLastExecutionTime();
         try {
             // #4 「EventLoop」核心方法,轮询并处理I/O事件和任务
             SingleThreadEventExecutor.this.run();
    
             // #5 for(;;) 循环退出
             success = true;
         } catch (Throwable t) {
             logger.warn("Unexpected exception from an event executor: ", t);
         } finally {
             // #6 通过CAS将「state」状态更新为「ST_SHUTTING_DOWN」
             for (;;) {
                 int oldState = state;
                 if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                     break;
                 }
             }
    
             // 如果gracefulShutdownStartTime==0:意味着并没有调用「confirmShutdown()」方法
             if (success && gracefulShutdownStartTime == 0) {
                 if (logger.isErrorEnabled()) {
                     logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                             SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                             "be called before run() implementation terminates.");
                 }
             }
    
             try {
                 // #7 直接「confirmShutdown」返回true才会结束
                 for (;;) {
                     if (confirmShutdown()) {
                         break;
                     }
                 }
    
                 // 现在我们要确保从此刻开始不能添加任何任务。这通过改变状态来实现。任务超过此刻的任务都将会被拒绝
                 // 这一点开始不能再增加任务。这可以通过切换状态来实现。任何超过这一点的新任务都将被拒绝。
                 for (;;) {
                     int oldState = state;
                     if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
                             SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
                         break;
                     }
                 }
    
                 // 执行剩余所有任务,不需要循环执行「confirmShutdown()」方法。
                 confirmShutdown();
             } finally {
                 try {
                     // #8 子类实现(NioEventLoop会调用Selector.close())方法
                     cleanup();
                 } finally {
                     // #9 移除所有本地线程缓存数据。因为执行器即将关闭并给Future发送相关通知。
                     //    用户可能阻塞在Future,一旦解除阻塞,JVM就可以终止并开始卸载类
                     // See https://github.com/netty/netty/issues/6596.
                     FastThreadLocal.removeAll();
    
                     STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                     // 唤醒等待在「threadLocak」的线程
                     threadLock.countDown();
    
                     int numUserTasks = drainTasks();
                     if (numUserTasks > 0 && logger.isWarnEnabled()) {
                         logger.warn("An event executor terminated with " +
                                 "non-empty task queue (" + numUserTasks + ')');
                     }
                     // #10 向Future发送通知
                     terminationFuture.setSuccess(null);
                 }
             }
         }
     });
    }
    

    doStartThread() 是 EventLoop 执行器开启线程的唯一入口。通过在构造器中传入的 ThreadPerTaskExecutor 每任务每线程执行器创建新线程执行嵌套方法,执行流程示意图如下:
    doStartThread()调用流程示意图.png
    从上面可以看出,SingleThreadEventExecutor.this.run() 是核心方法,它是 Netty 处理 I/O 事件和任务的核心方法。剩下的是与线程相关的状态变更及优雅关闭操作,待下面讲到优雅关闭时详说。这里重点是 SingleThreadEventExecutor.this.run() 方法。对于抽象类 SingleThreadEventExecutor 而言, run() 方法是抽象类,需要子类实现,而我们最常见的实现是 NioEventLoop,相关源码如下:

    /**
    * {@link NioEventLoop} 无限循环处理I/O事件和任务。主要完成以下操作:
    * ① 根据「taskQueue」计算此次for循环的Selector策略
    * ② 执行 select()/select(long)/selectNow()
    * ③ I/O事件的优先级比任务高。因此,无论ioRate比率如何,都要先处理I/O事件。后续根据ioRate分配任务处理时间。
    * ④ 解析Epoll空轮询bug
    * ⑤ 每次for循环都需要判断当前线程的状态。如果当前线程为正在关闭中,先关闭所有的通道连接,再处理任务队列。
    */
    @Override
    protected void run() {
     // select唤醒次数
     int selectCnt = 0;
     for (; ; ) {
         try {
             int strategy;
             try {
                 // 操作一(非阻塞)
                 // ① 如果「taskQueue」队列中有任务,这里会首先调用「Selector#selectNow()」方法获取已准备好的IO事件数量。
                 // ② 如果「taskQueue」队列中没有任务,则返回-1。
                 strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                 switch (strategy) {
                     case SelectStrategy.CONTINUE:
                         continue;
                     case SelectStrategy.BUSY_WAIT:
                     case SelectStrategy.SELECT:
                         // #1-2 获取「调度任务队列」最近执行的时间(即未来第一个将要执行的任务的截止时间,纳秒表示)
                         // 只会返回-1或>0的数
                         // 获取定时任务队列截止日期时间
                         long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                         if (curDeadlineNanos == -1L) {
                             // 定时任务队列没有任务,则下次「selector」为「全阻塞」调用
                             curDeadlineNanos = NONE;
                         }
    
                         // #1-3 设置下次selector唤醒时间
                         // ① 定时任务队列中无任务,nextWakeupNanos=NONE
                         // ② 定时任务队列中有任务,nextWakeupNanos=deadlineNanos()
                         nextWakeupNanos.set(curDeadlineNanos);
    
                         try {
                             if (!hasTasks()) {
                                 // 操作二(可能阻塞):根据「curDeadlineNanos」调用相关「select」方法
                                 strategy = select(curDeadlineNanos);
                             }
                         } finally {
                             // 「selector」被唤醒或到时,更新nextWakeupNanos=AWAKE
                             // #1-5 延迟设置「nextWakeupNanos」的值,不能立即可见,可能会在短时间内读到旧值。
                             // 此次更新目的是为了阻塞不必须的唤醒动作,所以使用「lazySet()」是可行的(没有竞争条件)
                             // 关于「lasySet」:
                             // 普通的 valiatile 变量是立即可见,由底层操作系统指令所保证,
                             // 而 lazySet 却是无法保证这一点,所以其他线程在之后的一小段时间内还可以读取旧值
                             // 好处: 提高性能,在多个CPU缓冲之间同步一个内存值是很昂贵的
                             nextWakeupNanos.lazySet(AWAKE);
                         }
                         // fall through
                     default:
                 }
             } catch (IOException e) {
                 // #2 抛出异常,表明此时的Selector处于混乱状态,需要进行重建
                 // https://github.com/netty/netty/issues/8566
                 rebuildSelector0();
                 selectCnt = 0;
                 handleLoopException(e);
                 continue;
             }
    
             // #3 Select循环次数+1
             selectCnt++;
             cancelledKeys = 0;
             needsToSelectAgain = false;
             final int ioRatio = this.ioRatio;
             boolean ranTasks;
    
             // #4 平衡I/O事件和处理任务两者的耗时,默认比率为 50:50,
             if (ioRatio == 100) {
                 try {
                     if (strategy > 0) {
                         // #4-1 首要任务还是先处理I/O事件
                         processSelectedKeys();
                     }
                 } finally {
                     // #4-2 处理所有任务
                     ranTasks = runAllTasks();
                 }
             } else if (strategy > 0) {
                 // #5-1 保存I/O处理起始时间
                 final long ioStartTime = System.nanoTime();
                 try {
                     // #5-2 处理I/O事件
                     processSelectedKeys();
                 } finally {
                     // #5-3 计算本次轮询I/O耗时
                     final long ioTime = System.nanoTime() - ioStartTime;
    
                     // #5-4 根据此次I/O耗时计算分配给任务处理所需的CPU时间
                     // 由于ioRatio默认值为50,所以在一个轮次中,I/O事件处理和任务处理所花费的时间相等。
                     ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                 }
             } else {
                 // #6 本次循环没有I/O事件需要处理,那就处理任务吧
                 // runAllTasks(0)最多处理64个任务
                 ranTasks = runAllTasks(0);
             }
    
             // 本次for循环执行过任务或处理过I/O事件
             if (ranTasks || strategy > 0) {
                 if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                     // 连续提前几次返回
                     logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector);
                 }
                 // 重置「selectCnt」
                 selectCnt = 0;
    
                 // #7 异常唤醒判断(可能出现了 Epoll 空轮询BUG,如果超过一定次数,则需要重建「Selector」对象)
                 // 出现了既没有处理I/O事件,也没有处理任务任务,但是「selector」被唤醒了
             } else if (unexpectedSelectorWakeup(selectCnt)) {
                 selectCnt = 0;
             }
         } catch (CancelledKeyException e) {
             // Harmless exception - log anyway
             if (logger.isDebugEnabled()) {
                 logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                         selector, e);
             }
         } catch (Error e) {
             throw (Error) e;
         } catch (Throwable t) {
             handleLoopException(t);
         } finally {
             try {
                 // 每次for循环都需要判断「EventLoop」状态,根据对应状态作出关闭动作
                 // 必须要执行「shundown」操作,即便在循环中有异常抛出
                 // ① 如果当前线程状态为「正在关闭」,则需要进行清理工作
                 // ② 关闭所有通道连接
                 // ③ 再次确认是否成功关闭
                 if (isShuttingDown()) {
                     closeAll();
                     // 确认是否已完全关闭,如果确认失败,则继续for循环
                     // 有可能添加了额外的任务
                     if (confirmShutdown()) {
                         return;
                     }
                 }
             } catch (Error e) {
                 throw (Error) e;
             } catch (Throwable t) {
                 handleLoopException(t);
             }
         }
     }
    }
    

    相关流程示意图如下:
    NioEvent.run.png
    NioEventLoop#run() 流程并不复杂,在这个方法中有以下几点值得读者注意:

  4. Select 策略是怎么执行的?

    1. 会先判断 taskQueue 是否有任务?
      1. 存在任务,直接调用 selectNow() 方法并立即返回,此方法为非阻塞调用,所以可能返回 0。
      2. 不存在任务,则需要决定 select() 操作的阻塞时间
        1. 如果定时任务中有任务,则获取最近任务的截止时期时间作为阻塞时间
        2. 如果定时任务中不存在任务,则直接调用 select() 一直阻塞直到有 I/O 事件到达。
  5. 变量 strategy 有以下两个含义:
    1. strategy>=0 时表示已准备 I/O 事件数量。
    2. strategy<0 时表示 Netty 自定义 Select 策略,详见 SelectStrategy。
  6. I/O 处理时间和任务处理时间如何平衡?
    1. 首先需要明确的一点是 I/O 处理的优先级高于任务处理,即当 I/O 事件和任务都存在的情况下,先处理 I/O 事件,再处理任务。
    2. Netty 使用 ioRatio 平衡 I/O 处理时间和任务处理时间的比率,默认为 50:50。Netty 每次都会记录(当然,当 strategy>0 才有意义)I/O 执行时间,这也是本轮任务的执行耗时。
  7. Netty 通过重建 Selector 方式解决 Epoll 空轮询 Bug 问题。当 selectCnt >=512 时意味着出现 Epoll 空轮询 bug。此 Bug 一直存在,本质是 JDK 对 Linux 的 Epoll 封装时没有考虑到以下异常情况:
    1. 当一个Socket的文件描述符被轮询,请求事件掩码为0,如果连接被突然终止(RST),那么原本阻塞的方法会被唤醒(EPOLL),并在返回的事件集中设置了 POLLHUP(可能还有 POLLERR )。但是JDK封装的上层并没有对应事件定义,但 SocketChannel 的兴趣集为0,这也意味着没有被选择的事件,方法返回0。而通常 select()方法是写在一个 while(true) 循环中的,这样就会导致应用程序一直不停地执行 select(),也就是导致 CPU 空转,CPU 使用率最终会飙升到 100%。
  8. 每次 for 循环都会检查 EventLoop 的状态,如果当前状态为正在关闭中,则会调用 closeAll() 方法关闭所有已连接的通道。
  9. 此方法中有一处是阻塞调用,即 select(),当我们向 NioEventLoop 添加任务时,如果不唤醒阻塞在此方法上的线程,我们也就不能立即执行任务。所以 Netty 默认是每当调用 execute(Runnable) 方法就会唤醒执行器线程(如果已唤醒,就跳过)。这样,也就能立即处理已添加的任务了。

    I/O事件处理

    Netty NIO 的事件处理流程与实际上与普通的使用 JDK NIO 编程无异,通过从 SelectionKey 对象获取所绑定的通道的 I/O 状态,根据此状态做进一步处理。核心方法 NioEventLoop#processSelectedKey(SelectionKey, AbstractNioChannel)

    processSelectedKey

    processSelectedKey 源码如下:

    /**
    * Netty 处理 I/O 事件。最终委托 {@link AbstractNioChannel.NioUnsafe} 实例对象完成对JDK原生通道的数据读/写操作。
    *
    * @param k    {@link Selector#selectedKeys()},目的是用来判断当前通道感兴趣的事件
    * @param ch   Netty封装的{@link AbstractNioChannel},内部包含了大量的信息
    */
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
     final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
     // #1 校验 key 是否合法
     if (!k.isValid()) {
         // 非法的「SelectionKey」需要被关闭
         final EventLoop eventLoop;
         try {
             eventLoop = ch.eventLoop();
         } catch (Throwable ignored) {
             // 如果 EventLoop 为空会抛出异常,这里会忽略这个异常。因为我们只是想确定 ch 是否注册到这个事件循环中,因此有权力关闭 ch
             return;
         }
    
         // 只有当 ch 仍然注册到这个 EventLoop 时才关闭 ch。Ch 可以从事件循环中取消注册,
         // 因此 SelectionKey 可以作为注销过程的一部分取消,但是通道仍然是健康的,不应该被关闭。
         // See https://github.com/netty/netty/issues/5125
         if (eventLoop == this) {
             // 非法 key,直接关闭
             unsafe.close(unsafe.voidPromise());
         }
         return;
     }
    
     // 合法的「SelectionKey」
     try {
         // #2 获取当前通道已准备好的事件
         int readyOps = k.readyOps();
    
         // #2-1 OP_CONNECT事件(客户端才拥有这种类型类型)
         if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
             int ops = k.interestOps();
             // 移除「OP_CONNECT」事件
             ops &= ~SelectionKey.OP_CONNECT;
             k.interestOps(ops);
    
             // 连接成功,触发相关事件回调(Promise)
             unsafe.finishConnect();
         }
    
         // #2-2「OP_WRITE」事件(Both)
         // 表示可向通道写入数据
         // 先处理 WRITE 事件以便可以先释放内存
         if ((readyOps & SelectionKey.OP_WRITE) != 0) {
             // 将数据冲刷到客户端,最后会调用 JavaChannel#write() 方法执行底层操作
             ch.unsafe().forceFlush();
         }
    
         // #2-3 「OP_READ」事件(Both)
         //      「OP_ACCEPT」事件(服务端)
         // Netty 将 READ 和 ACCEPT 统一封装,都通过 unsafe.read() 进行处理
         if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
             /**
              * ① 从 Channel 中读取数据并存储到分配的 ByteBuf
              * ② 调用 pipeline#fireChannelRead() 方法产生 Inbound 事件
              * ③ 依次调用 ChannelHandler#channelRead() 方法处理数据
              * ④ 调用 pipeline#fireChannelReadComplete() 方法完成读操作
              * ⑤ 执行 removeReadOp() 清除 OP_READ 事件
              */
             unsafe.read();
         }
     } catch (CancelledKeyException ignored) {
         unsafe.close(unsafe.voidPromise());
     }
    }
    

    整体处理流程如下图所示:
    processSelectKey() 调用流程示意图.png
    从上图可以明显看出,对底层通道的读取、写入等操作都是委托 Unsafe 类完成。它是 Netty 提供的仅供内部使用的操作底层 Socket 的一系列 API。关于 Unsafe 详见
    方法 processSelectedKey 里面有我们最熟悉的对 Java NIO 编程相关的逻辑框架:根据 SelectionKey 相关的感兴趣事件执行对应的方法。