前言

每个网络应用程序都必须定义如何解析在两个节点之间来回传输的原始字节,以及如何将其和 目标应用程序的数据格式做相互转换。这种转换逻辑由编解码器处理,编解码器由编码器和解码器组成,它们每种都可以将字节流从一种格式转换为另一种格式。那么它们的区别是什么呢?
如果将消息看作是对于特定的应用程序具有具体含义的结构化的字节序列— 它的数据。那么编码器是将消息转换为适合于传输的格式(最有可能的就是字节流);而对应的解码器则是将 网络字节流转换回应用程序的消息格式。因此,编码器操作出站数据,而解码器处理入站数据。

一、Netty中的解码器基类ByteToMessageDecoder

由于业务层仅仅关系业务数据,而网络层传输的又是字节数据,所以需要有一个解码器将字节数据转化成业务数据类型,比如JSON格式、字符串格式、对象格式等等。Netty中提供了解码器基类用于将接收到的缓冲字节格式数据转化成业务层需要的对象格式。
Netty中的解码器基类为ByteToMessageDecoder,该类继承之ChannelInboundHandlerApapter,既然是解码字节数据,所以必然会实现channelRead方法。源码如下:

  1. /** 累积缓存数据 */
  2. ByteBuf cumulation;
  3. /** 累积缓存合并工具 */
  4. private Cumulator cumulator = MERGE_CUMULATOR;
  5. /** 是否第一次接收 */
  6. private boolean first;
  7. @Override
  8. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  9. if (msg instanceof ByteBuf) {
  10. /** out存储已解析的对象集合 */
  11. CodecOutputList out = CodecOutputList.newInstance();
  12. try {
  13. ByteBuf data = (ByteBuf) msg;
  14. /**
  15. * ByteBuf cumulation表示累积的缓冲字节
  16. * 如果cumluation为空,表示当前没有累积字节
  17. * */
  18. first = cumulation == null;
  19. if (first) {
  20. /** 如果是第一次,则直接将接收的数据赋值给cumlation*/
  21. cumulation = data;
  22. } else {
  23. /** 如果有累积数据,则将累积数据和接收的新数据进行合并*/
  24. cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
  25. }
  26. /** 调用具体的解码器进行解码,解析到对象成功之后存放到out中 */
  27. callDecode(ctx, cumulation, out);
  28. } catch (DecoderException e) {
  29. throw e;
  30. } catch (Exception e) {
  31. throw new DecoderException(e);
  32. } finally {
  33. /** 如果累积字节不为空并且ByteBuf不可读,那么将累积的ByteBuf进行释放*/
  34. if (cumulation != null && !cumulation.isReadable()) {
  35. numReads = 0;
  36. cumulation.release();
  37. cumulation = null;
  38. } else if (++ numReads >= discardAfterReads) {
  39. /** numReads表示读取次数,达到一定次数之后进行可读区域的压缩*/
  40. numReads = 0;
  41. /** 调用ByteBuf的discard方法进行空间压缩*/
  42. discardSomeReadBytes();
  43. }
  44. int size = out.size();
  45. firedChannelRead |= out.insertSinceRecycled();
  46. fireChannelRead(ctx, out, size);
  47. out.recycle();
  48. }
  49. } else {
  50. //如果msg不是ByteBuf类型直接不处理交给下一个ChannelHandlerContext处理
  51. ctx.fireChannelRead(msg);
  52. }
  53. }

