接下来,我们来分析 Netty 中的线程池。Netty 中的线程池比较不好理解,因为它的类比较多,而且它们之间的关系错综复杂。看下图,感受下 NioEventLoop 类和 NioEventLoopGroup 类的继承结构:

image.png

这张图我按照继承关系整理而来,大家仔细看一下就会发现,涉及到的类确实挺多的。本节来给大家理理清楚这部分内容。

首先,我们说的 Netty 的线程池,指的就是 **NioEventLoopGroup** 的实例;线程池中的单个线程,指的是右边 **NioEventLoop** 的实例。

回顾下我们第一节介绍的 Echo 例子,客户端和服务端的启动代码中,最开始我们总是先实例化 NioEventLoopGroup

  1. // EchoClient 代码最开始:
  2. EventLoopGroup group = new NioEventLoopGroup();
  3. // EchoServer 代码最开始:
  4. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  5. EventLoopGroup workerGroup = new NioEventLoopGroup();

下面,我们就从 NioEventLoopGroup 的源码开始进行分析。

NioEventLoopGroup 的创建

我们打开 NioEventLoopGroup 的源码,可以看到,NioEventLoopGroup 有多个构造方法用于参数设置,最简单地,我们采用无参构造函数,或仅仅设置线程数量就可以了,其他的参数采用默认值。

比如上面的代码中,我们只在实例化 bossGroup 的时候指定了参数,代表该线程池需要一个线程。

  1. public NioEventLoopGroup() {
  2. this(0);
  3. }
  4. public NioEventLoopGroup(int nThreads) {
  5. this(nThreads, (Executor) null);
  6. }
  7. ...
  8. // 参数最全的构造方法
  9. public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
  10. final SelectorProvider selectorProvider,
  11. final SelectStrategyFactory selectStrategyFactory,
  12. final RejectedExecutionHandler rejectedExecutionHandler) {
  13. // 调用父类的构造方法
  14. super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
  15. }

我们来稍微看一下构造方法中的各个参数:

  • nThreads:这个最简单,就是线程池中的线程数,也就是 NioEventLoop 的实例数量。
  • executor:我们知道,我们本身就是要构造一个线程池(Executor),为什么这里传一个 executor 实例呢?它其实不是给线程池用的,而是给 NioEventLoop 用的,以后再说。
  • chooserFactory:当我们提交一个任务到线程池的时候,线程池需要选择(choose)其中的一个线程来执行这个任务,这个就是用来实现选择策略的。
  • selectorProvider:这个简单,我们需要通过它来实例化 JDK 的 Selector,可以看到每个线程池都持有一个 selectorProvider 实例。
  • selectStrategyFactory:这个涉及到的是线程池中线程的工作流程,在介绍 NioEventLoop 的时候会说。
  • rejectedExecutionHandler:这个也是线程池的好朋友了,用于处理线程池中没有可用的线程来执行任务的情况。在 Netty 中稍微有一点点不一样,这个是给 NioEventLoop 实例用的,以后我们再详细介绍。

这里介绍这些参数是希望大家有个印象而已,大家发现没有,在构造 NioEventLoopGroup 实例时的好几个参数,都是用来构造 NioEventLoop 用的。

下面,我们从 NioEventLoopGroup 的无参构造方法开始,跟着源码走:

  1. public NioEventLoopGroup() {
  2. this(0);
  3. }

然后一步步走下去,到这个构造方法:

  1. public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
  2. super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
  3. }

大家自己要去跟一下源码,这样才知道中间设置了哪些默认值,下面这几个参数都被设置了默认值:

  • selectorProvider = SelectorProvider.provider()

    这个没什么好说的,调用了 JDK 提供的方法

  • selectStrategyFactory = DefaultSelectStrategyFactory.INSTANCE

    这个涉及到的是线程在做 select 操作和执行任务过程中的策略选择问题,在介绍 NioEventLoop 的时候会用到。

  • rejectedExecutionHandler = RejectedExecutionHandlers.reject()

    大家进去看一下 reject() 方法,也就是说,Netty 选择的默认拒绝策略是:抛出异常

跟着源码走,我们会来到父类 MultithreadEventLoopGroup 的构造方法中:

  1. protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
  2. super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
  3. }

