说明:

粘包:

现象:
发送abc、def,接收abcdef
原因:

  1. 应用层:接收方ByteBuf设置太大(Netty默认1024)
  2. 滑动窗口:假设发送方256 bytes示-个完整报文,但由于接收方处理不及时且窗口大小足够大,这256 bytes字节就会缓冲在接收方的滑动窗口中,当滑动窗口中缓冲了多个报文就会粘包
  3. Netty使用了Nagle算法将多次间隔较小且数据量小的数据合并成一个大的数据块进行封包发送

拆包(半包):

现象:
发送abcdef,接收abc、def
原因:

  1. 应用层:接收方ByteBuf小于实际发送数据量
  2. 滑动窗口:假设接收方的窗口只剩了128 bytes,发送方的报文大小是256 bytes,这时放不下了,只能先发送前128 bytes,等待ack后才能发送剩余部分,这就造成了半包
  3. MSS限制:当发送的数据超过MSS限制后会将数据切分发送,就会造成半包,本质是因为TCP是流式协议,消息无边界

假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到字节数是不确定的,故可能存在以下四种情况:
1、服务端分两次读取到了 两个独立的数据包,分别是D1和D2,没有粘包和拆包
2、服务端一次接受到了两个数据包,D1和D2粘合在一起,称之为TCP粘包
3、服务端分两次读取到了数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这称之为TCP拆包
4、服务端分两次读取到了数据包,第一次读取到了D1包的部分内容D1 1,第二次读取到了D1包的剩余部分内容D1 2和完整的D2,这称之为TCP拆包
image.png
拆包与粘包示意图


粘包拆包演示:

客户端向服务端循环发送数据,模拟出拆包粘包情况,源码地址

客户端:

  1. package tcp;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.ChannelPipeline;
  6. import io.netty.channel.EventLoopGroup;
  7. import io.netty.channel.nio.NioEventLoopGroup;
  8. import io.netty.channel.socket.SocketChannel;
  9. import io.netty.channel.socket.nio.NioSocketChannel;
  10. public class MyTcpClient {
  11. public static void main(String[] args) throws Exception{
  12. EventLoopGroup group = new NioEventLoopGroup();
  13. try {
  14. Bootstrap bootstrap = new Bootstrap();
  15. bootstrap.group(group).channel(NioSocketChannel.class)
  16. .handler(new ChannelInitializer<SocketChannel>() {
  17. @Override
  18. protected void initChannel(SocketChannel ch) throws Exception {
  19. ChannelPipeline pipeline = ch.pipeline();
  20. pipeline.addLast(new MyClientHandler());
  21. }
  22. });
  23. ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
  24. channelFuture.channel().closeFuture().sync();
  25. }finally {
  26. group.shutdownGracefully();
  27. }
  28. }
  29. }

客户端Handler:

  1. package tcp;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.channel.SimpleChannelInboundHandler;
  6. import io.netty.util.CharsetUtil;
  7. import java.util.concurrent.atomic.AtomicInteger;
  8. /* 自定义客户端Handler */
  9. public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
  10. private int sum;
  11. @Override
  12. protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
  13. byte[] buffer = new byte[msg.readableBytes()];
  14. msg.readBytes(buffer);
  15. String message = new String(buffer, CharsetUtil.UTF_8);
  16. System.out.println("客户端接收到消息=" +message+" 当前次数:"+ (++this.sum) );
  17. }
  18. @Override
  19. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  20. //循环发送数据模拟粘包
  21. for (int i = 0; i < 10; ++i) {
  22. ByteBuf byteBuf = Unpooled.copiedBuffer("hello,server" + i, CharsetUtil.UTF_8);
  23. System.out.println("客户端发送消息,当前是第"+i+"次");
  24. ctx.writeAndFlush(byteBuf);
  25. }
  26. }
  27. /* 异常时触发 */
  28. @Override
  29. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  30. cause.printStackTrace();
  31. ctx.close();
  32. }
  33. }

