NioEventLoop.java

    接下来跟踪执行任务业务的run方法。

    1. @Override
    2. protected void run() {
    3. // Event-loop body. Runs an endless loop (left only via the return after confirmShutdown()); each pass selects ready channels, processes their I/O, then runs queued tasks.
    4. for (;;) {
    5. try {
    6. try {
    7. // ---------------------------- 1 select ready channels --------------
    8. switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
    9. case SelectStrategy.CONTINUE: // skip the rest of this pass and re-evaluate the strategy
    10. continue;
    11. case SelectStrategy.BUSY_WAIT: // not handled separately here; see the fall-through note below
    12. // fall-through to SELECT since the busy-wait is not supported with NIO
    13. case SelectStrategy.SELECT:
    14. // Potentially blocking select; presumably reached when hasTasks() was false - verify against the SelectStrategy in use. wakenUp is reset to false and its previous value is handed to select().
    15. select(wakenUp.getAndSet(false));
    16. // 'wakenUp.compareAndSet(false, true)' is always evaluated
    17. // before calling 'selector.wakeup()' to reduce the wake-up
    18. // overhead. (Selector.wakeup() is an expensive operation.)
    19. //
    20. // However, there is a race condition in this approach.
    21. // The race condition is triggered when 'wakenUp' is set to
    22. // true too early.
    23. //
    24. // 'wakenUp' is set to true too early if:
    25. // 1) Selector is waken up between 'wakenUp.set(false)' and
    26. // 'selector.select(...)'. (BAD)
    27. // 2) Selector is waken up between 'selector.select(...)' and
    28. // 'if (wakenUp.get()) { ... }'. (OK)
    29. //
    30. // In the first case, 'wakenUp' is set to true and the
    31. // following 'selector.select(...)' will wake up immediately.
    32. // Until 'wakenUp' is set to false again in the next round,
    33. // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
    34. // any attempt to wake up the Selector will fail, too, causing
    35. // the following 'selector.select(...)' call to block
    36. // unnecessarily.
    37. //
    38. // To fix this problem, we wake up the selector again if wakenUp
    39. // is true immediately after selector.select(...).
    40. // It is inefficient in that it wakes up the selector for both
    41. // the first case (BAD - wake-up required) and the second case
    42. // (OK - no wake-up required).
    43. if (wakenUp.get()) {
    44. // Wake the selector so that the next select operation returns immediately (see the race described above)
    45. selector.wakeup();
    46. }
    47. // fall through
    48. default:
    49. }
    50. } catch (IOException e) {
    51. // If we receive an IOException here its because the Selector is messed up. Let's rebuild
    52. // the selector and retry. https://github.com/netty/netty/issues/8566
    53. rebuildSelector0();
    54. handleLoopException(e);
    55. continue;
    56. }
    57. // -------------------------------- 2 process I/O of ready channels ------------------
    58. cancelledKeys = 0;
    59. needsToSelectAgain = false;
    60. // NOTE(review): presumably in (0, 100] - confirm against the ioRatio setter; a value of 0 would make the division below fail
    61. final int ioRatio = this.ioRatio;
    62. if (ioRatio == 100) {
    63. try {
    64. processSelectedKeys();
    65. } finally {
    66. // Ensure we always run tasks.
    67. runAllTasks();
    68. }
    69. } else {
    70. // Timestamp taken just before I/O processing of the selected keys starts
    71. final long ioStartTime = System.nanoTime();
    72. try {
    73. // Process the I/O of the selected keys
    74. processSelectedKeys();
    75. } finally {
    76. // -------------------------------- 3 run the tasks in the task queue ------------------
    77. // Ensure we always run tasks.
    78. // Time spent in processSelectedKeys()
    79. final long ioTime = System.nanoTime() - ioStartTime;
    80. // ioTime * (100 - ioRatio) / ioRatio is the time budget handed to runAllTasks(...) for running queued tasks
    81. runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
    82. }
    83. }
    84. } catch (Throwable t) {
    85. handleLoopException(t);
    86. }
    87. // Always handle shutdown even if the loop processing threw an exception.
    88. try {
    89. if (isShuttingDown()) {
    90. closeAll();
    91. if (confirmShutdown()) {
    92. return;
    93. }
    94. }
    95. } catch (Throwable t) {
    96. handleLoopException(t);
    97. }
    98. } // end-for
    99. }
    100. // Performs the (possibly blocking) select. oldWakenUp is the value wakenUp held before run() reset it to false. Throws IOException if a select call fails.
    101. private void select(boolean oldWakenUp) throws IOException {
    102. Selector selector = this.selector;
    103. try {
    104. // Counter of select operations performed in this call:
    105. // 0 means no select operation has been performed yet
    106. // non-zero means a select operation has already been performed
    107. int selectCnt = 0;
    108. // Current time; it is also the start time of the first iteration of the for loop
    109. long currentTimeNanos = System.nanoTime();
    110. // NOTE(review): delayNanos() presumably returns how long until the earliest scheduled task is due - verify against its definition
    111. // selectDeadLineNanos is then the absolute point in time by which this select must have finished
    112. long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
    113. for (;;) {
    114. // ------------------- 1 the deadline is imminent (presumably a scheduled task is about to become due) -----------
    115. // 500000L ns is 0.5 ms and 1000000L ns is 1 ms: add 0.5 ms, then integer-divide to get whole milliseconds
    116. // How soon is "imminent"? Less than 0.5 ms left until the deadline
    117. long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    118. if (timeoutMillis <= 0) { // true: less than 0.5 ms remains before the deadline (or it has already passed)
    119. // Before leaving select(), take one quick look for ready channels, but only if no select has been performed yet
    120. if (selectCnt == 0) {
    121. // Non-blocking select
    122. selector.selectNow();
    123. selectCnt = 1;
    124. }
    125. break;
    126. }
    127. // Reaching this point means the deadline is not imminent
    128. // ------------------- 2 a task has been added in the meantime -----------
    129. // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
    130. // Selector#wakeup. So we need to check task queue again before executing select operation.
    131. // If we don't, the task might be pended until select operation was timed out.
    132. // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
    133. // select() was entered because there was no task to run at that moment, but that was only true then
    134. // and need not stay true. This branch handles the case where a task has arrived since.
    135. // Tasks take priority, so the current select() is ended. Before ending it,
    136. // take one quick look for ready channels
    137. if (hasTasks() && wakenUp.compareAndSet(false, true)) {
    138. // Non-blocking select
    139. selector.selectNow();
    140. selectCnt = 1;
    141. break; // leave the current select()
    142. }
    143. // Reaching this point means the deadline is not imminent and no new task was detected
    144. // ------------------- 3 blocking select -----------
    145. // Blocking select. The code below distinguishes five reasons for the call returning:
    146. // 1) ready channels were selected
    147. // 2) the selector's wakeup() was called (or tasks are now pending)
    148. // 3) the current thread was interrupted
    149. // 4) the timeout elapsed
    150. // 5) the call returned prematurely with nothing selected; repeated occurrences are counted in selectCnt
    151. // and eventually handled by rebuilding the selector (section 4 below)
    152. int selectedKeys = selector.select(timeoutMillis);
    153. selectCnt ++;
    154. // If channels are ready, or a wake-up was requested, or tasks are pending, leave select() right away
    155. if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
    156. // - Selected something,
    157. // - waken up by user, or
    158. // - the task queue has a pending task.
    159. // - a scheduled task is ready for processing
    160. break;
    161. }
    162. // Reaching this point means reasons 1) and 2) have been ruled out for the return of selector.select()
    163. if (Thread.interrupted()) {
    164. // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
    165. // As this is most likely a bug in the handler of the user or it's client library we will
    166. // also log it.
    167. //
    168. // See https://github.com/netty/netty/issues/2426
    169. if (logger.isDebugEnabled()) {
    170. logger.debug("Selector.select() returned prematurely because " +
    171. "Thread.currentThread().interrupt() was called. Use " +
    172. "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
    173. }
    174. selectCnt = 1;
    175. break;
    176. }
    177. // Reaching this point means reasons 1), 2) and 3) have been ruled out,
    178. // so selector.select() can only have returned because of reason 4) or 5)
    179. // ------------------- 4 work around the premature-return selector problem -----------
    180. long time = System.nanoTime();
    181. // If this condition is true, at least timeoutMillis has elapsed since currentTimeNanos, i.e. the select timed out (reason 4)
    182. if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
    183. // timeoutMillis elapsed without anything selected.
    184. selectCnt = 1;
    185. // Otherwise the select returned early (reason 5); rebuild once auto-rebuild is enabled and the count reaches the threshold
    186. } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
    187. selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
    188. // The code exists in an extra method to ensure the method is not too big to inline as this
    189. // branch is not very likely to get hit very frequently.
    190. // Rebuild the selector
    191. selector = selectRebuildSelector(selectCnt);
    192. selectCnt = 1;
    193. break;
    194. }
    195. // Update currentTimeNanos to the current time, i.e. record the start time of the next loop iteration
    196. currentTimeNanos = time;
    197. } // end-for
    198. if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
    199. if (logger.isDebugEnabled()) {
    200. logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
    201. selectCnt - 1, selector);
    202. }
    203. }
    204. } catch (CancelledKeyException e) {
    205. if (logger.isDebugEnabled()) {
    206. logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
    207. selector, e);
    208. }
    209. // Harmless exception - log anyway
    210. }
    211. }
    212. private void processSelectedKeys() {
    213. // Dispatch on whether the optimized selected-key set (the selectedKeys field) is available, i.e. not null
    214. if (selectedKeys != null) {
    215. // Process the optimized selectedKeys
    216. processSelectedKeysOptimized();
    217. } else {
    218. // Process the plain selected-key collection
    219. // selector.selectedKeys() is passed to a Set<SelectionKey> parameter
    220. processSelectedKeysPlain(selector.selectedKeys());
    221. }
    222. }
    223. private void processSelectedKeysOptimized() {
    224. for (int i = 0; i < selectedKeys.size; ++i) {
    225. final SelectionKey k = selectedKeys.keys[i];
    226. // null out entry in the array to allow to have it GC'ed once the Channel close
    227. // See https://github.com/netty/netty/issues/2363
    228. // The entry is set to null for the reason stated above:
    229. // 1) so that it can be garbage-collected once the channel is closed
    230. // NOTE(review): the original annotation also listed "avoid processing the key twice" - not established by this code
    231. selectedKeys.keys[i] = null;
    232. // Fetch the key's attachment; an attachment can hold arbitrary data,
    233. // but this loop expects either an AbstractNioChannel or a NioTask (see the instanceof check below)
    234. final Object a = k.attachment();
    235. if (a instanceof AbstractNioChannel) {
    236. // Process the selected key for a channel attachment
    237. processSelectedKey(k, (AbstractNioChannel) a);
    238. } else {
    239. @SuppressWarnings("unchecked")
    240. NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
    241. processSelectedKey(k, task);
    242. }
    243. if (needsToSelectAgain) {
    244. // null out entries in the array to allow to have it GC'ed once the Channel close
    245. // See https://github.com/netty/netty/issues/2363
    246. selectedKeys.reset(i + 1);
    247. selectAgain();
    248. i = -1;
    249. }
    250. }
    251. }
    252. private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    253. final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    254. // Handle the case where the key is no longer valid
    255. if (!k.isValid()) {
    256. final EventLoop eventLoop;
    257. try {
    258. eventLoop = ch.eventLoop();
    259. } catch (Throwable ignored) {
    260. // If the channel implementation throws an exception because there is no event loop, we ignore this
    261. // because we are only trying to determine if ch is registered to this event loop and thus has authority
    262. // to close ch.
    263. return;
    264. }
    265. // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
    266. // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
    267. // still healthy and should not be closed.
    268. // See https://github.com/netty/netty/issues/5125
    269. if (eventLoop != this || eventLoop == null) {
    270. return;
    271. }
    272. // close the channel if the key is not valid anymore
    273. unsafe.close(unsafe.voidPromise());
    274. return;
    275. }
    276. try {
    277. // Read the ready-operation set of this selection key
    278. int readyOps = k.readyOps();
    279. // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
    280. // the NIO JDK channel implementation may throw a NotYetConnectedException.
    281. // Handle connect readiness (OP_CONNECT). NOTE(review): presumably only seen on channels that started an outbound connect - confirm.
    282. // NOTE(review): presumably the connect attempt did not complete immediately, so OP_CONNECT was registered - confirm.
    283. // Once the selector reports the key as connect-ready, the pending connect is completed here:
    284. // OP_CONNECT is removed from the interest set and unsafe.finishConnect() is called.
    285. if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    286. // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
    287. // See https://github.com/netty/netty/issues/924
    288. // Read the interest set currently registered on this selection key
    289. int ops = k.interestOps();
    290. // Clear the OP_CONNECT bit in ops
    291. ops &= ~SelectionKey.OP_CONNECT;
    292. // Write the updated ops back to the selection key so that OP_CONNECT is no longer of interest
    293. k.interestOps(ops);
    294. // Complete the connection
    295. unsafe.finishConnect();
    296. }
    297. // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
    298. // Handle write readiness.
    299. // NOTE(review): OP_WRITE presumably means the channel can accept more outbound bytes - see SelectionKey docs;
    300. // the original annotation tied it to writeAndFlush() having buffered data, which this code does not establish.
    301. // The ready I/O here is forceFlush(), which writes out whatever is still queued (see the comment below).
    302. if ((readyOps & SelectionKey.OP_WRITE) != 0) {
    303. // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
    304. ch.unsafe().forceFlush();
    305. }
    306. // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
    307. // to a spin loop
    308. // Handle read readiness and accept readiness.
    309. // NOTE(review): OP_READ presumably signals that inbound data is available for this channel - see SelectionKey docs.
    310. // NOTE(review): OP_ACCEPT presumably signals that an inbound connection is waiting to be accepted.
    311. // Both kinds of readiness are dispatched to the same call, unsafe.read(), so accept readiness is
    312. // treated here like a special case of read readiness; what read() does with the data or connection
    313. // (for example how it reaches channelRead()) is implemented elsewhere - not visible in this listing.
    314. // readyOps == 0 means this key reported no ready operation, yet read() is still invoked for it.
    315. // Reason given by the comment above:
    316. // 1) to work around a possible JDK bug that could otherwise lead to a spin loop
    317. // NOTE(review): the original annotation also claimed this lengthens I/O time so that the task time budget is not too short - not established by this code.
    318. if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    319. unsafe.read();
    320. }
    321. } catch (CancelledKeyException ignored) {
    322. unsafe.close(unsafe.voidPromise());
    323. }
    324. }
    325. // Process the plain (non-optimized) SelectedKeys set
    326. private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    327. // check if the set is empty and if so just return to not create garbage by
    328. // creating a new Iterator every time even if there is nothing to process.
    329. // See https://github.com/netty/netty/issues/597
    330. if (selectedKeys.isEmpty()) {
    331. return;
    332. }
    333. // Iterate over the set; each key is removed from it via the iterator before it is processed
    334. Iterator<SelectionKey> i = selectedKeys.iterator();
    335. for (;;) {
    336. final SelectionKey k = i.next();
    337. final Object a = k.attachment();
    338. i.remove();
    339. if (a instanceof AbstractNioChannel) {
    340. processSelectedKey(k, (AbstractNioChannel) a);
    341. } else {
    342. @SuppressWarnings("unchecked")
    343. NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
    344. processSelectedKey(k, task);
    345. }
    346. if (!i.hasNext()) {
    347. break;
    348. }
    349. if (needsToSelectAgain) {
    350. selectAgain();
    351. selectedKeys = selector.selectedKeys();
    352. // Create the iterator again to avoid ConcurrentModificationException
    353. if (selectedKeys.isEmpty()) {
    354. break;
    355. } else {
    356. i = selectedKeys.iterator();
    357. }
    358. }
    359. }
    360. }

    run()方法中调用的runAllTasks()方法才是任务真正被执行的地方。

    SingleThreadEventExecutor.java

    1. /**
    2. * Poll all tasks from the task queue and run them via {@link Runnable#run()} method. This method stops running
    3. * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
    4. */
    5. protected boolean runAllTasks(long timeoutNanos) {
    6. // NOTE(review): presumably moves the scheduled tasks that are already due into taskQueue - verify against fetchFromScheduledTaskQueue()
    7. fetchFromScheduledTaskQueue();
    8. // Take one task from the task queue
    9. Runnable task = pollTask();
    10. // If there is no task, run the after-tasks hook and report that nothing was run
    11. if (task == null) {
    12. afterRunningAllTasks();
    13. return false;
    14. }
    15. // A task was obtained
    16. // Compute the point in time at which this round of task execution should stop
    17. final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    18. // Number of tasks run so far in this call
    19. long runTasks = 0;
    20. long lastExecutionTime;
    21. for (;;) {
    22. // Run the task
    23. safeExecute(task);
    24. runTasks ++;
    25. // Check timeout every 64 tasks because nanoTime() is relatively expensive.
    26. // XXX: Hard-coded value - will make it configurable if it is really a problem.
    27. // Consequently the time budget for queued tasks is not enforced precisely: it is only checked once per 64 tasks
    28. if ((runTasks & 0x3F) == 0) { // runTasks % 64 == 0
    29. lastExecutionTime = ScheduledFutureTask.nanoTime();
    30. if (lastExecutionTime >= deadline) {
    31. break;
    32. }
    33. }
    34. task = pollTask();
    35. if (task == null) {
    36. lastExecutionTime = ScheduledFutureTask.nanoTime();
    37. break;
    38. }
    39. } // end-for
    40. // Run the after-tasks hook, then record when the last task finished
    41. afterRunningAllTasks();
    42. this.lastExecutionTime = lastExecutionTime;
    43. return true;
    44. }

    AbstractEventExecutor.java

    1. /**
    2. * Try to execute the given {@link Runnable} and just log if it throws a {@link Throwable}.
    3. */
    4. protected static void safeExecute(Runnable task) {
    5. try {
    6. // This is where the task is actually executed; anything it throws is caught below and only logged
    7. task.run();
    8. } catch (Throwable t) {
    9. logger.warn("A task raised an exception. Task: {}", task, t);
    10. }
    11. }

    执行收尾队列中任务的方法afterRunningAllTasks()

    SingleThreadEventLoop.java

    1. @Override
    2. protected void afterRunningAllTasks() {
    3. runAllTasksFrom(tailTasks); // run every task currently queued in tailTasks
    4. }

    SingleThreadEventExecutor.java

    1. /**
    2. * Runs all tasks from the passed {@code taskQueue}.
    3. *
    4. * @param taskQueue To poll and execute all tasks.
    5. *
    6. * @return {@code true} if at least one task was executed.
    7. */
    8. protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
    9. // Take the first task from the queue; if there is none, nothing was executed
    10. Runnable task = pollTaskFrom(taskQueue);
    11. if (task == null) {
    12. return false;
    13. }
    14. for (;;) {
    15. // Run the task, then keep polling until the queue yields no more tasks
    16. safeExecute(task);
    17. task = pollTaskFrom(taskQueue);
    18. if (task == null) {
    19. return true;
    20. }
    21. }
    22. }
    23. protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
    24. // Poll until something other than WAKEUP_TASK comes out (possibly null when the queue is empty); WAKEUP_TASK is presumably a no-op marker task - verify
    25. for (;;) {
    26. Runnable task = taskQueue.poll();
    27. if (task == WAKEUP_TASK) {
    28. continue;
    29. }
    30. return task;
    31. }
    32. }