这里代码比较多,所以需要捋下逻辑,整体逻辑不复杂,主要分成以下几个步骤:
1、首先判断接收到的数据是否是ByteBuf类型,如果不是则直接交给下一个ChannelHandlerContext处理,如果是ByteBuf类型才尝试进行解析
2、由于网络传输可能存在粘包或拆包的情况,所以每次接收到一个ByteBuf并不一定就是一个完整的业务数据,比如客户端发送了一个字符串过来,但是字符串比较大被分成了两个ByteBuf传递过来,也有可能粘包将两个字符串合并成功一个ByteBuf传递过来
所以需要对ByteBuf进行解析,解析之后可能会剩余部分字节,或者一次性解析不成功,那么就需要暂时将解析不了的ByteBuf缓存起来。该类内部有一个属性ByteBuf类型的cumulation就是用来存储累积的缓存数据。
3、判断cumulation是否为空,如果为空表示没有累积数据,那么当前就是第一次接收数据,则直接将接收的数据存到cumulation中;如果已经存在累积数据,那么就需要对累积数据和接收的新数据进行合并处理
4、将接收到的数据或者是合并累积数据之后的数据调用callDecode方法进行解析,CodecOutputList是用来存储解析成功的对象列表,比如解析成功了两个字符串,那么就将这两个字符串存放到这个List中
5、最后调用fireChannelRead方法将解析到的业务数据列表传递给其他的ChannlHandler进行处理
整体流程捋清楚之后再逐步进行分析,
一、首先看下callDecode方法的实现,看看是如何进行业务数据的解析的。源码如下:

  1. protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
  2. try {
  3. /** 当ByteBuf可读 */
  4. while (in.isReadable()) {
  5. /** 如果outSize有值,表示至少有一个业务包被完全解码成功*/
  6. int outSize = out.size();
  7. if (outSize > 0) {
  8. /** 将业务包传递给下一个ChannelHandlerContext处理*/
  9. fireChannelRead(ctx, out, outSize);
  10. /** 清理缓存*/
  11. out.clear();
  12. if (ctx.isRemoved()) {
  13. break;
  14. }
  15. outSize = 0;
  16. }
  17. /** 获取ByteBuf可读字节数*/
  18. int oldInputLength = in.readableBytes();
  19. /** 具体解码处理 */
  20. decodeRemovalReentryProtection(ctx, in, out);
  21. if (ctx.isRemoved()) {
  22. break;
  23. }
  24. /** 如果解析的对象数量不变,也就是子类没有解析成功*/
  25. if (outSize == out.size()) {
  26. /*** 如果可读字节数没有变化,跳出循环*/
  27. if (oldInputLength == in.readableBytes()) {
  28. break;
  29. } else {
  30. /** 如果可读字节数发生变化,那么继续进行解析*/
  31. continue;
  32. }
  33. }
  34. /** 执行到此处说明out数量发生改变了,也就是成功解析到了对象
  35. * 此时如果可读字节没有变化,那么意思就是没有读任何字节但是解析到了一个对象,很显然是代码出问题了,抛异常
  36. * */
  37. if (oldInputLength == in.readableBytes()) {
  38. throw new DecoderException(
  39. StringUtil.simpleClassName(getClass()) +
  40. ".decode() did not read anything but decoded a message.");
  41. }
  42. if (isSingleDecode()) {
  43. break;
  44. }
  45. }
  46. } catch (DecoderException e) {
  47. throw e;
  48. } catch (Exception cause) {
  49. throw new DecoderException(cause);
  50. }
  51. }
  1. final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
  2. throws Exception {
  3. decodeState = STATE_CALLING_CHILD_DECODE;
  4. try {
  5. /** 调用子类进行具体的解码逻辑,解析对象成功之后会存到out中 */
  6. decode(ctx, in, out);
  7. } finally {
  8. boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
  9. decodeState = STATE_INIT;
  10. if (removePending) {
  11. /** 如果ContextHandlerContext被删除了,就将解析的数据传播出去*/
  12. fireChannelRead(ctx, out, out.size());
  13. out.clear();
  14. handlerRemoved(ctx);
  15. }
  16. }
  17. }

还是捋清楚步骤为主,首先要清楚该方法的作用是什么,callDecode方法的作用是将参数ByteBuf in中的数据进行读取并解析,解析到业务数据之后存放到List out中,并将业务数据传播给下一个ChannelHandlerContext ctx
主要步骤如下:
1、while循环中判断in是否可读,可读的情况下才尝试解析
2、判断当前的out是否为空,如果不为空则先将解析成功的业务数据调用fireChannelRead方法将业务数据传播出去,并调用out.clear()清空
3、获取in的可读字节数,并调用decodeRemovalReentryProtection(ctx, in ,out)方法进行解析,该方法的工作就是实际的解析过程,调用子类的decode方法进行解析。具体的解析规则需要具体的子类去实现
4、解析完成之后再判断一次out是否为空,如果out数量不变,就表示没有解析业务数据成功,此时再判断可读字节数有没有发生改变,如果没有发生变化说明当前的缓存in还不足以解析成一个业务数据,所以直接跳出循环,等着再接收一下数据来了再解析
5、如果可读字节数发生了变化,那么可能部分数据已经被解析成功了,所以执行continue继续进解析