服务端:

  1. package tcp;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.ChannelPipeline;
  6. import io.netty.channel.EventLoopGroup;
  7. import io.netty.channel.nio.NioEventLoopGroup;
  8. import io.netty.channel.socket.SocketChannel;
  9. import io.netty.channel.socket.nio.NioServerSocketChannel;
  10. public class MyTcpServer {
  11. public static void main(String[] args) throws Exception{
  12. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  13. EventLoopGroup workerGroup = new NioEventLoopGroup();
  14. try {
  15. ServerBootstrap serverBootstrap = new ServerBootstrap();
  16. serverBootstrap.group(bossGroup,workerGroup)
  17. .channel(NioServerSocketChannel.class)
  18. .childHandler(new ChannelInitializer<SocketChannel>() {
  19. @Override
  20. protected void initChannel(SocketChannel ch) throws Exception {
  21. ChannelPipeline pipeline = ch.pipeline();
  22. pipeline.addLast(new MyServerHandler());
  23. }
  24. });
  25. ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
  26. channelFuture.channel().closeFuture().sync();
  27. }finally {
  28. bossGroup.shutdownGracefully();
  29. workerGroup.shutdownGracefully();
  30. }
  31. }
  32. }

服务端Handler:

  1. package tcp;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.channel.SimpleChannelInboundHandler;
  6. import io.netty.util.CharsetUtil;
  7. import java.util.UUID;
  8. import java.util.concurrent.atomic.AtomicInteger;
  9. /* 自定义服务端Handler */
  10. public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
  11. private int sum;
  12. @Override
  13. protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
  14. byte[] buffer = new byte[msg.readableBytes()];
  15. msg.readBytes(buffer);
  16. String message = new String(buffer, CharsetUtil.UTF_8); //将buffer转成字符串
  17. System.out.println("服务器接收到数据: "+ message);
  18. System.out.println("服务器接收到消息量= " + (++this.sum) );
  19. //服务器返回数据给客户端
  20. ByteBuf responseBytebuf = Unpooled.copiedBuffer(UUID.randomUUID().toString(), CharsetUtil.UTF_8);
  21. ctx.writeAndFlush(responseBytebuf);
  22. }
  23. }

测试:

启动客户端与服务端,多次尝试后将出现粘包现象
image.png
粘包场景还原


解决方案:

使用 handler 时只有源码中声明了 @Sharable 才具有线程安全特性

解决方案 - 自定义协议包:

使用自定义协议 + 编解码器解决,核心在于确定服务端每次读取的数据长度,通过将数据封装到协议包来保证每次需要读取的数据内容,源码地址

自定义协议包:

  1. package tcp.protocoltcp;
  2. import lombok.Getter;
  3. import lombok.Setter;
  4. import lombok.experimental.Accessors;
  5. /* 自定义协议包 */
  6. @Getter
  7. @Setter
  8. @Accessors(chain = true)
  9. public class MessageProtocol {
  10. private int len; //定义长度
  11. private byte[] content; //数据内容
  12. }

自定义编码器:

将需要发送的数据内容封装到协议包内在进行发送

  1. package tcp.protocoltcp;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.handler.codec.MessageToByteEncoder;
  5. /* 自定义协议包对应的编码器 */
  6. public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
  7. @Override
  8. protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
  9. System.out.println("自定义协议包对应编码器被调用");
  10. out.writeInt(msg.getLen());
  11. out.writeBytes(msg.getContent());
  12. }
  13. }

自定义解码器:

将协议包的数据进行解析

  1. package tcp.protocoltcp;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.handler.codec.ReplayingDecoder;
  5. import java.util.List;
  6. /* 自定义协议包对应的解码器 */
  7. public class MyMessageDecoder extends ReplayingDecoder<Void> {
  8. @Override
  9. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
  10. System.out.println("自定义协议包对应解码器被调用");
  11. //需要将得到的二进制字节码转为数据包
  12. int length = in.readInt();
  13. byte[] content = new byte[length];
  14. in.readBytes(content);
  15. MessageProtocol protocol = new MessageProtocol()
  16. .setLen(length)
  17. .setContent(content);
  18. out.add(protocol);
  19. }
  20. }

