TCP粘包拆包

客户端通过socket给服务端发送数据,为了传输更有效率,会将多次间隔较小的且数据量小的数据,通过nagle算法,合并成一个大的数据块,然后进行封包。这样做提高了效率,缺点就是你发送到服务端的数据,服务端不知道是不是完整的,不知道哪几小块数据拼起来才是原来的数据。举个例子:客户端要发送原信息是A和B两个数据包,服务端接收到之后,可能出现如下情况:

  • 正常情况:读取到了A和B两个数据包;
  • 粘包:A和B两个数据包一起读取了;
  • 拆包:读取了A数据包的一部分,A的另一部分和B数据包一起读取了;

由于TCP是没有消息保护边界的,也就是上面的消息,没有边界,服务端并不知道hello的o是一个边界,hello是一个单词,所以我们就得中服务端处理边界问题。这也就是粘包拆包问题。

Netty中粘拆包如何解决

  • 使用自定义协议+编解码器来解决。就是:服务端你不是不知道消息的长度吗?那我就让客户端发送的消息封装成一个对象,对象包括消息长度和消息内容,服务端读取的时候通过对象就可以拿到每次读取的长度了。

下面看具体案例:

  • 封装消息对象 MessageProtocol.java:
  1. @Data
  2. public class MessageProtocol {
  3. private int len; // 长度
  4. private byte[] content; // 发送的内容
  5. }
  • 解码器 MessageDecoder.java:
  1. public class MessageDecoder extends ReplayingDecoder<Void> {
  2. @Override
  3. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
  4. System.out.println("MessageDecoder.decode 被调用");
  5. // 将byte转成MessageProtocol对象
  6. MessageProtocol msg = new MessageProtocol();
  7. int len = in.readInt();
  8. byte[] content = new byte[len];
  9. in.readBytes(content);
  10. msg.setContent(content);
  11. msg.setLen(len);
  12. // 放入到out中传递给下一个handler处理
  13. out.add(msg);
  14. }
  15. }
  • 编码器 MessageEncoder.java:
  1. public class MessageEncoder extends MessageToByteEncoder<MessageProtocol> {
  2. @Override
  3. protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
  4. System.out.println("MessageEncoder.encode被调用");
  5. out.writeInt(msg.getLen());
  6. out.writeBytes(msg.getContent());
  7. }
  8. }
  • 客户端 —- NettyClient.java:
  1. public class NettyClient {
  2. public static void main(String[] args) throws Exception {
  3. // 1. 创建事件循环组
  4. EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
  5. try {
  6. // 2. 创建启动对象
  7. Bootstrap bootstrap = new Bootstrap();
  8. // 3. 设置相关参数
  9. bootstrap.group(eventLoopGroup) // 设置线程组
  10. .channel(NioSocketChannel.class) // 设置通道
  11. .handler(new NettyClientInitializer());
  12. // 4. 连接服务端
  13. ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
  14. // 5. 监听通道关闭
  15. channelFuture.channel().closeFuture().sync();
  16. } finally {
  17. eventLoopGroup.shutdownGracefully();
  18. }
  19. }
  20. }
  • 客户端 —- NettyClientInitializer.java:
  1. public class NettyClientInitializer extends ChannelInitializer<SocketChannel>{
  2. @Override
  3. protected void initChannel(SocketChannel sc) throws Exception {
  4. ChannelPipeline pipeline = sc.pipeline();
  5. pipeline.addLast(new MessageEncoder());
  6. pipeline.addLast(new MessageDecoder());
  7. pipeline.addLast(new NettyClientHandler());
  8. }
  9. }
  • 客户端 —- NettyClientHandler.java:
  1. public class NettyClientHandler extends SimpleChannelInboundHandler<MessageProtocol>{
  2. @Override
  3. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  4. // 发送10条数据
  5. for (int i=0; i<5; i++) {
  6. String msg = "hello " + i;
  7. byte[] bys = msg.getBytes("utf-8");
  8. int len = msg.getBytes("utf-8").length;
  9. // 创建协议包
  10. MessageProtocol message = new MessageProtocol();
  11. message.setLen(len);
  12. message.setContent(bys);
  13. // 发送
  14. ctx.writeAndFlush(message);
  15. }
  16. }
  17. @Override
  18. protected void channelRead0(ChannelHandlerContext ch, MessageProtocol msg) throws Exception {
  19. int len = msg.getLen();
  20. byte[] bys = msg.getContent();
  21. System.out.println("客户端收到消息:长度 = " + len + ", 内容 = " + new String(bys, Charset.forName("utf-8")));
  22. }
  23. }
  • 服务端 NettyServer.java:
  1. public class NettyServer {
  2. public static void main(String[] args) throws Exception {
  3. // 1. 创建boss group (boss group和work group含有的子线程数默认是cpu数 * 2)
  4. EventLoopGroup bossGroup = new NioEventLoopGroup();
  5. // 2. 创建work group
  6. EventLoopGroup workGroup = new NioEventLoopGroup();
  7. try {
  8. // 3. 创建服务端启动对象
  9. ServerBootstrap bootstrap = new ServerBootstrap();
  10. // 4. 配置启动参数
  11. bootstrap.group(bossGroup, workGroup) // 设置两个线程组
  12. .channel(NioServerSocketChannel.class) // 使用NioSocketChannel 作为服务器的通道
  13. .childHandler(new NettyServerInitializer());
  14. // 5. 启动服务器并绑定端口
  15. ChannelFuture cf = bootstrap.bind(6666).sync();
  16. // 6. 对关闭通道进行监听
  17. cf.channel().closeFuture().sync();
  18. } finally {
  19. bossGroup.shutdownGracefully();
  20. workGroup.shutdownGracefully();
  21. }
  22. }
  23. }
  • 服务端 NettyServerInitializer.java:
  1. public class NettyServerInitializer extends ChannelInitializer<SocketChannel>{
  2. @Override
  3. protected void initChannel(SocketChannel sc) throws Exception {
  4. sc.pipeline().addLast(new MessageDecoder());
  5. sc.pipeline().addLast(new MessageEncoder());
  6. sc.pipeline().addLast(new NettyServerHandler());
  7. }
  8. }
  • 服务端 NettyServerHandler.java:
  1. public class NettyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {
  2. private int count;
  3. @Override
  4. protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
  5. // 接收数据并处理
  6. int len = msg.getLen();
  7. byte[] bys = msg.getContent();
  8. System.out.println("服务端第" + (++count) + "次收到消息:长度 = " + len + ", 内容 = " + new String(bys, Charset.forName("utf-8")));
  9. // 给客户端回复消息
  10. String responseContent = UUID.randomUUID().toString();
  11. byte[] rbys = responseContent.getBytes("utf-8");
  12. int rlen = responseContent.getBytes("utf-8").length;
  13. MessageProtocol rmsg = new MessageProtocol();
  14. rmsg.setContent(rbys);
  15. rmsg.setLen(rlen);
  16. ctx.writeAndFlush(rmsg);
  17. }
  18. @Override
  19. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  20. System.out.println(cause.getMessage());
  21. ctx.close();
  22. }
  23. }