Java Netty

1、半包粘包

例如发送两个数据包给服务器,由于服务端一次读取到的字节数不一定的分
没有半包和拆包:服务器分两次读取到两个地理的数据包,这个情况没有拆包和粘包的情况

  • 粘包:服务器一次收到两个数据包,在一起收到的
  • 拆包:第一次读取到完成的第一个包和第二个包的一部分内容,第二次读取到第二个包的剩余内容
  • 整包:第一次读取到第一包的部分内容,第二次读取到第一个包的剩余部分和第二个包的全部
  • 多次拆包:如果接收滑窗非常小,数据量大的时候发生多次发送的接收的情况

    为什么会出现半包和粘包

    Netty粘包和拆包的问题,本质上归结于TCP的粘包和拆包。 :::info 网络上发送一个完整的数据包时,可能会被TCP拆分成多个包进行发送,也可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP的粘包和拆包问题 ::: 1、HTTP 中有一个 Nagle 算法,每个报文都是一段的,使用网络发送发现网络效率低,然后 HTTP 设置一个算法,设置到一定程度发,所以出现一些延时,提高销量,所以形成了粘包
    2、HTTP缓冲区引起的,报文段大的时候的时候直接弄在一起发送过去。

    怎么解决

    不断的从 TCP 的缓冲区中读取数据,每次读取完成都需要判断是否是一个完整的数据包
    如果是读取的数据不足以拼接成一个完整的业务数据包,那就保留该数据,继续从 TCP 缓冲区中读取,直到得到一个完整的数据包
    • 定长
    • 分隔符
    • 基于长度的变长包

如果当前读到的数据加上已经读取到的数据足以拼接成一个数据包,那就讲已经读取的数据拼接本次读取的数据,构成一个完整的业务数据包传递到业务逻辑上,多余的数据保留,方便下次的读取或者数据链接。
TCP以流的方式进行数据传输,上层应用协议为了对消息进行区分,往往采用如下4种方式。
(1)消息长度固定:累计读取到固定长度为LENGTH之后就认为读取到了一个完整的消息。然后将计数器复位,重新开始读下一个数据报文。
(2)回车换行符作为消息结束符:在文本协议中应用比较广泛。
(3)将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符。
(4)通过在消息头中定义长度字段来标示消息的总长度。
Netty中针对这四种场景均有对应的解码器作为解决方案,比如:
(1)通过FixedLengthFrameDecoder定长解码器来解决定长消息的黏包问题;
(2)通过LineBasedFrameDecoder和StringDecoder来解决以回车换行符作为消息结束符的TCP黏包的问题;
(3)通过DelimiterBasedFrameDecoder特殊分隔符解码器来解决以特殊符号作为消息结束符的TCP黏包问题;
(4)通过LengthFieldBasedFrameDecoder自定义长度解码器解决TCP黏包问题。

2、Netty常用的解码器

image.png

LineBasedFrameDecoder

回车换行解码器
配合StringDecoder

DelimiterBasedFrameDecoder

分隔符解码器

FixedLengthFrameDecoder

固定长度解码器,从缓冲池读取固定长度的内容。
image.png

LengthFieldBasedFrameDecoder

不能超过1024个字节不然会报错
基于’长度’解码器(私有协议最常用)

3、拆包的类

Netty拆包的基类 - ByteToMessageDecoder

自解析
内部维护了一个数据累积器cumulation,每次读取到数据都会不断累加,然后尝试对累加到cumulation
的数据进行拆包,拆成一个完整的业务数据包。每次都将读取到的数据通过内存拷贝的方式, 累积到cumulation
image.png
然后调用子类的 decode 方法对累积的数据尝试进行拆包

LengthFieldPrepender

长度解码器
参数说明
lengthFieldLength:长度属性的字节长度
lengthIncludesLengthFieldLength:false,长度字节不算在总长度中,true,算到总长度中

LengthFieldBasedFrameDecoder