这里我们发现,如果采用无参构造函数,那么到这里的时候,默认地 nThreads 会被设置为 CPU 核心数 *2。大家可以看下 DEFAULT_EVENT_LOOP_THREADS 的默认值,以及 static 代码块的设值逻辑。

我们继续往下走:

  1. protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object...args) {
  2. this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
  3. }

到这一步的时候,new ThreadPerTaskExecutor(threadFactory) 会构造一个 executor

我们现在还不知道这个 executor 怎么用。这里我们先看下它的源码:

Executor 作为线程池的最顶层接口, 我们知道,它只有一个 execute(runnable) 方法,从上面我们可以看到,实现类 ThreadPerTaskExecutor 的逻辑就是每来一个任务,新建一个线程

我们先记住这个,前面也说了,它是给 NioEventLoop 用的,不是给 NioEventLoopGroup 用的。

  1. public final class ThreadPerTaskExecutor implements Executor {
  2. private final ThreadFactory threadFactory;
  3. public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
  4. if (threadFactory == null) {
  5. throw new NullPointerException("threadFactory");
  6. }
  7. this.threadFactory = threadFactory;
  8. }
  9. @Override
  10. public void execute(Runnable command) {
  11. // 为每个任务新建一个线程
  12. threadFactory.newThread(command).start();
  13. }
  14. }

上一步设置完了 executor,我们继续往下看:

  1. protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object...args) {
  2. this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
  3. }

这一步设置了 chooserFactory,用来实现从线程池中选择一个线程的选择策略。

ChooserFactory 的逻辑比较简单,我们看下 DefaultEventExecutorChooserFactory 的实现:

这里设置的策略也很简单:

1、如果线程池的线程数量是 2^n,采用下面的方式会高效一些:

2、如果不是,用取模的方式:

  1. @Override
  2. public EventExecutorChooser newChooser(EventExecutor[] executors) {
  3. if (isPowerOfTwo(executors.length)) {
  4. return new PowerOfTwoEventExecutorChooser(executors);
  5. } else {
  6. return new GenericEventExecutorChooser(executors);
  7. }
  8. }
  1. @Override
  2. public EventExecutor next() {
  3. return executors[idx.getAndIncrement() & executors.length - 1];
  4. }
  1. @Override
  2. public EventExecutor next() {
  3. return executors[Math.abs(idx.getAndIncrement() % executors.length)];
  4. }

走了这么久,我们终于到了一个干实事的构造方法中了。

io.netty.util.concurrent.MultithreadEventExecutorGroup

  1. protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
  2. EventExecutorChooserFactory chooserFactory, Object... args) {
  3. if (nThreads <= 0) {
  4. throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
  5. }
  6. // executor 如果是 null,做一次和前面一样的默认设置。
  7. if (executor == null) {
  8. executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
  9. }
  10. // 这里的 children 数组非常重要,它就是线程池中的线程数组,这么说不太严谨,但是就大概这个意思
  11. children = new EventExecutor[nThreads];
  12. // 下面这个 for 循环将实例化 children 数组中的每一个元素
  13. for (int i = 0; i < nThreads; i ++) {
  14. boolean success = false;
  15. try {
  16. // 实例化!!!!!!
  17. children[i] = newChild(executor, args);
  18. success = true;
  19. } catch (Exception e) {
  20. // TODO: Think about if this is a good exception type
  21. throw new IllegalStateException("failed to create a child event loop", e);
  22. } finally {
  23. // 如果有一个 child 实例化失败,那么 success 就会为 false,然后进入下面的失败处理逻辑
  24. if (!success) {
  25. // 把已经成功实例化的“线程” shutdown,shutdown 是异步操作
  26. for (int j = 0; j < i; j ++) {
  27. children[j].shutdownGracefully();
  28. }
  29. // 等待这些线程成功 shutdown
  30. for (int j = 0; j < i; j ++) {
  31. EventExecutor e = children[j];
  32. try {
  33. while (!e.isTerminated()) {
  34. e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
  35. }
  36. } catch (InterruptedException interrupted) {
  37. // 把中断状态设置回去,交给关心的线程来处理.
  38. Thread.currentThread().interrupt();
  39. break;
  40. }
  41. }
  42. }
  43. }
  44. }
  45. // ================================================
  46. // === 到这里,就是代表上面的实例化所有线程已经成功结束 ===
  47. // ================================================
  48. // 通过之前设置的 chooserFactory 来实例化 Chooser,把线程池数组传进去,
  49. // 这就不必再说了吧,实现线程选择策略
  50. chooser = chooserFactory.newChooser(children);
  51. // 设置一个 Listener 用来监听该线程池的 termination 事件
  52. // 下面的代码逻辑是:给池中每一个线程都设置这个 listener,当监听到所有线程都 terminate 以后,这个线程池就算真正的 terminate 了。
  53. final FutureListener<Object> terminationListener = new FutureListener<Object>() {
  54. @Override
  55. public void operationComplete(Future<Object> future) throws Exception {
  56. if (terminatedChildren.incrementAndGet() == children.length) {
  57. terminationFuture.setSuccess(null);
  58. }
  59. }
  60. };
  61. for (EventExecutor e: children) {
  62. e.terminationFuture().addListener(terminationListener);
  63. }
  64. // 设置 readonlyChildren,它是只读集合,以后用到再说
  65. Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
  66. Collections.addAll(childrenSet, children);
  67. readonlyChildren = Collections.unmodifiableSet(childrenSet);
  68. }

