Java NIO

一、使用NIO完成网络通信的三个核心

1.通道(Channel)

负责连接
java.nio.channels.Channel 接口:
|—SelectableChannel
|—SocketChannel
|—SeverSocketChannel
|—DatagramChannel

  1. |--Pipe.SinkChannel<br /> |--Pipe.SourceChannel

2.缓冲区(Buffer)

负责数据的存取

3.选择器(Selector)

A.选择器的定义

  • 选择器Selector是SelectableChannel对象的多路复用器。Selector可以同时监控多个SelectableChannel的IO状况,也就是说,使用Selector可以使一个单独的线程管理多个Channel。Selector是非阻塞IO的核心。
  • SelectableChannel的结构如下

image.png

B.选择器的应用

  1. 创建Selector:通过调用Selector.open()方法创建一个Selector

    1. // 创建选择器
    2. Selector selector = Selector.open();
  2. 向选择器注册通道:SelectableChannel.register(Selector sel, int ops)

    1. // 1.获取通道
    2. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    3. // 2.切换非阻塞模式
    4. serverSocketChannel.configureBlocking(false);
    5. // 3.绑定连接
    6. serverSocketChannel.bind(new InetSocketAddress(9999));
    7. // 4.获取选择器
    8. Selector selector = Selector.open();
    9. // 5.将通道注册到选择器上,并且指定“监听接收事件”
    10. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  3. 当调用register(Selector sel, int ops)将通道注册选择器时,选择器对通道的监听事件,需要通过第二个参数ops指定。

  4. 可以监听的事件类型(可使用SelectionKey的四个常量表示)
    1. 读:SelectionKey.OP_READ (1)
    2. 写:SelcetionKey.OP_WRITE (4)
    3. 连接:SelectionKey.OP_CONNECT (8)
    4. 接收:SelectionKey.OP_ACCEPT (16)
  5. 若注册不止一个监听事件,则可以使用“位或”操作符连接

    1. int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
  6. SelectionKey:表示SelectableChannel和Selector之间的注册关系。每次向选择器注册通道时就会选择一个事件(选择键)。选择键包含两个表示为整数值的操作集。操作集的每一位都表示改键的通道所支持的一类可选择操作。 | 返回值 方法名 | 描述 | | —- | —- | | int interestOps() | 获取感兴趣事件集合 | | int readyOps() | 获取通道已经准备就绪的操作的集合 | | SelectableChannel channel() | 获取注册通道 | | Selector selector() | 返回选择器 | | boolean isReadable() | 检测 Channel 中读事件是否就绪 | | boolean isWritable() | 检测 Channel 中写事件是否就绪 | | boolean isConnectable() | 检测 Channel 中连接是否就绪 | | boolean isAcceptable() | 检测 Channel 中接收是否就绪 |

A.阻塞式NIO示例

①、文件从客户端发送至服务端

  1. @Test
  2. public void clientTest() throws IOException {
  3. // 1.获取通道
  4. SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));
  5. FileChannel fileChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ);
  6. // 2.分配指定大小的缓冲区
  7. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  8. // 3.读取本地文件,并发送到服务端
  9. while (fileChannel.read(byteBuffer) != -1) {
  10. byteBuffer.flip();
  11. socketChannel.write(byteBuffer);
  12. byteBuffer.clear();
  13. }
  14. // 4.关闭通道
  15. fileChannel.close();
  16. socketChannel.close();
  17. }
  18. @Test
  19. public void serverTest() throws IOException {
  20. // 1.获取通道
  21. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  22. FileChannel fileChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
  23. // 2.绑定连接
  24. serverSocketChannel.bind(new InetSocketAddress(9898));
  25. // 3.获取客户端连接的通道
  26. SocketChannel socketChannel = serverSocketChannel.accept();
  27. // 4.分配指定大小的缓冲区
  28. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  29. // 5.接收客户端的数据,并保存到本地
  30. while (socketChannel.read(byteBuffer) != -1) {
  31. byteBuffer.flip();
  32. fileChannel.write(byteBuffer);
  33. byteBuffer.clear();
  34. }
  35. // 6.关闭通道
  36. socketChannel.close();
  37. fileChannel.close();
  38. serverSocketChannel.close();
  39. }

