1.分隔符解码器

在日常项目中,消息的包尾不一定以回车换行符结束,我们可能希望使用自定义分隔符来解决拆包/粘包问题。这时可以使用 Netty 提供的 DelimiterBasedFrameDecoder 解码器。

1.服务端代码

  1. package demo1;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.buffer.ByteBuf;
  4. import io.netty.buffer.Unpooled;
  5. import io.netty.channel.*;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioServerSocketChannel;
  9. import io.netty.handler.codec.DelimiterBasedFrameDecoder;
  10. import io.netty.handler.codec.string.StringDecoder;
  11. /**
  12. * @author 冯铁城 [17615007230@163.com]
  13. * @date 2022-05-09 19:11:57
  14. * @describe: 时间服务器服务端
  15. */
  16. public class TimeServer {
  17. /**
  18. * 开启socket链接
  19. *
  20. * @param port 端口号
  21. */
  22. public void bind(int port) {
  23. //1.创建线程组用于接受服务端链接
  24. NioEventLoopGroup bossGroup = new NioEventLoopGroup();
  25. //2.创建线程组用户socketChannel读写
  26. NioEventLoopGroup workGroup = new NioEventLoopGroup();
  27. try {
  28. //3.创建服务端
  29. ServerBootstrap server = new ServerBootstrap();
  30. server.group(bossGroup, workGroup)
  31. .channel(NioServerSocketChannel.class)
  32. .option(ChannelOption.SO_BACKLOG, 1024)
  33. .childHandler(new ChildChannelHandler());
  34. //3.绑定端口,开启同步等待
  35. ChannelFuture sync = server.bind(port).sync();
  36. System.out.println("服务端已启动!");
  37. //4.等待服务端监听端口关闭
  38. sync.channel().closeFuture().sync();
  39. } catch (InterruptedException e) {
  40. e.printStackTrace();
  41. } finally {
  42. bossGroup.shutdownGracefully();
  43. workGroup.shutdownGracefully();
  44. }
  45. }
  46. private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
  47. @Override
  48. protected void initChannel(SocketChannel socketChannel) throws Exception {
  49. ByteBuf byteBuf = Unpooled.copiedBuffer("-ftc".getBytes());
  50. socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, byteBuf));
  51. socketChannel.pipeline().addLast(new StringDecoder());
  52. socketChannel.pipeline().addLast(new TimeServerHandler());
  53. }
  54. }
  55. private class TimeServerHandler extends ChannelHandlerAdapter {
  56. @Override
  57. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  58. System.out.println("服务端已与" + ctx.channel().id() + "建立链接");
  59. }
  60. @Override
  61. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  62. //1.字符串判定
  63. System.out.println("收到" + ctx.channel().id() + "渠道的请求:" + msg);
  64. //2.定义返回结果
  65. String responseMessage = "响应信息-ftc";
  66. //3.相应数据
  67. ByteBuf response = Unpooled.copiedBuffer(responseMessage.getBytes());
  68. ctx.write(response);
  69. }
  70. @Override
  71. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  72. ctx.flush();
  73. }
  74. @Override
  75. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  76. ctx.close();
  77. }
  78. }
  79. }