二、分析完了解析的流程,再看下fireChannelRead传播的过程,源码如下:

  1. /**
  2. * @param ctx:下一个ChannelHandlerContext
  3. * @param msgs:解析成功的业务数据列表
  4. * @param numElements:业务数据个数
  5. * */
  6. static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
  7. for (int i = 0; i < numElements; i ++) {
  8. ctx.fireChannelRead(msgs.getUnsafe(i));
  9. }
  10. }

逻辑比较简单,就是遍历解析成功的业务数据列表,调用下一个ChannelHandlerContext的fireChannelRead方法进行传播
三、最后再分析下累积数据合并的逻辑
ByteBuf合并的任务由一个合并工具接口Cumlator,实现类源码如下:

  1. /**
  2. * 将累积未读的缓存数据cumulation和新接收到的字节数据in进行合并
  3. * */
  4. public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
  5. final ByteBuf buffer;
  6. /**
  7. * cumulation.writeIndex:累积缓存的写索引
  8. * cumulation.maxCapacity:累积缓存的最大容量
  9. * in.readableBytes:新接收缓存的可读字节数
  10. *
  11. * 如果累积缓存区容量不足 或者 累积缓冲区引用数大于1 或者 累积缓冲区是只读的
  12. * 则不可以直接将新接收的数据in写入到cumulation对象中,就需要进行重新分配缓冲区
  13. * */
  14. if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
  15. || cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
  16. /** 将累积缓冲区进行扩容,重新分配内存给buffer对象,并将累积缓存数据写入新buffer中 */
  17. buffer = expandCumulation(alloc, cumulation, in.readableBytes());
  18. } else {
  19. /** 将累积缓冲区赋值给buffer */
  20. buffer = cumulation;
  21. }
  22. /** 将新接收的数据写入到buffer中 */
  23. buffer.writeBytes(in);
  24. /** 释放in对象*/
  25. in.release();
  26. return buffer;
  27. }
  28. /**
  29. * @param alloc:内存分配器
  30. * @param cumulation:累积缓冲区
  31. * @param readable:可读容量
  32. * */
  33. static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
  34. ByteBuf oldCumulation = cumulation;
  35. /** 重新分配缓冲区,容量大小为缓冲区容量+新接收的可读容量 */
  36. cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
  37. /** 将累积缓冲写入到扩容之后的缓冲区对象中 */
  38. cumulation.writeBytes(oldCumulation);
  39. /** 释放旧的累积缓冲区对象 */
  40. oldCumulation.release();
  41. return cumulation;
  42. }

这里逻辑比较清晰,合并缓存实际就是将新接收的ByteBuf写入到累积的ByteBuf中,如果累积的ByteBuf容量不足,就先进行扩容操作,然后再将新接收的ByteBuf写入进行达到合并的效果。

二、解码器之LengthFieldBasedFrameDecoder