服务端Handler:

  1. package tcp.protocoltcp;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.channel.SimpleChannelInboundHandler;
  4. import io.netty.util.CharsetUtil;
  5. /* 自定义客户端Handler */
  6. public class MyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {
  7. private int sum;
  8. @Override
  9. protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
  10. byte[] content = msg.getContent();
  11. int len = msg.getLen();
  12. String message = new String(content, CharsetUtil.UTF_8); //将buffer转成字符串
  13. System.out.println("客户端接收到服务器返回数据: "+ message+" 当前长度: "+len);
  14. System.out.println("客户端接收到消息量= " + (++this.sum) );
  15. }
  16. @Override
  17. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  18. //循环发送数据
  19. for (int i = 0; i < 5; ++i) {
  20. String msg = "当前是第" + i+"次签到";
  21. byte[] bytes = msg.getBytes(CharsetUtil.UTF_8);
  22. int length = msg.getBytes(CharsetUtil.UTF_8).length;
  23. //创建协议包对象
  24. MessageProtocol protocol = new MessageProtocol()
  25. .setContent(bytes)
  26. .setLen(length);
  27. System.out.println("客户端发送消息,当前是第"+i+"次");
  28. ctx.writeAndFlush(protocol);
  29. }
  30. }
  31. /* 异常时触发 */
  32. @Override
  33. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  34. cause.printStackTrace();
  35. ctx.close();
  36. }
  37. }

客户端Handler:

  1. package tcp.protocoltcp;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.channel.SimpleChannelInboundHandler;
  6. import io.netty.util.CharsetUtil;
  7. import java.util.UUID;
  8. /* 自定义服务端Handler */
  9. public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {
  10. private int sum;
  11. @Override
  12. protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
  13. byte[] content = msg.getContent();
  14. int len = msg.getLen();
  15. String message = new String(content, CharsetUtil.UTF_8); //将buffer转成字符串
  16. System.out.println("服务器接收到数据: "+ message+" 当前长度: "+len);
  17. System.out.println("服务器接收到消息量= " + (++this.sum) );
  18. //服务器返回数据给客户端
  19. String responseStr = UUID.randomUUID().toString();
  20. int length = responseStr.getBytes("utf-8").length;
  21. byte[] bytes = responseStr.getBytes();
  22. MessageProtocol protocol = new MessageProtocol()
  23. .setLen(length)
  24. .setContent(bytes);
  25. ctx.writeAndFlush(protocol);
  26. }
  27. }

客户端:

  1. package tcp.protocoltcp;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.ChannelPipeline;
  6. import io.netty.channel.EventLoopGroup;
  7. import io.netty.channel.nio.NioEventLoopGroup;
  8. import io.netty.channel.socket.SocketChannel;
  9. import io.netty.channel.socket.nio.NioSocketChannel;
  10. /* 模拟出现Tcp粘包情况的客户端 */
  11. public class MyTcpClient {
  12. public static void main(String[] args) throws Exception{
  13. EventLoopGroup group = new NioEventLoopGroup();
  14. try {
  15. Bootstrap bootstrap = new Bootstrap();
  16. bootstrap.group(group).channel(NioSocketChannel.class)
  17. .handler(new ChannelInitializer<SocketChannel>() {
  18. @Override
  19. protected void initChannel(SocketChannel ch) throws Exception {
  20. ChannelPipeline pipeline = ch.pipeline();
  21. pipeline.addLast(new MyMessageEncoder()); //自定义协议包对应的编码器
  22. pipeline.addLast(new MyMessageDecoder()); //自定义协议包对应的解码器
  23. pipeline.addLast(new MyClientHandler());
  24. }
  25. });
  26. ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
  27. channelFuture.channel().closeFuture().sync();
  28. }finally {
  29. group.shutdownGracefully();
  30. }
  31. }
  32. }

