Reactor模式
Reactor模式详解
Reactor 反应堆设计模式
IO线程模型
1. 传统阻塞式IO
**
传统阻塞式IO的处理流程图如下所示:
对于每一个可能的用户请求,系统都会为其分配一个单独的线程进行处理。对于某一个线程来说,如果前面的请求没有处理结束,那么后续的请求就只能阻塞等待。当并发请求量很大时,系统需要创建大量的线程,这会消耗大量的资源。线程创建后,如果当前线程没有数据暂时可读,那么线程就会一直阻塞在read操作,造成线程资源的浪费。
如果使用Java网络编程实现,代码如下所示:
public class BlockingIO{
public static void main(String[] args) throws Exception{
// 创建线程池
ExecutorService service = Executors.newCachedThreadPool();
// 创建ServerSocket
ServerSocket serverSocket = new ServerSocket(8888);
// 监听等待
while(true){
final Socket socket = serverSocket.accept();
// 如果接收到一个Socket连接
service.execute(()->{
handler(socket);
});
}
}
public static void handler(Socket socket){
try{
byte[] bytes = new byte[1024];
// 获取Socket的输入流
InputStream inputStream = socket.getInputStream();
while(true){
// 尝试从Socket连接中读取数据
int readBytes = inputStream.read(bytes);
if(readBytes != -1){
// 输入客户端传输的信息
System.out.println(new String(bytes, 0, read));
} else{
break;
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
// 关闭Socket连接
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
2. Reactor模型
Reactor模型针对传统阻塞I/O服务模型的2个缺点,解决方案如下:
- 基于 I/O 复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理
- 基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务
根据Reactor的数量和处理资源池线程的数量不同,它有如下的三种实现:
- 单Reactor单线程模型
- 单Reactor多线程模型
- 主从Reactor多线程模型
Reactor模式由Reactor反应线程、Handlers处理器两大部分组成:
- Reactor:它在一个单独的线程中运行,负责监听和分发事件,将IO事件分发给具体对应的Handler进行处理
- Handlers:IO事件具体的处理程序
优点
- 响应快:虽然同一反应器线程本身是同步的,但是不会被单个连接的同步IO阻塞
- 编码简单:极大程度的避免了复杂的多线程同步,避免了多线程的各个进程之间切换的开销
- 可扩展:可方便的通过增加反应器线程的个数来充分的利用CPU资源
缺点**
- 不易调试
- 需要底层操作系统IO多路复用的支持
- 同一个Handler线程中,如果出现一个长时间的数据读写,会影响这个反应器中其他通道的IO处理
2.1 单Reactor单线程模型
单Reactor单线程模型意味着,只有一个Reactor来接收IO请求并实现请求的分发,以及只有一个线程作为Handler来处理IO请求。整体的模型图如下所示:
其中,Reactor通过select来监听客户端发来的请求,并根据不同的请求创建不同的IO事件。然后,将创建好的事件通过dispatch进行分发。如果是连接建立事件,则由Acceptor通过accept处理连接请求,然后创建一个Handler对象处理连接建立后的后续业务。如果不是连接建立事件,则将其分发给对应的Handler进行处理。Handler会先读取数据,然后进行相应的逻辑处理,最后将处理后的结果由send通过Socket连接返回给客户端。
这种模型比较简单,不涉及多线程之间的通信和竞争问题。但是它有如下的不足:
- 性能问题:只使用一个线程无法发挥多核的性能优势,Handler在处理某一个请求时,后续的请求都会被阻塞
- 可靠性问题:如果线程意外终止或进入死循环,那么会导致整个系统不可用,无法接受和处理外部消息
它适用于客户端数量有限,而且请求处理迅速的场景。
Reactor模块实现:
public class Reactor implements Runnable {
private final Selector selector;
private final ServerSocketChannel serverSocketChannel;
public Reactor(int port) throws IOException { //Reactor初始化
selector = Selector.open(); //打开一个Selector
serverSocketChannel = ServerSocketChannel.open(); //建立一个Server端通道
serverSocketChannel.socket().bind(new InetSocketAddress(port)); //绑定服务端口
serverSocketChannel.configureBlocking(false); //selector模式下,所有通道必须是非阻塞的
//Reactor是入口,最初给一个channel注册上去的事件都是accept
SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//attach callback object, Acceptor
sk.attach(new Acceptor(serverSocketChannel, selector));
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
selector.select(); //就绪事件到达之前,阻塞
Set selected = selector.selectedKeys(); //拿到本次select获取的就绪事件
Iterator it = selected.iterator();
while (it.hasNext()) {
//这里进行任务分发
dispatch((SelectionKey) (it.next()));
}
selected.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
}
void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment()); //这里很关键,拿到每次selectKey里面附带的处理对象,然后调用其run,这个对象在具体的Handler里会进行创建,初始化的附带对象为Acceptor(看上面构造器)
//调用之前注册的callback对象
if (r != null) {
r.run();
}
}
}
Acceptor模块实现:
public class Acceptor implements Runnable {
private final Selector selector;
private final ServerSocketChannel serverSocketChannel;
Acceptor(ServerSocketChannel serverSocketChannel, Selector selector) {
this.serverSocketChannel = serverSocketChannel;
this.selector = selector;
}
@Override
public void run() {
SocketChannel socketChannel;
try {
socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
System.out.println(String.format("收到来自 %s 的连接",
socketChannel.getRemoteAddress()));
new Handler(socketChannel, selector); //这里把客户端通道传给Handler,Handler负责接下来的事件处理(除了连接事件以外的事件均可)
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
Handler模块实现:
public class Handler implements Runnable {
private final SelectionKey selectionKey;
private final SocketChannel socketChannel;
private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
private ByteBuffer sendBuffer = ByteBuffer.allocate(2048);
private final static int READ = 0;
private final static int SEND = 1;
private int status = READ;
Handler(SocketChannel socketChannel, Selector selector) throws IOException {
this.socketChannel = socketChannel; //接收客户端连接
this.socketChannel.configureBlocking(false); //置为非阻塞模式(selector仅允非阻塞模式)
selectionKey = socketChannel.register(selector, 0); //将该客户端注册到selector,得到一个SelectionKey,以后的select到的就绪动作全都是由该对象进行封装
selectionKey.attach(this); //附加处理对象,当前是Handler对象,run是对象处理业务的方法
selectionKey.interestOps(SelectionKey.OP_READ); //走到这里,说明之前Acceptor里的建连已完成,那么接下来就是读取动作,因此这里首先将读事件标记为“感兴趣”事件
selector.wakeup(); //唤起select阻塞
}
@Override
public void run() {
try {
switch (status) {
case READ:
read();
break;
case SEND:
send();
break;
default:
}
} catch (IOException e) { //这里的异常处理是做了汇总,常出的异常就是server端还有未读/写完的客户端消息,客户端就主动断开连接,这种情况下是不会触发返回-1的,这样下面read和write方法里的cancel和close就都无法触发,这样会导致死循环异常(read/write处理失败,事件又未被cancel,因此会不断的被select到,不断的报异常)
System.err.println("read或send时发生异常!异常信息:" + e.getMessage());
selectionKey.cancel();
try {
socketChannel.close();
} catch (IOException e2) {
System.err.println("关闭通道时发生异常!异常信息:" + e2.getMessage());
e2.printStackTrace();
}
}
}
private void read() throws IOException {
if (selectionKey.isValid()) {
readBuffer.clear();
int count = socketChannel.read(readBuffer); //read方法结束,意味着本次"读就绪"变为"读完毕",标记着一次就绪事件的结束
if (count > 0) {
System.out.println(String.format("收到来自 %s 的消息: %s",
socketChannel.getRemoteAddress(),
new String(readBuffer.array())));
status = SEND;
selectionKey.interestOps(SelectionKey.OP_WRITE); //注册写方法
} else {
//读模式下拿到的值是-1,说明客户端已经断开连接,那么将对应的selectKey从selector里清除,否则下次还会select到,因为断开连接意味着读就绪不会变成读完毕,也不cancel,下次select会不停收到该事件
//所以在这种场景下,(服务器程序)你需要关闭socketChannel并且取消key,最好是退出当前函数。注意,这个时候服务端要是继续使用该socketChannel进行读操作的话,就会抛出“远程主机强迫关闭一个现有的连接”的IO异常。
selectionKey.cancel();
socketChannel.close();
System.out.println("read时-------连接关闭");
}
}
}
void send() throws IOException {
if (selectionKey.isValid()) {
sendBuffer.clear();
sendBuffer.put(String.format("我收到来自%s的信息辣:%s, 200ok;",
socketChannel.getRemoteAddress(),
new String(readBuffer.array())).getBytes());
sendBuffer.flip();
int count = socketChannel.write(sendBuffer); //write方法结束,意味着本次写就绪变为写完毕,标记着一次事件的结束
if (count < 0) {
//同上,write场景下,取到-1,也意味着客户端断开连接
selectionKey.cancel();
socketChannel.close();
System.out.println("send时-------连接关闭");
}
//没断开连接,则再次切换到读
status = READ;
selectionKey.interestOps(SelectionKey.OP_READ);
}
}
}
启动服务器:
new Thread(new Reactor(2333)).start();
2.2 单Reactor多线程模型
相比较于前面的单线程模型,这里不同之处在于:Handler只负责相应事件,不做具体的业务处理。Handler通过read读取完数据后,将其分发给后面的worker线程池中的某个线程进行真正的处理。worker线程池会分配独立线程完整业务逻辑,并将结果返回给Handler。Handler收到响应后,通过send将其返回给客户端。
模型如下所示:
它可以充分的利用CPU多核的优势,但由于多线程造成的数据共享和并发访问,Reactor需要处理所有的事件监听和响应。如果只是在单线程下运行,那么Reactor会成为高并发场景下的性能瓶颈。
2.3 主从Reactor多线程模型
Reactor主线程MainReactor对象只负责通过select对连接请求的监听,将其交给Acceptor进行处理。当 Acceptor 处理连接事件后,MainReactor 通过accept获取新的连接,并将连接注册到SubReactor。subReactor将连接加入到连接队列中进行监听,并创建Handler进行事件处理。这里事件真正的处理者仍然是worker线程池中的线程,worker线程处理结束后,将结果响应给Handler,Handler通过send将结果返回给客户端。
模型如下所示:
这里Reactor采用的主从结构交互简单、职责明确。mainReactor只负责连接建立事件,subReactor负责后续业务的处理。但是模型的复杂度较高,编程困难。