概念

传统的 IO 流架构:
image.png

现在的 NIO 架构:
image.png

区别:

IO NIO
面向流(Stream Oriented) 面向缓冲区(Buffer Oriented)
阻塞IO(Blocking IO) 非阻塞IO(Non Blocking IO)
选择器(Selectors)

NIO 概念:Java NIO 系统的核心在于:通道(Channel)和缓冲区(Buffer)。通道表示打开到 IO 设备(例如:文件,套接字)的连接。若需要使用 NIO 系统,需要获取用于连接 IO 设备的通道以及用于容纳数据的缓冲区。然后操作缓冲区,对数据进行处理。
简而言之:Channel 负责传输,Buffer 负责存储。

缓冲区Buffer

一、缓冲区(Buffer)
在 Java NIO 中负责数据的存取。缓冲区就是数组。用于存储不同数据类型的数据
根据数据类型不同(boolean 除外),提供了相应类型的缓冲区:
ByteBuffer,CharBuffer,ShortBuffer,IntBuffer,LongBuffer,FloatBuffer,DoubleBuffer
上述缓冲区的管理方式几乎一致,通过 allocate() 获取缓冲区

二、缓冲区存取数据的两个核心方法
put() : 存入数据到缓冲区中
get() : 获取缓冲区中的数据

三、缓冲区中的四个核心属性
capacity : 容量,表示缓冲区中最大存储数据的容量。一旦声明不能改变。
limit : 界限,表示缓冲区中可以操作数据的大小。(limit 后数据不能进行读写)
position : 位置,表示缓冲区中正在操作数据的位置。
mark : 标记,表示记录当前 position 的位置。可以通过 reset() 恢复到 mark 的位置
0 <= mark <= position <= limit <= capacity

四、直接缓冲区与非直接缓冲区:(效率高的原因)
非直接缓冲区:通过 allocate() 方法分配缓冲区,将缓冲区建立在 JVM 的内存中
直接缓冲区:通过 allocateDirect() 方法分配直接缓冲区,将缓冲区建立在物理内存中。可以提高效率

上演代码:
先展示如下代码的结果:

————————-allocate()————————
0
1024
1024
————————-put()————————
5
1024
1024
————————-flip()————————
0
5
1024
abcde
————————-get()————————
5
5
1024
————————-rewind()————————
0
5
1024
————————-clear()————————
0
1024
1024
a

  1. @Test
  2. public void test1(){
  3. String str = "abcde";
  4. //1. 分配一个指定大小的缓冲区
  5. ByteBuffer buf = ByteBuffer.allocate(1024);
  6. System.out.println("-----------------allocate()----------------");
  7. System.out.println(buf.position()); // 0
  8. System.out.println(buf.limit()); // 1024
  9. System.out.println(buf.capacity()); // 1024
  10. //2. 利用 put() 存入数据到缓冲区中
  11. buf.put(str.getBytes());
  12. System.out.println("-----------------put()----------------");
  13. System.out.println(buf.position()); // 5
  14. System.out.println(buf.limit()); // 1024
  15. System.out.println(buf.capacity()); // 1024
  16. //3. 切换读取数据模式
  17. buf.flip();
  18. System.out.println("-----------------flip()----------------");
  19. System.out.println(buf.position()); // 0
  20. System.out.println(buf.limit()); // 5
  21. System.out.println(buf.capacity()); // 1024
  22. //4. 利用 get() 读取缓冲区中的数据
  23. byte[] dst = new byte[buf.limit()];
  24. buf.get(dst);
  25. System.out.println(new String(dst, 0, dst.length)); // abcde
  26. System.out.println("-----------------get()----------------");
  27. System.out.println(buf.position()); // 5
  28. System.out.println(buf.limit()); // 5
  29. System.out.println(buf.capacity()); // 1024
  30. //5. rewind() : 可重复读
  31. buf.rewind();
  32. System.out.println("-----------------rewind()----------------");
  33. System.out.println(buf.position()); // 0
  34. System.out.println(buf.limit()); // 5
  35. System.out.println(buf.capacity()); // 1024
  36. //6. clear() : 清空缓冲区. 但是缓冲区中的数据依然存在,但是处于“被遗忘”状态
  37. buf.clear();
  38. System.out.println("-----------------clear()----------------");
  39. System.out.println(buf.position()); // 0
  40. System.out.println(buf.limit()); // 1024
  41. System.out.println(buf.capacity()); // 1024
  42. System.out.println((char)buf.get()); // a
  43. }

