1. 三大组件

1.1 Channel & Buffer

channel 有一点类似于 stream,它就是读写数据的双向通道,可以从 channel 将数据读入 buffer,也可以将 buffer 的数据写入 channel,而之前的 stream 要么是输入,要么是输出,channel 比 stream 更为底层

常见的 channel 有

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

buffer 则用来缓冲读写数据,常见的 buffer 有

  • ByteBuffer
    • MappedByteBuffer
    • DirectByteBuffer
    • HeapByteBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer
  • CharBuffer

    1.2 Selector

    selector 单从字面意思不好理解,需要结合服务器的设计演化来理解它的用途

    1.2.1 多线程版设计

    image.png
    多线程版缺点

  • 内存占用高

  • 线程上下文切换成本高
  • 只适合连接数少的场景

    1.2.2 线程池版设计

    image.png
    线程池版缺点

  • 阻塞模式下,线程仅能处理一个 socket 连接

  • 仅适合短连接场景

    1.2.3 Selector 版设计

    image.png
    selector 的作用就是配合一个线程来管理多个 channel,获取这些 channel 上发生的事件,这些 channel 工作在非阻塞模式下,不会让线程吊死在一个 channel 上。适合连接数特别多,但流量低的场景(low traffic)

调用 selector 的 select() 会阻塞直到 channel 发生了读写就绪事件,这些事件发生,select 方法就会返回这些事件交给 thread 来处理

2. ByteBuffer

2.1 使用姿势

  1. 向 buffer 写入数据,例如调用 channel.read(buffer)
  2. 调用 flip() 切换至读模式
  3. 从 buffer 读取数据,例如调用 buffer.get()
  4. 调用 clear() 或者 compact() 切换至写模式
  5. 重复 1~4 步骤
    1. public static void main(String[] args) {
    2. // 准备输入流
    3. String fileName = Objects.requireNonNull(ByteBufferTest1.class.getClassLoader().getResource("data.txt")).getFile();
    4. try (FileChannel channel = new FileInputStream(fileName).getChannel()) {
    5. // 准备缓冲区
    6. ByteBuffer buffer = ByteBuffer.allocate(10);
    7. while (true) {
    8. int len = channel.read(buffer);
    9. log.info("一共读到 {} 个字节数", len);
    10. if (len == -1) {
    11. break;
    12. }
    13. // 切换到读模式
    14. buffer.flip();
    15. while (buffer.hasRemaining()) {
    16. log.info("读取到内容 {}", (char) buffer.get());
    17. }
    18. // 切换到写模式
    19. buffer.clear();
    20. }
    21. } catch (IOException e) {
    22. e.printStackTrace();
    23. }
    24. }

    2.2 ByteBuffer 结构

    ByteBuffer 有三个重要属性,capacity 容量,position 当前读写指针,limit 读写限制

一开始状态是
image.png
写模式下,position 是写入位置,limit 等于容量,下图表示插入了 4 个字节后的状态
image.png
flip() 动作发生后,position 切换为读取位置,limit 切换为读取限制
image.png
读取 4 个字节后状态为
image.png
clear() 动作发生后状态为
image.png
compact() 方法是把未读完的部分向前压缩,然后切换至写模式
image.png

2.3 常用 API

2.3.1 分配空间

可以使用 allocate 方法为 ByteBuffer 分配空间,其它 buffer 类也有该方法

  1. // java.nio.HeapByteBuffer,Java 堆内存,读写效率低,受到 GC 影响
  2. System.out.println(ByteBuffer.allocate(10).getClass().getName());
  3. // java.nio.DirectByteBuffer, 直接内存,读写效率高(少一次拷贝),不会受到 GC 影响,分配效率低
  4. System.out.println(ByteBuffer.allocateDirect(10).getClass().getName());

2.3.2 向 buffer 写入数据

有两种办法

  • 调用 channel 的 read 方法
  • 调用 buffer 自己的 put 方法 ```java int readBytes = channel.read(buf);

buf.put((byte)127);

  1. <a name="NhWvf"></a>
  2. ### 2.3.3 从 buffer 读取数据
  3. 同样有两种办法
  4. - 调用 channel 的 write 方法
  5. - 调用 buffer 自己的 get 方法
  6. ```java
  7. int writeBytes = channel.write(buf);
  8. byte b = buf.get();

get 方法会让 position 读指针向后走,如果想重复读取数据

  • 可以调用 rewind 方法将 position 重新置为 0
  • 或者调用 get(int i) 方法获取索引 i 的内容,它不会移动读指针

    2.3.4 mark 和 reset

    mark 是在读取时,做一个标记,即使 position 改变,只要调用 reset 就能回到 mark 的位置

