说明

自定义适配器可从两个方面入手。

  • 第一,则是直接从DataHandlingAdapter继承,此时可以接触到最原始的TCP数据,可以自定实现数据的继续投递方式。但一般实现算法比较困难,因为所考虑的情况比较多。
  • 第二,则是从CustomDataHandlingAdapter(用户快捷自定义适配器)继承,此时数据的投递必须通过IRequestInfo,ByteBlock将为null。所需考虑的情况比较单一,对于数据的处理也比较简单。

    原始DataHandlingAdapter

    自己实现适配器,然后使其工作。例如:假设如下数据格式,第一个字节表示整个数据长度(包括数据类型和指令类型),第二字节表示数据类型,第三字节表示指令类型,后续字节表示其他数据。

    其次,希望在发送时,只传入数据类型,指令类型和其他数据,而数据长度则由适配器自行封装。最后,希望在接收端每次能接收到一个完整的数据。

2、原始自定义适配器 - 图1

实现

首先,创建类,继承自DataHandlingAdapter,然后实现对应属性,及方法。

CanSplicingSend 属性和实现:能有效减少由封装数据而带来的复制操作,下面会讲解。

  1. class MyDataHandleAdapter : DataHandlingAdapter
  2. {
  3. /// <summary>
  4. /// 是否支持拼接发送,为false的话可以不实现<see cref="PreviewSend(IList{TransferByte}, bool)"/>
  5. /// </summary>
  6. public override bool CanSplicingSend => false;
  7. protected override void PreviewReceived(ByteBlock byteBlock)
  8. {
  9. }
  10. protected override void PreviewSend(byte[] buffer, int offset, int length, bool isAsync)
  11. {
  12. }
  13. protected override void PreviewSend(IList<TransferByte> transferBytes, bool isAsync)
  14. {
  15. //暂时不实现。
  16. }
  17. }

【封装发送数据长度】
封装发送数据时,比较简单,示例如下:

  1. protected override void PreviewSend(byte[] buffer, int offset, int length, bool isAsync)
  2. {
  3. int dataLen = length - offset;//先获取需要发送的实际数据长度
  4. if (dataLen > byte.MaxValue)//超长判断
  5. {
  6. throw new RRQMOverlengthException("发送数据太长。");
  7. }
  8. ByteBlock byteBlock = BytePool.GetByteBlock(64 * 1024);//从内存池申请内存块,因为此处数据绝不超过255,所以避免内存池碎片化,每次申请64K
  9. //ByteBlock byteBlock = BytePool.GetByteBlock(dataLen+1);//实际写法。
  10. try
  11. {
  12. byteBlock.Write((byte)dataLen);//先写长度
  13. byteBlock.Write(buffer, offset, length);//再写数据
  14. if (isAsync)//判断异步
  15. {
  16. byte[] data = byteBlock.ToArray();//使用异步时不能将byteBlock.Buffer进行发送,应当ToArray成新的Byte[]。
  17. this.GoSend(data, 0, data.Length, isAsync);//调用GoSend,实际发送
  18. }
  19. else
  20. {
  21. this.GoSend(byteBlock.Buffer, 0, byteBlock.Len, isAsync);
  22. }
  23. }
  24. finally
  25. {
  26. byteBlock.Dispose();//释放内存块
  27. }
  28. }

【 解封接收数据】
从原生适配器解封数据,需要考虑的情况比较多。在本示例中,需要考虑以下情况:

  1. 一次刚好接收一个数据。
  2. 一次刚好接收了多个数据。
  3. 一次接收了多个数据,但最后一个数据不完整。
  4. 一次未接收完一个数据。

