一、序列化选择

从下图(图片来源网络)可以看出,序列化是RPC中至关重要的一个环节,可以说,序列化的优劣,会很大程度的影响RPC调用性能。
7、高级设置、使用 - 图1

1.1 RRQM支持的序列化

在RRQMRPC中,内置了四种序列化方式,分别为RRQMBinaryJsonXml。这四种方式的特点,就是其序列化的特点。

RRQMBinary Json Xml
特点 序列化方式速度快,数据量小,但是兼容的数据格式也比较有限。仅支持基础类型、自定义实体类、数组、List、字典 兼容性好,可读性强,但是受字符串影响,性能不出众,且数据量受限制 兼容性一般,可读性强,同样受字符串影响,性能不出众,且数据量受限制

1.2 使用预设序列化

在RRQMRPC中,选择序列化是非常简单的,且序列化方式完全由调用端决定。
在实际的调用中,通过InvokeOption的参数指定。
7、高级设置、使用 - 图2
实际上,只需要传入相关参数即可。

  1. InvokeOption invokeOption = new InvokeOption();
  2. invokeOption.SerializationType = RRQMCore.Serialization.SerializationType.RRQMBinary;
  3. //invokeOption.SerializationType = RRQMCore.Serialization.SerializationType.Json;
  4. //invokeOption.SerializationType = RRQMCore.Serialization.SerializationType.Xml;
  5. string returnString = client.Invoke<string>("TestOne", invokeOption, "10");

1.3 自定义序列化

a).定义
想要实现自定义序列化,必须通过重写序列化选择器,实现SerializeParameterDeserializeParameter函数。如果还想留用预设序列化,请按下代码示例即可。

  1. public class MySerializationSelector: SerializationSelector
  2. {
  3. /// <summary>
  4. /// 反序列化
  5. /// </summary>
  6. /// <param name="serializationType"></param>
  7. /// <param name="parameterBytes"></param>
  8. /// <param name="parameterType"></param>
  9. /// <returns></returns>
  10. public override object DeserializeParameter(SerializationType serializationType, byte[] parameterBytes, Type parameterType)
  11. {
  12. if (parameterBytes == null)
  13. {
  14. return parameterType.GetDefault();
  15. }
  16. switch (serializationType)
  17. {
  18. case SerializationType.RRQMBinary:
  19. {
  20. return SerializeConvert.RRQMBinaryDeserialize(parameterBytes, 0, parameterType);
  21. }
  22. case SerializationType.Json:
  23. {
  24. return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(parameterBytes), parameterType);
  25. }
  26. case SerializationType.Xml:
  27. {
  28. return SerializeConvert.XmlDeserializeFromBytes(parameterBytes, parameterType);
  29. }
  30. case (SerializationType)4:
  31. {
  32. //此处可自行实现
  33. return default;
  34. }
  35. default:
  36. throw new RRQMRPCException("未指定的反序列化方式");
  37. }
  38. }
  39. /// <summary>
  40. /// 序列化参数
  41. /// </summary>
  42. /// <param name="serializationType"></param>
  43. /// <param name="parameter"></param>
  44. /// <returns></returns>
  45. public override byte[] SerializeParameter(SerializationType serializationType, object parameter)
  46. {
  47. if (parameter == null)
  48. {
  49. return null;
  50. }
  51. switch (serializationType)
  52. {
  53. case SerializationType.RRQMBinary:
  54. {
  55. return SerializeConvert.RRQMBinarySerialize(parameter, true);
  56. }
  57. case SerializationType.Json:
  58. {
  59. return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(parameter));
  60. }
  61. case SerializationType.Xml:
  62. {
  63. return SerializeConvert.XmlSerializeToBytes(parameter);
  64. }
  65. case (SerializationType)4:
  66. {
  67. //此处可自行实现
  68. return default;
  69. }
  70. default:
  71. throw new RRQMRPCException("未指定的序列化方式");
  72. }
  73. }
  74. }

b).使用
首先在解析器客户端配置中赋值解析器。
7、高级设置、使用 - 图3
然后,因为赋值时是SerializationType的枚举类型,所以执行强制类型转换即可。

  1. InvokeOption invokeOption = new InvokeOption();
  2. invokeOption.SerializationType = (RRQMCore.Serialization.SerializationType)4;

二、复用ID(重置ID)

在TouchRpc中,存在于服务器的辅助客户端(SocketClient),与远程客户端(Client)是一一对应关系,其ID也完全一致

所以在任意一方修改ID(调用ResetID),都会同时修改远程ID。

所以合理使用该操作,可以完成复用ID(重置ID)的需求。

注意:此操作仅在SocketClient或Client生效。如果在Service直接调用,则只会修改服务器端ID。

四、调用超时设置

调用RPC,不能无限制等待,必须要有计时器,或者任务取消的功能。

4.1 计时器设置

