简介

Java NIO(New IO,非阻塞式 IO(Non Blocking IO))是从 Java 1.4 版本开始引入的一个新的 IO API,可以替换标准的 Java IO API。NIO 与原来的 IO 有同样的作用和目的,但是使用的方式完全不同,NIO 支持面向缓冲区的、基于通道的 IO 操作。NIO 将以更加高效的方式进行文m件的读写操作。

NIO 与 IO 的主要区别

IO NIO
面向流(Stream Oriented) 面向缓冲区(Buffer Oriented)
阻塞 IO(Blocking IO) 非阻塞 IO(Non Blocking IO)
(无) 选择器(Selectors)

通道和缓冲区

Java NIO 系统的核心在于:通道(Channel)和缓冲区(Buffer)。通道表示打开到 IO 设备(例如:文件、套接字)的连接。若需要使用 NIO 系统,需要获取用于连接 IO 设备的通道以及用于容纳数据的缓冲区。然后操作缓冲区,对数据进行处理。

缓冲区

在 Java NIO 中负责数据的存取。缓冲区就是数组。用于存储不同数据类型的数据
根据数据类型不同(boolean 除外),提供了相应类型的缓冲区:

  • ByteBuffer
  • CharBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer

上述缓冲区的管理方式几乎一致,通过allocate()获取缓冲区。
ByteBuffer buffer = ByteBuffer.allocate(1024);

核心方法

  1. // -- Singleton get/put methods --
  2. /**
  3. * Relative <i>get</i> method. Reads the byte at this buffer's
  4. * current position, and then increments the position.
  5. *
  6. * @return The byte at the buffer's current position
  7. *
  8. * @throws BufferUnderflowException
  9. * If the buffer's current position is not smaller than its limit
  10. */
  11. public abstract byte get();
  12. /**
  13. * Relative <i>put</i> method&nbsp;&nbsp;<i>(optional operation)</i>.
  14. *
  15. * <p> Writes the given byte into this buffer at the current
  16. * position, and then increments the position. </p>
  17. *
  18. * @param b
  19. * The byte to be written
  20. *
  21. * @return This buffer
  22. *
  23. * @throws BufferOverflowException
  24. * If this buffer's current position is not smaller than its limit
  25. *
  26. * @throws ReadOnlyBufferException
  27. * If this buffer is read-only
  28. */
  29. public abstract ByteBuffer put(byte b);

核心属性

  • capacity:容量,表示缓冲区中最大存储数据的容量,一旦声明不能改变。
  • limit:界限,表示缓冲区可以操作的数据的大小。(limit 后的数据不能进行读写)
  • position:位置,表示缓冲区中正在操作数据的位置。
  • mark:标记,表示记录当前 position 的位置。可以通过 reset() 恢复到 mark 的位置。
  • mark <= position <= limit <= capacity

