NIO 基本介绍

  1. Java NIO 全称 java non-blocking IO,是指JDK提供的新API,从JDK1.4提供了一系列改进的输入/输出的新特性,被统称为NIO(即New IO),是同步非阻塞的
  2. NIO 相关类都被放在java.nio包及子包下,并对原Java.io包中的很多类进行改写.[基本案例]
  3. NIO 有三大核心部分: Channel(通道),Buffer(缓冲区),Selector(选择器)
  4. NIO是面向缓冲区, 或者面向 块 编程的,数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网格
  5. Java NIO 的非阻塞,模式, 使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变得可以读取之前,该线程可以继续做其他的事情,非阻塞写也是如此,一个线程请求写入一些数据到某通道,单不需要等待它完全写入,这个线程同事可以去做其他事情[后面有案例说明]
  6. 通俗理解: NIO是可以做到用一个线程来处理多个操作,假设有10000个请求过来,根据实际情况,可以分配50或者100个线程来处理,不会像之前的阻塞IO那样,非得分配10000个线程
  7. HTTP2.0使用了多路复用技术,做到同一个连接并发处理多个请求,并且并发请求的数量必HTTP1.1大了好几个数量级

    案例说明NIO的Buffer

    编写代码

    ```java package com.dance.netty.nio;

import java.nio.IntBuffer;

public class BasicBuffer {

  1. public static void main(String[] args) {
  2. // 新建Buffer
  3. // 5 容量
  4. IntBuffer intBuffer = IntBuffer.allocate(5);
  5. // 向Buffer中存放数据
  6. for (int i = 0; i < intBuffer.capacity(); i++) {
  7. IntBuffer put = intBuffer.put(i * 2);
  8. }
  9. // 从Buffer中获取数据
  10. // flip() : 将Buffer转换, 读写切换
  11. intBuffer.flip();
  12. // 获取数据
  13. while (intBuffer.hasRemaining()) {
  14. System.out.println(intBuffer.get());
  15. }
  16. }

}

<a name="Zg4HZ"></a>
## 执行结果
```java
0
2
4
6
8

NIO 和 BIO 的 比较

  1. BIO以流的方式处理数据,而NIO以块的方式处理数据,块 I/O的效率比流 I/O高很多
  2. BIO 是阻塞的, NIO则是非阻塞的
  3. BIO基于字节流和字符流进行操作,而NIO基于Channel(通道)和Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入通道,Selector(选择器)用于监听多个通道的事件(比如L连接请求,数据到达等), 因此使用单个线程就可以监听多个客户端通道

    NIO 三大核心原理示意图

    一张图描述NIO的 Selector Channel Buffer 的关系

    Selector Channel Buffer的关系图(简单版)

    关系图说明
    image.png

  4. 每个channel都会对应一个Buffer

  5. Selector对应一个线程, 一个线程对应多个channel(连接)
  6. 该图反应了有三个Channel注册到 该Selector //程序
  7. 程序切换到那个Channel是由事件决定的, Event是一个重要的概念
  8. Selector 会根据不同的事件,在各个通道上切换
  9. BUffer就是一个内存块, 底层就是一个数组
  10. 数据的读取写入是通过BUffer, 这个和BIO不同, BIO中要么是输入流,或者是输出流, 不能双向,但是NIO的Buffer是可以读也可以写的,,需要调用flip方法切换, channel也是双向的,可以返回底层操作系统的情况,比如Linux,底层操作系统通道就是双向的

    重点

    Buffer可以通过调用flip方法切换读写

    缓冲区(Buffer)

    基本介绍

    缓冲区(Buffer) : 缓冲区本质上是一个可以读写数据的内存块, 可以理解成是一个容器对象(含数组), 该对象提供了一组方法,可以更轻松的使用内存块,缓冲区内置了一些机制,能够跟踪和记录缓冲区的状态变化情况,Channel提供了从文件,网络读取数据的渠道,但是读取或写入的数据必须经由Buffer, 如图: [后面举例说明]
    image.png

    Buffer类及其子类

  11. 在NIO中, Buffer是一个顶层父类, 它是一个抽象类, 类的层级关系图

image.png

  1. Buffer类定义了所有的缓冲区都具有的四个属性来提供关于其所包含的数据元素的信息:

image.png
buffer通过控制position游标位置来读取和写入数据

  1. Buffer类相关方法一览

image.png

ByteBuffer

从前面可以看出对于Java中的基本数据类型(boolean除外),都有一个Buffer类型与之相对应,最常用的自然是ByteBuffer类(二进制数据), 该类的主要方法如下:
image.png

通道(Channel)

基本介绍

  1. NIO的通道类似于流, 但有些区别如下:
    1. 通道可以同时进行读写, 而流只能读或者只能写
    2. 通道可以实现异步读写数据
    3. 通道可以从缓冲读数据,也可以写数据到缓冲
  2. BIO中的Stream是单向的, 例如FileInputStream对象只能进行读取数据的操作, 而NIO中的通道(Channel)是双向的, 可以读操作, 也可以写操作
  3. Channel在NIO中是一个接口

    public interface Channel extends Closeable{}
    
  4. 常用的Channel类有 : FileChannel, DatagramChannel, ServerSocketChannel, 和 SocketChannel