rewind 和 flip 都会清除 mark 位置

  1. // mark 标记索引,reset 重置索引
  2. public static void main(String[] args) {
  3. ByteBuffer buffer = ByteBuffer.allocate(10);
  4. buffer.put((byte) 'a');
  5. buffer.put((byte) 'b');
  6. buffer.put((byte) 'c');
  7. buffer.put((byte) 'd');
  8. // 切换到读模式
  9. buffer.flip();
  10. // 先读取 2 个
  11. System.out.println((char) buffer.get());
  12. System.out.println((char) buffer.get());
  13. // 标记索引
  14. buffer.mark();
  15. System.out.println((char) buffer.get());
  16. System.out.println((char) buffer.get());
  17. // 重置索引
  18. buffer.reset();
  19. // 再读 2 个
  20. System.out.println((char) buffer.get());
  21. System.out.println((char) buffer.get());
  22. }

2.3.5 与字符串互转

Buffer 是非线程安全的

  1. public static void main(String[] args) {
  2. // String 转 ByteBuffer 方法一
  3. ByteBuffer buffer1 = ByteBuffer.allocate(16);
  4. buffer1.put("hello".getBytes());
  5. ByteBufferUtil.debugAll(buffer1);
  6. // String 转 ByteBuffer 方法二, 它会自动切换到读模式
  7. ByteBuffer buffer2 = StandardCharsets.UTF_8.encode("hello");
  8. ByteBufferUtil.debugAll(buffer2);
  9. // String 转 ByteBuffer 方法三, 它会自动切换到读模式
  10. ByteBuffer buffer3 = ByteBuffer.wrap("hello".getBytes());
  11. ByteBufferUtil.debugAll(buffer3);
  12. // ByteBuffer 转 String
  13. buffer1.flip();
  14. System.out.println(StandardCharsets.UTF_8.decode(buffer1));
  15. // ByteBuffer 转 String
  16. System.out.println(StandardCharsets.UTF_8.decode(buffer3));
  17. }

2.3.6 粘包与拆包

  1. @Slf4j
  2. public class ByteBufferUnpack {
  3. public static void main(String[] args) {
  4. ByteBuffer source = ByteBuffer.allocate(32);
  5. source.put("Hello world\nI'm zhangsan\n Ho".getBytes());
  6. split(source);
  7. source.put("w are you\n".getBytes());
  8. split(source);
  9. }
  10. private static void split(ByteBuffer source) {
  11. // 切换到读模式
  12. source.flip();
  13. for (int i = 0; i < source.limit(); i++) {
  14. if (source.get(i) == '\n') {
  15. int len = i - source.position() + 1;
  16. ByteBuffer target = ByteBuffer.allocate(len);
  17. for (int j = 0; j < len; j++) {
  18. target.put(source.get());
  19. }
  20. ByteBufferUtil.debugAll(target);
  21. // 切换到读模式
  22. target.flip();
  23. log.info("拆解出信息 {}", StandardCharsets.UTF_8.decode(target));
  24. }
  25. }
  26. // 将未拆解的信息继续存入到 source 中
  27. source.compact();
  28. }
  29. }

3. 文件编程

3.1 FileChannel

3.1.1 获取

