接下来,我们来分析 Netty 中的线程池。Netty 中的线程池比较不好理解,因为它的类比较多,而且它们之间的关系错综复杂。看下图,感受下 NioEventLoop 类和 NioEventLoopGroup 类的继承结构:

这张图我按照继承关系整理而来,大家仔细看一下就会发现,涉及到的类确实挺多的。本节来给大家理理清楚这部分内容。
首先,我们说的 Netty 的线程池,指的就是 **NioEventLoopGroup** 的实例;线程池中的单个线程,指的是右边 **NioEventLoop** 的实例。
回顾下我们第一节介绍的 Echo 例子,客户端和服务端的启动代码中,最开始我们总是先实例化 NioEventLoopGroup:
// EchoClient 代码最开始:EventLoopGroup group = new NioEventLoopGroup();// EchoServer 代码最开始:EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();
下面,我们就从 NioEventLoopGroup 的源码开始进行分析。
NioEventLoopGroup 的创建
我们打开 NioEventLoopGroup 的源码,可以看到,NioEventLoopGroup 有多个构造方法用于参数设置,最简单地,我们采用无参构造函数,或仅仅设置线程数量就可以了,其他的参数采用默认值。
比如上面的代码中,我们只在实例化
bossGroup的时候指定了参数,代表该线程池需要一个线程。
public NioEventLoopGroup() {this(0);}public NioEventLoopGroup(int nThreads) {this(nThreads, (Executor) null);}...// 参数最全的构造方法public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory,final RejectedExecutionHandler rejectedExecutionHandler) {// 调用父类的构造方法super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);}
我们来稍微看一下构造方法中的各个参数:
nThreads:这个最简单,就是线程池中的线程数,也就是NioEventLoop的实例数量。executor:我们知道,我们本身就是要构造一个线程池(Executor),为什么这里传一个executor实例呢?它其实不是给线程池用的,而是给NioEventLoop用的,以后再说。chooserFactory:当我们提交一个任务到线程池的时候,线程池需要选择(choose)其中的一个线程来执行这个任务,这个就是用来实现选择策略的。selectorProvider:这个简单,我们需要通过它来实例化 JDK 的Selector,可以看到每个线程池都持有一个selectorProvider实例。selectStrategyFactory:这个涉及到的是线程池中线程的工作流程,在介绍 NioEventLoop 的时候会说。rejectedExecutionHandler:这个也是线程池的好朋友了,用于处理线程池中没有可用的线程来执行任务的情况。在 Netty 中稍微有一点点不一样,这个是给NioEventLoop实例用的,以后我们再详细介绍。
这里介绍这些参数是希望大家有个印象而已,大家发现没有,在构造 NioEventLoopGroup 实例时的好几个参数,都是用来构造 NioEventLoop 用的。
下面,我们从 NioEventLoopGroup 的无参构造方法开始,跟着源码走:
public NioEventLoopGroup() {this(0);}
然后一步步走下去,到这个构造方法:
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());}
大家自己要去跟一下源码,这样才知道中间设置了哪些默认值,下面这几个参数都被设置了默认值:
selectorProvider = SelectorProvider.provider()这个没什么好说的,调用了 JDK 提供的方法
selectStrategyFactory = DefaultSelectStrategyFactory.INSTANCE这个涉及到的是线程在做 select 操作和执行任务过程中的策略选择问题,在介绍 NioEventLoop 的时候会用到。
rejectedExecutionHandler = RejectedExecutionHandlers.reject()大家进去看一下 reject() 方法,也就是说,Netty 选择的默认拒绝策略是:抛出异常
跟着源码走,我们会来到父类 MultithreadEventLoopGroup 的构造方法中:
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);}
这里我们发现,如果采用无参构造函数,那么到这里的时候,默认地 nThreads 会被设置为 CPU 核心数 *2。大家可以看下 DEFAULT_EVENT_LOOP_THREADS 的默认值,以及 static 代码块的设值逻辑。
我们继续往下走:
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object...args) {this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);}
到这一步的时候,new ThreadPerTaskExecutor(threadFactory) 会构造一个 executor。
我们现在还不知道这个
executor怎么用。这里我们先看下它的源码:
Executor作为线程池的最顶层接口, 我们知道,它只有一个execute(runnable)方法,从上面我们可以看到,实现类ThreadPerTaskExecutor的逻辑就是每来一个任务,新建一个线程。我们先记住这个,前面也说了,它是给
NioEventLoop用的,不是给NioEventLoopGroup用的。
public final class ThreadPerTaskExecutor implements Executor {private final ThreadFactory threadFactory;public ThreadPerTaskExecutor(ThreadFactory threadFactory) {if (threadFactory == null) {throw new NullPointerException("threadFactory");}this.threadFactory = threadFactory;}@Overridepublic void execute(Runnable command) {// 为每个任务新建一个线程threadFactory.newThread(command).start();}}
上一步设置完了 executor,我们继续往下看:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object...args) {this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);}
这一步设置了 chooserFactory,用来实现从线程池中选择一个线程的选择策略。
ChooserFactory的逻辑比较简单,我们看下DefaultEventExecutorChooserFactory的实现:这里设置的策略也很简单:
1、如果线程池的线程数量是
2^n,采用下面的方式会高效一些:2、如果不是,用取模的方式:
@Overridepublic EventExecutorChooser newChooser(EventExecutor[] executors) {if (isPowerOfTwo(executors.length)) {return new PowerOfTwoEventExecutorChooser(executors);} else {return new GenericEventExecutorChooser(executors);}}
@Overridepublic EventExecutor next() {return executors[idx.getAndIncrement() & executors.length - 1];}
@Overridepublic EventExecutor next() {return executors[Math.abs(idx.getAndIncrement() % executors.length)];}
走了这么久,我们终于到了一个干实事的构造方法中了。
io.netty.util.concurrent.MultithreadEventExecutorGroup
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {if (nThreads <= 0) {throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));}// executor 如果是 null,做一次和前面一样的默认设置。if (executor == null) {executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());}// 这里的 children 数组非常重要,它就是线程池中的线程数组,这么说不太严谨,但是就大概这个意思children = new EventExecutor[nThreads];// 下面这个 for 循环将实例化 children 数组中的每一个元素for (int i = 0; i < nThreads; i ++) {boolean success = false;try {// 实例化!!!!!!children[i] = newChild(executor, args);success = true;} catch (Exception e) {// TODO: Think about if this is a good exception typethrow new IllegalStateException("failed to create a child event loop", e);} finally {// 如果有一个 child 实例化失败,那么 success 就会为 false,然后进入下面的失败处理逻辑if (!success) {// 把已经成功实例化的“线程” shutdown,shutdown 是异步操作for (int j = 0; j < i; j ++) {children[j].shutdownGracefully();}// 等待这些线程成功 shutdownfor (int j = 0; j < i; j ++) {EventExecutor e = children[j];try {while (!e.isTerminated()) {e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);}} catch (InterruptedException interrupted) {// 把中断状态设置回去,交给关心的线程来处理.Thread.currentThread().interrupt();break;}}}}}// ================================================// === 到这里,就是代表上面的实例化所有线程已经成功结束 ===// ================================================// 通过之前设置的 chooserFactory 来实例化 Chooser,把线程池数组传进去,// 这就不必再说了吧,实现线程选择策略chooser = chooserFactory.newChooser(children);// 设置一个 Listener 用来监听该线程池的 termination 事件// 下面的代码逻辑是:给池中每一个线程都设置这个 listener,当监听到所有线程都 terminate 以后,这个线程池就算真正的 terminate 了。final FutureListener<Object> terminationListener = new FutureListener<Object>() {@Overridepublic void operationComplete(Future<Object> future) throws Exception {if (terminatedChildren.incrementAndGet() == children.length) {terminationFuture.setSuccess(null);}}};for (EventExecutor e: children) {e.terminationFuture().addListener(terminationListener);}// 设置 readonlyChildren,它是只读集合,以后用到再说Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);Collections.addAll(childrenSet, children);readonlyChildren = Collections.unmodifiableSet(childrenSet);}
上面的代码非常简单吧,没有什么需要特别说的,接下来,我们来看看 newChild() 这个方法,这个方法非常重要,它将创建线程池中的线程。
我上面已经用过很多次”线程”这个词了,它可不是
Thread的意思,而是指池中的个体,后面我们会看到每个”线程”在什么时候会真正创建Thread实例。反正每个 NioEventLoop 实例内部都会有一个自己的Thread实例,所以把这两个概念混在一起也无所谓吧。
newChild(…) 方法在 NioEventLoopGroup 中覆写了,上面说的”线程”其实就是 NioEventLoop:
@Overrideprotected EventLoop newChild(Executor executor, Object... args) throws Exception {return new NioEventLoop(this, executor, (SelectorProvider) args[0],((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);}
它调用了 NioEventLoop 的构造方法:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {// 调用父类构造器super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);if (selectorProvider == null) {throw new NullPointerException("selectorProvider");}if (strategy == null) {throw new NullPointerException("selectStrategy");}provider = selectorProvider;// 开启 NIO 中最重要的组件:Selectorfinal SelectorTuple selectorTuple = openSelector();selector = selectorTuple.selector;unwrappedSelector = selectorTuple.unwrappedSelector;selectStrategy = strategy;}
我们先粗略观察一下,然后再往下看:
- 在 Netty 中,
NioEventLoopGroup代表线程池,NioEventLoop就是其中的线程。 - 线程池 
NioEventLoopGroup是池中的线程NioEventLoop的 parent,从上面的代码中的取名可以看出。 - 每个 
NioEventLoop都有自己的Selector,上面的代码也反应了这一点,这和 Tomcat 中的 NIO 模型有点区别。 executor、selectStrategy和rejectedExecutionHandler从NioEventLoopGroup中一路传到了NioEventLoop中。
这个时候,我们来看一下 NioEventLoop 类的属性都有哪些,我们先忽略它继承自父类的属性,单单看它自己的:
private Selector selector;private Selector unwrappedSelector;private SelectedSelectionKeySet selectedKeys;private final SelectorProvider provider;private final AtomicBoolean wakenUp = new AtomicBoolean();private final SelectStrategy selectStrategy;private volatile int ioRatio = 50;private int cancelledKeys;private boolean needsToSelectAgain;
结合它的构造方法我们来总结一下:
provider:它由NioEventLoopGroup传进来,前面我们说了一个线程池有一个selectorProvider,用于创建Selector实例selector:虽然我们还没看创建selector的代码,但我们已经知道,在 Netty 中Selector是跟着线程池中的线程走的。也就是说,并非一个线程池一个Selector实例,而是线程池中每一个线程都有一个Selector实例。在无参构造过程中,我们发现,Netty 设置线程个数是 CPU 核心数的两倍,假设我们的机器 CPU 是 4 核,那么对应的就会有 8 个 Selector 实例。
selectStrategy:select操作的策略,这个不急。ioRatio:这是 IO 任务的执行时间比例,因为每个线程既有 IO 任务执行,也有非 IO 任务需要执行,所以该参数为了保证有足够时间是给 IO 的。这里也不需要急着去理解什么 IO 任务、什么非 IO 任务。
然后我们继续走它的构造方法,我们看到上面的构造方法调用了父类的构造器,它的父类是 SingleThreadEventLoop。
io.netty.channel.SingleThreadEventLoop :
protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory,boolean addTaskWakesUp, int maxPendingTasks,RejectedExecutionHandler rejectedExecutionHandler) {super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);tailTasks = newTaskQueue(maxPendingTasks);}
SingleThreadEventLoop 这个名字很诡异有没有?然后它的构造方法又调用了父类 SingleThreadEventExecutor 的构造方法。
io.netty.util.concurrent.SingleThreadEventExecutor :
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,boolean addTaskWakesUp, int maxPendingTasks,RejectedExecutionHandler rejectedHandler) {super(parent);this.addTaskWakesUp = addTaskWakesUp;this.maxPendingTasks = Math.max(16, maxPendingTasks);this.executor = ObjectUtil.checkNotNull(executor, "executor");// taskQueue,这个东西很重要,提交给 NioEventLoop 的任务都会进入到这个 taskQueue 中等待被执行// 这个 queue 的默认容量是 16taskQueue = newTaskQueue(this.maxPendingTasks);rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");}protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {return new LinkedBlockingQueue<Runnable>(maxPendingTasks);}
到这里就更加诡异了,NioEventLoop 的父类是 SingleThreadEventLoop,而 SingleThreadEventLoop 的父类是 **SingleThreadEventExecutor**,它的名字告诉我们,它是一个 Executor,是一个线程池,而且是 Single Thread 单线程的。
也就是说,线程池 NioEventLoopGroup 中的每一个线程 NioEventLoop 也可以当做一个线程池来用,只不过它只有一个线程。这种设计虽然看上去很巧妙,不过有点反人类的样子。
上面这个构造函数比较简单:
- 设置了 
parent,也就是之前创建的线程池NioEventLoopGroup实例 executor:它是我们之前实例化的ThreadPerTaskExecutor,我们说过,这个东西在线程池中没有用,它是给NioEventLoop用的,马上我们就要看到它了。提前透露一下,它用来开启NioEventLoop中的线程(Thread实例)。taskQueue:这算是该构造方法中新的东西,它是任务队列。我们前面说过,NioEventLoop需要负责 IO 事件和非 IO 事件,通常它都在执行selector的select方法或者正在处理selectedKeys,如果我们要submit一个任务给它,任务就会被放到taskQueue中,等它来轮询。rejectedExecutionHandler:taskQueue的默认容量是 16,所以,如果submit的任务堆积了到了 16,再往里面提交任务会触发rejectedExecutionHandler的执行策略。还记得默认策略吗:抛出
RejectedExecutionException异常。在
NioEventLoopGroup的默认构造中,它的实现是这样的:
private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {@Overridepublic void rejected(Runnable task, SingleThreadEventExecutor executor) {throw new RejectedExecutionException();}};
然后,我们再回到 NioEventLoop 的构造方法:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {// 我们刚刚说完了这个super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);if (selectorProvider == null) {throw new NullPointerException("selectorProvider");}if (strategy == null) {throw new NullPointerException("selectStrategy");}provider = selectorProvider;// 创建 selector 实例final SelectorTuple selectorTuple = openSelector();selector = selectorTuple.selector;unwrappedSelector = selectorTuple.unwrappedSelector;selectStrategy = strategy;}
可以看到,最重要的方法其实就是 openSelector() 方法,它将创建 NIO 中最重要的一个组件 **Selector**。在这个方法中,Netty 也做了一些优化,这部分我们就不去分析它了。
到这里,我们的线程池 NioEventLoopGroup 创建完成了,并且实例化了池中的所有 NioEventLoop 实例。
同时,大家应该已经看到,上面并没有真正创建 NioEventLoop 中的线程(没有创建 Thread 实例)。
提前透露一下,创建线程的时机在第一个任务提交过来的时候,那么第一个任务是什么呢?就是我们前面说的 channel 的 **register** 操作。
NioEventLoop 的工作流程
前面,我们在分析线程池的实例化的时候说过,NioEventLoop 中并没有启动 Java 线程。这里我们来仔细分析下在 register 过程中调用的 **eventLoop.execute(runnable)** 这个方法,这个代码在父类 SingleThreadEventExecutor 中。
io.netty.util.concurrent.SingleThreadEventExecutor#execute
@Overridepublic void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}// 判断添加任务的线程是否就是当前 EventLoop 中的线程boolean inEventLoop = inEventLoop();// 添加任务到之前介绍的 taskQueue 中,// 如果 taskQueue 满了(默认大小 16),根据我们之前说的,默认的策略是抛出异常addTask(task);if (!inEventLoop) {// 如果不是 NioEventLoop 内部线程提交的 task,那么判断下线程是否已经启动,没有的话,就启动线程startThread();if (isShutdown() && removeTask(task)) {reject();}}if (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);}}
原来启动
NioEventLoop中的线程的方法在这里。另外,上节我们说的
register操作进到了taskQueue中,所以它其实是被归类到了非 IO 操作的范畴。
下面是 startThread 的源码,判断线程是否已经启动来决定是否要进行启动操作。
io.netty.util.concurrent.SingleThreadEventExecutor#startThread
private void startThread() {if (state == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {try {doStartThread();} catch (Throwable cause) {STATE_UPDATER.set(this, ST_NOT_STARTED);PlatformDependent.throwException(cause);}}}}
我们按照前面的思路,根据线程没有启动的情况,来看看 doStartThread() 方法:
io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread
private void doStartThread() {assert thread == null;// 这里的 executor 大家是不是有点熟悉的感觉,它就是一开始我们实例化 NioEventLoop 的时候传进来的 ThreadPerTaskExecutor 的实例。它是每次来一个任务,创建一个线程的那种 executor。// 一旦我们调用它的 execute 方法,它就会创建一个新的线程,所以这里终于会创建 Thread 实例executor.execute(new Runnable() {@Overridepublic void run() {// 看这里,将 “executor” 中创建的这个线程设置为 NioEventLoop 的线程!!!thread = Thread.currentThread();if (interrupted) {thread.interrupt();}boolean success = false;updateLastExecutionTime();try {// 执行 SingleThreadEventExecutor 的 run() 方法,它在 NioEventLoop 中实现了SingleThreadEventExecutor.this.run();success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {// ... 我们直接忽略掉这里的代码}}});}
上面线程启动以后,会执行 NioEventLoop 中的 run() 方法,这是一个非常重要的方法,这个方法肯定是没那么容易结束的,必然是像 JDK 线程池的 Worker 那样,不断地循环获取新的任务的。它需要不断地做 select 操作和轮询 taskQueue 这个队列。
我们先来简单地看一下它的源码,这里先不做深入地介绍。
io.netty.channel.nio.NioEventLoop#run
@Overrideprotected void run() {// 代码嵌套在 for 循环中for (;;) {try {// selectStrategy 终于要派上用场了// 它有两个值,一个是 CONTINUE 一个是 SELECT// 针对这块代码,我们分析一下。// 1. 如果 taskQueue 不为空,也就是 hasTasks() 返回 true,// 那么执行一次 selectNow(),该方法不会阻塞// 2. 如果 hasTasks() 返回 false,那么执行 SelectStrategy.SELECT 分支,// 进行 select(...),这块是带阻塞的// 这个很好理解,就是按照是否有任务在排队来决定是否可以进行阻塞switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.SELECT:// 如果 !hasTasks(),那么进到这个 select 分支,这里 select 带阻塞的select(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}default:}cancelledKeys = 0;needsToSelectAgain = false;// 默认地,ioRatio 的值是 50final int ioRatio = this.ioRatio;if (ioRatio == 100) {// 如果 ioRatio 设置为 100,那么先执行 IO 操作,然后在 finally 块中执行 taskQueue 中的任务try {// 1. 执行 IO 操作。因为前面 select 以后,可能有些 channel 是需要处理的。processSelectedKeys();} finally {// 2. 执行非 IO 任务,也就是 taskQueue 中的任务runAllTasks();}} else {// 如果 ioRatio 不是 100,那么根据 IO 操作耗时,限制非 IO 操作耗时final long ioStartTime = System.nanoTime();try {// 执行 IO 操作processSelectedKeys();} finally {// 根据 IO 操作消耗的时间,计算执行非 IO 操作(runAllTasks)可以用多少时间.final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {handleLoopException(t);}// Always handle shutdown even if the loop processing threw an exception.try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}}}
上面这段代码是 NioEventLoop 的核心,这里介绍两点:
- 首先,会根据 
hasTasks()的结果来决定是执行selectNow()还是select(oldWakenUp),这个应该好理解。如果有任务正在等待,那么应该使用无阻塞的selectNow(),如果没有任务在等待,那么就可以使用带阻塞的 select 操作。 ioRatio控制 IO 操作所占的时间比重:- 如果设置为 100%,那么先执行 IO 操作,然后再执行任务队列中的任务。
 - 如果不是 100%,那么先执行 IO 操作,然后执行 
taskQueue中的任务,但是需要控制执行任务的总时间。也就是说,非 IO 操作可以占用的时间,通过ioRatio以及这次 IO 操作耗时计算得出。 
我们这里先不要去关心 select(oldWakenUp) 、processSelectedKeys() 方法和 runAllTasks(…) 方法的细节,只要先理解它们分别做什么事情就可以了。
回过神来,我们前面在 register 的时候提交了 register 任务给 NioEventLoop,这是 NioEventLoop 接收到的第一个任务,所以这里会实例化 Thread 并且启动,然后进入到 NioEventLoop 中的 run 方法。
当然了,实际情况可能是,
Channel实例被register到一个已经启动线程的NioEventLoop实例中。