[ServerSocketChannel类似 ServerSocket, SocketChannel 类似 Socket]

  1. FileChannel用于文件的数据读写, DatagramChannel用于UDP数据的读写, ServerSocketChannel和SocketChannel用于TCP的数据读写
  2. 图示

image.png

FileChannel 类

FileChannel主要用于对本地文件进行IO操作, 常见的方法有

  • public int read(ByteBuffer dst), 从通道读取数据并放到缓冲区中
  • public int write(ByteBuffer src), 把缓冲区的数据写入通道
  • public Long transferFrom(ReadableByteChannel src, long position, long count), 从目标通道中复制数据到当前通道
  • public long transferTo(long position, long count, WriteableByteChannel target), 把数据从当前通道复制给目标通道

03-Java NIO 编程 - 图8
从图中可以看出,相对于Channel来说,往Buffer中read相当于写数据,从Buffer中write相当于读数据

案例1-本地文件写数据

需求

  1. 使用前面学习的ByteBuffer(缓冲), 和FileChannel(通道), 将 “Hello, Flower” 写入到 01.txt 中
  2. 文件不存在就创建

    编码

    ```java package com.dance.netty.nio.demo;

import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets;

/**

  • 通过Channel写数据到文件 */ public class FileChannel01 {

    public static void main(String[] args) throws IOException {

     // 创建字符串
     String msg = "hello,Flower";
    
     // 创建输出流
     FileOutputStream fileOutputStream = new FileOutputStream("D:\\zhangyugen@JD.com\\coding\\netty\\src\\main\\resources\\01.txt");
    
     // 获取FileChannel
     FileChannel channel = fileOutputStream.getChannel();
    
     // 创建字节缓冲区
     ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    
     // 将字符串写入缓冲区
     byteBuffer.put(msg.getBytes(StandardCharsets.UTF_8));
    
     // 重置游标, 读写切换
     byteBuffer.flip();
    
     // 从缓冲区写入通道
     channel.write(byteBuffer);
    
     // 关闭文件流
     fileOutputStream.close();
    

    }

}

<a name="Ay8ZE"></a>
### 执行结果
![image.png](https://cdn.nlark.com/yuque/0/2021/png/1603133/1640015795691-f0847863-9d50-4042-b568-4d81ef30f4ef.png#clientId=u6fccf4fe-8fcd-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=346&id=ufde688a9&margin=%5Bobject%20Object%5D&name=image.png&originHeight=346&originWidth=1147&originalType=binary&ratio=1&rotation=0&showTitle=false&size=31954&status=done&style=none&taskId=u3d863320-4cbc-4fc2-9064-699aa2c6043&title=&width=1147)<br />ok
<a name="e3Lfq"></a>
## 案例2-本地文件读数据
<a name="tM6WE"></a>
### 需求

1. 使用前面学习后的ByteBuffer(缓冲)和Channel(通道), 将01.txt 中的数据读入到程序,并显示在控制台屏幕
1. 文件已存在
<a name="gAuDE"></a>
### 编码
```java
package com.dance.netty.nio.demo;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

/**
 * 通过Channel从文件读取数据
 */
public class FileChannel02 {

    public static void main(String[] args) throws IOException {

        File file = new File("D:\\zhangyugen@JD.com\\coding\\netty\\src\\main\\resources\\01.txt");

        // 创建输入流
        FileInputStream fileInputStream = new FileInputStream(file);

        // 获取FileChannel
        FileChannel channel = fileInputStream.getChannel();

        // 创建字节缓冲区
        ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());

        // 将通道中的数据读入到Buffer
        channel.read(byteBuffer);

        // 因为没有使用Buffer 所以不需要flip

        // 通过构造String 传入buffer的 底层数组 转成字符串打印
        System.out.println(new String(byteBuffer.array()));

        // 关闭文件流
        fileInputStream.close();

    }

}

执行结果

hello,Flower

案例3-使用一个Buffer完成文件的读取和写入

需求

  1. 使用FileChannel(通道)和 read, write ,完成文件的拷贝
  2. 拷贝 文件 1.txt 到 2.txt

文件准备 01.txt

hello,Flower
    my name is dance, nice to mite you, thanks!

图解
image.png

编码

package com.dance.netty.nio.demo;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

/**
 * 使用通道实现文件拷贝
 */
public class FileChannel03 {

