本节讲解 apache-tomcat-7.0.32-src
源码中 ConcurrentLinkedQueue 的使用。首先介绍 Tomcat 的容器结构以及 NioEndPoint 的作用,以便后面能够更加平滑地切入话题,如图 11-4 所示是 Tomcat 的容器结构。
图 11-4
其中,Connector 是一个桥梁,它把 Server 和 Engine 连接了起来,Connector 的作用是接受客户端的请求,然后把请求委托给 Engine 容器处理。在 Connector 的内部具体使用 Endpoint 进行处理,根据处理方式的不同 Endpoint 可分为 NioEndpoint、JIoEndpoint、AprEndpoint。本节介绍 NioEndpoint 中的并发组件队列的使用。为了让读者更好地理解,有必要先说下 NioEndpoint 的作用。首先来看 NioEndpoint 中的三大组件的关系图(见图 11-5)。
图 11-5
● Acceptor 是套接字接受线程(Socket acceptor thread),用来接受用户的请求,并把请求封装为事件任务放入 Poller 的队列,一个 Connector 里面只有一个 Acceptor。
● Poller 是套接字处理线程(Socket poller thread),每个 Poller 内部都有一个独有的队列,Poller 线程则从自己的队列里面获取具体的事件任务,然后将其交给 Worker 进行处理。Poller 线程的个数与处理器的核数有关,代码如下。
protected int pollerThreadCount = Math.min(2, Runtime.getRuntime().
availableProcessors());
这里最多有 2 个 Poller 线程。
● Worker 是实际处理请求的线程,Worker 只是组件名字,真正做事情的是 SocketProcessor,它是 Poller 线程从自己的队列获取任务后的真正任务执行者。
可见,Tomcat 使用队列把接受请求与处理请求操作进行解耦,实现异步处理。其实 Tomcat 中 NioEndPoint 中的每个 Poller 里面都维护一个 ConcurrentLinkedQueue,用来缓存请求任务,其本身也是一个多生产者-单消费者模型。
生产者——Acceptor 线程
Acceptor 线程的作用是接受客户端发来的连接请求并将其放入 Poller 的事件队列。首先看下 Acceptor 处理请求的简明时序图(见图 11-6)。
图 11-6
下面分析 Acceptor 的源码,看其如何把接受的套接字连接放入队列。
protected class Acceptor extends AbstractEndpoint.Acceptor {
@Override
public void run() {
int errorDelay = 0;
//(1) 一直循环直到接收到 shutdown 命令
while (running) {
...
if (! running) {
break;
}
state = AcceptorState.RUNNING;
try {
//(2)如果达到 max connections 个请求则等待
countUpOrAwaitConnection();
SocketChannel socket = null;
try {
//(3)从 TCP 缓存获取一个完成三次握手的套接字,没有则阻塞
socket = serverSock.accept();
} catch (IOException ioe) {
...
}
errorDelay = 0;
if (running && ! paused) {
//(4)设置套接字参数并封装套接字为事件任务,然后放入 Poller 的队列
if (! setSocketOptions(socket)) {
countDownConnection();
closeSocket(socket);
}
} else {
countDownConnection();
closeSocket(socket);
}
....
} catch (SocketTimeoutException sx) {
....
}
state = AcceptorState.ENDED;
}
}
代码(1)中的无限循环用来一直等待客户端的连接,循环退出条件是调用了 shutdown 命令。
代码(2)用来控制客户端的请求连接数量,如果连接数量达到设置的阈值,则当前请求会被挂起。
代码(3)从 TCP 缓存获取一个完成三次握手的套接字,如果当前没有,则当前线程会被阻塞挂起。
当代码(3)获取到一个连接套接字后,代码(4)会调用 setSocketOptions 设置该套接字。
protected boolean setSocketOptions(SocketChannel socket) {
// 处理链接
try {
...
//封装链接套接字为 channel 并注册到 Poller 队列
getPoller0().register(channel);
} catch (Throwable t) {
...
return false;
}
return true;
}
代码(5)将连接套接字封装为一个 channel 对象,并将其注册到 poller 对象的队列。
//具体注册到事件队列
public void register(final NioChannel socket) {
...
PollerEvent r = eventCache.poll();
ka.interestOps(SelectionKey.OP_READ); //this is what OP_REGISTER turns into.
if ( r==null) r = new PollerEvent(socket, ka, OP_REGISTER);
else r.reset(socket, ka, OP_REGISTER);
addEvent(r);
}
public void addEvent(Runnable event) {
events.offer(event);
...
}
其中,events 的定义如下:
protected ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<Runnable>();
由此可见,events 是一个无界队列 ConcurrentLinkedQueue,根据前文讲的,使用队列作为同步转异步的方式要注意设置队列大小,否则可能造成 OOM。当然 Tomcat 肯定不会忽略这个问题,从代码(2)可以看出,Tomcat 让用户配置了一个最大连接数,超过这个数则会等待。
消费者——Poller 线程
Poller 线程的作用是从事件队列里面获取事件并进行处理。首先我们从时序图来全局了解下 Poller 线程的处理逻辑(见图 11-7)。
图 11-7
同理,我们看一下 Poller 线程的 run 方法代码逻辑。
public void run() {
while (true) {
try {
...
if (close) {
...
} else {
//(6)从事件队列获取事件
hasEvents = events();
}
try {
...
} catch ( NullPointerException x ) {...
}
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
//(7)遍历所有注册的 channel 并对感兴趣的事件进行处理
while (iterator ! = null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
KeyAttachment attachment = (KeyAttachment)sk.attachment();
if (attachment == null) {
iterator.remove();
} else {
attachment.access();
iterator.remove();
//(8)具体调用 SocketProcessor 进行处理
processKey(sk, attachment);
}
}//while
...
} catch (OutOfMemoryError oom) {
...
}
}//while
...
}
其中,代码(6)从 poller 的事件队列获取一个事件,events()的代码如下。
public boolean events() {
boolean result = false;
//从队列获取任务并执行
Runnable r = null;
while ( (r = events.poll()) ! = null ) {
result = true;
try {
r.run();
...
} catch ( Throwable x ) {
...
}
}
return result;
}
这里是使用循环来实现的,目的是为了避免虚假唤醒。
其中代码(7)和代码(8)则遍历所有注册的 channel,并对感兴趣的事件进行处理。
public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
try {
...
SocketProcessor sc = processorCache.poll();
if ( sc == null ) sc = new SocketProcessor(socket, status);
else sc.reset(socket, status);
if ( dispatch && getExecutor()! =null ) getExecutor().execute(sc);
else sc.run();
} catch (RejectedExecutionException rx) {
...
} catch (Throwable t) {
...
return false;
}
return true;
}
小结
本节通过分析 Tomcat 中 NioEndPoint 的实现源码介绍了并发组件 ConcurrentLinkedQueue 的使用。NioEndPoint 的思想也是使用队列将同步转为异步,并且由于 ConcurrentLinkedQueue 是无界队列,所以需要让用户提供一个设置队列大小的接口以防止队列元素过多导致 OOM。