我们开发的绝大多数业务系统,都是 IO 密集型系统。跟 IO 密集型系统相对的另一种系统叫计算密集型系统。通过这两种系统的名字,估计你也能大概猜出来 IO 密集型系统是什么意思。IO 密集型系统大部分时间都在执行 IO 操作,这个 IO 操作主要包括网络 IO 和磁盘 IO,以及与计算机连接的一些外围设备的访问。对于 IO 密集型系统,特别适合使用异步的设计来提升系统性能。

应用程序最常使用的 IO 资源,主要包括磁盘 IO 和网络 IO。由于现在的 SSD 的速度越来越快,对于本地磁盘的读写,异步的意义越来越小。所以,使用异步设计的方法来提升 IO 性能,我们更加需要关注的问题是,如何来实现高性能的异步网络传输。常见的 IO 模型有:BIO(同步阻塞)、NIO(同步非阻塞)、AIO(异步非阻塞),下面我们就来了解一下这几种不同的 IO 类型。

BIO(同步阻塞IO)

BIO,称为同步 IO,当我们实现网络通信时,通常会建立一个 TCP 连接,此时用户代码会获得一个用于收发数据的通道,每个通道会在内存中开辟两片区域用于收发数据的缓存。

发送数据的过程比较简单,我们直接往这个通道里面来写入数据就可以了。用户代码在发送时写入的数据会暂存在缓存中,然后操作系统会通过网卡,把发送缓存中的数据传输到对端的服务器上。只要这个缓存不满,或者说,我们发送数据的速度没有超过网卡传输速度的上限,那这个发送数据的操作耗时,只不过是一次内存写入的时间,这个时间是非常快的。所以,发送数据的时候同步发送就可以了,没有必要异步。

比较麻烦的是接收数据。对于数据的接收方来说,它并不知道什么时候会收到数据。那我们能直接想到的方法就是,用一个线程阻塞在那儿等着数据,当有数据到来的时候,操作系统会先把数据写入接收缓存,然后给接收数据的线程发一个通知,线程收到通知后结束等待,开始读取数据。处理完这一批数据后,继续阻塞等待下一批数据到来,这样周而复始地处理收到的数据。

这就是 BIO 的模型。同步网络 IO 模型在处理少量连接的时候,是没有问题的。但是如果要同时处理非常多的连接,同步的网络 IO 模型就有点儿力不从心了。以下是一个简单的socket建立连接进行通信的例子:

服务端代码如下:

  1. public class SocketServer {
  2. public static void main(String[] args) throws IOException {
  3. ServerSocket serverSocket = new ServerSocket(9000);
  4. while (true) {
  5. System.out.println("等待连接。。");
  6. Socket clientSocket = serverSocket.accept();
  7. System.out.println("有客户端连接了。。");
  8. //单线程模型,只有一条线程处理,如果没处理完会阻塞不能accept下一个客户端
  9. handler(clientSocket);
  10. }
  11. }
  12. private static void handler(Socket clientSocket) throws IOException {
  13. byte[] bytes = new byte[1024];
  14. System.out.println("准备read。。");
  15. //接收客户端的数据,阻塞方法,没有数据可读时就阻塞
  16. int read = clientSocket.getInputStream().read(bytes);
  17. System.out.println("read完毕。。");
  18. if (read != -1) {
  19. System.out.println("接收到客户端的数据:" + new String(bytes, 0, read));
  20. }
  21. clientSocket.getOutputStream().write("HelloClient".getBytes());
  22. clientSocket.getOutputStream().flush();
  23. }
  24. }

客户端代码如下:

  1. public static void main(String[] args) throws IOException, InterruptedException {
  2. Socket socket = new Socket("127.0.0.1", 9000);
  3. //向服务端发送数据
  4. socket.getOutputStream().write("HelloServer".getBytes());
  5. socket.getOutputStream().flush();
  6. byte[] bytes = new byte[1024];
  7. //接收服务端回传的数据
  8. socket.getInputStream().read(bytes);
  9. System.out.println("接收到服务端的数据:" + new String(bytes));
  10. socket.close();
  11. }

服务端首先调创建一个 ServerSocket 对象(网络协议为 IPv4,传输协议为TCP),接着调用 bind() 方法绑定所要监听指定的端口并调用 listen() 方法进行监听。服务端进入了监听状态后,通过调用 accept() 方法,从内核 ACCEPT 队列获取TCP连接,如果没有客户端连接,则会阻塞等待客户端连接的到来。获取连接后返回一个Socket对象(服务端创建了两个Socket对象,一个用来监听端口,一个用来真正传输数据)