    public static void main(String[] args) throws Exception {

        FileInputStream fileInputStream = new FileInputStream("src/main/resources/01.txt");
        FileOutputStream fileOutputStream = new FileOutputStream("src/main/resources/02.txt");

        FileChannel readChannel = fileInputStream.getChannel();
        FileChannel writeChannel = fileOutputStream.getChannel();

        ByteBuffer container = ByteBuffer.allocate(4);

        // 循环读取
        for (;;){

            // 读取之前清空一下上一次的数据缓存
            container.clear();

            // 从通道中读取数据到Channel 返回值为读取数量
            int byteSize = readChannel.read(container);

            System.out.println(byteSize);

            // -1为读取完成 没有读取到任何数据
            if (byteSize == -1)
                break;

            if (byteSize == 3)
                System.out.println("到3了");

            // 读写切换 重置游标
            container.flip();

            // 为了避免写入空格 采用设置limit 限制
//            if(byteSize != container.capacity())
//                container.limit(byteSize);

            // 将Buffer中的数据写入到 Channel 返回值为写入的数量
            int write = writeChannel.write(container);
        }

        writeChannel.close();
        readChannel.close();
        fileOutputStream.close();
        fileInputStream.close();

    }

}

执行结果

image.png
本来还打算手动设置limit防止读取数据中的后面空格,后来在Debug的时候发现,在调用flip方法后会重新计算limit和position,所以就不需要了
image.png
image.png

案例4-拷贝文件TransferFrom方法

需求

  1. 使用FileChannel(通道)和方法 transferFrom, 完成文件的拷贝
  2. 拷贝一张图片

准备图片 01.jpeg
image.png

编码

package com.dance.netty.nio.demo;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

/**
 * 使用通道实现图片拷贝
 */
public class FileChannel04 {

    public static void main(String[] args) throws Exception {

        FileInputStream fileInputStream = new FileInputStream("src/main/resources/01.jpeg");
        FileOutputStream fileOutputStream = new FileOutputStream("src/main/resources/02.jpeg");

        FileChannel sourceChannel = fileInputStream.getChannel();
        FileChannel targetChannel = fileOutputStream.getChannel();

        // 将源Channel的数据直接传输到目标Channel 从0开始 到 源的大小
        long length = targetChannel.transferFrom(sourceChannel, 0, sourceChannel.size());

        System.out.println(length);

        targetChannel.close();
        sourceChannel.close();
        fileOutputStream.close();
        fileInputStream.close();

    }

}

执行结果

image.png
ok了

关于Buffer和Channel的注意事项和细节

类型转化的get和put

ByteBuffer支持类型转化的put和get, put放入的是什么数据类型, get就应该使用相应的数据类型来取出, 否则就有可能报错(BufferUnderflowException)异常,
案例

package com.dance.netty.nio.demo;

import java.nio.ByteBuffer;

public class ByteBufferPutAndGet {

    public static void main(String[] args) {

        ByteBuffer buffer = ByteBuffer.allocate(1024);

        buffer.putInt(1).putDouble(0.02).putLong(32312312L).putShort((short) 12).putChar('舞');

        buffer.flip();

        System.out.println(buffer.getInt());
        System.out.println(buffer.getDouble());
        System.out.println(buffer.getLong());
        System.out.println(buffer.getShort());
        System.out.println(buffer.getChar());

    }

}

执行结果

1
0.02
32312312
12
舞

将最后一个getChar类型的改为getLong类型,就会报错,所以,存入什么类型就获取什么类型,要按照顺序来
image.png

只读Buffer

可以将一个普通Buffer转成只读Buffer
编码

package com.dance.netty.nio.demo;

import java.nio.ByteBuffer;

public class ByteBufferReadOnly {

    public static void main(String[] args) {

        ByteBuffer buffer = ByteBuffer.allocate(64);

        // 写入数据
        for (byte i = 0; i < 64; i++) {
            buffer.put(i);
        }

        // 读写切换 换为只读
        buffer.flip();
        ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();

        while (readOnlyBuffer.hasRemaining()) {
            System.out.println(readOnlyBuffer.get());
        }

        // 尝试只读Buffer写入
        // 只读 读写切换
        readOnlyBuffer.flip();

        readOnlyBuffer.put((byte) 1);

    }

}

执行结果

.......
60
61
62
63
Exception in thread "main" java.nio.ReadOnlyBufferException
    at java.nio.HeapByteBufferR.put(HeapByteBufferR.java:172)
    at com.dance.netty.nio.demo.ByteBufferReadOnly.main(ByteBufferReadOnly.java:28)

那么通过调用asReadOnlyBuffer方法返回的对象是和上面定义的Buffer是同一个类吗,会不会影响上面的Buffer
image.png
通过阅读源码可以看到,是新生成了一个HeapByteBufferR的一个新对象,所以和上面定义的对象没有任何关系

MappedByteBuffer

NIO还提供了MappedByteBuffer, 可以让文件直接在内存(堆外的内存)中进行修改, 而如何同步到文件由NIO来完成
编码
将文件02.txt中的Flower修改为Dance
image.png

package com.dance.netty.nio.demo;