服务端:

  1. package tcp.protocoltcp;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.ChannelPipeline;
  6. import io.netty.channel.EventLoopGroup;
  7. import io.netty.channel.nio.NioEventLoopGroup;
  8. import io.netty.channel.socket.SocketChannel;
  9. import io.netty.channel.socket.nio.NioServerSocketChannel;
  10. /* 模拟出现Tcp粘包情况的服务端 */
  11. public class MyTcpServer {
  12. public static void main(String[] args) throws Exception{
  13. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  14. EventLoopGroup workerGroup = new NioEventLoopGroup();
  15. try {
  16. ServerBootstrap serverBootstrap = new ServerBootstrap();
  17. serverBootstrap.group(bossGroup,workerGroup)
  18. .channel(NioServerSocketChannel.class)
  19. .childHandler(new ChannelInitializer<SocketChannel>() {
  20. @Override
  21. protected void initChannel(SocketChannel ch) throws Exception {
  22. ChannelPipeline pipeline = ch.pipeline();
  23. pipeline.addLast(new MyMessageDecoder()); //自定义协议包的解码器
  24. pipeline.addLast(new MyMessageEncoder()); //自定义协议包的编码器
  25. pipeline.addLast(new MyServerHandler());
  26. }
  27. });
  28. ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
  29. channelFuture.channel().closeFuture().sync();
  30. }finally {
  31. bossGroup.shutdownGracefully();
  32. workerGroup.shutdownGracefully();
  33. }
  34. }
  35. }

测试:

启动服务端和客户端,消息将先通过协议包到达编码器和解码器,每次都只会读取协议包内的数据来实现粘包&拆包问题的解决
image.png
拆包粘包问题得到解决

解决方案 - 定长解码器:

使用定长解码器FixedLengthFrameDecoder对完整数据包的长度进行指定,只有符合长度时才会被获取到,不符合长度则会等待下一次数据包进行数据拼接,有线程安全问题,源码地址
image.png
定长解码器源码中的注释

服务端:

  1. package tcp.fixedlengthframe;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.buffer.ByteBuf;
  4. import io.netty.buffer.Unpooled;
  5. import io.netty.channel.*;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioServerSocketChannel;
  9. import io.netty.handler.codec.FixedLengthFrameDecoder;
  10. import io.netty.util.CharsetUtil;
  11. import java.nio.charset.StandardCharsets;
  12. import java.util.UUID;
  13. /* 使用定长解码器解决拆包粘包问题 - 服务端 */
  14. public class MyTcpServer2 {
  15. public static void main(String[] args) throws Exception{
  16. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  17. EventLoopGroup workerGroup = new NioEventLoopGroup();
  18. try {
  19. ServerBootstrap serverBootstrap = new ServerBootstrap();
  20. serverBootstrap.group(bossGroup,workerGroup)
  21. .channel(NioServerSocketChannel.class)
  22. .childHandler(new ChannelInitializer<SocketChannel>() {
  23. @Override
  24. protected void initChannel(SocketChannel ch) throws Exception {
  25. ChannelPipeline pipeline = ch.pipeline();
  26. pipeline.addLast(new FixedLengthFrameDecoder(13)); //添加定长解码器,指定数据长度为10
  27. pipeline.addLast(new SimpleChannelInboundHandler<ByteBuf>() {
  28. @Override
  29. protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
  30. String message = msg.toString(CharsetUtil.UTF_8);
  31. System.out.println("服务器接收到数据: "+ message);
  32. }
  33. });
  34. }
  35. });
  36. ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
  37. channelFuture.channel().closeFuture().sync();
  38. }finally {
  39. bossGroup.shutdownGracefully();
  40. workerGroup.shutdownGracefully();
  41. }
  42. }
  43. }