不能直接打开 FileChannel,必须通过 FileInputStream、FileOutputStream 或者 RandomAccessFile 来获取 FileChannel,它们都有 getChannel 方法

  • 通过 FileInputStream 获取的 channel 只能读
  • 通过 FileOutputStream 获取的 channel 只能写
  • 通过 RandomAccessFile 是否能读写根据构造 RandomAccessFile 时的读写模式决定

    3.1.2 读取

    会从 channel 读取数据填充 ByteBuffer,返回值表示读到了多少字节,-1 表示到达了文件的末尾
    1. int readBytes = channel.read(buffer);

    3.1.3 写入

    在 while 中调用 channel.write 是因为 write 方法并不能保证一次将 buffer 中的内容全部写入 channel
    1. while(buffer.hasRemaining()) {
    2. channel.write(buffer);
    3. }

    3.1.4 关闭

    channel 必须关闭,不过调用了 FileInputStream、FileOutputStream 或者 RandomAccessFile 的 close 方法会间接地调用 channel 的 close 方法

    3.1.5 强制写入

    操作系统出于性能的考虑,会将数据缓存,不是立刻写入磁盘,可以调用 force(true) 方法将文件内容和元数据立刻写入磁盘

    3.2 两个 Channel 传输数据

    1. public static void main(String[] args) {
    2. try (
    3. FileChannel from = new FileInputStream("file/tutorial-io/data.txt").getChannel();
    4. FileChannel to = new FileOutputStream("file/tutorial-io/to.txt").getChannel();) {
    5. // 效率高, 底层会使用操作系统的零拷贝进行优化, 上限是 2G
    6. from.transferTo(0, from.size(), to);
    7. } catch (Exception e) {
    8. e.printStackTrace();
    9. }
    10. }

    3.3 Path

    Path 用来表示文件路径
    Paths 是工具类,用来获取 Path 实例 ```java Path source = Paths.get(“1.txt”); // 相对路径 不带盘符 使用 user.dir 环境变量来定位 1.txt

Path source = Paths.get(“d:\1.txt”); // 绝对路径 代表了 d:\1.txt 反斜杠需要转义

Path source = Paths.get(“d:/1.txt”); // 绝对路径 同样代表了 d:\1.txt

Path projects = Paths.get(“d:\data”, “projects”); // 代表了 d:\data\projects

  1. <a name="Ue1sK"></a>
  2. ## 3.4 Files
  3. ```java
  4. Path path = Paths.get("helloword/data.txt");
  5. System.out.println(Files.exists(path));
  1. // 如果目录已存在,会抛异常 FileAlreadyExistsException
  2. // 不能一次创建多级目录,否则会抛异常 NoSuchFileException
  3. Path path = Paths.get("helloword/d1");
  4. Files.createDirectory(path);
  5. // 创建多级目录用
  6. Path path = Paths.get("helloword/d1/d2");
  7. Files.createDirectories(path);
  1. Path source = Paths.get("helloword/data.txt");
  2. Path target = Paths.get("helloword/target.txt");
  3. // 如果文件已存在,会抛异常 FileAlreadyExistsException
  4. Files.copy(source, target);
  5. // 如果希望用 source 覆盖掉 target,需要用 StandardCopyOption 来控制
  6. Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
  1. Path source = Paths.get("helloword/data.txt");
  2. Path target = Paths.get("helloword/data.txt");
  3. // StandardCopyOption.ATOMIC_MOVE 保证文件移动的原子性
  4. Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
  1. Path target = Paths.get("helloword/target.txt");
  2. // 如果文件不存在,会抛异常 NoSuchFileException
  3. Files.delete(target);
  1. Path target = Paths.get("helloword/d1");
  2. // 如果目录还有内容,会抛异常 DirectoryNotEmptyException
  3. Files.delete(target);
  1. public static void main(String[] args) throws IOException {
  2. AtomicInteger dirCount = new AtomicInteger(0);
  3. AtomicInteger fileCount = new AtomicInteger(0);
  4. Files.walkFileTree(Paths.get("file"), new SimpleFileVisitor<Path>() {
  5. @Override
  6. public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
  7. dirCount.incrementAndGet();
  8. return super.preVisitDirectory(dir, attrs);
  9. }
  10. @Override
  11. public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
  12. fileCount.incrementAndGet();
  13. log.info("fileName: {}", file.getFileName());
  14. return super.visitFile(file, attrs);
  15. }
  16. });
  17. log.info("dir count {}", dirCount.get());
  18. log.info("file count {}", fileCount.get());
  19. }

4. 网络编程

4.1 阻塞与非阻塞

4.1.1 阻塞

阻塞模式下,相关方法都会导致线程暂停

  • ServerSocketChannel.accept() 会在没有连接建立时让线程暂停
  • SocketChannel.read() 会在通道中没有数据可读时让线程暂停
  • 阻塞的表现其实就是线程暂停了,暂停期间不会占用 cpu,但线程相当于闲置

单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程支持

但多线程下,有新的问题,体现在以下方面

  • 32 位 jvm 一个线程 320k,64 位 jvm 一个线程 1024k,如果连接数过多,必然导致 OOM,并且线程太多,反而会因为频繁上下文切换导致性能降低
  • 可以采用线程池技术来减少线程数和线程上下文切换,但治标不治本,如果有很多连接建立,但长时间 inactive,会阻塞线程池中所有线程,因此不适合长连接,只适合短连接

    1. public static void main(String[] args) throws Exception {
    2. ServerSocketChannel ssc = ServerSocketChannel.open();
    3. ssc.bind(new InetSocketAddress(9527));
    4. List<SocketChannel> channelList = new ArrayList<>();
    5. ByteBuffer buffer = ByteBuffer.allocate(16);
    6. while (true) {
    7. log.info("connecting");
    8. // 这一步会阻塞
    9. SocketChannel sc = ssc.accept();
    10. log.info("connected {}", sc);
    11. channelList.add(sc);
    12. for (SocketChannel channel : channelList) {
    13. // 把数据读取到 buffer 缓冲区, 这一步也会阻塞
    14. channel.read(buffer);
    15. // 切换到读模式
    16. buffer.flip();
    17. ByteBufferUtil.debugRead(buffer);
    18. // 清除
    19. buffer.clear();
    20. }
    21. }
    22. }
    1. public static void main(String[] args) throws Exception {
    2. SocketChannel sc = SocketChannel.open();
    3. sc.connect(new InetSocketAddress(9527));
    4. System.out.println("waiting");
    5. sc.write(StandardCharsets.UTF_8.encode("hello"));
    6. }

    4.1.2 非阻塞

    非阻塞模式下,相关方法都不会让线程暂停

  • 通过 configureBlocking(false) 方法可以设置为非阻塞

  • ServerSocketChannel.accept() 在没有连接建立时,会返回 null,继续运行
  • SocketChannel.read() 在没有数据可读时会返回 0,但线程不会阻塞,可以去执行其它 SocketChannel 的 read 方法或是去执行 ServerSocketChannel 的 accept 方法
  • 写数据时,线程只是等待数据写入 channel 即可,无需等 channel 通过网络把数据发送出去