服务器的内核实际上为每个 Socket 维护了两个队列:

  • 一个是还没完全建立连接的队列,称为 TCP 半连接队列 (也称SYN队列) 这个队列都是没有完成三次握手的连接,此时服务端处于 SYN_RECV 的状态;
  • 一个是一件建立连接的队列,称为 TCP 全连接队列(也称ACCEPT队列)这个队列都是完成了三次握手的连接,此时服务端处于 ESTABLISHED 状态;

客户端在创建好 Socket 后,调用 connect() 函数发起连接,参数是要指明服务端的 IP 地址和端口号,然后就开始 TCP 三次握手。
微信截图_20210702154525.png
上面代码中,当一个客户端与服务端建立了连接,在执行业务代码期间,如果在有另一个客户端发起连接,则会发生阻塞。服务端只有处理完与当前客户端的通信后才会处理下一个连接,这就是 BIO 的特点,只能实现一对一的通信。

上诉例子若要处理多个连接,可以通过多线程进行改造

  1. public static void main(String[] args) throws IOException {
  2. ServerSocket serverSocket = new ServerSocket(9000);
  3. while (true) {
  4. System.out.println("等待连接。。");
  5. Socket clientSocket = serverSocket.accept();
  6. System.out.println("有客户端连接了。。");
  7. //多线程模型,每次accept了一个客户端连接创建一个新的线程处理
  8. new Thread(new Runnable() {
  9. @Override
  10. public void run() {
  11. try {
  12. handler(clientSocket);
  13. } catch (IOException e) {
  14. e.printStackTrace();
  15. }
  16. }
  17. }).start();
  18. }
  19. }

这种方案,若有一些客户端只连接不发送数据,那么线程的资源就会浪费,且每个连接都需要阻塞一个线程来等待数据,大量的连接数就会需要相同数量的数据接收线程。当这些 TCP 连接都在进行数据收发的时候,会有大量的线程来抢占 CPU 时间,造成频繁的 CPU 上下文切换,导致 CPU 的负载升高,整个系统的性能就会比较慢,如经典的 C10K 问题。

NIO(同步非阻塞IO)

先抛开你知道的各种语言的异步类库和各种异步的网络 IO 框架,对于业务开发者来说,一个好的异步网络框架,我们希望达到的效果,无非就是,只用少量的线程就能处理大量的连接,有数据到来的时候能第一时间处理就可以了。为了解决 BIO 的阻塞问题,JDK1.4 开始引入 NIO,一个线程可以处理多个连接请求,客户端发送的连接请求都会注册到多路复用器 selector 上,多路复用器轮询到连接有 IO 请求就进行处理。

NIO 有三大核心组件:Selector 选择器、Channel 管道、buffer 缓冲区。

Buffer

NIO 是面向缓冲的,发送给一个 Channel 的所有数据都必须首先放到缓冲区中,同样地,从 Channel 中读取的任何数据都要先读到缓冲区中。也就是说,不会直接对 Channel 进行读写数据,而是要先经过缓冲区。

缓冲区本质上是一块可以写入数据,也可以从中读取数据的内存,其底层是一个数组。 这块内存被包装成 NIO Buffer 对象,并提供了一组方法,用来方便的访问该块内存。
微信截图_20210721143229.png

Buffer 中有以下几个重要的属性:

  1. public abstract class Buffer {
  2. .......
  3. // Invariants: mark <= position <= limit <= capacity
  4. private int mark = -1;
  5. private int position = 0;
  6. private int limit;
  7. private int capacity;
  8. ......
  9. }
  • capacity:表示 Buffer 的容量,在 Buffer 创建时确定,后续不能修改;
  • limit:表示 Buffer 当前能读写的最大位置,不能对超过 limit 位置进行读写,limit 可被修改;
  • position:下一个要被读或写的元素的索引,每次读写 Buffer 都会修改 position,flip() 方法可以重置 position;

    1. public final Buffer flip() {
    2. limit = position;
    3. position = 0;
    4. mark = -1;
    5. return this;
    6. }
  • mark:标志位;

Channel