2.客户端代码

  1. package demo1;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.buffer.ByteBuf;
  4. import io.netty.buffer.Unpooled;
  5. import io.netty.channel.*;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioSocketChannel;
  9. import io.netty.handler.codec.DelimiterBasedFrameDecoder;
  10. import io.netty.handler.codec.string.StringDecoder;
  11. /**
  12. * @author 冯铁城 [17615007230@163.com]
  13. * @date 2022-05-09 21:09:16
  14. * @describe: 时间服务器客户端
  15. */
  16. public class TimeClient {
  17. /**
  18. * 创建客户端链接
  19. *
  20. * @param host 主机
  21. * @param port 端口号
  22. */
  23. public void connect(String host, int port) {
  24. //1. 创建用于IO读写的线程组
  25. EventLoopGroup group = new NioEventLoopGroup();
  26. try {
  27. //2.创建客户端启动类
  28. Bootstrap bootstrap = new Bootstrap();
  29. //3.启动类初始化
  30. bootstrap.group(group)
  31. .channel(NioSocketChannel.class)
  32. .option(ChannelOption.TCP_NODELAY, true)
  33. .handler(new ChannelInitializer<SocketChannel>() {
  34. @Override
  35. protected void initChannel(SocketChannel socketChannel) throws Exception {
  36. ByteBuf byteBuf = Unpooled.copiedBuffer("-ftc".getBytes());
  37. socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, byteBuf));
  38. socketChannel.pipeline().addLast(new StringDecoder());
  39. socketChannel.pipeline().addLast(new TimeClientHandler());
  40. }
  41. });
  42. //4.发起异步链接操作
  43. ChannelFuture sync = bootstrap.connect(host, port).sync();
  44. //5.等待客户端链路链接关闭
  45. sync.channel().closeFuture().sync();
  46. } catch (InterruptedException e) {
  47. e.printStackTrace();
  48. } finally {
  49. group.shutdownGracefully();
  50. }
  51. }
  52. /**
  53. * @author 冯铁城 [17615007230@163.com]
  54. * @date 2022-05-09 21:01:23
  55. * @describe: 时间服务器客户端
  56. */
  57. public class TimeClientHandler extends ChannelHandlerAdapter {
  58. @Override
  59. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  60. for (int i = 0; i < 20; i++) {
  61. String requestMessage = "发送消息-ftc";
  62. ByteBuf message = Unpooled.buffer(requestMessage.getBytes().length);
  63. message.writeBytes(requestMessage.getBytes());
  64. ctx.writeAndFlush(message);
  65. }
  66. }
  67. @Override
  68. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  69. System.out.println("收到服务端答复:" + msg);
  70. }
  71. @Override
  72. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  73. System.out.println("链接出现异常,异常信息:" + cause.getMessage());
  74. ctx.close();
  75. }
  76. }
  77. }

3.调试结果

image.png
image.png
可以看到,客户端在发送消息时在包体尾部添加“-ftc”来表示一个数据包的结束,服务端和客户端都通过 DelimiterBasedFrameDecoder 按该分隔符切分数据帧,从而解决了拆包粘包问题。

4.原理

与 LineBasedFrameDecoder 的原理一样,都是通过遍历缓冲区中的可读字节找到对应的包尾标识(分隔符),并以此对缓冲区中的数据进行包体划分,进而解决拆包粘包问题。

2.定长解码器

除了在包尾增加标识位之外,也可以通过约定消息的固定长度来解决拆包粘包问题。Netty 中的 FixedLengthFrameDecoder 就是为这种场景提供的解码器。

