NioEventLoop.java
接下来跟踪执行任务业务的run方法。
/**
 * Main loop of this event loop. Each iteration (1) selects ready channels, (2) processes the
 * selected keys and (3) runs queued tasks, then checks whether the loop is shutting down.
 * The method only returns once {@code confirmShutdown()} reports {@code true}.
 */
@Override
protected void run() {
// Endless loop: the only exit is the return after confirmShutdown() below. Throwables raised
// inside an iteration are caught and passed to handleLoopException(...).
for (;;) {
try {
try {
// ---------------------------- 1. Select ready channels --------------
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE: // skip the rest of this iteration and recompute the strategy
continue;
case SelectStrategy.BUSY_WAIT: // treated like SELECT here (see next comment)
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
// Blocking select; wakenUp is reset to false and its previous value is handed to select().
// NOTE(review): presumably the strategy picks this case when hasTasks() is false -
// confirm against the SelectStrategy implementation.
select(wakenUp.getAndSet(false));
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
// Wake the selector again so that the next select operation returns immediately.
selector.wakeup();
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
handleLoopException(e);
continue;
}
// -------------------------------- 2. Process the I/O of ready channels ------------------
cancelledKeys = 0;
needsToSelectAgain = false;
// NOTE(review): ioRatio is presumably restricted to (0, 100] where it is set - confirm;
// it is used as a divisor below, so 0 would fail.
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
// No time budget in this branch: process all keys, then run all tasks.
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
// Timestamp taken right before I/O processing starts.
final long ioStartTime = System.nanoTime();
try {
// Process the I/O of the ready channels.
processSelectedKeys();
} finally {
// -------------------------------- 3. Run the tasks in the task queue ------------------
// Ensure we always run tasks.
// Time spent on I/O processing in this iteration.
final long ioTime = System.nanoTime() - ioStartTime;
// ioTime * (100 - ioRatio) / ioRatio is the time budget handed to the task run.
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
} // end-for
}
/**
 * Performs the (potentially blocking) select operation for one iteration of the event loop.
 * The inner loop ends when a scheduled deadline is about to expire, a task shows up, something
 * was selected, the selector was woken up, the thread was interrupted, or the selector had to
 * be rebuilt after too many premature returns.
 *
 * @param oldWakenUp the value {@code wakenUp} held before the caller reset it to {@code false}
 * @throws IOException if a select call on the selector fails
 */
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
// Counter of select operations performed in this call:
// 0 means no select has been performed yet,
// non-zero means at least one select operation has already run.
int selectCnt = 0;
// Current time; also the start time of the first round of the loop below.
long currentTimeNanos = System.nanoTime();
// NOTE(review): delayNanos() presumably returns how long until the first scheduled task
// is due, which makes selectDeadLineNanos the point in time at which that task is due -
// confirm against delayNanos().
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
// ------------------- 1. The deadline is (almost) reached -----------
// Times are in nanoseconds: 500000L is 0.5 ms and 1000000L is 1 ms, so the remaining
// time is rounded to the nearest millisecond; less than 0.5 ms remaining yields 0.
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) { // true: the deadline is due within half a millisecond (or has passed)
// Before leaving select(), take one quick look for ready channels.
if (selectCnt == 0) {
// Non-blocking select.
selector.selectNow();
selectCnt = 1;
}
break;
}
// Reaching this point means the deadline is not imminent.
// ------------------- 2. A task was added in the meantime -----------
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
// A task may have arrived since the caller decided to select, so select() is ended here
// to let it run; before leaving, take one quick look for ready channels.
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
// Non-blocking select.
selector.selectNow();
selectCnt = 1;
break; // leave select()
}
// Reaching this point means the deadline is not imminent and no task was found above.
// ------------------- 3. Blocking select -----------
// Per the JDK Selector contract, a blocking select returns when
// 1) at least one channel is selected,
// 2) the selector's wakeup() method is invoked,
// 3) the current thread is interrupted, or
// 4) the timeout expires.
// 5) It may additionally return prematurely for none of these reasons; that case is
// handled in step 4 below.
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
// End select() if something was selected, a wake-up happened, or tasks are pending.
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
// Reaching this point rules out reasons 1) and 2) for the return of selector.select().
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
// Reaching this point rules out reasons 1), 2) and 3), so selector.select() returned
// either because of 4) the timeout or 5) a premature return.
// ------------------- 4. Work around premature selector returns -----------
long time = System.nanoTime();
// True when at least timeoutMillis has elapsed since this round started, i.e. reason 4).
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
// Otherwise the select returned prematurely, i.e. reason 5); rebuild once the
// number of consecutive premature returns reaches the threshold.
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The code exists in an extra method to ensure the method is not too big to inline as this
// branch is not very likely to get hit very frequently.
// Rebuild the selector.
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
// Record the current time as the start time of the next round of the loop.
currentTimeNanos = time;
} // end-for
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}
/**
 * Dispatches processing of the selected keys: falls back to the selector's own selected-key set
 * when the optimized key set is absent, and uses the optimized key set otherwise.
 */