直接对InvokeOptionTimeout 属性赋值即可,单位为毫秒

  1. InvokeOption invokeOption = new InvokeOption();
  2. invokeOption.Timeout = 1000 * 10;//10秒后无反应,则抛出RRQMTimeoutException异常
  3. string returnString = client.Invoke<string>("TestOne", invokeOption, "10");

4.2 任务取消

在RPC调用时,计时器是一个好的选择,但是还不够完美,有时候我们希望能手动终结某个调用任务。这时候,计时器就不堪重任,需要能主动取消任务的功能。熟悉.net的小伙伴都知道,CancellationToken是具备这个功能的。同样的,只需要对InvokeOptionCancellationToken 赋值即可。

  1. InvokeOption invokeOption = new InvokeOption();
  2. CancellationTokenSource tokenSource = new CancellationTokenSource();
  3. invokeOption.CancellationToken = tokenSource.Token;
  4. //tokenSource.Cancel();//调用时取消任务
  5. string returnString = client.Invoke<string>("TestOne", invokeOption, "10");

4.3 服务任务取消

实际上7.2的取消任务,仅仅能实现让客户端取消请求,但是服务器并不知道,如果想让服务器也感知任务消息,就必须依托于调用上下文。

此处的取消,有可能是调用者主动取消。也有可能是调用者已经掉线。

  1. public class ElapsedTimeRpcServer : ServerProvider
  2. {
  3. [Description("测试可取消的调用")]
  4. [RRQMRPC(MethodFlags.IncludeCallContext)]
  5. public bool DelayInvoke(ICallContext serverCallContext,int tick)//同步服务
  6. {
  7. for (int i = 0; i < tick; i++)
  8. {
  9. Thread.Sleep(100);
  10. if (serverCallContext.TokenSource.IsCancellationRequested)
  11. {
  12. Console.WriteLine("客户端已经取消该任务!");
  13. return false;//实际上在取消时,客户端得不到该值
  14. }
  15. }
  16. return true;
  17. }
  18. }

五、代理类型添加

通过之前的学习,大家可能大概明白了,在RRQMRPC中,客户端与服务器在进行交互时,所需的数据结构不要求是同一类型,仅是数据类型结构相同即可。所以在声明了服务以后,服务中所包含的自定义类型,会被复刻成结构相同的类型,但是这也仅仅局限于参数与服务相同程序集的时候。如果服务中引入了其他程序集的数据结构,则不会复刻。所以在客户端调用时,需要引入同一程序集。

但是,往往在服务中,会引入其他程序集,例如,我们习惯在项目中建立一个Models程序集,用于存放所有的实体模型,那是不是意味着客户端也必须引入这个程序集才能调用呢?没别的方法了??
有,且不只有一种

5.1 添加代理类型

在服务注册之前,任意时刻,可调用CodeGenerator.AddProxyType静态方法,添加代理类型,同时可传入一个bool值,表明是否深度搜索,比如,假如RpcArgsClassLib.ProxyClass1中还有其他类型,则参数为True时,依然会代理。

  1. RPCService rpcService = new RPCService();
  2. CodeGenerator.AddProxyType<RpcArgsClassLib.ProxyClass1>();
  3. CodeGenerator.AddProxyType<RpcArgsClassLib.ProxyClass2>(deepSearch:true);

5.2 标记自定义类

在需要代理的类上面声明RRQMProxy标签,然后也可以重新指定代理类名。

  1. [RRQMProxy("MyArgs")]
  2. public class Args
  3. {
  4. }

六、大数据传输

在RPC中,并没有对传输的数据做限制,但是因为RPC默认使用的固定包头适配器中,默认设置的可传递数据为10Mb,所以在RPC中,用户可一次性传递的数据包大约为9.9Mb。所以,如果用户传递超出阈值的数据,适配器则会触发异常,而无法接收。但在实际上RPC的使用中,大数据的传输也是很重要的一个环节,所以RRQM已经做了大数据的传输思路建议,希望能有效解决大家的麻烦。

6.1 设置适配器参数(推荐指数:⭐)

操作原理:在固定包头适配器中,默认限制了单次可发送数据包的最大值,所以可以修改此值实现目的。

该方法简单粗暴,能够解决一定程度的大数据问题,但并不建议这么做,除非对带宽有绝对信心。
订阅Connecting或重写OnConnecting,然后对e.DataHandlingAdapter重新赋值。
注意:客户端必须同样设置。

  1. TcpRpcParser tcpRPCParser = new TcpRpcParser();
  2. tcpRPCParser.Connecting += (client, e) =>
  3. {
  4. e.DataHandlingAdapter = new FixedHeaderDataHandlingAdapter() { MaxSizeHeader = 1024 * 1024 * 20 };
  5. };

6.2 RPC嵌套Channel(推荐指数:⭐⭐⭐⭐⭐)

操作原理:先利用RPC让客户端与服务器约定特定的Channel,然后后续数据通过Channel传递,最后由RPC返回结果。

