NioEventLoop.java
Next, we trace run(), the method that carries out the event loop's actual work.
@Override
protected void run() {
    // An infinite loop: unless an exception escapes or the loop shuts down, it runs forever
    for (;;) {
        try {
            try {
                // ---------------------------- 1 Select ready channels ----------------------------
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE: // not used by NioEventLoop (the default strategy never returns it)
                    continue;

                case SelectStrategy.BUSY_WAIT: // not supported by NioEventLoop
                    // fall-through to SELECT since the busy-wait is not supported with NIO

                case SelectStrategy.SELECT:
                    // Reached only when the task queue is empty, so a blocking select is performed.
                    // (With pending tasks the default strategy calls selectNow() instead and returns
                    // the number of ready keys, which lands in default:.)
                    select(wakenUp.getAndSet(false));

                    // 'wakenUp.compareAndSet(false, true)' is always evaluated
                    // before calling 'selector.wakeup()' to reduce the wake-up
                    // overhead. (Selector.wakeup() is an expensive operation.)
                    //
                    // However, there is a race condition in this approach.
                    // The race condition is triggered when 'wakenUp' is set to
                    // true too early.
                    //
                    // 'wakenUp' is set to true too early if:
                    // 1) Selector is waken up between 'wakenUp.set(false)' and
                    // 'selector.select(...)'. (BAD)
                    // 2) Selector is waken up between 'selector.select(...)' and
                    // 'if (wakenUp.get()) { ... }'. (OK)
                    //
                    // In the first case, 'wakenUp' is set to true and the
                    // following 'selector.select(...)' will wake up immediately.
                    // Until 'wakenUp' is set to false again in the next round,
                    // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                    // any attempt to wake up the Selector will fail, too, causing
                    // the following 'selector.select(...)' call to block
                    // unnecessarily.
                    //
                    // To fix this problem, we wake up the selector again if wakenUp
                    // is true immediately after selector.select(...).
                    // It is inefficient in that it wakes up the selector for both
                    // the first case (BAD - wake-up required) and the second case
                    // (OK - no wake-up required).

                    if (wakenUp.get()) {
                        // Wake up the selector so that the next select() returns immediately
                        selector.wakeup();
                    }
                    // fall through
                default:
                }
            } catch (IOException e) {
                // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                // the selector and retry. https://github.com/netty/netty/issues/8566
                rebuildSelector0();
                handleLoopException(e);
                continue;
            }

            // ---------------------------- 2 Process I/O on ready channels ----------------------------
            cancelledKeys = 0;
            needsToSelectAgain = false;
            // ioRatio lies in the range (0, 100]
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    runAllTasks();
                }
            } else {
                // Record when I/O processing for the ready channels starts
                final long ioStartTime = System.nanoTime();
                try {
                    // Process I/O on the ready channels
                    processSelectedKeys();
                } finally {
                    // ---------------------------- 3 Run tasks from the task queue ----------------------------
                    // Ensure we always run tasks.
                    // Time spent processing I/O on the ready channels
                    final long ioTime = System.nanoTime() - ioStartTime;
                    // ioTime * (100 - ioRatio) / ioRatio is the time budget for running queued tasks
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    } // end-for
}

// Performs a blocking select
private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        // Counter of select operations in this call:
        // 0 means no select has been performed yet;
        // non-zero means at least one select operation has already run
        int selectCnt = 0;
        // Current time, which is also the start time of the first loop iteration
        long currentTimeNanos = System.nanoTime();
        // delayNanos() is how long until the first scheduled task in the scheduled task queue is due;
        // selectDeadLineNanos is the point in time at which that task is due
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

        for (;;) {
            // ------------- 1 Handle a scheduled task that is due almost immediately -------------
            // 500000L ns is 0.5 ms and 1000000L ns is 1 ms, so this rounds the remaining time
            // to the nearest millisecond. "Almost immediately" therefore means less than 0.5 ms.
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) { // true: a scheduled task is (almost) due
                // Before leaving select(), take a quick look for ready channels
                if (selectCnt == 0) {
                    // Non-blocking select
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }
            // Reaching here means no scheduled task is due almost immediately

            // ------------- 2 Handle newly submitted tasks -------------
            // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
            // Selector#wakeup. So we need to check task queue again before executing select operation.
            // If we don't, the task might be pended until select operation was timed out.
            // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
            // select() was entered because there were no tasks at that moment, but that does not
            // mean none have been submitted since. Tasks must run first, so end this select(),
            // again taking a quick look for ready channels before leaving.
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                // Non-blocking select
                selector.selectNow();
                selectCnt = 1;
                break; // leave select()
            }
            // Reaching here means there is neither an imminent scheduled task nor a new task

            // ------------- 3 Blocking select -------------
            // The blocking select returns when one of the following happens:
            // 1) a channel becomes ready
            // 2) selector.wakeup() is called
            // 3) the current thread is interrupted
            // 4) the timeout elapses
            // 5) it returns prematurely with nothing selected (e.g. the JDK epoll bug); if this
            //    keeps happening, the loop degenerates into a busy spin at 100% CPU
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;

            // Leave select() if a channel is ready, wakeup() was called, or tasks are pending
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                // - Selected something,
                // - waken up by user, or
                // - the task queue has a pending task.
                // - a scheduled task is ready for processing
                break;
            }
            // Reaching here rules out cases 1) and 2) as the reason select() returned
            if (Thread.interrupted()) {
                // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                // As this is most likely a bug in the handler of the user or it's client library we will
                // also log it.
                //
                // See https://github.com/netty/netty/issues/2426
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely because " +
                            "Thread.currentThread().interrupt() was called. Use " +
                            "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                }
                selectCnt = 1;
                break;
            }
            // Reaching here rules out cases 1), 2) and 3), so select() returned because of 4) or 5)

            // ------------- 4 Work around the NIO bug -------------
            long time = System.nanoTime();
            // true: select() returned because the timeout elapsed (case 4)
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                // timeoutMillis elapsed without anything selected.
                selectCnt = 1;
            // otherwise select() returned prematurely (case 5); once that has happened
            // SELECTOR_AUTO_REBUILD_THRESHOLD times in a row, the selector is rebuilt
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // The code exists in an extra method to ensure the method is not too big to inline as this
                // branch is not very likely to get hit very frequently.
                // Rebuild the selector
                selector = selectRebuildSelector(selectCnt);
                selectCnt = 1;
                break;
            }

            // Record the start time of the next loop iteration
            currentTimeNanos = time;
        } // end-for

        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                        selectCnt - 1, selector);
            }
        }
    } catch (CancelledKeyException e) {
        if (logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                    selector, e);
        }
        // Harmless exception - log anyway
    }
}

private void processSelectedKeys() {
    // If the optimized key set is in use (Netty swaps the selector's HashSet for an
    // array-backed SelectedSelectionKeySet when it can)
    if (selectedKeys != null) {
        // Process the optimized selected keys
        processSelectedKeysOptimized();
    } else {
        // Process the plain selected-key set;
        // selector.selectedKeys() is a java.util.Set
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        // The entry is set to null for two reasons:
        // 1) the channel can be garbage-collected once it is closed
        // 2) the same key is not processed twice
        selectedKeys.keys[i] = null;

        // Get the key's attachment. An attachment can be any object; NioEventLoop stores
        // the Netty AbstractNioChannel (which wraps the JDK channel) or a NioTask there
        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            // Process the selected key
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.reset(i + 1);

            selectAgain();
            i = -1;
        }
    }
}

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    // Handle a key that is no longer valid
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            // If the channel implementation throws an exception because there is no event loop, we ignore this
            // because we are only trying to determine if ch is registered to this event loop and thus has authority
            // to close ch.
            return;
        }
        // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
        // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
        // still healthy and should not be closed.
        // See https://github.com/netty/netty/issues/5125
        if (eventLoop != this || eventLoop == null) {
            return;
        }
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        // Get the ready operations of this SelectionKey (i.e. of the ready channel)
        int readyOps = k.readyOps();
        // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
        // the NIO JDK channel implementation may throw a NotYetConnectedException.
        // Handle OP_CONNECT. This only happens on the client side, because only a client initiates
        // a connection to a server. If the initial connect() does not complete immediately, the
        // channel registers interest in OP_CONNECT; once the connection can be completed, the
        // selector selects the channel, and the ready I/O here is finishing the connection.
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            // Get the interest set this channel registered on the SelectionKey
            int ops = k.interestOps();
            // Clear the OP_CONNECT bit
            ops &= ~SelectionKey.OP_CONNECT;
            // Write the updated interest set back: connect readiness has been handled
            k.interestOps(ops);

            // Complete the connection
            unsafe.finishConnect();
        }

        // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
        // Handle OP_WRITE. A channel is write-ready when its socket send buffer has room.
        // Netty registers interest in OP_WRITE only when an earlier flush (e.g. triggered by
        // writeAndFlush()) could not write all queued data because the send buffer was full.
        // The ready I/O is flushing the data still queued in Netty's outbound buffer into the
        // socket send buffer.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }

        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        // Handle OP_READ and OP_ACCEPT.
        // Read-ready: data sent by the peer has arrived in the socket receive buffer.
        // Accept-ready: a client's connection request is waiting in the server socket's accept queue.
        // Accept readiness can thus be seen as a special case of read readiness: in both cases the
        // ready I/O reads what has arrived (data, or a new connection) into user space, where it shows
        // up as the msg argument of channelRead() (for OP_ACCEPT, msg is the newly accepted child channel).
        // readyOps == 0 means this key has no ready operation at all, which should not normally happen
        // for a selected key; read() is still called to work around the JDK bug mentioned above.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

Processing the plain (non-optimized) selected-key set:

private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    // check if the set is empty and if so just return to not create garbage by
    // creating a new Iterator every time even if there is nothing to process.
    // See https://github.com/netty/netty/issues/597
    if (selectedKeys.isEmpty()) {
        return;
    }

    // Iterate over the set
    Iterator<SelectionKey> i = selectedKeys.iterator();
    for (;;) {
        final SelectionKey k = i.next();
        final Object a = k.attachment();
        i.remove();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (!i.hasNext()) {
            break;
        }

        if (needsToSelectAgain) {
            selectAgain();
            selectedKeys = selector.selectedKeys();

            // Create the iterator again to avoid ConcurrentModificationException
            if (selectedKeys.isEmpty()) {
                break;
            } else {
                i = selectedKeys.iterator();
            }
        }
    }
}
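The timeout rounding and the premature-return check in select() are easier to follow in isolation. The sketch below is a hypothetical helper, not a Netty class: it assumes the caller already knows that the blocking select returned 0 keys with no wakeup, no pending task and no interrupt, and it only replays the arithmetic that decides between case 4 (normal timeout) and case 5 (premature return). The rebuild threshold is a constructor parameter here; in Netty it is SELECTOR_AUTO_REBUILD_THRESHOLD, which defaults to 512 and can be changed with the `io.netty.selectorAutoRebuildThreshold` system property.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical helper (not a Netty class) that replays the bookkeeping
 * NioEventLoop.select() does after a blocking select returned 0 keys with
 * no wakeup, no pending task and no interrupt.
 */
final class PrematureReturnDetector {
    enum Outcome { TIMED_OUT, PREMATURE, REBUILD }

    private final int rebuildThreshold; // plays the role of SELECTOR_AUTO_REBUILD_THRESHOLD
    private int selectCnt;              // select operations counted so far

    PrematureReturnDetector(int rebuildThreshold) {
        this.rebuildThreshold = rebuildThreshold;
    }

    /** Remaining time rounded to the nearest millisecond, as in select(). */
    static long timeoutMillis(long deadlineNanos, long nowNanos) {
        return (deadlineNanos - nowNanos + 500_000L) / 1_000_000L;
    }

    /** Called when select(timeoutMillis), started at startNanos, returned empty at endNanos. */
    Outcome onEmptySelect(long startNanos, long endNanos, long timeoutMillis) {
        selectCnt++;
        if (endNanos - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= startNanos) {
            // The whole timeout elapsed: a normal timeout (case 4), restart the count.
            selectCnt = 1;
            return Outcome.TIMED_OUT;
        }
        if (rebuildThreshold > 0 && selectCnt >= rebuildThreshold) {
            // Too many early returns in a row (case 5): the selector should be rebuilt.
            selectCnt = 1;
            return Outcome.REBUILD;
        }
        // An early return, but still below the threshold.
        return Outcome.PREMATURE;
    }

    public static void main(String[] args) {
        PrematureReturnDetector d = new PrematureReturnDetector(3);
        System.out.println(d.onEmptySelect(0L, 10_000_000L, 10)); // prints TIMED_OUT
        System.out.println(d.onEmptySelect(0L, 1_000_000L, 10));  // prints PREMATURE
        System.out.println(d.onEmptySelect(0L, 1_000_000L, 10));  // prints REBUILD
    }
}
```

With a threshold of 3, a select that waits out its full 10 ms resets the counter, while the third early return in a row asks for a rebuild. Counting consecutive early returns only costs one System.nanoTime() call per loop iteration, which is why the check is cheap enough to run on every pass.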
Within run(), it is runAllTasks() that actually executes the queued tasks.
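The budget passed to runAllTasks(long) in run() is ioTime * (100 - ioRatio) / ioRatio, which keeps I/O time and task time at roughly ioRatio : (100 - ioRatio). A minimal sketch of just that arithmetic follows; the class name and the NO_LIMIT sentinel are hypothetical, the latter standing in for the ioRatio == 100 branch, where Netty calls runAllTasks() without any timeout. NioEventLoop's default ioRatio is 50, so tasks get as much time as the I/O just took.

```java
/**
 * Hypothetical helper (not a Netty class) showing how run() derives the
 * time budget for queued tasks from the time just spent on I/O.
 */
final class IoRatioBudget {
    /** Stand-in for "no limit": with ioRatio == 100 Netty calls runAllTasks() without a timeout. */
    static final long NO_LIMIT = Long.MAX_VALUE;

    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio > 100) {
            throw new IllegalArgumentException("ioRatio must be in (0, 100]: " + ioRatio);
        }
        if (ioRatio == 100) {
            return NO_LIMIT;
        }
        // ioTime : taskTime == ioRatio : (100 - ioRatio)
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 8_000_000L; // suppose processSelectedKeys() took 8 ms
        for (int ratio : new int[] {20, 50, 80, 100}) {
            System.out.println("ioRatio=" + ratio + " -> task budget (ns): " + taskBudgetNanos(ioTime, ratio));
        }
    }
}
```

With 8 ms of I/O, ioRatio values of 20, 50 and 80 give budgets of 32 ms, 8 ms and 2 ms. Because the budget is relative to the measured I/O time, a round with almost no I/O leaves tasks a tiny budget, although runAllTasks(long) still runs up to 64 tasks before it first checks the deadline.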
SingleThreadEventExecutor.java
/**
 * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.  This method stops running
 * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
 */
protected boolean runAllTasks(long timeoutNanos) {
    // Move every scheduled task that is now due from the scheduled task queue into taskQueue
    fetchFromScheduledTaskQueue();
    // Take one task from the task queue
    Runnable task = pollTask();
    // If there is none, only run the tail tasks
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }

    // There is at least one task:
    // compute the point in time by which this round of task execution should end
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    // Number of tasks run so far
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        // Run the task
        safeExecute(task);

        runTasks ++;

        // Check timeout every 64 tasks because nanoTime() is relatively expensive.
        // XXX: Hard-coded value - will make it configurable if it is really a problem.
        // Because the deadline is only checked once every 64 tasks, the time spent on
        // queued tasks is not controlled precisely and a round can run past the deadline
        if ((runTasks & 0x3F) == 0) { // runTasks % 64 == 0
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }

        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    } // end-for

    // Run the tasks in the tail task queue
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}
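The core of runAllTasks(long) is a loop that drains the queue but reads the clock only after every 64th task. Below is a minimal sketch of that loop alone, under stated assumptions: it is not Netty code, it leaves out fetchFromScheduledTaskQueue(), safeExecute() and afterRunningAllTasks(), and it takes the clock as a LongSupplier (a hypothetical parameter, replacing ScheduledFutureTask.nanoTime()) so the behaviour can be tested with a frozen clock.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.LongSupplier;

/**
 * Hypothetical sketch (not Netty code) of the loop in runAllTasks(long):
 * run tasks until the queue is empty, but consult the clock only after
 * every 64th task and stop once the deadline has passed.
 */
final class BudgetedTaskRunner {
    /** Returns how many tasks were run. */
    static long runWithBudget(Queue<Runnable> queue, long timeoutNanos, LongSupplier clock) {
        Runnable task = queue.poll();
        if (task == null) {
            return 0;
        }
        final long deadline = clock.getAsLong() + timeoutNanos;
        long runTasks = 0;
        for (;;) {
            task.run(); // Netty wraps this call in safeExecute()
            runTasks++;
            // (runTasks & 0x3F) == 0 is a cheap way to write runTasks % 64 == 0
            if ((runTasks & 0x3F) == 0 && clock.getAsLong() >= deadline) {
                break; // budget used up; the remaining tasks wait for the next round
            }
            task = queue.poll();
            if (task == null) {
                break; // queue drained before the deadline
            }
        }
        return runTasks;
    }

    public static void main(String[] args) {
        Queue<Runnable> queue = new ArrayDeque<>();
        for (int i = 0; i < 200; i++) {
            queue.add(() -> { });
        }
        // A frozen clock with a zero budget: the deadline is already reached at the first check.
        long ran = runWithBudget(queue, 0L, () -> 0L);
        System.out.println("ran " + ran + ", left " + queue.size()); // prints "ran 64, left 136"
    }
}
```

Even a zero budget lets 64 tasks run, which is exactly the imprecision the comment in runAllTasks() warns about; the benefit is that nanoTime() is called about 64 times less often.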
AbstractEventExecutor.java
/**
 * Try to execute the given {@link Runnable} and just log if it throws a {@link Throwable}.
 */
protected static void safeExecute(Runnable task) {
    try {
        // This is where the task actually runs
        task.run();
    } catch (Throwable t) {
        logger.warn("A task raised an exception. Task: {}", task, t);
    }
}
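safeExecute() is what keeps one faulty task from killing the event-loop thread: the exception is logged and the loop moves on to the next task. A minimal sketch of the same pattern follows; the class name is hypothetical, and java.util.logging stands in for the internal logger Netty actually uses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Hypothetical sketch (not Netty code) of the safeExecute() pattern: a failing
 * task is logged and swallowed so that the caller's loop keeps going.
 */
final class SafeExecuteDemo {
    private static final Logger LOGGER = Logger.getLogger(SafeExecuteDemo.class.getName());

    static void safeExecute(Runnable task) {
        try {
            task.run();
        } catch (Throwable t) {
            // Log and continue; the exception never reaches the event-loop thread's loop
            LOGGER.log(Level.WARNING, "A task raised an exception. Task: " + task, t);
        }
    }

    public static void main(String[] args) {
        List<String> done = new ArrayList<>();
        List<Runnable> tasks = Arrays.<Runnable>asList(
                () -> done.add("first"),
                () -> { throw new IllegalStateException("boom"); },
                () -> done.add("third"));
        for (Runnable task : tasks) {
            safeExecute(task);
        }
        System.out.println(done); // prints [first, third]
    }
}
```

Catching Throwable rather than Exception matches the Netty code above; the trade-off is that even an Error thrown by a task is only logged, not propagated.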
Running the tasks in the tail task queue: afterRunningAllTasks()
SingleThreadEventLoop.java
@Override
protected void afterRunningAllTasks() {
    runAllTasksFrom(tailTasks);
}
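runAllTasks(long) shown above calls afterRunningAllTasks() both when the task queue is empty and after its loop, so tail tasks run once per loop iteration, after the regular tasks. In Netty 4.1 tail tasks are submitted through SingleThreadEventLoop.executeAfterEventLoopIteration(). The sketch below models only that ordering; it is single-threaded, uses plain ArrayDeque queues, and all names in it are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/**
 * Hypothetical, single-threaded sketch (not Netty code) of the ordering
 * runAllTasks() produces: regular tasks first, then tail tasks through the
 * afterRunningAllTasks() hook.
 */
final class TailTaskDemo {
    final Queue<Runnable> taskQueue = new ArrayDeque<>();
    final Queue<Runnable> tailTasks = new ArrayDeque<>();

    void runAllTasks() {
        Runnable task;
        while ((task = taskQueue.poll()) != null) {
            task.run();
        }
        afterRunningAllTasks(); // invoked even when taskQueue was empty
    }

    void afterRunningAllTasks() {
        Runnable task;
        while ((task = tailTasks.poll()) != null) {
            task.run();
        }
    }

    public static void main(String[] args) {
        TailTaskDemo loop = new TailTaskDemo();
        List<String> order = new ArrayList<>();
        loop.tailTasks.add(() -> order.add("tail")); // submitted before the regular tasks
        loop.taskQueue.add(() -> order.add("task-1"));
        loop.taskQueue.add(() -> order.add("task-2"));
        loop.runAllTasks();
        System.out.println(order); // prints [task-1, task-2, tail]
    }
}
```

The submission order does not matter: a tail task always runs after the regular tasks of the same iteration, which makes the tail queue a natural place for per-iteration bookkeeping.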
SingleThreadEventExecutor.java
/**
 * Runs all tasks from the passed {@code taskQueue}.
 *
 * @param taskQueue To poll and execute all tasks.
 *
 * @return {@code true} if at least one task was executed.
 */
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
    // Take one task from the queue
    Runnable task = pollTaskFrom(taskQueue);
    if (task == null) {
        return false;
    }
    for (;;) {
        // Run the task
        safeExecute(task);
        task = pollTaskFrom(taskQueue);
        if (task == null) {
            return true;
        }
    }
}

protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
    // Take the next task that is not WAKEUP_TASK. WAKEUP_TASK is an empty marker task
    // used only to wake up the thread, so it is skipped rather than run
    for (;;) {
        Runnable task = taskQueue.poll();
        if (task == WAKEUP_TASK) {
            continue;
        }
        return task;
    }
}
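pollTaskFrom() has to skip WAKEUP_TASK because that entry is an empty marker placed in a queue only to wake up a thread waiting on it, not real work. A minimal sketch of the drain-and-skip logic, assuming a hypothetical WAKEUP_TASK constant that is compared by identity exactly as in the Netty code above:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch (not Netty code) of runAllTasksFrom()/pollTaskFrom():
 * drain a queue while skipping a no-op marker task.
 */
final class WakeupSentinelDemo {
    // Stand-in for Netty's WAKEUP_TASK; compared by identity, never run.
    static final Runnable WAKEUP_TASK = () -> { };

    static Runnable pollTaskFrom(Queue<Runnable> queue) {
        for (;;) {
            Runnable task = queue.poll();
            if (task == WAKEUP_TASK) {
                continue; // skip the marker and look at the next entry
            }
            return task; // a real task, or null when the queue is empty
        }
    }

    /** Returns true if at least one real task ran. */
    static boolean runAllTasksFrom(Queue<Runnable> queue) {
        Runnable task = pollTaskFrom(queue);
        if (task == null) {
            return false;
        }
        for (;;) {
            task.run();
            task = pollTaskFrom(queue);
            if (task == null) {
                return true;
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        Queue<Runnable> queue = new ArrayDeque<>();
        queue.add(WAKEUP_TASK);
        queue.add(counter::incrementAndGet);
        queue.add(WAKEUP_TASK);
        System.out.println(runAllTasksFrom(queue) + " " + counter.get()); // prints "true 1"
    }
}
```

A queue that holds only markers reports false, so the boolean result really means "at least one real task ran", matching the Javadoc of runAllTasksFrom().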