客户端:

  1. package tcp.fixedlengthframe;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.buffer.ByteBuf;
  4. import io.netty.buffer.Unpooled;
  5. import io.netty.channel.*;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioSocketChannel;
  9. import io.netty.util.CharsetUtil;
  10. import java.nio.charset.StandardCharsets;
  11. /* 使用定长解码器解决拆包粘包问题 - 客户端 */
  12. public class MyTcpClient2 {
  13. public static void main(String[] args) throws Exception{
  14. EventLoopGroup group = new NioEventLoopGroup();
  15. try {
  16. Bootstrap bootstrap = new Bootstrap();
  17. bootstrap.group(group).channel(NioSocketChannel.class)
  18. .handler(new ChannelInitializer<SocketChannel>() {
  19. @Override
  20. protected void initChannel(SocketChannel ch) throws Exception {
  21. ChannelPipeline pipeline = ch.pipeline();
  22. pipeline.addLast(new ChannelInboundHandlerAdapter(){
  23. @Override
  24. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  25. //循环发送数据模拟粘包
  26. for (int i = 0; i < 10; ++i) {
  27. ByteBuf buffer = ctx.alloc().buffer();
  28. buffer.writeBytes(("hello,server"+i).getBytes(StandardCharsets.UTF_8));
  29. ctx.writeAndFlush(buffer);
  30. }
  31. }
  32. });
  33. }
  34. });
  35. ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
  36. channelFuture.channel().closeFuture().sync();
  37. }finally {
  38. group.shutdownGracefully();
  39. }
  40. }
  41. }

测试:

定长解码器将数据包进行长度分割,成功获取数据
image.png

解决方案 - 分隔符解码器:

使用分隔符解码器LineBasedFrameDecoder对数据包的长度进行分割,每个数据末尾需要有分割符,实际开发中较为少用,有线程安全问题,源码地址
image.png
分隔符解码器源码使用介绍

服务端:

  1. package tcp.linebaseframe;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.buffer.ByteBuf;
  4. import io.netty.channel.*;
  5. import io.netty.channel.nio.NioEventLoopGroup;
  6. import io.netty.channel.socket.SocketChannel;
  7. import io.netty.channel.socket.nio.NioServerSocketChannel;
  8. import io.netty.handler.codec.FixedLengthFrameDecoder;
  9. import io.netty.handler.codec.LineBasedFrameDecoder;
  10. import io.netty.util.CharsetUtil;
  11. /* 使用分隔符解码器解决拆包粘包问题 - 服务端 */
  12. public class MyTcpServer3 {
  13. public static void main(String[] args) throws Exception{
  14. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  15. EventLoopGroup workerGroup = new NioEventLoopGroup();
  16. try {
  17. ServerBootstrap serverBootstrap = new ServerBootstrap();
  18. serverBootstrap.group(bossGroup,workerGroup)
  19. .channel(NioServerSocketChannel.class)
  20. .childHandler(new ChannelInitializer<SocketChannel>() {
  21. @Override
  22. protected void initChannel(SocketChannel ch) throws Exception {
  23. ChannelPipeline pipeline = ch.pipeline();
  24. pipeline.addLast(new LineBasedFrameDecoder(1024)); //添加分隔符解码器,需要设置数据包最大长度
  25. pipeline.addLast(new SimpleChannelInboundHandler<ByteBuf>() {
  26. @Override
  27. protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
  28. String message = msg.toString(CharsetUtil.UTF_8);
  29. System.out.println("服务器接收到数据: "+ message);
  30. }
  31. });
  32. }
  33. });
  34. ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
  35. channelFuture.channel().closeFuture().sync();
  36. }finally {
  37. bossGroup.shutdownGracefully();
  38. workerGroup.shutdownGracefully();
  39. }
  40. }
  41. }