1)ServiceToClient
【Service端】

  1. /// <summary>
  2. /// "测试ServiceToClient创建通道,从而实现流数据的传输"
  3. /// </summary>
  4. /// <param name="callContext"></param>
  5. /// <param name="channelID"></param>
  6. [Description("测试ServiceToClient创建通道,从而实现流数据的传输")]
  7. [RRQMRPC(MethodFlags.IncludeCallContext)]
  8. public int Test28_ServiceToClientChannel(ICallContext callContext, int channelID,int count)
  9. {
  10. int size = 0;
  11. if (callContext.Caller is RpcSocketClient socketClient)
  12. {
  13. if (socketClient.TrySubscribeChannel(channelID, out Channel channel))
  14. {
  15. for (int i = 0; i < count; i++)
  16. {
  17. size += 1024;
  18. channel.Write(new byte[1024]);
  19. }
  20. channel.Complete();//必须调用指令函数,如Complete,Cancel,Dispose
  21. }
  22. }
  23. return size;
  24. }

【Client端】

  1. public void Test28(TcpRpcClient client)
  2. {
  3. int count = 1000;
  4. ChannelStatus status = ChannelStatus.Default;
  5. Channel channel = client.CreateChannel();
  6. Task.Run(() =>
  7. {
  8. while (channel.MoveNext())
  9. {
  10. byte[] data = channel.GetCurrent();
  11. }
  12. status = channel.Status;
  13. });
  14. int result = server.Test28_ServiceToClientChannel(channel.ID,count);
  15. Thread.Sleep(2000);
  16. Assert.Equal(count*1024,result);
  17. Assert.Equal(ChannelStatus.Completed, status);
  18. }

2)ClientToService
【Service端】

  1. /// <summary>
  2. /// "测试ClientToService创建通道,从而实现流数据的传输"
  3. /// </summary>
  4. /// <param name="callContext"></param>
  5. /// <param name="channelID"></param>
  6. [Description("测试ClientToService创建通道,从而实现流数据的传输")]
  7. [RRQMRPC(MethodFlags.IncludeCallContext)]
  8. public int Test29_ClientToServiceChannel(ICallContext callContext)
  9. {
  10. //调用该函数时,创建通道,然后返回通道ID。
  11. if (callContext.Caller is RpcSocketClient socketClient)
  12. {
  13. Channel channel = socketClient.CreateChannel();
  14. Task.Run(()=>
  15. {
  16. while (channel.MoveNext())
  17. {
  18. byte[] data = channel.GetCurrent();//此处可以处理数据
  19. }
  20. if (channel.Status== ChannelStatus.Completed)//完成
  21. {
  22. }
  23. });
  24. return channel.ID;
  25. }
  26. return -1;
  27. }

【Client端】

  1. public void Test29(TcpRpcClient client)
  2. {
  3. int channelID = server.Test29_ClientToServiceChannel();
  4. if (client.TrySubscribeChannel(channelID,out Channel channel))
  5. {
  6. for (int i = 0; i < 1000; i++)
  7. {
  8. channel.Write(new byte[1024]);
  9. }
  10. channel.Complete();//必须调用指令函数,如Complete,Cancel,Dispose
  11. }
  12. }

6.3 Stream数据发送(推荐指数:⭐⭐⭐⭐⭐)

可以把数据当作流数据来发送,此时,用法和Protocol组件一致。详细用法参阅:

注意:TcpRpcClient继承自ProtocolClient,所以具有ProtocolClient所有功能。TcpRpcParser继承自ProtocolService,所以具有ProtocolService所有功能。

该设置的作用一般在服务复杂度较高的时候设置为True,如果复杂度较低,则直接使用接收线程处理,这样可以避免线程的切换,效率会更高。

  1. [Description("测试并发性能")]
  2. [RRQMRPC(async: true)]
  3. public int ConPerformance(int num)
  4. {
  5. return ++num;
  6. }

7.2 RPC线程调度

在async为False时,服务由接收线程直接触发,然后阻塞等待服务返回的话,此时会阻塞整个TCP的接收。所以这块在反向调用RPC或者执行高复杂度的代码时应该严格注意。 在async为True时,服务由Task触发,即使服务阻塞,也不影响再次接收,且返回值由Task线程投递,对于服务的返回没有任何影响。

建议:没事不用管,默认True挺好。

八、RPC服务器广播(或Invoke)全部客户端。

  1. [RRQMRPC]
  2. public void InvokeAll()
  3. {
  4. if (this.RpcService.TryGetRpcParser("tcpRpcParser", out IRpcParser rpcParser))//通过添加解析器时的键,找到对应的解析器
  5. {
  6. if (rpcParser is TcpRpcParser tcpRpcParser)//转换为TcpRpcParser,或对应解析器类型
  7. {
  8. RpcSocketClient[] clients = tcpRpcParser.GetClients();//获取到所以客户端。
  9. foreach (RpcSocketClient client in clients)
  10. {
  11. client.Invoke("Callback",null);
  12. }
  13. }
  14. }
  15. }