4.1 Reactor模式的重要性

从学习的角度来说,Reactor模式相当于高性能、高并发的一项非常重要的基础知识,只有掌握了它,才能真正理解和掌握Nginx、Redis、Netty等这些大名鼎鼎的中间件技术。

4.1.1 为什么首先学习Reactor模式

Netty本身很抽象,大量应用了设计模式。所以,学习像Netty这样的“精品中的精品”框架也是需要先从设计模式入手的,而Netty的整体架构是基于Reactor模式的。所以,学习和掌握Reactor模式,对于开始学习高并发通信(包括Netty框架)的人来说,一定是磨刀不误砍柴工。

4.1.2 Reactor模式简介

Reactor模式由Reactor线程、Handlers处理器两大角色组成,两大角色的职责分别如下:
(1)Reactor线程的职责:负责响应IO事件,并且分发到Handlers处理器。
(2)Handlers处理器的职责:非阻塞的执行业务处理逻辑。

4.1.3 多线程OIO的致命缺陷

在Java的OIO编程中,原始的网络服务器程序一般使用一个while循环不断地监听端口是否有新的连接。如果有,就调用一个处理函数来完成传输处理。

  1. while(true){
  2. socket = accept(); //阻塞,接收连接
  3. handle(socket) ; //读取数据、业务处理、写入结果
  4. }

如果前一个网络连接的handle(socket)没有处理完,那么后面的新连接无法被服务端接收,于是后面的请求就会被阻塞,导致服务器的吞吐量太低。这对于服务器来说是一个严重的问题。
为了解决这个严重的连接阻塞问题,出现了一个极为经典的模式:Connection Per Thread(一个线程处理一个连接)模式。

  1. public class ConnectionPerThread implements Runnable{
  2. @Override
  3. public void run() {
  4. try {
  5. // 服务器监听socket
  6. ServerSocket serverSocket = new ServerSocket(NioDemoConfig.SOCKET_SERVER_PORT);
  7. while (!Thread.interrupted()){
  8. Socket socket = serverSocket.accept();
  9. // 接收到一个连接后,为socket连接,新建一个专属的处理器对象
  10. Handler handler = new Handler(socket);
  11. new Thread(handler).start();
  12. }
  13. } catch (IOException e) {
  14. e.printStackTrace();
  15. }
  16. }
  17. static class Handler implements Runnable{
  18. final Socket socket;
  19. Handler (Socket s){
  20. socket = s;
  21. }
  22. @Override
  23. public void run() {
  24. while (true){
  25. try {
  26. byte[] input = new byte[1024];
  27. socket.getInputStream().read(input);
  28. byte[] output = null;
  29. socket.getOutputStream().write(output);
  30. } catch (IOException e) {
  31. e.printStackTrace();
  32. }
  33. }
  34. }
  35. }
  36. }

在系统中,线程是比较昂贵的系统资源。如果线程的数量太多,系统将无法承受。而且,线程的反复创建、销毁、切换也需要代价。因此,在高并发的应用场景下,多线程OIO的缺陷是致命的
用Reactor模式对线程的数量进行控制,做到一个线程处理大量的连接。

4.2 单线程Reactor模式

总体来说,Reactor模式有点类似事件驱动模式。在事件驱动模式中,当有事件触发时,事件源会将事件分发到Handler(处理器),由Handler负责事件处理。Reactor模式中的反应器角色类似于事件驱动模式中的事件分发器(Dispatcher)角色
(1)Reactor:负责查询IO事件,当检测到一个IO事件时将其发送给相应的Handler处理器去处理。这里的IO事件就是NIO中选择器查询出来的通道IO事件。
(2)Handler:与IO事件(或者选择键)绑定,负责IO事件的处理,完成真正的连接建立、通道的读取、处理业务逻辑、负责将结果写到通道等。

4.2.1 什么是单线程Reactor

