https://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html
http://rabbitmq.mr-ping.com/tutorials_with_csharp/rpc.html

远程过程调用(RPC)

这个教程里,我们用RabbitMQ来建立一个RPC系统:一个客户端和一个可扩展的RPC服务器。因为没有任何耗时任务用于分发,我们会创建一个伪造的用来返回斐波那契数列的RPC服务。

客户端接口

为了说明如何使用一个RPC服务,我们创建一个简单的客户端类。它会暴露一个名为Call的方法,此方法发送一个RPC请求,然后阻塞到收到回答为止。

  1. var rpcClient = new RPCClient();
  2. Console.WriteLine(" [x] Requesting fib(30)");
  3. var response = rpcClient.Call("30");
  4. Console.WriteLine(" [.] Got '{0}'", response);
  5. rpcClient.Close();

回调队列

通常情况下,通过RabbitMQ来使用RPC非常简单。一个客户端发送请求消息,一个服务器返回消息来进行响应。为了能够收到响应,我们需要发送一个带有回调队列地址的请求:

  1. var props = channel.CreateBasicProperties();
  2. props.ReplyTo = replyQueueName;
  3. var messageBytes = Encoding.UTF8.GetBytes(message);
  4. channel.BasicPublish(exchange: "",
  5. routingKey: "rpc_queue",
  6. basicProperties: props,
  7. body: messageBytes);

消息属性

AMQP 0-9-1 协议与定义了14个消息的属性。大部分属性很少会用到,不过以下几个例外:

  • Persistent: 标示此信息为持久的(用值为2来表示)还是暂时的(2之外的其他任何值)。具体可以去 第二个教程 一探究竟。
  • DeliveryMode:那些熟悉协议的人可能会选择使用这个属性,而不是Persistent。他们办的是一个事情。
  • ContentType:用来描述编码的mime-type。例如对于经常用到的JSON编码来说,将此属性设置为application/json是一个很好的做法。
  • ReplyTo: 通常用来对一个回调队列进行命名。
  • CorrelationId: 用于将RPC响应和请求进行关联。

    关联标识

    在上面提供的方法中,我们建议为每个 RPC 请求创建一个回调队列。这是非常低效的,但幸运的是有一个更好的方法 - 让我们为每个客户端创建一个回调队列。
    这样又有一个新问题,当我们从此队列里收到一个响应的时候并不清楚它是属于哪个请求的。这就是CorrelationId属性的用途所在了。我们为每一个请求将其设置为一个唯一值。稍后,当我们从回调队列里收到一条消息的时候,就可以通过这个属性来对响应和请求进行匹配。如果我们收到的消息的CorrelationId值是未知的,那就可以安心的把它丢弃掉,因为它并不属于我们的请求。

    总结

    Exchange-RPC - 图1
    我们的RPC看起来是这样的:

  • 当客户端启动时,会创建一个匿名的独占回调队列。

  • 客户端发送一条带有ReplyTo和CorrelationId两个属性的消息作为一个RPC请求,ReplyTo用于设置回调队列,CorrelationId用于为每一个请求设置一个独一无二的值。
  • 请求被发送到一个rpc_queue队列。
  • RPC工作者(也称为服务器)等待从那个队列中接受请求。当一个请求出现的时候,他会执行任务并且通过ReplyTo 属性所提及的队列来将带有执行结果的消息发回给客户端。
  • 客户端从回调队列那儿等待数据。当消息出现的时候,它会检查CorrelationId属性。如果属性值跟请求相匹配,就将响应返回给应用。

    公共类库

    1. Install-Package RabbitMQ.Client
    2. Install-Package Microsoft.Extensions.Configuration
    3. Install-Package Microsoft.Extensions.Configuration.Json
    ```csharp using Microsoft.Extensions.Configuration; using RabbitMQ.Client;

namespace My.RabbitMQ.Config;

public class MQConnection { public static IConnection CreateConnection() { var config = new ConfigurationBuilder() .AddInMemoryCollection() //缓存 .SetBasePath(Directory.GetCurrentDirectory()) .AddJsonFile(“appsettings.json”, optional: true, reloadOnChange: true) .Build(); var appSetting = config.GetSection(“AppSetting:RabbitMQSetting”);

  1. var factory = new ConnectionFactory
  2. {
  3. HostName = appSetting.GetSection("HostName").Value,
  4. UserName = appSetting.GetSection("UserName").Value,
  5. Password = appSetting.GetSection("Password").Value,
  6. Port = 5672,
  7. //VirtualHost= "myRabbit"
  8. };
  9. return factory.CreateConnection();
  10. }

}

  1. <a name="OIcdN"></a>
  2. ## 公共部分
  3. ```csharp
  4. {
  5. "Logging": {
  6. "LogLevel": {
  7. "Default": "Information",
  8. "Microsoft.AspNetCore": "Warning"
  9. }
  10. },
  11. "AllowedHosts": "*",
  12. "AppSetting": {
  13. "SqlServerSetting": {
  14. "Connection": "Server=192.168.3.40;Database=webdemo;User=sa;Password=longfuchu;"
  15. },
  16. "MysqlSetting": {
  17. "Connection": "server=192.168.3.40;port=3306;user=root;password=123456;database=webdemo;charset=utf8;Allow Zero Datetime=true;sslmode=none;Old Guids=true;Allow User Variables=True"
  18. },
  19. "RedisSetting": {
  20. "Connection": "192.168.3.40:6379,password=",
  21. "Database": "test"
  22. },
  23. "RabbitMQSetting": {
  24. "HostName": "localhost",
  25. "UserName": "admin",
  26. "Password": "admin",
  27. "Port": 5672
  28. },
  29. "MongoDbSetting": {
  30. "Connection": "mongodb://192.168.3.40:2717", ///?replicaSet=rs0
  31. "Database": "test"
  32. }
  33. }
  34. }