private void processSelectedKeys() {
    // No optimized set installed: iterate the plain Set returned by the selector.
    if (selectedKeys == null) {
        processSelectedKeysPlain(selector.selectedKeys());
        return;
    }
    // Optimized set present: walk it directly.
    processSelectedKeysOptimized();
}
/**
 * Processes every key held in the optimized {@code selectedKeys} structure, walking its
 * {@code keys} array from index 0 up to {@code size}. If {@code needsToSelectAgain} becomes
 * set while a key is processed, the remaining entries are reset, a new select is performed and
 * iteration restarts from the first slot.
 */
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
// The key's attachment is either an AbstractNioChannel or, otherwise, is treated as a
// NioTask; each kind has its own processSelectedKey overload.
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
// Process the selected key of a channel.
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
// -1 so that the loop's ++i restarts iteration at index 0.
i = -1;
}
}
}
/**
 * Handles one selected key whose attachment is an {@link AbstractNioChannel}. An invalid key
 * causes the channel to be closed, but only if the channel is still registered to this event
 * loop. Otherwise the ready operations are handled in the order connect, write, read/accept.
 *
 * @param k the selected key
 * @param ch the channel attached to {@code k}
 */
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// Handle a key that is no longer valid.
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
// Ready-operation set of this key.
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
// Connect-ready: stop watching OP_CONNECT and finish the connection.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
// Current interest set of the key.
int ops = k.interestOps();
// Clear the OP_CONNECT bit.
ops &= ~SelectionKey.OP_CONNECT;
// Write the updated interest set back to the key.
k.interestOps(ops);
// Finish the connection.
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
// Write-ready: flush pending outbound data.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
// Read-ready and accept-ready are both handled by unsafe.read(); readyOps == 0 takes the
// same path because of the workaround described above.
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
// The key was cancelled while being processed: close the channel.
unsafe.close(unsafe.voidPromise());
}
}
处理普通的SelectedKeys集合
/**
 * Processes the selected keys held in a plain {@link Set}, removing each key from the set
 * through the iterator before it is handled.
 *
 * @param selectedKeys the set of selected keys to process; nothing happens if it is empty
 */
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597
if (selectedKeys.isEmpty()) {
return;
}
// Iterate over the set; the emptiness check above guarantees the first next() succeeds.
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
// Remove the key from the set via the iterator before it is processed.
i.remove();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
if (needsToSelectAgain) {
// Select again and continue with the selector's current selected-key set.
selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
run()中调用的runAllTasks()方法才是任务真正被执行的地方
SingleThreadEventExecutor.java
/**
 * Drains the task queue, running each polled task via {@code safeExecute(Runnable)}, until the
 * queue is empty or the time budget is used up. The budget is only checked after every 64th
 * task, so it is an approximate limit rather than a strict one.
 *
 * @param timeoutNanos time budget in nanoseconds, measured from the moment the first task has
 *                     been polled
 * @return {@code false} if no task was available to run, {@code true} if at least one task ran
 */
protected boolean runAllTasks(long timeoutNanos) {
    // Give the scheduled-task queue a chance to hand over its tasks before polling.
    fetchFromScheduledTaskQueue();

    Runnable current = pollTask();
    if (current == null) {
        // Nothing to run: only the after-run hook is invoked.
        afterRunningAllTasks();
        return false;
    }

    // Point in time after which no further task should be started.
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long executed = 0;
    long finishedAt;
    while (true) {
        safeExecute(current);
        executed++;

        // Check timeout every 64 tasks because nanoTime() is relatively expensive.
        // XXX: Hard-coded value - will make it configurable if it is really a problem.
        if ((executed & 0x3F) == 0) {
            finishedAt = ScheduledFutureTask.nanoTime();
            if (finishedAt >= deadline) {
                break;
            }
        }

        current = pollTask();
        if (current == null) {
            // Queue drained: remember when the last task finished.
            finishedAt = ScheduledFutureTask.nanoTime();
            break;
        }
    }

    afterRunningAllTasks();
    this.lastExecutionTime = finishedAt;
    return true;
}
AbstractEventExecutor.java
/**
 * Runs {@code task} and shields the caller from anything it throws: a {@link Throwable} escaping
 * {@link Runnable#run()} is logged at warn level and otherwise ignored.
 *
 * @param task the task to run
 */
protected static void safeExecute(Runnable task) {
    try {
        // This is where the task actually runs.
        task.run();
    } catch (Throwable cause) {
        // Log and carry on; the failure is not propagated to the caller.
        logger.warn("A task raised an exception. Task: {}", task, cause);
    }
}
收尾队列中的任务afterRunningAllTasks()
SingleThreadEventLoop.java
/**
 * Hook invoked after the regular tasks have been run: runs every task currently held in
 * {@code tailTasks} via {@code runAllTasksFrom}.
 */
@Override
protected void afterRunningAllTasks() {
runAllTasksFrom(tailTasks);
}
SingleThreadEventExecutor.java
/**
 * Polls the given queue until it yields no more tasks, executing each polled task via
 * {@code safeExecute(Runnable)}.
 *
 * @param taskQueue the queue to drain
 *
 * @return {@code true} if at least one task was executed, {@code false} if the first poll
 *         already came back empty
 */
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
    Runnable next = pollTaskFrom(taskQueue);
    if (next == null) {
        return false;
    }
    // At least one task exists: run it, then keep polling until the queue reports none left.
    do {
        safeExecute(next);
        next = pollTaskFrom(taskQueue);
    } while (next != null);
    return true;
}
/**
 * Polls {@code taskQueue} and returns the first element that is not the {@code WAKEUP_TASK}
 * marker; marker entries encountered on the way are discarded.
 *
 * @param taskQueue the queue to poll
 * @return the next non-marker task, or {@code null} if the queue ran empty
 */
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
    Runnable candidate;
    do {
        // Keep polling while the head is the wake-up marker.
        candidate = taskQueue.poll();
    } while (candidate == WAKEUP_TASK);
    return candidate;
}