1、短连接(粘包问题)

将 TCP 连接改成短连接,一个请求一个短连接。这样的话,建立连接到释放连接之间的消息即为传输的信息,消息也就产生了边界。

优点:方法十分简单,不需要在我们的应用中做过多修改
缺点:效率低下,TCP 连接的建立和断开分别涉及三次握手和四次挥手,每个消息都要经历这些过程,十分浪费性能

  1. public class HelloWorldClient {
  2. static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);
  3. public static void main(String[] args) {
  4. // 10 次发送
  5. for (int i = 0; i < 10; i++) {
  6. send();
  7. }
  8. }
  9. private static void send() {
  10. NioEventLoopGroup worker = new NioEventLoopGroup();
  11. try {
  12. Bootstrap bootstrap = new Bootstrap();
  13. bootstrap.channel(NioSocketChannel.class);
  14. bootstrap.group(worker);
  15. bootstrap.handler(new ChannelInitializer<SocketChannel>() {
  16. @Override
  17. protected void initChannel(SocketChannel ch) throws Exception {
  18. log.debug("conneted...");
  19. ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
  20. ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
  21. @Override
  22. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  23. log.debug("sending...");
  24. ByteBuf buffer = ctx.alloc().buffer();
  25. buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
  26. ctx.writeAndFlush(buffer);
  27. // 发完即关
  28. ctx.close();
  29. }
  30. });
  31. }
  32. });
  33. ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync();
  34. channelFuture.channel().closeFuture().sync();
  35. } catch (InterruptedException e) {
  36. log.error("client error", e);
  37. } finally {
  38. worker.shutdownGracefully();
  39. }
  40. }
  41. }

2、固定长度

让所有数据包长度固定

客户端消息不足16个字节时,进行填充

  1. public class StickAndHalfPacketClient {
  2. static final Logger log = LoggerFactory.getLogger(StickAndHalfPacketClient.class);
  3. public static void main(String[] args) {
  4. NioEventLoopGroup worker = new NioEventLoopGroup();
  5. try {
  6. Bootstrap bootstrap = new Bootstrap();
  7. bootstrap.channel(NioSocketChannel.class);
  8. bootstrap.group(worker);
  9. bootstrap.handler(new ChannelInitializer<SocketChannel>() {
  10. @Override
  11. protected void initChannel(SocketChannel ch) throws Exception {
  12. log.debug("connetted...");
  13. ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
  14. @Override
  15. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  16. log.debug("sending...");
  17. ByteBuf buffer = ctx.alloc().buffer();
  18. for (int i = 0; i < 10; i++) {
  19. buffer.writeBytes(fillByteTo16(i));
  20. }
  21. ctx.writeAndFlush(buffer);
  22. }
  23. });
  24. }
  25. });
  26. ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
  27. channelFuture.channel().closeFuture().sync();
  28. } catch (InterruptedException e) {
  29. log.error("client error", e);
  30. } finally {
  31. worker.shutdownGracefully();
  32. }
  33. }
  34. private static byte[] fillByteTo16(int a){
  35. byte[] data = new byte[16];
  36. data[0] = (byte)a;
  37. IntStream.range(1,16).forEach(i -> data[i] = '#');
  38. return data;
  39. }
  40. }

服务端使用定长解码器

  1. public class StickAndHalfPacketServer {
  2. static final Logger log = LoggerFactory.getLogger(StickAndHalfPacketServer.class);
  3. void start() {
  4. NioEventLoopGroup boss = new NioEventLoopGroup(1);
  5. NioEventLoopGroup worker = new NioEventLoopGroup();
  6. try {
  7. ServerBootstrap serverBootstrap = new ServerBootstrap();
  8. serverBootstrap.channel(NioServerSocketChannel.class);
  9. serverBootstrap.group(boss, worker);
  10. serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
  11. serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
  12. @Override
  13. protected void initChannel(SocketChannel ch) throws Exception {
  14. ch.pipeline().addLast(new FixedLengthFrameDecoder(16));
  15. ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
  16. ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
  17. @Override
  18. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  19. log.debug("connected {}", ctx.channel());
  20. super.channelActive(ctx);
  21. }
  22. @Override
  23. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  24. log.debug("disconnect {}", ctx.channel());
  25. super.channelInactive(ctx);
  26. }
  27. });
  28. }
  29. });
  30. ChannelFuture channelFuture = serverBootstrap.bind(8080);
  31. log.debug("{} binding...", channelFuture.channel());
  32. channelFuture.sync();
  33. log.debug("{} bound...", channelFuture.channel());
  34. channelFuture.channel().closeFuture().sync();
  35. } catch (InterruptedException e) {
  36. log.error("server error", e);
  37. } finally {
  38. boss.shutdownGracefully();
  39. worker.shutdownGracefully();
  40. log.debug("stoped");
  41. }
  42. }
  43. public static void main(String[] args) {
  44. new StickAndHalfPacketServer().start();
  45. }
  46. }

