NIO 说明

NIO 为非阻塞 IO 模式,全称为 Non-Blocking IO。有别于传统的阻塞 IO 模式(BIO Blocking IO),NIO 在未有数据就绪时读取数据或缓存区满时写入数据并不会阻塞,而 BIO 则相反,会在未有数据就绪时读取数据或缓存区满时写入数据的情况下一直处理阻塞状态。

对于 Java NIO,数据在未就绪的时候还是会处于阻塞状态(只是读写数据为非阻塞),仅当数据就绪时,要应用程序自行获取就绪数据,这是一种同步的行为,而如果数据就绪时,操作系统读取数据后主动回调应用程序函数,这种被动回调的行为则是异步的方式,这种模式叫 AIO(异步 IO 模式)。

总结 BIO / NIO / AIO 模式:

  • BIO 阻塞同步 IO,数据就绪前及数据读写时都处于阻塞状态。
  • NIO 非阻塞同步 IO,数据就绪前阻塞,数据读写时非阻塞,且主动获取就绪数据。
  • AIO 非阻塞异步 IO,数据就绪前非阻塞,数据读写时非阻塞,且被动获取就绪数据。

阻塞与非阻塞,针对读写数据而言:

  • 阻塞,未有数据就绪时读取数据或缓存区满时写入数据阻塞
  • 非阻塞,未有数据就绪时读取数据或缓存区满时写入数据并不__会阻塞

同步与异步,针对就绪数据而言:

  • 同步,数据就绪后应用程序主动读取数据
  • 异步,数据就绪后读取好数据并回调应用程序的函数处理

同步非阻塞模型

image.png

  1. 应用不停地主动询问 (select) 操作系统是否存在就绪数据
  2. 当数据就绪后,应用系统 select 出一批就绪的连接、可读或可写的事件(SelectionKey)
  3. 最后,应用即可对 select 出来的事件进行操作,常见的如非阻塞读写数据

异步非阻塞模型

image.png

  1. listen 后应用就可以立刻返回
  2. 当数据就绪后,操作系统随即回调通知应用,应用通过 accept 方法接收通知
  3. 最后,非阻塞读写数据

Reactor 模型

NIO 有一种名叫 Reactor 模型,AIO 则是 Proactor,针对不同的情景,Reactor 又分了好几种模型,分别是

  1. 单 Reactor 单线程
  2. 单 Reactor 多线程
  3. 主从 Reactor 多线程

Java NIO 的 IO 事件的监听,参考如下图所示。
image.png

简单说明一下,Selector_1 监听的是新连接事件,而 Selector_2 监听的是可读写事件。监听的事件,在 Java 中会返回一组 SelectionKey,在这组 key 的基础上注册你感兴趣的事件,如 SelectionKey.OP_READ 或者是 SelectionKey.OP_WRITE。

反映到 Reactor 模型,具体说明参考如下。

  1. 对于单 Reactor 单线程,相当于 Selector_1 == Selector_2,且 ThreadPool 的线程数等于 1
  2. 对于单 Reactor 多线程,相当于 Selector_1 == Selector_2,且 ThreadPool 的线程数大于 1
  3. 对于多 Reactor 多线程,相当于 Selector_1 != Selector_2,且 ThreadPool 的线程数大于 1

值得一提的是,Reactor 模型只具备参考意义,在实际的项目中并不会遵循,或者说死板的按照 Reactor 模型实现,如果参考 http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf 中的 Reactor 模型,你会发现,read / send 数据都是通过 Reactor 单线程来处理,很明显这就会导致不可读写时阻塞从而降低服务器的处理效率。

