在早期的 Java 语言中,我们使用最多的线程处理的主要方式无非是按需创建和启动新的 Thread 来执行并发的任务单元(即为每个任务单独创建一条线程处理),这种方式在高并发的场景下工作效率不高且资源浪费严重。随后引入了线程池技术,通过缓存和重用Thread 极大地提高了性能。

虽然池化和重用线程相对于简单地为每个任务都创建和销毁线程是一种进步,但是它并不能消除由上下文切换所带来的开销,其随着线程数量的增加很快变得明显。Netty 基于主从 Reactor 的线程模型,可以将每一条线程的性能都压榨到了极致。

传统 I/O 线程模型

如果要让服务器服务多个客户端,那么最直接的⽅式就是为每⼀条连接创建线程。处理完业务逻辑后,随着连接关闭后线程也同样要销毁了。实例代码如下:

  1. public static void main(String[] args) throws IOException {
  2. ServerSocket serverSocket = new ServerSocket(9000);
  3. while (true) {
  4. System.out.println("等待连接。。");
  5. //阻塞方法
  6. Socket clientSocket = serverSocket.accept();
  7. System.out.println("有客户端连接了。。");
  8. //单线程模型,只有一条线程处理,如果没处理完会阻塞不能accept下一个客户端
  9. handler(clientSocket);
  10. }
  11. }
  12. private static void handler(Socket clientSocket) throws IOException {
  13. byte[] bytes = new byte[1024];
  14. System.out.println("准备read。。");
  15. //接收客户端的数据,阻塞方法,没有数据可读时就阻塞
  16. int read = clientSocket.getInputStream().read(bytes);
  17. System.out.println("read完毕。。");
  18. if (read != -1) {
  19. System.out.println("接收到客户端的数据:" + new String(bytes, 0, read));
  20. }
  21. clientSocket.getOutputStream().write("HelloClient".getBytes());
  22. clientSocket.getOutputStream().flush();
  23. }

但是这样不停地创建和销毁线程,不仅会带来性能开销,也会造成浪费资源,而且如果有成千上万条连接请求,对应每个连接创建一条线程也不太现实。
微信截图_20210722093156.png

改造一:加入线程池

在此基础上,我们可以采取资源复用的方式,不再为每个连接都创建一个线程,而是创建一个线程池,使线程资源可以重复利用。实例代码如下:

  1. public static void main(String[] args) throws IOException {
  2. ServerSocket serverSocket = new ServerSocket(9000);
  3. while (true) {
  4. System.out.println("等待连接。。");
  5. //阻塞方法
  6. Socket clientSocket = serverSocket.accept();
  7. System.out.println("有客户端连接了。。");
  8. //多线程模型,每次accept了一个客户端连接创建一个新的线程处理
  9. new Thread(new Runnable() {
  10. @Override
  11. public void run() {
  12. try {
  13. handler(clientSocket);
  14. } catch (IOException e) {
  15. e.printStackTrace();
  16. }
  17. }
  18. }).start();
  19. }
  20. }
  21. private static void handler(Socket clientSocket) throws IOException {
  22. byte[] bytes = new byte[1024];
  23. System.out.println("准备read。。");
  24. //接收客户端的数据,阻塞方法,没有数据可读时就阻塞
  25. int read = clientSocket.getInputStream().read(bytes);
  26. System.out.println("read完毕。。");
  27. if (read != -1) {
  28. System.out.println("接收到客户端的数据:" + new String(bytes, 0, read));
  29. }
  30. clientSocket.getOutputStream().write("HelloClient".getBytes());
  31. clientSocket.getOutputStream().flush();
  32. }

微信截图_20210722094403.png

改造二:非阻塞

引⼊了线程池,⼀个线程可以处理多个请求,但是线程在处理某个连接的 read 等操作时,如果遇到没有数据可读的情况,就会发生阻塞,那么该线程就没办法继续处理其他连接的业务。 要解决这⼀个问题,最简单的方式就是将 socket 改成非阻塞,把连接创建的socket 放入到一个集合中,然后线程不断地轮询调用 read 操作来判断是否有数据。

  1. public class NioServer {
  2. // 保存客户端连接
  3. static List<SocketChannel> channelList = new ArrayList<>();
  4. public static void main(String[] args) throws IOException, InterruptedException {
  5. // 创建NIO ServerSocketChannel,与BIO的serverSocket类似
  6. ServerSocketChannel serverSocket = ServerSocketChannel.open();
  7. serverSocket.socket().bind(new InetSocketAddress(9000));
  8. // 设置ServerSocketChannel为非阻塞
  9. serverSocket.configureBlocking(false);
  10. System.out.println("服务启动成功");
  11. while (true) {
  12. // 非阻塞模式accept方法不会阻塞,否则会阻塞
  13. // NIO的非阻塞是由操作系统内部实现的,底层调用了linux内核的accept函数
  14. SocketChannel socketChannel = serverSocket.accept();
  15. if (socketChannel != null) { // 如果有客户端进行连接
  16. System.out.println("连接成功");
  17. // 设置SocketChannel为非阻塞
  18. socketChannel.configureBlocking(false);
  19. // 保存客户端连接在List中
  20. channelList.add(socketChannel);
  21. }
  22. // 遍历连接进行数据读取
  23. Iterator<SocketChannel> iterator = channelList.iterator();
  24. while (iterator.hasNext()) {
  25. SocketChannel sc = iterator.next();
  26. ByteBuffer byteBuffer = ByteBuffer.allocate(128);
  27. // 非阻塞模式read方法不会阻塞,否则会阻塞
  28. int len = sc.read(byteBuffer);
  29. // 如果有数据,把数据打印出来
  30. if (len > 0) {
  31. System.out.println("接收到消息:" + new String(byteBuffer.array()));
  32. } else if (len == -1) { // 如果客户端断开,把socket从集合中去掉
  33. iterator.remove();
  34. System.out.println("客户端断开连接");
  35. }
  36. }
  37. }
  38. }
  39. }