1.服务端代码

  1. package demo1;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.buffer.ByteBuf;
  4. import io.netty.buffer.Unpooled;
  5. import io.netty.channel.*;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioServerSocketChannel;
  9. import io.netty.handler.codec.DelimiterBasedFrameDecoder;
  10. import io.netty.handler.codec.FixedLengthFrameDecoder;
  11. import io.netty.handler.codec.string.StringDecoder;
  12. /**
  13. * @author 冯铁城 [17615007230@163.com]
  14. * @date 2022-05-09 19:11:57
  15. * @describe: 时间服务器服务端
  16. */
  17. public class TimeServer {
  18. /**
  19. * 开启socket链接
  20. *
  21. * @param port 端口号
  22. */
  23. public void bind(int port) {
  24. //1.创建线程组用于接受服务端链接
  25. NioEventLoopGroup bossGroup = new NioEventLoopGroup();
  26. //2.创建线程组用户socketChannel读写
  27. NioEventLoopGroup workGroup = new NioEventLoopGroup();
  28. try {
  29. //3.创建服务端
  30. ServerBootstrap server = new ServerBootstrap();
  31. server.group(bossGroup, workGroup)
  32. .channel(NioServerSocketChannel.class)
  33. .option(ChannelOption.SO_BACKLOG, 1024)
  34. .childHandler(new ChildChannelHandler());
  35. //3.绑定端口,开启同步等待
  36. ChannelFuture sync = server.bind(port).sync();
  37. System.out.println("服务端已启动!");
  38. //4.等待服务端监听端口关闭
  39. sync.channel().closeFuture().sync();
  40. } catch (InterruptedException e) {
  41. e.printStackTrace();
  42. } finally {
  43. bossGroup.shutdownGracefully();
  44. workGroup.shutdownGracefully();
  45. }
  46. }
  47. private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
  48. @Override
  49. protected void initChannel(SocketChannel socketChannel) throws Exception {
  50. socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(20));
  51. socketChannel.pipeline().addLast(new StringDecoder());
  52. socketChannel.pipeline().addLast(new TimeServerHandler());
  53. }
  54. }
  55. private class TimeServerHandler extends ChannelHandlerAdapter {
  56. @Override
  57. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  58. System.out.println("服务端已与" + ctx.channel().id() + "建立链接");
  59. }
  60. @Override
  61. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  62. //1.字符串判定
  63. System.out.println("收到" + ctx.channel().id() + "渠道的请求:" + msg);
  64. //2.定义返回结果
  65. String responseMessage = "nihao woshidasahguaa";
  66. //3.相应数据
  67. ByteBuf response = Unpooled.copiedBuffer(responseMessage.getBytes());
  68. ctx.write(response);
  69. }
  70. @Override
  71. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  72. ctx.flush();
  73. }
  74. @Override
  75. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  76. ctx.close();
  77. }
  78. }
  79. }

