7.1 详解粘包和拆包
7.1.1 半包问题的实战案例
package com.crazymakercircle.netty.echoServer;
//…
public class NettyDumpSendClient {
private int serverPort;
private String serverIp;
Bootstrap b = new Bootstrap();
public NettyDumpSendClient(String ip, int port) {
this.serverPort = port;
this.serverIp = ip;
}
public void runClient() {
//创建反应器线程组
//省略启动客户端Bootstrap引导类配置和启动
//阻塞,直到连接完成
f.sync();
Channel channel = f.channel();
//发送大量的文字
String content= "疯狂创客圈:高性能学习者社群!";
byte[] bytes =content.getBytes(Charset.forName("utf-8"));
for (int i = 0; i< 1000; i++) {
//发送ByteBuf
ByteBuf buffer = channel.alloc().buffer();
buffer.writeBytes(bytes);
channel.writeAndFlush(buffer);
}
//省略优雅关闭客户端
}
public static void main(String[] args) throws InterruptedException {
int port = NettyDemoConfig.SOCKET_SERVER_PORT;
String ip = NettyDemoConfig.SOCKET_SERVER_IP;
new NettyDumpSendClient(ip, port).runClient();
}
}
7.1.2 什么是半包问题
- 粘包:接收端(Receiver)收到一个ByteBuf,包含了发送端(Sender)的多个ByteBuf,发送端的多个ByteBuf在接收端“粘”在了一起
- 半包:Receiver将Sender的一个ByteBuf“拆”开了收,收到多个破碎的包。换句话说,Receiver收到了Sender的一个ByteBuf的一小部分。
7.2 使用JSON协议通信
JSON(JavaScript Object Notation,JS对象简谱)是一种轻量级的数据交换格式。它是基于ECMAScript(欧洲计算机协会制定的JS规范)的一个子集,采用完全独立于编程语言的文本格式来存储和表示数据。简洁和清晰的层次结构使得JSON成为理想的数据交换语言。
JSON协议是一种文本协议,易于人阅读和编写,同时也易于机器解析和生成,并能有效地提升网络传输效率。
7.2.1 JSON的核心优势
XML是一种常用的文本协议,和JSON一样都使用结构化方法来标记数据。和XML相比,JSON作为数据包格式传输的时候具有更高的效率。这是因为JSON不像XML那样需要有严格的闭合标签,让有效数据量与总数据包比大大提升,从而在同等数据流量的情况下减少了网络的传输压力。
7.2.2 JSON序列化与反序列化开源库
Java处理JSON数据有三个比较流行的开源类库:阿里巴巴的FastJson、谷歌的Gson和开源社区的Jackson。
在实际开发中,目前主流的策略是Gson和FastJson结合使用。在POJO序列化成JSON字符串的应用场景下,使用谷歌的Gson库;在JSON字符串反序列化成POJO的应用场景下,使用阿里巴巴的FastJson库。
package com.crazymakercircle.util;
//省略import
public class JsonUtil {
//谷歌GsonBuilder构造器
static GsonBuilder gb = new GsonBuilder();
static {
//不需要html escape
gb.disableHtmlEscaping();
}
//序列化:使用Gson将 POJO 转成字符串
public static String pojoToJson(java.lang.Object obj) {
String json = gb.create().toJson(obj);
return json;
}
//反序列化:使用Fastjson将字符串转成 POJO对象
public static <T> T jsonToPojo(String json, Class<T>tClass) {
T t = JSONObject.parseObject(json, tClass);
return t;
}
}
7.2.3 JSON序列化与反序列化的实战案例
首先定义一个POJO类,名称为JsonMsg,包含id和content两个属性,然后使用lombok开源库的@Data注解为属性加上getter()和setter()方法。POJO类的源码如下:
@Data
public class JsonMsg {
private int id;
private String content;
public String convertToJson() {
return JsonUtil.pojoToJson(this);
}
public static JsonMsg parseFromJson(String json) {
return JsonUtil.jsonToPojo(json, JsonMsg.class);
}
}
使用POJO类JsonMsg的序列化、反序列化的实战案例代码如下:
public class JsonMsgDemo {
public JsonMsg buildMsg() {
JsonMsg user = new JsonMsg();
user.setId(1000);
user.setContent("弯弯入我心,秋凉知我意!");
return user;
}
@Test
public void serAndDesr() {
JsonMsg message = buildMsg();
String json = message.convertToJson();
Logger.info("json:=" + json);
JsonMsg msg = JsonMsg.parseFromJson(json);
Logger.info("id:=" + msg.getId());
Logger.info("content:=" + msg.getContent());
}
}
[main|JsonMsgDemo.serAndDesr] |> json:={"id":1000,"content":"弯弯入我心,秋凉知我意!"}
[main|JsonMsgDemo.serAndDesr] |> id:=1000
[main|JsonMsgDemo.serAndDesr] |> content:=弯弯入我心,秋凉知我意!
7.2.4 JSON传输的编码器和解码器
Head-Content数据包的解码过程
Head-Content数据包的编码过程
Netty内置LengthFieldPrepender编码器的作用是在数据包的前面加上内容的二进制字节数组的长度。这个编码器和LengthFieldBasedFrameDecoder解码器是天生的一对,常常配套使用。这组“天仙配”属于Netty所提供的一组非常重要的编码器和解码器,常常用于Head-Content数据包的传输。
//构造器一
public LengthFieldPrepender(int lengthFieldLength) {
this(lengthFieldLength, false);
}
//构造器二
public LengthFieldPrepender(int lengthFieldLength, Boolean lengthIncludesLengthFieldLength)
{
this(lengthFieldLength, 0, lengthIncludesLengthFieldLength);
}
//省略其他的构造器
在上面的构造器中,第一个参数lengthFieldLength表示Head长度字段所占用的字节数,第二个参数lengthIncludesLengthFieldLength表示Head字段的总长度值是否包含长度字段自身的字节数,如果该参数的值为true,表示长度字段的值(总长度)包含了自己的字节数。如果该参数的值为false,表示长度值只包含内容的二进制数据的长度。lengthIncludesLengthFieldLength值一般设置为false。
7.2.5 JSON传输的服务端的实战案例
服务端的程序仅仅读取客户端数据包并完成解码,服务端的程序没有写出任何输出数据包到对端(客户端)
package com.crazymakercircle.netty.protocol;
//…
public class JsonServer {
//省略成员属性、构造器
public void runServer() {
//创建反应器线程组
EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try {
//省略引导类的反应器线程、设置配置项等
//5 装配子通道流水线
b.childHandler(new ChannelInitializer<SocketChannel>() {
//有连接到达时会创建一个通道
protected void initChannel(SocketChannel ch)…{
//管理子通道中的Handler
//向子通道流水线添加3个Handler
ch.pipeline().addLast(
new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
ch.pipeline().addLast(new
StringDecoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new JsonMsgDecoder());
}
});
//省略端口绑定、服务监听、优雅关闭
}
//服务端业务处理器
static class JsonMsgDecoderextends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)…{
String json = (String) msg;
JsonMsg jsonMsg = JsonMsg.parseFromJson(json);
Logger.info("收到一个 Json 数据包 =>>" + jsonMsg);
}
}
public static void main(String[] args) throws InterruptedException {
int port = NettyDemoConfig.SOCKET_SERVER_PORT;
new JsonServer(port).runServer();
}
}
7.2.6 JSON传输的客户端的实战案例
- 通过谷歌的Gson框架,将POJO序列化成JSON字符串。
- 使用StringEncoder编码器(Netty内置)将JSON字符串编码成二进制字节数组。
使用LengthFieldPrepender编码器(Netty内置)将二进制字节数组编码成Head-Content格式的二进制数据包。 ```java package com.crazymakercircle.netty.protocol; //… public class JsonSendClient { static String content = “疯狂创客圈:高性能学习社群!”; //省略成员属性、构造器 public void runClient() {
//创建反应器线程组
EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try {
//省略引导类的反应器线程、设置配置项等
//5 装配通道流水线
b.handler(new ChannelInitializer<SocketChannel>() {
//初始化客户端通道
protected void initChannel(SocketChannel ch) …{
//客户端通道流水线添加2个Handler
ch.pipeline().addLast(new LengthFieldPrepender(4));
ch.pipeline().addLast(new
StringEncoder(CharsetUtil.UTF_8));
}
});
ChannelFuture f = b.connect();
//…
//阻塞,直到连接完成
f.sync();
Channel channel = f.channel();
//发送 JSON 字符串对象
for (int i = 0; i< 1000; i++) {
JsonMsg user = build(i, i + "->" + content);
<a name="tMapu"></a>
## 7.3 使用Protobuf协议通信
Protobuf(Protocol Buffer)是Google提出的一种数据交换格式,是一套类似JSON或者XML的数据传输格式和规范,用于不同应用或进程之间的通信。Protobuf具有以下特点:
1. 语言无关,平台无关
1. 高效
1. 扩展性、兼容性好
<a name="Ui25f"></a>
## 7.3.1 一个简单的proto文件的实战案例
下面介绍一个非常简单的proto文件:仅仅定义一个消息结构体,并且该消息结构体也非常简单,仅包含两个字段。实例如下:
```java
//[开始头部声明]
syntax = "proto3";
package com.crazymakercircle.netty.protocol;
//[结束头部声明]
//[开始 Java选项配置]
option java_package = "com.crazymakercircle.netty.protocol";
option java_outer_classname = "MsgProtos";
//[结束 Java选项配置]
//[开始消息定义]
message Msg {
uint32 id = 1; //消息ID
string content = 2; //消息内容
}
//[结束消息定义]
7.3.2 通过控制台命令生成POJO和Builder
7.3.3 通过Maven插件生成POJO和Builder
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.0</version>
<extensions>true</extensions>
<configuration>
<!--proto文件路径-->
<protoSourceRoot>${project.basedir}/protobuf</protoSourceRoot>
<!--目标路径-->
<outputDirectory>${project.build.sourceDirectory}</outputDirectory>
<!--设置是否在生成java文件之前清空outputDirectory的文件-->
<clearOutputDirectory>false</clearOutputDirectory>
<!--临时目录-->
<temporaryProtoFileDirectory>${project.build.directory}/protoc-temp</temporaryProtoFileDirectory>
<!--protoc 可执行文件路径-->
<protocExecutable>${project.basedir}/protobuf/protoc3.6.1.exe</protocExecutable>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>test-compile</goal>
</goals>
</execution>
</executions>
</plugin>
7.3.4 Protobuf序列化与反序列化的实战案例
在Maven的pom.xml文件中加上protobuf的Java运行包的依赖,代码如下
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.6.1</version>
</dependency>
- 使用Builder构造POJO消息对象
```java
package com.crazymakercircle.netty.protocol;
//…
public class ProtobufDemo {
public static MsgProtos.Msg buildMsg() {
} //… }MsgProtos.Msg.Builder personBuilder = MsgProtos.Msg.newBuilder();
personBuilder.setId(1000);
personBuilder.setContent("疯狂创客圈:高性能学习社群");
MsgProtos.Msg message = personBuilder.build();
return message;
2. 序列化与反序列化的方式一
方式一为调用Protobuf POJO对象的toByteArray()方法将POJO对象序列化成字节数组,具体的代码如下:
```java
package com.crazymakercircle.netty.protocol;
//…
public class ProtobufDemo {
//第1种方式:序列化与反序列化
@Test
public void serAndDesr1() throws IOException {
MsgProtos.Msg message = buildMsg();
//将Protobuf对象序列化成二进制字节数组
byte[] data = message.toByteArray();
//可以用于网络传输,保存到内存或外存
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
outputStream.write(data);
data = outputStream.toByteArray();
//二进制字节数组反序列化成Protobuf对象
MsgProtos.Msg inMsg = MsgProtos.Msg.parseFrom(data);
Logger.info("id:=" + inMsg.getId());
Logger.info("content:=" + inMsg.getContent());
}
//…
}
序列化与反序列化的方式二 ```java package com.crazymakercircle.netty.protocol; //… public class ProtobufDemo {
//…
//第2种方式:序列化与反序列化 @Test public void serAndDesr2() throws IOException {
MsgProtos.Msg message = buildMsg();
//序列化到二进制码流
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
message.writeTo(outputStream);
ByteArrayInputStream inputStream =
new ByteArrayInputStream(outputStream.toByteArray());
//从二进码流反序列化成Protobuf对象
MsgProtos.Msg inMsg = MsgProtos.Msg.parseFrom(inputStream);
Logger.info("id:=" + inMsg.getId());
Logger.info("content:=" + inMsg.getContent());
} }
4. 序列化与反序列化的方式三
```java
package com.crazymakercircle.netty.protocol;
//…
public class ProtobufDemo {
//…
//第3种方式:序列化与反序列化
//带字节长度:[字节长度][字节数据],用于解决粘包/半包问题
@Test
public void serAndDesr3() throws IOException {
MsgProtos.Msg message = buildMsg();
//序列化到二进制码流
ByteArrayOutputStream outputStream =
new ByteArrayOutputStream();
message.writeDelimitedTo(outputStream);
ByteArrayInputStream inputStream =
new ByteArrayInputStream(outputStream.toByteArray());
//从二进制码字节流反序列化成Protobuf对象
MsgProtos.Msg inMsg =
MsgProtos.Msg.parseDelimitedFrom(inputStream);
Logger.info("id:=" + inMsg.getId());
Logger.info("content:=" + inMsg.getContent());
}
}
反序列化时,调用Protobuf生成的POJO类的parseDelimitedFrom(InputStream)静态方法,从输入流中先读取varint32类型的长度值,然后根据长度值读取此消息的二进制字节,再反序列化得到POJO新的实例。
7.4 Protobuf编解码的实战案例
Netty默认支持Protobuf的编码与解码,内置了一套基础的Protobuf编码和解码器。
7.4.1 Netty内置的Protobuf基础编码器/解码器
- ProtobufEncoder编码器
直接调用了Protobuf POJO实例的toByteArray()方法将自身编码成二进制字节,然后放入Netty的ByteBuf缓冲区中,接着会被发送到下一站编码器。
package io.netty.handler.codec.protobuf;
…
@Sharable
public class ProtobufEncoder extends MessageToMessageEncoder<MessageLiteOrBuilder> {
@Override
protected void encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg, List<Object> out) throws Exception {
if (msg instanceof MessageLite) {
out.add(Unpooled.wrappedBuffer( ((MessageLite) msg).toByteArray()));
return;
}
if (msg instanceof MessageLite.Builder) {
out.add(Unpooled.wrappedBuffer(( (MessageLite.Builder) msg).build().toByteArray()));
}
}
}
- ProtobufDecoder解码器
ProtobufDecoder和ProtobufEncoder相互对应,只不过在使用的时候ProtobufDecoder解码器需要指定一个Protobuf POJO实例作为解码的参考原型(prototype),解码时会根据原型实例找到对应的Parser解析器,将二进制的字节解码为Protobuf POJO实例。
- ProtobufVarint32LengthFieldPrepender长度编码器
这个编码器的作用是在ProtobufEncoder生成的字节数组之前前置一个varint32数字,表示序列化的二进制字节数量或者长度。
ProtobufVarint32FrameDecoder长度解码器
7.4.2 Protobuf传输的服务端的实战案例
服务端的实战案例程序代码如下
package com.crazymakercircle.netty.protocol;
//…
public class ProtoBufServer
{
//省略成员属性、构造器
public void runServer()
{
//创建反应器线程组
EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try
{
//省略引导类的反应器线程、设置配置项
//5 装配子通道流水线
b.childHandler(new ChannelInitializer<SocketChannel>()
{
//有连接到达时会创建一个通道
protected void initChannel(SocketChannel ch) …
{
//流水线管理子通道中的Handler业务处理器
//向子通道流水线添加3个Handler业务处理器
ch.pipeline().addLast(
new ProtobufVarint32FrameDecoder());
ch.pipeline().addLast(
new ProtobufDecoder(MsgProtos.Msg.getDefaultInstance()));
ch.pipeline().addLast(new ProtobufBussinessDecoder());
}
});
//省略端口绑定、服务监听、优雅关闭
}
//服务端的Protobuf业务处理器
static class ProtobufBussinessDecoder
extends ChannelInboundHandlerAdapter
{
@Override
public void channelRead(
ChannelHandlerContext ctx, Object msg) … {
MsgProtos.Msg protoMsg = (MsgProtos.Msg) msg;
//经过流水线的各个解码器取得了POJO实例
Logger.info("收到一个ProtobufPOJO =>>");
Logger.info("protoMsg.getId():=" + protoMsg.getId());
Logger.info("protoMsg.getContent():=" +
protoMsg.getContent());
}
}
}
public static void main(String[] args) throws InterruptedException
{
int port = NettyDemoConfig.SOCKET_SERVER_PORT;
new ProtoBufServer(port).runServer();
}
}
7.4.3 Protobuf传输的客户端的实战案例
使用Netty内置的ProtobufEncoder将Protobuf POJO对象编码成二进制的字节数组。
- 使用Netty内置的ProtobufVarint32LengthFieldPrepender编码器,加上varint32格式的可变长度。Netty会将完成了编码后的Length+Content格式的二进制字节码发送到服务端。
package com.crazymakercircle.netty.protocol;
//…
public class ProtoBufSendClient {
static String content = "疯狂创客圈:高性能学习社群!";
//省略成员属性、构造器
public void runClient() {
//创建反应器线程组
EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try {
//省略反应器组、IO通道、通道参数等设置
//5 装配通道流水线
b.handler(new ChannelInitializer<SocketChannel>() {
//初始化客户端通道
protected void initChannel(SocketChannel ch) …{
//客户端流水线添加2个Handler业务处理器
ch.pipeline().addLast(
new ProtobufVarint32LengthFieldPrepender());
ch.pipeline().addLast(new ProtobufEncoder());
}
});
ChannelFuture f = b.connect();
//…
//阻塞,直到连接完成
f.sync();
Channel channel = f.channel();
//发送Protobuf对象
for (int i = 0; i< 1000; i++) {
MsgProtos.Msg user = build(i, i + "->" + content);
channel.writeAndFlush(user);
Logger.info("发送报文数:" + i);
}
channel.flush();
//省略关闭等待、优雅关闭
}
//构建ProtoBuf对象
public MsgProtos.Msgbuild(int id, String content) {
MsgProtos.Msg.Builder builder = MsgProtos.Msg.newBuilder();
builder.setId(id);
builder.setContent(content);
return builder.build();
}
public static void main(String[] args) throws InterruptedException {
int port = NettyDemoConfig.SOCKET_SERVER_PORT;
String ip = NettyDemoConfig.SOCKET_SERVER_IP;
new ProtoBufSendClient(ip, port).runClient();
}
}
7.5 详解Protobuf协议语法
在Protobuf中,通信协议的格式是通过proto文件定义的。一个proto文件有两大组成部分:头部声明、消息结构体的定义。头部声明部分主要包含了协议的版本、包名、特定语言的选项设置等;消息结构体部分可以定义一个或者多个消息结构体。
7.5.1 proto文件的头部声明
//[开始声明]
syntax = "proto3";
//定义Protobuf的包名称空间
package com.crazymakercircle.netty.protocol;
//[结束声明]
//[开始 Java 选项配置]
option java_package = "com.crazymakercircle.netty.protocol";
option java_outer_classname = "MsgProtos";
//[结束 Java 选项配置]
- syntax版本号
package包
和java类似,通过package指定包名,用来避免消息名字相冲突,如果两个消息名称相同,但package包名不同,那么可以共存 在java中,会以package指定的包名作为生成的pojo类的包名
option配置选项
与proto文件使用的一些特定语言场景有关,在java中以
java_
开头的option选项会生效 选项option java_package
表示protobuf编译器在生成java pojo消息类时,生成在此选项所配置的java包名下,如果没有该选项,则会以头部声明的package作为java包名 选项option java_multiple_file
表示生成java类时的打包方式- 一个消息对应一个独立的java类
- 所有的消息都作为内部类,打包到一个外部类中(默认)
选项
option java_outer_classname
7.5.2 Protobuf的消息结构体与消息字段
//[开始消息定义]
message Msg {
uint32 id = 1; //消息ID
string content = 2; //消息内容
}
//[结束消息定义]