一、基本介绍

  1. TCP 是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务器端)都要有一一成对的 socket,因此,发送端为了将多个发给接收端的包,更有效的发给对方,使用了优化方法(Nagle 算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样做虽然提高了效率,但是接收端就难于分辨出完整的数据包了,因为面向流的通信是无消息保护边界的
  2. 由于 TCP 无消息保护边界,需要在接收端处理消息边界问题,也就是我们所说的粘包、拆包问题,看一张图
  3. TCP 粘包、拆包图解

image.png
假设客户端分别发送了两个数据包 D1 和 D2 给服务端,由于服务端一次读取到字节数是不确定的,故可能存在以下四种情况:

  1. 服务端分两次读取到了两个独立的数据包,分别是 D1 和 D2,没有粘包和拆包
  2. 服务端一次接受到了两个数据包,D1 和 D2 粘合在一起,称之为 TCP 粘包
  3. 服务端分两次读取到了数据包,第一次读取到了完整的 D1 包和 D2 包的部分内容,第二次读取到了 D2 包的剩余内容,这称之为 TCP 拆包
  4. 服务端分两次读取到了数据包,第一次读取到了 D1 包的部分内容 D1_1,第二次读取到了 D1 包的剩余部分内容 D1_2 和完整的 D2 包。

    二、TCP粘包和拆包的现象实例

    在编写 Netty 程序时,如果没有做处理,就会发生粘包和拆包的问题
    看一个具体的实例:

    2.1 客户端

    MyClient

    1. public class MyClient {
    2. public static void main(String[] args) throws InterruptedException {
    3. NioEventLoopGroup group = new NioEventLoopGroup();
    4. try {
    5. Bootstrap bootstrap = new Bootstrap();
    6. bootstrap.group(group).channel(NioSocketChannel.class).handler(new MyClientInitializer());
    7. ChannelFuture channelFuture = bootstrap.connect("localhost", 9090).sync();
    8. channelFuture.channel().closeFuture().sync();
    9. } finally {
    10. group.shutdownGracefully();
    11. }
    12. }
    13. }

    MyClientInitializer

    1. public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
    2. @Override
    3. protected void initChannel(SocketChannel ch) throws Exception {
    4. ChannelPipeline pipeline = ch.pipeline();
    5. pipeline.addLast(new MyClientHandler());
    6. }
    7. }

    MyClientHandler

    1. public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    2. int count;
    3. @Override
    4. public void channelActive(ChannelHandlerContext ctx) throws Exception {
    5. // 使用客户端发送10条数据 hello,server
    6. for (int i = 0; i < 10; i++) {
    7. ByteBuf byteBuf = Unpooled.copiedBuffer(" hello,server" + i, StandardCharsets.UTF_8);
    8. ctx.writeAndFlush(byteBuf);
    9. }
    10. }
    11. @Override
    12. protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
    13. byte[] buffer = new byte[msg.readableBytes()];
    14. msg.readBytes(buffer);
    15. String message = new String(buffer, StandardCharsets.UTF_8);
    16. System.out.println("客户端接收到消息=" + message);
    17. System.out.println("客户端接收到消息数量=" + (++this.count));
    18. }
    19. @Override
    20. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    21. cause.printStackTrace();
    22. ctx.close();
    23. }
    24. }

    2.2 服务端

    MyServer

    1. public class MyServer {
    2. public static void main(String[] args) throws InterruptedException {
    3. NioEventLoopGroup bossGroup = new NioEventLoopGroup();
    4. NioEventLoopGroup workGroup = new NioEventLoopGroup();
    5. try {
    6. ServerBootstrap serverBootstrap = new ServerBootstrap();
    7. serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
    8. .childHandler(new MyServerInitializer());
    9. ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
    10. channelFuture.channel().closeFuture().sync();
    11. } finally {
    12. bossGroup.shutdownGracefully();
    13. workGroup.shutdownGracefully();
    14. }
    15. }
    16. }

    MyServerInitializer

    1. public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    2. @Override
    3. protected void initChannel(SocketChannel ch) throws Exception {
    4. ChannelPipeline pipeline = ch.pipeline();
    5. pipeline.addLast(new MyServerHandler());
    6. }
    7. }

    MyServerHandler

    1. public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
    2. private int count;
    3. @Override
    4. protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
    5. byte[] buffer = new byte[msg.readableBytes()];
    6. msg.readBytes(buffer);
    7. // 将buffer转化成字符串
    8. String message = new String(buffer, StandardCharsets.UTF_8);
    9. System.out.println("服务器端接收到数据 " + message);
    10. System.out.println("服务器端接收到消息次数 " + (++this.count));
    11. // 服务器会送数据给客户端,会送一个随机Id
    12. ByteBuf byteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString()+" ", StandardCharsets.UTF_8);
    13. ctx.writeAndFlush(byteBuf);
    14. }
    15. @Override
    16. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    17. cause.printStackTrace();
    18. ctx.close();
    19. }
    20. }

    2.3 输出效果

    通过下图可以看到,MyClient一次性发出了10条消息,但是MyServer有时候是10条消息一起读取展示的,有时候是几条消息拆分展示的,这就会导致一个问题:接受消息时不知道消息头和消息尾,导致消息语义表达有误。image.pngimage.pngimage.png

    三、TCP 粘包和拆包解决方案

    常用方案:使用自定义协议+编解码器来解决。 关键就是要解决服务器端每次读取数据长度的问题,这个问题解决,就不会出现服务器多读或少读数据的问题,从而避免的 TCP 粘包、拆包。