再来演示一波:

  1. @Test
  2. public void test2(){
  3. String str = "abcde";
  4. ByteBuffer buf = ByteBuffer.allocate(1024);
  5. buf.put(str.getBytes());
  6. buf.flip();
  7. byte[] dst = new byte[buf.limit()];
  8. buf.get(dst, 0, 2);
  9. System.out.println(new String(dst, 0, 2)); // ab
  10. System.out.println(buf.position()); // 2
  11. //mark() : 标记
  12. buf.mark();
  13. buf.get(dst, 2, 2);
  14. System.out.println(new String(dst, 2, 2)); // cd
  15. System.out.println(buf.position()); // 4
  16. //reset() : 恢复到 mark 的位置
  17. buf.reset();
  18. System.out.println(buf.position()); // 2
  19. //判断缓冲区中是否还有剩余数据
  20. if(buf.hasRemaining()){
  21. //获取缓冲区中可以操作的数量
  22. System.out.println(buf.remaining()); // 3
  23. }
  24. }
  25. @Test
  26. public void test3(){
  27. //分配直接缓冲区
  28. ByteBuffer buf = ByteBuffer.allocateDirect(1024);
  29. System.out.println(buf.isDirect());
  30. }

Channel通道

概念:由 java.nio.channels 包定义的。Channel 表示 IO 源与目标打开的连接。Channel 类似于传统的“流”,只不过 Channel 本身不能直接访问数据,Channel 只能与 Buffer 进行交互。

通道的主要实现类
java.nio.channels.Channel 接口:
|—FileChannel
|—SocketChannel
|—ServerSocketChannel
|—DatagramChannel

获取通道
1. Java 针对支持通道的类提供了 getChannel() 方法
本地 IO:
FileInputStream/FileOutputStream
RandomAccessFile
网络IO:
Socket
ServerSocket
DatagramSocket
2. 在 JDK 1.7 中的 NIO.2 针对各个通道提供了静态方法 open()
3. 在 JDK 1.7 中的 NIO.2 的 Files 工具类的 newByteChannel()

上代码:

非直接缓冲区复制文件

  1. // 利用通道完成文件的复制(非直接缓冲区)
  2. @Test
  3. public void testa() throws Exception {
  4. long start = System.currentTimeMillis();
  5. //①获取通道
  6. FileInputStream fis = new FileInputStream("D:/workspace/demo/a.txt");
  7. FileOutputStream fos = new FileOutputStream("D:/workspace/demo/b.txt");
  8. FileChannel inChannel = fis.getChannel();
  9. FileChannel outChannel = fos.getChannel();
  10. //②分配指定大小的缓冲区
  11. ByteBuffer buf = ByteBuffer.allocate(1024);
  12. //③将通道中的数据存入缓冲区中
  13. while(inChannel.read(buf) != -1){
  14. buf.flip(); //切换读取数据的模式
  15. //④将缓冲区中的数据写入通道中
  16. outChannel.write(buf);
  17. buf.clear(); //清空缓冲区
  18. }
  19. outChannel.close();
  20. inChannel.close();
  21. fos.close();
  22. fis.close();
  23. long end = System.currentTimeMillis();
  24. System.out.println("耗费时间为:" + (end - start)); // 5
  25. }

直接缓冲区复制文件

//使用直接缓冲区完成文件的复制(内存映射文件)
@Test
public void testb() throws IOException{//2127-1902-1777
    long start = System.currentTimeMillis();

    FileChannel inChannel = FileChannel.open(Paths.get("D:/workspace/demo/a.txt"), StandardOpenOption.READ);
    FileChannel outChannel = FileChannel.open(Paths.get("D:/workspace/demo/c.txt"), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);

    //内存映射文件
    MappedByteBuffer inMappedBuf = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size());
    MappedByteBuffer outMappedBuf = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());

    //直接对缓冲区进行数据的读写操作
    byte[] dst = new byte[inMappedBuf.limit()];
    inMappedBuf.get(dst);
    outMappedBuf.put(dst);

    inChannel.close();
    outChannel.close();

    long end = System.currentTimeMillis();
    System.out.println("耗费时间为:" + (end - start));  // 12
}