但非阻塞模式下,即使没有连接建立和可读数据,线程仍然不断运行,白白浪费 CPU

数据复制过程中,线程实际还是阻塞的

  1. public static void main(String[] args) throws Exception {
  2. ServerSocketChannel ssc = ServerSocketChannel.open();
  3. ssc.bind(new InetSocketAddress(9527));
  4. ssc.configureBlocking(false);
  5. List<SocketChannel> channelList = new ArrayList<>();
  6. ByteBuffer buffer = ByteBuffer.allocate(16);
  7. while (true) {
  8. // 这里不会再阻塞
  9. SocketChannel sc = ssc.accept();
  10. if (sc != null) {
  11. log.info("connected {}", sc);
  12. sc.configureBlocking(false);
  13. channelList.add(sc);
  14. }
  15. for (SocketChannel channel : channelList) {
  16. // 这里不会再阻塞
  17. int bytes = channel.read(buffer);
  18. if (bytes <= 0) {
  19. continue;
  20. }
  21. // 切换到读模式
  22. buffer.flip();
  23. ByteBufferUtil.debugRead(buffer);
  24. // 清除
  25. buffer.clear();
  26. }
  27. }
  28. }

4.1.3 多路复用

单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这称之为多路复用

多路复用仅针对网络 IO,普通文件 IO 无法利用多路复用

如果不用 Selector 的非阻塞模式,线程大部分时间都在做无用功,而 Selector 能够保证

  • 有可连接事件时才去连接
  • 有可读事件才去读取
  • 有可写事件才去写入(限于网络传输能力,Channel 未必时时可写,一旦 Channel 可写,会触发 Selector 的可写事件)

    4.2 Selector

    一个线程配合 selector 就可以监控多个 channel 的事件,事件发生线程才去处理。避免非阻塞模式下所做无用功,让这个线程能够被充分利用

节约了线程的数量,减少了线程上下文切换

4.2.1 创建

  1. Selector selector = Selector.open();

4.2.2 绑定 channel 事件

也称之为注册事件,绑定的事件 selector 才会关心

  • channel 必须工作在非阻塞模式
  • FileChannel 没有非阻塞模式,因此不能配合 selector 一起使用
  • 绑定的事件类型可以有

    • connect - 客户端连接成功时触发
    • accept - 服务器端成功接受连接时触发
    • read - 数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况
    • write - 数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况
      1. channel.configureBlocking(false);
      2. SelectionKey key = channel.register(selector, 绑定事件);

      4.2.3 监听 channel 事件

      可以通过下面三种方法来监听是否有事件发生,方法的返回值代表有多少 channel 发生了事件
      1. int count = selector.select();
      1. int count = selector.select(long timeout);
      1. int count = selector.selectNow();

      4.2.4 select 何时不阻塞

      事件发生时
  • 客户端发起连接请求,会触发 accept 事件

  • 客户端发送数据过来,客户端正常、异常关闭时,都会触发 read 事件,另外如果发送的数据大于 buffer 缓冲区,会触发多次读取事件
  • channel 可写,会触发 write 事件
  • 在 linux 下 nio bug 发生时

调用 selector.wakeup()

调用 selector.close()

selector 所在线程 interrupt

4.3 处理 accept 事件

  1. public static void main(String[] args) throws Exception {
  2. try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
  3. ssc.bind(new InetSocketAddress(9527));
  4. ssc.configureBlocking(false);
  5. Selector selector = Selector.open();
  6. // 绑定 channel 的 ACCEPT 事件
  7. ssc.register(selector, SelectionKey.OP_ACCEPT);
  8. while (true) {
  9. // count 表示有多少个 channel 发生了事件
  10. int count = selector.select();
  11. if (count <= 0) {
  12. TimeUnit.SECONDS.sleep(1);
  13. continue;
  14. }
  15. // 获取到所有事件
  16. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  17. // 遍历所有事件,挨个处理
  18. Iterator<SelectionKey> iterator = selectionKeys.iterator();
  19. while (iterator.hasNext()) {
  20. SelectionKey selectionKey = iterator.next();
  21. // 判断事件类型
  22. if (selectionKey.isAcceptable()) {
  23. ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel();
  24. SocketChannel socketChannel = channel.accept();
  25. log.info("连接建立事件 {}", socketChannel);
  26. }
  27. // 处理完成后, 必须要移除事件
  28. iterator.remove();
  29. }
  30. }
  31. }
  32. }