未命名文件.png

  1. public class Test_01 {
  2. public static void main(String[] args) {
  3. String str = "abode";
  4. //分配一个指定大小的缓冲区
  5. ByteBuffer buffer = ByteBuffer.allocate(1024);
  6. System.out.println("========= allocate =========");
  7. System.out.println(buffer.position()); //0
  8. System.out.println(buffer.capacity()); //1024
  9. System.out.println(buffer.limit()); //1024
  10. System.out.println("========= put =========");
  11. buffer.put(str.getBytes());
  12. System.out.println(buffer.position()); //5
  13. System.out.println(buffer.capacity()); //1024
  14. System.out.println(buffer.limit()); //1024
  15. System.out.println("========= flip =========");
  16. buffer.flip();
  17. System.out.println(buffer.position()); // 0
  18. System.out.println(buffer.capacity()); //1024
  19. System.out.println(buffer.limit()); //5
  20. System.out.println("========= get =========");
  21. byte[] bytes = new byte[buffer.limit()];
  22. buffer.get(bytes);
  23. System.out.println(new String(bytes, 0, bytes.length)); //abode
  24. System.out.println(buffer.position()); // 5
  25. System.out.println(buffer.capacity()); //1024
  26. System.out.println(buffer.limit()); //5
  27. System.out.println("========= rewind =========");
  28. buffer.rewind(); //可重复读
  29. System.out.println(buffer.position()); // 0
  30. System.out.println(buffer.capacity()); //1024
  31. System.out.println(buffer.limit()); //5
  32. System.out.println("========= clear =========");
  33. buffer.clear(); //清空缓冲区,但是缓冲区的数据依然存在,但是处于“被遗忘”状态
  34. System.out.println(buffer.position()); // 0
  35. System.out.println(buffer.capacity()); //1024
  36. System.out.println(buffer.limit()); //1024
  37. System.out.println((char) buffer.get()); //a
  38. System.out.println("========= other =========");
  39. buffer.mark(); //标记
  40. buffer.reset(); //恢复到mark位置
  41. if (buffer.hashRemaining()) { //判断缓冲区中是否还有剩余数据
  42. System.out.println(buffer.remaining()); //获取缓冲区中可以操作的数据数
  43. }
  44. }
  45. }

直接缓冲区与非直接缓冲区

  • 非直接缓冲区:通过 allocate() 方法分配缓冲区,将缓冲区建立在 JVM 的内存中

NIO - 图2

  • 直接缓冲区:通过 allocateDirect() 方法分配缓冲区,将缓冲区建立在物理内存中,可以提高效率