上面的代码非常简单吧,没有什么需要特别说的,接下来,我们来看看 newChild() 这个方法,这个方法非常重要,它将创建线程池中的线程。

我上面已经用过很多次”线程”这个词了,它可不是 Thread 的意思,而是指池中的个体,后面我们会看到每个”线程”在什么时候会真正创建 Thread 实例。反正每个 NioEventLoop 实例内部都会有一个自己的 Thread 实例,所以把这两个概念混在一起也无所谓吧。

newChild(…) 方法在 NioEventLoopGroup 中覆写了,上面说的”线程”其实就是 NioEventLoop

  1. @Override
  2. protected EventLoop newChild(Executor executor, Object... args) throws Exception {
  3. return new NioEventLoop(this, executor, (SelectorProvider) args[0],
  4. ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
  5. }

它调用了 NioEventLoop 的构造方法:

  1. NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
  2. SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
  3. // 调用父类构造器
  4. super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
  5. if (selectorProvider == null) {
  6. throw new NullPointerException("selectorProvider");
  7. }
  8. if (strategy == null) {
  9. throw new NullPointerException("selectStrategy");
  10. }
  11. provider = selectorProvider;
  12. // 开启 NIO 中最重要的组件:Selector
  13. final SelectorTuple selectorTuple = openSelector();
  14. selector = selectorTuple.selector;
  15. unwrappedSelector = selectorTuple.unwrappedSelector;
  16. selectStrategy = strategy;
  17. }

我们先粗略观察一下,然后再往下看:

  • 在 Netty 中,NioEventLoopGroup 代表线程池,NioEventLoop 就是其中的线程。
  • 线程池 NioEventLoopGroup 是池中的线程 NioEventLoopparent,从上面的代码中的取名可以看出。
  • 每个 NioEventLoop 都有自己的 Selector,上面的代码也反应了这一点,这和 Tomcat 中的 NIO 模型有点区别。
  • executorselectStrategyrejectedExecutionHandlerNioEventLoopGroup 中一路传到了 NioEventLoop 中。

这个时候,我们来看一下 NioEventLoop 类的属性都有哪些,我们先忽略它继承自父类的属性,单单看它自己的:

  1. private Selector selector;
  2. private Selector unwrappedSelector;
  3. private SelectedSelectionKeySet selectedKeys;
  4. private final SelectorProvider provider;
  5. private final AtomicBoolean wakenUp = new AtomicBoolean();
  6. private final SelectStrategy selectStrategy;
  7. private volatile int ioRatio = 50;
  8. private int cancelledKeys;
  9. private boolean needsToSelectAgain;