///////////如下还可以使用快捷方式操作/////////////

//通道之间的数据传输(直接缓冲区)
@Test
public void testc() throws IOException{
    FileChannel inChannel = FileChannel.open(Paths.get("D:/workspace/demo/a.txt"), StandardOpenOption.READ);
    FileChannel outChannel = FileChannel.open(Paths.get("D:/workspace/demo/d.txt"), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);

    // inChannel.transferTo(0, inChannel.size(), outChannel);
    outChannel.transferFrom(inChannel, 0, inChannel.size());

    inChannel.close();
    outChannel.close();
}

直接/非直接缓存如何选择

查查资料。。。

分散(Scatter)与聚集(Gather)

分散读取(Scattering Reads):将通道中的数据分散到多个缓冲区中
聚集写入(Gathering Writes):将多个缓冲区中的数据聚集到通道中

@Test
public void testd() throws IOException{
    RandomAccessFile raf1 = new RandomAccessFile("D:/workspace/demo/a.txt", "rw");

    //1. 获取通道
    FileChannel channel1 = raf1.getChannel();

    //2. 分配指定大小的缓冲区
    ByteBuffer buf1 = ByteBuffer.allocate(100);
    ByteBuffer buf2 = ByteBuffer.allocate(1024);

    //3. 分散读取
    ByteBuffer[] bufs = {buf1, buf2};
    channel1.read(bufs);

    for (ByteBuffer byteBuffer : bufs) {
        byteBuffer.flip();
    }

    System.out.println(new String(bufs[0].array(), 0, bufs[0].limit()));
    System.out.println("-----------------");
    System.out.println(new String(bufs[1].array(), 0, bufs[1].limit()));

    //4. 聚集写入
    RandomAccessFile raf2 = new RandomAccessFile("D:/workspace/demo/f.txt", "rw");
    FileChannel channel2 = raf2.getChannel();

    channel2.write(bufs);
}

Selector

阻塞式/非阻塞式IO

阻塞式IO:
image.png

客户端直接跟服务端交互,发出读写请求,假如此时并没有数据过来,但是客户端依然占用着 Server 端的资源不放,那么 Server 端的这个资源便什么也做不了,被阻塞着,直到有数据过来,才能进行处理;

NIO 非阻塞式IO:
image.png

中间有一个 Selector 选择器,客户端的通道 Channel 都注册到 Selector 上,然后由 Selector 来监控着这些通道,假如某个通道 Channel 上的缓冲区中有数据过来了,那么此时算是准备就绪,然后 Selector 就会将请求发给 Server 的一个线程或者多个线程进行处理;保证服务端资源每次处理都是有效的;从而提高效率;

举个例子:快递的例子,快递员相当于客户端,我们相当于服务端,早期的阻塞式就是说,快递员和我们约好时间在今天中午 10 点在大门口,他送过来给我们;但是假如到了10点我们到了大门口,快递员却没有到,而且拖了很长时间,那么这段时间我们也就什么都干不了,在这等着,浪费资源;那么此时就出来一个 Selector,即电话,快递员到了大门口之后给我们打电话,说已经确定快递员到了大门口了,此时我们在下去就刚好可以取了;这样也就不浪费资源了;

NIO完成网络通信

一、使用 NIO 完成网络通信的三个核心:

  1. 通道(Channel):负责连接
    java.nio.channels.Channel 接口:
    |—SelectableChannel
    |—SocketChannel
    |—ServerSocketChannel
    |—DatagramChannel
    |—Pipe.SinkChannel
    |—Pipe.SourceChannel

  2. 缓冲区(Buffer):负责数据的存取

  3. 选择器(Selector):是 SelectableChannel 的多路复用器。用于监控 SelectableChannel 的 IO 状况

阻塞式网络通信

阻塞式网络通信代码测试:

//客户端
@Test
public void client() throws IOException{
    //1. 获取通道
    SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));

    FileChannel inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ);

    //2. 分配指定大小的缓冲区
    ByteBuffer buf = ByteBuffer.allocate(1024);

    //3. 读取本地文件,并发送到服务端
    while(inChannel.read(buf) != -1){
        buf.flip();
        sChannel.write(buf);
        buf.clear();
    }

    //4. 关闭通道
    inChannel.close();
    sChannel.close();
}

//服务端
@Test
public void server() throws IOException{
    //1. 获取通道
    ServerSocketChannel ssChannel = ServerSocketChannel.open();

    FileChannel outChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);

    //2. 绑定连接
    ssChannel.bind(new InetSocketAddress(9898));

    //3. 获取客户端连接的通道
    SocketChannel sChannel = ssChannel.accept();

    //4. 分配指定大小的缓冲区
    ByteBuffer buf = ByteBuffer.allocate(1024);

    //5. 接收客户端的数据,并保存到本地
    while(sChannel.read(buf) != -1){
        buf.flip();
        outChannel.write(buf);
        buf.clear();
    }

    //6. 关闭通道
    sChannel.close();
    outChannel.close();
    ssChannel.close();
}

继续测试阻塞式网络通信,客户端会接收服务端反馈:

//客户端
@Test
public void client() throws IOException{
    SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));

    FileChannel inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ);

    ByteBuffer buf = ByteBuffer.allocate(1024);

    while(inChannel.read(buf) != -1){
        buf.flip();
        sChannel.write(buf);
        buf.clear();
    }

    sChannel.shutdownOutput();

    //接收服务端的反馈
    int len = 0;
    while((len = sChannel.read(buf)) != -1){
        buf.flip();
        System.out.println(new String(buf.array(), 0, len));
        buf.clear();
    }

    inChannel.close();
    sChannel.close();
}

//服务端
@Test
public void server() throws IOException{
    ServerSocketChannel ssChannel = ServerSocketChannel.open();

    FileChannel outChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);

    ssChannel.bind(new InetSocketAddress(9898));

    SocketChannel sChannel = ssChannel.accept();

    ByteBuffer buf = ByteBuffer.allocate(1024);

    while(sChannel.read(buf) != -1){
        buf.flip();
        outChannel.write(buf);
        buf.clear();
    }

    //发送反馈给客户端
    buf.put("服务端接收数据成功".getBytes());
    buf.flip();
    sChannel.write(buf);

    sChannel.close();
    outChannel.close();
    ssChannel.close();
}

非阻塞式网络通信

//客户端
@Test
public void client() throws IOException{
    //1. 获取通道
    SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));

    //2. 切换非阻塞模式
    sChannel.configureBlocking(false);

    //3. 分配指定大小的缓冲区
    ByteBuffer buf = ByteBuffer.allocate(1024);

    //4. 发送数据给服务端
    Scanner scan = new Scanner(System.in);

    while(scan.hasNext()){
        String str = scan.next();
        buf.put((new Date().toString() + "\n" + str).getBytes());
        buf.flip();
        sChannel.write(buf);
        buf.clear();
    }

    //5. 关闭通道
    sChannel.close();
}

//服务端
@Test
public void server() throws IOException{
    //1. 获取通道
    ServerSocketChannel ssChannel = ServerSocketChannel.open();

    //2. 切换非阻塞模式
    ssChannel.configureBlocking(false);

    //3. 绑定连接
    ssChannel.bind(new InetSocketAddress(9898));

    //4. 获取选择器
    Selector selector = Selector.open();

    //5. 将通道注册到选择器上, 并且指定“监听接收事件”
    ssChannel.register(selector, SelectionKey.OP_ACCEPT);

    //6. 轮询式的获取选择器上已经“准备就绪”的事件
    while(selector.select() > 0){

        //7. 获取当前选择器中所有注册的“选择键(已就绪的监听事件)”
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();

        while(it.hasNext()){
            //8. 获取准备“就绪”的是事件
            SelectionKey sk = it.next();

            //9. 判断具体是什么事件准备就绪
            if(sk.isAcceptable()){
                //10. 若“接收就绪”,获取客户端连接
                SocketChannel sChannel = ssChannel.accept();

                //11. 切换非阻塞模式
                sChannel.configureBlocking(false);

                //12. 将该通道注册到选择器上
                sChannel.register(selector, SelectionKey.OP_READ);
            }else if(sk.isReadable()){
                //13. 获取当前选择器上“读就绪”状态的通道
                SocketChannel sChannel = (SocketChannel) sk.channel();

                //14. 读取数据
                ByteBuffer buf = ByteBuffer.allocate(1024);

                int len = 0;
                while((len = sChannel.read(buf)) > 0 ){
                    buf.flip();
                    System.out.println(new String(buf.array(), 0, len));
                    buf.clear();
                }
            }

            //15. 取消选择键 SelectionKey
            it.remove();
        }
    }
}