NIO - 图3

  • 字节缓冲区要么是直接的,要么是非直接的。如果为直接字节缓冲区,则 Java 虚拟机会尽最大努力直接在此缓冲区上执行本机 I/O 操作。也就是说,在每次调用基础操作系统的一个本机 I/O 操作之前(或之后),虚拟机都会尽量避免将缓冲区的内容复制到中间缓冲区中(或从中间缓冲区中复制内容)。
  • 直接字节缓冲区可以通过调用此类的 allocateDirect() 工厂方法来创建。此方法返回的缓冲区进行分配和取消分配所需成本通常高于非直接缓冲区。直接缓冲区的内容可以驻留在常规的垃圾回收堆之外,因此,它们对应用程序的内存需求量造成的影响可能并不明显。所以,建议将直接缓冲区主要分配给那些易受基础系统的本机 I/O 操作影响的大型、持久的缓冲区。一般情况下,最好仅在直接缓冲区能在程序性能方面带来明显好处时分配它们。
  • 直接字节缓冲区还可以通过 FileChannel 的 map() 方法将文件区域直接映射到内存中来创建。该方法返回 MappedByteBuffer。Java 平台的实现有助于通过 JNI 从本机代码创建直接字节缓冲区。如果以上这些缓冲区中的某个缓冲区实例指的是不可访问的内存区域,则试图访问该区域不会更改缓冲区的内容,并且将会在访问期间或稍后的某个时间导致抛出不确定的异常。
  • 字节缓冲区是直接缓冲区还是非直接缓冲区可通过调用其 isDirect() 方法来确定。提供此方法是为了能够在性能关键型代码中执行显示缓冲区管理。

    通道

    通道(Channel):由 java.nio.channels 包定义的。Channel 表示 IO 源与目标打开的连接。Channel 类似于传统的“流”。只不过 Channel 本身不能直接访问数据,Channel 只能与 Buffer 进行交互。
    NIO - 图4
    用于源节点与目标节点的连接。在 Java NIO 中负责缓冲区中数据的传输。

    主要实现类

  • FileChannel

  • SelectableChannel(I)
    • SocketChannel
    • ServerSocketChannel
    • DatagramChannel
  • Pipe.SinkChannel
  • Pipe.SourceChannel

    获取通道

  • Java 针对支持通道的类提供了 getChannel() 方法

    • 本地 IO
      • FileInputStream / FileOutputStream
      • RandomAccessFile
    • 网络 IO

      • Socket
      • ServerSocket
      • DatagramSocket ```java /**
        • 非直接缓冲区(利用通道) */ public class Test_02 {

      public static void main(String[] args) { FileInputStream fis = null; FileOutputStream fos = null; FileChannel inChannel = null; FileChannel outChannel = null; try {

      1. fis = new FileInputStream("D:\\idea background\\5dc516c2b8418.jpg");
      2. fos = new FileOutputStream("D:\\idea background\\1.jpg");
      3. //获取通道
      4. inChannel = fis.getChannel();
      5. outChannel = fos.getChannel();
      6. //分配指定大小的缓冲区
      7. ByteBuffer buf = ByteBuffer.allocate(1024);
      8. //将通道中的数据存入缓冲区
      9. while (inChannel.read(buf) != -1) {
      10. buf.flip();
      11. //将缓冲区中的数据写入通道
      12. outChannel.write(buf);
      13. buf.clear();
      14. }

      } catch (IOException e) {

      1. e.printStackTrace();

      } finally {

      1. if (inChannel != null && outChannel != null && fos != null && fis != null) {
      2. try {
      3. outChannel.close();
      4. inChannel.close();
      5. fos.close();
      6. fis.close();
      7. } catch (IOException e) {
      8. e.printStackTrace();
      9. }
      10. }

      } } } ```

  • 在 JDK1.7 中的 NIO.2 针对各个通道提供了静态方法 open()

    1. /**
    2. * 使用直接缓冲区完成文件的复制(内存映射文件)
    3. */
    4. class Test_03 {
    5. public static void main(String[] args) throws IOException {
    6. FileChannel inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ);
    7. FileChannel outChannel = FileChannel.open(Paths.get("2.jpg"),
    8. StandardOpenOption.WRITE,
    9. StandardOpenOption.READ,
    10. StandardOpenOption.CREATE);
    11. //内存映射文件
    12. MappedByteBuffer inMappedBuf = inChannel.map(FileChannel.MapMode.READ_ONLY, 0,
    13. inChannel.size());
    14. MappedByteBuffer outMappedBuf = outChannel.map(FileChannel.MapMode.READ_WRITE, 0,
    15. inChannel.size());
    16. //直接对缓冲区进行数据的读写操作
    17. byte[] bytes = new byte[inMappedBuf.limit()];
    18. inMappedBuf.get(bytes);
    19. outMappedBuf.put(bytes);
    20. inChannel.close();
    21. outChannel.close();
    22. }
    23. }
  • 在 JDK1.7 中的 NIO.2 的 Files 工具类的 newByteChannel()

    通道间数据传输

    ```java /**

    • 通道之间的数据传输(直接缓冲区) */ public class Test_04 {

      public static void main(String[] args) throws IOException {

      1. FileChannel inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ);
      2. FileChannel outChannel = FileChannel.open(Paths.get("3.jpg"), StandardOpenOption.WRITE,
      3. StandardOpenOption.READ,
      4. StandardOpenOption.CREATE);

// inChannel.transferTo(0, inChannel.size(), outChannel); outChannel.transferFrom(inChannel, 0, inChannel.size());

  1. inChannel.close();
  2. outChannel.close();
  3. }

}

  1. <a name="t5IKi"></a>
  2. ### 分散(_Scatter_)与聚集(_Gather_)
  3. - 分散读取(Scattering Reads):将通道中的数据分散到多个缓冲区中
  4. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/1605075/1611048938445-bce52b17-f250-4126-811d-9ba679ce3c40.png#align=left&display=inline&height=268&margin=%5Bobject%20Object%5D&name=image.png&originHeight=268&originWidth=312&size=25137&status=done&style=none&width=312)
  5. - 注意:按照缓冲区的顺序,从 Channel 中读取的数据依次将 Buffer 填满
  6. - 聚集写入(Gathering Writes):将多个缓冲区中的数据集聚集到一个通道中
  7. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/1605075/1611049084993-f65f4a99-0368-4ec5-a9c7-6cd9b5ed157d.png#align=left&display=inline&height=254&margin=%5Bobject%20Object%5D&name=image.png&originHeight=254&originWidth=316&size=25513&status=done&style=none&width=316)
  8. - 注意:按照缓冲区的顺序,写入 position 和 limit 之间额数据到 Channel
  9. ```java
  10. /**
  11. * 分散和聚集
  12. */
  13. public class Test_05 {
  14. public static void main(String[] args) throws IOException {
  15. RandomAccessFile raf1 = new RandomAccessFile("1.txt", "rw");
  16. //获取通道
  17. FileChannel channel1 = raf1.getChannel();
  18. //分配指定大小的缓冲区
  19. ByteBuffer buf1 = ByteBuffer.allocate(100);
  20. ByteBuffer buf2 = ByteBuffer.allocate(1024);
  21. //分散读取
  22. ByteBuffer[] buf = {buf1, buf2};
  23. channel1.read(buf);
  24. for (ByteBuffer byteBuffer : buf) {
  25. byteBuffer.flip();
  26. }
  27. System.out.println(new String(buf[0].array(), 0, buf[0].limit()));
  28. System.out.println("=========================");
  29. System.out.println(new String(buf[1].array(), 0, buf[1].limit()));
  30. //聚集写入
  31. RandomAccessFile raf2 = new RandomAccessFile("2.txt", "rw");
  32. FileChannel channel2 = raf2.getChannel();
  33. channel2.write(buf);
  34. }
  35. }

