官网:https://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html
学习资料:https://github.com/sheng-jie/RabbitMQ

发送和接收消息

Publish/Subscribe

在这一部分中,我们将做一些完全不同的事情 - 我们将向多个消费者传递信息。此模式称为“发布/订阅”。
为了说明这种模式,我们将构建一个简单的日志记录系统。它将由两个程序组成 - 第一个程序将发出日志消息,第二个程序将接收并打印它们。
在我们的日志记录系统中,接收程序的每个运行副本都将获得消息。这样,我们将能够运行一个接收器并将日志定向到磁盘;同时,我们将能够运行另一个接收器并在屏幕上看到日志。
从本质上讲,已发布的日志消息将广播到所有接收方。

Exchange介绍

细心的你也许发现上面的demo,生产者和消费者直接是通过相同队列名称进行匹配衔接的。消费者订阅某个队列,生产者创建消息发布到队列中,队列再将消息转发到订阅的消费者。这样就会有一个局限性,即消费者一次只能发送消息到某一个队列。
那消费者如何才能发送消息到多个消息队列呢? RabbitMQ提供了Exchange,它类似于路由器的功能,它用于对消息进行路由,将消息发送到多个队列上。Exchange一方面从生产者接收消息,另一方面将消息推送到队列。但exchange必须知道如何处理接收到的消息,是将其附加到特定队列还是附加到多个队列,还是直接忽略。而这些规则由exchange type定义,exchange的原理如下图所示。 RabbitMQ学习二 - 图1
常见的exchange type 有以下几种:

  • direct 直连交换机(明确的路由规则:消费端绑定的队列名称必须和消息发布时指定的路由名称一致)
  • topic 主题交换机(模式匹配的路由规则:支持通配符)
  • fanout 扇形交换机(消息广播,将消息分发到exchange上绑定的所有队列上)

    默认交换机

    之前的示例代码中,我们对交换机还一无所知,但是依然能将消息发送给队列。原因是我们使用了用空字符串(“”)来标示的默认交换机。
    1. var message = GetMessage(args);
    2. var body = Encoding.UTF8.GetBytes(message);
    3. channel.BasicPublish(exchange: "",
    4. routingKey: "hello",
    5. basicProperties: null,
    6. body: body);
    第一个参数就是交换机的名字。空字符串用来表示默认或者无名交换机:如果队列存在的话,消息会依据路由键(routingKey)所指定的名称路由到队列中。

    临时队列

    你可能还记得我们上次使用的是命名过的队列(还记得hello和task_queue吗?)。可以对队列进行命名对我们来说是至关重要的——我们需要将工作者指向同一个队列。当你想在多个生产者和消费者之间共享一个队列时,给队列起个名字是很重要的。
    但是我们的日志系统不需要如此。我们希望了解所有的消息,而不是其中的一个子集。而且我们只对当前正在流动的消息感兴趣,而不是那些老的消息。所以我们需要做两件事情来解决这个问题。
    首先,如论我们何时连接到Rabbit,我们需要的是一个新鲜的空队列。想要做到这点,我们可以创建一个随机命名的队列,或者更简单一点——让服务器为我们选择一个随机队列名称。
    其次,一旦消费者断开连接,队列需要被自动删除。
    在.NET客户端中,当我们不给QueueDeclare()提供参数的情况下,就可以创建一个非持久化、独享的、可自动删除的拥有生成名称的队列。
    1. var queueName = channel.QueueDeclare().QueueName;
    此时,queueName包含的是一个随机的队列名称。看起来可能会类似于amq.gen-JzTY20BRgKO-HjmUJj0wLg这样。

    绑定队列

    RabbitMQ学习二 - 图2
    当我们创建一个扇形交换机和一个队列。现在我们需要通知交换机将消息发送给我们的队列。交换机和队列之间的这种关系称为绑定(binding)。
    1. var queueName = channel.QueueDeclare().QueueName;
    2. channel.QueueBind(queue: queueName,
    3. exchange: "logs",
    4. routingKey: "");
    这样,logs交换机会将消息追加到我们的队列当中。

    公共类库

    1. Install-Package RabbitMQ.Client
    2. Install-Package Microsoft.Extensions.Configuration
    3. Install-Package Microsoft.Extensions.Configuration.Json
    ```csharp using Microsoft.Extensions.Configuration; using RabbitMQ.Client;

namespace My.RabbitMQ.Config;

public class MQConnection { public static IConnection CreateConnection() { var config = new ConfigurationBuilder() .AddInMemoryCollection() //缓存 .SetBasePath(Directory.GetCurrentDirectory()) .AddJsonFile(“appsettings.json”, optional: true, reloadOnChange: true) .Build(); var appSetting = config.GetSection(“AppSetting:RabbitMQSetting”);

  1. var factory = new ConnectionFactory
  2. {
  3. HostName = appSetting.GetSection("HostName").Value,
  4. UserName = appSetting.GetSection("UserName").Value,
  5. Password = appSetting.GetSection("Password").Value,
  6. Port = 5672,
  7. //VirtualHost= "myRabbit"
  8. };
  9. return factory.CreateConnection();
  10. }

}

  1. <a name="OIcdN"></a>
  2. ## 公共部分
  3. ```csharp
  4. {
  5. "Logging": {
  6. "LogLevel": {
  7. "Default": "Information",
  8. "Microsoft.AspNetCore": "Warning"
  9. }
  10. },
  11. "AllowedHosts": "*",
  12. "AppSetting": {
  13. "SqlServerSetting": {
  14. "Connection": "Server=192.168.3.40;Database=webdemo;User=sa;Password=longfuchu;"
  15. },
  16. "MysqlSetting": {
  17. "Connection": "server=192.168.3.40;port=3306;user=root;password=123456;database=webdemo;charset=utf8;Allow Zero Datetime=true;sslmode=none;Old Guids=true;Allow User Variables=True"
  18. },
  19. "RedisSetting": {
  20. "Connection": "192.168.3.40:6379,password=",
  21. "Database": "test"
  22. },
  23. "RabbitMQSetting": {
  24. "HostName": "localhost",
  25. "UserName": "admin",
  26. "Password": "admin",
  27. "Port": 5672
  28. },
  29. "MongoDbSetting": {
  30. "Connection": "mongodb://192.168.3.40:2717", ///?replicaSet=rs0
  31. "Database": "test"
  32. }
  33. }
  34. }

