1.解决思路

在上文中我们提出了多种解决拆包粘包的问题思路,其中就包括通过包尾添加分隔符来解决。本章将通过回车换行分隔符介结合Netty来进行拆包粘包问题的解决

2.服务端代码改造

  1. package demo1;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.buffer.ByteBuf;
  4. import io.netty.buffer.Unpooled;
  5. import io.netty.channel.*;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioServerSocketChannel;
  9. import io.netty.handler.codec.LineBasedFrameDecoder;
  10. import io.netty.handler.codec.string.StringDecoder;
  11. /**
  12. * @author 冯铁城 [17615007230@163.com]
  13. * @date 2022-05-09 19:11:57
  14. * @describe: 时间服务器服务端
  15. */
  16. public class TimeServer {
  17. /**
  18. * 开启socket链接
  19. *
  20. * @param port 端口号
  21. */
  22. public void bind(int port) {
  23. //1.创建线程组用于接受服务端链接
  24. NioEventLoopGroup bossGroup = new NioEventLoopGroup();
  25. //2.创建线程组用户socketChannel读写
  26. NioEventLoopGroup workGroup = new NioEventLoopGroup();
  27. try {
  28. //3.创建服务端
  29. ServerBootstrap server = new ServerBootstrap();
  30. server.group(bossGroup, workGroup)
  31. .channel(NioServerSocketChannel.class)
  32. .option(ChannelOption.SO_BACKLOG, 1024)
  33. .childHandler(new ChildChannelHandler());
  34. //3.绑定端口,开启同步等待
  35. ChannelFuture sync = server.bind(port).sync();
  36. System.out.println("服务端已启动!");
  37. //4.等待服务端监听端口关闭
  38. sync.channel().closeFuture().sync();
  39. } catch (InterruptedException e) {
  40. e.printStackTrace();
  41. } finally {
  42. bossGroup.shutdownGracefully();
  43. workGroup.shutdownGracefully();
  44. }
  45. }
  46. private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
  47. @Override
  48. protected void initChannel(SocketChannel socketChannel) throws Exception {
  49. socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
  50. socketChannel.pipeline().addLast(new StringDecoder());
  51. socketChannel.pipeline().addLast(new TimeServerHandler());
  52. }
  53. }
  54. private class TimeServerHandler extends ChannelHandlerAdapter {
  55. @Override
  56. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  57. System.out.println("服务端已与" + ctx.channel().id() + "建立链接");
  58. }
  59. @Override
  60. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  61. //1.字符串判定
  62. System.out.println("收到" + ctx.channel().id() + "渠道的请求:" + msg);
  63. //2.定义返回结果
  64. String responseMessage = "响应信息\r\n";
  65. //3.相应数据
  66. ByteBuf response = Unpooled.copiedBuffer(responseMessage.getBytes());
  67. ctx.write(response);
  68. }
  69. @Override
  70. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  71. ctx.flush();
  72. }
  73. @Override
  74. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  75. ctx.close();
  76. }
  77. }
  78. }

在ChildChannelHandler.initChannel()方法中添加了如下两行代码

  1. socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
  2. socketChannel.pipeline().addLast(new StringDecoder());

同时channelRead()方法中不再将msg转为ButeBuf。而是直接视作字符串处理
同时回复消息时在包尾写入/r/n分隔符

3.客户端代码改造

  1. package demo1;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.buffer.ByteBuf;
  4. import io.netty.buffer.Unpooled;
  5. import io.netty.channel.*;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioSocketChannel;
  9. import io.netty.handler.codec.LineBasedFrameDecoder;
  10. import io.netty.handler.codec.string.StringDecoder;
  11. /**
  12. * @author 冯铁城 [17615007230@163.com]
  13. * @date 2022-05-09 21:09:16
  14. * @describe: 时间服务器客户端
  15. */
  16. public class TimeClient {
  17. /**
  18. * 创建客户端链接
  19. *
  20. * @param host 主机
  21. * @param port 端口号
  22. */
  23. public void connect(String host, int port) {
  24. //1. 创建用于IO读写的线程组
  25. EventLoopGroup group = new NioEventLoopGroup();
  26. try {
  27. //2.创建客户端启动类
  28. Bootstrap bootstrap = new Bootstrap();
  29. //3.启动类初始化
  30. bootstrap.group(group)
  31. .channel(NioSocketChannel.class)
  32. .option(ChannelOption.TCP_NODELAY, true)
  33. .handler(new ChannelInitializer<SocketChannel>() {
  34. @Override
  35. protected void initChannel(SocketChannel socketChannel) throws Exception {
  36. socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
  37. socketChannel.pipeline().addLast(new StringDecoder());
  38. socketChannel.pipeline().addLast(new TimeClientHandler());
  39. }
  40. });
  41. //4.发起异步链接操作
  42. ChannelFuture sync = bootstrap.connect(host, port).sync();
  43. //5.等待客户端链路链接关闭
  44. sync.channel().closeFuture().sync();
  45. } catch (InterruptedException e) {
  46. e.printStackTrace();
  47. } finally {
  48. group.shutdownGracefully();
  49. }
  50. }
  51. /**
  52. * @author 冯铁城 [17615007230@163.com]
  53. * @date 2022-05-09 21:01:23
  54. * @describe: 时间服务器客户端
  55. */
  56. public class TimeClientHandler extends ChannelHandlerAdapter {
  57. @Override
  58. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  59. for (int i = 0; i < 20; i++) {
  60. String requestMessage = "发送消息\r\n";
  61. ByteBuf message = Unpooled.buffer(requestMessage.getBytes().length);
  62. message.writeBytes(requestMessage.getBytes());
  63. ctx.writeAndFlush(message);
  64. }
  65. }
  66. @Override
  67. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  68. System.out.println("收到服务端答复:" + msg);
  69. }
  70. @Override
  71. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  72. System.out.println("链接出现异常,异常信息:" + cause.getMessage());
  73. ctx.close();
  74. }
  75. }
  76. }

在ChildChannelHandler.initChannel()方法中添加了如下两行代码

  1. socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
  2. socketChannel.pipeline().addLast(new StringDecoder());

同时channelRead()方法中不再将msg转为ButeBuf。而是直接视作字符串处理
同时发送消息时在包尾写入/r/n分隔符

4.调试结果

服务端收到了20条请求
image.png
客户端收到了20条响应
image.png

5.原理

  1. LineBasedFrameDecoder解码器:遍历ByteBuf中的字节,通过findEndOfLine寻找ascii码=10(换行键)或13(回车键)的位置标记为包体结束位,因此可读索引到结束位的数据就形成了一行,也就是一个包。支持配置最大长度,如果在最大长度读取完后还没有读取到换行回车分隔符,会抛出异常,同时忽略掉之前读取的异常码流

image.png

  1. StringDecoder解码器:将接受到的对象通过Charset.defaultCharset()转为字符串,交给后续的handler处理因此其实上述解决拆包粘包的问题可以不用添加StringDecoder解码器,只不过在获取消息时需要进行byte数组转字符串的操作。加入StringDecode解码器后不需要在消息接受时进行byte数组转字符串的操作了,直接将消息视作字符串操作即可