通道 Channel 是对原 I/O 包中的流的模拟,是应用程序和操作系统之间交互事件、传递内容的通道。通过 Channel 可以读取,也可以向操作系统写入数据,通道与流的不同之处在于,流只能在一个方向上移动(一个流必须是 InputStream 或者 OutputStream 的子类),而通道是双向的,可以用于读、写或者同时用于读写。
微信截图_20210721150323.png

本地文件写:

  1. public static void main(String[] args) throws Exception {
  2. String s = "hello world";
  3. FileOutputStream outputStream = new FileOutputStream("F:\\helloworld\\hello.txt");
  4. //获取fileChannel
  5. FileChannel fileWriteChannel = outputStream.getChannel();
  6. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  7. //将数据放入buffer中
  8. byteBuffer.put(s.getBytes());
  9. //放入了数据 position改变 读/写buffer中的数据需要将position重新置0
  10. byteBuffer.flip();
  11. //将Buffer中数据写入Channel中
  12. fileWriteChannel.write(byteBuffer);
  13. fileWriteChannel.close();
  14. }

微信截图_20210721155842.png

本地文件读:

  1. public static void main(String[] args) throws Exception{
  2. FileInputStream inputStream = new FileInputStream("F:\\helloworld\\hello.txt");
  3. //获取输入流FileChannel
  4. FileChannel fileChannel = inputStream.getChannel();
  5. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  6. fileChannel.read(byteBuffer);
  7. System.out.println(new String(byteBuffer.array()));
  8. fileChannel.close();
  9. }

微信截图_20210721160015.png

Selector

在 Java 的 NIO 中,它提供了一个 Selector 对象,来解决一个线程在多个网络连接上的多路复用问题。在 NIO 中,每个已经建立好的连接用一个 Channel 对象来表示。我们希望能实现,在一个线程里,接收来自多个 Channel 的数据。也就是说,这些 Channel 中,任何一个 Channel 收到数据后,第一时间能在同一个线程里面来处理。

简单的非阻塞例子:

  1. public class NioServer {
  2. // 保存客户端连接
  3. static List<SocketChannel> channelList = new ArrayList<>();
  4. public static void main(String[] args) throws IOException, InterruptedException {
  5. // 创建NIO ServerSocketChannel,与BIO的serverSocket类似
  6. ServerSocketChannel serverSocket = ServerSocketChannel.open();
  7. serverSocket.socket().bind(new InetSocketAddress(9000));
  8. // 设置ServerSocketChannel为非阻塞
  9. serverSocket.configureBlocking(false);
  10. System.out.println("服务启动成功");
  11. while (true) {
  12. // 非阻塞模式accept方法不会阻塞,否则会阻塞
  13. // NIO的非阻塞是由操作系统内部实现的,底层调用了linux内核的accept函数
  14. SocketChannel socketChannel = serverSocket.accept();
  15. if (socketChannel != null) { // 如果有客户端进行连接
  16. System.out.println("连接成功");
  17. // 设置SocketChannel为非阻塞
  18. socketChannel.configureBlocking(false);
  19. // 保存客户端连接在List中
  20. channelList.add(socketChannel);
  21. }
  22. // 遍历连接进行数据读取
  23. Iterator<SocketChannel> iterator = channelList.iterator();
  24. while (iterator.hasNext()) {
  25. SocketChannel sc = iterator.next();
  26. ByteBuffer byteBuffer = ByteBuffer.allocate(128);
  27. // 非阻塞模式read方法不会阻塞,否则会阻塞
  28. int len = sc.read(byteBuffer);
  29. // 如果有数据,把数据打印出来
  30. if (len > 0) {
  31. System.out.println("接收到消息:" + new String(byteBuffer.array()));
  32. } else if (len == -1) {
  33. // 如果客户端断开,把socket从集合中去掉
  34. iterator.remove();
  35. System.out.println("客户端断开连接");
  36. }
  37. }
  38. }
  39. }
  40. }

上面例子设置了 ServerSocketChannel 为非阻塞,调用 accept() 方法不会阻塞;设置SocketChannel 为非阻塞,调用 read()、write() 方法也不会阻塞。这样子虽然解决了阻塞问题,但存在一个严重的问题,我们把所有 SocketChannel 都放到了一个集合中去,每次要处理客户端的请求都要把集合遍历一遍,如果 1000 个连接在里面,只有 100 个会发送数据,这样做效率很低。

我们可以想一下,一个线程对应多个 Channel,有可能会出现这两种情况:

  • 线程在忙着处理收到的数据,这时候 Channel 中又收到了新数据;
  • 线程闲着没事儿干,所有的 Channel 中都没收到数据,也不能确定哪个 Channel 会在什么时候收到数据。