import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class BufferAsMappedByteBuffer {

    public static void main(String[] args) throws Exception{

        // 加载文件 权限为 rw
        RandomAccessFile randomAccessFile = new RandomAccessFile("src/main/resources/02.txt", "rw");

        FileChannel channel = randomAccessFile.getChannel();

        /*
         * FileChannel.MapMode.READ_WRITE : 模式
         * 6: 开始下标
         * 6: 从开始下标后的,修改字符数量
         */
        MappedByteBuffer map = channel.map(FileChannel.MapMode.READ_WRITE, 6, 6);

        map.put(0,(byte) 'D');
        map.put(1,(byte) 'a');
        map.put(2,(byte) 'n');
        map.put(3,(byte) 'c');
        map.put(4,(byte) 'e');
        map.put(5,(byte) ' ');

        channel.close();
        randomAccessFile.close();
        System.out.println("修改完成~~");
    }

}

执行结果
image.png
不知道为啥,好像是IDEA的索引没有更新,在IDEA中看到的是没有改变的,在文件夹中用记事本打开是已修改的

Buffer的分散和聚集

前面我们讲的读写操作,都是通过一个Buffer完成的, NIO还支持,通过多个Buffer(即Buffer数组)完成读写操作, 即Scattering和Gathering
编码

package com.dance.netty.nio.demo;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.stream.Stream;

public class BufferAsScatteringAndGathering {

    public static void main(String[] args) throws IOException {

        /*
         * Scattering : 将数据写入Buffer时, 可以采用Buffer数组, 依次写入 [分散]
         * Gathering : 从Buffer读取数据时, 可以采用Buffer数组, 依次读取 [聚集]
         */

        // 使用 ServerSocketChannel 和 SocketChannel

        // 获取一个ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        // 创建地址 地址默认为127.0.0.1 端口指定为 7000
        SocketAddress socketAddress = new InetSocketAddress(7000);

        // 绑定地址
        serverSocketChannel.socket().bind(socketAddress);

        // 创建Buffer数组
        ByteBuffer[] byteBuffers = new ByteBuffer[2];
        // 第一个容量为5
        byteBuffers[0] = ByteBuffer.allocate(5);
        // 第二个容量为3
        byteBuffers[1] = ByteBuffer.allocate(3);

        System.out.println("ServerSocketChannel stating in 7000.......");

        // 阻塞并等待连接
        SocketChannel socketChannel = serverSocketChannel.accept();

        // 定义消息长度
        int messageLength = 8;

        // 循环
        for (; ; ) {

            // 读取字节大小
            int byteReadSize = 0;

            // 只有小于 指定长度 才读取
            while (byteReadSize < messageLength) {
                // 修复BUG 在读取之前调用一下clear 不然第二次读取数据 readSize 会错误
                Arrays.stream(byteBuffers).forEach(ByteBuffer::clear);

                // 从通道读取到字节数组
                long readSize = socketChannel.read(byteBuffers);
                // 累计读取字节数量
                byteReadSize += readSize;
                System.out.println("byteReadSize: " + byteReadSize);
                // 打印字节数组中的Buffer中的position和limit
                Arrays.stream(byteBuffers).map(byteBuffer -> "buffer position: " + byteBuffer.position()
                                + " buffer limit: " + byteBuffer.limit() + "\t")
                        .forEach(System.out::print);
                System.out.println();
            }

            // 将Buffer数组中的Buffer做读写切换
            Arrays.stream(byteBuffers).forEach(ByteBuffer::flip);

            // 将接收到的数据回显给客户端
            int byteWriteSize = 0;

//            if (byteWriteSize < messageLength) {
                // 将数据写回到客户端
                long writeSize = socketChannel.write(byteBuffers);
                // 累计写出字节数量
                byteWriteSize += writeSize;

                // 修复BUG 补充上面逻辑,上面要累计到8 跳出,下面如果不够的话也需要跳出所以采用while不合理,所以取消条件
//            }

            // 清空字节数组中的Buffer
            Arrays.stream(byteBuffers).forEach(ByteBuffer::clear);

            // 打印统计
            System.out.println("byteReadSize: " + byteReadSize + ", byteWriteSize: " + byteWriteSize);

        }

    }

}

执行结果

ServerSocketChannel stating in 7000.......
byteReadSize: 4
buffer position: 4 buffer limit: 5    buffer position: 0 buffer limit: 3    
byteReadSize: 9
buffer position: 5 buffer limit: 5    buffer position: 0 buffer limit: 3    
byteReadSize: 9, byteWriteSize: 5
byteReadSize: 8
buffer position: 5 buffer limit: 5    buffer position: 3 buffer limit: 3    
byteReadSize: 8, byteWriteSize: 8
byteReadSize: 8
buffer position: 5 buffer limit: 5    buffer position: 3 buffer limit: 3    
byteReadSize: 8, byteWriteSize: 8
byteReadSize: 1
buffer position: 1 buffer limit: 5    buffer position: 0 buffer limit: 3    
byteReadSize: 7
buffer position: 5 buffer limit: 5    buffer position: 1 buffer limit: 3

客户端依旧使用cmd
连接命令: Telnet 127.0.0.1 7000
image.png
ok了, 我对老师的代码进行了一些修改,有注释可以自己看一下

Selector(选择器)