DatagramChannel 演示:

@Test
public void send() throws IOException{
    DatagramChannel dc = DatagramChannel.open();

    dc.configureBlocking(false);

    ByteBuffer buf = ByteBuffer.allocate(1024);

    Scanner scan = new Scanner(System.in);

    while(scan.hasNext()){
        String str = scan.next();
        buf.put((new Date().toString() + ":\n" + str).getBytes());
        buf.flip();
        dc.send(buf, new InetSocketAddress("127.0.0.1", 9898));
        buf.clear();
    }

    dc.close();
}

@Test
public void receive() throws IOException{
    DatagramChannel dc = DatagramChannel.open();

    dc.configureBlocking(false);

    dc.bind(new InetSocketAddress(9898));

    Selector selector = Selector.open();

    dc.register(selector, SelectionKey.OP_READ);

    while(selector.select() > 0){
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();

        while(it.hasNext()){
            SelectionKey sk = it.next();

            if(sk.isReadable()){
                ByteBuffer buf = ByteBuffer.allocate(1024);

                dc.receive(buf);
                buf.flip();
                System.out.println(new String(buf.array(), 0, buf.limit()));
                buf.clear();
            }
        }

        it.remove();
    }
}

Pipe 代码演示:

@Test
public void test1() throws IOException{
    //1. 获取管道
    Pipe pipe = Pipe.open();

    //2. 将缓冲区中的数据写入管道
    ByteBuffer buf = ByteBuffer.allocate(1024);

    Pipe.SinkChannel sinkChannel = pipe.sink();
    buf.put("通过单向管道发送数据".getBytes());
    buf.flip();
    sinkChannel.write(buf);

    //3. 读取缓冲区中的数据
    Pipe.SourceChannel sourceChannel = pipe.source();
    buf.flip();
    int len = sourceChannel.read(buf);
    System.out.println(new String(buf.array(), 0, len));

    sourceChannel.close();
    sinkChannel.close();
}

实战

1)获取FileChannel

public void test3() throws Exception {
    FileInputStream in = new FileInputStream("");
    ReadableByteChannel channel = Channels.newChannel(in);

    InputStream inputStream = Channels.newInputStream(channel);

    FileChannel fileChannel = in.getChannel();
}

2)读文件

public void test4() throws Exception {
    FileInputStream in = new FileInputStream("f1.txt");
    FileChannel channel = in.getChannel();
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    while (channel.read(buffer) > 0) {
        buffer.flip();
        while (buffer.hasRemaining()) {
            System.out.print((char) buffer.get());
        }
        buffer.clear();
    }
}

3)写文件

public void test5() throws Exception {
    FileOutputStream out = new FileOutputStream("f3.txt");
    FileChannel channel = out.getChannel();
    String text = "jfdlksa";
    ByteBuffer buffer = ByteBuffer.wrap(text.getBytes());
    channel.write(buffer);
}

4)基于内存映射读取文件,速度最快

public void test6() throws Exception {
    FileInputStream in = new FileInputStream("f2.txt");
    FileChannel c = in.getChannel();
    MappedByteBuffer mbb = c.map(FileChannel.MapMode.READ_ONLY, 0, c.size());
    while (mbb.hasRemaining()) {
        System.out.print((char) mbb.get());
    }
}

5)文件拷贝

public void test7() throws Exception {
    try (FileChannel src = new FileInputStream("f2.txt").getChannel();
         FileChannel des = new FileOutputStream("f2_back.txt").getChannel()) {
        src.transferTo(0, src.size(), des);
        // 或者如下
        // des.transferFrom(src, 0, src.size());
    }
}