简单地说,Reactor和Handlers处于一个线程中执行。
image.png
SelectionKey(选择键)的几个重要的成员方法:
(1)void attach(Object o):将对象附加到选择键。
此方法可以将任何Java POJO对象作为附件添加到SelectionKey实例。此方法非常重要,因为在单线程版本的Reactor模式实现中可以将Handler实例作为附件添加到SelectionKey实例。
(2)Object attachment():从选择键获取附加对象。
此方法与attach(Object o)是配套使用的,其作用是取出之前通过attach(Object o)方法添加到SelectionKey实例的附加对象。这个方法同样非常重要,当IO事件发生时,选择键将被select方法查询出来,可以直接将选择键的附件对象取出。

4.2.2 单线程Reactor的参考代码

  1. package com.crazymakercircle.ReactorModel;
  2. //省略import
  3. //单线程Reactor
  4. class EchoServerReactor implements Runnable {
  5. Selector selector;
  6. ServerSocketChannel serverSocket;
  7. //构造函数
  8. EchoServerReactor() throws IOException {
  9. //省略:打开选择器、serverSocket连接监听通道
  10. //注册serverSocket的accept新连接接收事件
  11. SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
  12. //将新连接处理器作为附件,绑定到sk选择键
  13. sk.attach(new AcceptorHandler());
  14. }
  15. public void run() {
  16. //选择器轮询
  17. try {
  18. while (!Thread.interrupted()) {
  19. selector.select();
  20. Set selected = selector.selectedKeys();
  21. Iterator it = selected.iterator();
  22. while (it.hasNext()) {
  23. //反应器负责dispatch收到的事件
  24. SelectionKey sk=it.next();
  25. dispatch(sk);
  26. }
  27. selected.clear();
  28. }
  29. } catch (IOException ex) { ex.printStackTrace(); }
  30. }
  31. //反应器的分发事件
  32. void dispatch(SelectionKey k) {
  33. Runnable handler = (Runnable) (k.attachment());
  34. //调用之前绑定到选择键的handler对象
  35. if (handler != null) {
  36. handler.run();
  37. }
  38. }
  39. //处理器:处理新连接
  40. class AcceptorHandler implements Runnable {
  41. public void run() {
  42. //接受新连接
  43. SocketChannel channel = serverSocket.accept();
  44. //需要为新连接创建一个输入输出的handler
  45. if (channel != null)
  46. new EchoHandler(selector, channel);
  47. }
  48. }
  49. //…
  50. }

在注册serverSocket服务监听连接的接受事件之后,创建一个AcceptorHandler新连接处理器的实例作为附件,被附加(attach)到SelectionKey中。

  1. //为serverSocket注册新连接接受(accept)事件
  2. SelectionKeysk =serverSocket.register(selector, SelectionKey.OP_ACCEPT);
  3. //将新连接处理器作为附件,绑定到sk选择键
  4. sk.attach(new AcceptorHandler());
  1. 当新连接事件发生后,取出之前附加到SelectionKey中的Handler业务处理器进行socket的各种IO处理。
  1. void dispatch(SelectionKey k) {
  2. Runnable r = (Runnable) (k.attachment());
  3. //调用之前绑定到选择键的处理器对象
  4. if (r != null) {
  5. r.run();
  6. }
  7. }

处理器AcceptorHandler的两大职责是完成新连接的接收工作、为新连接创建一个负责数据传输的Handler(称之为IOHandler)。

  1. //新连接处理器
  2. class AcceptorHandler implements Runnable {
  3. public void run() {
  4. //接受新连接
  5. SocketChannel channel = serverSocket.accept();
  6. //需要为新连接创建一个输入输出的Handler
  7. if (channel != null) new EchoHandler(selector, channel);
  8. }
  9. }
  1. 顾名思义,IOHandler就是负责socket连接的数据输入、业务处理、结果输出。
  1. package com.crazymakercircle.ReactorModel;
  2. //负责数据传输的Handler
  3. class IOHandler implements Runnable {
  4. final SocketChannel channel;
  5. final SelectionKey sk;
  6. IOHandler (Selector selector, SocketChannel c) {
  7. channel = c;
  8. c.configureBlocking(false);
  9. //与之前的注册方式不同,先仅仅取得选择键,之后再单独设置感兴趣的IO事件
  10. sk = channel.register(selector, 0); //仅仅取得选择键
  11. //将Handler处理器作为选择键的附件
  12. sk.attach(this);
  13. //注册读写就绪事件
  14. sk.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
  15. }
  16. public void run() {
  17. //…处理输入和输出
  18. }
  19. }