4.4 处理 read 事件

事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍会触发,这是因为 nio 底层使用的是水平触发

  1. public static void main(String[] args) throws Exception {
  2. try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
  3. ssc.bind(new InetSocketAddress(9527));
  4. ssc.configureBlocking(false);
  5. Selector selector = Selector.open();
  6. // 绑定 channel 的 ACCEPT 事件
  7. ssc.register(selector, SelectionKey.OP_ACCEPT);
  8. while (true) {
  9. // select 方法, 没有事件发生,线程阻塞,有事件,线程才会恢复运行
  10. selector.select();
  11. // 获取到所有事件
  12. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  13. // 遍历所有事件,挨个处理
  14. Iterator<SelectionKey> iterator = selectionKeys.iterator();
  15. while (iterator.hasNext()) {
  16. SelectionKey selectionKey = iterator.next();
  17. // 处理key 时,要从 selectedKeys 集合中删除,否则下次处理就会有问题
  18. iterator.remove();
  19. // 判断事件类型
  20. if (selectionKey.isAcceptable()) {
  21. SocketChannel socketChannel = ((ServerSocketChannel) selectionKey.channel()).accept();
  22. log.info("连接建立事件 {}", socketChannel);
  23. // 绑定 channel 的 READ 事件
  24. socketChannel.configureBlocking(false);
  25. ByteBuffer buffer = ByteBuffer.allocate(16);
  26. // 使用 attach 传递 buffer
  27. socketChannel.register(selector, SelectionKey.OP_READ, buffer);
  28. } else if (selectionKey.isReadable()) {
  29. try {
  30. SocketChannel channel = (SocketChannel) selectionKey.channel();
  31. // 从 selectionKey 获取到 attach(buffer)
  32. ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
  33. // 把客户端数据读到缓冲区中
  34. int read = channel.read(buffer);
  35. if (read == -1) {
  36. log.warn("未读取到客户端任何数据, 关闭该连接{}", channel);
  37. // 客户端正常、异常关闭时都会触发 read 事件,而事件要么处理,要么取消,否则下次该事件仍会触发
  38. selectionKey.cancel();
  39. channel.close();
  40. } else {
  41. split(buffer);
  42. // 说明未找到, 这个 buffer 不够需要扩容
  43. if (buffer.limit() == buffer.position()) {
  44. ByteBuffer target = ByteBuffer.allocate(buffer.capacity() * 2);
  45. buffer.flip();
  46. target.put(buffer);
  47. // 替换原来的 buffer
  48. buffer = null;
  49. selectionKey.attach(target);
  50. }
  51. }
  52. } catch (IOException e) {
  53. e.printStackTrace();
  54. // 因为客户端断开了,因此需要将 key 取消(从 selector 的 keys 集合中真正删除 key)
  55. selectionKey.cancel();
  56. }
  57. }
  58. }
  59. }
  60. }
  61. }
  62. private static void split(ByteBuffer source) {
  63. // 切换到读模式
  64. source.flip();
  65. // 判断是否有结尾标识符
  66. for (int i = 0; i < source.limit(); i++) {
  67. if (source.get(i) == '\n') {
  68. int len = i - source.position() + 1;
  69. ByteBuffer target = ByteBuffer.allocate(len);
  70. for (int j = 0; j < len; j++) {
  71. target.put(source.get());
  72. }
  73. // 切换到读模式, 打印数据
  74. target.flip();
  75. log.info("读取到客户端数据 {}", StandardCharsets.UTF_8.decode(target));
  76. }
  77. }
  78. // 压缩, 将未拆解的信息继续存入到 source 中
  79. source.compact();
  80. }
  1. public static void main(String[] args) throws Exception {
  2. SocketChannel sc = SocketChannel.open();
  3. sc.connect(new InetSocketAddress(9527));
  4. System.out.println("waiting");
  5. sc.write(StandardCharsets.UTF_8.encode("start\nHello, I'm masteryourself\nend\n"));
  6. }

4.4.1 为何要移除

