前言
在传统的阻塞模型当中,一个线程当前只能处理一个请求,如果要支持并发的请求,就需要对每一个请求都建立一个线程,或者是建立一个线程池,将到来的请求交予线程池处理.
那么在Netty中,请求是从那部分开始被接收的,是交予自己处理呢? 还是交予线程去处理呢.
获取到来的请求
JDK NIO使用的方式.
final int select = selector.select();final Set<SelectionKey> selectionKeys = selector.selectedKeys();final Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()){//biz}
尽管 Netty 是在JDK的基础上进行了一次封装处理,但是本质上也离不开JDK本身的支持. 所以它必然也是需要获取哪些Key是就绪了的.
那么是在哪里获取的呢,在服务启动里边,我们提供了一个 boss 和 work 线程组,而上述获取key就绪的行为就发生在boss线程组当中.
获取就绪的 Channel
在服务启动当中曾经提到过2个方法,分别是
- register
- run
register 已经说明过了,而这里要描述的就是 run 方法.
方法比较长,一点一点往下走.
NioEventLoop.java 文件
@Overrideprotected void run() {for (;;) {try {//说明(1)try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.BUSY_WAIT:case SelectStrategy.SELECT:select(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}default:}} catch (IOException e) {rebuildSelector0();handleLoopException(e);continue;}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;//说明(2)if (ioRatio == 100) {try {processSelectedKeys();} finally {runAllTasks();}} else {final long ioStartTime = System.nanoTime();try {processSelectedKeys();} finally {final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {handleLoopException(t);}try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}}}
说明(1)
这部分可以看到主要是 select 操作,其对应了JDK NIO中的 select 操作. 主要用于获取准备就绪可以进行IO操作的 Channel
如果 haskTask() 返回的是 true ,则直接执行一次 selectNow() 操作,该操作不阻塞当前线程.
如果返回的是 false ,就会进入 SelectStrategy.SELECT ♻️
首先将 wakenUp 唤醒标志位设置为 false 并且获取当前的唤醒状态,并且执行一次 select(wakenUp) 操作.
如果 wakenUp 标志位又被设置成了true,将selector唤醒.
这里的核心就在于 select(wakenUp) 和 wakenUp 标志位的设置
- wakenUp何时设置为false
- 设置为false的情况比较简单,每次for循环开始之前,就会使用ACS将该状态位设置为
false.
- 设置为false的情况比较简单,每次for循环开始之前,就会使用ACS将该状态位设置为
- 又是何时设置为true,设置成true的情况分为2种,分别是
- 其他线程添加任务时触发,唤醒selector,主要是为了任务得到及时处理,从上文的描述我们可以知道,如果存在task的时候,selector是不进行阻塞
select操作的,而是selectNew. - 在执行
select(wakenUp)的过程中,如果有任务被提交了. 这个时候需要触发为true. 为什么呢? 因为在第一种提交任务的情况下,能够唤醒的前提是: wakenUp标志位是false,如果是true,这个任务就不能够唤醒selector,那么就可能存在idle超时的问题.
- 其他线程添加任务时触发,唤醒selector,主要是为了任务得到及时处理,从上文的描述我们可以知道,如果存在task的时候,selector是不进行阻塞
select(wakenUp) 相对就复杂的多. Netty针对这里做了优化,但是核心还是在于获取就绪的Channel
int selectedKeys = selector.select(timeoutMillis);
细节就不关注了,有兴趣可以瞅瞅.
说明(2)
在获取到就绪的Keys之后,就可以对返回的key进行遍历处理了。
ioRatio: 执行IO任务的比例,如果是100,则每次都要把所有提交的task都要执行完,默认是50. 大部分情况也不会进行修改. 所以会走 else 逻辑。
processSelectedKeys
private void processSelectedKeys() {if (selectedKeys != null) {processSelectedKeysOptimized();} else {processSelectedKeysPlain(selector.selectedKeys());}}
初始化时已经设置过了selectedKeys,所以selectedKeys不为null.
processSelectedKeysOptimized
private void processSelectedKeysOptimized() {for (int i = 0; i < selectedKeys.size; ++i) {final SelectionKey k = selectedKeys.keys[i];selectedKeys.keys[i] = null;//方便GCfinal Object a = k.attachment();if (a instanceof AbstractNioChannel) {//处理被选择的keyprocessSelectedKey(k, (AbstractNioChannel) a);} else {@SuppressWarnings("unchecked")NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task);}if (needsToSelectAgain) {selectedKeys.reset(i + 1);selectAgain();i = -1;}}}
processSelectedKey
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();if (!k.isValid()) {//... inValid 处理,这里不关注return;}try {int readyOps = k.readyOps();//链接操作if (k.isConnectable()) {int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}//写操作if ((readyOps & SelectionKey.OP_WRITE) != 0) {// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to writech.unsafe().forceFlush();}//读操作或者是链接操作.if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}}
到这一步为止. 就可以筛选出哪些channel是可读,哪些是可写,那是是accept处理的。
这里我们就关注 unsafe.read() 即可.