(1)将新的SocketChannel传输通道注册到Reactor类的同一个选择器中。这样保证了Reactor在查询IO事件时能查询到Handler注册到选择器的IO事件(数据传输事件)。
(2)Channel传输通道注册完成后,将IOHandler实例自身作为附件附加到选择键中。这样,在Reactor类分发事件(选择键)时,能执行到IOHandler的run()方法,完成数据传输处理。

4.2.3 单线程Reactor模式的EchoServer的实战案例

EchoServer的功能很简单:读取客户端的输入并回显到客户端,所以也叫回显服务器。基于Reactor模式来实现,设计三个重要的类:
(1)设计一个反应器类:EchoServerReactor类。
(2)设计两个处理器类:AcceptorHandler新连接处理器、EchoHandler回显处理器。

  1. public class EchoServerReactor implements Runnable{
  2. Selector selector;
  3. ServerSocketChannel serverSocketChannel;
  4. public EchoServerReactor() throws IOException {
  5. selector = Selector.open();
  6. serverSocketChannel = ServerSocketChannel.open();
  7. serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 18899));
  8. serverSocketChannel.configureBlocking(false);
  9. Logger.info("服务端已经开始监听:" + " 127.0.0.1 " + "18899");
  10. SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  11. // 将新连接处理器作为附件,绑定到sk选择器
  12. sk.attach(new AcceptorHandler());
  13. }
  14. @Override
  15. public void run() {
  16. // 选择器轮询
  17. try {
  18. while (!Thread.interrupted()){
  19. selector.select();
  20. Set<SelectionKey> selected = selector.selectedKeys();
  21. Iterator<SelectionKey> it = selected.iterator();
  22. while (it.hasNext()){
  23. // 反应器负责dispatch收到的事件
  24. SelectionKey sk = it.next();
  25. dispatch(sk);
  26. }
  27. selected.clear();
  28. }
  29. } catch (IOException e) {
  30. e.printStackTrace();
  31. }
  32. }
  33. void dispatch(SelectionKey key){
  34. Runnable handler = (Runnable) key.attachment();
  35. if (handler != null){
  36. handler.run();
  37. }
  38. }
  39. // 处理器,处理新连接
  40. class AcceptorHandler implements Runnable{
  41. @Override
  42. public void run() {
  43. try {
  44. // 接受新连接
  45. SocketChannel socketChannel = serverSocketChannel.accept();
  46. Logger.info("接收到一个连接");
  47. // 需要为新连接创建一个输入输出的handler
  48. if (socketChannel != null){
  49. new EchoHandler(selector, socketChannel);
  50. }
  51. } catch (IOException e) {
  52. e.printStackTrace();
  53. }
  54. }
  55. }
  56. public static void main(String[] args) throws IOException {
  57. new Thread(new EchoServerReactor()).start();
  58. }
  59. }