NIO 线程策略

  1. 线程读写 wait/notify 方式,优点是可以利用到 CPU 缓存,缺点是 wait 时间过长导致线程池无法处理新任务;且不能在单 Reactor 单线程上使用,否则会出现一条工作线程在等待,造成其他任务都无法处理的情况;还有就是可能会造成任务饥饿,即客户端使用了多个长连接的情况下,且进行批处理的工作,那么线程池很可能被这些请求占满,导致其他的请求任务无法得到响应。
  2. 线程读写重新提交(resubmit)线程池执行,缺点是多核 CPU 的情况下,可能不能利用 CPU 缓存,优点是不需要操作线程的等待和唤醒(wait/notify),不会造成任务饥饿,可以不间断地处理新任务。
  3. 结合 #1#2 的优点,使用两组线程池,分别以 wait/notify 及 resubmit 方式处理事件,优先使用 wait/notify 线程池处理任务,若已满载的情况下,使用 resubmit 线程池,缺点是实现复杂。重新定义 wait/notify 线程池的 java.util.concurrent.ThreadPoolExecutor#setRejectedExecutionHandler 方法会是一个好主意,将 reject 任务提交到 resubmit 线程池中处理。

NIO in Action

现在通过代码实现,如下所示,至于 NioHandler / NioConnection 并没给出具体实现,这里的只是一个简单的 Demo 例子。

  1. public class NioServer {
  2. private int port;
  3. private int backlog;
  4. public NioServer(int port) {
  5. this(port, 0);
  6. }
  7. public NioServer(int port, int backlog) {
  8. this.port = port;
  9. this.backlog = backlog;
  10. }
  11. public void run() throws IOException {
  12. ServerSocketChannel ssc = ServerSocketChannel.open();
  13. ssc.bind(new InetSocketAddress(port), backlog);
  14. ssc.configureBlocking(false);
  15. Selector selector = Selector.open();
  16. ssc.register(selector, SelectionKey.OP_ACCEPT);
  17. NioHandler handler = new NioHandler();
  18. while (Thread.currentThread().isAlive()) {
  19. if (selector.select() <= 0) {
  20. continue;
  21. }
  22. Set<SelectionKey> keySet = selector.selectedKeys();
  23. Iterator<SelectionKey> itr = keySet.iterator();
  24. while (itr.hasNext()) {
  25. SelectionKey key = itr.next();
  26. try {
  27. if (key.isAcceptable()) {
  28. // 1. set key interest reading
  29. SocketChannel sc = ssc.accept();
  30. sc.configureBlocking(false);
  31. SelectionKey sk = sc.register(selector, SelectionKey.OP_READ);
  32. sk.attach(new NioConnection(sc, sk));
  33. // 2. log connection
  34. } else { // readable OR writable
  35. // 取消监听所有事件
  36. key.interestOps(0);
  37. // Protocol(HTTP / gRPC / Customized) processing
  38. handler.dispatch((NioConnection) key.attachment());
  39. }
  40. } catch (Exception e) {
  41. e.printStackTrace();
  42. }
  43. }
  44. keySet.clear();
  45. }
  46. }
  47. }

启动 NIO 服务。

NioServer server = new NioServer(9099);
server.run();

细节

  1. Selector.wakeup() 可以唤醒 Selector.select() 方法;
  2. SelectionKey.interestOps(0) 表示不对任何事件感兴趣,在 Selector.selectedKeys() 中就不会返回;
  3. Accept 连接之后可以立即进行数据读取,并不需要先对读事件作出感兴趣设置,直到无数据可读时再设置 SelectionKey.interestOps(SelectionKey.OP_READ);
  4. 业务处理完成后可以立即进行数据写入,并不需要先对写事件作出感兴趣设置,直到写满后不能再写时再设置SelectionKey.interestOps(SelectionKey.OP_WRITE);
  5. SocketChannel.read(ByteBuffer) 返回 0 表示无数据可读取,SocketChannel.write(ByteBuffer) 返回 0 时表示数据无法写入(写缓冲区已满);
  6. SocketChannel.read(ByteBuffer) / SocketChannel.write(ByteBuffer) 返回 -1 时表示连接已关闭;
  7. SocketChannel.close() / SocketChannel.socket().close() 即可关闭通道;
  8. ServerSocketChannel.bind(new InetSocketAddress(port), backlog) 其中 backlog 是连接等待被 ServerSocketChannel.accept() 的最大长度,默认 backlog 是 50,超过 backlog 长度后但未被 accept 的连接将会被拒绝;

参考文献

http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf(网络上 NIO Reactor 模型的出处)