原文地址:https://www.cnblogs.com/dyg0826/p/11335379.html

    TCP(transport control protocol,传输控制协议)是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务器端)都要有一一成对的socket,因此,发送端为了将多个发往接收端的包,更有效的发到对方,使用了优化方法(Nagle算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样,接收端,就难于分辨出来了,必须提供科学的拆包机制。即面向流的通信是无消息保护边界的。
    图解TCP的粘包和拆包

    TCP粘包和拆包 - 图1

    假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到字节数是不确定的,故可能存在以下四种情况:

    1.服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包

    2.服务端一次接受到了两个数据包,D1和D2粘合在一起,称之为TCP粘包

    3.服务端分两次读取到了数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这称之为TCP拆包

    4.服务端分两次读取到了数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余部分内容D1_2和完整的D2包。

    特别要注意的是,如果TCP的接受滑窗非常小,而数据包D1和D2比较大,很有可能会发生第五种情况,即服务端分多次才能将D1和D2包完全接受,期间发生多次拆包。

    粘包、拆包问题的解决方案:定义通信协议

    目前业界主流的协议(protocol)方案可以归纳如下:

    1 定长协议:假设我们规定每3个字节,表示一个有效报文。

    2.特殊字符分隔符协议:在包尾部增加回车或者空格符等特殊字符进行分割 。

    3.长度编码:将消息分为消息头和消息体,消息头中用一个int型数据(4字节),表示消息体长度的字段。在解析时,先读取内容长度Length,其值为实际消息体内容(Content)占用的字节数,之后必须读取到这么多字节的内容,才认为是一个完整的数据报文。

    下面我用长度编码方式解决TCP的粘包、拆包问题

    首先看一下不使用长度编码协议的时候会发生什么问题?

    Server端测试代码

    1. public class MyServer {
    2. public static void main(String[] args) throws Exception {
    3. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    4. EventLoopGroup workerGroup = new NioEventLoopGroup();
    5. try {
    6. ServerBootstrap serverBootstrap = new ServerBootstrap();
    7. serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).
    8. childHandler(new MyServerInitializer());
    9. ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
    10. channelFuture.channel().closeFuture().sync();
    11. } finally {
    12. bossGroup.shutdownGracefully();
    13. workerGroup.shutdownGracefully();
    14. }
    15. }
    16. }
    17. public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
    18. private int count;
    19. @Override
    20. protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
    21. byte[] buffer = new byte[msg.readableBytes()];
    22. msg.readBytes(buffer);
    23. String message = new String(buffer, Charset.forName("utf-8"));
    24. System.out.println("服务端接收到的消息内容: " + message);
    25. System.out.println("服务端接收到的消息数量: " + (++this.count));
    26. ByteBuf responseByteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString(), Charset.forName("utf-8"));
    27. ctx.writeAndFlush(responseByteBuf);
    28. }
    29. @Override
    30. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    31. cause.printStackTrace();
    32. ctx.close();
    33. }
    34. }
    35. public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    36. @Override
    37. protected void initChannel(SocketChannel ch) throws Exception {
    38. System.out.println(this);
    39. ChannelPipeline pipeline = ch.pipeline();
    40. pipeline.addLast(new MyServerHandler());
    41. }
    42. }

    Client端测试代码

    1. public class MyClient {
    2. public static void main(String[] args) throws Exception{
    3. EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    4. try {
    5. Bootstrap bootstrap = new Bootstrap();
    6. bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).
    7. handler(new MyClientInitializer());
    8. ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
    9. channelFuture.channel().closeFuture().sync();
    10. } finally {
    11. eventLoopGroup.shutdownGracefully();
    12. }
    13. }
    14. }
    15. public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    16. private int count;
    17. @Override
    18. public void channelActive(ChannelHandlerContext ctx) throws Exception {
    19. for (int i = 0; i < 10; ++i) {
    20. ByteBuf buffer = Unpooled.copiedBuffer("sent from client ", Charset.forName("utf-8"));
    21. ctx.writeAndFlush(buffer);
    22. }
    23. }
    24. @Override
    25. protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
    26. byte[] buffer = new byte[msg.readableBytes()];
    27. msg.readBytes(buffer);
    28. String message = new String(buffer, Charset.forName("utf-8"));
    29. System.out.println("客户端接收到的消息内容: " + message);
    30. System.out.println("客户端接收到的消息数量: " + (++this.count));
    31. }
    32. @Override
    33. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    34. cause.printStackTrace();
    35. ctx.close();
    36. }
    37. }
    38. public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
    39. @Override
    40. protected void initChannel(SocketChannel ch) throws Exception {
    41. ChannelPipeline pipeline = ch.pipeline();
    42. pipeline.addLast(new MyClientHandler());
    43. }
    44. }
    1. 启动服务端,然后
    2. 第一次启动客户端,服务端日志:![](https://img2018.cnblogs.com/blog/1496041/201908/1496041-20190811160831364-758557088.png#crop=0&crop=0&crop=1&crop=1&id=TTgda&originHeight=61&originWidth=1738&originalType=binary&ratio=1&rotation=0&showTitle=false&status=done&style=none&title=)
    3. 第二次启动客户端,服务端日志:

    TCP粘包和拆包 - 图2

    1. 第三次启动客户端,服务端日志:

    TCP粘包和拆包 - 图3

    1. 从结果中可以看出发生了粘包问题。
    2. 用长度编码方式解决TCP的粘包和拆包问题
    3. 自定义协议CustomProtocol,解码器MyDecoder,编码器MyEncoder
    1. public class CustomProtocol {
    2. private int length;
    3. private byte[] content;
    4. public int getLength() {
    5. return length;
    6. }
    7. public void setLength(int length) {
    8. this.length = length;
    9. }
    10. public byte[] getContent() {
    11. return content;
    12. }
    13. public void setContent(byte[] content) {
    14. this.content = content;
    15. }
    16. }
    17. public class MyDecoder extends ReplayingDecoder<Void> {
    18. @Override
    19. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    20. int length = in.readInt();
    21. byte[] content = new byte[length];
    22. in.readBytes(content);
    23. CustomProtocol personProtocol = new CustomProtocol();
    24. personProtocol.setLength(length);
    25. personProtocol.setContent(content);
    26. out.add(personProtocol);
    27. }
    28. }
    29. public class MyEncoder extends MessageToByteEncoder<CustomProtocol> {
    30. @Override
    31. protected void encode(ChannelHandlerContext ctx, CustomProtocol msg, ByteBuf out) throws Exception {
    32. out.writeInt(msg.getLength());
    33. out.writeBytes(msg.getContent());
    34. }
    35. }
    1. Server端测试代码
    1. public class MyServer {
    2. public static void main(String[] args) throws Exception {
    3. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    4. EventLoopGroup workerGroup = new NioEventLoopGroup();
    5. try {
    6. ServerBootstrap serverBootstrap = new ServerBootstrap();
    7. serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).
    8. childHandler(new ChannelInitializer<SocketChannel>() {
    9. @Override
    10. protected void initChannel(SocketChannel ch) throws Exception {
    11. ChannelPipeline pipeline = ch.pipeline();
    12. pipeline.addLast(new MyDecoder());
    13. pipeline.addLast(new MyEncoder());
    14. pipeline.addLast(new MyClientHandler());
    15. }
    16. });
    17. ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
    18. channelFuture.channel().closeFuture().sync();
    19. } finally {
    20. bossGroup.shutdownGracefully();
    21. workerGroup.shutdownGracefully();
    22. }
    23. }
    24. }
    25. public class MyServerHandler extends SimpleChannelInboundHandler<CustomProtocol> {
    26. private int count;
    27. @Override
    28. protected void channelRead0(ChannelHandlerContext ctx, CustomProtocol msg) throws Exception {
    29. int length = msg.getLength();
    30. byte[] content = msg.getContent();
    31. System.out.println("服务端接收到的数据:");
    32. System.out.println("长度: " + length);
    33. System.out.println("内容:" + new String(content, Charset.forName("utf-8")));
    34. System.out.println("服务端接收到的消息数量:" + (++this.count));
    35. String responseMessage = UUID.randomUUID().toString();
    36. int responseLength = responseMessage.getBytes("utf-8").length;
    37. byte[] responseContent = responseMessage.getBytes("utf-8");
    38. CustomProtocol personProtocol = new CustomProtocol();
    39. personProtocol.setLength(responseLength);
    40. personProtocol.setContent(responseContent);
    41. ctx.writeAndFlush(personProtocol);
    42. }
    43. @Override
    44. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    45. cause.printStackTrace();
    46. ctx.close();
    47. }
    48. }
    1. Client端测试代码
    1. public class MyClient {
    2. public static void main(String[] args) throws Exception {
    3. EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    4. try {
    5. Bootstrap bootstrap = new Bootstrap();
    6. bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).
    7. handler(new ChannelInitializer<SocketChannel>() {
    8. @Override
    9. protected void initChannel(SocketChannel ch) throws Exception {
    10. ChannelPipeline pipeline = ch.pipeline();
    11. pipeline.addLast(new MyDecoder());
    12. pipeline.addLast(new MyEncoder());
    13. pipeline.addLast(new MyClientHandler());
    14. }
    15. });
    16. ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
    17. channelFuture.channel().closeFuture().sync();
    18. } finally {
    19. eventLoopGroup.shutdownGracefully();
    20. }
    21. }
    22. }
    23. public class MyClientHandler extends SimpleChannelInboundHandler<CustomProtocol> {
    24. private int count;
    25. @Override
    26. public void channelActive(ChannelHandlerContext ctx) throws Exception {
    27. for (int i = 0; i < 10; ++i) {
    28. String messageToBeSent = "sent from client ";
    29. byte[] content = messageToBeSent.getBytes(Charset.forName("utf-8"));
    30. int length = messageToBeSent.getBytes(Charset.forName("utf-8")).length;
    31. CustomProtocol personProtocol = new CustomProtocol();
    32. personProtocol.setLength(length);
    33. personProtocol.setContent(content);
    34. ctx.writeAndFlush(personProtocol);
    35. }
    36. }
    37. @Override
    38. protected void channelRead0(ChannelHandlerContext ctx, CustomProtocol msg) throws Exception {
    39. int length = msg.getLength();
    40. byte[] content = msg.getContent();
    41. System.out.println("客户端接收到的消息: ");
    42. System.out.println("长度: " + length);
    43. System.out.println("内容:" + new String(content, Charset.forName("utf-8")));
    44. System.out.println("客户端接受到的消息数量:" + (++this.count));
    45. }
    46. @Override
    47. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    48. cause.printStackTrace();
    49. ctx.close();
    50. }
    51. }

    测试结果: TCP粘包和拆包 - 图4

    结果分析:没有发生Tcp的粘包和拆包。