6.1 Decoder原理与实战6.1.1 ByteToMessageDecoder解码器处理流程6.1.2 自定义Byte2IntegerDecoder整数解码器6.1.3 ReplayingDecoder解码器6.1.4 整数的分包解码器的实战案例6.1.5 字符串的分包解码器的实战案例6.1.6 MessageToMessageDecoder解码器6.2 常用的内置Decoder6.2.1 LineBasedFrameDecoder解码器6.2.2 DelimiterBasedFrameDecoder解码器6.2.3 LengthFieldBasedFrameDecoder解码器6.2.4 多字段Head-Content协议数据包解析的实战案例6.3 Encoder原理与实战6.3.1 MessageToByteEncoder编码器6.3.2 MessageToMessageEncoder编码器6.4 解码器和编码器的结合6.4.1 ByteToMessageCodec编解码器6.4.2 CombinedChannelDuplexHandler组合器Netty从底层Java通道读取ByteBuf二进制数据,传入Netty通道的流水线,随后开始入站处理。在入站处理过程中,需要将ByteBuf二进制类型解码成Java POJO对象。这个解码过程可以通过Netty的Decoder(解码器)去完成。在出站处理过程中,业务处理后的结果(出站数据)需要从某个Java POJO对象编码为最终的ByteBuf二进制数据,然后通过底层Java通道发送到对端。在编码过程中,需要用到Netty的Encoder(编码器)去完成数据的编码工作。 6.1 Decoder原理与实战Netty中的解码器都是Inbound入站处理器类型,都直接或者间接地实现了入站处理的超级接口ChannelInboundHandler。Netty内置了ByteToMessageDecoder解码器 6.1.1 ByteToMessageDecoder解码器处理流程ByteToMessageDecoder是一个非常重要的解码器基类,是一个抽象类,实现了解码处理的基础逻辑和流程。ByteToMessageDecoder继承自ChannelInboundHandlerAdapter适配器,是一个入站处理器,用于完成从ByteBuf到Java POJO对象的解码功能。ByteToMessageDecoder解码的流程大致如图ByteToMessageDecoder在设计上使用了模板模式(Template Pattern)。ByteToMessageDecoder的子类要做的是将从入站ByteBuf解码出来的所有Object实例加入父类的List列表中。实现一个解码器,首先要继承ByteToMessageDecoder抽象类,然后实现其基类的decode()抽象方法。总体来说,流程大致如下: 继承ByteToMessageDecoder抽象类。实现基类的decode()抽象方法,将ByteBuf到目标POJO的解码逻辑写入此方法,以将ByteBuf中的二进制数据解码成一个一个的Java POJO对象。解码完成后,需要将解码后的Java POJO对象放入decode()方法的List实参中,此实参是父类所传入的解码结果收集容器。 6.1.2 自定义Byte2IntegerDecoder整数解码器下面是一个小小的ByteToMessageDecoder子类的实战案例:整数解码器。其功能是将ByteBuf中的字节解码成整数类型。 ```java package com.crazymakercircle.netty.decoder; //… public class Byte2IntegerDecoder extends ByteToMessageDecoder { @Override public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) { while (in.readableBytes() >= 4) { int i = in.readInt(); Logger.info("解码出一个整数: " + i); out.add(i); } } } 这里编写一个简单的配套处理器IntegerProcessHandler,用于处理Byte2IntegerDecoder解码之后的整数。其功能是:读取上一站的入站数据,把它转换成整数,并且输出到Console(控制台)上```javapackage com.crazymakercircle.netty.decoder;//…public class IntegerProcessHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg)…{ Integer integer = (Integer) msg; Logger.info("打印出一个整数: " + integer); }}使用EmbeddedChannel(嵌入式通道)编写一个测试用例,代码如下: package com.crazymakercircle.netty.decoder;//…public class Byte2IntegerDecoderTester { /** * 整数解码器的使用实例 */ @Test public void testByteToIntegerDecoder() { ChannelInitializer i= new ChannelInitializer<EmbeddedChannel>(){ protected void initChannel(EmbeddedChannel ch) { ch.pipeline().addLast(new Byte2IntegerDecoder()); ch.pipeline().addLast(new IntegerProcessHandler()); } }; EmbeddedChannel channel = new EmbeddedChannel(i); for (int j = 0; j < 100; j++) { ByteBuf buf = Unpooled.buffer(); buf.writeInt(j); channel.writeInbound(buf); } //… }} ByteBuf缓冲区并没有发送到流水线的TailContext(尾部处理器),将由谁负责释放引用计数呢?<br />其实,基类ByteToMessageDecoder会完成ByteBuf释放工作,它会调用ReferenceCountUtil.release(in)方法将之前的ByteBuf缓冲区的引用计数减1<br />这个ByteBuf先被释放了,如果在后面还需要用到,怎么办?<br />可以在子类的decode()方法中调用一次ReferenceCountUtil.retain(in)来增加一次引用计数,不过在使用完成后要及时将增加的这次计数减去。 6.1.3 ReplayingDecoder解码器使用上面的Byte2IntegerDecoder整数解码器会面临一个问题:需要对ByteBuf的长度进行检查,有足够的字节才能进行整数的读取。这种长度的判断是否可以由Netty来帮忙完成呢?答案是可以的,可以使用Netty的ReplayingDecoder类省去长度的判断。ReplayingDecoder类是ByteToMessageDecoder的子类,作用是:在读取ByteBuf缓冲区的数据之前,需要检查缓冲区是否有足够的字节。若ByteBuf中有足够的字节,则会正常读取;反之,则会停止解码。使用ReplayingDecoder基类改写上一个整数解码器,可以不进行长度检测。创建一个新的整数解码器,类名为Byte2IntegerReplayDecoder,代码如下: package com.crazymakercircle.netty.decoder;//…public class Byte2IntegerReplayDecoder extends ReplayingDecoder { @Override public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { int i = in.readInt(); Logger.info("解码出一个整数: " + i); out.add(i); }} ReplayingDecoder进行长度判断的原理很简单:内部定义一个新的二进制缓冲区类(类名为ReplayingDecoderBuffer),又对ByteBuf缓冲区进行装饰。该装饰器的特点是,在缓冲区真正读数据之前先进行长度的判断:如果长度合格,就读取数据;否则,抛出ReplayError。ReplayingDecoder捕获到ReplayError后会留着数据,等待下一次IO事件到来时再读取。<br />实质上,ReplayingDecoder的作用远远不止于进行长度判断,它更重要的作用是用于分包传输的应用场景。 6.1.4 整数的分包解码器的实战案例发送端出去的包在传输过程中会进行多次拆分和组装。接收端收到的包和发送端所发送的包不是一模一样的ReplayingDecoder的一个很重要的属性——state成员属性。该成员属性的作用是保存当前解码器在解码过程中所处的阶段。在Netty源代码中,该属性的定义如下: public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder { //省略不相关的代码 //缓冲区装饰器 private final ReplayingDecoderByteBuf replayable = new ReplayingDecoderByteBuf(); //重要的成员属性,表示解码过程中所处的阶段,类型为泛型,默认为Object private S state; //默认的构造器,state值为空,没有用到该属性 protected ReplayingDecoder() { this((Object)null); } //重载的构造器 protected ReplayingDecoder(S initialState) { //初始化内部的ByteBuf缓冲区装饰器类 this.replayable = new ReplayingDecoderByteBuf(); //读指针检查点,默认为-1 this.checkpoint = -1; //状态state的默认值为null this.state = initialState; } //省略不相关的方法} 下面先基于ReplayingDecoder基础解码器编写一个整数相加的解码器:解码两个整数,并把这两个数据之和作为解码的结果。public class IntegerAddDecoder extends ReplayingDecoder<IntegerAddDecoder.PHASE> { //自定义的状态枚举值,代表两个阶段 enum PHASE { PHASE_1, //第一个阶段,仅仅提取第一个整数,完成后进入第二个阶段 PHASE_2 //第二个阶段,提取第二个整数后,还需要计算相加的结果并输出 } private int first; private int second; public IntegerAddDecoder() { //在构造函数中,初始化父类的state属性为PHASE_1,表示第一个阶段 super(PHASE.PHASE_1); } @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { switch (state()){//判断当前的状态 case PHASE_1: //从装饰器ByteBuf 中读取数据 first = byteBuf.readInt(); //第一步解析成功,进入第二步,设置“state”为第二个阶段 checkpoint(PHASE.PHASE_2); break; //提取第二个整数后还需要计算相加的结果 //并将和作为解码的结果输出 case PHASE_2: second = byteBuf.readInt(); Integer sum = first + second; list.add(sum); //进入下一轮解码的第一步,设置“state”为第一个阶段 checkpoint(PHASE.PHASE_1); break; default: break; } }} 每一个阶段一完成就通过checkpoint(PHASE)方法(类似于state属性的setter()方法)把当前的state状态设置为新的PHASE枚举值。checkpoint()方法有两个作用: 设置state属性的值,更新一下当前的状态。设置“读指针检查点”。 6.1.5 字符串的分包解码器的实战案例在原理上,字符串分包解码和整数分包解码是一样的,所不同的是:整数的长度是固定的,目前在Java中是4字节;字符串的长度是不固定的,是可变的。如何获取字符串的长度信息呢?这是一个小小的难题,和程序所使用的具体传输协议是强相关的。一般来说,在Netty中进行字符串的传输可以采用普通的Head-Content内容传输协议。该协议的规则很简单: 在协议的Head部分放置字符串的字节长度,可以用一个整数类型来描述。 在协议的Content部分,放置字符串的字节数组。 下面就是基于ReplayingDecoder实现自定义的字符串分包解码器的示例程序: package com.crazymakercircle.netty.decoder;//…public class StringReplayDecoder extends ReplayingDecoder<StringReplayDecoder.PHASE> { enum PHASE { PHASE_1, //第一个阶段:解码出字符串的长度 PHASE_2 //第二个阶段:按照第一个阶段的字符串长度解码出字符串的内容 } private int length; private byte[] inBytes; public StringReplayDecoder() { //在构造函数中,需要初始化父类的state属性为PHASE_1阶段 super(PHASE.PHASE_1); } @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { switch (state()) { case PHASE_1: //第一步,从装饰器ByteBuf中读取字符串的长度 length = byteBuf.readInt(); inBytes = new byte[length]; //进入第二步,读取内容 //并设置“读指针检查点”为当前的readerIndex位置 checkpoint(PHASE.PHASE_2); break; case PHASE_2: //第二步,从装饰器ByteBuf 中读取字符串的内容数组 byteBuf.readBytes(inBytes, 0, length); list.add(new String(inBytes, "UTF-8")); //第二步解析成功,进入下一个字符串的解析 //并设置“读指针检查点”为当前的readerIndex位置 checkpoint(PHASE.PHASE_1); break; default: break; } }} 这里编写一个简单的辅助性质的业务处理器。其功能是读取上一站的入站数据,把它转换成字符串,并输出到控制台上。新业务处理器的名称为StringProcessHandler,具体代码如下: package com.crazymakercircle.netty.decoder;//…public class StringProcessHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String s = (String) msg; Logger.info("打印出一个字符串: " + s);// super.channelRead(ctx, msg); }} 编写一个测试用例,代码如下:package com.crazymakercircle.netty.decoder;//…public class StringReplayDecoderTester { static String content = "弯弯入我心,秋凉知我意!"; @Test public void testStringReplayingDecoder() { ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>(){ @Override protected void initChannel(EmbeddedChannel ch) throws Exception { ch.pipeline().addLast(new StringReplayDecoder()); ch.pipeline().addLast(new StringProcessHandler()); } }; EmbeddedChannel channel = new EmbeddedChannel(i); //待发送字符串content的字节数组 byte[] bytes = content.getBytes(Charset.forName("utf-8")); //循环发送100轮,每一轮可以理解为发送一个Head-Content报文 for (int j = 0; j < 100; j++) {//发送100个包 //每个包为随机1~3个 "弯弯入我心,秋凉知我意" int random = RandomUtil.randInMod(3); ByteBuf buffer = Unpooled.buffer(); //发送长度:字节数组长度*重复次数 buffer.writeInt(bytes.length * random); //重复拷贝content的字节数据到发送缓冲区 for (int k = 0; k < random; k++) { buffer.writeBytes(bytes); } //发送内容:发送buf缓冲区 channel.writeInbound(buffer); } }} 通过ReplayingDecoder解码器,可以正确地解码分包后的ByteBuf数据包。但是,在实际开发中不建议继承这个类,原因如下: 不是所有的ByteBuf操作都被ReplayingDecoderBuffer装饰器类支持,可能有些ByteBuf方法在ReplayingDecoder的decode()方法中会抛出ReplayError异常。在数据解码逻辑复杂的应用场景下,ReplayingDecoder在解码速度上相对较差。因为在ByteBuf长度不够时,ReplayingDecoder会捕获一个ReplayError异常,并会把ByteBuf中的读指针还原到之前的读指针检查点(checkpoint),然后结束这次解析操作,等待下一次IO读事件。在网络条件比较糟糕时,一个数据包的解析逻辑会被反复执行多次,此时解析过程是一个消耗CPU的操作,解码速度上相对较差。所以,ReplayingDecoder更多地应用于数据解析逻辑简单的场景。 在数据解析复杂的应用场景下,建议使用前文介绍的解码器ByteToMessageDecoder或者其子类(后文介绍)。这里继承ByteToMessageDecoder基类,实现一个定制的Head-Content协议字符串内容解码器,代码如下: package com.crazymakercircle.netty.decoder;//…public class StringIntegerHeaderDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf buf, List<Object> out) …{ //可读字节小于4,消息头还没读满,返回 if (buf.readableBytes() < 4) { return; } //消息头已经完整 //在真正开始从缓冲区读取数据之前,调用markReaderIndex()设置mark buf.markReaderIndex(); int length = buf.readInt(); //从缓冲区读出消息头的大小,这会导致readIndex读指针变化 //如果剩余长度不够消息体的大小,则需要重置读指针,下一次从相同的位置处理 if (buf.readableBytes() < length) { //读指针重置到消息头的readIndex位置处 buf.resetReaderIndex(); return; } //读取数据,编码成字符串 byte[] inBytes = new byte[length]; buf.readBytes(inBytes, 0, length); out.add(new String(inBytes, "UTF-8")); }} 表面上ByteToMessageDecoder基类是无状态的,不像ReplayingDecoder那样需要使用状态位来保存当前的读取阶段,实际上ByteToMessageDecoder也是有状态的。其内部有一个二进制字节累积器cumulation,用来保存没有解析完的二进制内容。所以,ByteToMessageDecoder及其子类都是有状态的,其实例不能在通道之间共享。在每次初始化通道的流水线时,都要重新创建一个ByteToMessageDecoder或者它的子类的实例。 6.1.6 MessageToMessageDecoder解码器是否存在一些解码器可以将一种POJO对象解码成另外一种POJO对象呢?存在。与前面不同的是,解码器需要继承一个新的Netty解码器基类MessageToMessageDecoder。在继承它的时候,需要明确的泛型实参,用于指定入站消息的Java POJO类型。下面通过实现一个整数到字符串转换的解码器演示一下MessageToMessageDecoder的使用。代码很简单,如下所示: package com.crazymakercircle.netty.decoder;//…public class Integer2StringDecoder extends MessageToMessageDecoder<Integer> { @Override protected void decode(ChannelHandlerContext channelHandlerContext, Integer integer, List<Object> list) throws Exception { list.add(String.valueOf(integer)); }} 6.2 常用的内置Decoder 固定长度数据包解码器——FixedLengthFrameDecoder 适用场景:每个接收到的数据包的长度都是固定的,例如100字节。在这种场景下,把FixedLengthFrameDecoder解码器加到流水线中,它就会把入站ByteBuf数据包拆分成一个个长度为100的数据包,然后发往下一个channelHandler入站处理器。 行分割数据包解码器——LineBasedFrameDecoder 适用场景:每个ByteBuf数据包使用换行符(或者回车换行符)作为边界分隔符。在这种场景下,把LineBasedFrameDecoder解码器加到流水线中,Netty就会使用换行分隔符把ByteBuf数据包分割成一个一个完整的应用层ByteBuf数据包再发送到下一站。 自定义分隔符数据包解码器——DelimiterBasedFrameDecoder DelimiterBasedFrameDecoder是LineBasedFrameDecoder按照行分割的通用版本,不同之处在于这个解码器更加灵活,可以自定义分隔符,而不是局限于换行符。如果使用这个解码器,那么所接收到的数据包末尾必须带上对应的分隔符。 自定义长度数据包解码器——LengthFieldBasedFrameDecoder 这是一种基于灵活长度的解码器,在ByteBuf数据包中加了一个长度字段,保存了原始数据包的长度,解码时会按照原始数据包长度进行提取。 6.2.1 LineBasedFrameDecoder解码器这个解码器的工作原理很简单,依次遍历ByteBuf数据包中的可读字节,判断在二进制字节流中是否存在换行符”\n”或者”\r\n”的字节码。如果有,就以此位置为结束位置,把从可读索引到结束位置之间的字节作为解码成功后的ByteBuf数据包。 public class NettyOpenBoxDecoder { static String spliter = "\r\n"; static String content = "弯弯入我心,秋凉知我意!"; @Test public void testLinedBasedFrameDecoder() { ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() { @Override protected void initChannel(EmbeddedChannel ch) throws Exception { ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringProcessHandler()); } }; EmbeddedChannel channel = new EmbeddedChannel(i); for (int j = 0; j < 100; j++) { int random = RandomUtil.randInMod(3); ByteBuf buffer = Unpooled.buffer(); for (int k = 0; k < random; k++) { buffer.writeBytes(content.getBytes(StandardCharsets.UTF_8)); } buffer.writeBytes(spliter.getBytes(StandardCharsets.UTF_8)); channel.writeInbound(buffer); } }} 6.2.2 DelimiterBasedFrameDecoder解码器DelimiterBasedFrameDecoder解码器不仅可以使用换行符,还可以使用其他特殊字符作为数据包的分隔符,例如制表符”\t” public DelimiterBasedFrameDecoder( int maxFrameLength, //解码的数据包的最大长度 Boolean stripDelimiter, //解码后的数据包是否去掉分隔符,一般选择是 ByteBuf delimiter) //分隔符{ //省略构造器的源代码} 下面是一个实战案例。 package com.crazymakercircle.netty.decoder;//…public class NettyOpenBoxDecoder { static String spliter2 = "\t"; static String content = "疯狂创客圈:高性能学习社群!"; /** * LengthFieldBasedFrameDecoder使用实例 */ @Test public void testDelimiterBasedFrameDecoder() { final ByteBuf delimiter = Unpooled.copiedBuffer(spliter2.getBytes(StandardCharsets.UTF_8)); ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() { @Override protected void initChannel(EmbeddedChannel channel) throws Exception { channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, true, delimiter)); channel.pipeline().addLast(new StringDecoder()); channel.pipeline().addLast(new StringProcessHandler()); } }; EmbeddedChannel channel = new EmbeddedChannel(i); for (int j = 0; j < 100; j++) { int random = RandomUtil.randInMod(3); ByteBuf buffer = Unpooled.buffer(); for (int k = 0; k < random; k++) { buffer.writeBytes(content.getBytes(StandardCharsets.UTF_8)); } buffer.writeBytes(spliter2.getBytes(StandardCharsets.UTF_8)); channel.writeInbound(buffer); } }} 6.2.3 LengthFieldBasedFrameDecoder解码器在Netty的开箱即用解码器中,最为复杂的是解码器为LengthFieldBasedFrameDecoder自定义长度数据包。它的难点在于参数比较多,也比较难以理解,但同时它又比较常用,因而下面对它进行重点介绍。传输内容中的Length(长度)字段的值是指存放在数据包中要传输内容的字节数。普通的基于Head-Content协议的内容传输尽量用内置的LengthFieldBasedFrameDecoder来解码。一个简单的LengthFieldBasedFrameDecoder使用示例如下: package com.crazymakercircle.netty.decoder;//…public class NettyOpenBoxDecoder { public static final int VERSION = 100; static String content = "疯狂创客圈:高性能学习社群!"; /** * LengthFieldBasedFrameDecoder使用示例 1 */ @Test public void testLengthFieldBasedFrameDecoder1() throws InterruptedException { final LengthFieldBasedFrameDecoder spliter = new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4); ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() { @Override protected void initChannel(EmbeddedChannel channel) throws Exception { channel.pipeline().addLast(spliter); channel.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8"))); channel.pipeline().addLast(new StringProcessHandler()); } }; EmbeddedChannel channel = new EmbeddedChannel(i); for (int j = 0; j < 100; j++) { ByteBuf buffer = Unpooled.buffer(); String s = j + "次发送->" + content; byte[] bytes = s.getBytes(StandardCharsets.UTF_8); buffer.writeInt(bytes.length); buffer.writeBytes(bytes); channel.writeInbound(buffer); } Thread.sleep(Integer.MAX_VALUE); }} 上面的示例程序中用到了一个LengthFieldBasedFrameDecoder构造器,具体如下: public LengthFieldBasedFrameDecoder( int maxFrameLength, //发送的数据包的最大长度 int lengthFieldOffset, //长度字段偏移量 int lengthFieldLength, //长度字段本身占用的字节数 int lengthAdjustment, //长度字段的偏移量矫正 int initialBytesToStrip) //丢弃的起始字节数{ //…} maxFrameLength:发送的数据包的最大长度。示例程序中该值为1024,表示一个数据包最多可发送1024字节。lengthFieldOffset:长度字段偏移量,指的是长度字段位于整个数据包内部字节数组中的下标索引值。lengthFieldLength:长度字段所占的字节数。如果长度字段是一个int整数,则为4;如果长度字段是一个short整数,则为2。lengthAdjustment:长度的调整值。这个参数最为难懂。在传输协议比较复杂的情况下,例如协议包含了长度字段、协议版本号、魔数等,那么解码时就需要进行长度调整。长度调整值的计算公式为:内容字段偏移量-长度字段偏移量-长度字段的字节数。initialBytesToStrip:丢弃的起始字节数。在有效数据字段Content前面,如果还有一些其他字段的字节,作为最终的解析结果可以丢弃。例如,在上面的示例程序中,前面有4字节的长度字段,它起辅助的作用,最终的结果中不需要这个长度,所以丢弃的字节数为4。 6.2.4 多字段Head-Content协议数据包解析的实战案例Head-Content协议是最为简单的内容传输协议。在实际使用过程中则没有那么简单,除了长度和内容,在数据包中还可能包含其他字段,例如协议版本号,如图使用LengthFieldBasedFrameDecoder解码器解析以上带有版本号的Head-Content协议的数据包,该如何进行构造器参数的计算呢?第1个参数maxFrameLength可以为1024,表示数据包的最大长度为1024字节。第2个参数lengthFieldOffset为0,表示长度字段处于数据包的起始位置。第3个参数lengthFieldLength的值为4,表示长度字段的长度为4字节。第4个参数lengthAdjustment为2,长度调整值的计算方法为:内容字段偏移量-长度字段偏移量-长度字段的长度=6-0-4=2。换句话说,在这个例子中,lengthAdjustment就是夹在内容字段和长度字段中的部分——版本号的长度。第5个参数initialBytesToStrip为6,表示获取最终内容的字节数组时抛弃最前面的6字节数据。换句话说,长度字段、版本字段的值被抛弃。 package com.crazymakercircle.netty.decoder;//…public class NettyOpenBoxDecoder { public static final int VERSION = 100; static String content = "疯狂创客圈:高性能学习社群!"; /** * LengthFieldBasedFrameDecoder使用示例 2 */ @Test public void testLengthFieldBasedFrameDecoder2() throws InterruptedException { final LengthFieldBasedFrameDecoder spliter = new LengthFieldBasedFrameDecoder(1024, 0, 4, 2, 6); ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() { @Override protected void initChannel(EmbeddedChannel channel) throws Exception { channel.pipeline().addLast(spliter); channel.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8"))); channel.pipeline().addLast(new StringProcessHandler()); } }; EmbeddedChannel ch = new EmbeddedChannel(i); for (int j = 0; j < 100; j++) { ByteBuf buffer = Unpooled.buffer(); String s = j + "次发送->" + content; byte[] bytes = s.getBytes(StandardCharsets.UTF_8); buffer.writeInt(bytes.length); buffer.writeChar(VERSION); buffer.writeBytes(bytes); ch.writeInbound(buffer); } Thread.sleep(Integer.MAX_VALUE); }} 将协议设计得再复杂一点:将2字节的协议版本放在最前面,在长度字段前面加上2字节的版本字段,在长度字段后面加上4字节的魔数,魔数用来对数据包做一些安全的认证。参数的设置大致如下:第1个参数maxFrameLength可以设置为1024,表示数据包的最大长度为1024字节。第2个参数lengthFieldOffset可以设置为2,表示长度字段处于版本号的后面。第3个参数lengthFieldLength可以设置为4,表示长度字段为4字节。第4个参数lengthAdjustment可以设置为4。长度调整值的计算方法为:内容字段偏移量-长度字段偏移量-长度字段的长度=10-2-4=4。在这个例子中,lengthAdjustment就是夹在内容字段和长度字段中的部分——魔数字段的长度。第5个参数initialBytesToStrip可以设置为10,表示获取最终Content内容的字节数组时抛弃最前面的10字节数据。换句话说,长度字段、版本字段、魔数字段的值被抛弃。 package com.crazymakercircle.netty.decoder;//…static final int MAGICCODE = 9999;@Test public void testLengthFieldBasedFrameDecoder3() throws InterruptedException { final LengthFieldBasedFrameDecoder spliter = new LengthFieldBasedFrameDecoder(1024, 2, 4, 4, 10); ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() { @Override protected void initChannel(EmbeddedChannel channel) throws Exception { channel.pipeline().addLast(spliter); channel.pipeline().addLast(new StringDecoder()); channel.pipeline().addLast(new StringProcessHandler()); } }; EmbeddedChannel ch = new EmbeddedChannel(i); for (int j = 0; j < 100; j++) { ByteBuf buffer = Unpooled.buffer(); String s = j + "次发送->" + content; byte[] bytes = s.getBytes(StandardCharsets.UTF_8); buffer.writeChar(VERSION); buffer.writeInt(bytes.length); buffer.writeInt(MAGICCODE); buffer.writeBytes(bytes); ch.writeInbound(buffer); } Thread.sleep(Integer.MAX_VALUE); } 6.3 Encoder原理与实战在Netty的业务处理完成后,业务处理的结果往往是某个Java POJO对象需要编码成最终的ByteBuf二进制类型,通过流水线写入底层的Java通道,这就需要用到Encoder(编码器)在Netty中,什么叫编码器?首先,编码器是一个Outbound出站处理器,负责处理“出站”数据;其次,编码器将上一站Outbound出站处理器传过来的输入(Input)数据进行编码或者格式转换,然后传递到下一站ChannelOutboundHandler出站处理器。 6.3.1 MessageToByteEncoder编码器MessageToByteEncoder是一个非常重要的编码器基类,位于Netty的io.netty.handler.codec包中。MessageToByteEncoder的功能是将一个Java POJO对象编码成一个ByteBuf数据包。它是一个抽象类,仅仅实现了编码的基础流程,在编码过程中通过调用encode()抽象方法来完成。它的encode()编码方法是一个抽象方法,没有具体的编码逻辑实现,实现encode()抽象方法的工作需要子类去完成。如果要实现一个自己的编码器,则需要继承自MessageToByteEncoder基类,实现它的encode()抽象方法。作为演示,下面实现一个整数编码器。其功能是将Java整数编码成二进制ByteBuf数据包。这个示例程序的代码如下 package com.crazymakercircle.netty.encoder;//…public class Integer2ByteEncoder extends MessageToByteEncoder<Integer> { @Override protected void encode(ChannelHandlerContext channelHandlerContext, Integer integer, ByteBuf byteBuf) throws Exception { byteBuf.writeInt(integer); Logger.info("encoder Integer = " + integer); }} 测试用例,代码如下: package com.crazymakercircle.netty.encoder;//… @Testpublic void testIntegerToByteDecoder() { ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() { @Override protected void initChannel(EmbeddedChannel channel) throws Exception { channel.pipeline().addLast(new Integer2ByteEncoder()); } }; EmbeddedChannel channel = new EmbeddedChannel(i); for (int j = 0; j < 100; j++) { channel.write(j); } channel.flush(); ByteBuf buf = (ByteBuf) channel.readOutbound(); while (null != buf) { System.out.println("o = " + buf.readInt()); buf = (ByteBuf) channel.readOutbound(); }} 6.3.2 MessageToMessageEncoder编码器继承另外一个Netty的重要编码器——MessageToMessageEncoder编码器,并实现它的encode()抽象方法。在子类的encode()方法实现中,完成原POJO类型到目标POJO类型的转换逻辑。在encode()实现方法中,编码完成后,将解码后的目标对象加入encode()方法中的实参list输出容器即可。下面是一个从字符串(String)到整数(Integer)的编码器,演示一下MessageToMessageEncoder的使用。此编码器的具体功能是将字符串中的所有数字提取出来,然后输出到下一站。 package com.crazymakercircle.netty.encoder;//…public class String2IntegerEncoder extends MessageToMessageEncoder<String> { @Override protected void encode( ChannelHandlerContext c, String s, List<Object> list)…{ char[] array = s.toCharArray(); for (char a : array) { //48 是0的编码,57 是9 的编码 if (a >= 48 && a <= 57) { list.add(new Integer(a)); } } }} 测试用例,代码如下: package com.crazymakercircle.netty.encoder;//…public class String2IntegerEncoderTester { /** * 测试字符串到整数的编码器 */ @Test public void testStringToIntergerDecoder() { ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() { protected void initChannel(EmbeddedChannel ch) { ch.pipeline().addLast(new Integer2ByteEncoder()); ch.pipeline().addLast(new String2IntegerEncoder()); } }; EmbeddedChannel channel = new EmbeddedChannel(i); for (int j = 0; j < 100; j++) { String s = "i am " + j; channel.write(s); //向通道写入含有数字的字符串 } channel.flush(); ByteBuf buf = (ByteBuf) channel.readOutbound(); while (null != buf) { System.out.println("o = " + buf.readInt()); //打印数字 buf = (ByteBuf) channel.readOutbound(); //读取数字 } }} 6.4 解码器和编码器的结合具有相互配套逻辑的编码器和解码器能否放在同一个类中呢?答案是肯定的,这需要用到Netty的新类型——Codec(编解码器) 6.4.1 ByteToMessageCodec编解码器完成POJO到ByteBuf数据包的编解码器基类为ByteToMessageCodec,它是一个抽象类。从功能上说,继承ByteToMessageCodec就等同于继承了ByteToMessageDecoder和MessageToByteEncoder这两个基类。编解码器ByteToMessageCodec同时包含了编码encode()和解码decode()两个抽象方法,这两个方法都需要我们自己实现: public class Byte2IntegerCodec extends ByteToMessageCodec<Integer> { @Override protected void encode(ChannelHandlerContext channelHandlerContext, Integer integer, ByteBuf byteBuf) throws Exception { byteBuf.writeInt(integer); System.out.println("write Integer = " + integer); } @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { if (byteBuf.readableBytes() >= 4) { int i = byteBuf.readInt(); System.out.println("Decoder i = " + i); list.add(i); } }} 6.4.2 CombinedChannelDuplexHandler组合器编码器和解码器如果要结合起来,除了继承的方法之外,还可以通过组合的方式实现。与继承相比,组合会带来更大的灵活性:编码器和解码器可以捆绑使用,也可以单独使用。Netty提供了一个新的组合器——CombinedChannelDuplexHandler基类 package com.crazymakercircle.netty.codec;//…public class IntegerDuplexHandler extends CombinedChannelDuplexHandler< Byte2IntegerDecoder, Integer2ByteEncoder>{ public IntegerDuplexHandler() { super(new Byte2IntegerDecoder(), new Integer2ByteEncoder()); }}