RPC示例

我们声明我们的斐波那契函数。它仅假定有效的正整数输入。(不要指望这个适用于大数字,它可能是最慢的递归实现)。

RPC服务端

创建My.MQ.RPC.Server控制台项目

  1. // RPC--RPC服务端
  2. using My.RabbitMQ.Config;
  3. using RabbitMQ.Client;
  4. using RabbitMQ.Client.Events;
  5. using System.Text;
  6. //建立连接
  7. using (var connection = MQConnection.CreateConnection())
  8. //创建信道
  9. using (var channel = connection.CreateModel())
  10. {
  11. channel.QueueDeclare(queue: "rpc_queue", durable: false,
  12. exclusive: false, autoDelete: false, arguments: null);
  13. channel.BasicQos(0, 1, false);
  14. var consumer = new EventingBasicConsumer(channel);
  15. channel.BasicConsume(queue: "rpc_queue",
  16. autoAck: false, consumer: consumer);
  17. Console.WriteLine(" [x] Awaiting RPC requests");
  18. consumer.Received += (model, ea) =>
  19. {
  20. string response = null;
  21. var body = ea.Body.ToArray();
  22. var props = ea.BasicProperties;
  23. var replyProps = channel.CreateBasicProperties();
  24. replyProps.CorrelationId = props.CorrelationId;
  25. try
  26. {
  27. var message = Encoding.UTF8.GetString(body);
  28. int n = int.Parse(message);
  29. Console.WriteLine(" [.] fib({0})", message);
  30. response = fib(n).ToString();
  31. }
  32. catch (Exception e)
  33. {
  34. Console.WriteLine(" [.] " + e.Message);
  35. response = "";
  36. }
  37. finally
  38. {
  39. var responseBytes = Encoding.UTF8.GetBytes(response);
  40. channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
  41. basicProperties: replyProps, body: responseBytes);
  42. channel.BasicAck(deliveryTag: ea.DeliveryTag,
  43. multiple: false);
  44. }
  45. };
  46. Console.WriteLine(" Press [enter] to exit.");
  47. Console.ReadLine();
  48. }
  49. ///斐波那契任务
  50. static int fib(int n)
  51. {
  52. if (n == 0 || n == 1)
  53. {
  54. return n;
  55. }
  56. return fib(n - 1) + fib(n - 2);
  57. }

