纸上得来终觉浅, 我们来用Netty自己实现一个自定义协议的读写
1.数据帧
1.1 什么是半包粘包
假如现在有一段数据
{"a":1,"b":2}
{"c":3,"d":4}
我们接收到之后, 怎么分辨它是两条信息呢?
- 粘包的主要原因:
- 发送方写入数据 < 缓冲区大小
- 接收方读取缓冲区数据不及时
- 半包的主要原因:
- 发送方写入数据 > 缓冲区大小
- 发送的数据大于协议的MTU(Maximum Transmission Unit, 最大传输单元), 必须拆包;(Ethernet v2 1500 字节)(IPv4 68字节~64KiB)(IPv6 1280字节~64KiB)
- Netty提供了一些方案去快速的解决这些问题, 我们后面再讨论
1.2 定义我们的协议格式
为了理解Netty, 我们需要自定义一个协议格式去实现编码解码等代码
https://gitee.com/spitman/learnnetty/blob/master/src/main/java/org/zyj/io/protocal/Message.java
- 我们按照二进制协议的方式去设计自定义的协议
假设我们的协议格式如上例所示
- length: 占用4字节, 表示body的长度
- version: 占用4字节, 表示协议的版本号
- opCode: 占用4字节, 表示操作类型
- streamId: 占用4字节, 用于幂等处理
- MessageBody: 是一个JSON字符串, utf-8编码
2.编码
- 此例中是将Java对象, 转换成二进制报文
下面是一个非常粗糙的实现, 有很多bug, 不要纠结于实现, 关键是理解pipeline中数据的流转
public class MessageEncoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = ctx.alloc().ioBuffer();
try {
if (msg instanceof Message) {
Message myMessage = (Message) msg;
encode(ctx, myMessage, buf);
ctx.write(buf, promise);
}
} finally {
if (buf != null) {
buf.release();
}
}
}
private void encode(ChannelHandlerContext ctx, Message myMessage, ByteBuf buf) {
//1. 计算请求体body长度
int length = myMessage.getBody().length;
//2. 按照协议规则写入报文
buf.writeInt(length);
buf.writeInt(myMessage.getVersion());
buf.writeInt(myMessage.getOpCode());
buf.writeInt(myMessage.getStreamId());
buf.writeBytes(myMessage.getBody());
}
}
3.解码
- 此例中是将二进制报文, 转换成Java对象
注意! 这里并没有解决半包和粘包问题
/**
* 此代码没考虑粘包半包问题, 但不影响我们学习pipeline中数据流转的规则
*/
public class MessageDecoder extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf receiveBuf = (ByteBuf) msg;
int length = receiveBuf.readInt();
int version = receiveBuf.readInt();
int opCode = receiveBuf.readInt();
int streamId = receiveBuf.readInt();
ByteBuf bodyBuf = receiveBuf.readBytes(length);
byte[] body = bodyBuf.array();
//组装message对象
Message message = new Message();
message.setVersion(version);
message.setOpCode(opCode);
message.setStreamId(streamId);
message.setBody(body);
ctx.fireChannelRead(message);
}
}
4.Netty针对自定义协议的处理方案
4.1 编码器
4.1.1 MessageToByteEncoder
实际使用Netty的时候可以继承
io.netty.handler.codec.MessageToByteEncoder
实现Encoder功能public class MessageEncoder extends MessageToByteEncoder<Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
out.writeInt(msg.getBody().length);
out.writeInt(msg.getVersion());
out.writeInt(msg.getOpCode());
out.writeInt(msg.getStreamId());
out.writeBytes(msg.getBody());
}
}
4.2 解码器
4.2.1 FixedLengthFrameDecoder
-
4.2.2 解码器 LineBasedFrameDecoder
-
4.2.3 ReplayingDecoder
使用ReplayingDecoder可以很优雅的解决半包粘包问题
- 实际使用Netty的时候可以继承 io.netty.handler.codec.ReplayingDecoder 实现Decoder的功能
ReplayingDecoder 允许让你实现decode()方法,
在decode()中, 就像已经接收到所有所需的字节,而不用去检查所需字节的可用性。
4.2.4 LengthFieldBasedFrameDecoder
- 如果你的协议格式更加复杂, 可以用它专门处理半包和粘包问题
- 处理完粘包和半包问题后, 会把数据交给子类的 decode() 函数处理