这种⽅式虽然该能够解决阻塞的问题,但是解决的⽅式比较粗暴,因为我们把所有的socket 连接都放到了一个集合中,每次需要处理客户请求都需要把集合遍历一次,随着⼀个线程处理的连接越多,轮询的效率就会越低,比如现有 10000 个连接,只有 100 个连接发送数据,而我们需要把这 10000 个连接都遍历一次。

I/O 多路复用

上面的问题在于,线程并不知道当前连接是否有数据可读,从而需要每次通过 read 去试探。 那有没有办法在只有当连接上有数据的时候,线程才去发起读请求呢?

事件驱动模式

微信截图_20210722104632.png

基于事件驱动模式的 I/O 多路复用,我们只需要关心当前有事件发生的 socket,不需要通过轮询的方式去逐个检测每个socket是否有事件发生。我们只需要将对应的socket绑定相关的事件注册到多路复用器上(Selector),I/O 多路复用技术会用⼀个操作系统函数来监听我们所有关心事件的连接。我们熟悉的 epoll 模型就是内核提供给用户态的多路复用系统调用,线程可以通过⼀个操作系统调用函数(epoll_wait())从内核中获取多个已发生的事件。
微信截图_20210703104225.png

  1. 如果没有事件发生,线程只需阻塞在这个系统调用,而无需像前面的线程池方案那样轮询每个 socket 连接,检查是否有事件发生。
  2. 如果有事件发生,内核会返回产生了事件的连接,线程就会从阻塞状态返回,然后在用户态中再处理这些连接对应的业务即可。

    I/O多路复用具体实现及原理可以阅读《I/O多路复用原理》

Reactor模型

