1. 粘包与半包
1.1 粘包现象
客户端一共发送了 10 条消息,每条消息是 10 个字节,而服务端只接收了 1 次
@Slf4j
public class StickyPackageNettyServe {
public static void main(String[] args) {
NioEventLoopGroup boos = new NioEventLoopGroup(1);
NioEventLoopGroup work = new NioEventLoopGroup();
try {
ChannelFuture channelFuture = new ServerBootstrap()
.group(boos, work)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
})
.bind(9527);
channelFuture.sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
boos.shutdownGracefully();
work.shutdownGracefully();
}
}
}
@Slf4j
public class StickyPackageNettyClient {
public static void main(String[] args) throws Exception {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
ChannelFuture channelFuture = new Bootstrap()
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 客户端发送了 10 次数据, 希望服务端也是接收 10 次数据
for (int i = 0; i < 10; i++) {
ByteBuf buf = ctx.alloc().buffer();
buf.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
ctx.writeAndFlush(buf);
}
// 发送完成后关闭通道
ctx.close();
}
});
}
})
.connect("127.0.0.1", 9527);
channelFuture.sync();
Channel channel = channelFuture.channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
17:41:28.529 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x4804e381, L:/127.0.0.1:9527 - R:/127.0.0.1:56867] READ: 100B
1.2 半包现象
客户端发送一条消息,这个消息是 1100 字节,但是服务端接收了两次(1024B + 76B)
@Slf4j
public class HalfPackageNettyServe {
public static void main(String[] args) {
NioEventLoopGroup boos = new NioEventLoopGroup(1);
NioEventLoopGroup work = new NioEventLoopGroup();
try {
ChannelFuture channelFuture = new ServerBootstrap()
.group(boos, work)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
})
.bind(9527);
channelFuture.sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
boos.shutdownGracefully();
work.shutdownGracefully();
}
}
}
@Slf4j
public class HalfPackageNettyClient {
public static void main(String[] args) throws Exception {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
ChannelFuture channelFuture = new Bootstrap()
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buffer = ctx.alloc().buffer();
for (int i = 0; i < 110; i++) {
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
}
// 客户端发送一次数据, 希望服务端也接收一次数据
ctx.writeAndFlush(buffer);
// 发送完成后关闭通道
ctx.close();
}
});
}
})
.connect("127.0.0.1", 9527);
channelFuture.sync();
Channel channel = channelFuture.channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
17:44:05.366 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x239806ac, L:/127.0.0.1:9527 - R:/127.0.0.1:57352] READ: 1024B
17:44:05.369 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x239806ac, L:/127.0.0.1:9527 - R:/127.0.0.1:57352] READ: 76B
1.3 现象分析
1.3.1 粘包现象
现象表现是发送 abc def,接收 abcdef
原因分析
- 应用层:接收方 ByteBuf 设置太大(Netty 默认 1024)
- 滑动窗口:假设发送方 256 bytes 表示一个完整报文,但由于接收方处理不及时且窗口大小足够大,这 256 bytes 字节就会缓冲在接收方的滑动窗口中,当滑动窗口中缓冲了多个报文就会粘包
- Nagle 算法:会造成粘包
1.3.2 半包
现象表现是发送 abcdef,接收 abc def
原因分析
- 应用层:接收方 ByteBuf 小于实际发送数据量
- 滑动窗口:假设接收方的窗口只剩了 128 bytes,发送方的报文大小是 256 bytes,这时放不下了,只能先发送前 128 bytes,等待 ack 后才能发送剩余部分,这就造成了半包
MSS 限制:当发送的数据超过 MSS 限制后,会将数据切分发送,就会造成半包
1.3.3 滑动窗口
TCP 以一个段(segment)为单位,每发送一个段就需要进行一次确认应答(ack)处理,但如果这么做,缺点是包的往返时间越长性能就越差
为了解决此问题,引入了窗口概念,窗口大小即决定了无需等待应答而可以继续发送的数据最大值
窗口实际就起到一个缓冲区的作用,同时也能起到流量控制的作用图中深色的部分即要发送的数据,高亮的部分即窗口
- 窗口内的数据才允许被发送,当应答未到达前,窗口必须停止滑动
- 如果 1001~2000 这个段的数据 ack 回来了,窗口就可以向前滑动
- 接收方也会维护一个窗口,只有落在窗口内的数据才能允许接收
MSS 限制
- 链路层对一次能够发送的最大数据有限制,这个限制称之为 MTU(maximum transmission unit),不同的链路设备的 MTU 值也有所不同,例如
- 以太网的 MTU 是 1500
- FDDI(光纤分布式数据接口)的 MTU 是 4352
- 本地回环地址的 MTU 是 65535 - 本地测试不走网卡
- MSS 是最大段长度(maximum segment size),它是 MTU 刨去 tcp 头和 ip 头后剩余能够作为数据传输的字节数
- ipv4 tcp 头占用 20 bytes,ip 头占用 20 bytes,因此以太网 MSS 的值为 1500 - 40 = 1460
- TCP 在传递大量数据时,会按照 MSS 大小将数据进行分割发送
- MSS 的值在三次握手时通知对方自己 MSS 的值,然后在两者之间选择一个小值作为 MSS
Nagle 算法
- 即使发送一个字节,也需要加入 tcp 头和 ip 头,也就是总字节数会使用 41 bytes,非常不经济。因此为了提高网络利用率,tcp 希望尽可能发送足够大的数据,这就是 Nagle 算法产生的缘由
- 该算法是指发送端即使还有应该发送的数据,但如果这部分数据很少的话,则进行延迟发送
- 如果 SO_SNDBUF 的数据达到 MSS,则需要发送
- 如果 SO_SNDBUF 中含有 FIN(表示需要连接关闭)这时将剩余数据发送,再关闭
- 如果 TCP_NODELAY = true,则需要发送
- 已发送的数据都收到 ack 时,则需要发送
- 上述条件不满足,但发生超时(一般为 200ms)则需要发送
- 除上述情况,延迟发送
1.4 解决方案
短链接,发一个包建立一次连接,这样连接建立到连接断开之间就是消息的边界,缺点效率太低
每一条消息采用固定长度,缺点浪费空间
每一条消息采用分隔符,例如 \n,缺点需要转义
每一条消息分为 head 和 body,head 中包含 body 的长度
1.4.1 短连接
每次重新创建一个新的连接发送消息
半包用这种办法还是不好解决,因为接收方的缓冲区大小是有限的
public class ShortConnectionNettyServer {
public static void main(String[] args) {
NioEventLoopGroup boos = new NioEventLoopGroup(1);
NioEventLoopGroup work = new NioEventLoopGroup();
try {
ChannelFuture channelFuture = new ServerBootstrap()
.group(boos, work)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
})
.bind(9527);
channelFuture.sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
boos.shutdownGracefully();
work.shutdownGracefully();
}
}
}
@Slf4j
public class ShortConnectionNettyClient {
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
sendMessage();
}
}
private static void sendMessage() {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
ChannelFuture channelFuture = new Bootstrap()
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
// 客户端发送一次数据, 希望服务端也接收一次数据
ctx.writeAndFlush(buffer);
// 发送完成之后关闭 channel
ctx.close();
}
});
}
})
.connect("127.0.0.1", 9527);
channelFuture.sync();
Channel channel = channelFuture.channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
1.4.2 固定长度
让所有数据包长度固定(假设长度为 8 字节),服务器端添加
FixedLengthFrameDecoder
ch.pipeline().addLast(new FixedLengthFrameDecoder(8));
缺点是,数据包的大小不好把握
- 长度定的太大,浪费
- 长度定的太小,对某些数据包又显得不够
@Slf4j
public class FixedLengthNettyServer {
public static void main(String[] args) {
NioEventLoopGroup boos = new NioEventLoopGroup(1);
NioEventLoopGroup work = new NioEventLoopGroup();
try {
ChannelFuture channelFuture = new ServerBootstrap()
.group(boos, work)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new FixedLengthFrameDecoder(10));
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
})
.bind(9527);
channelFuture.sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
boos.shutdownGracefully();
work.shutdownGracefully();
}
}
}
@Slf4j
public class FixedLengthNettyClient {
public static void main(String[] args) {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
ChannelFuture channelFuture = new Bootstrap()
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buffer = ctx.alloc().buffer();
for (int i = 0; i < 10; i++) {
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
}
// 客户端发送一次数据, 希望服务端也接收一次数据
ctx.writeAndFlush(buffer);
// 发送完成之后关闭 channel
ctx.close();
}
});
}
})
.connect("127.0.0.1", 9527);
channelFuture.sync();
Channel channel = channelFuture.channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
1.4.3 固定分隔符
服务端加入
LineBasedFrameDecoder
,默认以 \n 或 \r\n 作为分隔符,如果超出指定长度仍未出现分隔符,则抛出异常ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
客户端在每条消息之后,加入 \n 分隔符
@Slf4j
public class LineBasedNettyServer {
public static void main(String[] args) {
NioEventLoopGroup boos = new NioEventLoopGroup(1);
NioEventLoopGroup work = new NioEventLoopGroup();
try {
ChannelFuture channelFuture = new ServerBootstrap()
.group(boos, work)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
})
.bind(9527);
channelFuture.sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
boos.shutdownGracefully();
work.shutdownGracefully();
}
}
}
@Slf4j
public class LineBasedNettyClient {
public static void main(String[] args) {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
ChannelFuture channelFuture = new Bootstrap()
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buffer = ctx.alloc().buffer();
for (int i = 0; i < 10; i++) {
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, '\n'});
}
// 客户端发送一次数据, 希望服务端也接收一次数据
ctx.writeAndFlush(buffer);
// 发送完成之后关闭 channel
ctx.close();
}
});
}
})
.connect("127.0.0.1", 9527);
channelFuture.sync();
Channel channel = channelFuture.channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
1.4.4 预设长度
在发送消息前,先约定用定长字节表示接下来数据的长度
分别表示最大长度,长度偏移,长度占用字节,长度调整,剥离字节数
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 1, 0, 1));
@Slf4j
public class LengthFieldBasedNettyServer {
public static void main(String[] args) {
NioEventLoopGroup boos = new NioEventLoopGroup(1);
NioEventLoopGroup work = new NioEventLoopGroup();
try {
ChannelFuture channelFuture = new ServerBootstrap()
.group(boos, work)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(2048, 0, 4, 0, 0));
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
})
.bind(9527);
channelFuture.sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
boos.shutdownGracefully();
work.shutdownGracefully();
}
}
}
@Slf4j
public class LengthFieldBasedNettyClient {
public static void main(String[] args) {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
ChannelFuture channelFuture = new Bootstrap()
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
int length = new Random().nextInt(200) + 1000;
log.info("客户端一共准备发送 {} 个字节", length);
char init = 'a';
ByteBuf buffer = ctx.alloc().buffer();
// 先写入 4 个字节的长度
buffer.writeInt(length);
for (int i = 0; i < length; i++) {
if (init == 'z') {
init = 'a';
}
buffer.writeByte(init++);
}
// 客户端发送一次数据, 希望服务端也接收一次数据
ctx.writeAndFlush(buffer);
// 发送完成之后关闭 channel
ctx.close();
}
});
}
})
.connect("127.0.0.1", 9527);
channelFuture.sync();
Channel channel = channelFuture.channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
// 客户端日志
18:35:31.216 [nioEventLoopGroup-2-1] INFO org.masteryourself.tutorial.netty.lengthfieldbased.LengthFieldBasedNettyClient - 客户端一共准备发送 1193 个字节
// 服务端日志,一共收到 1197B,前 4 个字节表示数据包长度
18:35:31.251 [nioEventLoopGroup-3-4] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xe699b9a5, L:/127.0.0.1:9527 - R:/127.0.0.1:51315] READ: 1197B
2. 协议设计
什么时候可以加 @Sharable
- 当 handler 不保存状态时,就可以安全地在多线程下被共享
- 但要注意对于编解码器类,不能继承 ByteToMessageCodec 或 CombinedChannelDuplexHandler 父类,他们的构造方法对 @Sharable 有限制
- 如果能确保编解码器不会保存状态,可以继承 MessageToMessageCodec 父类
2.1 redis 协议举例
@Slf4j
public class RedisProtocolNettyClient {
private static final byte[] LINE = {13, 10};
public static void main(String[] args) {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
ChannelFuture channelFuture = new Bootstrap()
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
set(ctx);
get(ctx);
}
});
}
})
.connect("127.0.0.1", 9527);
channelFuture.sync();
Channel channel = channelFuture.channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
} finally {
eventLoopGroup.shutdownGracefully();
}
}
private static void get(ChannelHandlerContext ctx) {
ByteBuf buf = ctx.alloc().buffer();
buf.writeBytes("*2".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("get".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("aaa".getBytes());
buf.writeBytes(LINE);
ctx.writeAndFlush(buf);
}
private static void set(ChannelHandlerContext ctx) {
ByteBuf buf = ctx.alloc().buffer();
buf.writeBytes("*3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("set".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("aaa".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("bbb".getBytes());
buf.writeBytes(LINE);
ctx.writeAndFlush(buf);
}
}
2.2 http 协议举例
@Slf4j
public class HttpProtocolNettyServer {
public static void main(String[] args) {
NioEventLoopGroup boos = new NioEventLoopGroup(1);
NioEventLoopGroup work = new NioEventLoopGroup();
try {
ChannelFuture channelFuture = new ServerBootstrap()
.group(boos, work)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
log.info(msg.uri());
// 返回响应
DefaultFullHttpResponse response =
new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
byte[] bytes = "<h1>you know netty</h1>".getBytes();
response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, bytes.length);
response.content().writeBytes(bytes);
// 写回响应
ctx.writeAndFlush(response);
}
});
}
})
.bind(9527);
channelFuture.sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
boos.shutdownGracefully();
work.shutdownGracefully();
}
}
}
2.3 自定义协议
魔数,用来在第一时间判定是否是无效数据包 版本号,可以支持协议的升级 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk 指令类型,是登录、注册、单聊、群聊… 跟业务相关 请求序号,为了双工通信,提供异步能力 正文长度 消息正文
@Data
@Builder
public class Message implements Serializable {
private Byte messageType;
private Integer sequenceId;
private String username;
private String password;
}
@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
// 1. 4 个字节的魔数
out.writeBytes(new byte[]{1, 2, 3, 4});
// 2. 1 个字节的版本
out.writeByte(1);
// 3. 1 个字节的序列化方式(jdk 为 0, json 为 1)
out.writeByte(0);
// 4. 1 个字节的消息类型
out.writeByte(msg.getMessageType());
// 5. 4 个字节的请求序列号
out.writeInt(msg.getSequenceId());
// 6. 无意义, 对其填充(共 16 个字节)
out.writeByte(0xff);
// 获取内容长度
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(msg);
byte[] content = bos.toByteArray();
// 7. 4 个字节长度
out.writeInt(content.length);
// 8. 消息正文
out.writeBytes(content);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 1. 获取魔数
int magicNum = in.readInt();
// 2. 获取版本
byte version = in.readByte();
// 3. 获取序列化类型
byte serializerType = in.readByte();
// 4. 获取消息类型
byte messageType = in.readByte();
// 5. 获取请求序列号
int sequenceId = in.readInt();
// 6. 无意义字节
in.readByte();
// 7. 获取内容长度
int length = in.readInt();
// 8. 解析消息正文
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
Message message = (Message) ois.readObject();
log.debug("magicNum:{}, version:{}, serializerType:{}, messageType:{}, sequenceId:{}, length:{}",
magicNum, version, serializerType, messageType, sequenceId, length);
log.debug("{}", message);
out.add(message);
}
}
public class CustomizeProtocolTest {
public static void main(String[] args) throws Exception {
EmbeddedChannel channel = new EmbeddedChannel(
new LoggingHandler(),
new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),
new MessageCodec()
);
Message message = Message.builder()
.messageType((byte) 1)
.sequenceId(10001)
.username("zhangsan")
.password("123456")
.build();
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
new MessageCodec().encode(null, message, buf);
channel.writeInbound(buf);
}
}
3. 聊天室案例
3.1 登录
@ChannelHandler.Sharable
public class LoginRequestMessageHandler extends SimpleChannelInboundHandler<org.masteryourself.tutorial.netty.chat.message.LoginRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, org.masteryourself.tutorial.netty.chat.message.LoginRequestMessage msg) throws Exception {
String userName = msg.getUsername();
String password = msg.getPassword();
boolean login = UserServiceFactory.getUserService().login(userName, password);
if (login) {
ctx.writeAndFlush(new LoginResponseMessage(true, "登录成功"));
return;
}
ctx.writeAndFlush(new LoginResponseMessage(false, "登录失败"));
}
}
public class ClientRequestHandler extends ChannelInboundHandlerAdapter {
private final CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
private final AtomicBoolean LOGIN_FLAG = new AtomicBoolean(false);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
new Thread(new Runnable() {
@Override
public void run() {
Scanner scanner = new Scanner(System.in);
System.out.println("请输入用户名:");
String username = scanner.nextLine();
System.out.println("请输入密码:");
String password = scanner.nextLine();
// 构造消息对象
LoginRequestMessage message = new LoginRequestMessage(username, password);
// 发送消息
ctx.writeAndFlush(message);
System.out.println("等待后续操作...");
try {
WAIT_FOR_LOGIN.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 如果登录失败
if (!LOGIN_FLAG.get()) {
System.out.println("登录失败, 系统退出");
ctx.channel().close();
return;
}
// 处理业务逻辑
while (true) {
System.out.println("==================================");
System.out.println("send [username] [content]");
System.out.println("gsend [group name] [content]");
System.out.println("gcreate [group name] [m1,m2,m3...]");
System.out.println("gmembers [group name]");
System.out.println("gjoin [group name]");
System.out.println("gquit [group name]");
System.out.println("quit");
System.out.println("==================================");
String command = scanner.nextLine();
String[] s = command.split(" ");
switch (s[0]) {
case "send":
ctx.writeAndFlush(new ChatRequestMessage(username, s[1], s[2]));
break;
case "gsend":
ctx.writeAndFlush(new GroupChatRequestMessage(username, s[1], s[2]));
break;
case "gcreate":
Set<String> set = new HashSet<>(Arrays.asList(s[2].split(",")));
set.add(username); // 加入自己
ctx.writeAndFlush(new GroupCreateRequestMessage(s[1], set));
break;
case "gmembers":
ctx.writeAndFlush(new GroupMembersRequestMessage(s[1]));
break;
case "gjoin":
ctx.writeAndFlush(new GroupJoinRequestMessage(username, s[1]));
break;
case "gquit":
ctx.writeAndFlush(new GroupQuitRequestMessage(username, s[1]));
break;
case "quit":
ctx.channel().close();
return;
}
}
}
}, "SYSTEM IN").start();
// 继续调用 chain
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof LoginResponseMessage) {
LoginResponseMessage response = (LoginResponseMessage) msg;
if (response.isSuccess()) {
LOGIN_FLAG.set(true);
}
WAIT_FOR_LOGIN.countDown();
}
}
}
3.2 单聊
@ChannelHandler.Sharable
public class LoginRequestMessageHandler extends SimpleChannelInboundHandler<org.masteryourself.tutorial.netty.chat.message.LoginRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, org.masteryourself.tutorial.netty.chat.message.LoginRequestMessage msg) throws Exception {
String userName = msg.getUsername();
String password = msg.getPassword();
boolean login = UserServiceFactory.getUserService().login(userName, password);
if (login) {
// 存储到会话中
SessionFactory.getSession().bind(ctx.channel(), userName);
ctx.writeAndFlush(new LoginResponseMessage(true, "登录成功"));
return;
}
ctx.writeAndFlush(new LoginResponseMessage(false, "登录失败"));
}
}
@ChannelHandler.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ChatRequestMessage msg) throws Exception {
String to = msg.getTo();
Channel channel = SessionFactory.getSession().getChannel(to);
if (channel != null) {
// 在线就发送给对方
channel.writeAndFlush(new ChatResponseMessage(msg.getFrom(), msg.getContent()));
return;
}
// 不在线就提醒给发送人
ctx.writeAndFlush(new ChatResponseMessage(false, "对方用户不在线"));
}
}
3.3 群聊
@ChannelHandler.Sharable
public class GroupCreateRequestMessageHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupCreateRequestMessage msg) throws Exception {
String groupName = msg.getGroupName();
Set<String> members = msg.getMembers();
// 先判断名称是否重复
Group group = GroupSessionFactory.getGroupSession().createGroup(groupName, members);
if (group != null) {
ctx.writeAndFlush(new GroupCreateResponseMessage(false, groupName + "已经存在"));
return;
}
// 消息提醒
for (String member : members) {
Channel channel = SessionFactory.getSession().getChannel(member);
if (channel != null) {
channel.writeAndFlush(new GroupCreateResponseMessage(true, "您已被拉入" + groupName));
}
}
}
}
@ChannelHandler.Sharable
public class GroupChatRequestMessageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupChatRequestMessage msg) throws Exception {
Set<String> members = GroupSessionFactory.getGroupSession().getMembers(msg.getGroupName());
for (String member : members) {
Channel channel = SessionFactory.getSession().getChannel(member);
if (channel != null) {
channel.writeAndFlush(new GroupChatResponseMessage(msg.getFrom(), msg.getContent()));
}
}
}
}
@ChannelHandler.Sharable
public class GroupJoinRequestMessageHandler extends SimpleChannelInboundHandler<GroupJoinRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupJoinRequestMessage msg) throws Exception {
Group group = GroupSessionFactory.getGroupSession().joinMember(msg.getGroupName(), msg.getUsername());
if (group == null) {
ctx.writeAndFlush(new GroupJoinResponseMessage(true, msg.getGroupName() + "群不存在"));
return;
}
ctx.writeAndFlush(new GroupJoinResponseMessage(true, msg.getGroupName() + "群加入成功"));
}
}
@ChannelHandler.Sharable
public class GroupQuitRequestMessageHandler extends SimpleChannelInboundHandler<GroupQuitRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupQuitRequestMessage msg) throws Exception {
Group group = GroupSessionFactory.getGroupSession().removeMember(msg.getGroupName(), msg.getUsername());
if (group == null) {
ctx.writeAndFlush(new GroupJoinResponseMessage(true, msg.getGroupName() + "群不存在"));
return;
}
ctx.writeAndFlush(new GroupJoinResponseMessage(true, "已退出群" + msg.getGroupName()));
// 通知其它人
Set<String> members = GroupSessionFactory.getGroupSession().getMembers(msg.getGroupName());
for (String member : members) {
Channel channel = SessionFactory.getSession().getChannel(member);
if (channel != null) {
channel.writeAndFlush(new GroupJoinResponseMessage(true, "已退出群" + msg.getGroupName()));
}
}
}
}
@ChannelHandler.Sharable
public class GroupMembersRequestMessageHandler extends SimpleChannelInboundHandler<GroupMembersRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupMembersRequestMessage msg) throws Exception {
Set<String> members = GroupSessionFactory.getGroupSession()
.getMembers(msg.getGroupName());
ctx.writeAndFlush(new GroupMembersResponseMessage(members));
}
}
3.4 退出
@Slf4j
@ChannelHandler.Sharable
public class QuitHandler extends ChannelInboundHandlerAdapter {
/**
* 连接断开时触发
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
SessionFactory.getSession().unbind(ctx.channel());
}
/**
* 出现异常时触发
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
SessionFactory.getSession().unbind(ctx.channel());
log.error(cause.getMessage(), cause);
}
}
3.5 空闲检测
3.5.1 连接假死
问题
- 假死的连接占用的资源不能自动释放
- 向假死的连接发送数据,得到的反馈是发送超时
原因
- 网络设备出现故障,例如网卡,机房等,底层的 TCP 连接已经断开了,但应用程序没有感知到,仍然占用着资源。
- 公网网络不稳定,出现丢包。如果连续出现丢包,这时现象就是客户端数据发不出去,服务端也一直收不到数据,就这么一直耗着
- 应用程序线程阻塞,无法进行数据读写
3.5.2 服务端解决
怎么判断客户端连接是否假死呢?如果能收到客户端数据,说明没有假死。因此策略就可以定为,每隔一段时间就检查这段时间内是否接收到客户端数据,没有就可以判定为连接假死
// 用来判断是不是 [读空闲时间过长],或 [写空闲时间过长]
// 5s 内如果没有收到 channel 的数据,会触发一个 IdleState#READER_IDLE 事件
ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));
// ChannelDuplexHandler 可以同时作为入站和出站处理器
ch.pipeline().addLast(new ChannelDuplexHandler() {
// 用来触发特殊事件
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
// 触发了读空闲事件
if (IdleState.READER_IDLE == event.state()) {
log.info("已经 5s 没有读取到数据了, 干掉连接");
ctx.channel().close();
}
}
});
3.5.3 客户端解决
客户端可以定时向服务器端发送数据,只要这个时间间隔小于服务器定义的空闲检测的时间间隔,那么就能防止前面提到的误判,客户端可以定义如下心跳处理器
// 用来判断是不是[读空闲时间过长],或[写空闲时间过长]
// 3s 内如果没有向服务器写数据,会触发一个 IdleState#WRITER_IDLE 事件
ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));
// ChannelDuplexHandler 可以同时作为入站和出站处理器
ch.pipeline().addLast(new ChannelDuplexHandler() {
// 用来触发特殊事件
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
// 触发了写空闲事件
if (event.state() == IdleState.WRITER_IDLE) {
log.info("3s 没有写数据了,发送一个心跳包");
ctx.writeAndFlush(new PingMessage());
}
}
});