日志消息示例

消息模型

RabbitMQ学习二 - 图3
用来发送日志消息的生产者程序看起来跟上个教程中的没多大区别。最重大的改变是,现在我们希望将消息发布到logs交换机而不是未命名的那个。发送的时候我们需要提供一个routingKey,但是它的值会被扇形交换机忽略掉。我们创建一个My.MQ.EmitLog.Demo的控制台项目:

  1. Install-Package RabbitMQ.Client

声明交换机

  1. // 日志消息示例
  2. using My.RabbitMQ.Config;
  3. using RabbitMQ.Client;
  4. using System.Text;
  5. //建立连接
  6. using (var connection = MQConnection.CreateConnection())
  7. //创建信道
  8. using (var channel = connection.CreateModel())
  9. {
  10. //声明交换机类型为Fanout,交换机名称为"logs"的交换机
  11. channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
  12. //构建byte消息数据包
  13. var message = GetMessage(args);
  14. var body = Encoding.UTF8.GetBytes(message);
  15. //发送数据包
  16. channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);
  17. Console.WriteLine(" [x] Sent {0}", message);
  18. }
  19. Console.WriteLine(" Press [enter] to exit.");
  20. Console.ReadLine();
  21. static string GetMessage(string[] args)
  22. {
  23. return ((args.Length > 0) ? string.Join(" ", args) : "info: Hello World!");
  24. }

运行程序后,我们在MQ控制台可以查看到,在交换机列表里多了一个名称为”logs”的交换机。
1663380561376.png
如果还没有队列绑定到交换,消息将丢失,但这对我们来说没关系;如果没有消费者在听,我们可以安全地丢弃消息。

绑定交换机

我们继续创建一个控制台名为My.MQ.ReceiveLogs.Demo的项目,将队列绑定到指定的交换机上.

  1. Install-Package RabbitMQ.Client
  1. // 队列绑定交换机
  2. //实例化连接
  3. using RabbitMQ.Client;
  4. using RabbitMQ.Client.Events;
  5. using System.Text;
  6. using My.RabbitMQ.Config;
  7. //建立连接
  8. using (var connection = MQConnection.CreateConnection())
  9. using (var channel = connection.CreateModel())
  10. {
  11. //声明交换机类型为Fanout,交换机名称为"logs"的交换机
  12. channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
  13. // 声明一个以服务器命名的队列
  14. var queueName = channel.QueueDeclare(queue: "").QueueName;
  15. //交换机"logs"绑定一个队列
  16. channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");
  17. Console.WriteLine(" [*] Waiting for logs.");
  18. var consumer = new EventingBasicConsumer(channel);
  19. consumer.Received += (model, ea) =>
  20. {
  21. byte[] body = ea.Body.ToArray();
  22. var message = Encoding.UTF8.GetString(body);
  23. Console.WriteLine(" [x] {0}", message);
  24. };
  25. //启动一个基本内容类消费者
  26. channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
  27. Console.WriteLine(" Press [enter] to exit.");
  28. Console.ReadLine();
  29. }

测试

分别运行My.MQ.ReceiveLogs.Demo和My.MQ.EmitLog.Demo,
1663381662845.png
如果你想要将日志保存到一个文件中,先运行My.MQ.EmitLog.Demo,需要在控制台中输入:

  1. logs_from_rabbit.log

再运行My.MQ.ReceiveLogs.Demo。

几个核心概念:

  1. Publisher:生产者,消息的发送方。
  2. Connection:网络连接。
  3. Channel:信道,多路复用连接中的一条独立的双向数据流通道。
  4. Exchange:交换器(路由器),负责消息的路由到相应队列。
  5. Binding:队列与交换器间的关联绑定。消费者将关注的队列绑定到指定交换器上,以便Exchange能准确分发消息到指定队列。
  6. Queue:队列,消息的缓冲存储区。
  7. Virtual Host:虚拟主机,虚拟主机提供资源的逻辑分组和分离。包含连接,交换,队列,绑定,用户权限,策略等。
  8. Broker:消息队列的服务器实体。
  9. Consumer:消费者,消息的接收方。