说明
自定义适配器可从两个方面入手。
- 第一,则是直接从DataHandlingAdapter继承,此时可以接触到最原始的TCP数据,可以自定实现数据的继续投递方式。但一般实现算法比较困难,因为所考虑的情况比较多。
- 第二,则是从CustomDataHandlingAdapter(用户快捷自定义适配器)继承,此时数据的投递必须通过IRequestInfo,ByteBlock将为null。所需考虑的情况比较单一,对于数据的处理也比较简单。
原始DataHandlingAdapter
自己实现适配器,然后使其工作。例如:假设如下数据格式,第一个字节表示整个数据长度(包括数据类型和指令类型),第二字节表示数据类型,第三字节表示指令类型,后续字节表示其他数据。其次,希望在发送时,只传入数据类型,指令类型和其他数据,而数据长度则由适配器自行封装。最后,希望在接收端每次能接收到一个完整的数据。
实现
首先,创建类,继承自DataHandlingAdapter
,然后实现对应属性,及方法。
CanSplicingSend 属性和实现:能有效减少由封装数据而带来的复制操作,下面会讲解。
class MyDataHandleAdapter : DataHandlingAdapter
{
/// <summary>
/// 是否支持拼接发送,为false的话可以不实现<see cref="PreviewSend(IList{TransferByte}, bool)"/>
/// </summary>
public override bool CanSplicingSend => false;
protected override void PreviewReceived(ByteBlock byteBlock)
{
}
protected override void PreviewSend(byte[] buffer, int offset, int length, bool isAsync)
{
}
protected override void PreviewSend(IList<TransferByte> transferBytes, bool isAsync)
{
//暂时不实现。
}
}
【封装发送数据长度】
封装发送数据时,比较简单,示例如下:
protected override void PreviewSend(byte[] buffer, int offset, int length, bool isAsync)
{
int dataLen = length - offset;//先获取需要发送的实际数据长度
if (dataLen > byte.MaxValue)//超长判断
{
throw new RRQMOverlengthException("发送数据太长。");
}
ByteBlock byteBlock = BytePool.GetByteBlock(64 * 1024);//从内存池申请内存块,因为此处数据绝不超过255,所以避免内存池碎片化,每次申请64K
//ByteBlock byteBlock = BytePool.GetByteBlock(dataLen+1);//实际写法。
try
{
byteBlock.Write((byte)dataLen);//先写长度
byteBlock.Write(buffer, offset, length);//再写数据
if (isAsync)//判断异步
{
byte[] data = byteBlock.ToArray();//使用异步时不能将byteBlock.Buffer进行发送,应当ToArray成新的Byte[]。
this.GoSend(data, 0, data.Length, isAsync);//调用GoSend,实际发送
}
else
{
this.GoSend(byteBlock.Buffer, 0, byteBlock.Len, isAsync);
}
}
finally
{
byteBlock.Dispose();//释放内存块
}
}
【 解封接收数据】
从原生适配器解封数据,需要考虑的情况比较多。在本示例中,需要考虑以下情况:
- 一次刚好接收一个数据。
- 一次刚好接收了多个数据。
- 一次接收了多个数据,但最后一个数据不完整。
- 一次未接收完一个数据。
综上,情况比较复杂,所以就必须自己做接收数据的缓存容器。
/// <summary>
/// 临时包,此包仅当前实例储存
/// </summary>
private ByteBlock tempByteBlock;
/// <summary>
/// 包剩余长度
/// </summary>
private byte surPlusLength;
protected override void PreviewReceived(ByteBlock byteBlock)
{
byte[] buffer = byteBlock.Buffer;
int r = byteBlock.Len;
if (this.tempByteBlock == null)//如果没有临时包,则直接分包。
{
SplitPackage(buffer, 0, r);
}
else
{
if (surPlusLength == r)//接收长度正好等于剩余长度,组合完数据以后直接处理数据。
{
this.tempByteBlock.Write(buffer, 0, surPlusLength);
PreviewHandle(this.tempByteBlock);
this.tempByteBlock = null;
surPlusLength = 0;
}
else if (surPlusLength < r)//接收长度大于剩余长度,先组合包,然后处理包,然后将剩下的分包。
{
this.tempByteBlock.Write(buffer, 0, surPlusLength);
PreviewHandle(this.tempByteBlock);
this.tempByteBlock = null;
SplitPackage(buffer, surPlusLength, r);
}
else//接收长度小于剩余长度,无法处理包,所以必须先组合包,然后等下次接收。
{
this.tempByteBlock.Write(buffer, 0, r);
surPlusLength -= (byte)r;
}
}
}
/// <summary>
/// 分解包
/// </summary>
/// <param name="dataBuffer"></param>
/// <param name="index"></param>
/// <param name="r"></param>
private void SplitPackage(byte[] dataBuffer, int index, int r)
{
while (index < r)
{
byte length = dataBuffer[index];
int recedSurPlusLength = r - index - 1;
if (recedSurPlusLength >= length)
{
ByteBlock byteBlock = BytePool.GetByteBlock(length);
byteBlock.Write(dataBuffer, index + 1, length);
PreviewHandle(byteBlock);
surPlusLength = 0;
}
else//半包
{
this.tempByteBlock = BytePool.GetByteBlock(length);
surPlusLength = (byte)(length - recedSurPlusLength);
this.tempByteBlock.Write(dataBuffer, index + 1, recedSurPlusLength);
}
index += length + 1;
}
}
/// <summary>
/// 处理数据
/// </summary>
/// <param name="byteBlock"></param>
private void PreviewHandle(ByteBlock byteBlock)
{
try
{
this.GoReceived(byteBlock, null);
}
finally
{
byteBlock.Dispose();//在框架里面将内存块释放
}
}
使用自定义适配器
自定义适配器的使用和预设的适配器一样。不过在该案例中,发送数据时,应当传入三个有效值,分别为数据类型,指令类型,其他数据。
对自定义适配器进行单元测试
适配器写完以后,需要经过缜密测试,方才能使用。在RRQM中内置了DataAdapterTester
进行模拟、接收的适配器测试类,该测试类能模拟粘包、分包等情况。如果通过,则基本说明能适应99%的情况。具体使用如下:
包长度的工作机制相当于发送固定长度的数据,例如发送方多次发送数据{1,2,3,4,5}。如果设置包长度为10,则在接收时会收到{1,2,3,4,5,1,2,3,4,5},如果包长度设置为7,则会收到{1,2,3,4,5,1,2}{3,4,5,1,2,3,4},以此类推。所以在测试时应当多次设置该值,且最好不要整除发送的数据长度,以模拟更恶劣的环境
【适配器测试】
适配器在发送时会自动封装数据长度,所以不用写。
[Theory]
[InlineData(10000, 3)]
[InlineData(10000, 5)]
[InlineData(10000, 200)]
[InlineData(10000, 500)]
[InlineData(10000, 1000)]
public void MyCustomDataHandlingAdapterShouldBeOk(int inputCount, int bufferLength)
{
DataAdapterTester tester = DataAdapterTester.CreateTester(new MyDataHandlingAdapter(), bufferLength);//用BufferLength模拟粘包,分包
ByteBlock block = new ByteBlock();
block.Write((byte)1);//写入数据类型
block.Write((byte)1);//写入数据指令
byte[] buffer = new byte[100];
new Random().NextBytes(buffer);
block.Write(buffer);//写入数据
byte[] data = block.ToArray();
//输出测试时间,用于衡量适配性能
output.WriteLine(tester.Run(data, inputCount, inputCount, 1000 * 2).ToString());
}
封装函数及分片发送意义
在上述案例中,发送数据时应当传入数据类型,指令类型,其他数据三个有效值,而在RRQM中,发送函数仅有Send(和重载),这无疑需要我们自己封装其他方法
。
假设以下情况需要实现:
数据类型有两种,分别为Up(1),Down(0)。指令类型有两种,分别为Go(1)、Hold(0)。数据类型和指令类型可以任意组合,且均可携带其他数据。
面对上述情况,我们可以封装以下函数使用:
public class MySocketClient : SimpleSocketClient
{
public void Up_Go_Send(byte[] data)
{
ByteBlock byteBlock = BytePool.GetByteBlock(this.BufferLength);//内存池实现,可以直接new byte[].
byteBlock.Write((byte)1);
byteBlock.Write((byte)1);
byteBlock.Write(data);
try
{
this.Send(byteBlock);
}
finally
{
byteBlock.Dispose();
}
}
public void Down_Go_Send(byte[] data)
{
ByteBlock byteBlock = BytePool.GetByteBlock(this.BufferLength);//内存池实现,可以直接new byte[].
byteBlock.Write((byte)0);
byteBlock.Write((byte)1);
byteBlock.Write(data);
try
{
this.Send(byteBlock);
}
finally
{
byteBlock.Dispose();
}
}
public void Up_Hold_Send(byte[] data)
{
ByteBlock byteBlock = BytePool.GetByteBlock(this.BufferLength);//内存池实现,可以直接new byte[].
byteBlock.Write((byte)1);
byteBlock.Write((byte)0);
byteBlock.Write(data);
try
{
this.Send(byteBlock);
}
finally
{
byteBlock.Dispose();
}
}
public void Down_Hold_Send(byte[] data)
{
ByteBlock byteBlock = BytePool.GetByteBlock(this.BufferLength);//内存池实现,可以直接new byte[].
byteBlock.Write((byte)0);
byteBlock.Write((byte)0);
byteBlock.Write(data);
try
{
this.Send(byteBlock);
}
finally
{
byteBlock.Dispose();
}
}
}
为什么要分片发送??
在示例代码中不难看出,封装的函数将发送数据进行了Write操作(相当于Copy),这无疑是消耗性能的。只是在该案例中,复制的数据最大为255,感觉优化效果甚微,倘若我们需要发送的数据是1Mb,那就相当于在封装数据时,因为前两个字节的存在而复制1Mb的数据(冤死了),然后在适配器中还需要因为数据包头,再复制一次。
优化。。。
所以我们在封装时,可以使用分片发送,但是同时也需要适配器支持。不然内部会出错。
protected override void PreviewSend(IList<TransferByte> transferBytes, bool isAsync)
{
int dataLen = 0;
foreach (var item in transferBytes)
{
dataLen += item.Length;
}
if (dataLen > byte.MaxValue)//超长判断
{
throw new RRQMOverlengthException("发送数据太长。");
}
ByteBlock byteBlock = BytePool.GetByteBlock(64 * 1024);//从内存池申请内存块,因为此处数据绝不超过255,所以避免内存池碎片化,每次申请64K
//ByteBlock byteBlock = BytePool.GetByteBlock(dataLen+1);//实际写法。
try
{
byteBlock.Write((byte)dataLen);//先写长度
foreach (var item in transferBytes)
{
byteBlock.Write(item.Buffer, item.Offset, item.Length);//依次写入
}
if (isAsync)//判断异步
{
byte[] data = byteBlock.ToArray();//使用异步时不能将byteBlock.Buffer进行发送,应当ToArray成新的Byte[]。
this.GoSend(data, 0, data.Length, isAsync);//调用GoSend,实际发送
}
else
{
this.GoSend(byteBlock.Buffer, 0, byteBlock.Len, isAsync);
}
}
finally
{
byteBlock.Dispose();
}
}
重新封装函数。。。
public void Up_Go_SplicingSend(byte[] data)
{
List<TransferByte> transferBytes = new List<TransferByte>();
transferBytes.Add(new TransferByte(new byte[] { 1}));
transferBytes.Add(new TransferByte(new byte[] { 1}));
transferBytes.Add(new TransferByte(data));
this.Send(transferBytes);
}
public void Down_Go_SplicingSend(byte[] data)
{
List<TransferByte> transferBytes = new List<TransferByte>();
transferBytes.Add(new TransferByte(new byte[] { 0 }));
transferBytes.Add(new TransferByte(new byte[] { 1 }));
transferBytes.Add(new TransferByte(data));
this.Send(transferBytes);
}
public void Up_Hold_SplicingSend(byte[] data)
{
List<TransferByte> transferBytes = new List<TransferByte>();
transferBytes.Add(new TransferByte(new byte[] { 1 }));
transferBytes.Add(new TransferByte(new byte[] { 0 }));
transferBytes.Add(new TransferByte(data));
this.Send(transferBytes);
}
public void Down_Hold_SplicingSend(byte[] data)
{
List<TransferByte> transferBytes = new List<TransferByte>();
transferBytes.Add(new TransferByte(new byte[] { 0 }));
transferBytes.Add(new TransferByte(new byte[] { 0 }));
transferBytes.Add(new TransferByte(data));
this.Send(transferBytes);
}