• 一种思路是固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽
    • 另一种思路是按分隔符拆分,缺点是效率低
    • TLV 格式,即 Type 类型、Length 长度、Value 数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量
      • Http 1.1 是 TLV 格式
      • Http 2.0 是 LTV 格式

    ByteBuffer 大小分配

    • 每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用,因此需要为每个 channel 维护一个独立的 ByteBuffer
    • ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的 ByteBuffer
      • 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能,参考实现 http://tutorials.jenkov.com/java-performance/resizable-array.html
      • 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗

    服务器端

    1. private static void split(ByteBuffer source) {
    2. source.flip();
    3. for (int i = 0; i < source.limit(); i++) {
    4. // 找到一条完整消息
    5. if (source.get(i) == '\n') {
    6. int length = i + 1 - source.position();
    7. // 把这条完整消息存入新的 ByteBuffer
    8. ByteBuffer target = ByteBuffer.allocate(length);
    9. // 从 source 读,向 target 写
    10. for (int j = 0; j < length; j++) {
    11. target.put(source.get());
    12. }
    13. debugAll(target);
    14. }
    15. }
    16. source.compact(); // 0123456789abcdef position 16 limit 16
    17. }
    18. public static void main(String[] args) throws IOException {
    19. // 1. 创建 selector, 管理多个 channel
    20. Selector selector = Selector.open();
    21. ServerSocketChannel ssc = ServerSocketChannel.open();
    22. ssc.configureBlocking(false);
    23. // 2. 建立 selector 和 channel 的联系(注册)
    24. // SelectionKey 就是将来事件发生后,通过它可以知道事件和哪个channel的事件
    25. SelectionKey sscKey = ssc.register(selector, 0, null);
    26. // key 只关注 accept 事件
    27. sscKey.interestOps(SelectionKey.OP_ACCEPT);
    28. log.debug("sscKey:{}", sscKey);
    29. ssc.bind(new InetSocketAddress(8080));
    30. while (true) {
    31. // 3. select 方法, 没有事件发生,线程阻塞,有事件,线程才会恢复运行
    32. // select 在事件未处理时,它不会阻塞, 事件发生后要么处理,要么取消,不能置之不理
    33. selector.select();
    34. // 4. 处理事件, selectedKeys 内部包含了所有发生的事件
    35. Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, read
    36. while (iter.hasNext()) {
    37. SelectionKey key = iter.next();
    38. // 处理key 时,要从 selectedKeys 集合中删除,否则下次处理就会有问题
    39. iter.remove();
    40. log.debug("key: {}", key);
    41. // 5. 区分事件类型
    42. if (key.isAcceptable()) { // 如果是 accept
    43. ServerSocketChannel channel = (ServerSocketChannel) key.channel();
    44. SocketChannel sc = channel.accept();
    45. sc.configureBlocking(false);
    46. ByteBuffer buffer = ByteBuffer.allocate(16); // attachment
    47. // 将一个 byteBuffer 作为附件关联到 selectionKey 上
    48. SelectionKey scKey = sc.register(selector, 0, buffer);
    49. scKey.interestOps(SelectionKey.OP_READ);
    50. log.debug("{}", sc);
    51. log.debug("scKey:{}", scKey);
    52. } else if (key.isReadable()) { // 如果是 read
    53. try {
    54. SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channel
    55. // 获取 selectionKey 上关联的附件
    56. ByteBuffer buffer = (ByteBuffer) key.attachment();
    57. int read = channel.read(buffer); // 如果是正常断开,read 的方法的返回值是 -1
    58. if(read == -1) {
    59. key.cancel();
    60. } else {
    61. split(buffer);
    62. // 需要扩容
    63. if (buffer.position() == buffer.limit()) {
    64. ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
    65. buffer.flip();
    66. newBuffer.put(buffer); // 0123456789abcdef3333\n
    67. key.attach(newBuffer);
    68. }
    69. }
    70. } catch (IOException e) {
    71. e.printStackTrace();
    72. key.cancel(); // 因为客户端断开了,因此需要将 key 取消(从 selector 的 keys 集合中真正删除 key)
    73. }
    74. }
    75. }
    76. }
    77. }

    客户端

    1. SocketChannel sc = SocketChannel.open();
    2. sc.connect(new InetSocketAddress("localhost", 8080));
    3. SocketAddress address = sc.getLocalAddress();
    4. // sc.write(Charset.defaultCharset().encode("hello\nworld\n"));
    5. sc.write(Charset.defaultCharset().encode("0123\n456789abcdef"));
    6. sc.write(Charset.defaultCharset().encode("0123456789abcdef3333\n"));
    7. System.in.read();