看一个具体的实例
要求客户端发送 5 个 Message 对象,客户端每次发送一个 Message 对象,服务器端每次接收一个 Message,分 5 次进行解码,每读取到一个 Message,会回复一个 Message 对象给客户端。
image.png

3.1 协议包

MessageProtocol

  1. public class MessageProtocol {
  2. private int len;
  3. private byte[] content;
  4. public int getLen() {
  5. return len;
  6. }
  7. public void setLen(int len) {
  8. this.len = len;
  9. }
  10. public byte[] getContent() {
  11. return content;
  12. }
  13. public void setContent(byte[] content) {
  14. this.content = content;
  15. }
  16. }

3.2 编码解码器

MyMessageEncoder

  1. // 编码器
  2. public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
  3. @Override
  4. protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
  5. System.out.println("MyMessageEncoder encode 方法被调用。");
  6. out.writeInt(msg.getLen());
  7. out.writeBytes(msg.getContent());
  8. }
  9. }

MyMessageDecoder

  1. // 解码器
  2. public class MyMessageDecoder extends ReplayingDecoder<Void> {
  3. @Override
  4. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
  5. System.out.println("MyMessageDecoder decode 被调用");
  6. // 需要将得到的二进制字节码 -> MessageProtocol 数据包对象
  7. int length = in.readInt();
  8. byte[] content = new byte[length];
  9. // 获取内容
  10. in.readBytes(content);
  11. // 封装成 MessageProtocol 对象 ,放入out,传递给下一个 handler 业务处理
  12. MessageProtocol messageProtocol = new MessageProtocol();
  13. messageProtocol.setLen(length);
  14. messageProtocol.setContent(content);
  15. out.add(messageProtocol);
  16. }
  17. }

3.3 客户端

MyClient

  1. public class MyClient {
  2. public static void main(String[] args) throws InterruptedException {
  3. NioEventLoopGroup group = new NioEventLoopGroup();
  4. try {
  5. Bootstrap bootstrap = new Bootstrap();
  6. bootstrap.group(group).channel(NioSocketChannel.class).handler(new MyClientInitializer());
  7. ChannelFuture channelFuture = bootstrap.connect("localhost", 9090).sync();
  8. channelFuture.channel().closeFuture().sync();
  9. } finally {
  10. group.shutdownGracefully();
  11. }
  12. }
  13. }