ByteToMessageDecoder的作用实际上是使用了模版方法设计模式,将具体的解码业务留给子类实现,而除了解码工作之外的工作都已经实现了。比如将不能解析的数据进行累积缓存、合并缓存、将业务数据继续传播等。
LengthFieldBasedFrameDecoder是ByteToMessageDecoder的子类,作用是将数据包分成包头和消息体,包头中包含了消息体的长度,先读取包头读取长度,然后再读取对应长度的消息。
通过上一节可知,ByteToMessageDecoder将具体的解码工作是执行decode方法,这个方法是个抽象方法,需要子类去实现,所以LengthFieldBasedFrameDecoder的核心也就是实现decode方法。
LengthFieldBasedFrameDecoder的构造函数源码如下:

  1. /** 构造函数
  2. * @param byteOrder:数据存储采用大端模式或小端模式
  3. * @param maxFrameLength:发生的数据帧最大长度
  4. * @param lengthFieldOffset:数据长度值位于字节数组的位置
  5. * @param lengthFieldLength:数据长度值占用字节数组的长度
  6. * @param initialBytesToStrip:跳过数据包前多少位数
  7. * @param failFast:值为true表示如果读取到长度值超过maxFrameLength直接报错
  8. *
  9. * 如lengthFieldOffset=0,lengthFieldLength=4,那么接收到ByteBuf之后,会从ByteBuf的数组第0位字节开始读取4个字节,得到数据包的长度
  10. * 然后再从数组第4位字节开始读取指定长度的数据进行解析
  11. * */
  12. public LengthFieldBasedFrameDecoder(
  13. ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
  14. int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
  15. /** 参数校验*/
  16. if (byteOrder == null) {
  17. throw new NullPointerException("byteOrder");
  18. }
  19. if (maxFrameLength <= 0) {
  20. throw new IllegalArgumentException(
  21. "maxFrameLength must be a positive integer: " +
  22. maxFrameLength);
  23. }
  24. if (lengthFieldOffset < 0) {
  25. throw new IllegalArgumentException(
  26. "lengthFieldOffset must be a non-negative integer: " +
  27. lengthFieldOffset);
  28. }
  29. if (initialBytesToStrip < 0) {
  30. throw new IllegalArgumentException(
  31. "initialBytesToStrip must be a non-negative integer: " +
  32. initialBytesToStrip);
  33. }
  34. /** 长度值位数不可超过最大长度 */
  35. if (lengthFieldOffset > maxFrameLength - lengthFieldLength) {
  36. throw new IllegalArgumentException(
  37. "maxFrameLength (" + maxFrameLength + ") " +
  38. "must be equal to or greater than " +
  39. "lengthFieldOffset (" + lengthFieldOffset + ") + " +
  40. "lengthFieldLength (" + lengthFieldLength + ").");
  41. }
  42. /** 属性赋值 */
  43. this.byteOrder = byteOrder;
  44. this.maxFrameLength = maxFrameLength;
  45. this.lengthFieldOffset = lengthFieldOffset;
  46. this.lengthFieldLength = lengthFieldLength;
  47. this.lengthAdjustment = lengthAdjustment;
  48. lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength;//表示长度值的结束下标值
  49. this.initialBytesToStrip = initialBytesToStrip;
  50. this.failFast = failFast;
  51. }

LengthFieldBasedFrameDecoder核心属性lengthFieldOffset表示数据包长度的开始位,lengthFiledEndOffset表示数据包长度的结束位,也就表示数组[lengthFieldOffset] ~ 数组[lengthFieldEndOffset] 的值表示数据包长度
再看下LengthFieldBasedFrameDecoder的具体实现,源码如下:

  1. protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
  2. if (discardingTooLongFrame) {
  3. discardingTooLongFrame(in);
  4. }
  5. /** 如果ByteBuf的长度小于数据包长度的偏移量,那么表示数据包长度都无法解析,直接返回null*/
  6. if (in.readableBytes() < lengthFieldEndOffset) {
  7. return null;
  8. }
  9. /** 长度偏移量 + ByteBuf的读索引,表示需要读取的数据包长度区域偏移量*/
  10. int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
  11. /** 获取包长度*/
  12. long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
  13. /** 当frameLength为负数,直接抛异常*/
  14. if (frameLength < 0) {
  15. failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);
  16. }
  17. frameLength += lengthAdjustment + lengthFieldEndOffset;
  18. /** 当包长度小于长度区位直接抛异常*/
  19. if (frameLength < lengthFieldEndOffset) {
  20. failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
  21. }
  22. /** 超过最大长度表示出现大包情况,则直接丢弃数据*/
  23. if (frameLength > maxFrameLength) {
  24. exceededFrameLength(in, frameLength);
  25. return null;
  26. }
  27. int frameLengthInt = (int) frameLength;
  28. /** 如果ByteBuf可读字节数小于数据包长度,那么表示无法解析成一个完成的数据包,直接返回null*/
  29. if (in.readableBytes() < frameLengthInt) {
  30. return null;
  31. }
  32. /** 如果需要跳过的字节数大于数据包长度,那么直接抛异常*/
  33. if (initialBytesToStrip > frameLengthInt) {
  34. failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip);
  35. }
  36. /** 跳过指定长度的字节 */
  37. in.skipBytes(initialBytesToStrip);
  38. /** 进入到这里说明参数校验都已经通过,并且可以解析成一个完整的业务数据包了 */
  39. //获取读索引
  40. int readerIndex = in.readerIndex();
  41. //获取数据包总长度-跳过的字节数 = 实际需要读的字节数
  42. int actualFrameLength = frameLengthInt - initialBytesToStrip;
  43. /** 从ByteBuf中抽取完整的业务数据包, 从readerIndex位置开始读到actualFrameLength位置 */
  44. ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
  45. /** 修改ByteBuf的读索引值 */
  46. in.readerIndex(readerIndex + actualFrameLength);
  47. /** 返回读取到的完整数据包 */
  48. return frame;
  49. }