结合它的构造方法我们来总结一下:

  • provider:它由 NioEventLoopGroup 传进来,前面我们说了一个线程池有一个 selectorProvider,用于创建 Selector 实例
  • selector:虽然我们还没看创建 selector 的代码,但我们已经知道,在 Netty 中 Selector 是跟着线程池中的线程走的。也就是说,并非一个线程池一个 Selector 实例,而是线程池中每一个线程都有一个 Selector 实例。

    在无参构造过程中,我们发现,Netty 设置线程个数是 CPU 核心数的两倍,假设我们的机器 CPU 是 4 核,那么对应的就会有 8 个 Selector 实例。

  • selectStrategyselect 操作的策略,这个不急。

  • ioRatio:这是 IO 任务的执行时间比例,因为每个线程既有 IO 任务执行,也有非 IO 任务需要执行,所以该参数为了保证有足够时间是给 IO 的。这里也不需要急着去理解什么 IO 任务、什么非 IO 任务。

然后我们继续走它的构造方法,我们看到上面的构造方法调用了父类的构造器,它的父类是 SingleThreadEventLoop

io.netty.channel.SingleThreadEventLoop :

  1. protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
  2. SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
  3. protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory,
  4. boolean addTaskWakesUp, int maxPendingTasks,
  5. RejectedExecutionHandler rejectedExecutionHandler) {
  6. super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
  7. tailTasks = newTaskQueue(maxPendingTasks);
  8. }

SingleThreadEventLoop 这个名字很诡异有没有?然后它的构造方法又调用了父类 SingleThreadEventExecutor 的构造方法。

io.netty.util.concurrent.SingleThreadEventExecutor

  1. protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
  2. boolean addTaskWakesUp, int maxPendingTasks,
  3. RejectedExecutionHandler rejectedHandler) {
  4. super(parent);
  5. this.addTaskWakesUp = addTaskWakesUp;
  6. this.maxPendingTasks = Math.max(16, maxPendingTasks);
  7. this.executor = ObjectUtil.checkNotNull(executor, "executor");
  8. // taskQueue,这个东西很重要,提交给 NioEventLoop 的任务都会进入到这个 taskQueue 中等待被执行
  9. // 这个 queue 的默认容量是 16
  10. taskQueue = newTaskQueue(this.maxPendingTasks);
  11. rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
  12. }
  13. protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
  14. return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
  15. }

到这里就更加诡异了,NioEventLoop 的父类是 SingleThreadEventLoop,而 SingleThreadEventLoop 的父类是 **SingleThreadEventExecutor**,它的名字告诉我们,它是一个 Executor,是一个线程池,而且是 Single Thread 单线程的。

也就是说,线程池 NioEventLoopGroup 中的每一个线程 NioEventLoop 也可以当做一个线程池来用,只不过它只有一个线程。这种设计虽然看上去很巧妙,不过有点反人类的样子。

上面这个构造函数比较简单:

  • 设置了 parent,也就是之前创建的线程池 NioEventLoopGroup 实例
  • executor:它是我们之前实例化的 ThreadPerTaskExecutor,我们说过,这个东西在线程池中没有用,它是给 NioEventLoop 用的,马上我们就要看到它了。提前透露一下,它用来开启 NioEventLoop 中的线程(Thread 实例)。
  • taskQueue:这算是该构造方法中新的东西,它是任务队列。我们前面说过,NioEventLoop 需要负责 IO 事件和非 IO 事件,通常它都在执行 selectorselect 方法或者正在处理 selectedKeys,如果我们要 submit 一个任务给它,任务就会被放到 taskQueue 中,等它来轮询。
  • rejectedExecutionHandlertaskQueue 的默认容量是 16,所以,如果 submit 的任务堆积了到了 16,再往里面提交任务会触发 rejectedExecutionHandler 的执行策略。

    还记得默认策略吗:抛出 RejectedExecutionException 异常。

    NioEventLoopGroup 的默认构造中,它的实现是这样的:

  1. private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {
  2. @Override
  3. public void rejected(Runnable task, SingleThreadEventExecutor executor) {
  4. throw new RejectedExecutionException();
  5. }
  6. };

然后,我们再回到 NioEventLoop 的构造方法:

  1. NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
  2. SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
  3. // 我们刚刚说完了这个
  4. super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
  5. if (selectorProvider == null) {
  6. throw new NullPointerException("selectorProvider");
  7. }
  8. if (strategy == null) {
  9. throw new NullPointerException("selectStrategy");
  10. }
  11. provider = selectorProvider;
  12. // 创建 selector 实例
  13. final SelectorTuple selectorTuple = openSelector();
  14. selector = selectorTuple.selector;
  15. unwrappedSelector = selectorTuple.unwrappedSelector;
  16. selectStrategy = strategy;
  17. }

