概述
线程模型指定了操作系统、编程语言、框架或应用程序的上下文中的线程管理的关键方面。在本章中,我们将详细讨论 EventLoop 的方方面面,包括:
- EventLoop结构层次。
- 无锁设计。
- Reactor 线程模型。
- EventLoop I/O 处理以及任务处理。
层次结构
EventExecutorGroup 继承体系庞大,涉及到的接口、抽象类都比较多,但是我们将它们分门别类就一目了然了:
- JDK 接口。Executor 执行器是 Doug Lea 在 Java 1.5 版本提供了多线程任务执行器。ScheduledExecutorService 则是和时间相关的任务执行策略。io.netty.util.concurrent 包构建在 JDK 的 java.utl.concurrent 包上,用来提供线程执行器。
- 线程组。这部分是 MultithreadEventLoopGroup 抽象类及实现子类,可以把它们看成不同类型的线程池,比如 Nio 则使用 NioEventLoopGroup。这部分属于管理层,其内部定义了管理 EventLoop 生命周期的相关 API。
- 单线程。这部分是 EventLoop 接口实现子类。它们受线程组管理的单线程执行器。类型与线程组一一对应。它是任务处理的执行者,比如处理 I/O 事件。SingleThreadEventLoop 抽象类的子类需要实现最重要的一个方法:SingleThreadEventExecutor.run(),这是根据自身类型处理相关 I/O 事件和任务。比如对应 NioEventLoop 则是使用 NIO API 实现多路复用,而 EpollEventLoop 是通过调用 Linux Epoll API 实现。它的性能比 NIO 更好,因为 NIO 底层是调用 select() 完成,而 epoll 则比 select 性能更好。但是,由于 epoll 是 Linux 2.6 版本以后特有的 API,并不适用于其他操作系统,不属于标准规范。因此,在下面源码的解析中我们还是以 NIOEventLoop 源码进行讲解。
EventExecutorGroup 接口除了复写 JDK 接口外,还定义了管理线程池相关的 API,分别是
- isShuttingDown()
- shutdownGracefully()
- shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)
- terminationFuture()
- next()
其中,next() 方法是核心,它的作用是返回线程池中(Thread 被 EventLoop 包装,本质还是线程。因此,EventLoopGroup 就是线程池)下一个线程。Netty 提供默认的分配策略是轮询。
AbstractEventExecutorGroup 抽象类除了上述方法没有实现外,其实都有默认实现:通过 next() 相关 API 调用。
线程组
MultithreadEventExecutorGroup
MultithreadEventExecutorGroup 抽象类则是线程组的骨架基础,内部使用数组保存 EventExecutor 执行器。相关变量和构造器解析如下:
/**
* {@link EventExecutorGroup} 接口的抽象类实现,用于多线程处理任务。
*/
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
// 使用数组保存执行器,数组长度默认为2*CPU cores,一旦初始化后就不允许扩容
private final EventExecutor[] children;
// EventExecutor视图
private final Set<EventExecutor> readonlyChildren;
// EventExecutor已终结数量
private final AtomicInteger terminatedChildren = new AtomicInteger();
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
// 选择器,Netty 提供的默认方式:轮询
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
/**
* 创建一个新的{@link EventExecutorGroup} 实例对象
*
* @param nThreads 当前实例将会使用到的线程数量
* @param executor 执行器。如果使用默认的话则为{@code null} 值
* @param chooserFactory {@link EventExecutor} 选择器工厂,用来获取{@link EventExecutorChooserFactory#EventExecutorChooser} 选择器
* @param args 每当{@link #newChild(Executor, Object...)}方法调用时传递 args
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
// #1 创建一个执行器,特点:每次调用 executor(Runnable) 方法都新建一个线程执行
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// #2 创建数组对象,长度默认为:NettyRuntime.availableProcessors() * 2
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// #2-1 初始化数组中的「EventExecutor」实例对象
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// #2-2 某个「EventExecutor」创建失败,会有以下处理方式:
// ① 优雅关闭之前已经创建成功的Executor
// ② 再次确认每个Executor是否完全被终结
if (!success) {
for (int j = 0; j < i; j ++) {
// 优雅关闭
children[j].shutdownGracefully();
}
// 再次确认每个Executor是否完全被终结
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
// 等待EventExecutor完全关闭才退出while循环
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// 在等待关闭的过程中,如遇到中断异常,
// 则立即中断当前线程并退出循环,让调用者处理此中断
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// #3 绑定轮询选择器
// 当调用 next() 方法时就会通过此选择器从数组中轮询得到「EventExecotr」执行器对象
chooser = chooserFactory.newChooser(children);
// #4 创建监听回调
final FutureListener<Object> terminationListener = future -> {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
};
// #5 向所有已创建的「EventExecotor」添加关闭监听回调
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
// #6 生成「EventExecutor」视图,使用「Set」保存
Set<EventExecutor> childrenSet = new LinkedHashSet<>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
}
总结 MultithreadEventExecutorGroup 构造器做了哪些事情:
- 创建 EventExecutor 数组。
- 调用抽象方法
newChild()
填充数组对象。 - 将数组和轮询器进行绑定。
- 创建监听回调并注册到每个 EventExecutor 对象中。
- 生成 EventExecutor 视图。
这里会创建一个 executor 对象。它比较有趣:每次调用 executor(Runnable) 方法都会创建新线程执行任务。这个有什么用呢:
- 延迟创建线程。只有当执行任务时才会通过 Executor 对象创建线程。这样好处是不会创建无用线程。
- 内部封装了线程工厂,以一定规则对线程命名。
EventExecutor newChild(Executor executor, Object... args)
抽象方法需要子类实现,在层次结构中我们看到,其实也是创建对应的 EventLoop 实现类。比如对于 NioEventLoopGroup 而言,它会创建 NioEventLoop 执行器。
NioEventLoopGroup
Netty 核心实现类,可以看成线程池管理器,管理 NIoEventLoop 类型的线程。实现了 newChild() 抽象方法,源码为:
// io.netty.channel.nio.NioEventLoopGroup#newChild
/**
* 创建一个单线程的{@link EventLoop}对象
* @param executor
* @param args
* @return
* @throws Exception
*/
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
单线程执行器
单线程执行器可看作为真正可执行任务的线程,它们负责处理 I/O 事件和任务队列。
EventExecutor
EventExecutor 接口定义了一个重要的判断,即
// 判断调用此方法的是否为异线程
boolean inEventLoop();
// 判断 thread 是否属于异线程
boolean inEventLoop(Thread thread);
这是无锁异步编程的重要判断方法,事实上它提供了以下编程模型:
通过 inEventLoop()
方法判断当前执行线程与 EventLoop 绑定的线程是否是同一个,如果是同线程则直接调用,如果是异线程,则把相关方法使用 Runnalble 匿名内部类包装,再调用 EventLoop#execute()
进行入队操作。待 EventLoop 执行器执行。由于只有一个线程处理所有事情,因此永远不要将一个长时间运行的任务放入到执行队列中,因为它将阻塞需要在同一线程执行的其他任务。Netty 对于这种情况也提供了相应的 API,可以让耗时的任务在单独的 EventLoopGroup 中执行。EventExecutor 还定义了与异步计算相关的接口, Future 章节会讲到这些内部。
SingleThreadEventExecutor
SingleThreadEventExecutor 是一个非常重要的抽象类,它是实现单线程执行器的基本骨架,作用:
- 定义EventExecutor 执行器状态。
- 定义任务队列和定时任务队列。
Reactor 模型
在 Netty源码之知晓网络IO模型 一文中我们详细讲解了什么是 Reactor 模型,那是怎么配置不同的 Reactor 模型呢?
单 Reactor 单线程模型
EventLoopGroup group = new NioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.group(group)
单 Reactor 多线程模型
EventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(group)
主从 Reactor 模型
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
以上代码非常简洁,这是 Netty 对 Reactor 模型高度抽象后的结果。在服务端开发中,我们通常使用主从 Reactor 模型,以达到最高的网络性能。而在客户端中,需要根据自身业务选择合适的类型,在性能和资源上寻找平衡。
源码解析
前面我们大致讲解了 EventLoopGroup 和 EventLoop 之间的关系,可以想象成老板和员工,EventLoopGroup 负责管理 EventLoop。因此,我们重点是关注 EventLoop 做了些什么。对于 EventLoop 实现类而言,Netty 提供 4 种默认实现类,分别是:
- NioEventLoop
- EpollEventLoop
- KQueueEventLoop
- EpollEventLoop
而其中 NioEventLoop 应用最为广泛,EpollEventLoop 只适用于待定版本的 Linux 系统,但是性能优于 NioEventLoop。接下来,我们只分析 NioEventLoop 源码实现。
在 Netty源码之服务端/客户端启动 一章中我们讲了 Netty 启动流程,这里简单描述一下(主从 Reactor 模型):
- 创建两个 EventLoopGroup “线程池”。
- 使用 ServerBootstrap 对象引导 Server 创建。
- 从 EventLoopGroup 获取一个 EventLoop 不断轮询 I/O 事件和处理任务,对于 Main Reactor 线程来说,它只关注 OP_ACCEPT 事件。
- 当一个新连接到达,Main Reactor 线程会创建 NioSocketChannel 对象并将其注册到 Sub Reactor中,由 Sub Reactor 其中一个 EvnetLoop 负责该连接的剩下 I/O 事件和相关处理。
① 开始线程
当我们在 Main 方法中创建 NioEventLoopGroup 对象时,此时并没有线程与之绑定,当真正需要执行任务时才会创建新的线程执行业务,而 EventLoop 则与该线程绑定,这就是 Netty 对 EventLoop 的无锁实现。其中也体现了懒加载的思想。
那让我们通过 execute(Runnable)
方法开启 NioEventLoop 的学习吧!
该方法被 SingleThreadEventExecutor 抽象类所实现,SingleThreadEventExecutor 是 EventLoop 的基本实现骨架。
// io.netty.util.concurrent.SingleThreadEventExecutor#execute(java.lang.Runnable)
/**
* 单线程Executor执行任务
* 任务默认立即执行
* @param task
*/
@Override
public void execute(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
execute(task,
!(task instanceof LazyRunnable) &&
wakesUpForTask(task)); // 默认为「true」
}
/**
* 单线程执行任务
* @param task
* @param immediate
*/
private void execute(Runnable task, boolean immediate) {
// #1 判断是否为异线程调用该方法,初始化时,必定为false
boolean inEventLoop = inEventLoop();
// #2 添加到「taskQueue」队列中
addTask(task);
// #3 异线程调用
if (!inEventLoop) {
// #3-1 根据state状态值判断是否已开启线程
// 如果没有开启,则会通过线程工厂创建一个新的线程用来执行接下来的任务和I/O操作
startThread();
// #3-2 线程被关闭
if (isShutdown()) {
boolean reject = false;
try {
// 移除当前任务
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {}
if (reject) {
// 抛出「RejectedExecutionException」异常
reject();
}
}
}
// #4 唤醒Executor执行器执行任务
// 默认为添加任务后立即唤醒selector
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
execute(Runnable)
最终是调用 execute(Runnable,immediate)
方法。参数 immediate
表示是否需要立即执行的意思。设想此时线程阻塞在 Selector.select() 方法上,正好有一个异线程添加一个任务,如果 immediate=true 则表示将会调用 Selector.weakup() 方法唤醒阻塞线程,这样任务也就可以”立即”执行了。当前,如果线程并没有处于阻塞状态,是不需要重复唤醒的。流程示意图如下:
通过了解 execute(Runnable, boolean) 方法,我们了解以下几点:
- 无论异线程还是同线程调用 execute() 方法,第一步都是将任务添加到任务队列中。这个添加过程是非阻塞的(java.util.Queue#offer)。
- 异线程调用时,需要判断线程是否和当前的 EventLoop 绑定(
startThread()
),这个方法就是 EventLoop 启动的关键,它会通过for(;;)
不断轮询 I/O 事件并处理它们。 Netty 默认设置一旦添加任务就立即执行,所以会判断当前的 EventLoop 的状态是否需要异线程手动唤醒。
② 无限循环处理I/O事件和任务
这一步是由
startThread()
方法所触发,在此方法实现中,会根据state
的值判断线程是否已经开启,如果没有开启则调用doStartThread()
方法执行。需要注意的是,此刻调用这些方法的是异线程。相关源码如下:/** * 根据{@link #state}状态判断是否开启新的线程 */ private void startThread() { if (state == ST_NOT_STARTED) { // CAS更改state的值 if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { boolean success = false; try { // 开启线程 doStartThread(); success = true; } finally { if (!success) { STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED); } } } } }
实际是由
doStartThread()
完成。/** * 开启线程并在「for(;;)」不断处理I/O事件和任务 */ private void doStartThread() { assert thread == null; // 由「ThreadPerTaskExecutor」执行器执行任务(新开一个线程执行) executor.execute(() -> { // #1 保存线程变量(inEventLoop()方法就是通过此变量进行判断) thread = Thread.currentThread(); // #2 响应中断 if (interrupted) { thread.interrupt(); } boolean success = false; // #3 更新「lastExecutionTime」,以便精通地进行静默期检查 updateLastExecutionTime(); try { // #4 「EventLoop」核心方法,轮询并处理I/O事件和任务 SingleThreadEventExecutor.this.run(); // #5 for(;;) 循环退出 success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { // #6 通过CAS将「state」状态更新为「ST_SHUTTING_DOWN」 for (;;) { int oldState = state; if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) { break; } } // 如果gracefulShutdownStartTime==0:意味着并没有调用「confirmShutdown()」方法 if (success && gracefulShutdownStartTime == 0) { if (logger.isErrorEnabled()) { logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " + "be called before run() implementation terminates."); } } try { // #7 直接「confirmShutdown」返回true才会结束 for (;;) { if (confirmShutdown()) { break; } } // 现在我们要确保从此刻开始不能添加任何任务。这通过改变状态来实现。任务超过此刻的任务都将会被拒绝 // 这一点开始不能再增加任务。这可以通过切换状态来实现。任何超过这一点的新任务都将被拒绝。 for (;;) { int oldState = state; if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet( SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) { break; } } // 执行剩余所有任务,不需要循环执行「confirmShutdown()」方法。 confirmShutdown(); } finally { try { // #8 子类实现(NioEventLoop会调用Selector.close())方法 cleanup(); } finally { // #9 移除所有本地线程缓存数据。因为执行器即将关闭并给Future发送相关通知。 // 用户可能阻塞在Future,一旦解除阻塞,JVM就可以终止并开始卸载类 // See https://github.com/netty/netty/issues/6596. FastThreadLocal.removeAll(); STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); // 唤醒等待在「threadLocak」的线程 threadLock.countDown(); int numUserTasks = drainTasks(); if (numUserTasks > 0 && logger.isWarnEnabled()) { logger.warn("An event executor terminated with " + "non-empty task queue (" + numUserTasks + ')'); } // #10 向Future发送通知 terminationFuture.setSuccess(null); } } } }); }
doStartThread()
是 EventLoop 执行器开启线程的唯一入口。通过在构造器中传入的 ThreadPerTaskExecutor 每任务每线程执行器创建新线程执行嵌套方法,执行流程示意图如下:
从上面可以看出,SingleThreadEventExecutor.this.run()
是核心方法,它是 Netty 处理 I/O 事件和任务的核心方法。剩下的是与线程相关的状态变更及优雅关闭操作,待下面讲到优雅关闭时详说。这里重点是SingleThreadEventExecutor.this.run()
方法。对于抽象类 SingleThreadEventExecutor 而言, run() 方法是抽象类,需要子类实现,而我们最常见的实现是 NioEventLoop,相关源码如下:/** * {@link NioEventLoop} 无限循环处理I/O事件和任务。主要完成以下操作: * ① 根据「taskQueue」计算此次for循环的Selector策略 * ② 执行 select()/select(long)/selectNow() * ③ I/O事件的优先级比任务高。因此,无论ioRate比率如何,都要先处理I/O事件。后续根据ioRate分配任务处理时间。 * ④ 解析Epoll空轮询bug * ⑤ 每次for循环都需要判断当前线程的状态。如果当前线程为正在关闭中,先关闭所有的通道连接,再处理任务队列。 */ @Override protected void run() { // select唤醒次数 int selectCnt = 0; for (; ; ) { try { int strategy; try { // 操作一(非阻塞) // ① 如果「taskQueue」队列中有任务,这里会首先调用「Selector#selectNow()」方法获取已准备好的IO事件数量。 // ② 如果「taskQueue」队列中没有任务,则返回-1。 strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: case SelectStrategy.SELECT: // #1-2 获取「调度任务队列」最近执行的时间(即未来第一个将要执行的任务的截止时间,纳秒表示) // 只会返回-1或>0的数 // 获取定时任务队列截止日期时间 long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); if (curDeadlineNanos == -1L) { // 定时任务队列没有任务,则下次「selector」为「全阻塞」调用 curDeadlineNanos = NONE; } // #1-3 设置下次selector唤醒时间 // ① 定时任务队列中无任务,nextWakeupNanos=NONE // ② 定时任务队列中有任务,nextWakeupNanos=deadlineNanos() nextWakeupNanos.set(curDeadlineNanos); try { if (!hasTasks()) { // 操作二(可能阻塞):根据「curDeadlineNanos」调用相关「select」方法 strategy = select(curDeadlineNanos); } } finally { // 「selector」被唤醒或到时,更新nextWakeupNanos=AWAKE // #1-5 延迟设置「nextWakeupNanos」的值,不能立即可见,可能会在短时间内读到旧值。 // 此次更新目的是为了阻塞不必须的唤醒动作,所以使用「lazySet()」是可行的(没有竞争条件) // 关于「lasySet」: // 普通的 valiatile 变量是立即可见,由底层操作系统指令所保证, // 而 lazySet 却是无法保证这一点,所以其他线程在之后的一小段时间内还可以读取旧值 // 好处: 提高性能,在多个CPU缓冲之间同步一个内存值是很昂贵的 nextWakeupNanos.lazySet(AWAKE); } // fall through default: } } catch (IOException e) { // #2 抛出异常,表明此时的Selector处于混乱状态,需要进行重建 // https://github.com/netty/netty/issues/8566 rebuildSelector0(); selectCnt = 0; handleLoopException(e); continue; } // #3 Select循环次数+1 selectCnt++; cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; boolean ranTasks; // #4 平衡I/O事件和处理任务两者的耗时,默认比率为 50:50, if (ioRatio == 100) { try { if (strategy > 0) { // #4-1 首要任务还是先处理I/O事件 processSelectedKeys(); } } finally { // #4-2 处理所有任务 ranTasks = runAllTasks(); } } else if (strategy > 0) { // #5-1 保存I/O处理起始时间 final long ioStartTime = System.nanoTime(); try { // #5-2 处理I/O事件 processSelectedKeys(); } finally { // #5-3 计算本次轮询I/O耗时 final long ioTime = System.nanoTime() - ioStartTime; // #5-4 根据此次I/O耗时计算分配给任务处理所需的CPU时间 // 由于ioRatio默认值为50,所以在一个轮次中,I/O事件处理和任务处理所花费的时间相等。 ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } else { // #6 本次循环没有I/O事件需要处理,那就处理任务吧 // runAllTasks(0)最多处理64个任务 ranTasks = runAllTasks(0); } // 本次for循环执行过任务或处理过I/O事件 if (ranTasks || strategy > 0) { if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) { // 连续提前几次返回 logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } // 重置「selectCnt」 selectCnt = 0; // #7 异常唤醒判断(可能出现了 Epoll 空轮询BUG,如果超过一定次数,则需要重建「Selector」对象) // 出现了既没有处理I/O事件,也没有处理任务任务,但是「selector」被唤醒了 } else if (unexpectedSelectorWakeup(selectCnt)) { selectCnt = 0; } } catch (CancelledKeyException e) { // Harmless exception - log anyway if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e); } } catch (Error e) { throw (Error) e; } catch (Throwable t) { handleLoopException(t); } finally { try { // 每次for循环都需要判断「EventLoop」状态,根据对应状态作出关闭动作 // 必须要执行「shundown」操作,即便在循环中有异常抛出 // ① 如果当前线程状态为「正在关闭」,则需要进行清理工作 // ② 关闭所有通道连接 // ③ 再次确认是否成功关闭 if (isShuttingDown()) { closeAll(); // 确认是否已完全关闭,如果确认失败,则继续for循环 // 有可能添加了额外的任务 if (confirmShutdown()) { return; } } } catch (Error e) { throw (Error) e; } catch (Throwable t) { handleLoopException(t); } } } }
相关流程示意图如下:
NioEventLoop#run()
流程并不复杂,在这个方法中有以下几点值得读者注意:Select 策略是怎么执行的?
- 会先判断 taskQueue 是否有任务?
- 存在任务,直接调用
selectNow()
方法并立即返回,此方法为非阻塞调用,所以可能返回 0。 - 不存在任务,则需要决定 select() 操作的阻塞时间
- 如果定时任务中有任务,则获取最近任务的截止时期时间作为阻塞时间
- 如果定时任务中不存在任务,则直接调用 select() 一直阻塞直到有 I/O 事件到达。
- 存在任务,直接调用
- 会先判断 taskQueue 是否有任务?
- 变量 strategy 有以下两个含义:
- 当
strategy>=0
时表示已准备 I/O 事件数量。 - 当
strategy<0
时表示 Netty 自定义 Select 策略,详见 SelectStrategy。
- 当
- I/O 处理时间和任务处理时间如何平衡?
- 首先需要明确的一点是 I/O 处理的优先级高于任务处理,即当 I/O 事件和任务都存在的情况下,先处理 I/O 事件,再处理任务。
- Netty 使用 ioRatio 平衡 I/O 处理时间和任务处理时间的比率,默认为 50:50。Netty 每次都会记录(当然,当 strategy>0 才有意义)I/O 执行时间,这也是本轮任务的执行耗时。
- Netty 通过重建 Selector 方式解决 Epoll 空轮询 Bug 问题。当 selectCnt >=512 时意味着出现 Epoll 空轮询 bug。此 Bug 一直存在,本质是 JDK 对 Linux 的 Epoll 封装时没有考虑到以下异常情况:
- 当一个Socket的文件描述符被轮询,请求事件掩码为0,如果连接被突然终止(RST),那么原本阻塞的方法会被唤醒(EPOLL),并在返回的事件集中设置了 POLLHUP(可能还有 POLLERR )。但是JDK封装的上层并没有对应事件定义,但 SocketChannel 的兴趣集为0,这也意味着没有被选择的事件,方法返回0。而通常 select()方法是写在一个 while(true) 循环中的,这样就会导致应用程序一直不停地执行 select(),也就是导致 CPU 空转,CPU 使用率最终会飙升到 100%。
- 每次 for 循环都会检查 EventLoop 的状态,如果当前状态为正在关闭中,则会调用
closeAll()
方法关闭所有已连接的通道。 此方法中有一处是阻塞调用,即
select()
,当我们向 NioEventLoop 添加任务时,如果不唤醒阻塞在此方法上的线程,我们也就不能立即执行任务。所以 Netty 默认是每当调用 execute(Runnable) 方法就会唤醒执行器线程(如果已唤醒,就跳过)。这样,也就能立即处理已添加的任务了。I/O事件处理
Netty NIO 的事件处理流程与实际上与普通的使用 JDK NIO 编程无异,通过从 SelectionKey 对象获取所绑定的通道的 I/O 状态,根据此状态做进一步处理。核心方法
NioEventLoop#processSelectedKey(SelectionKey, AbstractNioChannel)
。processSelectedKey
processSelectedKey
源码如下:/** * Netty 处理 I/O 事件。最终委托 {@link AbstractNioChannel.NioUnsafe} 实例对象完成对JDK原生通道的数据读/写操作。 * * @param k {@link Selector#selectedKeys()},目的是用来判断当前通道感兴趣的事件 * @param ch Netty封装的{@link AbstractNioChannel},内部包含了大量的信息 */ private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); // #1 校验 key 是否合法 if (!k.isValid()) { // 非法的「SelectionKey」需要被关闭 final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { // 如果 EventLoop 为空会抛出异常,这里会忽略这个异常。因为我们只是想确定 ch 是否注册到这个事件循环中,因此有权力关闭 ch return; } // 只有当 ch 仍然注册到这个 EventLoop 时才关闭 ch。Ch 可以从事件循环中取消注册, // 因此 SelectionKey 可以作为注销过程的一部分取消,但是通道仍然是健康的,不应该被关闭。 // See https://github.com/netty/netty/issues/5125 if (eventLoop == this) { // 非法 key,直接关闭 unsafe.close(unsafe.voidPromise()); } return; } // 合法的「SelectionKey」 try { // #2 获取当前通道已准备好的事件 int readyOps = k.readyOps(); // #2-1 OP_CONNECT事件(客户端才拥有这种类型类型) if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); // 移除「OP_CONNECT」事件 ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); // 连接成功,触发相关事件回调(Promise) unsafe.finishConnect(); } // #2-2「OP_WRITE」事件(Both) // 表示可向通道写入数据 // 先处理 WRITE 事件以便可以先释放内存 if ((readyOps & SelectionKey.OP_WRITE) != 0) { // 将数据冲刷到客户端,最后会调用 JavaChannel#write() 方法执行底层操作 ch.unsafe().forceFlush(); } // #2-3 「OP_READ」事件(Both) // 「OP_ACCEPT」事件(服务端) // Netty 将 READ 和 ACCEPT 统一封装,都通过 unsafe.read() 进行处理 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { /** * ① 从 Channel 中读取数据并存储到分配的 ByteBuf * ② 调用 pipeline#fireChannelRead() 方法产生 Inbound 事件 * ③ 依次调用 ChannelHandler#channelRead() 方法处理数据 * ④ 调用 pipeline#fireChannelReadComplete() 方法完成读操作 * ⑤ 执行 removeReadOp() 清除 OP_READ 事件 */ unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
整体处理流程如下图所示:
从上图可以明显看出,对底层通道的读取、写入等操作都是委托 Unsafe 类完成。它是 Netty 提供的仅供内部使用的操作底层 Socket 的一系列 API。关于 Unsafe 详见
方法processSelectedKey
里面有我们最熟悉的对 Java NIO 编程相关的逻辑框架:根据 SelectionKey 相关的感兴趣事件执行对应的方法。