基本介绍

  1. Java 的NIO, 用非阻塞的IO方式, 可以用一个线程, 处理多个的客户端连接, 就会使用到Selector(选择器)
  2. Selector能够检测多个注册的通道上是否有事件发生(注意: 多个Channel以事件的当时可以注册到同一个Selector), 如果有事件发生, 便获取事件然后针对每个事件进行相应的处理, 这样就可以只用一个单线程去管理多个通道, 也就是管理多个连接和请求[示意图]
  3. 只有在 连接/通道 真正有读写事件发生时, 才会进行读写, 就大大地减少了系统开销, 并且不必为每个连接都创建一个线程, 不用去维护多个线程
  4. 避免了多线程之间的上下文切换导致的开销

    Selector示意图的特点说明

    image.png
    说明如下:

  5. Netty 的IO 线程 NioEventLoop 聚合了 Selector(选择器,也叫多路复用器), 可以同时并发处理成百上千个客户端连接

  6. 当线程从某客户端Socket通道进行读写数据时, 若没有数据可用时, 该线程可以进行其他任务
  7. 线程通常将非阻塞IO的空闲时间用于在其他通道上执行IO操作, 所以单独的线程可以管理多个输入和输出通道
  8. 由于读写操作都是非阻塞的, 这就可以充分提升IO线程的运行效率, 避免由于频繁的I/O 阻塞导致的线程挂起
  9. 一个I/O线程可以并发处理N个客户端连接和读写操作, 这从根本上解决了传统同步阻塞I/O一连接一线程模型, 架构的性能,弹性伸缩能力和可靠性都得到了极大的提升

    Selector类相关方法

    Selector类是一个抽象类, 常用方法和说明如下:
    image.png

    注意事项

  10. NIO中的ServerSocketChannel功能类似ServerSocket, SocketChannel功能类似Socket

  11. Select相关方法说明

    1. selector.select() // 阻塞
    2. selector.select(1000) // 阻塞1000毫秒, 在1000毫秒后返回
    3. selector.wakeup() // 唤醒selector
    4. selector.selectNow() // 不阻塞, 立马返还

      NIO 非阻塞 网络编程原理分析图

      NIO 非阻塞 网络编程相关的(Selector , SelectionKey , ServerSocketChannel , 和 SocketChannel) 关系梳理图
      image.png
      对上图的说明
  12. 当客户端连接时,会通过ServerSocketChannel 得到 SocketChannel

  13. Selector 进行监听 select 方法, 返回有事件发生的通道的个数
  14. 将SocketChannel注册到Selector上, register(Selector sel, int ops) ,一个Selector上可以注册多个SocketChannel
  15. 注册后会返回一个SelectionKey,会和该Selector关联(集合)——-[其实我觉得就是一个唯一标识]
  16. 进一步得到各个SelectionKey(有事件发生)
  17. 在通过SelectionKey反向获取SocketChannel, 方法Channel()
  18. 可以通过得到的Channel,完成业务处理
  19. 代码撑腰…..

    NIO 非阻塞 网络编程快速入门

    需求

  20. 编写一个NIO入门案例, 实现服务器和客户端之间的数据简单通讯(非阻塞)

  21. 目的: 理解NIO非阻塞网络编程机制

    编码

    编写Server

    ```java package com.dance.netty.nio.demo.nio;

import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set;

public class NIOServer {

public static void main(String[] args) throws Exception {

    // 创建ServerSocketChannel
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

    // 创建一个Selector
    Selector selector = Selector.open();

    // 绑定6666端口
    serverSocketChannel.socket().bind(new InetSocketAddress(6666));

    // 设置为非阻塞
    serverSocketChannel.configureBlocking(false);

    // 将ServerSocketChannel注册到Selector, 事件为 OP_ACCEPT
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

    // 循环等待客户端连接
    for (int i = 3; ; i+=3) {

        // 判断是否等待1秒后 无事件
        if (selector.select(3000) == 0) {
            System.out.println("等待" + i + "秒后 , 无连接");
            continue;
        }

        /*
         *  1. 如果返回大于0 代表已经获取到关注的事件 就获取相关的SelectionKey 集合
         *  2. 调用selector.selectedKeys() 返回关注事件的集合
         *  3. 通过SelectionKey反向获取Channel
         *  4. 通过Channel处理数据
         */
        Set<SelectionKey> selectionKeys = selector.selectedKeys();

        Iterator<SelectionKey> iterator = selectionKeys.iterator();

        while (iterator.hasNext()) {
            SelectionKey selectionKey = iterator.next();
            // 如果是连接事件
            if (selectionKey.isAcceptable()) {
                // 因为是事件驱动的, 所以已经判断是连接事件了 在这里调用accept不会阻塞, 会立即给要连接的客户端创建一个SocketChannel
                SocketChannel socketChannel = serverSocketChannel.accept();
                // 将SocketChannel设置为非阻塞
                socketChannel.configureBlocking(false);
                // 将SocketChannel注册到Selector, 并绑定读取事件, 指定字节Buffer的大小为1024
                SelectionKey register = socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                System.out.println("selection key is " + register);
            }

            // 如果是读取事件
            if (selectionKey.isReadable()) {
                // 通过SelectionKey 获取Channel
                SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                // 通过SelectionKey 获取ByteBuffer
                ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
                // 将Channel的数据写入Buffer
                socketChannel.read(buffer);

                // 写完之后 切换为读 不然会直接读取到长度
                buffer.flip();

                // 不能直接使用返回的底层数组,应为会有很多空格, 需要设置偏移量和截止位置
                System.out.println("from 客户端: " + new String(buffer.array(), 0,buffer.limit()));
            }

            // 从集合中移除Key防止重复处理
            iterator.remove();
        }
    }
}

}

<a name="Tf7Fa"></a>
### 编写Client
```java
package com.dance.netty.nio.demo.nio;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class NIOClient {

    public static void main(String[] args) throws Exception {

        // 创建一个SocketChannel
        SocketChannel socketChannel = SocketChannel.open();

        // 设置非阻塞
        socketChannel.configureBlocking(false);

        InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);

        // 如果没有连接成功
        if (!socketChannel.connect(inetSocketAddress)) {
            while (!socketChannel.finishConnect()) {
                System.out.println("因为连接需要时间, 客户端不会阻塞, 可以做其他工作......");
            }
        }

        // 如果连接成功 就发数据
        String message = "Hello Flower";

        // Wraps a byte array into a buffer. (包装一个字节数组到Buffer)
        ByteBuffer buffer = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));

        int write = socketChannel.write(buffer);

        System.out.println("写入字节数: " + write);

        // 阻塞
        int read = System.in.read();
    }

}