客户端:

  1. package tcp.linebaseframe;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.buffer.ByteBuf;
  4. import io.netty.channel.*;
  5. import io.netty.channel.nio.NioEventLoopGroup;
  6. import io.netty.channel.socket.SocketChannel;
  7. import io.netty.channel.socket.nio.NioSocketChannel;
  8. import java.nio.charset.StandardCharsets;
  9. /* 使用分隔符解码器解决拆包粘包问题 - 客户端 */
  10. public class MyTcpClient3 {
  11. public static void main(String[] args) throws Exception{
  12. EventLoopGroup group = new NioEventLoopGroup();
  13. try {
  14. Bootstrap bootstrap = new Bootstrap();
  15. bootstrap.group(group).channel(NioSocketChannel.class)
  16. .handler(new ChannelInitializer<SocketChannel>() {
  17. @Override
  18. protected void initChannel(SocketChannel ch) throws Exception {
  19. ChannelPipeline pipeline = ch.pipeline();
  20. pipeline.addLast(new ChannelInboundHandlerAdapter(){
  21. @Override
  22. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  23. //循环发送数据,末尾需要使用分隔符,这样服务端分隔符解码器才能正确识别
  24. for (int i = 0; i < 10; ++i) {
  25. ByteBuf buffer = ctx.alloc().buffer();
  26. buffer.writeBytes(("hello,server"+i+"\n").getBytes(StandardCharsets.UTF_8));
  27. ctx.writeAndFlush(buffer);
  28. }
  29. }
  30. });
  31. }
  32. });
  33. ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
  34. channelFuture.channel().closeFuture().sync();
  35. }finally {
  36. group.shutdownGracefully();
  37. }
  38. }
  39. }

测试:

分隔符解码器根据分隔符对数据包进行分割,成功获取数据
image.png

解决方案 - 长度分割解码器:

使用长度分割解码器 LengthFieldBasedFrameDecoder 对数据包进行切割,可以拆出数据包内的指定索引位的数据,有线程安全问题,源码地址
主要参数如下所示:

参数名 含义
maxFrameLength 单个数据包最大长度
lengthFieldOffset 长度字段起始索引位
lengthFieldLength 长度字段长度(16进制长度)
lengthAdjustment 从长度字段结束后开始算,还有几个字节才是内容
initialBytesToStrip 接收数据是从头部开始的几个字节是不需要的

客户端:

  1. package tcp.lengthfieldbasedframe;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.buffer.ByteBuf;
  4. import io.netty.buffer.ByteBufAllocator;
  5. import io.netty.channel.*;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioSocketChannel;
  9. import io.netty.util.CharsetUtil;
  10. import java.nio.charset.StandardCharsets;
  11. import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump;
  12. import static io.netty.util.internal.StringUtil.NEWLINE;
  13. /* 使用 解码器解决拆包粘包问题 - 客户端 */
  14. public class MyTcpClient4 {
  15. public static void main(String[] args) throws Exception{
  16. EventLoopGroup group = new NioEventLoopGroup();
  17. try {
  18. Bootstrap bootstrap = new Bootstrap();
  19. bootstrap.group(group).channel(NioSocketChannel.class)
  20. .handler(new ChannelInitializer<SocketChannel>() {
  21. @Override
  22. protected void initChannel(SocketChannel ch) throws Exception {
  23. ChannelPipeline pipeline = ch.pipeline();
  24. pipeline.addLast(new ChannelInboundHandlerAdapter(){
  25. @Override
  26. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  27. //循环发送数据
  28. for (int i = 0; i < 10; ++i) {
  29. ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
  30. String info = "hello,server";
  31. byte[] infoBytes = info.getBytes(CharsetUtil.UTF_8);
  32. buffer.writeInt(infoBytes.length); //写入数据长度,是不需要的信息,将会转换为16进制进行储存,因此服务端截取应该是从4开始
  33. buffer.writeBytes(infoBytes); //写入内容
  34. Log(buffer);
  35. ctx.writeAndFlush(buffer);
  36. }
  37. }
  38. });
  39. }
  40. });
  41. ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
  42. channelFuture.channel().closeFuture().sync();
  43. }finally {
  44. group.shutdownGracefully();
  45. }
  46. }
  47. private static void Log(ByteBuf buffer) {
  48. int Length = buffer.readableBytes();
  49. int rows = Length / 16 + (Length % 15 == 0 ? 0 : 1) + 4;
  50. StringBuilder buf =
  51. new StringBuilder(rows * 80 * 2)
  52. .append("read index:")
  53. .append(buffer.readerIndex())
  54. .append(" write index: ")
  55. .append(buffer.writerIndex())
  56. .append(" capacity:")
  57. .append(buffer.capacity())
  58. .append(NEWLINE);
  59. appendPrettyHexDump(buf, buffer);
  60. System.out.println(buf.toString());
  61. }
  62. }

