我们开发的绝大多数业务系统,都是 IO 密集型系统。跟 IO 密集型系统相对的另一种系统叫计算密集型系统。通过这两种系统的名字,估计你也能大概猜出来 IO 密集型系统是什么意思。IO 密集型系统大部分时间都在执行 IO 操作,这个 IO 操作主要包括网络 IO 和磁盘 IO,以及与计算机连接的一些外围设备的访问。对于 IO 密集型系统,特别适合使用异步的设计来提升系统性能。
应用程序最常使用的 IO 资源,主要包括磁盘 IO 和网络 IO。由于现在的 SSD 的速度越来越快,对于本地磁盘的读写,异步的意义越来越小。所以,使用异步设计的方法来提升 IO 性能,我们更加需要关注的问题是,如何来实现高性能的异步网络传输。常见的 IO 模型有:BIO(同步阻塞)、NIO(同步非阻塞)、AIO(异步非阻塞),下面我们就来了解一下这几种不同的 IO 类型。
BIO(同步阻塞IO)
BIO,称为同步 IO,当我们实现网络通信时,通常会建立一个 TCP 连接,此时用户代码会获得一个用于收发数据的通道,每个通道会在内存中开辟两片区域用于收发数据的缓存。
发送数据的过程比较简单,我们直接往这个通道里面来写入数据就可以了。用户代码在发送时写入的数据会暂存在缓存中,然后操作系统会通过网卡,把发送缓存中的数据传输到对端的服务器上。只要这个缓存不满,或者说,我们发送数据的速度没有超过网卡传输速度的上限,那这个发送数据的操作耗时,只不过是一次内存写入的时间,这个时间是非常快的。所以,发送数据的时候同步发送就可以了,没有必要异步。
比较麻烦的是接收数据。对于数据的接收方来说,它并不知道什么时候会收到数据。那我们能直接想到的方法就是,用一个线程阻塞在那儿等着数据,当有数据到来的时候,操作系统会先把数据写入接收缓存,然后给接收数据的线程发一个通知,线程收到通知后结束等待,开始读取数据。处理完这一批数据后,继续阻塞等待下一批数据到来,这样周而复始地处理收到的数据。
这就是 BIO 的模型。同步网络 IO 模型在处理少量连接的时候,是没有问题的。但是如果要同时处理非常多的连接,同步的网络 IO 模型就有点儿力不从心了。以下是一个简单的socket建立连接进行通信的例子:
服务端代码如下:
public class SocketServer {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(9000);
while (true) {
System.out.println("等待连接。。");
Socket clientSocket = serverSocket.accept();
System.out.println("有客户端连接了。。");
//单线程模型,只有一条线程处理,如果没处理完会阻塞不能accept下一个客户端
handler(clientSocket);
}
}
private static void handler(Socket clientSocket) throws IOException {
byte[] bytes = new byte[1024];
System.out.println("准备read。。");
//接收客户端的数据,阻塞方法,没有数据可读时就阻塞
int read = clientSocket.getInputStream().read(bytes);
System.out.println("read完毕。。");
if (read != -1) {
System.out.println("接收到客户端的数据:" + new String(bytes, 0, read));
}
clientSocket.getOutputStream().write("HelloClient".getBytes());
clientSocket.getOutputStream().flush();
}
}
客户端代码如下:
public static void main(String[] args) throws IOException, InterruptedException {
Socket socket = new Socket("127.0.0.1", 9000);
//向服务端发送数据
socket.getOutputStream().write("HelloServer".getBytes());
socket.getOutputStream().flush();
byte[] bytes = new byte[1024];
//接收服务端回传的数据
socket.getInputStream().read(bytes);
System.out.println("接收到服务端的数据:" + new String(bytes));
socket.close();
}
服务端首先调创建一个 ServerSocket 对象(网络协议为 IPv4,传输协议为TCP),接着调用 bind() 方法绑定所要监听指定的端口并调用 listen() 方法进行监听。服务端进入了监听状态后,通过调用 accept() 方法,从内核 ACCEPT 队列获取TCP连接,如果没有客户端连接,则会阻塞等待客户端连接的到来。获取连接后返回一个Socket对象(服务端创建了两个Socket对象,一个用来监听端口,一个用来真正传输数据)
服务器的内核实际上为每个 Socket 维护了两个队列:
- 一个是还没完全建立连接的队列,称为 TCP 半连接队列 (也称SYN队列) 这个队列都是没有完成三次握手的连接,此时服务端处于 SYN_RECV 的状态;
- 一个是一件建立连接的队列,称为 TCP 全连接队列(也称ACCEPT队列)这个队列都是完成了三次握手的连接,此时服务端处于 ESTABLISHED 状态;
客户端在创建好 Socket 后,调用 connect() 函数发起连接,参数是要指明服务端的 IP 地址和端口号,然后就开始 TCP 三次握手。
上面代码中,当一个客户端与服务端建立了连接,在执行业务代码期间,如果在有另一个客户端发起连接,则会发生阻塞。服务端只有处理完与当前客户端的通信后才会处理下一个连接,这就是 BIO 的特点,只能实现一对一的通信。
上诉例子若要处理多个连接,可以通过多线程进行改造
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(9000);
while (true) {
System.out.println("等待连接。。");
Socket clientSocket = serverSocket.accept();
System.out.println("有客户端连接了。。");
//多线程模型,每次accept了一个客户端连接创建一个新的线程处理
new Thread(new Runnable() {
@Override
public void run() {
try {
handler(clientSocket);
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
}
这种方案,若有一些客户端只连接不发送数据,那么线程的资源就会浪费,且每个连接都需要阻塞一个线程来等待数据,大量的连接数就会需要相同数量的数据接收线程。当这些 TCP 连接都在进行数据收发的时候,会有大量的线程来抢占 CPU 时间,造成频繁的 CPU 上下文切换,导致 CPU 的负载升高,整个系统的性能就会比较慢,如经典的 C10K 问题。
NIO(同步非阻塞IO)
先抛开你知道的各种语言的异步类库和各种异步的网络 IO 框架,对于业务开发者来说,一个好的异步网络框架,我们希望达到的效果,无非就是,只用少量的线程就能处理大量的连接,有数据到来的时候能第一时间处理就可以了。为了解决 BIO 的阻塞问题,JDK1.4 开始引入 NIO,一个线程可以处理多个连接请求,客户端发送的连接请求都会注册到多路复用器 selector 上,多路复用器轮询到连接有 IO 请求就进行处理。
NIO 有三大核心组件:Selector 选择器、Channel 管道、buffer 缓冲区。
Buffer
NIO 是面向缓冲的,发送给一个 Channel 的所有数据都必须首先放到缓冲区中,同样地,从 Channel 中读取的任何数据都要先读到缓冲区中。也就是说,不会直接对 Channel 进行读写数据,而是要先经过缓冲区。
缓冲区本质上是一块可以写入数据,也可以从中读取数据的内存,其底层是一个数组。 这块内存被包装成 NIO Buffer 对象,并提供了一组方法,用来方便的访问该块内存。
Buffer 中有以下几个重要的属性:
public abstract class Buffer {
.......
// Invariants: mark <= position <= limit <= capacity
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;
......
}
- capacity:表示 Buffer 的容量,在 Buffer 创建时确定,后续不能修改;
- limit:表示 Buffer 当前能读写的最大位置,不能对超过 limit 位置进行读写,limit 可被修改;
position:下一个要被读或写的元素的索引,每次读写 Buffer 都会修改 position,flip() 方法可以重置 position;
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
mark:标志位;
Channel
通道 Channel 是对原 I/O 包中的流的模拟,是应用程序和操作系统之间交互事件、传递内容的通道。通过 Channel 可以读取,也可以向操作系统写入数据,通道与流的不同之处在于,流只能在一个方向上移动(一个流必须是 InputStream 或者 OutputStream 的子类),而通道是双向的,可以用于读、写或者同时用于读写。
本地文件写:
public static void main(String[] args) throws Exception {
String s = "hello world";
FileOutputStream outputStream = new FileOutputStream("F:\\helloworld\\hello.txt");
//获取fileChannel
FileChannel fileWriteChannel = outputStream.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//将数据放入buffer中
byteBuffer.put(s.getBytes());
//放入了数据 position改变 读/写buffer中的数据需要将position重新置0
byteBuffer.flip();
//将Buffer中数据写入Channel中
fileWriteChannel.write(byteBuffer);
fileWriteChannel.close();
}
本地文件读:
public static void main(String[] args) throws Exception{
FileInputStream inputStream = new FileInputStream("F:\\helloworld\\hello.txt");
//获取输入流FileChannel
FileChannel fileChannel = inputStream.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
fileChannel.read(byteBuffer);
System.out.println(new String(byteBuffer.array()));
fileChannel.close();
}
Selector
在 Java 的 NIO 中,它提供了一个 Selector 对象,来解决一个线程在多个网络连接上的多路复用问题。在 NIO 中,每个已经建立好的连接用一个 Channel 对象来表示。我们希望能实现,在一个线程里,接收来自多个 Channel 的数据。也就是说,这些 Channel 中,任何一个 Channel 收到数据后,第一时间能在同一个线程里面来处理。
简单的非阻塞例子:
public class NioServer {
// 保存客户端连接
static List<SocketChannel> channelList = new ArrayList<>();
public static void main(String[] args) throws IOException, InterruptedException {
// 创建NIO ServerSocketChannel,与BIO的serverSocket类似
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(9000));
// 设置ServerSocketChannel为非阻塞
serverSocket.configureBlocking(false);
System.out.println("服务启动成功");
while (true) {
// 非阻塞模式accept方法不会阻塞,否则会阻塞
// NIO的非阻塞是由操作系统内部实现的,底层调用了linux内核的accept函数
SocketChannel socketChannel = serverSocket.accept();
if (socketChannel != null) { // 如果有客户端进行连接
System.out.println("连接成功");
// 设置SocketChannel为非阻塞
socketChannel.configureBlocking(false);
// 保存客户端连接在List中
channelList.add(socketChannel);
}
// 遍历连接进行数据读取
Iterator<SocketChannel> iterator = channelList.iterator();
while (iterator.hasNext()) {
SocketChannel sc = iterator.next();
ByteBuffer byteBuffer = ByteBuffer.allocate(128);
// 非阻塞模式read方法不会阻塞,否则会阻塞
int len = sc.read(byteBuffer);
// 如果有数据,把数据打印出来
if (len > 0) {
System.out.println("接收到消息:" + new String(byteBuffer.array()));
} else if (len == -1) {
// 如果客户端断开,把socket从集合中去掉
iterator.remove();
System.out.println("客户端断开连接");
}
}
}
}
}
上面例子设置了 ServerSocketChannel 为非阻塞,调用 accept() 方法不会阻塞;设置SocketChannel 为非阻塞,调用 read()、write() 方法也不会阻塞。这样子虽然解决了阻塞问题,但存在一个严重的问题,我们把所有 SocketChannel 都放到了一个集合中去,每次要处理客户端的请求都要把集合遍历一遍,如果 1000 个连接在里面,只有 100 个会发送数据,这样做效率很低。
我们可以想一下,一个线程对应多个 Channel,有可能会出现这两种情况:
- 线程在忙着处理收到的数据,这时候 Channel 中又收到了新数据;
- 线程闲着没事儿干,所有的 Channel 中都没收到数据,也不能确定哪个 Channel 会在什么时候收到数据。
Selecor 通过一种类似于事件响应的机制来解决这个问题。首先你需要把你的连接,也就是 Channel 绑定到 Selector 上,然后你可以在接收数据的线程来调用 Selector.select() 方法来等待数据到来。这个 select 方法是一个阻塞方法,这个线程会一直卡在这儿,直到这些 Channel 中的任意一个有数据到来,就会结束等待返回数据。它的返回值是一个迭代器,你可以从这个迭代器里面获取所有 Channel 收到的数据,然后来执行你的数据接收的业务逻辑。代码如下:
public static void main(String[] args) throws IOException, InterruptedException {
// 创建NIO ServerSocketChannel
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(9000));
// 设置ServerSocketChannel为非阻塞
serverSocket.configureBlocking(false);
// 打开Selector处理Channel,即创建epoll
Selector selector = Selector.open();
// 把ServerSocketChannel注册到selector上,并且selector对客户端accept连接操作感兴趣
SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务启动成功");
while (true) {
// 阻塞等待需要处理的事件发生
selector.select();
// 获取selector中注册的全部事件的 SelectionKey实例
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
// 遍历SelectionKey对事件进行处理
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 如果是OP_ACCEPT事件,则进行连接获取和事件注册
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = server.accept();
socketChannel.configureBlocking(false);
// 这里只注册了读事件,如果需要给客户端发送数据可以注册写事件
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("客户端连接成功");
// 如果是OP_READ事件,则进行读取
} else if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(128);
int len = socketChannel.read(byteBuffer);
// 如果有数据,把数据打印出来
if (len > 0) {
System.out.println("接收到消息:" + new String(byteBuffer.array()));
key.attach("服务端已收到消息");
//将SocketChannel的事件变为OP_WRITE
key.interestOps(SelectionKey.OP_WRITE);
} else if (len == -1) {
// 如果客户端断开连接,关闭Socket
System.out.println("客户端断开连接");
socketChannel.close();
}
}else if(key.isWritable()){
SocketChannel socketChannel = (SocketChannel) key.channel();
String msg = (String) key.attachment();
key.attach(null);
socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
key.interestOps(SelectionKey.OP_READ);
}
//从事件集合里删除本次处理的key,防止下次select重复处理
iterator.remove();
}
}
}
上面我们通过创建一个 Selector,将 Channel 绑定对应的事件(read、write、accept、connect)注册到 Selector 中,返回一个 SelectionKey 于该 Channel 绑定,与。Selector调用 select() 方法开始监听事件的发生,如果没有事件则会阻塞直到新的事件到来。调用selectedKeys() 获取发生的事件。
SelectionKey 是一个抽象类,表示 selectableChannel 在 Selector 中注册的标识。每个Channel 向 Selector 注册时,都将会创建一个 SelectionKey。SelectionKey 将 Channel 与 Selector 建立了关系,并维护了 channel 事件。
在向 Selector 对象注册感兴趣的事件时,JAVA NIO 共定义了四种事件,OP_READ、OP_WRITE、OP_CONNECT、OP_ACCEPT,分别对应读、写、请求连接、接受连接等网络 Socket 操作。
引入多路复用器 Selector 后,只有在连接真正有读写事件发生时,才会进行读写,就大大地减少了系统的开销。