因为 select 在事件发生后,就会将相关的 key 放入 selectedKeys 集合,但不会在处理完后从 selectedKeys 集合中移除,需要我们自己编码删除

  • 第一次触发了 ssckey 上的 accept 事件,没有移除 ssckey
  • 第二次触发了 sckey 上的 read 事件,但这时 selectedKeys 中还有上次的 ssckey ,在处理时因为没有真正的 serverSocket 连上了,就会导致空指针异常

    4.4.2 cancel 的作用

    因为客户端正常、异常关闭时都会触发 read 事件,而事件要么处理,要么取消,否则下次该事件仍会触发

cancel 会取消注册在 selector 上的 channel,并从 keys 集合中删除 key 后续不会再监听事件

4.4.3 处理消息边界

image.png
一种思路是固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽

另一种思路是按分隔符拆分,缺点是效率低

TLV 格式,即 Type 类型、Length 长度、Value 数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量

  • Http 1.1 是 TLV 格式
  • Http 2.0 是 LTV 格式

image.png

4.4.4 ByteBuffer 大小分配

每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用,因此需要为每个 channel 维护一个独立的 ByteBuffer

ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的 ByteBuffer

  • 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能,参考实现 http://tutorials.jenkov.com/java-performance/resizable-array.html
  • 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗

    4.5 处理 write 事件

    非阻塞模式下,无法保证把 buffer 中所有数据都写入 channel,因此需要追踪 write 方法的返回值(代表实际写入字节数)

用 selector 监听所有 channel 的可写事件,每个 channel 都需要一个 key 来跟踪 buffer,但这样又会导致占用内存过多,就有两阶段策略

  • 当消息处理器第一次写入消息时,才将 channel 注册到 selector 上
  • selector 检查 channel 上的可写事件,如果所有的数据写完了,就取消 channel 的注册
  • 如果不取消,会每次可写均会触发 write 事件

只要向 channel 发送数据时,socket 缓冲可写,这个事件会频繁触发,因此应当只在 socket 缓冲区写不下时再关注可写事件,数据写完之后再取消关注

  1. public static void main(String[] args) throws Exception {
  2. try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
  3. ssc.bind(new InetSocketAddress(9527));
  4. ssc.configureBlocking(false);
  5. Selector selector = Selector.open();
  6. // 绑定 channel 的 ACCEPT 事件
  7. ssc.register(selector, SelectionKey.OP_ACCEPT);
  8. while (true) {
  9. // select 方法, 没有事件发生,线程阻塞,有事件,线程才会恢复运行
  10. selector.select();
  11. // 获取到所有事件
  12. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  13. // 遍历所有事件,挨个处理
  14. Iterator<SelectionKey> iterator = selectionKeys.iterator();
  15. while (iterator.hasNext()) {
  16. SelectionKey selectionKey = iterator.next();
  17. // 处理key 时,要从 selectedKeys 集合中删除,否则下次处理就会有问题
  18. iterator.remove();
  19. // 判断事件类型
  20. if (selectionKey.isAcceptable()) {
  21. SocketChannel socketChannel = ((ServerSocketChannel) selectionKey.channel()).accept();
  22. log.info("连接建立事件 {}", socketChannel);
  23. // 绑定 channel 的 READ 事件
  24. socketChannel.configureBlocking(false);
  25. SelectionKey sk = socketChannel.register(selector, SelectionKey.OP_READ);
  26. StringBuilder sb = new StringBuilder();
  27. // 一旦建立连接, 服务端就开始向客户端发送数据
  28. for (int i = 0; i < 3000000; i++) {
  29. sb.append("a");
  30. }
  31. ByteBuffer buffer = StandardCharsets.UTF_8.encode(sb.toString());
  32. int write = socketChannel.write(buffer);
  33. log.info("本次共写入 {} 个字节", write);
  34. // 如果有剩余未读字节,才需要关注写事件
  35. if (buffer.hasRemaining()) {
  36. log.info("需要关注写事件");
  37. // 在原有事件的基础上, 多添加一个关注写事件
  38. sk.interestOps(sk.interestOps() + SelectionKey.OP_WRITE);
  39. // 使用 attach 传递 buffer
  40. sk.attach(buffer);
  41. }
  42. } else if (selectionKey.isWritable()) {
  43. ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
  44. SocketChannel channel = (SocketChannel) selectionKey.channel();
  45. int write = channel.write(buffer);
  46. log.info("本次共写入 {} 个字节", write);
  47. // 如果已经写完了
  48. if (!buffer.hasRemaining()) {
  49. selectionKey.interestOps(selectionKey.interestOps() - SelectionKey.OP_WRITE);
  50. selectionKey.attach(null);
  51. }
  52. }
  53. }
  54. }
  55. }
  56. }
  1. public static void main(String[] args) throws Exception {
  2. SocketChannel sc = SocketChannel.open();
  3. sc.configureBlocking(false);
  4. Selector selector = Selector.open();
  5. sc.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
  6. sc.connect(new InetSocketAddress(9527));
  7. int total = 0;
  8. while (true) {
  9. // select 方法, 没有事件发生,线程阻塞,有事件,线程才会恢复运行
  10. selector.select();
  11. // 获取到所有事件
  12. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  13. // 遍历所有事件,挨个处理
  14. Iterator<SelectionKey> iterator = selectionKeys.iterator();
  15. while (iterator.hasNext()) {
  16. SelectionKey selectionKey = iterator.next();
  17. // 处理key 时,要从 selectedKeys 集合中删除,否则下次处理就会有问题
  18. iterator.remove();
  19. if (selectionKey.isConnectable()) {
  20. SocketChannel channel = (SocketChannel) selectionKey.channel();
  21. // 一定要调用, 完成连接
  22. channel.finishConnect();
  23. log.info("客户端连接成功 {}", channel);
  24. } else if (selectionKey.isReadable()) {
  25. SocketChannel channel = (SocketChannel) selectionKey.channel();
  26. ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
  27. total += channel.read(buffer);
  28. buffer.clear();
  29. log.info("一共读取了 {} 个字节", total);
  30. }
  31. }
  32. }
  33. }

