问题描述

在 TCP 套接字编程中,解决消息边界问题是很重要的。当接收者尝试从套接字中读取的消息与预期的不符的时候会发生两种情况。其一,如果套接字中没有其他消息,接收者将会阻塞等待,同时无法处理接收到的消息,如果发送方同时在等待接受端的响应消息的时候就会形成死锁;其二,如果套接字中有其他消息,接收者会将后面消息的一部分甚至全部读取到第一条消息中,这会产生一些协议上的错误。

ByteBuffer 大小分配

  • 每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用,因此需要为每个 channel 维护一个独立的 ByteBuffer
  • ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的 ByteBuffer
  • 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能,参考实现 http://tutorials.jenkov.com/java-performance/resizable-array.html
  • 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗

解决办法

  • 一种思路是固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽
  • 另一种思路是按分隔符拆分,缺点是效率低
  • TLV 格式,即 Type 类型、Length 长度、Value 数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量


分隔符拆分

服务端

  1. package cn.inetty.nio.selector2;
  2. import lombok.extern.slf4j.Slf4j;
  3. import java.io.IOException;
  4. import java.net.InetSocketAddress;
  5. import java.nio.ByteBuffer;
  6. import java.nio.channels.SelectionKey;
  7. import java.nio.channels.Selector;
  8. import java.nio.channels.ServerSocketChannel;
  9. import java.nio.channels.SocketChannel;
  10. import java.nio.charset.Charset;
  11. import static cn.inetty.nio.ByteBufferUtil.debugAll;
  12. @Slf4j
  13. public class Server {
  14. public static void main(String[] args) throws IOException {
  15. // 使用系统默认的 Selector Provider 创建一个选择器(多路复用器) 这个选择器将保持打开状态 直到其 close 方法将之关闭
  16. // Selector 是整个 NIO 的核心,使用它可以实现在单线程环境下管理多个 channels,也就是说可以管理多个网络连接。
  17. // 具体的说就是可以通过 Selector 检查一个或多个 NIO Channel (通道) 是否处于可读、可写的状态和在通道上发生的事件。
  18. var selector = Selector.open();
  19. // ServerSocketChannel 是一个侦听套接字的可选通道 是面向流的 通过其 open 方法创建服务器套接字通道,新创建的服务器套接字通道以已经打开,但还没有绑定,可以通过其 bind 方法绑定
  20. // 服务器套接字可以安全地提供给多个并发线程使用
  21. var ssc = ServerSocketChannel.open();
  22. // configureBlocking 是 ServerSocketChannel 父类定义的方法,用于调整当前通道的阻止模式,false 表示非阻塞模式
  23. ssc.configureBlocking(false);
  24. // SelectionKey 是代表 SelectableChannel 注册的 Selector 令牌 每次向选择器注册通道的时候都会创建一个选择键,这个选择键的密钥会一直保持有效,直到通过调用 cancel 方法关闭通道或者选择器的时候才会在下一次选择操作的时候被删除,可以使用 isValid 方法检验密钥是否有效
  25. // SelectableChannel 是可通过 Selector 多路复用的通道 register 返回一个选择键 表示通道与选择器的注册 一旦注册完成,通道就会保持注册状态,直到被注销
  26. // 三个参数依次为:要注册此通道的选择器、为结果密钥设置的兴趣、结果密钥的附近
  27. var selectionKey = ssc.register(selector, 0, null);
  28. // 通过 interestOps 更改选择键的兴趣集 下面代码表示选择键将关注 accept 方法 accept方法会在服务器成功接受连接的时候触发,与之对应的是 connect 方法,此方法会在客户端连接成功时触发,也可以在 register 的时候设置
  29. selectionKey.interestOps(SelectionKey.OP_ACCEPT);
  30. // bind 方法将通道的套接字绑定到本地地址并配套套接字以侦听连接
  31. // InetSocketAddress 实现了 IP 套接字地址(IP地址+端口号),也可以是主机名+端口号
  32. ssc.bind(new InetSocketAddress(8888));
  33. while (true) {
  34. // select 方法会选择至少一个Channel通道,并将 I/O 操作准备就绪,如果有读写事件发生就交给 Thread 来处理,没有事件的时候通道将会被阻塞
  35. selector.select();
  36. // 获取 selected-key集 的迭代器
  37. var iter = selector.selectedKeys().iterator();
  38. while (iter.hasNext()) {
  39. var key = iter.next();
  40. // 判断事件类型,isAcceptable 表示此密钥的通道是否准备好勒接受新的套接字连接 如果为 true 并且 readyOps() & OP_ACCEPT!=0 那么就会触发 accept 方法
  41. if (key.isAcceptable()) {
  42. // channel 方法获取一个通道,这个通道就是使用 key 创建的通道,参见上面的 ServerSocketChannel.open() 代码
  43. // 即使是取消密钥之后,方法仍然会继续返回通道
  44. var channel = (ServerSocketChannel)key.channel();
  45. // accept 方法表示接受与通道套接字的连接,此时通道会阻塞直到发生 I/O 操作
  46. var sc = channel.accept();
  47. // 修改模式为非阻塞模式
  48. sc.configureBlocking(false);
  49. // allocate 分配一个新的字节缓冲区并指定容量,默认位置是0,使用 capacity 可以获取此缓冲区的容量
  50. var buf = ByteBuffer.allocate(16);
  51. // 将 ByteBuffer 作为附件关联到 SelectionKey 中
  52. var scKey = sc.register(selector, SelectionKey.OP_READ, buf);
  53. System.out.println(sc);
  54. } else if (key.isReadable()) {
  55. try {
  56. // SocketChannel 是一个面向流的连接套接字的可选通道
  57. var channel = (SocketChannel)key.channel();
  58. // attachment方法用于检索当前附近 在这里可以获取 SelectionKey 关联的 ByteBuffer
  59. var buf = (ByteBuffer)key.attachment();
  60. // 从通道中读取一个字节序列到给定的缓冲区
  61. int read = channel.read(buf);
  62. if (read == -1) {
  63. // cancel 方法请求取消此密钥通道和选择器的注册,一旦取消则密钥永远无效
  64. key.cancel();
  65. } else {
  66. split(buf);
  67. // 如果 buffer 中满了则要扩容 ByteBuffer
  68. if (buf.position() == buf.limit()) {
  69. var newBuf = ByteBuffer.allocate(buf.capacity()*2);
  70. // 为新通道写入序列准备好缓冲区,同时将位置设置为0, 即读模式,与之相对的还有 写模式 ,即 clear 和 compact 方法,前者重置位置,后者保留位置
  71. buf.flip();
  72. newBuf.put(buf);
  73. // 使用 SelectionKey 的 attach 方法可以将给定对象附加到当前键,之后可以使用 attachment 方法检索附加对象
  74. // 需要注意的是一次只能附加一个 Object,并且调用这个方法会到之丢弃任何以前的附件,如果为 null 则表示丢弃当前附件
  75. key.attach(newBuf);
  76. }
  77. }
  78. System.out.println(Charset.defaultCharset().decode(buf));
  79. } catch (IOException e) {
  80. key.cancel();
  81. e.printStackTrace();
  82. }
  83. }
  84. // 处理完毕需要将事件移除
  85. iter.remove();
  86. }
  87. }
  88. }
  89. static void split(ByteBuffer buffer) {
  90. buffer.flip();
  91. for (int i = 0; i< buffer.limit(); i++) {
  92. if (buffer.get(i) == '\n') {
  93. var len = i + 1 - buffer.position();
  94. var target = ByteBuffer.allocate(len);
  95. for (int j = 0; j < len; j++) {
  96. target.put(buffer.get());
  97. }
  98. debugAll(target);
  99. }
  100. }
  101. buffer.compact();
  102. }
  103. }

客户端

  1. package cn.inetty.nio.selector2;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.channels.SocketChannel;
  5. import java.nio.charset.Charset;
  6. public class Client1 {
  7. public static void main(String[] args) throws IOException {
  8. SocketChannel open = SocketChannel.open();
  9. open.connect(new InetSocketAddress("localhost", 8888));
  10. // open.write(Charset.defaultCharset().encode("hello!"));
  11. open.write(Charset.defaultCharset().encode("0123456789abcdef"));
  12. open.write(Charset.defaultCharset().encode("0123456789abcdef111\n"));
  13. System.in.read();
  14. System.out.println("wait");
  15. }
  16. }