Selecor 通过一种类似于事件响应的机制来解决这个问题。首先你需要把你的连接,也就是 Channel 绑定到 Selector 上,然后你可以在接收数据的线程来调用 Selector.select() 方法来等待数据到来。这个 select 方法是一个阻塞方法,这个线程会一直卡在这儿,直到这些 Channel 中的任意一个有数据到来,就会结束等待返回数据。它的返回值是一个迭代器,你可以从这个迭代器里面获取所有 Channel 收到的数据,然后来执行你的数据接收的业务逻辑。代码如下:

  1. public static void main(String[] args) throws IOException, InterruptedException {
  2. // 创建NIO ServerSocketChannel
  3. ServerSocketChannel serverSocket = ServerSocketChannel.open();
  4. serverSocket.socket().bind(new InetSocketAddress(9000));
  5. // 设置ServerSocketChannel为非阻塞
  6. serverSocket.configureBlocking(false);
  7. // 打开Selector处理Channel,即创建epoll
  8. Selector selector = Selector.open();
  9. // 把ServerSocketChannel注册到selector上,并且selector对客户端accept连接操作感兴趣
  10. SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
  11. System.out.println("服务启动成功");
  12. while (true) {
  13. // 阻塞等待需要处理的事件发生
  14. selector.select();
  15. // 获取selector中注册的全部事件的 SelectionKey实例
  16. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  17. Iterator<SelectionKey> iterator = selectionKeys.iterator();
  18. // 遍历SelectionKey对事件进行处理
  19. while (iterator.hasNext()) {
  20. SelectionKey key = iterator.next();
  21. // 如果是OP_ACCEPT事件,则进行连接获取和事件注册
  22. if (key.isAcceptable()) {
  23. ServerSocketChannel server = (ServerSocketChannel) key.channel();
  24. SocketChannel socketChannel = server.accept();
  25. socketChannel.configureBlocking(false);
  26. // 这里只注册了读事件,如果需要给客户端发送数据可以注册写事件
  27. socketChannel.register(selector, SelectionKey.OP_READ);
  28. System.out.println("客户端连接成功");
  29. // 如果是OP_READ事件,则进行读取
  30. } else if (key.isReadable()) {
  31. SocketChannel socketChannel = (SocketChannel) key.channel();
  32. ByteBuffer byteBuffer = ByteBuffer.allocate(128);
  33. int len = socketChannel.read(byteBuffer);
  34. // 如果有数据,把数据打印出来
  35. if (len > 0) {
  36. System.out.println("接收到消息:" + new String(byteBuffer.array()));
  37. key.attach("服务端已收到消息");
  38. //将SocketChannel的事件变为OP_WRITE
  39. key.interestOps(SelectionKey.OP_WRITE);
  40. } else if (len == -1) {
  41. // 如果客户端断开连接,关闭Socket
  42. System.out.println("客户端断开连接");
  43. socketChannel.close();
  44. }
  45. }else if(key.isWritable()){
  46. SocketChannel socketChannel = (SocketChannel) key.channel();
  47. String msg = (String) key.attachment();
  48. key.attach(null);
  49. socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
  50. key.interestOps(SelectionKey.OP_READ);
  51. }
  52. //从事件集合里删除本次处理的key,防止下次select重复处理
  53. iterator.remove();
  54. }
  55. }
  56. }

上面我们通过创建一个 Selector,将 Channel 绑定对应的事件(read、write、accept、connect)注册到 Selector 中,返回一个 SelectionKey 于该 Channel 绑定,与。Selector调用 select() 方法开始监听事件的发生,如果没有事件则会阻塞直到新的事件到来。调用selectedKeys() 获取发生的事件。
微信截图_20210702205137.png

SelectionKey 是一个抽象类,表示 selectableChannel 在 Selector 中注册的标识。每个Channel 向 Selector 注册时,都将会创建一个 SelectionKey。SelectionKey 将 Channel 与 Selector 建立了关系,并维护了 channel 事件。

在向 Selector 对象注册感兴趣的事件时,JAVA NIO 共定义了四种事件,OP_READ、OP_WRITE、OP_CONNECT、OP_ACCEPT,分别对应读、写、请求连接、接受连接等网络 Socket 操作。

引入多路复用器 Selector 后,只有在连接真正有读写事件发生时,才会进行读写,就大大地减少了系统的开销。