4.6 多线程优化

Runtime.getRuntime().availableProcessors() 如果工作在 docker 容器下,因为容器不是物理隔离的,会拿到物理 cpu 个数,而不是容器申请时的个数

这个问题在 jdk 1.8.60 才修复

现在都是多核 cpu,设计时要充分考虑别让 cpu 的力量被白白浪费。前面的代码只有一个选择器,没有充分利用多核 cpu,如何改进?分两组选择器

  • 单线程配一个选择器,专门处理 accept 事件
  • 创建 cpu 核心数的线程,每个线程配一个选择器,轮流处理 read 事件 ```java @Slf4j public class BossEventLoop implements Runnable {

    private Selector boss; private WorkEventLoop[] works; private final AtomicInteger requestIndex = new AtomicInteger(0);

    public void register() {

    1. try {
    2. boss = Selector.open();
    3. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    4. serverSocketChannel.bind(new InetSocketAddress(9527));
    5. serverSocketChannel.configureBlocking(false);
    6. serverSocketChannel.register(boss, SelectionKey.OP_ACCEPT, null);
    7. this.initWorkEventLoop();
    8. new Thread(this, "boss").start();
    9. } catch (IOException e) {
    10. log.error(e.getMessage(), e);
    11. }

    }

    private void initWorkEventLoop() {

    1. works = new WorkEventLoop[Runtime.getRuntime().availableProcessors()];
    2. for (int i = 0; i < works.length; i++) {
    3. works[i] = new WorkEventLoop(i);
    4. }

    }

    @Override public void run() {

    1. while (true) {
    2. try {
    3. boss.select();
    4. Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
    5. while (iterator.hasNext()) {
    6. SelectionKey selectionKey = iterator.next();
    7. iterator.remove();
    8. // boss 线程只处理连接事件
    9. if (selectionKey.isAcceptable()) {
    10. SocketChannel socketChannel = ((ServerSocketChannel) selectionKey.channel()).accept();
    11. log.info("连接建立事件 {}", socketChannel);
    12. // 轮训交给 work 处理
    13. socketChannel.configureBlocking(false);
    14. works[requestIndex.incrementAndGet() % works.length].register(socketChannel);
    15. }
    16. }
    17. } catch (IOException e) {
    18. log.error(e.getMessage(), e);
    19. }
    20. }

    }

}

  1. ```java
  2. @Slf4j
  3. public class WorkEventLoop implements Runnable {
  4. private Selector work;
  5. private final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<>();
  6. public WorkEventLoop(int index) {
  7. try {
  8. work = Selector.open();
  9. new Thread(this, "work-" + index).start();
  10. } catch (IOException e) {
  11. log.error(e.getMessage(), e);
  12. }
  13. }
  14. public void register(SocketChannel socketChannel) {
  15. tasks.add(() -> {
  16. try {
  17. SelectionKey selectionKey = socketChannel.register(work, 0, null);
  18. selectionKey.interestOps(SelectionKey.OP_READ);
  19. work.selectNow();
  20. } catch (IOException e) {
  21. log.error(e.getMessage(), e);
  22. }
  23. });
  24. // 调用 wakeup() 方法让 select() 不再阻塞,所以代码才能走到 tasks.poll() 否则将会一直阻塞住
  25. work.wakeup();
  26. }
  27. @Override
  28. public void run() {
  29. while (true) {
  30. try {
  31. // 它会导致阻塞, 无法 channel 无法注册到 Selector 上
  32. work.select();
  33. Runnable task = tasks.poll();
  34. if (task != null) {
  35. task.run();
  36. }
  37. Iterator<SelectionKey> iterator = work.selectedKeys().iterator();
  38. while (iterator.hasNext()) {
  39. SelectionKey selectionKey = iterator.next();
  40. iterator.remove();
  41. if (selectionKey.isReadable()) {
  42. SocketChannel channel = (SocketChannel) selectionKey.channel();
  43. ByteBuffer buffer = ByteBuffer.allocate(16);
  44. channel.read(buffer);
  45. // 切换到读模式
  46. buffer.flip();
  47. log.info("从客户端获取数据 {}", StandardCharsets.UTF_8.decode(buffer));
  48. selectionKey.cancel();
  49. channel.close();
  50. }
  51. }
  52. } catch (IOException e) {
  53. log.error(e.getMessage(), e);
  54. }
  55. }
  56. }
  57. }

