1、粘包问题代码演示

服务端

  1. public class StickAndHalfPacketServer {
  2. static final Logger log = LoggerFactory.getLogger(StickAndHalfPacketServer.class);
  3. void start() {
  4. NioEventLoopGroup boss = new NioEventLoopGroup(1);
  5. NioEventLoopGroup worker = new NioEventLoopGroup();
  6. try {
  7. ServerBootstrap serverBootstrap = new ServerBootstrap();
  8. serverBootstrap.channel(NioServerSocketChannel.class);
  9. serverBootstrap.group(boss, worker);
  10. serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
  11. @Override
  12. protected void initChannel(SocketChannel ch) throws Exception {
  13. ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
  14. ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
  15. @Override
  16. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  17. log.debug("connected {}", ctx.channel());
  18. super.channelActive(ctx);
  19. }
  20. @Override
  21. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  22. log.debug("disconnect {}", ctx.channel());
  23. super.channelInactive(ctx);
  24. }
  25. });
  26. }
  27. });
  28. ChannelFuture channelFuture = serverBootstrap.bind(8080);
  29. log.debug("{} binding...", channelFuture.channel());
  30. channelFuture.sync();
  31. log.debug("{} bound...", channelFuture.channel());
  32. channelFuture.channel().closeFuture().sync();
  33. } catch (InterruptedException e) {
  34. log.error("server error", e);
  35. } finally {
  36. boss.shutdownGracefully();
  37. worker.shutdownGracefully();
  38. log.debug("stoped");
  39. }
  40. }
  41. public static void main(String[] args) {
  42. new StickAndHalfPacketServer().start();
  43. }
  44. }

客户端

  1. public class StickAndHalfPacketClient {
  2. static final Logger log = LoggerFactory.getLogger(StickAndHalfPacketClient.class);
  3. public static void main(String[] args) {
  4. NioEventLoopGroup worker = new NioEventLoopGroup();
  5. try {
  6. Bootstrap bootstrap = new Bootstrap();
  7. bootstrap.channel(NioSocketChannel.class);
  8. bootstrap.group(worker);
  9. bootstrap.handler(new ChannelInitializer<SocketChannel>() {
  10. @Override
  11. protected void initChannel(SocketChannel ch) throws Exception {
  12. log.debug("connetted...");
  13. ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
  14. @Override
  15. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  16. log.debug("sending...");
  17. Random r = new Random();
  18. char c = 'a';
  19. for (int i = 0; i < 10; i++) {
  20. ByteBuf buffer = ctx.alloc().buffer();
  21. buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
  22. ctx.writeAndFlush(buffer);
  23. }
  24. }
  25. });
  26. }
  27. });
  28. ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
  29. channelFuture.channel().closeFuture().sync();
  30. } catch (InterruptedException e) {
  31. log.error("client error", e);
  32. } finally {
  33. worker.shutdownGracefully();
  34. }
  35. }
  36. }

10条消息一次接收,总共160字节
image.png

2、半包问题代码演示

客户端代码希望发送 1 个消息,这个消息是 160 字节,代码改为

  1. ByteBuf buffer = ctx.alloc().buffer();
  2. for (int i = 0; i < 10; i++) {
  3. buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
  4. }
  5. ctx.writeAndFlush(buffer);

为了让现象更明显,服务端将接收缓冲区调小,其它代码不变
// Set the socket receive buffer (SO_RCVBUF) to 10 so that the half-packet effect is easy to observe.
// NOTE(review): option() configures the server (listening) channel; presumably accepted
// connections inherit this value — confirm, or use childOption() to set it on them directly.
serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);

服务端

  1. public class StickAndHalfPacketServer {
  2. static final Logger log = LoggerFactory.getLogger(StickAndHalfPacketServer.class);
  3. void start() {
  4. NioEventLoopGroup boss = new NioEventLoopGroup(1);
  5. NioEventLoopGroup worker = new NioEventLoopGroup();
  6. try {
  7. ServerBootstrap serverBootstrap = new ServerBootstrap();
  8. serverBootstrap.channel(NioServerSocketChannel.class);
  9. serverBootstrap.group(boss, worker);
  10. serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
  11. serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
  12. @Override
  13. protected void initChannel(SocketChannel ch) throws Exception {
  14. ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
  15. ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
  16. @Override
  17. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  18. log.debug("connected {}", ctx.channel());
  19. super.channelActive(ctx);
  20. }
  21. @Override
  22. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  23. log.debug("disconnect {}", ctx.channel());
  24. super.channelInactive(ctx);
  25. }
  26. });
  27. }
  28. });
  29. ChannelFuture channelFuture = serverBootstrap.bind(8080);
  30. log.debug("{} binding...", channelFuture.channel());
  31. channelFuture.sync();
  32. log.debug("{} bound...", channelFuture.channel());
  33. channelFuture.channel().closeFuture().sync();
  34. } catch (InterruptedException e) {
  35. log.error("server error", e);
  36. } finally {
  37. boss.shutdownGracefully();
  38. worker.shutdownGracefully();
  39. log.debug("stoped");
  40. }
  41. }
  42. public static void main(String[] args) {
  43. new StickAndHalfPacketServer().start();
  44. }
  45. }

客户端

  1. public class StickAndHalfPacketClient {
  2. static final Logger log = LoggerFactory.getLogger(StickAndHalfPacketClient.class);
  3. public static void main(String[] args) {
  4. NioEventLoopGroup worker = new NioEventLoopGroup();
  5. try {
  6. Bootstrap bootstrap = new Bootstrap();
  7. bootstrap.channel(NioSocketChannel.class);
  8. bootstrap.group(worker);
  9. bootstrap.handler(new ChannelInitializer<SocketChannel>() {
  10. @Override
  11. protected void initChannel(SocketChannel ch) throws Exception {
  12. log.debug("connetted...");
  13. ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
  14. @Override
  15. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  16. log.debug("sending...");
  17. Random r = new Random();
  18. char c = 'a';
  19. ByteBuf buffer = ctx.alloc().buffer();
  20. for (int i = 0; i < 10; i++) {
  21. buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
  22. }
  23. ctx.writeAndFlush(buffer);
  24. }
  25. });
  26. }
  27. });
  28. ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
  29. channelFuture.channel().closeFuture().sync();
  30. } catch (InterruptedException e) {
  31. log.error("client error", e);
  32. } finally {
  33. worker.shutdownGracefully();
  34. }
  35. }
  36. }

一个消息分为两次接收:第一次 20 字节,第二次 140 字节
image.png