�MyClientInitializer

  1. public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
  2. @Override
  3. protected void initChannel(SocketChannel ch) throws Exception {
  4. ChannelPipeline pipeline = ch.pipeline();
  5. // 加入编码器
  6. pipeline.addLast(new MyMessageEncoder());
  7. // 解码器
  8. pipeline.addLast(new MyMessageDecoder());
  9. pipeline.addLast(new MyClientHandler());
  10. }
  11. }

MyClientHandler

  1. public class MyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {
  2. int count;
  3. @Override
  4. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  5. // 使用客户端发送10条数据 "今天天气冷,吃火锅" 编号
  6. for (int i = 0; i < 5; i++) {
  7. String msg = "你好呀。。。";
  8. byte[] content = msg.getBytes(StandardCharsets.UTF_8);
  9. int length = content.length;
  10. // 创建协议包
  11. MessageProtocol messageProtocol = new MessageProtocol();
  12. messageProtocol.setLen(length);
  13. messageProtocol.setContent(content);
  14. ctx.writeAndFlush(messageProtocol);
  15. }
  16. }
  17. @Override
  18. protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
  19. int len = msg.getLen();
  20. byte[] content = msg.getContent();
  21. String message = new String(content, StandardCharsets.UTF_8);
  22. System.out.println("客户端接收到消息长度=" + len);
  23. System.out.println("客户端接收到消息内容=" + message);
  24. System.out.println("客户端接收到消息数量=" + (++this.count) + "\n");
  25. }
  26. @Override
  27. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  28. cause.printStackTrace();
  29. System.out.println("异常信息=" + cause.getMessage());
  30. ctx.close();
  31. }
  32. }

3.4 服务端

MyServer

  1. public class MyServer {
  2. public static void main(String[] args) throws InterruptedException {
  3. NioEventLoopGroup bossGroup = new NioEventLoopGroup();
  4. NioEventLoopGroup workGroup = new NioEventLoopGroup();
  5. try {
  6. ServerBootstrap serverBootstrap = new ServerBootstrap();
  7. serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
  8. .childHandler(new MyServerInitializer());
  9. ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
  10. channelFuture.channel().closeFuture().sync();
  11. } finally {
  12. bossGroup.shutdownGracefully();
  13. workGroup.shutdownGracefully();
  14. }
  15. }
  16. }

MyServerInitializer

  1. public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
  2. @Override
  3. protected void initChannel(SocketChannel ch) throws Exception {
  4. ChannelPipeline pipeline = ch.pipeline();
  5. // 解码器
  6. pipeline.addLast(new MyMessageDecoder());
  7. // 编码器
  8. pipeline.addLast(new MyMessageEncoder());
  9. pipeline.addLast(new MyServerHandler());
  10. }
  11. }

MyServerHandler

  1. public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {
  2. private int count;
  3. @Override
  4. protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
  5. // 接收到数据 并处理
  6. int len = msg.getLen();
  7. byte[] content = msg.getContent();
  8. System.out.println("服务器接收到信息如下");
  9. System.out.println("长度=" + len);
  10. System.out.println("内容=" + new String(content, StandardCharsets.UTF_8));
  11. System.out.println("服务器接收到消息包数量 " + (++this.count) + "\n");
  12. // 回复消息
  13. String responseContent = UUID.randomUUID().toString();
  14. byte[] responseContentBytes = responseContent.getBytes(StandardCharsets.UTF_8);
  15. int length = responseContentBytes.length;
  16. // 构建一个协议包
  17. MessageProtocol messageProtocol = new MessageProtocol();
  18. messageProtocol.setContent(responseContentBytes);
  19. messageProtocol.setLen(length);
  20. ctx.writeAndFlush(messageProtocol);
  21. }
  22. @Override
  23. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  24. cause.printStackTrace();
  25. ctx.close();
  26. }
  27. }

3.5 输出效果

如下是服务端和客户端的输出,可以看到消息没有乱,根据每个协议包输出的。
无论运行几次,Server都是分5次接收的,这样就解决了TCP粘包问题。
image.png
image.png