综上,情况比较复杂,所以就必须自己做接收数据的缓存容器。

  1. /// <summary>
  2. /// 临时包,此包仅当前实例储存
  3. /// </summary>
  4. private ByteBlock tempByteBlock;
  5. /// <summary>
  6. /// 包剩余长度
  7. /// </summary>
  8. private byte surPlusLength;
  9. protected override void PreviewReceived(ByteBlock byteBlock)
  10. {
  11. byte[] buffer = byteBlock.Buffer;
  12. int r = byteBlock.Len;
  13. if (this.tempByteBlock == null)//如果没有临时包,则直接分包。
  14. {
  15. SplitPackage(buffer, 0, r);
  16. }
  17. else
  18. {
  19. if (surPlusLength == r)//接收长度正好等于剩余长度,组合完数据以后直接处理数据。
  20. {
  21. this.tempByteBlock.Write(buffer, 0, surPlusLength);
  22. PreviewHandle(this.tempByteBlock);
  23. this.tempByteBlock = null;
  24. surPlusLength = 0;
  25. }
  26. else if (surPlusLength < r)//接收长度大于剩余长度,先组合包,然后处理包,然后将剩下的分包。
  27. {
  28. this.tempByteBlock.Write(buffer, 0, surPlusLength);
  29. PreviewHandle(this.tempByteBlock);
  30. this.tempByteBlock = null;
  31. SplitPackage(buffer, surPlusLength, r);
  32. }
  33. else//接收长度小于剩余长度,无法处理包,所以必须先组合包,然后等下次接收。
  34. {
  35. this.tempByteBlock.Write(buffer, 0, r);
  36. surPlusLength -= (byte)r;
  37. }
  38. }
  39. }
  40. /// <summary>
  41. /// 分解包
  42. /// </summary>
  43. /// <param name="dataBuffer"></param>
  44. /// <param name="index"></param>
  45. /// <param name="r"></param>
  46. private void SplitPackage(byte[] dataBuffer, int index, int r)
  47. {
  48. while (index < r)
  49. {
  50. byte length = dataBuffer[index];
  51. int recedSurPlusLength = r - index - 1;
  52. if (recedSurPlusLength >= length)
  53. {
  54. ByteBlock byteBlock = BytePool.GetByteBlock(length);
  55. byteBlock.Write(dataBuffer, index + 1, length);
  56. PreviewHandle(byteBlock);
  57. surPlusLength = 0;
  58. }
  59. else//半包
  60. {
  61. this.tempByteBlock = BytePool.GetByteBlock(length);
  62. surPlusLength = (byte)(length - recedSurPlusLength);
  63. this.tempByteBlock.Write(dataBuffer, index + 1, recedSurPlusLength);
  64. }
  65. index += length + 1;
  66. }
  67. }
  68. /// <summary>
  69. /// 处理数据
  70. /// </summary>
  71. /// <param name="byteBlock"></param>
  72. private void PreviewHandle(ByteBlock byteBlock)
  73. {
  74. try
  75. {
  76. this.GoReceived(byteBlock, null);
  77. }
  78. finally
  79. {
  80. byteBlock.Dispose();//在框架里面将内存块释放
  81. }
  82. }

使用自定义适配器

自定义适配器的使用和预设的适配器一样。不过在该案例中,发送数据时,应当传入三个有效值,分别为数据类型指令类型其他数据

对自定义适配器进行单元测试

适配器写完以后,需要经过缜密测试,方才能使用。在RRQM中内置了DataAdapterTester进行模拟、接收的适配器测试类,该测试类能模拟粘包、分包等情况。如果通过,则基本说明能适应99%的情况。具体使用如下:

包长度的工作机制相当于发送固定长度的数据,例如发送方多次发送数据{1,2,3,4,5}。如果设置包长度为10,则在接收时会收到{1,2,3,4,5,1,2,3,4,5},如果包长度设置为7,则会收到{1,2,3,4,5,1,2}{3,4,5,1,2,3,4},以此类推。所以在测试时应当多次设置该值,且最好不要整除发送的数据长度,以模拟更恶劣的环境

【适配器测试】
适配器在发送时会自动封装数据长度,所以不用写。

  1. [Theory]
  2. [InlineData(10000, 3)]
  3. [InlineData(10000, 5)]
  4. [InlineData(10000, 200)]
  5. [InlineData(10000, 500)]
  6. [InlineData(10000, 1000)]
  7. public void MyCustomDataHandlingAdapterShouldBeOk(int inputCount, int bufferLength)
  8. {
  9. DataAdapterTester tester = DataAdapterTester.CreateTester(new MyDataHandlingAdapter(), bufferLength);//用BufferLength模拟粘包,分包
  10. ByteBlock block = new ByteBlock();
  11. block.Write((byte)1);//写入数据类型
  12. block.Write((byte)1);//写入数据指令
  13. byte[] buffer = new byte[100];
  14. new Random().NextBytes(buffer);
  15. block.Write(buffer);//写入数据
  16. byte[] data = block.ToArray();
  17. //输出测试时间,用于衡量适配性能
  18. output.WriteLine(tester.Run(data, inputCount, inputCount, 1000 * 2).ToString());
  19. }

封装函数及分片发送意义

在上述案例中,发送数据时应当传入数据类型指令类型其他数据三个有效值,而在RRQM中,发送函数仅有Send(和重载),这无疑需要我们自己封装其他方法
假设以下情况需要实现:

数据类型有两种,分别为Up(1),Down(0)。指令类型有两种,分别为Go(1)、Hold(0)。数据类型和指令类型可以任意组合,且均可携带其他数据。

