1.解决思路
在上文中我们提出了多种解决拆包粘包的问题思路,其中就包括通过包尾添加分隔符来解决。本章将通过回车换行分隔符介结合Netty来进行拆包粘包问题的解决
2.服务端代码改造
package demo1;import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.LineBasedFrameDecoder;import io.netty.handler.codec.string.StringDecoder;/*** @author 冯铁城 [17615007230@163.com]* @date 2022-05-09 19:11:57* @describe: 时间服务器服务端*/public class TimeServer {/*** 开启socket链接** @param port 端口号*/public void bind(int port) {//1.创建线程组用于接受服务端链接NioEventLoopGroup bossGroup = new NioEventLoopGroup();//2.创建线程组用户socketChannel读写NioEventLoopGroup workGroup = new NioEventLoopGroup();try {//3.创建服务端ServerBootstrap server = new ServerBootstrap();server.group(bossGroup, workGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChildChannelHandler());//3.绑定端口,开启同步等待ChannelFuture sync = server.bind(port).sync();System.out.println("服务端已启动!");//4.等待服务端监听端口关闭sync.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {bossGroup.shutdownGracefully();workGroup.shutdownGracefully();}}private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));socketChannel.pipeline().addLast(new StringDecoder());socketChannel.pipeline().addLast(new TimeServerHandler());}}private class TimeServerHandler extends ChannelHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("服务端已与" + ctx.channel().id() + "建立链接");}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//1.字符串判定System.out.println("收到" + ctx.channel().id() + "渠道的请求:" + msg);//2.定义返回结果String responseMessage = "响应信息\r\n";//3.相应数据ByteBuf response = Unpooled.copiedBuffer(responseMessage.getBytes());ctx.write(response);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}}}
在ChildChannelHandler.initChannel()方法中添加了如下两行代码
- socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
- socketChannel.pipeline().addLast(new StringDecoder());
同时channelRead()方法中不再将msg转为ButeBuf。而是直接视作字符串处理
同时回复消息时在包尾写入/r/n分隔符
3.客户端代码改造
package demo1;import io.netty.bootstrap.Bootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.LineBasedFrameDecoder;import io.netty.handler.codec.string.StringDecoder;/*** @author 冯铁城 [17615007230@163.com]* @date 2022-05-09 21:09:16* @describe: 时间服务器客户端*/public class TimeClient {/*** 创建客户端链接** @param host 主机* @param port 端口号*/public void connect(String host, int port) {//1. 创建用于IO读写的线程组EventLoopGroup group = new NioEventLoopGroup();try {//2.创建客户端启动类Bootstrap bootstrap = new Bootstrap();//3.启动类初始化bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));socketChannel.pipeline().addLast(new StringDecoder());socketChannel.pipeline().addLast(new TimeClientHandler());}});//4.发起异步链接操作ChannelFuture sync = bootstrap.connect(host, port).sync();//5.等待客户端链路链接关闭sync.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {group.shutdownGracefully();}}/*** @author 冯铁城 [17615007230@163.com]* @date 2022-05-09 21:01:23* @describe: 时间服务器客户端*/public class TimeClientHandler extends ChannelHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {for (int i = 0; i < 20; i++) {String requestMessage = "发送消息\r\n";ByteBuf message = Unpooled.buffer(requestMessage.getBytes().length);message.writeBytes(requestMessage.getBytes());ctx.writeAndFlush(message);}}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("收到服务端答复:" + msg);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("链接出现异常,异常信息:" + cause.getMessage());ctx.close();}}}
在ChildChannelHandler.initChannel()方法中添加了如下两行代码
- socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
- socketChannel.pipeline().addLast(new StringDecoder());
同时channelRead()方法中不再将msg转为ButeBuf。而是直接视作字符串处理
同时发送消息时在包尾写入/r/n分隔符
4.调试结果
5.原理
- LineBasedFrameDecoder解码器:遍历ByteBuf中的字节,通过findEndOfLine寻找ascii码=10(换行键)或13(回车键)的位置标记为包体结束位,因此可读索引到结束位的数据就形成了一行,也就是一个包。支持配置最大长度,如果在最大长度读取完后还没有读取到换行回车分隔符,会抛出异常,同时忽略掉之前读取的异常码流

- StringDecoder解码器:将接受到的对象通过Charset.defaultCharset()转为字符串,交给后续的handler处理因此其实上述解决拆包粘包的问题可以不用添加StringDecoder解码器,只不过在获取消息时需要进行byte数组转字符串的操作。加入StringDecode解码器后不需要在消息接受时进行byte数组转字符串的操作了,直接将消息视作字符串操作即可