执行结果

Server

等待3秒后 , 无连接
等待6秒后 , 无连接
selection key is sun.nio.ch.SelectionKeyImpl@29ee9faa
from 客户端: Hello Flower
等待15秒后 , 无连接
等待18秒后 , 无连接
selection key is sun.nio.ch.SelectionKeyImpl@475530b9
from 客户端: Hello Flower
等待27秒后 , 无连接
等待30秒后 , 无连接
等待33秒后 , 无连接
....

Client1 and Clent2

写入字节数: 12

在Server端,我对老师的代码进行了一些改进,在判断读事件的时候,在将Channel的数据写入Buffer后,做了读写切换,让position位置置换了limit的值,不然limit会是1024,然后再构造String的时候,设置了偏移量和截止位置,截止位置就是buffer的limit,应为做了flip, 不然会有很多空格,会将1024全部读出来

IDEA启动多客户端

我查了一下网上说是IDEA可以设置重复启动一个类的, 但是不知道为啥我的没有,我就想了一个简单的办法
image.png
拷贝一个重命名,就可以启动两个了

SelectionKey

  1. SelectionKey, 表示Selector 和网络通道的注册关系,共4种:

    1. int OP_ACCEPT: 有新的网络连接可以accept, 值为 16
    2. int OP_CONNECT: 代表连接已建立,值为8
    3. int OP_READ: 代表读操作,值为 1
    4. int OP_WRITE: 代表写操作, 值为 4
    5. 源码中:
      public static final int OP READ= 1<< 0;
      public static final int OP_WRITE = 1<< 2;
      public static final int OP_CONNECT = l<< 3;
      public static final int OP ACCEPT = 1<< 4;
      
  2. SelectionKey相关方法

    1. image.png

      ServerSocketChannel

  3. ServerSocketChannel在服务器端监听新的客户端Socket连接

  4. 相关方法如下
  5. image.png

    SocketChannel

  6. SocketChannel, 网络IO通道, 具体负责进行读写操作, NIO把缓冲区的数据写入通道, 或者把通道里的数据读到缓冲区

  7. 相关方法如下
  8. image.png

    NIO网络编程应用实例-群聊系统

    需求

  9. 编写一个NIO群聊系统,实现服务器端和客户端之间的数据简单通讯,非阻塞

  10. 实现多人群聊
  11. 服务器端: 可以监测用户上线, 离线, 并实现消息转发功能
  12. 客户端: 通过Channel可以无阻塞发送消息给其他用户,同时可以接受其他用户发送的消息(由服务器转发得到)
  13. 目的: 进一步了解NIO非阻塞网络编程机制
  14. 示意图分析和代码

    示意图

    image.png

    编码

    Server ```java package com.dance.netty.nio.demo.groupchat;

import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.Set;

public class GroupChatServer {

/**
 * 选择器
 */
private Selector selector;

/**
 * 服务器 Channel
 */
private ServerSocketChannel serverSocketChannel;

/**
 * 服务器端口号
 */
private static final int PORT = 6667;

public GroupChatServer() {
    try {
        // 初始化参数
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

/**
 * 监听
 */
public void listener() {
    for (; ; ) {
        try {
            int eventCount = selector.select(10000);
            // 有事件要处理
            if (eventCount > 0) {
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    if (selectionKey.isAcceptable()) {
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        socketChannel.configureBlocking(false);
                        socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                        System.out.println("用户: " + socketChannel.getRemoteAddress().toString().substring(1) + " 上线了");
                    }
                    if (selectionKey.isReadable()) {
                        readData(selectionKey);
                    }
                    iterator.remove();
                }
            }
            System.out.println("等待事件发生......");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

/**
 * 处理读事件
 *
 * @param selectionKey channel id
 */
public void readData(SelectionKey selectionKey) {
    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
    ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
    try {
        int readSize = 0;
        int readSizeCount = 0;
        byte[] dist = new byte[0];
        // 为了处理大于1024的数据 采用循环读取
        while ((readSize = socketChannel.read(buffer)) > 0) {
            readSizeCount += readSize;
            byte[] src = buffer.array();
            buffer.flip();
            // 为了防止汉子断裂,采用字节数组合并
            dist = mergerByteArray(dist, dist.length, src, buffer.limit());
        }
        String message = new String(dist);
        System.out.println("用户: " + socketChannel.getRemoteAddress().toString().substring(1) + " , 发送: " + message);
        sendMessageToOtherClients(message, selectionKey);
    } catch (IOException e) {

// e.printStackTrace(); try { System.out.println(“用户: “ + socketChannel.getRemoteAddress().toString().substring(1) + “ 离线了”); // 从selector中取消注册 selectionKey.cancel(); // 关闭通道 socketChannel.close(); } catch (IOException ex) { ex.printStackTrace(); } } }

/**
 * 发送给其他客户端
 *
 * @param message      消息
 * @param selectionKey form client key
 */
public void sendMessageToOtherClients(String message, SelectionKey selectionKey) {
    // 获取所有的在线客户端
    Set<SelectionKey> keys = selector.keys();
    // 排除自己
    keys.stream().filter(key -> !key.equals(selectionKey)).filter(key -> key.channel() instanceof SocketChannel).forEach(key -> {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        // 因为不知道 字节数组的长度 所以采用新的Buffer
        ByteBuffer wrap = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
        try {
            int write = socketChannel.write(wrap);
            System.out.println("输出数据给:"+socketChannel.getRemoteAddress().toString().substring(1));
        } catch (IOException e) {
            e.printStackTrace();
        }
    });
}

/**
 * byte array merger
 *
 * @param left        左边的字节数组
 * @param leftLength  要合并的结束坐标
 * @param right       右边的字节数组
 * @param rightLength 要合并的结束坐标
 * @return 合并后的字节数组
 */
public byte[] mergerByteArray(byte[] left, int leftLength, byte[] right, int rightLength) {
    byte[] bytes = new byte[leftLength + rightLength];
    System.arraycopy(left, 0, bytes, 0, leftLength);
    System.arraycopy(right, 0, bytes, leftLength, rightLength);
    return bytes;
}

public static void main(String[] args) {
    GroupChatServer groupChatServer = new GroupChatServer();
    groupChatServer.listener();
}

}

client
```java
package com.dance.netty.nio.demo.groupchat;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Scanner;