字符集(Charset)

  • 编码:字符串 -> 字符数组
  • 解码:字节数组 -> 字符串

    1. public class Test_06 {
    2. public static void main(String[] args) throws CharacterCodingException {
    3. /*Map<String, Charset> map = Charset.availableCharsets();
    4. Set<Map.Entry<String, Charset>> entries = map.entrySet();
    5. for (Map.Entry<String, Charset> entry : entries) {
    6. System.out.println(entry.getKey() + " " + entry.getValue());
    7. }*/
    8. Charset gbk = Charset.forName("GBK");
    9. //获取编码器
    10. CharsetEncoder encoder = gbk.newEncoder();
    11. //获取解码器
    12. CharsetDecoder decoder = gbk.newDecoder();
    13. CharBuffer charBuffer = CharBuffer.allocate(1024);
    14. charBuffer.put("这都可以?");
    15. charBuffer.flip();
    16. //编码
    17. ByteBuffer encode = encoder.encode(charBuffer);
    18. for (int i = 0; i < 10; i++) {
    19. System.out.println(encode.get());
    20. }
    21. //解码
    22. encode.flip();
    23. CharBuffer decode = decoder.decode(encode);
    24. System.out.println(decode.toString());
    25. }
    26. }

    NIO 的非阻塞式网络通信

    选择器

    SelectableChannel 的多路复用器。用于监控 SelectableChannel 的 IO 状况。

  • 当调用 register(Selector sel, int ops) 将通道注册选择器时,选择器对通道的监听事件,需要通过第二个参数 ops 指定。

  • 可以监听的事件类型(可使用 SelectionKey 的四个常量表示
    • 读:SelectionKey.OP_READ (1)
    • 写:SelectionKey.OP_WRITE (2)
    • 连接:SelectionKey.OP_CONNECT (8)
    • 接收:SelectionKey.OP_ACCEPT (16)
  • 若注册时不止监听一个事件,则可以使用“位或”操作符连接。

int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

SelectionKey

表示 SelectableChannel 和 Selector 之间的注册关系。每次向选择器注册通道时就会选择一个事件(选择键)。选择键包含两个表示为整数值的操作集。操作集的每一位都表示该键的通道所支持的一类可选择操作。

阻塞式

客户端

  1. /**
  2. * 测试阻塞的 NIO
  3. */
  4. public class Test_01_Client {
  5. public static void main(String[] args) throws IOException {
  6. //获取通道
  7. SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));
  8. FileChannel inChannel = FileChannel.open(Paths.get("1.txt"), StandardOpenOption.READ);
  9. //分配指定大小的缓冲区
  10. ByteBuffer buffer = ByteBuffer.allocate(1024);
  11. //读取本地文件,并发送到服务端
  12. while (inChannel.read(buffer) != -1) {
  13. buffer.flip();
  14. socketChannel.write(buffer);
  15. buffer.clear();
  16. }
  17. //关闭客户端的输出,如果不关闭的话,会一直等待
  18. socketChannel.shutdownOutput();
  19. //接收服务端的反馈
  20. int len = 0;
  21. while ((len = socketChannel.read(buffer)) != -1) {
  22. buffer.flip();
  23. System.out.println(new String(buffer.array(), 0, len));
  24. buffer.clear();
  25. }
  26. //关闭通道
  27. inChannel.close();
  28. socketChannel.close();
  29. }
  30. }

