与 CPU 和内存相比,甚至与磁盘相比,网络的速度都算慢的。传统的 Java 解决方案是缓冲和多线程,多个线程可以同时为几个不同的连接生成数据,并将数据存储在缓冲区中,直到网络确实准备好发送这些数据。这对于相当简单的服务器和客户端,如果不需要非常高的性能,这种方法效果很好。
但如果是一个每秒需要处理上万请求的大型服务器,考虑到生成多个线程以及在线程间切换的开销,我们不会为每个连接都分配一个线程。如果一个线程可以负责多个连接,能够选取一个准备好接收数据的连接,尽快填充这个连接所能管理的尽可能多的数据,然后转向下一个准备好的连接,这样速度就会更快。幸运的是,几乎所有的现代操作系统都支持这种非阻塞 I/O。
阻塞 IO
阻塞 I/O 发起的 read 请求,线程会被挂起,一直等到内核数据准备好,并把数据从内核区域拷贝到应用程序的缓冲区中,当拷贝过程完成,read 请求调用才返回。接下来,应用程序就可以对缓冲区的数据进行数据解析。
阻塞 I/O 情况下,当用户调用 read 后,用户线程会被阻塞,等内核数据准备好并且数据从内核缓冲区拷贝到用户态缓存区后 read 才会返回。可以看到是阻塞的两个部分:
- CPU 把数据从磁盘读到内核缓冲区的过程
- CPU 把数据从内核缓冲区拷贝到用户缓冲区的过程
在 JDK 1.4 之前的传统 I/O 模型就采用的阻塞 IO,又叫 BIO(Blocking IO),属于同步阻塞模式。因此每个客户端的 socket 连接请求,服务端都会有个处理线程与之对应,相当于是一个连接一个线程。。
服务器线程会阻塞在 accept() 方法上,直到有客户端连接上服务器。对应的读写操作也是一样,服务器线程发起 I/O 请求后会一直阻塞,直到缓冲区的数据就绪后才能再进行下一步操作。针对网络通信都是一请求一应答的方式,虽然这简化了上层的应用开发,但在性能和可靠性方面存在着巨大的瓶颈。
public static void main(String[] args) throws Exception {
ServerSocket server = new ServerSocket(9090,20);
// 通过死循环不断接收客户端请求
while (true) {
// 线程会阻塞在该方法
Socket client = server.accept();
new Thread(() -> {
// todo
}).start();
}
}
这种传统的 BIO 存在很大的局限性,因为服务器需要为每一个 Client 连接创建单独的线程,如果客户端很多则服务器会创建大量线程,从线程占用的内存和线程上下文切换来看,系统资源会有非常大的开销。而且 BIO 采用流来读取和写入数据,而流是阻塞的,当没有可读、可写的数据时,线程就会阻塞等待造成资源浪费。
非阻塞 IO
非阻塞 I/O 情况下,当用户调用 read 后,如果发现数据没准备好,会继续往下执行,此时应用程序可以不断轮询内核询问数据是否准备好,当数据没有准备好时,内核立即返回 EWOULDBLOCK 错误。直到数据被拷贝到应用程序缓冲区,read 请求才获取到结果。注意,这里最后一次 read 调用获取数据的过程是一个同步过程,这里的同步指的是内核态的数据拷贝到用户程序的缓存区这个过程。
针对于写操作,在非阻塞 I/O 的情况下,如果套接字的发送缓冲区已达到了极限,那么操作系统内核会尽最大可能从应用程序拷贝数据到发送缓冲区中,并立即从 write 函数返回。write 函数返回一个数值,告诉应用程序到底有多少数据被成功拷贝到了发送缓冲区中,应用程序需要再次调用 write 函数,以输出未完成拷贝的字节。
write 函数可以同时作用到阻塞 I/O 和非阻塞 I/O 上的,但是阻塞 I/O 和非阻塞 I/O 处理的方式是不一样的。
- 非阻塞 I/O 需要这样:拷贝→返回→再拷贝→再返回。
- 阻塞 I/O 需要这样:拷贝→直到所有数据拷贝至发送缓冲区完成→返回。
不过在实战中,我们可以不用区别阻塞和非阻塞 I/O,使用循环的方式来写入数据就好了。只不过在阻塞 I/O 的情况下,循环只执行一次就结束了。
多路复用
非阻塞情况下无可用数据时,每次让应用程序去轮询内核的 I/O 是否准备好,是一个不经济的做法,因为在轮询时应用进程啥也不能干。
客户端与服务器两端都是通过 socket 进行连接的,而 socket 在 Linux 操作系统中有对应的文件描述符,读写操作都是以该文件描述符为单位进行操作的。为了避免无效轮训,我们需要知道当前的文件描述符是否可读可写。但如果逐个文件描述符去询问,那么效率就和直接进行读写操作差不多了,我们希望有一种方法能够一次性得知哪些文件描述符可读,哪些文件描述符可写,于是像 select、poll 这样的 I/O 多路复用技术就隆重登场了。
通过 I/O 事件分发,当内数据准备好时再通知应用程序进行操作。这个做法大大改善了应用进程对 CPU 的利用率,在没有被通知的情况下,应用进程可以使用 CPU 做其他的事情。注意,这里 read 调用,获取数据的过程,也是一个同步的过程。
Linux 提供了 I/O 复用函数 select、poll、epoll,进程将一个或多个读操作通过系统调用函数,阻塞在函数操作上。这样,系统内核就可以帮我们侦测多个读操作是否处于就绪状态。
1. select
最初的多路复用器是 select,它的工作方式为:程序端每次把文件描述符集合交给 select 的系统调用,select 遍历每个文件描述符后返回那些可以操作的文件描述符,然后程序对可以操作的文件描述符进行读写。但它的缺点是一次传输的文件描述符集合有限,只能给出 1024 个文件描述符。
2. poll
为此 poll 在 select 的基础上进行了改进,取消了文件描述符数量的限制。但 select 和 poll 在性能上还可以优化,它们存在一个相同的缺点:那就是它们都需要在内核中对所有传入的文件描述符进行遍历,这个过程需要将
文件描述符复制到用户态和内核的地址空间之间,而无论这些文件描述符是否就绪,他们的开销都会随着文件描述符数量的增加而线性增大。
2. epoll
select、poll 是顺序扫描 fd 是否就绪,而且支持的 fd 数量不宜过大,因此它们的使用受到了一些制约。Linux 在 2.6 内核版本提供了 epoll 这个多路复用器,通过事件驱动的方式代替轮询扫描 fd,彻底解决性能问题。
为了不遍历所有的文件描述符,epoll 会在数据到达网卡时会触发中断。通常 CPU 会把相应的数据复制到内存中和相关的文件描述符进行绑定,epoll 在这个基础之上做了延伸。epoll 在内核中维护了一个红黑树以及一些链表结构,当数据到达网卡拷贝到内存时会把相应的 fd 从红黑树中拷贝到链表中,这样链表存储的就是已经有数据到达的文件描述符,这样当程序调用 epoll_wait 时就能直接把能读的文件描述符返回给应用程序。
Java NIO
如果客户端的连接数比较少,使用 BIO 也不会有什么问题,并且对应用程序的开发更简单。但如果服务器需要同时支持大量的长连接,并且各个客户端并不会很频繁地发送太多的数据,这种情况就很适合使用 NIO,而且只用少量的线程采用异步或非阻塞的按需处理来实现会高效的多。
NIO 的全称为 Non-Blocking IO,是一种同步非阻塞的 I/O 模型。非阻塞指的是用户线程不会原地等待 I/O 缓冲区就绪,可以先做一些其他操作,但要定时轮询检查 I/O 缓冲区数据是否就绪,这种轮训通常会采用 I/O 多路复用的技术,底层由操作系统的 epoll 模型实现。这种模式的好处在于:我们不必对每个客户端连接创建一个线程,并且数据的读写也是非阻塞的。
NIO 和传统 I/O 最大区别就是传统 I/O 是面向流,NIO 是面向 Buffer。Buffer 可以将文件一次性读入内存再做后续处理,而传统的方式是边读文件边处理数据。虽然传统 I/O 后面也使用了缓冲块,例如 BufferedInputStream,但仍然不能和 NIO 相媲美。
在 Java 中实现 NIO 主要由以下三个核心部分组成:
- Selector:是 NIO 实现多路复用的基础,它提供了一种高效的机制,可以检测到注册在 Selector 上的多个 Channel 中,是否有 Channel 处于就绪状态,进而实现了单线程对多 Channel 的高效管理。
- Channel:类似在 Linux 上看到的文件描述符,是 NIO 中被用来支持批量式 IO 操作的一种抽象。File 或者 Socket 通常被认为是比较高层次的抽象,而 Channel 则是更加操作系统底层的一种抽象,这也使得 NIO 得以充分利用现代操作系统底层机制,获得特定场景的性能优化。
- Buffer:高效的数据容器,除了布尔类型,所有原始数据类型都有相应的 Buffer 实现。读取或者写入 Channel 的数据都必须经由 Buffer。
1. 客户端实现
要实现 NIO 的客户端,首先要调用静态工厂方法 SocketChannel.open() 来创建一个新的 SocketChannel 对象,并指定要连接的主机和端口,channel 会以阻塞模式打开,如果连接无法建立则抛出 IOException 异常。
SocketAddress address = new InetSocketAddress("rama.poly.edu", 19);
SocketChannel client = SocketChannel.open(address);
在读取数据时,我们可以从 ByteBuffer 中获取字节数据,然后再写入传统的 OutputStream 中。不过,坚持采用一种完全基于通道的解决方案会更好,这样的解决方案需要利用 Channels 工具类,它可以将 OutputStream 封装到一个通道中。然后可以将读取到的数据写入与 OutputStream 连接的这个输出通道中。不过,在这样做之前必须回绕(flip)缓冲区,使得输出通道会从所读取数据的开头而不是末尾开始写入:
WritableByteChannel out = Channels.newChannel(System.out);
receiveBuffer.flip();
out.write(receiveBuffer);
不要在每次读、写时都创建一个新的缓冲区,这样做会降低性能。相反,要重用现有的缓冲区,在再次读取之前先清空(clear)缓冲区。清空和回绕有些不同,回绕可以保持缓冲区中的数据不变,只是准备写入而不是读取;而清空则是把缓冲区重置回初始状态。
receiveBuffer.clear();
完整的 NIO 客户端代码如下:
public class NIOEchoClient {
private static final String HOST = "127.0.0.1";
private static final int PORT = 8888;
public static void main(String[] args) throws Exception {
try (SocketChannel client = SocketChannel.open(new InetSocketAddress(HOST, PORT))) {
client.configureBlocking(false);
// 写入数据
byte[] data = "Hello NIO".getBytes();
ByteBuffer sendBuffer = ByteBuffer.allocate(data.length);
sendBuffer.put(data);
sendBuffer.flip();
client.write(sendBuffer);
// 读取数据,在非阻塞模式下,没有字节可用时会立即返回0
ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);
while (true) {
int n = client.read(receiveBuffer);
if (n > 0) {
// 将读取到的数据写入到输出流中,打印在控制台
receiveBuffer.flip();
byte[] bytes = new byte[receiveBuffer.remaining()];
receiveBuffer.get(bytes);
System.out.println("Receive Server Response: " + new String(bytes));
} else if (n == -1) {
// 如果读到-1表示读通道关闭
break;
}
}
}
}
}
2. 服务端实现
客户端使用通道和缓冲区是可以的,不过实际上通道和缓冲区主要用于需要高效处理很多并发连接的服务端。要处理服务器,除了用于客户端的缓冲区和通道外,还需要选择器,以允许服务器查找所有准备好接收输出或发送输入的连接。
在传统 BIO 中,我们会为每个连接分配一个线程,线程数量会随着客户端连接迅速攀升。相反,在 NIO 中可以创建一个 Selector,允许程序迭代处理所有准备好的连接。
Selector selector = Selector.open();
然后需要使用每个通道的 register() 方法向监视这个通道的选择器进行注册。在注册时,要使用 SelectionKey 类提供的命名常量指定所关注的操作。对于服务器 socket,唯一关心的操作就是 OP_ACCEPT,也就是服务器通道是否准备好接受一个新连接?
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
为了检查是否有可操作的数据,可以调用选择器的 select() 方法。对于长时间运行的服务器,这一般要放在一个无限循环中。如果选择器确实找到了一个就绪的通道,其 selectedKeys() 方法会返回一个 SelectionKey 集合对象,其对应的通道已准备好进行 IO 操作,否则返回一个空集合。
while (true) {
selector.select();
Set<SelectionKey> readyKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 从集合中删除这个键,从而不会处理两次
iterator.remove();
// 处理通道......
}
}
通过从集合中删除键,这就告诉选择器这个键已经处理过,这样 Selector 就不需要在每次调用 select() 时再将这个键返回给我们了。再次调用 select() 时,如果这个通道再次就绪,Selector 就会把该通道再增加到就绪集合中。不过,在这里删除就绪集合中的键非常重要。
完整的 NIO 服务端代码如下:
public class NIOEchoServer {
private static final int PORT = 8888;
public static void main(String[] args) throws Exception {
Selector selector;
try (ServerSocketChannel serverChannel = ServerSocketChannel.open()) {
// 创建服务端通道
serverChannel.bind(new InetSocketAddress(PORT), 1024);
serverChannel.configureBlocking(false);
selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
selector.select();
Set<SelectionKey> readyKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
try {
// 如果是一个客户端连接就绪事件
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client = server.accept();
System.out.println("接收到客户端连接请求: " + client);
client.configureBlocking(false);
// 只注册读事件就可以,因为我们是被动等待客户端发送消息
client.register(selector, SelectionKey.OP_READ);
}
// 读取客户端发送来的消息,并返回响应
else if (key.isReadable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
// 要考虑半读、半写的情况
// 因为我们事先并不知道要读多少数据,根据read的返回值是否为0判断读取是否结束
int readBytes = client.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
System.out.println("Receive Client Request: " + new String(bytes));
// 写数据时,由于发送缓冲区大小可能不够用,所以不会一次性发送所有数据
// 可通过hasRemaining()方法来判断缓冲区是否还有剩余
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
client.write(writeBuffer);
} else if (readBytes == 0) {
break;
} else {
key.cancel();
client.close();
}
}
} catch (IOException e) {
key.cancel();
try {
key.channel().close();
} catch (IOException ie) {
//
}
}
}
}
}
}
}
这个例子只使用了一个线程,还有些情况下可能仍要使用多个线程,特别是当不同操作有不同优先级时。例如,可能要在一个高优先级线程中接受新连接,而在一个低优先级的线程中对现有的连接提供服务。不过,线程和连接之间不再需要 1:1 的比例,select() 可以确保如果连接没有准备好接收数据,就绝对不会在这些连接上浪费时间,这样极大地提升用 Java 编写的服务器的可扩展性。