public class GroupChatClient {

    private final String IP = "127.0.0.1";

    private final int PORT = 6667;

    private Selector selector;

    private SocketChannel socketChannel;

    private String username;

    public GroupChatClient() {
        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open(new InetSocketAddress(IP, PORT));
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
            username = socketChannel.getLocalAddress().toString().substring(1);
            System.out.println(username + " init success......");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void sendMessage(String message) {
        message = username + " : " + message;
        try {
            int write = socketChannel.write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void readMessage() {
        int select = 0;
        try {
            select = selector.select();
            if (select > 0) {
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    if (selectionKey.isReadable()) {
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        int read = socketChannel.read(buffer);
                        if (read > 0) {
                            buffer.flip();
                            String msg = new String(buffer.array(), 0, buffer.limit());
                            System.out.println(msg);
                        }
                    }
                    iterator.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        GroupChatClient groupChatClient = new GroupChatClient();
        new Thread(() -> {
            while (true) {
                groupChatClient.readMessage();
            }
        }).start();

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()){
            String s = scanner.nextLine();
            groupChatClient.sendMessage(s);
        }
    }

}

测试

  1. 启动Server

image.png

  1. 启动三个客户端

第一个
image.png
image.png
第二个
image.png
Server提示
image.png
第三个
image.png
image.png
客户端上线提示OK

  1. 发送消息

第一个客户端发送消息
image.png
第二个
image.png
第三个
image.png

  1. 第二个客户端回复

image.png
第一个
image.png
第三个
image.png
多人群聊功能实现 ok

  1. 第三个客户端下线

image.png
server端提示
image.png
用户下线提示ok

NIO 与 零拷贝

零拷贝基本介绍

  1. 零拷贝是网络编程的关键, 很多性能优化都离不开
  2. 在Java程序中, 常用的零拷贝有mmap(内存映射) 和 sendFile. 那么, 他们在OS里, 到底是咋么样的一个设计? 我们分析mmap和sendFile这两个零拷贝
  3. 另外我们看一下NIO中如何使用零拷贝

    传统IO数据读写

    Java传统IO和网络编程的一段代码
    image.png

    传统IO模型

    image.png
    DMA : direct memory access 直接内存拷贝( 不使用CPU )

    mmap 优化

  4. mmap 通过内存映射, 将文件映射到内核缓冲区,同时 用户空间可以共享内核空间的数据, 这样,在进行网络传输时, 就可以减少内核空间到用户空间的拷贝次数,如下图

  5. mmap示意图

image.png

sendFile 优化

  1. Linux 2.1 版本 提供了 sendFIle 函数, 其基本原理如下: 数据根本不经过用户态,直接从内核缓冲区进入到Socket Buffer ,同时, 由于和用户态完全无关, 就减少了一次上下文切换
  2. 示意图和小结

image.png

  1. 提示: 零拷贝从操作系统角度, 是没有CPU拷贝的
  2. Linux 在2.4 版本中, 做了一些修改, 避免了从内核缓冲区拷贝到SocketBuffer, 的操作,直接拷贝到协议栈,从而再一次减少了数据拷贝,具体如下图和小结

image.png

  1. 这里其实有一次CPU拷贝

    1. Kernel buffer -> socket buffer
    2. 但是,拷贝的信息很少, 比如 length, offset
    3. 信息很少,可以忽略

      零拷贝的再次理解

  2. 我们说零拷贝, 是从操作系统的角度来说的,因为内核缓冲区之间, 没有数据是重复的(只有 kernel buffer 有一份数据)

  3. 零拷贝不仅仅带了更少的数据复制, 还能带来其他的性能优势, 例如更少的上下文切换, 更少的CPU缓存伪共享以及无CPU校验和计算

    mmap 和 sendFile的区别

  4. mmap适合小数据量读写, sendFile 适合大文件传输

  5. mmap需要4次上下文切换, 3次数据拷贝; sendFile 需要3次上下文切换, 最少2次数据拷贝
  6. sendFile可以利用DMA方式, 减少CPU拷贝, mmap则不能(必须从内核拷贝到Socket缓冲区)

    零拷贝案例

    需求

  7. 使用传统IO方法传递一个大文件

  8. 使用NIO零拷贝方式传递(transferTo)一个大文件
  9. 看看两种传递方式消耗时间分别是多少

    编码

    server ```java package com.dance.netty.nio.demo.zerocopy;

import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel;

public class ZeroCopyServer {

public static void main(String[] args) throws Exception {

    InetSocketAddress inetSocketAddress = new InetSocketAddress(7001);

    ServerSocketChannel open = ServerSocketChannel.open();

    open.socket().bind(inetSocketAddress);

    for (;;){
        SocketChannel accept = open.accept();
        int countSize = 0;
        ByteBuffer allocate = ByteBuffer.allocate(4096);
        while (-1 != countSize){
            countSize  = accept.read(allocate);
            allocate.rewind(); // 倒置 position=0 Mark 作废
        }
    }

}

}

client
```java
package com.dance.netty.nio.demo.zerocopy;

import java.io.File;
import java.io.FileInputStream;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.time.Instant;

public class ZeroCopyClient {

    public static void main(String[] args) throws Exception {

        SocketChannel open = SocketChannel.open();
        boolean connect = open.connect(new InetSocketAddress(7001));

        // 获取文件Channel
        File file = new File("src/main/resources/01.jpeg");
        FileChannel channel = new FileInputStream(file).getChannel();

        Instant startTime = Instant.now();

        /*
         * 在Linux下,一个transferTo方法就可以传输完成
         * 在Windows下,调用一次transferTo 只能传输8M,就需要分段传输文件,而且要记录传输时的位置
         * transferTo 底层使用零拷贝
         */
        long l = channel.transferTo(0, channel.size(), open);
        System.out.println(l);

        Instant endTime = Instant.now();
        System.out.println("用时:" + Duration.between(startTime,endTime).toMillis() + "ms");
        channel.close();
    }

}

执行结果

6806
用时:3ms

零拷贝博客

https://blog.csdn.net/weixin_42096901/article/details/103017044
我感觉写的很不错

AIO 基本介绍

  1. JDK7 引入了,Asynchronous I/O,即AIO ,在进行IO编程中,常用到两种模式:Reactor 和 Proactor, Java 的NIO就是Reactor,,当有事件触发时,服务器端得到通知进行相应的处理
  2. AIO 即NIO2.0, 叫异步非阻塞IO, AIO引入异步通道的概念,采用了Proactor模式,简化了程序编写,有效的请求才启动线程,他的特点是,先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多,且连接时间较长的应用
  3. 目前AIO还没有广泛应用,Netty也是基于NIO,而不是AIO,因此就不在这里讲AIO了,有兴趣的可以链接一下
  4. <>

    BIO NIO 和 AIO对比

    image.png
    举例说明

  5. 同步阻塞: 到理发店理发,就一直等着,直到轮到自己理发

  6. 同步非阻塞: 到理发店理发,发现前面有其他人,给理发师说一下先干其他事情,一会过来看是否轮到自己
  7. 异步非阻塞: 给理发师打电话,让理发师上门服务,自己干其他事情,理发师上门通知,来你家给你理发

NIO完结撒花花