处理消息的边界

  • 一种思路是固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽
  • 另一种思路是按分隔符拆分,缺点是效率低
  • TLV 格式,即 Type 类型、Length 长度、Value 数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量
    • Http 1.1 是 TLV 格式
    • Http 2.0 是 LTV 格式
      ```java private static void split(ByteBuffer source) { source.flip(); for (int i = 0; i < source.limit(); i++) { // 找到一条完整消息 if (source.get(i) == ‘\n’) {
      1. int length = i + 1 - source.position();
      2. // 把这条完整消息存入新的 ByteBuffer
      3. ByteBuffer target = ByteBuffer.allocate(length);
      4. // 从 source 读,向 target 写
      5. for (int j = 0; j < length; j++) {
      6. target.put(source.get());
      7. }
      8. debugAll(target);
      } } source.compact(); // 0123456789abcdef position 16 limit 16 }

public static void main(String[] args) throws IOException { // 1. 创建 selector, 管理多个 channel Selector selector = Selector.open(); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); // 2. 建立 selector 和 channel 的联系(注册) // SelectionKey 就是将来事件发生后,通过它可以知道事件和哪个channel的事件 SelectionKey sscKey = ssc.register(selector, 0, null); // key 只关注 accept 事件 sscKey.interestOps(SelectionKey.OP_ACCEPT); log.debug(“sscKey:{}”, sscKey); ssc.bind(new InetSocketAddress(8080)); while (true) { // 3. select 方法, 没有事件发生,线程阻塞,有事件,线程才会恢复运行 // select 在事件未处理时,它不会阻塞, 事件发生后要么处理,要么取消,不能置之不理 selector.select(); // 4. 处理事件, selectedKeys 内部包含了所有发生的事件 Iterator iter = selector.selectedKeys().iterator(); // accept, read while (iter.hasNext()) { SelectionKey key = iter.next(); // 处理key 时,要从 selectedKeys 集合中删除,否则下次处理就会有问题 iter.remove(); log.debug(“key: {}”, key); // 5. 区分事件类型 if (key.isAcceptable()) { // 如果是 accept ServerSocketChannel channel = (ServerSocketChannel) key.channel(); SocketChannel sc = channel.accept(); sc.configureBlocking(false); ByteBuffer buffer = ByteBuffer.allocate(16); // attachment // 将一个 byteBuffer 作为附件关联到 selectionKey 上 SelectionKey scKey = sc.register(selector, 0, buffer); scKey.interestOps(SelectionKey.OP_READ); log.debug(“{}”, sc); log.debug(“scKey:{}”, scKey); } else if (key.isReadable()) { // 如果是 read try { SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channel // 获取 selectionKey 上关联的附件 ByteBuffer buffer = (ByteBuffer) key.attachment(); int read = channel.read(buffer); // 如果是正常断开,read 的方法的返回值是 -1 if(read == -1) { key.cancel(); } else { split(buffer); // 需要扩容 if (buffer.position() == buffer.limit()) { ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2); buffer.flip(); newBuffer.put(buffer); // 0123456789abcdef3333\n key.attach(newBuffer); } }

  1. } catch (IOException e) {
  2. e.printStackTrace();
  3. key.cancel(); // 因为客户端断开了,因此需要将 key 取消(从 selector 的 keys 集合中真正删除 key)
  4. }
  5. }
  6. }
  7. }

} ```