第二个处理器为EchoHandler回显处理器,也是一个传输处理器,主要是完成客户端的内容读取和回显

  1. public class EchoHandler implements Runnable{
  2. final SocketChannel socketChannel;
  3. final SelectionKey sk;
  4. final ByteBuffer buffer = ByteBuffer.allocate(1024);
  5. static final int RECIVING = 0, SENDING = 1;
  6. int state = RECIVING;
  7. public EchoHandler(Selector selector, SocketChannel c) throws IOException {
  8. socketChannel = c;
  9. c.configureBlocking(false);
  10. // 与之前的注册方式不同,先仅仅取得选择键,之后再单独设置感兴趣的IO事件
  11. sk = socketChannel.register(selector, 0);
  12. // 将handler处理器作为选择键的附件
  13. sk.attach(this);
  14. // 注册读写就绪事件
  15. sk.interestOps(SelectionKey.OP_READ);
  16. }
  17. @Override
  18. public void run() {
  19. try {
  20. if (state == SENDING){
  21. //写入通道
  22. socketChannel.write(buffer);
  23. // 写完后,准备开始从通道读
  24. buffer.clear();
  25. // 写完后,注册read就绪事件
  26. sk.interestOps(SelectionKey.OP_READ);
  27. // 写完后,进入接受状态
  28. state = RECIVING;
  29. }else if (state == RECIVING){
  30. //从通道读
  31. int length = 0;
  32. while ((length = socketChannel.read(buffer)) > 0){
  33. Logger.info(new String(buffer.array(), 0, length));
  34. }
  35. //读完后,准备开始写入通道,byteBuffer切换成读模式
  36. buffer.flip();
  37. //读完后,注册write就绪事件
  38. sk.interestOps(SelectionKey.OP_WRITE);
  39. //读完后,进入发送的状态
  40. state = SENDING;
  41. }
  42. } catch (IOException e) {
  43. e.printStackTrace();
  44. }
  45. }
  46. }

4.2.4 单线程Reactor模式的缺点

Reactor和Handler都在同一条线程中执行。这样,带来了一个问题:当其中某个Handler阻塞时,会导致其他所有的Handler都得不到执行。在这种场景下,被阻塞的Handler不仅仅负责输入和输出处理的传输处理器,还包括负责新连接监听的AcceptorHandler处理器,可能导致服务器无响应。

4.3 多线程Reactor模式

4.3.1 多线程版本的Reactor模式演进

(1)升级Handler。既要使用多线程,又要尽可能高效率,则可以考虑使用线程池。
(2)升级Reactor。可以考虑引入多个Selector(选择器),提升选择大量通道的能力。
总体来说,多线程版本的Reactor模式大致如下:
(1)将负责数据传输处理的IOHandler处理器的执行放入独立的线程池中。这样,业务处理线程与负责新连接监听的反应器线程就能相互隔离,避免服务器的连接监听受到阻塞。
(2)如果服务器为多核的CPU,可以将反应器线程拆分为多个子反应器(SubReactor)线程;同时,引入多个选择器,并且为每一个SubReactor引入一个线程,一个线程负责一个选择器的事件轮询。这样充分释放了系统资源的能力,也大大提升了反应器管理大量连接或者监听大量传输通道的能力。

4.3.2 多线程版本Reactor的实战案例

