纸上得来终觉浅, 我们来用Netty自己实现一个自定义协议的读写

1.数据帧

1.1 什么是半包粘包

假如现在有一段数据

  1. {"a":1,"b":2}
  2. {"c":3,"d":4}

我们接收到之后, 怎么分辨它是两条信息呢?
五.Netty实现自定义应用层协议 - 图1

  • 粘包的主要原因:
  1. 发送方写入数据 < 缓冲区大小
  2. 接收方读取缓冲区数据不及时
  • 半包的主要原因:
  1. 发送方写入数据 > 缓冲区大小
  2. 发送的数据大于协议的MTU(Maximum Transmission Unit, 最大传输单元), 必须拆包;(Ethernet v2 1500 字节)(IPv4 68字节~64KiB)(IPv6 1280字节~64KiB)

五.Netty实现自定义应用层协议 - 图2

  • Netty提供了一些方案去快速的解决这些问题, 我们后面再讨论

五.Netty实现自定义应用层协议 - 图3

1.2 定义我们的协议格式

为了理解Netty, 我们需要自定义一个协议格式去实现编码解码等代码

https://gitee.com/spitman/learnnetty/blob/master/src/main/java/org/zyj/io/protocal/Message.java

  • 我们按照二进制协议的方式去设计自定义的协议

五.Netty实现自定义应用层协议 - 图4
假设我们的协议格式如上例所示

  • length: 占用4字节, 表示body的长度
  • version: 占用4字节, 表示协议的版本号
  • opCode: 占用4字节, 表示操作类型
  • streamId: 占用4字节, 用于幂等处理
  • MessageBody: 是一个JSON字符串, utf-8编码

五.Netty实现自定义应用层协议 - 图5

2.编码

https://gitee.com/spitman/learnnetty/blob/master/src/main/java/org/zyj/io/protocal/custom/MessageEncoder.java

  • 此例中是将Java对象, 转换成二进制报文

下面是一个非常粗糙的实现, 有很多bug, 不要纠结于实现, 关键是理解pipeline中数据的流转

  1. public class MessageEncoder extends ChannelOutboundHandlerAdapter {
  2. @Override
  3. public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
  4. ByteBuf buf = ctx.alloc().ioBuffer();
  5. try {
  6. if (msg instanceof Message) {
  7. Message myMessage = (Message) msg;
  8. encode(ctx, myMessage, buf);
  9. ctx.write(buf, promise);
  10. }
  11. } finally {
  12. if (buf != null) {
  13. buf.release();
  14. }
  15. }
  16. }
  17. private void encode(ChannelHandlerContext ctx, Message myMessage, ByteBuf buf) {
  18. //1. 计算请求体body长度
  19. int length = myMessage.getBody().length;
  20. //2. 按照协议规则写入报文
  21. buf.writeInt(length);
  22. buf.writeInt(myMessage.getVersion());
  23. buf.writeInt(myMessage.getOpCode());
  24. buf.writeInt(myMessage.getStreamId());
  25. buf.writeBytes(myMessage.getBody());
  26. }
  27. }

3.解码

https://gitee.com/spitman/learnnetty/blob/master/src/main/java/org/zyj/io/protocal/custom/MessageDecoder.java

  • 此例中是将二进制报文, 转换成Java对象
  • 注意! 这里并没有解决半包和粘包问题

    1. /**
    2. * 此代码没考虑粘包半包问题, 但不影响我们学习pipeline中数据流转的规则
    3. */
    4. public class MessageDecoder extends ChannelInboundHandlerAdapter {
    5. @Override
    6. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    7. ByteBuf receiveBuf = (ByteBuf) msg;
    8. int length = receiveBuf.readInt();
    9. int version = receiveBuf.readInt();
    10. int opCode = receiveBuf.readInt();
    11. int streamId = receiveBuf.readInt();
    12. ByteBuf bodyBuf = receiveBuf.readBytes(length);
    13. byte[] body = bodyBuf.array();
    14. //组装message对象
    15. Message message = new Message();
    16. message.setVersion(version);
    17. message.setOpCode(opCode);
    18. message.setStreamId(streamId);
    19. message.setBody(body);
    20. ctx.fireChannelRead(message);
    21. }
    22. }

    4.Netty针对自定义协议的处理方案

    4.1 编码器

    4.1.1 MessageToByteEncoder

  • 实际使用Netty的时候可以继承 io.netty.handler.codec.MessageToByteEncoder 实现Encoder功能

    1. public class MessageEncoder extends MessageToByteEncoder<Message> {
    2. @Override
    3. protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
    4. out.writeInt(msg.getBody().length);
    5. out.writeInt(msg.getVersion());
    6. out.writeInt(msg.getOpCode());
    7. out.writeInt(msg.getStreamId());
    8. out.writeBytes(msg.getBody());
    9. }
    10. }

    4.2 解码器

    4.2.1 FixedLengthFrameDecoder

  • 处理固定长度的报文

    4.2.2 解码器 LineBasedFrameDecoder

  • 按行分割的报文

    4.2.3 ReplayingDecoder

    https://gitee.com/spitman/learnnetty/blob/master/src/main/java/org/zyj/io/protocal/replaying/ReplayingMessageDecoder.java

使用ReplayingDecoder可以很优雅的解决半包粘包问题

  • 实际使用Netty的时候可以继承 io.netty.handler.codec.ReplayingDecoder 实现Decoder的功能

ReplayingDecoder 允许让你实现decode()方法,
在decode()中, 就像已经接收到所有所需的字节,而不用去检查所需字节的可用性。

4.2.4 LengthFieldBasedFrameDecoder

https://gitee.com/spitman/learnnetty/blob/master/src/main/java/org/zyj/io/protocal/length_field_based_frame/LengthFieldBasedFrameMessageDecoder.java

  • 如果你的协议格式更加复杂, 可以用它专门处理半包和粘包问题
  • 处理完粘包和半包问题后, 会把数据交给子类的 decode() 函数处理