参数说明
maxFrameLength:包的最大长度
lengthFieldOffset:长度属性的起始位(偏移位),包中存放长度属性字段的起始位置
lengthFieldLength:长度属性的长度
lengthAdjustment:长度调节值,在总长被定义为包含包头长度时,修正信息长度
initialBytesToStrip:跳过的字节数,根据需要跳过lengthFieldLength个字节,以便接收端直接接受到不含“长度属性”的内容
编解码器的作用就是讲原始字节数据与自定义的消息对象进行互转
Decoder(解码器)
Encoder(编码器)
支持业界主流的序列化框架
Protobuf
Jboss Marshalling
Java Serialization
解码1拆包:把整个 ByteBuf 数据,分成一个个 ByteBuf,每个表示一个包
解码2反序列化:把每个包的 ByteBuf 字节数组转成 java object

  1. import io.netty.bootstrap.Bootstrap;
  2. import io.netty.buffer.Unpooled;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.ChannelOption;
  6. import io.netty.channel.EventLoopGroup;
  7. import io.netty.channel.nio.NioEventLoopGroup;
  8. import io.netty.channel.socket.SocketChannel;
  9. import io.netty.channel.socket.nio.NioSocketChannel;
  10. import io.netty.handler.codec.DelimiterBasedFrameDecoder;
  11. public class StickyDemoClient {
  12. public static void main(String[] args) throws Exception {
  13. int port = 8080;
  14. if (args != null && args.length > 0) {
  15. try {
  16. port = Integer.valueOf(args[0]);
  17. } catch (NumberFormatException e) {
  18. }
  19. }
  20. new StickyDemoClient().connect(port, "127.0.0.1");
  21. }
  22. public void connect(int port, String host) throws Exception {
  23. // 工作线程组
  24. EventLoopGroup group = new NioEventLoopGroup();
  25. try {
  26. Bootstrap b = new Bootstrap();
  27. b.group(group).channel(NioSocketChannel.class)
  28. .option(ChannelOption.TCP_NODELAY, true)
  29. .handler(new ChannelInitializer<SocketChannel>() {
  30. @Override
  31. public void initChannel(SocketChannel ch) throws Exception {
  32. //ch.pipeline().addLast("framer", new FixedLengthFrameDecoder(139));
  33. // ch.pipeline().addLast("framer", new StickyDemoDecodeHandlerV2(
  34. // Unpooled.wrappedBuffer(new byte[] { '#' })));
  35. ch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(8192,
  36. Unpooled.wrappedBuffer(new byte[] { '#' })));
  37. ch.pipeline().addLast(new StickyDemoClientHandler());
  38. }
  39. });
  40. // 发起异步连接操作
  41. ChannelFuture f = b.connect(host, port).sync();
  42. // 等待客户端链路关闭
  43. f.channel().closeFuture().sync();
  44. } finally {
  45. // 优雅退出,释放线程池资源
  46. group.shutdownGracefully();
  47. }
  48. }
  49. }
  1. import io.netty.buffer.ByteBuf;
  2. import io.netty.buffer.Unpooled;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.SimpleChannelInboundHandler;
  5. import io.netty.util.CharsetUtil;
  6. public class StickyDemoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
  7. private static String[] alphabets = {"A", "B", "C", "D", "E", "F", "G", "H", "I",
  8. "J", "K", "L", "M", "N", "O", "P"};
  9. @Override
  10. public void channelActive(ChannelHandlerContext ctx) {
  11. for(int i=0; i<10; i++) {
  12. StringBuilder builder = new StringBuilder();
  13. builder.append("这是第");
  14. builder.append(i);
  15. builder.append("条消息, 内容是:");
  16. for(int j=0; j<100; j++) {
  17. builder.append(alphabets[i]);
  18. }
  19. builder.append("......");
  20. builder.append("#");
  21. System.out.println(builder.toString().getBytes().length);
  22. ctx.writeAndFlush(Unpooled.copiedBuffer(builder.toString(),
  23. CharsetUtil.UTF_8));
  24. }
  25. }
  26. @Override
  27. public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
  28. System.out.println("客户端接收到消息:" + in.toString(CharsetUtil.UTF_8));
  29. }
  30. @Override
  31. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
  32. cause.printStackTrace();
  33. ctx.close();
  34. }
  35. }
  1. import io.netty.buffer.ByteBuf;
  2. import io.netty.buffer.ByteBufAllocator;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.ChannelInboundHandlerAdapter;
  5. import java.util.ArrayList;
  6. import java.util.List;
  7. public class StickyDemoDecodeHandler extends ChannelInboundHandlerAdapter {
  8. //存放待拆包数据的缓冲区
  9. private ByteBuf cache;
  10. private int frameLength;
  11. public StickyDemoDecodeHandler(int length) {
  12. this.frameLength = length;
  13. }
  14. static ByteBuf expandCache(ByteBufAllocator alloc, ByteBuf cache, int readable) {
  15. ByteBuf oldCache = cache;
  16. cache = alloc.buffer(oldCache.readableBytes() + readable);
  17. cache.writeBytes(oldCache);
  18. oldCache.release();
  19. return cache;
  20. }
  21. @Override
  22. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  23. ByteBuf data = (ByteBuf) msg;
  24. try {
  25. //读取每一个消息,创建缓冲区
  26. if (cache == null) {
  27. cache = ctx.alloc().buffer(1024);
  28. } else {
  29. //如果现有的缓冲区容量太小,无法容纳原有数据+新读入的数据,就扩容(重新创建一个大的,并把数据拷贝过去)
  30. if (cache.writerIndex() > cache.maxCapacity() - data.readableBytes()) {
  31. cache = expandCache(ctx.alloc(), cache, data.readableBytes());
  32. }
  33. }
  34. //把新的数据读入缓冲区
  35. cache.writeBytes(data);
  36. //每次读取frameLength(定长)的数据,做为一个包,存储起来
  37. List<ByteBuf> output = new ArrayList<>();
  38. while (cache.readableBytes() >= frameLength) {
  39. output.add(cache.readBytes(frameLength));
  40. }
  41. //还有部分数据不够一个包,10, 15, 一个10个,还剩5个
  42. if (cache.isReadable()) {
  43. cache.discardReadBytes();
  44. }
  45. for (int i = 0; i < output.size(); i++) {
  46. ctx.fireChannelRead(output.get(i));
  47. }
  48. } finally {
  49. data.release();
  50. }
  51. }
  52. @Override
  53. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
  54. cause.printStackTrace();
  55. ctx.close();
  56. }
  57. }
  1. import io.netty.buffer.ByteBuf;
  2. import io.netty.buffer.ByteBufAllocator;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.ChannelInboundHandlerAdapter;
  5. import java.util.ArrayList;
  6. import java.util.List;
  7. public class StickyDemoDecodeHandlerV2 extends ChannelInboundHandlerAdapter {
  8. private ByteBuf cache;
  9. private byte delimiter; //包分隔符
  10. public StickyDemoDecodeHandlerV2(ByteBuf delimiter) {
  11. if (delimiter == null) {
  12. throw new NullPointerException("delimiter");
  13. }
  14. if (!delimiter.isReadable()) {
  15. throw new IllegalArgumentException("empty delimiter");
  16. }
  17. this.delimiter = delimiter.readByte();
  18. ;
  19. }
  20. static ByteBuf expandCache(ByteBufAllocator alloc, ByteBuf cache, int readable) {
  21. ByteBuf oldCache = cache;
  22. cache = alloc.buffer(oldCache.readableBytes() + readable);
  23. cache.writeBytes(oldCache);
  24. oldCache.release();
  25. return cache;
  26. }
  27. @Override
  28. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  29. ByteBuf data = (ByteBuf) msg;
  30. try {
  31. if (cache == null) {
  32. cache = ctx.alloc().buffer(1024);
  33. } else {
  34. if (cache.writerIndex() > cache.maxCapacity() - data.readableBytes()) {
  35. cache = expandCache(ctx.alloc(), cache, data.readableBytes());
  36. }
  37. }
  38. cache.writeBytes(data);
  39. List<ByteBuf> output = new ArrayList<>();
  40. int frameIndex = 0;
  41. int frameEndIndex = 0;
  42. int length = cache.readableBytes();
  43. while (frameIndex <= length) {
  44. frameEndIndex = cache.indexOf(frameIndex + 1, length, delimiter);
  45. if (frameEndIndex == -1) {
  46. cache.discardReadBytes();
  47. break;
  48. }
  49. output.add(cache.readBytes(frameEndIndex - frameIndex));
  50. cache.skipBytes(1);
  51. frameIndex = frameEndIndex + 1;
  52. }
  53. if (cache.isReadable()) {
  54. cache.discardReadBytes();
  55. }
  56. for (int i = 0; i < output.size(); i++) {
  57. ctx.fireChannelRead(output.get(i));
  58. }
  59. } finally {
  60. data.release();
  61. }
  62. }
  63. @Override
  64. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
  65. cause.printStackTrace();
  66. ctx.close();
  67. }
  68. }
  1. import io.netty.bootstrap.ServerBootstrap;
  2. import io.netty.buffer.Unpooled;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.EventLoopGroup;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioServerSocketChannel;
  9. import io.netty.handler.codec.DelimiterBasedFrameDecoder;
  10. public class StickyDemoServer {
  11. public static void main(String[] args) throws Exception {
  12. int port = 8080;
  13. if (args != null && args.length > 0) {
  14. try {
  15. port = Integer.valueOf(args[0]);
  16. } catch (NumberFormatException e) {
  17. // 采用默认值
  18. }
  19. }
  20. new StickyDemoServer().bind(port);
  21. }
  22. public void bind(int port) throws Exception {
  23. // 第一步:
  24. // 配置服务端的NIO线程组
  25. // 主线程组, 用于接受客户端的连接,但是不做任何具体业务处理,像老板一样,负责接待客户,不具体服务客户
  26. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  27. // 工作线程组, 老板线程组会把任务丢给他,让手下线程组去做任务,服务客户
  28. EventLoopGroup workerGroup = new NioEventLoopGroup();
  29. try {
  30. // 类ServerBootstrap用于配置Server相关参数,并启动Server
  31. ServerBootstrap b = new ServerBootstrap();
  32. // 链式调用
  33. // 配置parentGroup和childGroup
  34. b.group(bossGroup, workerGroup)
  35. // 配置Server通道
  36. .channel(NioServerSocketChannel.class)
  37. // 配置通道的ChannelPipeline
  38. .childHandler(new ChildChannelHandler());
  39. // 绑定端口,并启动server,同时设置启动方式为同步
  40. ChannelFuture f = b.bind(port).sync();
  41. System.out.println(StickyDemoServer.class.getName() + " 启动成功,在地址[" + f.channel().localAddress() + "]上等待客户请求......");
  42. // 等待服务端监听端口关闭
  43. f.channel().closeFuture().sync();
  44. } finally {
  45. // 优雅退出,释放线程池资源
  46. bossGroup.shutdownGracefully();
  47. workerGroup.shutdownGracefully();
  48. }
  49. }
  50. private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
  51. @Override
  52. protected void initChannel(SocketChannel ch) throws Exception {
  53. //ch.pipeline().addLast("framer", new FixedLengthFrameDecoder(139));
  54. ch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(8192, Unpooled.wrappedBuffer(new byte[] { '#' })));
  55. //ch.pipeline().addLast("framer", new StickyDemoDecodeHandler(139));
  56. // ch.pipeline().addLast("framer", new StickyDemoDecodeHandlerV2(
  57. // Unpooled.wrappedBuffer(new byte[] { '#' })));
  58. ch.pipeline().addLast(new StickyDemoServerHandler());
  59. }
  60. }
  61. }
  1. import io.netty.buffer.ByteBuf;
  2. import io.netty.buffer.Unpooled;
  3. import io.netty.channel.ChannelFutureListener;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.channel.ChannelInboundHandlerAdapter;
  6. import io.netty.util.CharsetUtil;
  7. public class StickyDemoServerHandler extends ChannelInboundHandlerAdapter {
  8. @Override
  9. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  10. ByteBuf in = (ByteBuf) msg;
  11. System.out.println(
  12. "服务器接收到消息:" + in.toString(CharsetUtil.UTF_8));
  13. ctx.write(in);
  14. // ctx.write(Unpooled.copiedBuffer("#", CharsetUtil.UTF_8));
  15. //compositeBuffer.addComponent(in);
  16. // ByteBuf buf = ctx.alloc().directBuffer();
  17. // buf.writeBytes("#".getBytes());
  18. // CompositeByteBuf compositeBuffer = ctx.alloc().compositeBuffer();
  19. // compositeBuffer.addComponents(true, in, buf);
  20. // ctx.write(compositeBuffer);
  21. }
  22. @Override
  23. public void channelReadComplete(ChannelHandlerContext ctx)
  24. throws Exception {
  25. ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
  26. .addListener(ChannelFutureListener.CLOSE);
  27. }
  28. @Override
  29. public void exceptionCaught(ChannelHandlerContext ctx,
  30. Throwable cause) {
  31. cause.printStackTrace();
  32. ctx.close();
  33. }
  34. }