一、序列化选择
从下图(图片来源网络)可以看出,序列化是RPC中至关重要的一个环节,可以说,序列化的优劣,会很大程度的影响RPC调用性能。
1.1 RRQM支持的序列化
在RRQMRPC中,内置了四种序列化方式,分别为RRQMBinary
、Json
、Xml
。这四种方式的特点,就是其序列化的特点。
RRQMBinary | Json | Xml | |
---|---|---|---|
特点 | 序列化方式速度快,数据量小,但是兼容的数据格式也比较有限。仅支持基础类型、自定义实体类、数组、List、字典 | 兼容性好,可读性强,但是受字符串影响,性能不出众,且数据量受限制 | 兼容性一般,可读性强,同样受字符串影响,性能不出众,且数据量受限制 |
1.2 使用预设序列化
在RRQMRPC中,选择序列化是非常简单的,且序列化方式完全由调用端
决定。
在实际的调用中,通过InvokeOption
的参数指定。
实际上,只需要传入相关参数即可。
InvokeOption invokeOption = new InvokeOption();
invokeOption.SerializationType = RRQMCore.Serialization.SerializationType.RRQMBinary;
//invokeOption.SerializationType = RRQMCore.Serialization.SerializationType.Json;
//invokeOption.SerializationType = RRQMCore.Serialization.SerializationType.Xml;
string returnString = client.Invoke<string>("TestOne", invokeOption, "10");
1.3 自定义序列化
a).定义
想要实现自定义序列化,必须通过重写序列化选择器,实现SerializeParameter
和DeserializeParameter
函数。如果还想留用预设序列化,请按下代码示例即可。
public class MySerializationSelector: SerializationSelector
{
/// <summary>
/// 反序列化
/// </summary>
/// <param name="serializationType"></param>
/// <param name="parameterBytes"></param>
/// <param name="parameterType"></param>
/// <returns></returns>
public override object DeserializeParameter(SerializationType serializationType, byte[] parameterBytes, Type parameterType)
{
if (parameterBytes == null)
{
return parameterType.GetDefault();
}
switch (serializationType)
{
case SerializationType.RRQMBinary:
{
return SerializeConvert.RRQMBinaryDeserialize(parameterBytes, 0, parameterType);
}
case SerializationType.Json:
{
return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(parameterBytes), parameterType);
}
case SerializationType.Xml:
{
return SerializeConvert.XmlDeserializeFromBytes(parameterBytes, parameterType);
}
case (SerializationType)4:
{
//此处可自行实现
return default;
}
default:
throw new RRQMRPCException("未指定的反序列化方式");
}
}
/// <summary>
/// 序列化参数
/// </summary>
/// <param name="serializationType"></param>
/// <param name="parameter"></param>
/// <returns></returns>
public override byte[] SerializeParameter(SerializationType serializationType, object parameter)
{
if (parameter == null)
{
return null;
}
switch (serializationType)
{
case SerializationType.RRQMBinary:
{
return SerializeConvert.RRQMBinarySerialize(parameter, true);
}
case SerializationType.Json:
{
return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(parameter));
}
case SerializationType.Xml:
{
return SerializeConvert.XmlSerializeToBytes(parameter);
}
case (SerializationType)4:
{
//此处可自行实现
return default;
}
default:
throw new RRQMRPCException("未指定的序列化方式");
}
}
}
b).使用
首先在解析器
和客户端
配置中赋值解析器。
然后,因为赋值时是SerializationType
的枚举类型,所以执行强制类型转换即可。
InvokeOption invokeOption = new InvokeOption();
invokeOption.SerializationType = (RRQMCore.Serialization.SerializationType)4;
二、复用ID(重置ID)
在TouchRpc中,存在于服务器的辅助客户端(SocketClient),与远程客户端(Client)是一一对应关系,其ID也完全一致。
所以在任意一方修改ID(调用ResetID),都会同时修改远程ID。
所以合理使用该操作,可以完成复用ID(重置ID)的需求。
注意:此操作仅在SocketClient或Client生效。如果在Service直接调用,则只会修改服务器端ID。
四、调用超时设置
调用RPC,不能无限制等待,必须要有计时器,或者任务取消的功能。
4.1 计时器设置
直接对InvokeOption
的Timeout
属性赋值即可,单位为毫秒
。
InvokeOption invokeOption = new InvokeOption();
invokeOption.Timeout = 1000 * 10;//10秒后无反应,则抛出RRQMTimeoutException异常
string returnString = client.Invoke<string>("TestOne", invokeOption, "10");
4.2 任务取消
在RPC调用时,计时器是一个好的选择,但是还不够完美,有时候我们希望能手动终结某个调用任务。这时候,计时器就不堪重任,需要能主动取消任务的功能。熟悉.net的小伙伴都知道,CancellationToken是具备这个功能的。同样的,只需要对InvokeOption
的CancellationToken
赋值即可。
InvokeOption invokeOption = new InvokeOption();
CancellationTokenSource tokenSource = new CancellationTokenSource();
invokeOption.CancellationToken = tokenSource.Token;
//tokenSource.Cancel();//调用时取消任务
string returnString = client.Invoke<string>("TestOne", invokeOption, "10");
4.3 服务任务取消
实际上7.2的取消任务,仅仅能实现让客户端取消请求,但是服务器并不知道,如果想让服务器也感知任务消息,就必须依托于调用上下文。
此处的取消,有可能是调用者主动取消。也有可能是调用者已经掉线。
public class ElapsedTimeRpcServer : ServerProvider
{
[Description("测试可取消的调用")]
[RRQMRPC(MethodFlags.IncludeCallContext)]
public bool DelayInvoke(ICallContext serverCallContext,int tick)//同步服务
{
for (int i = 0; i < tick; i++)
{
Thread.Sleep(100);
if (serverCallContext.TokenSource.IsCancellationRequested)
{
Console.WriteLine("客户端已经取消该任务!");
return false;//实际上在取消时,客户端得不到该值
}
}
return true;
}
}
五、代理类型添加
通过之前的学习,大家可能大概明白了,在RRQMRPC中,客户端与服务器在进行交互时,所需的数据结构不要求是同一类型,仅是数据类型结构相同即可。所以在声明了服务以后,服务中所包含的自定义类型,会被复刻成结构相同的类型,但是这也仅仅局限于参数与服务
相同程序集
的时候。如果服务中引入了其他程序集的数据结构,则不会复刻。所以在客户端调用时,需要引入同一程序集。
但是,往往在服务中,会引入其他程序集,例如,我们习惯在项目中建立一个Models程序集,用于存放所有的实体模型,那是不是意味着客户端也必须引入这个程序集才能调用呢?没别的方法了??
有,且不只有一种
5.1 添加代理类型
在服务注册之前,任意时刻,可调用CodeGenerator.AddProxyType静态方法,添加代理类型,同时可传入一个bool值,表明是否深度搜索,比如,假如RpcArgsClassLib.ProxyClass1中还有其他类型,则参数为True时,依然会代理。
RPCService rpcService = new RPCService();
CodeGenerator.AddProxyType<RpcArgsClassLib.ProxyClass1>();
CodeGenerator.AddProxyType<RpcArgsClassLib.ProxyClass2>(deepSearch:true);
5.2 标记自定义类
在需要代理的类上面声明RRQMProxy标签,然后也可以重新指定代理类名。
[RRQMProxy("MyArgs")]
public class Args
{
}
六、大数据传输
在RPC中,并没有对传输的数据做限制,但是因为RPC默认使用的固定包头适配器中,默认设置的可传递数据为10Mb,所以在RPC中,用户可一次性传递的数据包大约为9.9Mb。所以,如果用户传递超出阈值的数据,适配器则会触发异常,而无法接收。但在实际上RPC的使用中,大数据的传输也是很重要的一个环节,所以RRQM已经做了大数据的传输思路建议,希望能有效解决大家的麻烦。
6.1 设置适配器参数(推荐指数:⭐)
操作原理:在固定包头适配器中,默认限制了单次可发送数据包的最大值,所以可以修改此值实现目的。
该方法简单粗暴,能够解决一定程度的大数据问题,但并不建议这么做,除非对带宽有绝对信心。
订阅Connecting或重写OnConnecting,然后对e.DataHandlingAdapter重新赋值。
注意:客户端必须同样设置。
TcpRpcParser tcpRPCParser = new TcpRpcParser();
tcpRPCParser.Connecting += (client, e) =>
{
e.DataHandlingAdapter = new FixedHeaderDataHandlingAdapter() { MaxSizeHeader = 1024 * 1024 * 20 };
};
6.2 RPC嵌套Channel(推荐指数:⭐⭐⭐⭐⭐)
操作原理:先利用RPC让客户端与服务器约定特定的Channel,然后后续数据通过Channel传递,最后由RPC返回结果。
1)ServiceToClient
【Service端】
/// <summary>
/// "测试ServiceToClient创建通道,从而实现流数据的传输"
/// </summary>
/// <param name="callContext"></param>
/// <param name="channelID"></param>
[Description("测试ServiceToClient创建通道,从而实现流数据的传输")]
[RRQMRPC(MethodFlags.IncludeCallContext)]
public int Test28_ServiceToClientChannel(ICallContext callContext, int channelID,int count)
{
int size = 0;
if (callContext.Caller is RpcSocketClient socketClient)
{
if (socketClient.TrySubscribeChannel(channelID, out Channel channel))
{
for (int i = 0; i < count; i++)
{
size += 1024;
channel.Write(new byte[1024]);
}
channel.Complete();//必须调用指令函数,如Complete,Cancel,Dispose
}
}
return size;
}
【Client端】
public void Test28(TcpRpcClient client)
{
int count = 1000;
ChannelStatus status = ChannelStatus.Default;
Channel channel = client.CreateChannel();
Task.Run(() =>
{
while (channel.MoveNext())
{
byte[] data = channel.GetCurrent();
}
status = channel.Status;
});
int result = server.Test28_ServiceToClientChannel(channel.ID,count);
Thread.Sleep(2000);
Assert.Equal(count*1024,result);
Assert.Equal(ChannelStatus.Completed, status);
}
2)ClientToService
【Service端】
/// <summary>
/// "测试ClientToService创建通道,从而实现流数据的传输"
/// </summary>
/// <param name="callContext"></param>
/// <param name="channelID"></param>
[Description("测试ClientToService创建通道,从而实现流数据的传输")]
[RRQMRPC(MethodFlags.IncludeCallContext)]
public int Test29_ClientToServiceChannel(ICallContext callContext)
{
//调用该函数时,创建通道,然后返回通道ID。
if (callContext.Caller is RpcSocketClient socketClient)
{
Channel channel = socketClient.CreateChannel();
Task.Run(()=>
{
while (channel.MoveNext())
{
byte[] data = channel.GetCurrent();//此处可以处理数据
}
if (channel.Status== ChannelStatus.Completed)//完成
{
}
});
return channel.ID;
}
return -1;
}
【Client端】
public void Test29(TcpRpcClient client)
{
int channelID = server.Test29_ClientToServiceChannel();
if (client.TrySubscribeChannel(channelID,out Channel channel))
{
for (int i = 0; i < 1000; i++)
{
channel.Write(new byte[1024]);
}
channel.Complete();//必须调用指令函数,如Complete,Cancel,Dispose
}
}
6.3 Stream数据发送(推荐指数:⭐⭐⭐⭐⭐)
可以把数据当作流数据来发送,此时,用法和Protocol组件一致。详细用法参阅:
注意:TcpRpcClient继承自ProtocolClient,所以具有ProtocolClient所有功能。TcpRpcParser继承自ProtocolService,所以具有ProtocolService所有功能。
- 【RRQMSocket(十)】C# 基于TCP自定义协议Stream、文件等发送与接收
七、RPC异步及线程调度
7.1 RPC异步设置
在RRQMRPC-TCP中,服务在定义时,属性标签有个属性async,值得注意。这个属性默认是True,意为该服务是异步触发,需要切换线程。
该设置的作用一般在服务复杂度较高的时候设置为True,如果复杂度较低,则直接使用接收线程处理,这样可以避免线程的切换,效率会更高。
[Description("测试并发性能")]
[RRQMRPC(async: true)]
public int ConPerformance(int num)
{
return ++num;
}
7.2 RPC线程调度
在async为
False
时,服务由接收线程直接触发,然后阻塞等待服务返回的话,此时会阻塞整个TCP
的接收。所以这块在反向调用RPC
或者执行高复杂度
的代码时应该严格注意。 在async为True
时,服务由Task触发,即使服务阻塞,也不影响再次接收,且返回值由Task线程投递,对于服务的返回没有任何影响。
建议:没事不用管,默认True挺好。
八、RPC服务器广播(或Invoke)全部客户端。
[RRQMRPC]
public void InvokeAll()
{
if (this.RpcService.TryGetRpcParser("tcpRpcParser", out IRpcParser rpcParser))//通过添加解析器时的键,找到对应的解析器
{
if (rpcParser is TcpRpcParser tcpRpcParser)//转换为TcpRpcParser,或对应解析器类型
{
RpcSocketClient[] clients = tcpRpcParser.GetClients();//获取到所以客户端。
foreach (RpcSocketClient client in clients)
{
client.Invoke("Callback",null);
}
}
}
}