(1)引入多个选择器。
(2)设计一个新的子反应器(SubReactor)类,子反应器负责查询一个选择器。
(3)开启多个处理线程,一个处理线程负责执行一个子反应器。
image.png

  1. package com.crazymakercircle.ReactorModel;
  2. //…
  3. //多线程版本反应器
  4. class MultiThreadEchoServerReactor {
  5. ServerSocketChannel serverSocket;
  6. AtomicInteger next = new AtomicInteger(0);
  7. //选择器集合,引入多个选择器
  8. Selector[] selectors = new Selector[2];
  9. //引入多个子反应器
  10. SubReactor[] subReactors = null;
  11. MultiThreadEchoServerReactor() throws IOException {
  12. //初始化多个选择器
  13. selectors[0] = Selector.open(); //用于监听新连接事件
  14. selectors[1] = Selector.open(); //用于监听传输事件
  15. serverSocket = ServerSocketChannel.open();
  16. InetSocketAddress address = new InetSocketAddress("127.0.0.1",18899);
  17. serverSocket.socket().bind(address);
  18. //非阻塞
  19. serverSocket.configureBlocking(false);
  20. //第一个选择器,负责监控新连接事件
  21. SelectionKey sk = serverSocket.register(selectors[0],OP_ACCEPT);
  22. //绑定Handler:新连接监控Handler绑定到SelectionKey(选择键)
  23. sk.attach(new AcceptorHandler());
  24. //第一个子反应器,负责第一个选择器的新连接事件分发(而不处理)
  25. SubReactor subReactor1 = new SubReactor(selectors[0]);
  26. //第二个子反应器,负责第二个选择器的传输事件的分发(而不处理)
  27. SubReactor subReactor2 = new SubReactor(selectors[1]);
  28. subReactors = new SubReactor[]{subReactor1, subReactor2};
  29. }
  30. private void startService() {
  31. //一个子反应器对应一个线程
  32. new Thread(subReactors[0]).start();
  33. new Thread(subReactors[1]).start();
  34. }
  35. //子反应器,负责事件分发,但是不负责事件处理
  36. class SubReactor implements Runnable {
  37. //每个线程负责一个选择器的查询和选择
  38. final Selector selector;
  39. public SubReactor(Selector selector) {
  40. this.selector = selector;
  41. }
  42. public void run() {
  43. try {
  44. while (!Thread.interrupted()) {
  45. selector.select();
  46. Set<SelectionKey>keySet = selector.selectedKeys();
  47. Iterator<SelectionKey> it = keySet.iterator();
  48. while (it.hasNext()) {
  49. //dispatch所查询的事件
  50. SelectionKeysk = it.next();
  51. dispatch(sk);
  52. }
  53. keySet.clear();
  54. }
  55. } catch (IOException ex) {
  56. ex.printStackTrace();
  57. }
  58. }
  59. void dispatch(SelectionKeysk) {
  60. Runnable handler = (Runnable) sk.attachment();
  61. //获取之前attach绑定到选择键的handler处理器对象,执行事件处理
  62. if (handler != null) {
  63. handler.run();
  64. }
  65. }
  66. }
  67. //Handler:新连接处理器
  68. class AcceptorHandler implements Runnable {
  69. public void run() {
  70. try {
  71. SocketChannel channel = serverSocket.accept();
  72. //创建传输处理器,并且将传输通道注册到选择器2
  73. if (channel != null)
  74. new MultiThreadEchoHandler(selectors[1], channel);
  75. } catch (IOException e) {
  76. e.printStackTrace();
  77. }
  78. }
  79. }
  80. public static void main(String[] args) throws IOException {
  81. MultiThreadEchoServerReactor server = new MultiThreadEchoServerReactor();
  82. server.startService();
  83. }
  84. }

4.3.3 多线程版本Handler的实战案例

