前言
在传统的阻塞模型当中,一个线程当前只能处理一个请求,如果要支持并发的请求,就需要对每一个请求都建立一个线程,或者是建立一个线程池,将到来的请求交予线程池处理.
那么在Netty中,请求是从那部分开始被接收的,是交予自己处理呢? 还是交予线程去处理呢.
获取到来的请求
JDK NIO使用的方式.
final int select = selector.select();
final Set<SelectionKey> selectionKeys = selector.selectedKeys();
final Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
//biz
}
尽管 Netty
是在JDK的基础上进行了一次封装处理,但是本质上也离不开JDK本身的支持. 所以它必然也是需要获取哪些Key是就绪了的.
那么是在哪里获取的呢,在服务启动里边,我们提供了一个 boss
和 work
线程组,而上述获取key就绪的行为就发生在boss线程组当中.
获取就绪的 Channel
在服务启动当中曾经提到过2个方法,分别是
- register
- run
register
已经说明过了,而这里要描述的就是 run
方法.
方法比较长,一点一点往下走.
NioEventLoop.java 文件
@Override
protected void run() {
for (;;) {
try {
//说明(1)
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
} catch (IOException e) {
rebuildSelector0();
handleLoopException(e);
continue;
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
//说明(2)
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
说明(1)
这部分可以看到主要是 select
操作,其对应了JDK NIO中的 select
操作. 主要用于获取准备就绪可以进行IO操作的 Channel
如果 haskTask()
返回的是 true
,则直接执行一次 selectNow()
操作,该操作不阻塞当前线程.
如果返回的是 false
,就会进入 SelectStrategy.SELECT
♻️
首先将 wakenUp
唤醒标志位设置为 false
并且获取当前的唤醒状态,并且执行一次 select(wakenUp)
操作.
如果 wakenUp
标志位又被设置成了true,将selector唤醒.
这里的核心就在于 select(wakenUp)
和 wakenUp
标志位的设置
- wakenUp何时设置为false
- 设置为false的情况比较简单,每次for循环开始之前,就会使用ACS将该状态位设置为
false
.
- 设置为false的情况比较简单,每次for循环开始之前,就会使用ACS将该状态位设置为
- 又是何时设置为true,设置成true的情况分为2种,分别是
- 其他线程添加任务时触发,唤醒selector,主要是为了任务得到及时处理,从上文的描述我们可以知道,如果存在task的时候,selector是不进行阻塞
select
操作的,而是selectNew
. - 在执行
select(wakenUp)
的过程中,如果有任务被提交了. 这个时候需要触发为true. 为什么呢? 因为在第一种提交任务的情况下,能够唤醒的前提是: wakenUp标志位是false
,如果是true,这个任务就不能够唤醒selector,那么就可能存在idle
超时的问题.
- 其他线程添加任务时触发,唤醒selector,主要是为了任务得到及时处理,从上文的描述我们可以知道,如果存在task的时候,selector是不进行阻塞
select(wakenUp)
相对就复杂的多. Netty针对这里做了优化,但是核心还是在于获取就绪的Channel
int selectedKeys = selector.select(timeoutMillis);
细节就不关注了,有兴趣可以瞅瞅.
说明(2)
在获取到就绪的Keys之后,就可以对返回的key进行遍历处理了。
ioRatio: 执行IO任务的比例,如果是100,则每次都要把所有提交的task都要执行完,默认是50. 大部分情况也不会进行修改. 所以会走 else
逻辑。
processSelectedKeys
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
初始化时已经设置过了selectedKeys,所以selectedKeys不为null.
processSelectedKeysOptimized
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;//方便GC
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
//处理被选择的key
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
processSelectedKey
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
//... inValid 处理,这里不关注
return;
}
try {
int readyOps = k.readyOps();
//链接操作
if (k.isConnectable()) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
//写操作
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
//读操作或者是链接操作.
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
到这一步为止. 就可以筛选出哪些channel是可读,哪些是可写,那是是accept处理的。
这里我们就关注 unsafe.read()
即可.