1、短连接(粘包问题)
将 TCP 连接改成短连接,一个请求一个短连接。这样的话,建立连接到释放连接之间的消息即为传输的信息,消息也就产生了边界。
优点:方法十分简单,不需要在我们的应用中做过多修改
缺点:效率低下,TCP 连接和断开都会涉及三次握手以及四次握手,每个消息都会涉及这些过程,十分浪费性能
public class HelloWorldClient {static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);public static void main(String[] args) {// 分 10 次发送for (int i = 0; i < 10; i++) {send();}}private static void send() {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug("conneted...");ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("sending...");ByteBuf buffer = ctx.alloc().buffer();buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});ctx.writeAndFlush(buffer);// 发完即关ctx.close();}});}});ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}}}
2、固定长度
让所有数据包长度固定
客户端消息不足16个字节时,进行填充
public class StickAndHalfPacketClient {static final Logger log = LoggerFactory.getLogger(StickAndHalfPacketClient.class);public static void main(String[] args) {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug("connetted...");ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("sending...");ByteBuf buffer = ctx.alloc().buffer();for (int i = 0; i < 10; i++) {buffer.writeBytes(fillByteTo16(i));}ctx.writeAndFlush(buffer);}});}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}}private static byte[] fillByteTo16(int a){byte[] data = new byte[16];data[0] = (byte)a;IntStream.range(1,16).forEach(i -> data[i] = '#');return data;}}
服务端使用定长解码器
public class StickAndHalfPacketServer {static final Logger log = LoggerFactory.getLogger(StickAndHalfPacketServer.class);void start() {NioEventLoopGroup boss = new NioEventLoopGroup(1);NioEventLoopGroup worker = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new FixedLengthFrameDecoder(16));ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("connected {}", ctx.channel());super.channelActive(ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.debug("disconnect {}", ctx.channel());super.channelInactive(ctx);}});}});ChannelFuture channelFuture = serverBootstrap.bind(8080);log.debug("{} binding...", channelFuture.channel());channelFuture.sync();log.debug("{} bound...", channelFuture.channel());channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();log.debug("stoped");}}public static void main(String[] args) {new StickAndHalfPacketServer().start();}}
3、固定分隔符
public class StickAndHalfPacketClient {static final Logger log = LoggerFactory.getLogger(StickAndHalfPacketClient.class);public static void main(String[] args) {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug("connetted...");ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("sending...");ByteBuf buffer = ctx.alloc().buffer();buffer.writeBytes("hello netty\nhello coder\nhahahah\n ddadaodahfdohqwopygfqficgdkuyqrofghoqvofgfq\n".getBytes(StandardCharsets.UTF_8));ctx.writeAndFlush(buffer);}});}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}}}
public class StickAndHalfPacketServer {static final Logger log = LoggerFactory.getLogger(StickAndHalfPacketServer.class);void start() {NioEventLoopGroup boss = new NioEventLoopGroup(1);NioEventLoopGroup worker = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LineBasedFrameDecoder(1024));ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("connected {}", ctx.channel());super.channelActive(ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.debug("disconnect {}", ctx.channel());super.channelInactive(ctx);}});}});ChannelFuture channelFuture = serverBootstrap.bind(8080);log.debug("{} binding...", channelFuture.channel());channelFuture.sync();log.debug("{} bound...", channelFuture.channel());channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();log.debug("stoped");}}public static void main(String[] args) {new StickAndHalfPacketServer().start();}}
4、预设长度
在发送消息前,先约定用定长字节表示接下来数据的长度
private final int maxFrameLength; //发送数据包的最大长度private final int lengthFieldOffset; //长度字段偏移量private final int lengthFieldLength; //长度字段本身占用的字节数private final int lengthFieldEndOffset;private final int lengthAdjustment; //长度字段的偏移量矫正private final int initialBytesToStrip; //丢弃的起始字节数
public class HeadContent {public static void main(String[] args) {EmbeddedChannel channel = new EmbeddedChannel(new LengthFieldBasedFrameDecoder(1024,0,4,3,4),new LoggingHandler(LogLevel.DEBUG));final ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();addMsg("hello,world",buffer);addMsg("hello,java",buffer);addMsg("hello,netty",buffer);channel.writeInbound(buffer);}private static void addMsg(String msg,ByteBuf buffer){final byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);final int length = bytes.length;buffer.writeInt(length);buffer.writeBytes(bytes);buffer.writeBytes("-v1".getBytes(StandardCharsets.UTF_8));}}