服务端:

客户端写入数据转为Byte字节后,前4位为writeInt方法写入的有效数据长度,且接收时数据包的长度位是不需要的,因此 initialBytesToStrip 值为 4,lengthFieldOffset 值为 0,lengthFieldLength 值为 4,长度字段和目标数据字段中间没有其他字段,因此 lengthAdjustment 值为 0

  1. package tcp.lengthfieldbasedframe;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.buffer.ByteBuf;
  4. import io.netty.channel.*;
  5. import io.netty.channel.nio.NioEventLoopGroup;
  6. import io.netty.channel.socket.SocketChannel;
  7. import io.netty.channel.socket.nio.NioServerSocketChannel;
  8. import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
  9. import io.netty.handler.codec.LineBasedFrameDecoder;
  10. import io.netty.handler.codec.string.StringDecoder;
  11. import io.netty.handler.codec.string.StringEncoder;
  12. import io.netty.util.CharsetUtil;
  13. import lombok.extern.java.Log;
  14. import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump;
  15. import static io.netty.util.internal.StringUtil.NEWLINE;
  16. /* 使用 解码器解决拆包粘包问题 - 服务端 */
  17. public class MyTcpServer4 {
  18. public static void main(String[] args) throws Exception{
  19. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  20. EventLoopGroup workerGroup = new NioEventLoopGroup();
  21. try {
  22. ServerBootstrap serverBootstrap = new ServerBootstrap();
  23. serverBootstrap.group(bossGroup,workerGroup)
  24. .channel(NioServerSocketChannel.class)
  25. .childHandler(new ChannelInitializer<SocketChannel>() {
  26. @Override
  27. protected void initChannel(SocketChannel ch) throws Exception {
  28. ChannelPipeline pipeline = ch.pipeline();
  29. pipeline.addLast(new LengthFieldBasedFrameDecoder(1024,0,4,0,4)); //添加分隔符解码器,需要设置数据包最大长度
  30. pipeline.addLast(new SimpleChannelInboundHandler<ByteBuf>() {
  31. @Override
  32. protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
  33. Log(msg);
  34. String message = msg.toString(CharsetUtil.UTF_8);
  35. System.out.println("服务器接收到数据: "+ message);
  36. }
  37. });
  38. }
  39. });
  40. ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
  41. channelFuture.channel().closeFuture().sync();
  42. }finally {
  43. bossGroup.shutdownGracefully();
  44. workerGroup.shutdownGracefully();
  45. }
  46. }
  47. private static void Log(ByteBuf buffer) {
  48. int Length = buffer.readableBytes();
  49. int rows = Length / 16 + (Length % 15 == 0 ? 0 : 1) + 4;
  50. StringBuilder buf =
  51. new StringBuilder(rows * 80 * 2)
  52. .append("read index:")
  53. .append(buffer.readerIndex())
  54. .append(" write index: ")
  55. .append(buffer.writerIndex())
  56. .append(" capacity:")
  57. .append(buffer.capacity())
  58. .append(NEWLINE);
  59. appendPrettyHexDump(buf, buffer);
  60. System.out.println(buf.toString());
  61. }
  62. }

测试:

运行结果可查看具体字节截取效果
image.png