可以看到,最重要的方法其实就是 openSelector() 方法,它将创建 NIO 中最重要的一个组件 **Selector**。在这个方法中,Netty 也做了一些优化,这部分我们就不去分析它了。

到这里,我们的线程池 NioEventLoopGroup 创建完成了,并且实例化了池中的所有 NioEventLoop 实例。

同时,大家应该已经看到,上面并没有真正创建 NioEventLoop 中的线程(没有创建 Thread 实例)。

提前透露一下,创建线程的时机在第一个任务提交过来的时候,那么第一个任务是什么呢?就是我们前面说的 channel**register** 操作。

NioEventLoop 的工作流程

前面,我们在分析线程池的实例化的时候说过,NioEventLoop 中并没有启动 Java 线程。这里我们来仔细分析下在 register 过程中调用的 **eventLoop.execute(runnable)** 这个方法,这个代码在父类 SingleThreadEventExecutor 中。

io.netty.util.concurrent.SingleThreadEventExecutor#execute

  1. @Override
  2. public void execute(Runnable task) {
  3. if (task == null) {
  4. throw new NullPointerException("task");
  5. }
  6. // 判断添加任务的线程是否就是当前 EventLoop 中的线程
  7. boolean inEventLoop = inEventLoop();
  8. // 添加任务到之前介绍的 taskQueue 中,
  9. // 如果 taskQueue 满了(默认大小 16),根据我们之前说的,默认的策略是抛出异常
  10. addTask(task);
  11. if (!inEventLoop) {
  12. // 如果不是 NioEventLoop 内部线程提交的 task,那么判断下线程是否已经启动,没有的话,就启动线程
  13. startThread();
  14. if (isShutdown() && removeTask(task)) {
  15. reject();
  16. }
  17. }
  18. if (!addTaskWakesUp && wakesUpForTask(task)) {
  19. wakeup(inEventLoop);
  20. }
  21. }

原来启动 NioEventLoop 中的线程的方法在这里。

另外,上节我们说的 register 操作进到了 taskQueue 中,所以它其实是被归类到了非 IO 操作的范畴。

下面是 startThread 的源码,判断线程是否已经启动来决定是否要进行启动操作。

io.netty.util.concurrent.SingleThreadEventExecutor#startThread

  1. private void startThread() {
  2. if (state == ST_NOT_STARTED) {
  3. if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
  4. try {
  5. doStartThread();
  6. } catch (Throwable cause) {
  7. STATE_UPDATER.set(this, ST_NOT_STARTED);
  8. PlatformDependent.throwException(cause);
  9. }
  10. }
  11. }
  12. }

我们按照前面的思路,根据线程没有启动的情况,来看看 doStartThread() 方法:

io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread

  1. private void doStartThread() {
  2. assert thread == null;
  3. // 这里的 executor 大家是不是有点熟悉的感觉,它就是一开始我们实例化 NioEventLoop 的时候传进来的 ThreadPerTaskExecutor 的实例。它是每次来一个任务,创建一个线程的那种 executor。
  4. // 一旦我们调用它的 execute 方法,它就会创建一个新的线程,所以这里终于会创建 Thread 实例
  5. executor.execute(new Runnable() {
  6. @Override
  7. public void run() {
  8. // 看这里,将 “executor” 中创建的这个线程设置为 NioEventLoop 的线程!!!
  9. thread = Thread.currentThread();
  10. if (interrupted) {
  11. thread.interrupt();
  12. }
  13. boolean success = false;
  14. updateLastExecutionTime();
  15. try {
  16. // 执行 SingleThreadEventExecutor 的 run() 方法,它在 NioEventLoop 中实现了
  17. SingleThreadEventExecutor.this.run();
  18. success = true;
  19. } catch (Throwable t) {
  20. logger.warn("Unexpected exception from an event executor: ", t);
  21. } finally {
  22. // ... 我们直接忽略掉这里的代码
  23. }
  24. }
  25. });
  26. }

上面线程启动以后,会执行 NioEventLoop 中的 run() 方法,这是一个非常重要的方法,这个方法肯定是没那么容易结束的,必然是像 JDK 线程池的 Worker 那样,不断地循环获取新的任务的。它需要不断地做 select 操作和轮询 taskQueue 这个队列。

