服务端启动完成后,服务就可以接受外部流量了.

前言

在传统的阻塞模型当中,一个线程当前只能处理一个请求,如果要支持并发的请求,就需要对每一个请求都建立一个线程,或者是建立一个线程池,将到来的请求交予线程池处理.
那么在Netty中,请求是从那部分开始被接收的,是交予自己处理呢? 还是交予线程去处理呢.

获取到来的请求

JDK NIO使用的方式.

  1. final int select = selector.select();
  2. final Set<SelectionKey> selectionKeys = selector.selectedKeys();
  3. final Iterator<SelectionKey> iterator = selectionKeys.iterator();
  4. while (iterator.hasNext()){
  5. //biz
  6. }

尽管 Netty 是在JDK的基础上进行了一次封装处理,但是本质上也离不开JDK本身的支持. 所以它必然也是需要获取哪些Key是就绪了的.
那么是在哪里获取的呢,在服务启动里边,我们提供了一个 bosswork 线程组,而上述获取key就绪的行为就发生在boss线程组当中.

获取就绪的 Channel

在服务启动当中曾经提到过2个方法,分别是

  • register
  • run

register 已经说明过了,而这里要描述的就是 run 方法.
方法比较长,一点一点往下走.

NioEventLoop.java 文件

  1. @Override
  2. protected void run() {
  3. for (;;) {
  4. try {
  5. //说明(1)
  6. try {
  7. switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
  8. case SelectStrategy.CONTINUE:
  9. continue;
  10. case SelectStrategy.BUSY_WAIT:
  11. case SelectStrategy.SELECT:
  12. select(wakenUp.getAndSet(false));
  13. if (wakenUp.get()) {
  14. selector.wakeup();
  15. }
  16. default:
  17. }
  18. } catch (IOException e) {
  19. rebuildSelector0();
  20. handleLoopException(e);
  21. continue;
  22. }
  23. cancelledKeys = 0;
  24. needsToSelectAgain = false;
  25. final int ioRatio = this.ioRatio;
  26. //说明(2)
  27. if (ioRatio == 100) {
  28. try {
  29. processSelectedKeys();
  30. } finally {
  31. runAllTasks();
  32. }
  33. } else {
  34. final long ioStartTime = System.nanoTime();
  35. try {
  36. processSelectedKeys();
  37. } finally {
  38. final long ioTime = System.nanoTime() - ioStartTime;
  39. runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
  40. }
  41. }
  42. } catch (Throwable t) {
  43. handleLoopException(t);
  44. }
  45. try {
  46. if (isShuttingDown()) {
  47. closeAll();
  48. if (confirmShutdown()) {
  49. return;
  50. }
  51. }
  52. } catch (Throwable t) {
  53. handleLoopException(t);
  54. }
  55. }
  56. }

代码虽然又一大截,但是主要氛分为2块来看.

说明(1)

这部分可以看到主要是 select 操作,其对应了JDK NIO中的 select 操作. 主要用于获取准备就绪可以进行IO操作的 Channel
如果 haskTask() 返回的是 true ,则直接执行一次 selectNow() 操作,该操作不阻塞当前线程.
如果返回的是 false ,就会进入 SelectStrategy.SELECT ♻️
首先将 wakenUp 唤醒标志位设置为 false 并且获取当前的唤醒状态,并且执行一次 select(wakenUp) 操作.
如果 wakenUp 标志位又被设置成了true,将selector唤醒.

这里的核心就在于 select(wakenUp)wakenUp 标志位的设置

  • wakenUp何时设置为false
    • 设置为false的情况比较简单,每次for循环开始之前,就会使用ACS将该状态位设置为 false.
  • 又是何时设置为true,设置成true的情况分为2种,分别是
    • 其他线程添加任务时触发,唤醒selector,主要是为了任务得到及时处理,从上文的描述我们可以知道,如果存在task的时候,selector是不进行阻塞 select 操作的,而是 selectNew .
    • 在执行 select(wakenUp) 的过程中,如果有任务被提交了. 这个时候需要触发为true. 为什么呢? 因为在第一种提交任务的情况下,能够唤醒的前提是: wakenUp标志位是 false ,如果是true,这个任务就不能够唤醒selector,那么就可能存在 idle 超时的问题.

select(wakenUp) 相对就复杂的多. Netty针对这里做了优化,但是核心还是在于获取就绪的Channel

  1. int selectedKeys = selector.select(timeoutMillis);

细节就不关注了,有兴趣可以瞅瞅.

说明(2)

在获取到就绪的Keys之后,就可以对返回的key进行遍历处理了。
ioRatio: 执行IO任务的比例,如果是100,则每次都要把所有提交的task都要执行完,默认是50. 大部分情况也不会进行修改. 所以会走 else 逻辑。

processSelectedKeys
  1. private void processSelectedKeys() {
  2. if (selectedKeys != null) {
  3. processSelectedKeysOptimized();
  4. } else {
  5. processSelectedKeysPlain(selector.selectedKeys());
  6. }
  7. }

初始化时已经设置过了selectedKeys,所以selectedKeys不为null.

processSelectedKeysOptimized
  1. private void processSelectedKeysOptimized() {
  2. for (int i = 0; i < selectedKeys.size; ++i) {
  3. final SelectionKey k = selectedKeys.keys[i];
  4. selectedKeys.keys[i] = null;//方便GC
  5. final Object a = k.attachment();
  6. if (a instanceof AbstractNioChannel) {
  7. //处理被选择的key
  8. processSelectedKey(k, (AbstractNioChannel) a);
  9. } else {
  10. @SuppressWarnings("unchecked")
  11. NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
  12. processSelectedKey(k, task);
  13. }
  14. if (needsToSelectAgain) {
  15. selectedKeys.reset(i + 1);
  16. selectAgain();
  17. i = -1;
  18. }
  19. }
  20. }

processSelectedKey
  1. private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
  2. final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
  3. if (!k.isValid()) {
  4. //... inValid 处理,这里不关注
  5. return;
  6. }
  7. try {
  8. int readyOps = k.readyOps();
  9. //链接操作
  10. if (k.isConnectable()) {
  11. int ops = k.interestOps();
  12. ops &= ~SelectionKey.OP_CONNECT;
  13. k.interestOps(ops);
  14. unsafe.finishConnect();
  15. }
  16. //写操作
  17. if ((readyOps & SelectionKey.OP_WRITE) != 0) {
  18. // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
  19. ch.unsafe().forceFlush();
  20. }
  21. //读操作或者是链接操作.
  22. if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
  23. unsafe.read();
  24. }
  25. } catch (CancelledKeyException ignored) {
  26. unsafe.close(unsafe.voidPromise());
  27. }
  28. }

到这一步为止. 就可以筛选出哪些channel是可读,哪些是可写,那是是accept处理的。

这里我们就关注 unsafe.read() 即可.