服务端

  1. /**
  2. * 测试阻塞的 NIO
  3. */
  4. public class Test_01_Server {
  5. public static void main(String[] args) throws IOException {
  6. //获取通道
  7. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  8. FileChannel outChannel = FileChannel.open(Paths.get("3.txt"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
  9. //绑定连接
  10. serverSocketChannel.bind(new InetSocketAddress(9898));
  11. //获取客户端连接的通道
  12. SocketChannel accept = serverSocketChannel.accept();
  13. //分配指定大小的缓冲区
  14. ByteBuffer buffer = ByteBuffer.allocate(1024);
  15. //接收客户端的数据,并保存到本地
  16. while (accept.read(buffer) != -1) {
  17. buffer.flip();
  18. outChannel.write(buffer);
  19. buffer.clear();
  20. }
  21. //发送反馈给客户端
  22. buffer.put("服务端接收数据成功".getBytes());
  23. buffer.flip();
  24. accept.write(buffer);
  25. //关闭通道
  26. accept.close();
  27. outChannel.close();
  28. serverSocketChannel.close();
  29. }
  30. }

非阻塞式

客户端

  1. /**
  2. * 非阻塞式 NIO
  3. */
  4. public class Test_02_Client {
  5. public static void main(String[] args) throws IOException {
  6. SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8989));
  7. //切换非阻塞模式
  8. sChannel.configureBlocking(false);
  9. ByteBuffer buffer = ByteBuffer.allocate(1024);
  10. //发送数据给服务端
  11. Scanner scanner = new Scanner(System.in);
  12. while (scanner.hasNext()) {
  13. String str = scanner.next();
  14. buffer.put((LocalDateTime.now().toString() + "\n" + str).getBytes());
  15. buffer.flip();
  16. sChannel.write(buffer);
  17. buffer.clear();
  18. }
  19. //关闭通道
  20. sChannel.close();
  21. }
  22. }

服务端

  1. /**
  2. * 非阻塞式 NIO
  3. */
  4. public class Test_02_Server {
  5. public static void main(String[] args) throws IOException {
  6. ServerSocketChannel ssChannel = ServerSocketChannel.open();
  7. //切换非阻塞模式
  8. ssChannel.configureBlocking(false);
  9. ssChannel.bind(new InetSocketAddress(8989));
  10. //获取选择器
  11. Selector selector = Selector.open();
  12. //将通道注册到选择器上,并且指定监听接收事件
  13. ssChannel.register(selector, SelectionKey.OP_ACCEPT);
  14. //轮询式的获取选择器上已经准备就绪的事件
  15. while (selector.select() > 0) {
  16. //获取当前选择器中所有注册的选择键(已就绪的监听事件)
  17. Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
  18. while (iterator.hasNext()) {
  19. //获取准备就绪的事件
  20. SelectionKey next = iterator.next();
  21. //判断具体是什么事件准备就绪
  22. if (next.isAcceptable()) {
  23. //若接受就绪,获取客户端连接
  24. SocketChannel socketChannel = ssChannel.accept();
  25. //切换非阻塞模式
  26. socketChannel.configureBlocking(false);
  27. //将通道注册到选择器上
  28. socketChannel.register(selector, SelectionKey.OP_READ);
  29. } else if (next.isReadable()) {
  30. //获取当前选择器上读就绪状态的通道
  31. SocketChannel channel = (SocketChannel) next.channel();
  32. //读取数据
  33. ByteBuffer buffer = ByteBuffer.allocate(1024);
  34. int len = 0;
  35. while ((len = channel.read(buffer)) > 0) {
  36. buffer.flip();
  37. System.out.println(new String(buffer.array(), 0, len));
  38. buffer.clear();
  39. }
  40. } //TODO
  41. //取消选择键
  42. iterator.remove();
  43. }
  44. }
  45. }
  46. }