2.客户端代码

  1. package demo1;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.buffer.ByteBuf;
  4. import io.netty.buffer.Unpooled;
  5. import io.netty.channel.*;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioSocketChannel;
  9. import io.netty.handler.codec.DelimiterBasedFrameDecoder;
  10. import io.netty.handler.codec.FixedLengthFrameDecoder;
  11. import io.netty.handler.codec.string.StringDecoder;
  12. /**
  13. * @author 冯铁城 [17615007230@163.com]
  14. * @date 2022-05-09 21:09:16
  15. * @describe: 时间服务器客户端
  16. */
  17. public class TimeClient {
  18. /**
  19. * 创建客户端链接
  20. *
  21. * @param host 主机
  22. * @param port 端口号
  23. */
  24. public void connect(String host, int port) {
  25. //1. 创建用于IO读写的线程组
  26. EventLoopGroup group = new NioEventLoopGroup();
  27. try {
  28. //2.创建客户端启动类
  29. Bootstrap bootstrap = new Bootstrap();
  30. //3.启动类初始化
  31. bootstrap.group(group)
  32. .channel(NioSocketChannel.class)
  33. .option(ChannelOption.TCP_NODELAY, true)
  34. .handler(new ChannelInitializer<SocketChannel>() {
  35. @Override
  36. protected void initChannel(SocketChannel socketChannel) throws Exception {
  37. socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(20));
  38. socketChannel.pipeline().addLast(new StringDecoder());
  39. socketChannel.pipeline().addLast(new TimeClientHandler());
  40. }
  41. });
  42. //4.发起异步链接操作
  43. ChannelFuture sync = bootstrap.connect(host, port).sync();
  44. //5.等待客户端链路链接关闭
  45. sync.channel().closeFuture().sync();
  46. } catch (InterruptedException e) {
  47. e.printStackTrace();
  48. } finally {
  49. group.shutdownGracefully();
  50. }
  51. }
  52. /**
  53. * @author 冯铁城 [17615007230@163.com]
  54. * @date 2022-05-09 21:01:23
  55. * @describe: 时间服务器客户端
  56. */
  57. public class TimeClientHandler extends ChannelHandlerAdapter {
  58. @Override
  59. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  60. for (int i = 0; i < 20; i++) {
  61. String requestMessage = "nihao woshidasahchun";
  62. ByteBuf message = Unpooled.buffer(requestMessage.getBytes().length);
  63. message.writeBytes(requestMessage.getBytes());
  64. ctx.writeAndFlush(message);
  65. }
  66. }
  67. @Override
  68. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  69. System.out.println("收到服务端答复:" + msg);
  70. }
  71. @Override
  72. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  73. System.out.println("链接出现异常,异常信息:" + cause.getMessage());
  74. ctx.close();
  75. }
  76. }
  77. }
  78. package demo1;
  79. import io.netty.bootstrap.Bootstrap;
  80. import io.netty.buffer.ByteBuf;
  81. import io.netty.buffer.Unpooled;
  82. import io.netty.channel.*;
  83. import io.netty.channel.nio.NioEventLoopGroup;
  84. import io.netty.channel.socket.SocketChannel;
  85. import io.netty.channel.socket.nio.NioSocketChannel;
  86. import io.netty.handler.codec.DelimiterBasedFrameDecoder;
  87. import io.netty.handler.codec.string.StringDecoder;
  88. /**
  89. * @author 冯铁城 [17615007230@163.com]
  90. * @date 2022-05-09 21:09:16
  91. * @describe: 时间服务器客户端
  92. */
  93. public class TimeClient {
  94. /**
  95. * 创建客户端链接
  96. *
  97. * @param host 主机
  98. * @param port 端口号
  99. */
  100. public void connect(String host, int port) {
  101. //1. 创建用于IO读写的线程组
  102. EventLoopGroup group = new NioEventLoopGroup();
  103. try {
  104. //2.创建客户端启动类
  105. Bootstrap bootstrap = new Bootstrap();
  106. //3.启动类初始化
  107. bootstrap.group(group)
  108. .channel(NioSocketChannel.class)
  109. .option(ChannelOption.TCP_NODELAY, true)
  110. .handler(new ChannelInitializer<SocketChannel>() {
  111. @Override
  112. protected void initChannel(SocketChannel socketChannel) throws Exception {
  113. ByteBuf byteBuf = Unpooled.copiedBuffer("-ftc".getBytes());
  114. socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, byteBuf));
  115. socketChannel.pipeline().addLast(new StringDecoder());
  116. socketChannel.pipeline().addLast(new TimeClientHandler());
  117. }
  118. });
  119. //4.发起异步链接操作
  120. ChannelFuture sync = bootstrap.connect(host, port).sync();
  121. //5.等待客户端链路链接关闭
  122. sync.channel().closeFuture().sync();
  123. } catch (InterruptedException e) {
  124. e.printStackTrace();
  125. } finally {
  126. group.shutdownGracefully();
  127. }
  128. }
  129. /**
  130. * @author 冯铁城 [17615007230@163.com]
  131. * @date 2022-05-09 21:01:23
  132. * @describe: 时间服务器客户端
  133. */
  134. public class TimeClientHandler extends ChannelHandlerAdapter {
  135. @Override
  136. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  137. for (int i = 0; i < 20; i++) {
  138. String requestMessage = "发送消息-ftc";
  139. ByteBuf message = Unpooled.buffer(requestMessage.getBytes().length);
  140. message.writeBytes(requestMessage.getBytes());
  141. ctx.writeAndFlush(message);
  142. }
  143. }
  144. @Override
  145. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  146. System.out.println("收到服务端答复:" + msg);
  147. }
  148. @Override
  149. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  150. System.out.println("链接出现异常,异常信息:" + cause.getMessage());
  151. ctx.close();
  152. }
  153. }
  154. }

3.调试结果

image.png
image.png
如图,服务端与客户端发送的消息长度都是 20 个字节,同时双方都将 FixedLengthFrameDecoder 的定长设置为 20,因此客户端与服务端的消息收发完全没有问题。

4.原理

通过定长来约定每条消息的长度,解码时只需要按照该长度从缓冲区中读取字节即可:可读字节不足一个定长时先缓存等待后续数据,读满规定长度后输出一个完整的包,然后重置标志位,继续读取下一个包的内容。