5. NIO VS BIO

5.1 stream vs channel

stream 不会自动缓冲数据,channel 会利用系统提供的发送缓冲区、接收缓冲区(更为底层)

stream 仅支持阻塞 API,channel 同时支持阻塞、非阻塞 API,网络 channel 可配合 selector 实现多路复用

二者均为全双工,即读写可以同时进行

5.2 IO 模型

同步阻塞、同步非阻塞、同步多路复用、异步非阻塞

  • 同步:线程自己去获取结果(一个线程)
  • 异步:线程自己不去获取结果,而是由其它线程送结果(至少两个线程)

当调用一次 channel.read 或 stream.read 后,会切换至操作系统内核态来完成真正数据读取,而读取又分为两个阶段,分别为:

  • 等待数据阶段
  • 复制数据阶段

阻塞 IO
image.png
非阻塞 IO
image.png
多路复用
image.png
异步 IO
image.png
阻塞 IO VS 多路复用
image.pngimage.png

5.3 零拷贝

5.3.1 传统 IO

  1. File f = new File("helloword/data.txt");
  2. RandomAccessFile file = new RandomAccessFile(file, "r");
  3. byte[] buf = new byte[(int)f.length()];
  4. file.read(buf);
  5. Socket socket = ...;
  6. socket.getOutputStream().write(buf);

传统 IO 工作流程
image.png

  1. Java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 Java 程序的用户态切换至内核态,去调用操作系统(Kernel)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access)来实现文件读,其间也不会使用 cpu
  2. 从内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 cpu 会参与拷贝,无法利用 DMA
  3. 调用 write 方法,这时将数据从用户缓冲区(byte[] buf)写入 socket 缓冲区,cpu 会参与拷贝
  4. 接下来要向网卡写数据,这项能力 Java 又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu

Java 的 IO 实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的

用户态与内核态的切换发生了 3 次,数据拷贝了共 4 次

5.3.2 NIO 优化

  1. // HeapByteBuffer 使用的还是 Java 内存
  2. ByteBuffer.allocate(10);
  3. // DirectByteBuffer 使用的是操作系统内存
  4. ByteBuffer.allocateDirect(10);

image.png
大部分步骤与优化前相同,不再赘述。唯有一点:Java 可以使用 DirectByteBuf 将堆外内存映射到 JVM 内存中来直接访问使用

  • 这块内存不受 JVM 垃圾回收的影响,因此内存地址固定,有助于 IO 读写
  • Java 中的 DirectByteBuf 对象仅维护了此内存的虚引用,内存回收分成两步
    • DirectByteBuf 对象被垃圾回收,将虚引用加入引用队列
    • 通过专门线程访问引用队列,根据虚引用释放堆外内存

减少了 1 次数据拷贝,用户态与内核态的切换次数没有减少

5.3.3 零拷贝

Linux 2.1
image.png
底层采用了 linux 2.1 后提供的 sendFile 方法,Java 中对应着两个 channel 调用transferTo/transferFrom 方法拷贝数据

  1. Java 调用 transferTo 方法后,要从 Java 程序的用户态切换至内核态,使用 DMA 将数据读入内核缓冲区,不会使用 cpu
  2. 数据从内核缓冲区传输到 socket 缓冲区,cpu 会参与拷贝
  3. 最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu

只发生了 1 次用户态与内核态的切换,数据拷贝了 3 次

Linux 2.4
image.png
还是 sendFile 方法

  1. Java 调用 transferTo 方法后,要从 Java 程序的用户态切换至内核态,使用 DMA 将数据读入内核缓冲区,不会使用 cpu
  2. 只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
  3. 使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 cpu

整个过程只发生了 1 次用户态与内核态的切换,数据拷贝了 2 次

所谓的【零拷贝】,并不是真正无拷贝,而是在不会拷贝重复数据到 JVM 内存中

零拷贝的优点有

  • 更少的用户态与内核态的切换
  • 不利用 cpu 计算,减少 cpu 缓存伪共享
  • 零拷贝适合小文件传输