DatagramChannel

  • Java NIO 中的 DatagramChannel 是一个能收发 UDP 包的通道。
  • 操作步骤

    • 打开 DatagramChannel
    • 接收 / 发送数据

      客户端

      1. /**
      2. * DatagramChannel
      3. */
      4. public class Test_03_Client {
      5. public static void main(String[] args) throws IOException {
      6. DatagramChannel dChannel = DatagramChannel.open();
      7. dChannel.configureBlocking(false);
      8. ByteBuffer buffer = ByteBuffer.allocate(1024);
      9. Scanner scanner = new Scanner(System.in);
      10. while (scanner.hasNext()) {
      11. String str = scanner.next();
      12. buffer.put((LocalDateTime.now().toString() + "\n" + str).getBytes());
      13. buffer.flip();
      14. dChannel.send(buffer, new InetSocketAddress("127.0.0.1", 8989));
      15. buffer.clear();
      16. }
      17. dChannel.close();
      18. }
      19. }

      服务端

      1. /**
      2. * DatagramChannel
      3. */
      4. public class Test_03_Server {
      5. public static void main(String[] args) throws IOException {
      6. DatagramChannel dChannel = DatagramChannel.open();
      7. dChannel.configureBlocking(false);
      8. dChannel.bind(new InetSocketAddress(8989));
      9. Selector selector = Selector.open();
      10. dChannel.register(selector, SelectionKey.OP_READ);
      11. while (selector.select() > 0) {
      12. Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
      13. while (iterator.hasNext()) {
      14. SelectionKey next = iterator.next();
      15. if (next.isReadable()) {
      16. ByteBuffer buffer = ByteBuffer.allocate(1024);
      17. dChannel.receive(buffer);
      18. buffer.flip();
      19. System.out.println(new String(buffer.array(), 0, buffer.limit()));
      20. buffer.clear();
      21. }
      22. }
      23. iterator.remove();
      24. }
      25. }
      26. }

      管道(Pipe

  • Java NIO 管道是 2 个线程之间的单向数据连接。Pipe 有一个 source 通道和一个 sink 通道。数据会被写到 sink 通道,从 source 通道读取。

    1. /**
    2. * 单向 Pipe
    3. */
    4. public class Test_04 {
    5. public static void main(String[] args) throws IOException {
    6. //获取管道
    7. Pipe pipe = Pipe.open();
    8. //将缓冲区的数据写入管道
    9. ByteBuffer buffer = ByteBuffer.allocate(1024);
    10. Pipe.SinkChannel sinkChannel = pipe.sink();
    11. buffer.put("通过单向管道发送数据".getBytes());
    12. buffer.flip();
    13. sinkChannel.write(buffer);
    14. //读取缓冲区中的数据
    15. Pipe.SourceChannel sourceChannel = pipe.source();
    16. buffer.flip();
    17. int len = sourceChannel.read(buffer);
    18. System.out.println(new String(buffer.array(), 0, len));
    19. sourceChannel.close();
    20. sinkChannel.close();
    21. }
    22. }