传统 IO 模型
对于传统的 I/O 通信方式来说,客户端连接到服务端,服务端接收客户端请求并响应的流程为:
读取(read) -> 解码(decode) -> 应用处理(compute) -> 编码(encode) -> 发送结果(send)
服务端会为每一个客户端连接新建一个线程,建立通道,从而处理后续的请求,也就是 BIO 的方式。
这种方式在客户端数量不断增加的情况下,对于连接和请求的响应会急剧下降,并且占用太多线程浪费资源,线程数量也不是没有上限的,会遇到各种瓶颈。虽然可以使用线程池进行优化,但是依然有诸多问题,比如在线程池中所有线程都在处理请求时,无法响应其他的客户端连接,每个客户端依旧需要专门的服务端线程来服务,即使此时客户端无请求,也处于阻塞状态无法释放。基于此,提出了基于事件驱动的 Reactor 模型。
Reactor 模型
Reactor 模式是基于事件驱动开发的,服务端程序处理传入多路请求,并将它们同步分派给请求对应的处理线程,因此 Reactor 模式也叫 Dispatcher 模式,即 I/O 多路复用统一监听事件,收到事件后分发(Dispatch 给某进程),这是编写高性能网络服务器的必备技术之一。
因为这种模式是符合大规模生产的需求的。我们生活中遍地都是类似的模式。比如去咖啡店喝咖啡,你点了一杯咖啡在一旁喝着,服务员也不会管你,等你有续杯需求的时候,再去和服务员提(触发事件),服务员满足了你的需求,你就继续可以喝着咖啡玩手机。整个柜台的服务方式就是一个事件驱动的方式。
Reactor 模式以 NIO 为底层支持,核心组成部分包括 Reactor 和 Handler:
- Reactor 在一个单独的线程中运行,负责监听事件并分发给适当的处理程序来对 I/O 事件做出反应。它就像公司的电话接线员,接听来自客户的电话并将线路转移到适当的联系人。
- Handler 负责处理 I/O 事件要完成的实际事件,Reactor 通过调度适当的处理程序来响应 I/O 事件,处理程序执行非阻塞操作。类似于客户想要与之交谈的公司中的实际员工。
根据 Reactor 的数量和 Handler 线程数量,可以将 Reactor 分为三种模型:
- 单线程模型(单 Reactor 单线程)
- 多线程模型(单 Reactor 多线程)
- 主从多线程模型(多 Reactor 多线程)
1. 单线程
单线程模型中只有一个 Reactor 线程,Reactor 内部通过 Selector 监控连接事件,收到事件后通过 dispatch 进行分发,如果是连接建立的事件,则由 Acceptor 进行处理,Acceptor 通过 accept 接受连接,并创建一个 Handler 来处理该连接后续的各种事件,如果是读写事件,则直接调用连接对应的 Handler 来处理。
因此,Reactor 线程不断监听事件,当监听到连接事件就分发给 Acceptor 进行处理,如果监听到读写事件就直接调用连接对应的 Handler 来处理,Handler 负责完成 decode -> compute -> encode 的业务流程。
这种模型好处是简单,坏处却很明显。和 I/O 事件处理相比,应用程序的业务逻辑处理一般是比较耗时的,比如 XML 文件的解析、数据库记录的查找、文件资料的读取和传输、计算型工作的处理等,当某个 Handler 一直阻塞时,会导致其他客户端的 handler 和 accpetor 都得不到执行,无法做到高性能,只适用于业务处理非常快速的场景,例如 redis 读写操作。
2. 多线程
主线程中,Reactor 对象通过 Selector 监控连接事件,收到事件后通过 dispatch 进行分发,如果是连接建立事件,还是交由 Acceptor 处理,Acceptor 通过 accept 接收连接,并创建一个 Handler 来处理后续事件,而 Handler 只负责响应事件,不进行业务操作,也就是只进行 read 读取数据和 write 写出数据,真正的业务处理交给一个 worker 线程池进行处理。
在多线程模型中,Reactor 主线程只负责处理 I/O 相关的工作,decode、compute、encode 这种耗时的工作放置到另外的 worker 线程池中,线程池分配一个空闲的线程完成真正的业务处理,当工作线程执行完成后再将响应结果交给 Reactor 主线程的 Handler 进行处理,Handler 通过套接字将结果 send 给客户端。这样工作线程就和 Reactor 主线程解耦,是一个比较明智的选择。
在这种线程模型下,单 Reactor 线程承担了所有事件的监听和响应,当我们的服务端遇到大量的客户端同时进行连接,或者在请求连接时执行一些耗时操作,比如身份认证,权限检查等,这种瞬时的高并发就容易成为性能瓶颈,在实战中的表现可能就是客户端连接成功率偏低。
再者,新的硬件技术不断发展,多核多路 CPU 已经得到极大的应用,单 reactor 反应堆模式看着大把的 CPU 资源却不用,有点可惜。下面我们就将 acceptor 上的连接建立事件和已建立连接的 I/O 事件分离,形成所谓的主 - 从 reactor 模式。
3. 主从多线程
主 - 从这个模式的核心思想是,主反应堆(mainReactor)线程只负责分发 Acceptor 连接建立事件,已连接套接字上的 I/O 事件交给 subReactor 负责分发。其中 subReactor 的数量可以根据 CPU 的核数灵活设置。
在主 - 从模型中存在多个 Reactor 线程,每个 Reactor 都有自己的 Selector 选择器,线程和 dispatch。其中主反应堆(mainReactor)线程通过自己的 Selector 监听连接建立事件,收到事件后交由 Accpetor 处理,Acceptor 通过 accept 接收连接,并将新建立的连接分配给某个 subReactor 线程。
然后,subReactor 线程将 mainReactor 分配的连接加入连接队列中并通过自己的 Selector 进行监听,当监听到这个连接上的读写事件时,会创建一个 Handler 用于处理后续事件,这个 Handler 会完成 read -> 业务处理 -> send 的完整业务流程。
在这个 Handler 处理过程中,为了解决了业务逻辑和 I/O 分发之间的耦合问题,这里同多线程模型一样,也采用了 worker 线程池,专门用来处理复杂的业务逻辑(decode -> compute -> encode)。
参考链接:http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
下面示例展示了 Java NIO 对 Reactor 模型的具体实现:
package org.xl.nio.demo;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Java NIO for Reactor
*/
public class NioReactor {
/**
* Main Reactor
*/
private final ReactorThread[] mainReactorThreads = new ReactorThread[1];
/**
* Sub Reactor
*/
private final ReactorThread[] subReactorThreads = new ReactorThread[8];
/**
* Worker ThreadPool
*/
private static final ExecutorService workPool = Executors.newCachedThreadPool();
private abstract static class ReactorThread extends Thread {
private final Selector selector;
private final LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
private volatile boolean running = false;
private ReactorThread() throws IOException {
selector = Selector.open();
}
/**
* Selector监听到有事件后,调用这个方法
*/
public abstract void handler(SelectableChannel channel) throws Exception;
@Override
public void run() {
while (running) {
try {
// 从队列中获取要执行的任务
Runnable task;
while ((task = taskQueue.poll()) != null) {
task.run();
}
selector.select(1000);
Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> iter = selected.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
// 关注 Read 和 Accept两个事件
if (key.isReadable() || key.isAcceptable()) {
try {
SelectableChannel channel = (SelectableChannel) key.attachment();
channel.configureBlocking(false);
handler(channel);
} catch (Exception ex) {
// 如果有异常,就取消这个KEY的订阅
key.cancel();
}
}
}
selector.selectNow();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private SelectionKey register(SelectableChannel channel) throws Exception {
// 为什么register要以任务提交的形式,让reactor线程去处理?
// 因为线程在执行channel注册到selector的过程中,会和调用selector.select()方法的线程争用同一把锁
// 而select()方法是在eventLoop中通过while循环调用的,争抢的可能性很高,为了让register能更快的执行,就放到同一个线程来处理
FutureTask<SelectionKey> futureTask = new FutureTask<>(() -> channel.register(selector, 0, channel));
taskQueue.add(futureTask);
return futureTask.get();
}
private void doStart() {
if (!running) {
running = true;
start();
}
}
}
/**
* 初始化Main Reactor线程组, 只做请求分发,不做具体的数据读取
*/
private void initMainReactorGroup() throws IOException {
for (int i = 0; i < mainReactorThreads.length; i++) {
mainReactorThreads[i] = new ReactorThread() {
private final AtomicInteger incr = new AtomicInteger(0);
@Override
public void handler(SelectableChannel channel) throws Exception {
ServerSocketChannel ch = (ServerSocketChannel) channel;
SocketChannel socketChannel = ch.accept();
socketChannel.configureBlocking(false);
// 收到连接建立的通知之后,分发给I/O线程继续去读取数据
int index = incr.getAndIncrement() % subReactorThreads.length;
ReactorThread workEventLoop = subReactorThreads[index];
workEventLoop.doStart();
SelectionKey selectionKey = workEventLoop.register(socketChannel);
selectionKey.interestOps(SelectionKey.OP_READ);
System.out.println(Thread.currentThread().getName() + "收到新连接 : " + socketChannel.getRemoteAddress());
}
};
}
}
/**
* 初始化Sub Reactor线程组, 负责处理客户端连接以后socketChannel的IO读写
*/
private void initSubReactorGroup() throws IOException {
for (int i = 0; i < subReactorThreads.length; i++) {
subReactorThreads[i] = new ReactorThread() {
@Override
public void handler(SelectableChannel channel) throws IOException {
SocketChannel ch = (SocketChannel) channel;
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
ch.read(requestBuffer);
requestBuffer.flip();
byte[] content = new byte[requestBuffer.limit()];
requestBuffer.get(content);
System.out.println(new String(content));
System.out.println(Thread.currentThread().getName() + "收到数据, 来自:" + ch.getRemoteAddress());
workPool.submit(() -> {
// 拿到数据后,可以执行一些业务操作 eg:数据库、RPC
});
ByteBuffer buffer = ByteBuffer.wrap("OK".getBytes());
while (buffer.hasRemaining()) {
ch.write(buffer);
}
}
};
}
}
/**
* 初始化channel,并且绑定一个eventLoop线程
*/
private void startMainReactor() throws Exception {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
int index = new Random().nextInt(mainReactorThreads.length);
mainReactorThreads[index].doStart();
SelectionKey selectionKey = mainReactorThreads[index].register(serverSocketChannel);
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
serverSocketChannel.bind(new InetSocketAddress(8080));
System.out.println("启动完成,端口8080");
}
public static void main(String[] args) throws Exception {
NioReactor nioReactor = new NioReactor();
// 初始化
nioReactor.initMainReactorGroup();
nioReactor.initSubReactorGroup();
// 启动MainReactor
nioReactor.startMainReactor();
}
}