Reactor模式
Reactor模式详解
Reactor 反应堆设计模式


IO线程模型

**

1. 传统阻塞式IO

**
传统阻塞式IO的处理流程图如下所示:
image.png

对于每一个可能的用户请求,系统都会为其分配一个单独的线程进行处理。对于某一个线程来说,如果前面的请求没有处理结束,那么后续的请求就只能阻塞等待。当并发请求量很大时,系统需要创建大量的线程,这会消耗大量的资源。线程创建后,如果当前线程没有数据暂时可读,那么线程就会一直阻塞在read操作,造成线程资源的浪费。

如果使用Java网络编程实现,代码如下所示:

  1. public class BlockingIO{
  2. public static void main(String[] args) throws Exception{
  3. // 创建线程池
  4. ExecutorService service = Executors.newCachedThreadPool();
  5. // 创建ServerSocket
  6. ServerSocket serverSocket = new ServerSocket(8888);
  7. // 监听等待
  8. while(true){
  9. final Socket socket = serverSocket.accept();
  10. // 如果接收到一个Socket连接
  11. service.execute(()->{
  12. handler(socket);
  13. });
  14. }
  15. }
  16. public static void handler(Socket socket){
  17. try{
  18. byte[] bytes = new byte[1024];
  19. // 获取Socket的输入流
  20. InputStream inputStream = socket.getInputStream();
  21. while(true){
  22. // 尝试从Socket连接中读取数据
  23. int readBytes = inputStream.read(bytes);
  24. if(readBytes != -1){
  25. // 输入客户端传输的信息
  26. System.out.println(new String(bytes, 0, read));
  27. } else{
  28. break;
  29. }
  30. }
  31. } catch (IOException e) {
  32. e.printStackTrace();
  33. } finally {
  34. try {
  35. // 关闭Socket连接
  36. socket.close();
  37. } catch (IOException e) {
  38. e.printStackTrace();
  39. }
  40. }
  41. }

2. Reactor模型

Reactor模型针对传统阻塞I/O服务模型的2个缺点,解决方案如下:

  • 基于 I/O 复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理
  • 基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务

根据Reactor的数量和处理资源池线程的数量不同,它有如下的三种实现:

  • 单Reactor单线程模型
  • 单Reactor多线程模型
  • 主从Reactor多线程模型

Reactor模式由Reactor反应线程、Handlers处理器两大部分组成:

  • Reactor:它在一个单独的线程中运行,负责监听和分发事件,将IO事件分发给具体对应的Handler进行处理
  • Handlers:IO事件具体的处理程序

优点

  • 响应快:虽然同一反应器线程本身是同步的,但是不会被单个连接的同步IO阻塞
  • 编码简单:极大程度的避免了复杂的多线程同步,避免了多线程的各个进程之间切换的开销
  • 可扩展:可方便的通过增加反应器线程的个数来充分的利用CPU资源


缺点**

  • 不易调试
  • 需要底层操作系统IO多路复用的支持
  • 同一个Handler线程中,如果出现一个长时间的数据读写,会影响这个反应器中其他通道的IO处理

2.1 单Reactor单线程模型

单Reactor单线程模型意味着,只有一个Reactor来接收IO请求并实现请求的分发,以及只有一个线程作为Handler来处理IO请求。整体的模型图如下所示:
image.png

其中,Reactor通过select来监听客户端发来的请求,并根据不同的请求创建不同的IO事件。然后,将创建好的事件通过dispatch进行分发。如果是连接建立事件,则由Acceptor通过accept处理连接请求,然后创建一个Handler对象处理连接建立后的后续业务。如果不是连接建立事件,则将其分发给对应的Handler进行处理。Handler会先读取数据,然后进行相应的逻辑处理,最后将处理后的结果由send通过Socket连接返回给客户端。

这种模型比较简单,不涉及多线程之间的通信和竞争问题。但是它有如下的不足:

  • 性能问题:只使用一个线程无法发挥多核的性能优势,Handler在处理某一个请求时,后续的请求都会被阻塞
  • 可靠性问题:如果线程意外终止或进入死循环,那么会导致整个系统不可用,无法接受和处理外部消息

它适用于客户端数量有限,而且请求处理迅速的场景。

Reactor模块实现:

  1. public class Reactor implements Runnable {
  2. private final Selector selector;
  3. private final ServerSocketChannel serverSocketChannel;
  4. public Reactor(int port) throws IOException { //Reactor初始化
  5. selector = Selector.open(); //打开一个Selector
  6. serverSocketChannel = ServerSocketChannel.open(); //建立一个Server端通道
  7. serverSocketChannel.socket().bind(new InetSocketAddress(port)); //绑定服务端口
  8. serverSocketChannel.configureBlocking(false); //selector模式下,所有通道必须是非阻塞的
  9. //Reactor是入口,最初给一个channel注册上去的事件都是accept
  10. SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  11. //attach callback object, Acceptor
  12. sk.attach(new Acceptor(serverSocketChannel, selector));
  13. }
  14. @Override
  15. public void run() {
  16. try {
  17. while (!Thread.interrupted()) {
  18. selector.select(); //就绪事件到达之前,阻塞
  19. Set selected = selector.selectedKeys(); //拿到本次select获取的就绪事件
  20. Iterator it = selected.iterator();
  21. while (it.hasNext()) {
  22. //这里进行任务分发
  23. dispatch((SelectionKey) (it.next()));
  24. }
  25. selected.clear();
  26. }
  27. } catch (IOException e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. void dispatch(SelectionKey k) {
  32. Runnable r = (Runnable) (k.attachment()); //这里很关键,拿到每次selectKey里面附带的处理对象,然后调用其run,这个对象在具体的Handler里会进行创建,初始化的附带对象为Acceptor(看上面构造器)
  33. //调用之前注册的callback对象
  34. if (r != null) {
  35. r.run();
  36. }
  37. }
  38. }

Acceptor模块实现:

  1. public class Acceptor implements Runnable {
  2. private final Selector selector;
  3. private final ServerSocketChannel serverSocketChannel;
  4. Acceptor(ServerSocketChannel serverSocketChannel, Selector selector) {
  5. this.serverSocketChannel = serverSocketChannel;
  6. this.selector = selector;
  7. }
  8. @Override
  9. public void run() {
  10. SocketChannel socketChannel;
  11. try {
  12. socketChannel = serverSocketChannel.accept();
  13. if (socketChannel != null) {
  14. System.out.println(String.format("收到来自 %s 的连接",
  15. socketChannel.getRemoteAddress()));
  16. new Handler(socketChannel, selector); //这里把客户端通道传给Handler,Handler负责接下来的事件处理(除了连接事件以外的事件均可)
  17. }
  18. } catch (IOException e) {
  19. e.printStackTrace();
  20. }
  21. }
  22. }

Handler模块实现:

  1. public class Handler implements Runnable {
  2. private final SelectionKey selectionKey;
  3. private final SocketChannel socketChannel;
  4. private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
  5. private ByteBuffer sendBuffer = ByteBuffer.allocate(2048);
  6. private final static int READ = 0;
  7. private final static int SEND = 1;
  8. private int status = READ;
  9. Handler(SocketChannel socketChannel, Selector selector) throws IOException {
  10. this.socketChannel = socketChannel; //接收客户端连接
  11. this.socketChannel.configureBlocking(false); //置为非阻塞模式(selector仅允非阻塞模式)
  12. selectionKey = socketChannel.register(selector, 0); //将该客户端注册到selector,得到一个SelectionKey,以后的select到的就绪动作全都是由该对象进行封装
  13. selectionKey.attach(this); //附加处理对象,当前是Handler对象,run是对象处理业务的方法
  14. selectionKey.interestOps(SelectionKey.OP_READ); //走到这里,说明之前Acceptor里的建连已完成,那么接下来就是读取动作,因此这里首先将读事件标记为“感兴趣”事件
  15. selector.wakeup(); //唤起select阻塞
  16. }
  17. @Override
  18. public void run() {
  19. try {
  20. switch (status) {
  21. case READ:
  22. read();
  23. break;
  24. case SEND:
  25. send();
  26. break;
  27. default:
  28. }
  29. } catch (IOException e) { //这里的异常处理是做了汇总,常出的异常就是server端还有未读/写完的客户端消息,客户端就主动断开连接,这种情况下是不会触发返回-1的,这样下面read和write方法里的cancel和close就都无法触发,这样会导致死循环异常(read/write处理失败,事件又未被cancel,因此会不断的被select到,不断的报异常)
  30. System.err.println("read或send时发生异常!异常信息:" + e.getMessage());
  31. selectionKey.cancel();
  32. try {
  33. socketChannel.close();
  34. } catch (IOException e2) {
  35. System.err.println("关闭通道时发生异常!异常信息:" + e2.getMessage());
  36. e2.printStackTrace();
  37. }
  38. }
  39. }
  40. private void read() throws IOException {
  41. if (selectionKey.isValid()) {
  42. readBuffer.clear();
  43. int count = socketChannel.read(readBuffer); //read方法结束,意味着本次"读就绪"变为"读完毕",标记着一次就绪事件的结束
  44. if (count > 0) {
  45. System.out.println(String.format("收到来自 %s 的消息: %s",
  46. socketChannel.getRemoteAddress(),
  47. new String(readBuffer.array())));
  48. status = SEND;
  49. selectionKey.interestOps(SelectionKey.OP_WRITE); //注册写方法
  50. } else {
  51. //读模式下拿到的值是-1,说明客户端已经断开连接,那么将对应的selectKey从selector里清除,否则下次还会select到,因为断开连接意味着读就绪不会变成读完毕,也不cancel,下次select会不停收到该事件
  52. //所以在这种场景下,(服务器程序)你需要关闭socketChannel并且取消key,最好是退出当前函数。注意,这个时候服务端要是继续使用该socketChannel进行读操作的话,就会抛出“远程主机强迫关闭一个现有的连接”的IO异常。
  53. selectionKey.cancel();
  54. socketChannel.close();
  55. System.out.println("read时-------连接关闭");
  56. }
  57. }
  58. }
  59. void send() throws IOException {
  60. if (selectionKey.isValid()) {
  61. sendBuffer.clear();
  62. sendBuffer.put(String.format("我收到来自%s的信息辣:%s, 200ok;",
  63. socketChannel.getRemoteAddress(),
  64. new String(readBuffer.array())).getBytes());
  65. sendBuffer.flip();
  66. int count = socketChannel.write(sendBuffer); //write方法结束,意味着本次写就绪变为写完毕,标记着一次事件的结束
  67. if (count < 0) {
  68. //同上,write场景下,取到-1,也意味着客户端断开连接
  69. selectionKey.cancel();
  70. socketChannel.close();
  71. System.out.println("send时-------连接关闭");
  72. }
  73. //没断开连接,则再次切换到读
  74. status = READ;
  75. selectionKey.interestOps(SelectionKey.OP_READ);
  76. }
  77. }
  78. }

启动服务器:

  1. new Thread(new Reactor(2333)).start();

2.2 单Reactor多线程模型

相比较于前面的单线程模型,这里不同之处在于:Handler只负责相应事件,不做具体的业务处理。Handler通过read读取完数据后,将其分发给后面的worker线程池中的某个线程进行真正的处理。worker线程池会分配独立线程完整业务逻辑,并将结果返回给Handler。Handler收到响应后,通过send将其返回给客户端。

模型如下所示:
image.png

它可以充分的利用CPU多核的优势,但由于多线程造成的数据共享和并发访问,Reactor需要处理所有的事件监听和响应。如果只是在单线程下运行,那么Reactor会成为高并发场景下的性能瓶颈。

2.3 主从Reactor多线程模型

Reactor主线程MainReactor对象只负责通过select对连接请求的监听,将其交给Acceptor进行处理。当 Acceptor 处理连接事件后,MainReactor 通过accept获取新的连接,并将连接注册到SubReactor。subReactor将连接加入到连接队列中进行监听,并创建Handler进行事件处理。这里事件真正的处理者仍然是worker线程池中的线程,worker线程处理结束后,将结果响应给Handler,Handler通过send将结果返回给客户端。

模型如下所示:
image.png

这里Reactor采用的主从结构交互简单、职责明确。mainReactor只负责连接建立事件,subReactor负责后续业务的处理。但是模型的复杂度较高,编程困难。