②、文件从服务端发送至客户端并返回接收信息

  1. @Test
  2. public void clientSendImageReceiveMessageTest() throws IOException {
  3. SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9889));
  4. FileChannel fileChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.READ);
  5. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  6. while (fileChannel.read(byteBuffer) != -1) {
  7. byteBuffer.flip();
  8. socketChannel.write(byteBuffer);
  9. byteBuffer.clear();
  10. }
  11. // 告诉客户端已发送结束,否则服务端一直在等待以至于发生服务端接收阻塞
  12. socketChannel.shutdownOutput();
  13. // 接收服务端的反馈
  14. int length = 0;
  15. while ((length = socketChannel.read(byteBuffer)) != -1) {
  16. byteBuffer.flip();
  17. System.out.println(new String(byteBuffer.array(), 0, length));
  18. byteBuffer.clear();
  19. }
  20. fileChannel.close();
  21. socketChannel.close();
  22. }
  23. @Test
  24. public void serverReceiveImageTest() throws IOException {
  25. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  26. FileChannel fileChannel = FileChannel.open(Paths.get("3.jpg"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
  27. serverSocketChannel.bind(new InetSocketAddress(9889));
  28. SocketChannel socketChannel = serverSocketChannel.accept();
  29. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  30. while (socketChannel.read(byteBuffer) != -1) {
  31. byteBuffer.flip();
  32. fileChannel.write(byteBuffer);
  33. byteBuffer.clear();
  34. }
  35. // 发送反馈给客户端
  36. byteBuffer.put("服务端接收数据成功".getBytes());
  37. byteBuffer.flip();
  38. socketChannel.write(byteBuffer);
  39. socketChannel.close();
  40. fileChannel.close();
  41. serverSocketChannel.close();
  42. }

B.非阻塞式NIO示例

①非阻塞式NIO实现聊天室

  1. // 非阻塞式实现聊天室
  2. @Test
  3. public void clientTest() throws IOException {
  4. // 1.获取通道
  5. SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9999));
  6. // 2.切换非阻塞模式
  7. socketChannel.configureBlocking(false);
  8. // 3.分配指定大小的缓冲区
  9. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  10. // 4.发送数据给服务端
  11. Scanner scanner = new Scanner(System.in);
  12. while (scanner.hasNext()) {
  13. String input = scanner.next();
  14. byteBuffer.put((LocalDateTime.now().toString() + "发送内容:" + input).getBytes());
  15. byteBuffer.flip();
  16. socketChannel.write(byteBuffer);
  17. byteBuffer.clear();
  18. }
  19. // 5.关闭通道
  20. socketChannel.close();
  21. }
  22. @Test
  23. public void serverTest() throws IOException {
  24. // 1.获取通道
  25. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  26. // 2.切换非阻塞模式
  27. serverSocketChannel.configureBlocking(false);
  28. // 3.绑定连接
  29. serverSocketChannel.bind(new InetSocketAddress(9999));
  30. // 4.获取选择器
  31. Selector selector = Selector.open();
  32. // 5.将通道注册到选择器上,并且指定“监听接收事件”
  33. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  34. // 6.轮询式获取选择器中所有注册的“选择键(已就绪的监听事件)”
  35. while (selector.select() > 0) {
  36. // 7.获取当前选择器中所有注册的“选择键(已就绪的监听事件)”
  37. Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator();
  38. while (selectionKeyIterator.hasNext()) {
  39. // 8.获取准备就绪的事件
  40. SelectionKey selectionKey = selectionKeyIterator.next();
  41. // 9.判断具体是什么事件准备就绪
  42. if (selectionKey.isAcceptable()) {
  43. // 10.若“接收就绪”,获取客户端连接
  44. SocketChannel socketChannel = serverSocketChannel.accept();
  45. // 11.切换非阻塞模式
  46. socketChannel.configureBlocking(false);
  47. // 12.将该通道注册到选择器上
  48. socketChannel.register(selector, SelectionKey.OP_READ);
  49. } else if (selectionKey.isReadable()) {
  50. // 13.获取当前选择器上“读就绪”状态的通道
  51. SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
  52. // 14.读取数据
  53. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  54. int len = 0;
  55. while (((len = socketChannel.read(byteBuffer)) > 0)) {
  56. byteBuffer.flip();
  57. System.out.println(new String(byteBuffer.array(), 0, len));
  58. byteBuffer.clear();
  59. }
  60. }
  61. // 15.取消选择键selectionKey
  62. selectionKeyIterator.remove();
  63. }
  64. }
  65. }

在IDEA中该示例因为在Junit测试中无法使用Scanner在控制台输入的问题

IDEA测试类的Scanner不能键入值的问题