新的回显处理器为MultiThreadEchoHandler,主要的升级是引入了一个线程池(ThreadPool),使得数据传输和业务处理的代码执行在独立的线程池中,彻底地做到IO处理以及业务处理线程和反应器IO事件轮询线程的完全隔离。

  1. class MultiThreadEchoHandler implements Runnable
  2. {
  3. final SocketChannel channel;
  4. final SelectionKey sk;
  5. final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  6. static final int RECIEVING = 0, SENDING = 1;
  7. int state = RECIEVING;
  8. //引入线程池
  9. static ExecutorService pool = Executors.newFixedThreadPool(4);
  10. //构造器
  11. MultiThreadEchoHandler(Selector selector, SocketChannel c) {
  12. channel = c;
  13. c.configureBlocking(false);
  14. //先取得选择键,再设置感兴趣的IO事件
  15. sk = channel.register(selector, 0);
  16. //将本Handler作为sk选择键的附件,方便事件dispatch
  17. sk.attach(this);
  18. //向sk选择键设置Read就绪事件
  19. sk.interestOps(SelectionKey.OP_READ);
  20. selector.wakeup();
  21. }
  22. //此run方法在IO事件轮询线程中被调用
  23. public void run()
  24. {
  25. //提交数据传输任务到线程池
  26. //使得IO处理不在IO事件轮询线程中执行,而是在独立的线程池中执行
  27. pool.submit(()->asyncRun());
  28. }
  29. //数据传输与业务处理任务,不在IO事件轮询线程中执行,而是在独立的线程池中执行
  30. public synchronized void asyncRun()
  31. {
  32. try
  33. {
  34. if (state == SENDING)
  35. {
  36. //写入通道
  37. channel.write(byteBuffer);
  38. //byteBuffer切换成写模式,写完后准备开始从通道读
  39. byteBuffer.clear();
  40. //写完后,注册read就绪事件
  41. sk.interestOps(SelectionKey.OP_READ);
  42. //进入接收的状态
  43. state = RECIEVING;
  44. } else if (state == RECIEVING)
  45. {
  46. //从通道读
  47. int length = 0;
  48. while ((length = channel.read(byteBuffer)) > 0)
  49. {
  50. Logger.info(new String(byteBuffer.array(), 0, length));
  51. }
  52. //读完后,翻转byteBuffer的读写模式
  53. byteBuffer.flip();
  54. //注册write就绪事件
  55. sk.interestOps(SelectionKey.OP_WRITE);
  56. //进入发送的状态
  57. state = SENDING;
  58. }
  59. //处理结束了,这里不能关闭select key,需要重复使用
  60. //sk.cancel();
  61. } catch (IOException ex)
  62. {
  63. ex.printStackTrace();
  64. }
  65. }
  66. }

4.4 Reactor模式的优缺点

(1)Reactor模式和生产者消费者模式对比
二者的相似之处:在一定程度上,Reactor模式有点类似生产者消费者模式。在生产者消费者模式中,一个或多个生产者将事件加入一个队列中,一个或多个消费者主动从这个队列中拉取(Pull)事件来处理。
二者的不同之处:Reactor模式是基于查询的,没有专门的队列去缓冲存储IO事件,查询到IO事件之后,反应器会根据不同IO选择键(事件)将其分发给对应的Handler来处理。
(2)Reactor模式和观察者模式对比
二者的相似之处:在Reactor模式中,当查询到IO事件后,服务处理程序使用单路/多路分发(Dispatch)策略,同步分发这些IO事件。观察者模式(Observer Pattern)也被称作发布/订阅模式,它定义了一种依赖关系,让多个观察者同时监听某一个主题(Topic)。这个主题对象在状态发生变化时会通知所有观察者,它们能够执行相应的处理。
二者的不同之处:在Reactor模式中,Handler实例和IO事件(选择键)的订阅关系基本上是一个事件绑定到一个Handler,每一个IO事件(选择键)被查询后,反应器会将事件分发给所绑定的Handler,也就是一个事件只能被一个Handler处理;在观察者模式中,同一时刻、同一主题可以被订阅过的多个观察者处理。

作为高性能的IO模式,Reactor模式的优点如下:

  1. 响应快,虽然同一反应器线程本身是同步的,但是不会被单个连接的IO操作所阻塞。
  2. 编程相对简单,最大限度避免了复杂的多线程同步,也避免了多线程各个进程之间切换的开销。
  3. 可扩展,可以方便地通过增加反应器线程的个数来充分利用CPU资源。

Reactor模式的缺点如下:

  1. Reactor模式增加了一定的复杂性,因而有一定的门槛,并且不易于调试。
  2. Reactor模式依赖于操作系统底层的IO多路复用系统调用的支持,如Linux中的epoll系统调用。如果操作系统的底层不支持IO多路复用,Reactor模式不会那么高效。
  3. 在同一个Handler业务线程中,如果出现一个长时间的数据读写,就会影响这个反应器中其他通道的IO处理。