服务器代码很简单

  • 跟之前一样,一开始我们建立连接、信道,并且声明队列。
  • 可能我们会希望运行多个服务器进程。为了在多个服务器间均分负载,我们需要在channel.BasicQos中设置prefetchCount。
  • 我们使用BasicConsume去访问队列。然后我们注册一个投递处理程序,我们在这个处理程序中完成工作并发回响应。

    RPC 客户端

    创建My.MQ.RPC.Client控制台项目 ```csharp using System; using System.Collections.Concurrent; using System.Text; using My.RabbitMQ.Config; using RabbitMQ.Client; using RabbitMQ.Client.Events;

// RPC—RPC客户端 public class RpcClient { private readonly IConnection connection; private readonly IModel channel; private readonly string replyQueueName; private readonly EventingBasicConsumer consumer; private readonly BlockingCollection respQueue = new BlockingCollection(); private readonly IBasicProperties props;

  1. public RpcClient()
  2. {
  3. connection = MQConnection.CreateConnection();
  4. channel = connection.CreateModel();
  5. replyQueueName = channel.QueueDeclare().QueueName;
  6. consumer = new EventingBasicConsumer(channel);
  7. props = channel.CreateBasicProperties();
  8. var correlationId = Guid.NewGuid().ToString();
  9. props.CorrelationId = correlationId;
  10. props.ReplyTo = replyQueueName;
  11. consumer.Received += (model, ea) =>
  12. {
  13. var body = ea.Body.ToArray();
  14. var response = Encoding.UTF8.GetString(body);
  15. if (ea.BasicProperties.CorrelationId == correlationId)
  16. {
  17. respQueue.Add(response);
  18. }
  19. };
  20. channel.BasicConsume(
  21. consumer: consumer,
  22. queue: replyQueueName,
  23. autoAck: true);
  24. }
  25. public string Call(string message)
  26. {
  27. var messageBytes = Encoding.UTF8.GetBytes(message);
  28. channel.BasicPublish(
  29. exchange: "",
  30. routingKey: "rpc_queue",
  31. basicProperties: props,
  32. body: messageBytes);
  33. return respQueue.Take();
  34. }
  35. public void Close()
  36. {
  37. connection.Close();
  38. }

}

  1. ```csharp
  2. //RPC--生成客户端请求
  3. public class Program
  4. {
  5. public static void Main()
  6. {
  7. var rpcClient = new RpcClient();
  8. Console.WriteLine(" [x] Requesting fib(30)");
  9. var response = rpcClient.Call("30");
  10. Console.WriteLine(" [.] Got '{0}'", response);
  11. rpcClient.Close();
  12. }
  13. }

客户端代码稍显复杂:

  • 我们创建一个连接和信道,并且声明一个独享的callback队列用于回复。
  • 我们订阅callback队列,这样就可以接收到RPC的响应。
  • 我们的Call方法生成实际的RPC请求。
  • 这里,我们首先生成一个唯一的CorrelationId数字并且保存起来——整个循环都会使用这个值来获取相对应的响应。
  • 接下来,我们发送带有ReplyTo 和 CorrelationId属性的请求信息。
  • 此刻,我们可以坐等匹配的回应到来。
  • 整个循环所做的工作很简单,就是检查每个响应消息,看它们是不是我们需要的那个。如果是的话就将响应保存起来。
  • 最后我们将响应返回给用户。

    测试

    分别运行My.MQ.RPC.Server和My.MQ.RPC.Client
    1663406593687.png
    这里所呈现的设计方式并不是实现RPC服务的唯一方法,但是它具备一些重要的优势:

  • 如果RPC服务器过慢,你可以通过再运行一个服务器来进行扩展。试试在一个新的控制台中运行第二个RPCServer吧。

  • 在客户端这边,RPC只需要发送和接收一条消息。不需要类似QueueDeclare这样的同步调用。因此RPC客户端在一次RPC请求中,只需要进行一次网络的往返。

我们的代码依旧很简洁,并且只去尝试解决重要的而不是更加复杂的问题,比如:

  • 如果没有服务器运行,客户端是不是要做出反应?
  • 客户端是不是需要有针对RPC的某种超时设置?
  • 如果服务器发生故障,抛出异常,是不是需要转发给客户端?
  • 在进行处理之前防止无效的消息传入(比如检查边界、类型)。