3、固定分隔符

  1. public class StickAndHalfPacketClient {
  2. static final Logger log = LoggerFactory.getLogger(StickAndHalfPacketClient.class);
  3. public static void main(String[] args) {
  4. NioEventLoopGroup worker = new NioEventLoopGroup();
  5. try {
  6. Bootstrap bootstrap = new Bootstrap();
  7. bootstrap.channel(NioSocketChannel.class);
  8. bootstrap.group(worker);
  9. bootstrap.handler(new ChannelInitializer<SocketChannel>() {
  10. @Override
  11. protected void initChannel(SocketChannel ch) throws Exception {
  12. log.debug("connetted...");
  13. ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
  14. @Override
  15. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  16. log.debug("sending...");
  17. ByteBuf buffer = ctx.alloc().buffer();
  18. buffer.writeBytes("hello netty\nhello coder\nhahahah\n ddadaodahfdohqwopygfqficgdkuyqrofghoqvofgfq\n".getBytes(StandardCharsets.UTF_8));
  19. ctx.writeAndFlush(buffer);
  20. }
  21. });
  22. }
  23. });
  24. ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
  25. channelFuture.channel().closeFuture().sync();
  26. } catch (InterruptedException e) {
  27. log.error("client error", e);
  28. } finally {
  29. worker.shutdownGracefully();
  30. }
  31. }
  32. }
  1. public class StickAndHalfPacketServer {
  2. static final Logger log = LoggerFactory.getLogger(StickAndHalfPacketServer.class);
  3. void start() {
  4. NioEventLoopGroup boss = new NioEventLoopGroup(1);
  5. NioEventLoopGroup worker = new NioEventLoopGroup();
  6. try {
  7. ServerBootstrap serverBootstrap = new ServerBootstrap();
  8. serverBootstrap.channel(NioServerSocketChannel.class);
  9. serverBootstrap.group(boss, worker);
  10. serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
  11. serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
  12. @Override
  13. protected void initChannel(SocketChannel ch) throws Exception {
  14. ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
  15. ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
  16. ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
  17. @Override
  18. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  19. log.debug("connected {}", ctx.channel());
  20. super.channelActive(ctx);
  21. }
  22. @Override
  23. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  24. log.debug("disconnect {}", ctx.channel());
  25. super.channelInactive(ctx);
  26. }
  27. });
  28. }
  29. });
  30. ChannelFuture channelFuture = serverBootstrap.bind(8080);
  31. log.debug("{} binding...", channelFuture.channel());
  32. channelFuture.sync();
  33. log.debug("{} bound...", channelFuture.channel());
  34. channelFuture.channel().closeFuture().sync();
  35. } catch (InterruptedException e) {
  36. log.error("server error", e);
  37. } finally {
  38. boss.shutdownGracefully();
  39. worker.shutdownGracefully();
  40. log.debug("stoped");
  41. }
  42. }
  43. public static void main(String[] args) {
  44. new StickAndHalfPacketServer().start();
  45. }
  46. }

4、预设长度

在发送消息前,先约定用定长字节表示接下来数据的长度

  1. private final int maxFrameLength; // maximum length of a data packet (frame)
  2. private final int lengthFieldOffset; // offset of the length field
  3. private final int lengthFieldLength; // number of bytes occupied by the length field itself
  4. private final int lengthFieldEndOffset; // presumably lengthFieldOffset + lengthFieldLength - confirm against the Netty source
  5. private final int lengthAdjustment; // offset correction for the length field
  6. private final int initialBytesToStrip; // number of leading bytes to discard
  1. public class HeadContent {
  2. public static void main(String[] args) {
  3. EmbeddedChannel channel = new EmbeddedChannel(new LengthFieldBasedFrameDecoder(1024,0,4,3,4),
  4. new LoggingHandler(LogLevel.DEBUG));
  5. final ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
  6. addMsg("hello,world",buffer);
  7. addMsg("hello,java",buffer);
  8. addMsg("hello,netty",buffer);
  9. channel.writeInbound(buffer);
  10. }
  11. private static void addMsg(String msg,ByteBuf buffer){
  12. final byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
  13. final int length = bytes.length;
  14. buffer.writeInt(length);
  15. buffer.writeBytes(bytes);
  16. buffer.writeBytes("-v1".getBytes(StandardCharsets.UTF_8));
  17. }
  18. }