我们先来简单地看一下它的源码,这里先不做深入地介绍。

io.netty.channel.nio.NioEventLoop#run

  1. @Override
  2. protected void run() {
  3. // 代码嵌套在 for 循环中
  4. for (;;) {
  5. try {
  6. // selectStrategy 终于要派上用场了
  7. // 它有两个值,一个是 CONTINUE 一个是 SELECT
  8. // 针对这块代码,我们分析一下。
  9. // 1. 如果 taskQueue 不为空,也就是 hasTasks() 返回 true,
  10. // 那么执行一次 selectNow(),该方法不会阻塞
  11. // 2. 如果 hasTasks() 返回 false,那么执行 SelectStrategy.SELECT 分支,
  12. // 进行 select(...),这块是带阻塞的
  13. // 这个很好理解,就是按照是否有任务在排队来决定是否可以进行阻塞
  14. switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
  15. case SelectStrategy.CONTINUE:
  16. continue;
  17. case SelectStrategy.SELECT:
  18. // 如果 !hasTasks(),那么进到这个 select 分支,这里 select 带阻塞的
  19. select(wakenUp.getAndSet(false));
  20. if (wakenUp.get()) {
  21. selector.wakeup();
  22. }
  23. default:
  24. }
  25. cancelledKeys = 0;
  26. needsToSelectAgain = false;
  27. // 默认地,ioRatio 的值是 50
  28. final int ioRatio = this.ioRatio;
  29. if (ioRatio == 100) {
  30. // 如果 ioRatio 设置为 100,那么先执行 IO 操作,然后在 finally 块中执行 taskQueue 中的任务
  31. try {
  32. // 1. 执行 IO 操作。因为前面 select 以后,可能有些 channel 是需要处理的。
  33. processSelectedKeys();
  34. } finally {
  35. // 2. 执行非 IO 任务,也就是 taskQueue 中的任务
  36. runAllTasks();
  37. }
  38. } else {
  39. // 如果 ioRatio 不是 100,那么根据 IO 操作耗时,限制非 IO 操作耗时
  40. final long ioStartTime = System.nanoTime();
  41. try {
  42. // 执行 IO 操作
  43. processSelectedKeys();
  44. } finally {
  45. // 根据 IO 操作消耗的时间,计算执行非 IO 操作(runAllTasks)可以用多少时间.
  46. final long ioTime = System.nanoTime() - ioStartTime;
  47. runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
  48. }
  49. }
  50. } catch (Throwable t) {
  51. handleLoopException(t);
  52. }
  53. // Always handle shutdown even if the loop processing threw an exception.
  54. try {
  55. if (isShuttingDown()) {
  56. closeAll();
  57. if (confirmShutdown()) {
  58. return;
  59. }
  60. }
  61. } catch (Throwable t) {
  62. handleLoopException(t);
  63. }
  64. }
  65. }

上面这段代码是 NioEventLoop 的核心,这里介绍两点:

  1. 首先,会根据 hasTasks() 的结果来决定是执行 selectNow() 还是 select(oldWakenUp),这个应该好理解。如果有任务正在等待,那么应该使用无阻塞的 selectNow(),如果没有任务在等待,那么就可以使用带阻塞的 select 操作。
  2. ioRatio 控制 IO 操作所占的时间比重:
    • 如果设置为 100%,那么先执行 IO 操作,然后再执行任务队列中的任务。
    • 如果不是 100%,那么先执行 IO 操作,然后执行 taskQueue 中的任务,但是需要控制执行任务的总时间。也就是说,非 IO 操作可以占用的时间,通过 ioRatio 以及这次 IO 操作耗时计算得出。

我们这里先不要去关心 select(oldWakenUp)processSelectedKeys() 方法和 runAllTasks(…) 方法的细节,只要先理解它们分别做什么事情就可以了。

回过神来,我们前面在 register 的时候提交了 register 任务给 NioEventLoop,这是 NioEventLoop 接收到的第一个任务,所以这里会实例化 Thread 并且启动,然后进入到 NioEventLoop 中的 run 方法。

当然了,实际情况可能是,Channel 实例被 register 到一个已经启动线程的 NioEventLoop 实例中。