面对上述情况,我们可以封装以下函数使用:

  1. public class MySocketClient : SimpleSocketClient
  2. {
  3. public void Up_Go_Send(byte[] data)
  4. {
  5. ByteBlock byteBlock = BytePool.GetByteBlock(this.BufferLength);//内存池实现,可以直接new byte[].
  6. byteBlock.Write((byte)1);
  7. byteBlock.Write((byte)1);
  8. byteBlock.Write(data);
  9. try
  10. {
  11. this.Send(byteBlock);
  12. }
  13. finally
  14. {
  15. byteBlock.Dispose();
  16. }
  17. }
  18. public void Down_Go_Send(byte[] data)
  19. {
  20. ByteBlock byteBlock = BytePool.GetByteBlock(this.BufferLength);//内存池实现,可以直接new byte[].
  21. byteBlock.Write((byte)0);
  22. byteBlock.Write((byte)1);
  23. byteBlock.Write(data);
  24. try
  25. {
  26. this.Send(byteBlock);
  27. }
  28. finally
  29. {
  30. byteBlock.Dispose();
  31. }
  32. }
  33. public void Up_Hold_Send(byte[] data)
  34. {
  35. ByteBlock byteBlock = BytePool.GetByteBlock(this.BufferLength);//内存池实现,可以直接new byte[].
  36. byteBlock.Write((byte)1);
  37. byteBlock.Write((byte)0);
  38. byteBlock.Write(data);
  39. try
  40. {
  41. this.Send(byteBlock);
  42. }
  43. finally
  44. {
  45. byteBlock.Dispose();
  46. }
  47. }
  48. public void Down_Hold_Send(byte[] data)
  49. {
  50. ByteBlock byteBlock = BytePool.GetByteBlock(this.BufferLength);//内存池实现,可以直接new byte[].
  51. byteBlock.Write((byte)0);
  52. byteBlock.Write((byte)0);
  53. byteBlock.Write(data);
  54. try
  55. {
  56. this.Send(byteBlock);
  57. }
  58. finally
  59. {
  60. byteBlock.Dispose();
  61. }
  62. }
  63. }

为什么要分片发送??

在示例代码中不难看出,封装的函数将发送数据进行了Write操作(相当于Copy),这无疑是消耗性能的。只是在该案例中,复制的数据最大为255,感觉优化效果甚微,倘若我们需要发送的数据是1Mb,那就相当于在封装数据时,因为前两个字节的存在而复制1Mb的数据(冤死了),然后在适配器中还需要因为数据包头,再复制一次。

优化。。。
所以我们在封装时,可以使用分片发送,但是同时也需要适配器支持。不然内部会出错。

  1. protected override void PreviewSend(IList<TransferByte> transferBytes, bool isAsync)
  2. {
  3. int dataLen = 0;
  4. foreach (var item in transferBytes)
  5. {
  6. dataLen += item.Length;
  7. }
  8. if (dataLen > byte.MaxValue)//超长判断
  9. {
  10. throw new RRQMOverlengthException("发送数据太长。");
  11. }
  12. ByteBlock byteBlock = BytePool.GetByteBlock(64 * 1024);//从内存池申请内存块,因为此处数据绝不超过255,所以避免内存池碎片化,每次申请64K
  13. //ByteBlock byteBlock = BytePool.GetByteBlock(dataLen+1);//实际写法。
  14. try
  15. {
  16. byteBlock.Write((byte)dataLen);//先写长度
  17. foreach (var item in transferBytes)
  18. {
  19. byteBlock.Write(item.Buffer, item.Offset, item.Length);//依次写入
  20. }
  21. if (isAsync)//判断异步
  22. {
  23. byte[] data = byteBlock.ToArray();//使用异步时不能将byteBlock.Buffer进行发送,应当ToArray成新的Byte[]。
  24. this.GoSend(data, 0, data.Length, isAsync);//调用GoSend,实际发送
  25. }
  26. else
  27. {
  28. this.GoSend(byteBlock.Buffer, 0, byteBlock.Len, isAsync);
  29. }
  30. }
  31. finally
  32. {
  33. byteBlock.Dispose();
  34. }
  35. }

重新封装函数。。。

  1. public void Up_Go_SplicingSend(byte[] data)
  2. {
  3. List<TransferByte> transferBytes = new List<TransferByte>();
  4. transferBytes.Add(new TransferByte(new byte[] { 1}));
  5. transferBytes.Add(new TransferByte(new byte[] { 1}));
  6. transferBytes.Add(new TransferByte(data));
  7. this.Send(transferBytes);
  8. }
  9. public void Down_Go_SplicingSend(byte[] data)
  10. {
  11. List<TransferByte> transferBytes = new List<TransferByte>();
  12. transferBytes.Add(new TransferByte(new byte[] { 0 }));
  13. transferBytes.Add(new TransferByte(new byte[] { 1 }));
  14. transferBytes.Add(new TransferByte(data));
  15. this.Send(transferBytes);
  16. }
  17. public void Up_Hold_SplicingSend(byte[] data)
  18. {
  19. List<TransferByte> transferBytes = new List<TransferByte>();
  20. transferBytes.Add(new TransferByte(new byte[] { 1 }));
  21. transferBytes.Add(new TransferByte(new byte[] { 0 }));
  22. transferBytes.Add(new TransferByte(data));
  23. this.Send(transferBytes);
  24. }
  25. public void Down_Hold_SplicingSend(byte[] data)
  26. {
  27. List<TransferByte> transferBytes = new List<TransferByte>();
  28. transferBytes.Add(new TransferByte(new byte[] { 0 }));
  29. transferBytes.Add(new TransferByte(new byte[] { 0 }));
  30. transferBytes.Add(new TransferByte(data));
  31. this.Send(transferBytes);
  32. }