Netty从底层Java通道读取ByteBuf二进制数据,传入Netty通道的流水线,随后开始入站处理。在入站处理过程中,需要将ByteBuf二进制类型解码成Java POJO对象。这个解码过程可以通过Netty的Decoder(解码器)去完成。
在出站处理过程中,业务处理后的结果(出站数据)需要从某个Java POJO对象编码为最终的ByteBuf二进制数据,然后通过底层Java通道发送到对端。在编码过程中,需要用到Netty的Encoder(编码器)去完成数据的编码工作。

6.1 Decoder原理与实战

Netty中的解码器都是Inbound入站处理器类型,都直接或者间接地实现了入站处理的超级接口ChannelInboundHandler。Netty内置了ByteToMessageDecoder解码器

6.1.1 ByteToMessageDecoder解码器处理流程

ByteToMessageDecoder是一个非常重要的解码器基类,是一个抽象类,实现了解码处理的基础逻辑和流程。ByteToMessageDecoder继承自ChannelInboundHandlerAdapter适配器,是一个入站处理器,用于完成从ByteBuf到Java POJO对象的解码功能。
ByteToMessageDecoder解码的流程大致如图
image.png
ByteToMessageDecoder在设计上使用了模板模式(Template Pattern)。
ByteToMessageDecoder的子类要做的是将从入站ByteBuf解码出来的所有Object实例加入父类的List列表中。
实现一个解码器,首先要继承ByteToMessageDecoder抽象类,然后实现其基类的decode()抽象方法。总体来说,流程大致如下:

  1. 继承ByteToMessageDecoder抽象类。
  2. 实现基类的decode()抽象方法,将ByteBuf到目标POJO的解码逻辑写入此方法,以将ByteBuf中的二进制数据解码成一个一个的Java POJO对象。
  3. 解码完成后,需要将解码后的Java POJO对象放入decode()方法的List实参中,此实参是父类所传入的解码结果收集容器。

    6.1.2 自定义Byte2IntegerDecoder整数解码器

    下面是一个小小的ByteToMessageDecoder子类的实战案例:整数解码器。其功能是将ByteBuf中的字节解码成整数类型。 ```java package com.crazymakercircle.netty.decoder; //… public class Byte2IntegerDecoder extends ByteToMessageDecoder { @Override public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) {
    1. while (in.readableBytes() >= 4) {
    2. int i = in.readInt();
    3. Logger.info("解码出一个整数: " + i);
    4. out.add(i);
    5. }
    } }
    1. 这里编写一个简单的配套处理器IntegerProcessHandler,用于处理Byte2IntegerDecoder解码之后的整数。其功能是:读取上一站的入站数据,把它转换成整数,并且输出到Console(控制台)上
    2. ```java
    3. package com.crazymakercircle.netty.decoder;
    4. //…
    5. public class IntegerProcessHandler extends ChannelInboundHandlerAdapter {
    6. @Override
    7. public void channelRead(ChannelHandlerContext ctx, Object msg)…{
    8. Integer integer = (Integer) msg;
    9. Logger.info("打印出一个整数: " + integer);
    10. }
    11. }

    使用EmbeddedChannel(嵌入式通道)编写一个测试用例,代码如下:

    1. package com.crazymakercircle.netty.decoder;
    2. //…
    3. public class Byte2IntegerDecoderTester {
    4. /**
    5. * 整数解码器的使用实例
    6. */
    7. @Test
    8. public void testByteToIntegerDecoder() {
    9. ChannelInitializer i= new ChannelInitializer<EmbeddedChannel>(){
    10. protected void initChannel(EmbeddedChannel ch) {
    11. ch.pipeline().addLast(new Byte2IntegerDecoder());
    12. ch.pipeline().addLast(new IntegerProcessHandler());
    13. }
    14. };
    15. EmbeddedChannel channel = new EmbeddedChannel(i);
    16. for (int j = 0; j < 100; j++) {
    17. ByteBuf buf = Unpooled.buffer();
    18. buf.writeInt(j);
    19. channel.writeInbound(buf);
    20. }
    21. //…
    22. }
    23. }
    1. ByteBuf缓冲区并没有发送到流水线的TailContext(尾部处理器),将由谁负责释放引用计数呢?<br />其实,基类ByteToMessageDecoder会完成ByteBuf释放工作,它会调用ReferenceCountUtil.release(in)方法将之前的ByteBuf缓冲区的引用计数减1<br />这个ByteBuf先被释放了,如果在后面还需要用到,怎么办?<br />可以在子类的decode()方法中调用一次ReferenceCountUtil.retain(in)来增加一次引用计数,不过在使用完成后要及时将增加的这次计数减去。

    6.1.3 ReplayingDecoder解码器

    使用上面的Byte2IntegerDecoder整数解码器会面临一个问题:需要对ByteBuf的长度进行检查,有足够的字节才能进行整数的读取。这种长度的判断是否可以由Netty来帮忙完成呢?答案是可以的,可以使用Netty的ReplayingDecoder类省去长度的判断。
    ReplayingDecoder类是ByteToMessageDecoder的子类,作用是:
    在读取ByteBuf缓冲区的数据之前,需要检查缓冲区是否有足够的字节。若ByteBuf中有足够的字节,则会正常读取;反之,则会停止解码。
    使用ReplayingDecoder基类改写上一个整数解码器,可以不进行长度检测。创建一个新的整数解码器,类名为Byte2IntegerReplayDecoder,代码如下:

    1. package com.crazymakercircle.netty.decoder;
    2. //…
    3. public class Byte2IntegerReplayDecoder extends ReplayingDecoder {
    4. @Override
    5. public void decode(ChannelHandlerContext ctx,
    6. ByteBuf in, List<Object> out) {
    7. int i = in.readInt();
    8. Logger.info("解码出一个整数: " + i);
    9. out.add(i);
    10. }
    11. }
    1. ReplayingDecoder进行长度判断的原理很简单:内部定义一个新的二进制缓冲区类(类名为ReplayingDecoderBuffer),又对ByteBuf缓冲区进行装饰。该装饰器的特点是,在缓冲区真正读数据之前先进行长度的判断:如果长度合格,就读取数据;否则,抛出ReplayErrorReplayingDecoder捕获到ReplayError后会留着数据,等待下一次IO事件到来时再读取。<br />实质上,ReplayingDecoder的作用远远不止于进行长度判断,它更重要的作用是用于分包传输的应用场景。

    6.1.4 整数的分包解码器的实战案例

    发送端出去的包在传输过程中会进行多次拆分和组装。接收端收到的包和发送端所发送的包不是一模一样的
    image.png
    ReplayingDecoder的一个很重要的属性——state成员属性。该成员属性的作用是保存当前解码器在解码过程中所处的阶段。在Netty源代码中,该属性的定义如下:

    1. public abstract class ReplayingDecoder<S>
    2. extends ByteToMessageDecoder {
    3. //省略不相关的代码
    4. //缓冲区装饰器
    5. private final ReplayingDecoderByteBuf replayable = new ReplayingDecoderByteBuf();
    6. //重要的成员属性,表示解码过程中所处的阶段,类型为泛型,默认为Object
    7. private S state;
    8. //默认的构造器,state值为空,没有用到该属性
    9. protected ReplayingDecoder() {
    10. this((Object)null);
    11. }
    12. //重载的构造器
    13. protected ReplayingDecoder(S initialState) {
    14. //初始化内部的ByteBuf缓冲区装饰器类
    15. this.replayable = new ReplayingDecoderByteBuf();
    16. //读指针检查点,默认为-1
    17. this.checkpoint = -1;
    18. //状态state的默认值为null
    19. this.state = initialState;
    20. }
    21. //省略不相关的方法
    22. }
    1. 下面先基于ReplayingDecoder基础解码器编写一个整数相加的解码器:解码两个整数,并把这两个数据之和作为解码的结果。
    1. public class IntegerAddDecoder extends ReplayingDecoder<IntegerAddDecoder.PHASE> {
    2. //自定义的状态枚举值,代表两个阶段
    3. enum PHASE {
    4. PHASE_1, //第一个阶段,仅仅提取第一个整数,完成后进入第二个阶段
    5. PHASE_2 //第二个阶段,提取第二个整数后,还需要计算相加的结果并输出
    6. }
    7. private int first;
    8. private int second;
    9. public IntegerAddDecoder() {
    10. //在构造函数中,初始化父类的state属性为PHASE_1,表示第一个阶段
    11. super(PHASE.PHASE_1);
    12. }
    13. @Override
    14. protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
    15. switch (state()){//判断当前的状态
    16. case PHASE_1:
    17. //从装饰器ByteBuf 中读取数据
    18. first = byteBuf.readInt();
    19. //第一步解析成功,进入第二步,设置“state”为第二个阶段
    20. checkpoint(PHASE.PHASE_2);
    21. break;
    22. //提取第二个整数后还需要计算相加的结果
    23. //并将和作为解码的结果输出
    24. case PHASE_2:
    25. second = byteBuf.readInt();
    26. Integer sum = first + second;
    27. list.add(sum);
    28. //进入下一轮解码的第一步,设置“state”为第一个阶段
    29. checkpoint(PHASE.PHASE_1);
    30. break;
    31. default:
    32. break;
    33. }
    34. }
    35. }
    1. 每一个阶段一完成就通过checkpoint(PHASE)方法(类似于state属性的setter()方法)把当前的state状态设置为新的PHASE枚举值。checkpoint()方法有两个作用:
    1. 设置state属性的值,更新一下当前的状态。
    2. 设置“读指针检查点”。

      6.1.5 字符串的分包解码器的实战案例

      在原理上,字符串分包解码和整数分包解码是一样的,所不同的是:整数的长度是固定的,目前在Java中是4字节;字符串的长度是不固定的,是可变的。
      如何获取字符串的长度信息呢?这是一个小小的难题,和程序所使用的具体传输协议是强相关的。一般来说,在Netty中进行字符串的传输可以采用普通的Head-Content内容传输协议。该协议的规则很简单:

    3. 在协议的Head部分放置字符串的字节长度,可以用一个整数类型来描述。

    4. 在协议的Content部分,放置字符串的字节数组。

    下面就是基于ReplayingDecoder实现自定义的字符串分包解码器的示例程序:

    1. package com.crazymakercircle.netty.decoder;
    2. //…
    3. public class StringReplayDecoder extends ReplayingDecoder<StringReplayDecoder.PHASE> {
    4. enum PHASE {
    5. PHASE_1, //第一个阶段:解码出字符串的长度
    6. PHASE_2 //第二个阶段:按照第一个阶段的字符串长度解码出字符串的内容
    7. }
    8. private int length;
    9. private byte[] inBytes;
    10. public StringReplayDecoder() {
    11. //在构造函数中,需要初始化父类的state属性为PHASE_1阶段
    12. super(PHASE.PHASE_1);
    13. }
    14. @Override
    15. protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
    16. switch (state()) {
    17. case PHASE_1:
    18. //第一步,从装饰器ByteBuf中读取字符串的长度
    19. length = byteBuf.readInt();
    20. inBytes = new byte[length];
    21. //进入第二步,读取内容
    22. //并设置“读指针检查点”为当前的readerIndex位置
    23. checkpoint(PHASE.PHASE_2);
    24. break;
    25. case PHASE_2:
    26. //第二步,从装饰器ByteBuf 中读取字符串的内容数组
    27. byteBuf.readBytes(inBytes, 0, length);
    28. list.add(new String(inBytes, "UTF-8"));
    29. //第二步解析成功,进入下一个字符串的解析
    30. //并设置“读指针检查点”为当前的readerIndex位置
    31. checkpoint(PHASE.PHASE_1);
    32. break;
    33. default:
    34. break;
    35. }
    36. }
    37. }

    这里编写一个简单的辅助性质的业务处理器。其功能是读取上一站的入站数据,把它转换成字符串,并输出到控制台上。新业务处理器的名称为StringProcessHandler,具体代码如下:

    1. package com.crazymakercircle.netty.decoder;
    2. //…
    3. public class StringProcessHandler extends ChannelInboundHandlerAdapter {
    4. @Override
    5. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    6. String s = (String) msg;
    7. Logger.info("打印出一个字符串: " + s);
    8. // super.channelRead(ctx, msg);
    9. }
    10. }
    1. 编写一个测试用例,代码如下:
    1. package com.crazymakercircle.netty.decoder;
    2. //…
    3. public class StringReplayDecoderTester {
    4. static String content = "弯弯入我心,秋凉知我意!";
    5. @Test
    6. public void testStringReplayingDecoder() {
    7. ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>(){
    8. @Override
    9. protected void initChannel(EmbeddedChannel ch) throws Exception {
    10. ch.pipeline().addLast(new StringReplayDecoder());
    11. ch.pipeline().addLast(new StringProcessHandler());
    12. }
    13. };
    14. EmbeddedChannel channel = new EmbeddedChannel(i);
    15. //待发送字符串content的字节数组
    16. byte[] bytes = content.getBytes(Charset.forName("utf-8"));
    17. //循环发送100轮,每一轮可以理解为发送一个Head-Content报文
    18. for (int j = 0; j < 100; j++) {//发送100个包
    19. //每个包为随机1~3个 "弯弯入我心,秋凉知我意"
    20. int random = RandomUtil.randInMod(3);
    21. ByteBuf buffer = Unpooled.buffer();
    22. //发送长度:字节数组长度*重复次数
    23. buffer.writeInt(bytes.length * random);
    24. //重复拷贝content的字节数据到发送缓冲区
    25. for (int k = 0; k < random; k++) {
    26. buffer.writeBytes(bytes);
    27. }
    28. //发送内容:发送buf缓冲区
    29. channel.writeInbound(buffer);
    30. }
    31. }
    32. }

    通过ReplayingDecoder解码器,可以正确地解码分包后的ByteBuf数据包。但是,在实际开发中不建议继承这个类,原因如下:

    1. 不是所有的ByteBuf操作都被ReplayingDecoderBuffer装饰器类支持,可能有些ByteBuf方法在ReplayingDecoder的decode()方法中会抛出ReplayError异常。
    2. 在数据解码逻辑复杂的应用场景下,ReplayingDecoder在解码速度上相对较差。因为在ByteBuf长度不够时,ReplayingDecoder会捕获一个ReplayError异常,并会把ByteBuf中的读指针还原到之前的读指针检查点(checkpoint),然后结束这次解析操作,等待下一次IO读事件。在网络条件比较糟糕时,一个数据包的解析逻辑会被反复执行多次,此时解析过程是一个消耗CPU的操作,解码速度上相对较差。所以,ReplayingDecoder更多地应用于数据解析逻辑简单的场景。

    在数据解析复杂的应用场景下,建议使用前文介绍的解码器ByteToMessageDecoder或者其子类(后文介绍)。这里继承ByteToMessageDecoder基类,实现一个定制的Head-Content协议字符串内容解码器,代码如下:

    1. package com.crazymakercircle.netty.decoder;
    2. //…
    3. public class StringIntegerHeaderDecoder extends ByteToMessageDecoder {
    4. @Override
    5. protected void decode(ChannelHandlerContext channelHandlerContext,
    6. ByteBuf buf, List<Object> out) …{
    7. //可读字节小于4,消息头还没读满,返回
    8. if (buf.readableBytes() < 4) {
    9. return;
    10. }
    11. //消息头已经完整
    12. //在真正开始从缓冲区读取数据之前,调用markReaderIndex()设置mark
    13. buf.markReaderIndex();
    14. int length = buf.readInt();
    15. //从缓冲区读出消息头的大小,这会导致readIndex读指针变化
    16. //如果剩余长度不够消息体的大小,则需要重置读指针,下一次从相同的位置处理
    17. if (buf.readableBytes() < length) {
    18. //读指针重置到消息头的readIndex位置处
    19. buf.resetReaderIndex();
    20. return;
    21. }
    22. //读取数据,编码成字符串
    23. byte[] inBytes = new byte[length];
    24. buf.readBytes(inBytes, 0, length);
    25. out.add(new String(inBytes, "UTF-8"));
    26. }
    27. }
    1. 表面上ByteToMessageDecoder基类是无状态的,不像ReplayingDecoder那样需要使用状态位来保存当前的读取阶段,实际上ByteToMessageDecoder也是有状态的。其内部有一个二进制字节累积器cumulation,用来保存没有解析完的二进制内容。所以,ByteToMessageDecoder及其子类都是有状态的,其实例不能在通道之间共享。在每次初始化通道的流水线时,都要重新创建一个ByteToMessageDecoder或者它的子类的实例。

    6.1.6 MessageToMessageDecoder解码器

    是否存在一些解码器可以将一种POJO对象解码成另外一种POJO对象呢?
    存在。与前面不同的是,解码器需要继承一个新的Netty解码器基类MessageToMessageDecoder。在继承它的时候,需要明确的泛型实参,用于指定入站消息的Java POJO类型。
    下面通过实现一个整数到字符串转换的解码器演示一下MessageToMessageDecoder的使用。代码很简单,如下所示:

    1. package com.crazymakercircle.netty.decoder;
    2. //…
    3. public class Integer2StringDecoder extends MessageToMessageDecoder<Integer> {
    4. @Override
    5. protected void decode(ChannelHandlerContext channelHandlerContext, Integer integer, List<Object> list) throws Exception {
    6. list.add(String.valueOf(integer));
    7. }
    8. }

    6.2 常用的内置Decoder

    1. 固定长度数据包解码器——FixedLengthFrameDecoder

    适用场景:每个接收到的数据包的长度都是固定的,例如100字节。在这种场景下,把FixedLengthFrameDecoder解码器加到流水线中,它就会把入站ByteBuf数据包拆分成一个个长度为100的数据包,然后发往下一个channelHandler入站处理器。

    1. 行分割数据包解码器——LineBasedFrameDecoder

    适用场景:每个ByteBuf数据包使用换行符(或者回车换行符)作为边界分隔符。在这种场景下,把LineBasedFrameDecoder解码器加到流水线中,Netty就会使用换行分隔符把ByteBuf数据包分割成一个一个完整的应用层ByteBuf数据包再发送到下一站。

    1. 自定义分隔符数据包解码器——DelimiterBasedFrameDecoder

    DelimiterBasedFrameDecoder是LineBasedFrameDecoder按照行分割的通用版本,不同之处在于这个解码器更加灵活,可以自定义分隔符,而不是局限于换行符。如果使用这个解码器,那么所接收到的数据包末尾必须带上对应的分隔符。

    1. 自定义长度数据包解码器——LengthFieldBasedFrameDecoder

    这是一种基于灵活长度的解码器,在ByteBuf数据包中加了一个长度字段,保存了原始数据包的长度,解码时会按照原始数据包长度进行提取。

    6.2.1 LineBasedFrameDecoder解码器

    这个解码器的工作原理很简单,依次遍历ByteBuf数据包中的可读字节,判断在二进制字节流中是否存在换行符”\n”或者”\r\n”的字节码。如果有,就以此位置为结束位置,把从可读索引到结束位置之间的字节作为解码成功后的ByteBuf数据包。

    1. public class NettyOpenBoxDecoder {
    2. static String spliter = "\r\n";
    3. static String content = "弯弯入我心,秋凉知我意!";
    4. @Test
    5. public void testLinedBasedFrameDecoder() {
    6. ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
    7. @Override
    8. protected void initChannel(EmbeddedChannel ch) throws Exception {
    9. ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
    10. ch.pipeline().addLast(new StringDecoder());
    11. ch.pipeline().addLast(new StringProcessHandler());
    12. }
    13. };
    14. EmbeddedChannel channel = new EmbeddedChannel(i);
    15. for (int j = 0; j < 100; j++) {
    16. int random = RandomUtil.randInMod(3);
    17. ByteBuf buffer = Unpooled.buffer();
    18. for (int k = 0; k < random; k++) {
    19. buffer.writeBytes(content.getBytes(StandardCharsets.UTF_8));
    20. }
    21. buffer.writeBytes(spliter.getBytes(StandardCharsets.UTF_8));
    22. channel.writeInbound(buffer);
    23. }
    24. }
    25. }

    6.2.2 DelimiterBasedFrameDecoder解码器

    DelimiterBasedFrameDecoder解码器不仅可以使用换行符,还可以使用其他特殊字符作为数据包的分隔符,例如制表符”\t”

    1. public DelimiterBasedFrameDecoder(
    2. int maxFrameLength, //解码的数据包的最大长度
    3. Boolean stripDelimiter, //解码后的数据包是否去掉分隔符,一般选择是
    4. ByteBuf delimiter) //分隔符
    5. {
    6. //省略构造器的源代码
    7. }

    下面是一个实战案例。

    1. package com.crazymakercircle.netty.decoder;
    2. //…
    3. public class NettyOpenBoxDecoder {
    4. static String spliter2 = "\t";
    5. static String content = "疯狂创客圈:高性能学习社群!";
    6. /**
    7. * LengthFieldBasedFrameDecoder使用实例
    8. */
    9. @Test
    10. public void testDelimiterBasedFrameDecoder() {
    11. final ByteBuf delimiter = Unpooled.copiedBuffer(spliter2.getBytes(StandardCharsets.UTF_8));
    12. ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
    13. @Override
    14. protected void initChannel(EmbeddedChannel channel) throws Exception {
    15. channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, true, delimiter));
    16. channel.pipeline().addLast(new StringDecoder());
    17. channel.pipeline().addLast(new StringProcessHandler());
    18. }
    19. };
    20. EmbeddedChannel channel = new EmbeddedChannel(i);
    21. for (int j = 0; j < 100; j++) {
    22. int random = RandomUtil.randInMod(3);
    23. ByteBuf buffer = Unpooled.buffer();
    24. for (int k = 0; k < random; k++) {
    25. buffer.writeBytes(content.getBytes(StandardCharsets.UTF_8));
    26. }
    27. buffer.writeBytes(spliter2.getBytes(StandardCharsets.UTF_8));
    28. channel.writeInbound(buffer);
    29. }
    30. }
    31. }

    6.2.3 LengthFieldBasedFrameDecoder解码器

    在Netty的开箱即用解码器中,最为复杂的是解码器为LengthFieldBasedFrameDecoder自定义长度数据包。它的难点在于参数比较多,也比较难以理解,但同时它又比较常用,因而下面对它进行重点介绍。
    传输内容中的Length(长度)字段的值是指存放在数据包中要传输内容的字节数。普通的基于Head-Content协议的内容传输尽量用内置的LengthFieldBasedFrameDecoder来解码。
    一个简单的LengthFieldBasedFrameDecoder使用示例如下:

    1. package com.crazymakercircle.netty.decoder;
    2. //…
    3. public class NettyOpenBoxDecoder {
    4. public static final int VERSION = 100;
    5. static String content = "疯狂创客圈:高性能学习社群!";
    6. /**
    7. * LengthFieldBasedFrameDecoder使用示例 1
    8. */
    9. @Test
    10. public void testLengthFieldBasedFrameDecoder1() throws InterruptedException {
    11. final LengthFieldBasedFrameDecoder spliter = new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4);
    12. ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
    13. @Override
    14. protected void initChannel(EmbeddedChannel channel) throws Exception {
    15. channel.pipeline().addLast(spliter);
    16. channel.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
    17. channel.pipeline().addLast(new StringProcessHandler());
    18. }
    19. };
    20. EmbeddedChannel channel = new EmbeddedChannel(i);
    21. for (int j = 0; j < 100; j++) {
    22. ByteBuf buffer = Unpooled.buffer();
    23. String s = j + "次发送->" + content;
    24. byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
    25. buffer.writeInt(bytes.length);
    26. buffer.writeBytes(bytes);
    27. channel.writeInbound(buffer);
    28. }
    29. Thread.sleep(Integer.MAX_VALUE);
    30. }
    31. }

    上面的示例程序中用到了一个LengthFieldBasedFrameDecoder构造器,具体如下:

    1. public LengthFieldBasedFrameDecoder(
    2. int maxFrameLength, //发送的数据包的最大长度
    3. int lengthFieldOffset, //长度字段偏移量
    4. int lengthFieldLength, //长度字段本身占用的字节数
    5. int lengthAdjustment, //长度字段的偏移量矫正
    6. int initialBytesToStrip) //丢弃的起始字节数
    7. {
    8. //…
    9. }
    1. maxFrameLength:发送的数据包的最大长度。示例程序中该值为1024,表示一个数据包最多可发送1024字节。
    2. lengthFieldOffset:长度字段偏移量,指的是长度字段位于整个数据包内部字节数组中的下标索引值。
    3. lengthFieldLength:长度字段所占的字节数。如果长度字段是一个int整数,则为4;如果长度字段是一个short整数,则为2。
    4. lengthAdjustment:长度的调整值。这个参数最为难懂。在传输协议比较复杂的情况下,例如协议包含了长度字段、协议版本号、魔数等,那么解码时就需要进行长度调整。长度调整值的计算公式为:内容字段偏移量-长度字段偏移量-长度字段的字节数。
    5. initialBytesToStrip:丢弃的起始字节数。在有效数据字段Content前面,如果还有一些其他字段的字节,作为最终的解析结果可以丢弃。例如,在上面的示例程序中,前面有4字节的长度字段,它起辅助的作用,最终的结果中不需要这个长度,所以丢弃的字节数为4。

    image.png

    6.2.4 多字段Head-Content协议数据包解析的实战案例

    Head-Content协议是最为简单的内容传输协议。在实际使用过程中则没有那么简单,除了长度和内容,在数据包中还可能包含其他字段,例如协议版本号,如图
    image.png
    使用LengthFieldBasedFrameDecoder解码器解析以上带有版本号的Head-Content协议的数据包,该如何进行构造器参数的计算呢?
    第1个参数maxFrameLength可以为1024,表示数据包的最大长度为1024字节。
    第2个参数lengthFieldOffset为0,表示长度字段处于数据包的起始位置。
    第3个参数lengthFieldLength的值为4,表示长度字段的长度为4字节。
    第4个参数lengthAdjustment为2,长度调整值的计算方法为:内容字段偏移量-长度字段偏移量-长度字段的长度=6-0-4=2。换句话说,在这个例子中,lengthAdjustment就是夹在内容字段和长度字段中的部分——版本号的长度。
    第5个参数initialBytesToStrip为6,表示获取最终内容的字节数组时抛弃最前面的6字节数据。换句话说,长度字段、版本字段的值被抛弃。

    1. package com.crazymakercircle.netty.decoder;
    2. //…
    3. public class NettyOpenBoxDecoder {
    4. public static final int VERSION = 100;
    5. static String content = "疯狂创客圈:高性能学习社群!";
    6. /**
    7. * LengthFieldBasedFrameDecoder使用示例 2
    8. */
    9. @Test
    10. public void testLengthFieldBasedFrameDecoder2() throws InterruptedException {
    11. final LengthFieldBasedFrameDecoder spliter = new LengthFieldBasedFrameDecoder(1024, 0, 4, 2, 6);
    12. ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
    13. @Override
    14. protected void initChannel(EmbeddedChannel channel) throws Exception {
    15. channel.pipeline().addLast(spliter);
    16. channel.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
    17. channel.pipeline().addLast(new StringProcessHandler());
    18. }
    19. };
    20. EmbeddedChannel ch = new EmbeddedChannel(i);
    21. for (int j = 0; j < 100; j++) {
    22. ByteBuf buffer = Unpooled.buffer();
    23. String s = j + "次发送->" + content;
    24. byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
    25. buffer.writeInt(bytes.length);
    26. buffer.writeChar(VERSION);
    27. buffer.writeBytes(bytes);
    28. ch.writeInbound(buffer);
    29. }
    30. Thread.sleep(Integer.MAX_VALUE);
    31. }
    32. }

    将协议设计得再复杂一点:将2字节的协议版本放在最前面,在长度字段前面加上2字节的版本字段,在长度字段后面加上4字节的魔数,魔数用来对数据包做一些安全的认证。
    image.png
    参数的设置大致如下:
    第1个参数maxFrameLength可以设置为1024,表示数据包的最大长度为1024字节。
    第2个参数lengthFieldOffset可以设置为2,表示长度字段处于版本号的后面。
    第3个参数lengthFieldLength可以设置为4,表示长度字段为4字节。
    第4个参数lengthAdjustment可以设置为4。长度调整值的计算方法为:内容字段偏移量-长度字段偏移量-长度字段的长度=10-2-4=4。在这个例子中,lengthAdjustment就是夹在内容字段和长度字段中的部分——魔数字段的长度。
    第5个参数initialBytesToStrip可以设置为10,表示获取最终Content内容的字节数组时抛弃最前面的10字节数据。换句话说,长度字段、版本字段、魔数字段的值被抛弃。

    1. package com.crazymakercircle.netty.decoder;
    2. //…
    3. static final int MAGICCODE = 9999;
    4. @Test
    5. public void testLengthFieldBasedFrameDecoder3() throws InterruptedException {
    6. final LengthFieldBasedFrameDecoder spliter = new LengthFieldBasedFrameDecoder(1024, 2, 4, 4, 10);
    7. ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
    8. @Override
    9. protected void initChannel(EmbeddedChannel channel) throws Exception {
    10. channel.pipeline().addLast(spliter);
    11. channel.pipeline().addLast(new StringDecoder());
    12. channel.pipeline().addLast(new StringProcessHandler());
    13. }
    14. };
    15. EmbeddedChannel ch = new EmbeddedChannel(i);
    16. for (int j = 0; j < 100; j++) {
    17. ByteBuf buffer = Unpooled.buffer();
    18. String s = j + "次发送->" + content;
    19. byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
    20. buffer.writeChar(VERSION);
    21. buffer.writeInt(bytes.length);
    22. buffer.writeInt(MAGICCODE);
    23. buffer.writeBytes(bytes);
    24. ch.writeInbound(buffer);
    25. }
    26. Thread.sleep(Integer.MAX_VALUE);
    27. }

    6.3 Encoder原理与实战

    在Netty的业务处理完成后,业务处理的结果往往是某个Java POJO对象需要编码成最终的ByteBuf二进制类型,通过流水线写入底层的Java通道,这就需要用到Encoder(编码器)
    在Netty中,什么叫编码器?首先,编码器是一个Outbound出站处理器,负责处理“出站”数据;其次,编码器将上一站Outbound出站处理器传过来的输入(Input)数据进行编码或者格式转换,然后传递到下一站ChannelOutboundHandler出站处理器。

    6.3.1 MessageToByteEncoder编码器

    MessageToByteEncoder是一个非常重要的编码器基类,位于Netty的io.netty.handler.codec包中。MessageToByteEncoder的功能是将一个Java POJO对象编码成一个ByteBuf数据包。它是一个抽象类,仅仅实现了编码的基础流程,在编码过程中通过调用encode()抽象方法来完成。它的encode()编码方法是一个抽象方法,没有具体的编码逻辑实现,实现encode()抽象方法的工作需要子类去完成。
    如果要实现一个自己的编码器,则需要继承自MessageToByteEncoder基类,实现它的encode()抽象方法。作为演示,下面实现一个整数编码器。其功能是将Java整数编码成二进制ByteBuf数据包。这个示例程序的代码如下

    1. package com.crazymakercircle.netty.encoder;
    2. //…
    3. public class Integer2ByteEncoder extends MessageToByteEncoder<Integer> {
    4. @Override
    5. protected void encode(ChannelHandlerContext channelHandlerContext, Integer integer, ByteBuf byteBuf) throws Exception {
    6. byteBuf.writeInt(integer);
    7. Logger.info("encoder Integer = " + integer);
    8. }
    9. }

    测试用例,代码如下:

    1. package com.crazymakercircle.netty.encoder;
    2. //…
    3. @Test
    4. public void testIntegerToByteDecoder() {
    5. ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
    6. @Override
    7. protected void initChannel(EmbeddedChannel channel) throws Exception {
    8. channel.pipeline().addLast(new Integer2ByteEncoder());
    9. }
    10. };
    11. EmbeddedChannel channel = new EmbeddedChannel(i);
    12. for (int j = 0; j < 100; j++) {
    13. channel.write(j);
    14. }
    15. channel.flush();
    16. ByteBuf buf = (ByteBuf) channel.readOutbound();
    17. while (null != buf) {
    18. System.out.println("o = " + buf.readInt());
    19. buf = (ByteBuf) channel.readOutbound();
    20. }
    21. }

    6.3.2 MessageToMessageEncoder编码器

    继承另外一个Netty的重要编码器——MessageToMessageEncoder编码器,并实现它的encode()抽象方法。在子类的encode()方法实现中,完成原POJO类型到目标POJO类型的转换逻辑。在encode()实现方法中,编码完成后,将解码后的目标对象加入encode()方法中的实参list输出容器即可。
    下面是一个从字符串(String)到整数(Integer)的编码器,演示一下MessageToMessageEncoder的使用。此编码器的具体功能是将字符串中的所有数字提取出来,然后输出到下一站。

    1. package com.crazymakercircle.netty.encoder;
    2. //…
    3. public class String2IntegerEncoder
    4. extends MessageToMessageEncoder<String> {
    5. @Override
    6. protected void encode(
    7. ChannelHandlerContext c, String s, List<Object> list)…{
    8. char[] array = s.toCharArray();
    9. for (char a : array) {
    10. //48 是0的编码,57 是9 的编码
    11. if (a >= 48 && a <= 57) {
    12. list.add(new Integer(a));
    13. }
    14. }
    15. }
    16. }

    测试用例,代码如下:

    1. package com.crazymakercircle.netty.encoder;
    2. //…
    3. public class String2IntegerEncoderTester {
    4. /**
    5. * 测试字符串到整数的编码器
    6. */
    7. @Test
    8. public void testStringToIntergerDecoder() {
    9. ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
    10. protected void initChannel(EmbeddedChannel ch) {
    11. ch.pipeline().addLast(new Integer2ByteEncoder());
    12. ch.pipeline().addLast(new String2IntegerEncoder());
    13. }
    14. };
    15. EmbeddedChannel channel = new EmbeddedChannel(i);
    16. for (int j = 0; j < 100; j++) {
    17. String s = "i am " + j;
    18. channel.write(s); //向通道写入含有数字的字符串
    19. }
    20. channel.flush();
    21. ByteBuf buf = (ByteBuf) channel.readOutbound();
    22. while (null != buf) {
    23. System.out.println("o = " + buf.readInt()); //打印数字
    24. buf = (ByteBuf) channel.readOutbound(); //读取数字
    25. }
    26. }
    27. }

    6.4 解码器和编码器的结合

    具有相互配套逻辑的编码器和解码器能否放在同一个类中呢?答案是肯定的,这需要用到Netty的新类型——Codec(编解码器)

    6.4.1 ByteToMessageCodec编解码器

    完成POJO到ByteBuf数据包的编解码器基类为ByteToMessageCodec,它是一个抽象类。从功能上说,继承ByteToMessageCodec就等同于继承了ByteToMessageDecoder和MessageToByteEncoder这两个基类。
    编解码器ByteToMessageCodec同时包含了编码encode()和解码decode()两个抽象方法,这两个方法都需要我们自己实现:

    1. public class Byte2IntegerCodec extends ByteToMessageCodec<Integer> {
    2. @Override
    3. protected void encode(ChannelHandlerContext channelHandlerContext, Integer integer, ByteBuf byteBuf) throws Exception {
    4. byteBuf.writeInt(integer);
    5. System.out.println("write Integer = " + integer);
    6. }
    7. @Override
    8. protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
    9. if (byteBuf.readableBytes() >= 4) {
    10. int i = byteBuf.readInt();
    11. System.out.println("Decoder i = " + i);
    12. list.add(i);
    13. }
    14. }
    15. }

    6.4.2 CombinedChannelDuplexHandler组合器

    编码器和解码器如果要结合起来,除了继承的方法之外,还可以通过组合的方式实现。与继承相比,组合会带来更大的灵活性:编码器和解码器可以捆绑使用,也可以单独使用。
    Netty提供了一个新的组合器——CombinedChannelDuplexHandler基类

    1. package com.crazymakercircle.netty.codec;
    2. //…
    3. public class IntegerDuplexHandler extends CombinedChannelDuplexHandler<
    4. Byte2IntegerDecoder, Integer2ByteEncoder>
    5. {
    6. public IntegerDuplexHandler() {
    7. super(new Byte2IntegerDecoder(), new Integer2ByteEncoder());
    8. }
    9. }