这里代码比较多,但是总体的逻辑并不复杂,主要是对于数据长度的校验工作,核心步骤如下:
1、调用getUnadjustedFrameLength方法读取数据包的长度
2、判断数据包长度和缓冲区的可读数据长度,如果数据包长度大于缓冲区可读长度表示发生拆包现象,直接返回null;如果数据包长度小于等于缓冲区可读长度,那么表示可以读取到一个完整的数据包
3、根据设置的跳过字节长度,跳过指定位数的数据
4、调用extractFrame方法从缓冲区读取一个完整的数据包
5、更新缓冲区的读索引值,并返回完整的数据包
接下里对核心方法进行分析
1、getUnadjustedFrameLength方法源码如下:

  1. /**
  2. * 获取包的长度
  3. * 从buf的字节数组中offset位置开始读取,读取位数为length
  4. * */
  5. protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {
  6. buf = buf.order(order);
  7. long frameLength;
  8. switch (length) {
  9. case 1://读取1个字节,表示长度为byte类型
  10. frameLength = buf.getUnsignedByte(offset);
  11. break;
  12. case 2://读取2个字节,表示长度为short类型
  13. frameLength = buf.getUnsignedShort(offset);
  14. break;
  15. case 3://读取3个字节,表示长度为Medium类型
  16. frameLength = buf.getUnsignedMedium(offset);
  17. break;
  18. case 4://读取4个字节,表示长度为int类型
  19. frameLength = buf.getUnsignedInt(offset);
  20. break;
  21. case 8://读取8个字节,表示长度为long类型
  22. frameLength = buf.getLong(offset);
  23. break;
  24. default:
  25. throw new DecoderException(
  26. "unsupported lengthFieldLength: " + lengthFieldLength + " (expected: 1, 2, 3, 4, or 8)");
  27. }
  28. return frameLength;
  29. }

主要是判断数据包长度的占字节数,根据字节长度从缓冲区ByteBuf中读取指定的值
2、extractFrame方法源码如下:

  1. 1 protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) {
  2. 2 return buffer.retainedSlice(index, length);
  3. 3 }

代码比较简洁实际就是调用ByteBuuuf的retainedSlice方法,该方法的作用是从ByteBuf从截取一部分数据,从index位置开始截取长度为length的数据。而retain的作用是不修改原缓冲区的读写索引,相当于复制一部分数据生成新的ByteBuf对象。
总结LengthFieldBasedFrameDecoder的实现原理:
1、首先从ByteBuf的指定位置读取指定长度的字节数据,得到数据包的长度,并进行长度校验,如果长度校验不通过则直接返回null,表示解析业务数据包失败。
2、从ByteBuf中指定位置开始复制指定长度的数据,得到一个完整的数据包
3、修改ByteBuf的读索引,并返回完整的数据包