前言
每个网络应用程序都必须定义如何解析在两个节点之间来回传输的原始字节,以及如何将其和 目标应用程序的数据格式做相互转换。这种转换逻辑由编解码器处理,编解码器由编码器和解码器组成,它们每种都可以将字节流从一种格式转换为另一种格式。那么它们的区别是什么呢?
如果将消息看作是对于特定的应用程序具有具体含义的结构化的字节序列— 它的数据。那么编码器是将消息转换为适合于传输的格式(最有可能的就是字节流);而对应的解码器则是将 网络字节流转换回应用程序的消息格式。因此,编码器操作出站数据,而解码器处理入站数据。
一、Netty中的解码器基类ByteToMessageDecoder
由于业务层仅仅关系业务数据,而网络层传输的又是字节数据,所以需要有一个解码器将字节数据转化成业务数据类型,比如JSON格式、字符串格式、对象格式等等。Netty中提供了解码器基类用于将接收到的缓冲字节格式数据转化成业务层需要的对象格式。
Netty中的解码器基类为ByteToMessageDecoder,该类继承之ChannelInboundHandlerApapter,既然是解码字节数据,所以必然会实现channelRead方法。源码如下:
/** 累积缓存数据 */ ByteBuf cumulation; /** 累积缓存合并工具 */ private Cumulator cumulator = MERGE_CUMULATOR; /** 是否第一次接收 */ private boolean first; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { /** out存储已解析的对象集合 */ CodecOutputList out = CodecOutputList.newInstance(); try { ByteBuf data = (ByteBuf) msg; /** * ByteBuf cumulation表示累积的缓冲字节 * 如果cumluation为空,表示当前没有累积字节 * */ first = cumulation == null; if (first) { /** 如果是第一次,则直接将接收的数据赋值给cumlation*/ cumulation = data; } else { /** 如果有累积数据,则将累积数据和接收的新数据进行合并*/ cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } /** 调用具体的解码器进行解码,解析到对象成功之后存放到out中 */ callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Exception e) { throw new DecoderException(e); } finally { /** 如果累积字节不为空并且ByteBuf不可读,那么将累积的ByteBuf进行释放*/ if (cumulation != null && !cumulation.isReadable()) { numReads = 0; cumulation.release(); cumulation = null; } else if (++ numReads >= discardAfterReads) { /** numReads表示读取次数,达到一定次数之后进行可读区域的压缩*/ numReads = 0; /** 调用ByteBuf的discard方法进行空间压缩*/ discardSomeReadBytes(); } int size = out.size(); firedChannelRead |= out.insertSinceRecycled(); fireChannelRead(ctx, out, size); out.recycle(); } } else { //如果msg不是ByteBuf类型直接不处理交给下一个ChannelHandlerContext处理 ctx.fireChannelRead(msg); } }
这里代码比较多,所以需要捋下逻辑,整体逻辑不复杂,主要分成以下几个步骤:
1、首先判断接收到的数据是否是ByteBuf类型,如果不是则直接交给下一个ChannelHandlerContext处理,如果是ByteBuf类型才尝试进行解析
2、由于网络传输可能存在粘包或拆包的情况,所以每次接收到一个ByteBuf并不一定就是一个完整的业务数据,比如客户端发送了一个字符串过来,但是字符串比较大被分成了两个ByteBuf传递过来,也有可能粘包将两个字符串合并成功一个ByteBuf传递过来
所以需要对ByteBuf进行解析,解析之后可能会剩余部分字节,或者一次性解析不成功,那么就需要暂时将解析不了的ByteBuf缓存起来。该类内部有一个属性ByteBuf类型的cumulation就是用来存储累积的缓存数据。
3、判断cumulation是否为空,如果为空表示没有累积数据,那么当前就是第一次接收数据,则直接将接收的数据存到cumulation中;如果已经存在累积数据,那么就需要对累积数据和接收的新数据进行合并处理
4、将接收到的数据或者是合并累积数据之后的数据调用callDecode方法进行解析,CodecOutputList是用来存储解析成功的对象列表,比如解析成功了两个字符串,那么就将这两个字符串存放到这个List中
5、最后调用fireChannelRead方法将解析到的业务数据列表传递给其他的ChannlHandler进行处理
整体流程捋清楚之后再逐步进行分析,
一、首先看下callDecode方法的实现,看看是如何进行业务数据的解析的。源码如下:
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { /** 当ByteBuf可读 */ while (in.isReadable()) { /** 如果outSize有值,表示至少有一个业务包被完全解码成功*/ int outSize = out.size(); if (outSize > 0) { /** 将业务包传递给下一个ChannelHandlerContext处理*/ fireChannelRead(ctx, out, outSize); /** 清理缓存*/ out.clear(); if (ctx.isRemoved()) { break; } outSize = 0; } /** 获取ByteBuf可读字节数*/ int oldInputLength = in.readableBytes(); /** 具体解码处理 */ decodeRemovalReentryProtection(ctx, in, out); if (ctx.isRemoved()) { break; } /** 如果解析的对象数量不变,也就是子类没有解析成功*/ if (outSize == out.size()) { /*** 如果可读字节数没有变化,跳出循环*/ if (oldInputLength == in.readableBytes()) { break; } else { /** 如果可读字节数发生变化,那么继续进行解析*/ continue; } } /** 执行到此处说明out数量发生改变了,也就是成功解析到了对象 * 此时如果可读字节没有变化,那么意思就是没有读任何字节但是解析到了一个对象,很显然是代码出问题了,抛异常 * */ if (oldInputLength == in.readableBytes()) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } if (isSingleDecode()) { break; } } } catch (DecoderException e) { throw e; } catch (Exception cause) { throw new DecoderException(cause); } }
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { decodeState = STATE_CALLING_CHILD_DECODE; try { /** 调用子类进行具体的解码逻辑,解析对象成功之后会存到out中 */ decode(ctx, in, out); } finally { boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING; decodeState = STATE_INIT; if (removePending) { /** 如果ContextHandlerContext被删除了,就将解析的数据传播出去*/ fireChannelRead(ctx, out, out.size()); out.clear(); handlerRemoved(ctx); } } }
还是捋清楚步骤为主,首先要清楚该方法的作用是什么,callDecode方法的作用是将参数ByteBuf in中的数据进行读取并解析,解析到业务数据之后存放到List