基于 I/O 多路复用的模式,已经可以把一个线程的性能压榨得很高,而 Reactor 模式正是对 I/O 多路复用做了一层封装,基于 I/O 多路复用监听事件发生,收到事件后根据事件类型分配给某个 Handler 处理。

  • Reactor:运行在一个单独的线程中,负责监听事件(accept、read、write…..),并将对应的事件分发对应的Handler进行处理(可以认为是Selector
  • Handler:负责处理Reactor分发过来的事件

根据实际业务场景,Reactor的数量可以有一个或多个,Handler也可以有一个或多个,常用的方案有 单Reactor单线程、单Reactor多线程、多Reactor多线程

单 Reactor 单线程

微信截图_20210722115520.png
可以看到有 Reactor、Acceptor、Handler 这三个对象:

  • Reactor:对象的作用是监听和分发事件
  • Acceptor:对象的作用是获取连接,处理accept事件(Handler中的一种)
  • Handler:对象的作用是对非连接事件进行实际业务处理

Reactor 通过调用 select() 方法获取发生的事件:

  1. 如果是 accept 事件,则交给 Acceptor 对象进行处理,Acceptor 对象会调用 accept() 方法,并创建一个 socketChannel,绑定相应的事件并注册到 Selector 上;
  2. 如果不是 accept 事件,则分发给对应的 Handler 对象对相关事件进行处理;

服务器端的 Reactor 是一个线程对象,该线程会启动事件循环,并使用 Selector(选择器)来实现 IO 的多路复用。注册一个 Acceptor 事件处理器到 Reactor 中,Acceptor 事件处 理器所关注的事件是 ACCEPT 事件,这样 Reactor 会监听客户端向服务器端发起的连接请求事件。

当 Reactor 监听到有读或者写事件发生时,将相关的事件派发给对应的处理器进行处理。比如,读处理器会通过 SocketChannel 的 read() 方法读取数据,此时 read() 操作可以直接读取到数据,而不会堵塞与等待可读的数据到来。每当处理完所有就绪的感兴趣的 I/O 事件后,Reactor 线程会再次执行 select() 阻塞等待新的事件就绪并将其分派给对应处理器进行处理。

上述所有的事件都是在一个线程中完成,Handler 对象在业务处理时,线程⽆法处理其他连接的事件的,如果业务处理耗时比较长,那么就造成响应的延迟。如果某个业务处理中出现异常或进入死循环,则整个网络通信不可用。

单 Reactor 多线程

与单线程 Reactor 模式不同的是,添加了一个工作者线程池,并将非 I/O 操作从 Reactor线程中移出转交给工作者线程池来执行。这样能够提高 Reactor 线程的 I/O 响应,不至于因为一些耗时的业务逻辑而延迟对后面 I/O 请求的处理。具体的过程如下:

  1. Reactor 通过调用 select() 方法获取发生的事件;
  2. 如果是 accept 事件,则交给 Acceptor 对象进行处理,Acceptor 对象会调用 accept()方法,并创建一个 socketChannel,绑定相应的事件并注册到 Selector 上;
  3. 如果不是 accept 事件,则分发给对应的 Handler 对象;
  4. Handler 对象只对事件进行接收和发送,不做任何处理,比如处理 read 事件,调用read 方法读取到数据后,交给线程池中的线程进行处理;
  5. 线程池中的工作线程处理完业务方法后,将结果返回给的 Handler,接着调动 send 方法将响应结果返回给客户端;

微信截图_20210722121505.png
引入了线程池,通过重用现有的线程而不是创建新线程,可以在处理多个请求时分摊在线程创建和销毁过程产生的巨大开销。并且当请求到达时,工作线程通常已经存在,因此不会由于等待创建线程而延迟任务的执行,从而提高了响应性。通过适当调整线程池的大小,可以创建足够多的线程以便使处理器保持忙碌状态,同时还可以防止过多线程相互竞争资源而使应用程序耗尽内存或失败。

但是对于高负载、大并发或大数据量的应用场景却不合适,主要原因如下:

  • 一个 NIO 线程同时处理成百上千的链路,性能上无法支撑,即便 NIO 线程的 CPU 负 荷达到 100%,也无法满足海量消息的读取和发送;
  • 当 NIO 线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重了 NIO 线程的负载,最终会导致大量消息积压和处理超时,成为系统的性能瓶颈;

多 Reactor 多线程

要解决一个 Reactor 处理所有事件的问题,可以引入 Reactor 线程池。Reactor 线程池中的每一 Reactor 线程都会有自己的 Selector、线程和分发的事件循环逻辑。

以主从 Reactor 模型为例,该模式下分为 mainReactor 和 subReactor 两种 Reactor。mainReactor 可以只有一个,但 subReactor 一般会有多个。mainReactor 线程主要负责接收客户端的连接请求,然后将接收到的 SocketChannel 传递给 subReactor,由 subReactor 来完成和客户端的通信。主要流程如下:

  1. 注册一个 Acceptor 事件处理器到 mainReactor 中,Acceptor 事件处理器所关注的事件是 ACCEPT 事件,这样 mainReactor 会监听客户端向服务器端发起的连接请求事件(ACCEPT事件),启动 mainReactor 的事件循环。
  2. 客户端向服务器端发起一个连接请求,mainReactor 监听到了该 ACCEPT 事件并将该 ACCEPT 事件派发给 Acceptor 处理器来进行处理。Acceptor 处理器通过 accept() 方法得到与这个客户端对应的连接(SocketChannel),然后将这个 SocketChannel 传递给 subReactor 线程池。
  3. subReactor 线程池分配一个 subReactor 线程给这个 SocketChannel,即将 SocketChannel 关注的 READ 事件以及对应的 READ 事件处理器注册到 subReactor 线程中的其中一个 Reactor 线程(当然你也注册 WRITE 事件以及 WRITE 事件处理器到 subReactor 线程中以完成 I/O 写操作)。Reactor 线程池中的每一 Reactor 线程都会有自己的 Selector、线程和分发的循环逻辑。
  4. 后续的处理跟单 Reactor 多线程模式一样,Handler 对象只对事件进行接收和发送,不做任何处理,交给线程池中的工作线程处理;

注意,所以的 I/O 操作(包括 accept()、read()、write() 以及 connect() 操作)依旧还是在 Reactor 线程(mainReactor 线程 或 subReactor 线程)中完成的。Thread Pool 线程池仅用来处理非 I/O 操作的逻辑。
微信截图_20210722122853.png

多 Reactor 线程模式将“接受客户端的连接请求”和“与该客户端的通信”分在了两个 Reactor 线程来完成。mainReactor 完成接收客户端连接请求的操作,它不负责与客户端的通 信,而是将建立好的连接转交给 subReactor 线程来完成与客户端的通信,这样一来就不会因为 read() 数据量太大而导致后面的客户端连接请求得不到即时处理的情况。并且多 Reactor 线程模式在海量的客户端并发请求的情况下,还可以通过实现 subReactor 线程池来将海量的连接分发给多个 subReactor 线程,在多核的操作系统中这能大大提升应用的负载和吞吐量。