官网:https://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html
学习资料:https://github.com/sheng-jie/RabbitMQ
发送和接收消息
Publish/Subscribe
在这一部分中,我们将做一些完全不同的事情 - 我们将向多个消费者传递信息。此模式称为“发布/订阅”。
为了说明这种模式,我们将构建一个简单的日志记录系统。它将由两个程序组成 - 第一个程序将发出日志消息,第二个程序将接收并打印它们。
在我们的日志记录系统中,接收程序的每个运行副本都将获得消息。这样,我们将能够运行一个接收器并将日志定向到磁盘;同时,我们将能够运行另一个接收器并在屏幕上看到日志。
从本质上讲,已发布的日志消息将广播到所有接收方。
Exchange介绍
细心的你也许发现上面的demo,生产者和消费者直接是通过相同队列名称进行匹配衔接的。消费者订阅某个队列,生产者创建消息发布到队列中,队列再将消息转发到订阅的消费者。这样就会有一个局限性,即消费者一次只能发送消息到某一个队列。
那消费者如何才能发送消息到多个消息队列呢? RabbitMQ提供了Exchange,它类似于路由器的功能,它用于对消息进行路由,将消息发送到多个队列上。Exchange一方面从生产者接收消息,另一方面将消息推送到队列。但exchange必须知道如何处理接收到的消息,是将其附加到特定队列还是附加到多个队列,还是直接忽略。而这些规则由exchange type定义,exchange的原理如下图所示。 
常见的exchange type 有以下几种:
- direct 直连交换机(明确的路由规则:消费端绑定的队列名称必须和消息发布时指定的路由名称一致)
- topic 主题交换机(模式匹配的路由规则:支持通配符)
- fanout 扇形交换机(消息广播,将消息分发到exchange上绑定的所有队列上)
默认交换机
之前的示例代码中,我们对交换机还一无所知,但是依然能将消息发送给队列。原因是我们使用了用空字符串(“”)来标示的默认交换机。
第一个参数就是交换机的名字。空字符串用来表示默认或者无名交换机:如果队列存在的话,消息会依据路由键(routingKey)所指定的名称路由到队列中。var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: "",routingKey: "hello",basicProperties: null,body: body);
临时队列
你可能还记得我们上次使用的是命名过的队列(还记得hello和task_queue吗?)。可以对队列进行命名对我们来说是至关重要的——我们需要将工作者指向同一个队列。当你想在多个生产者和消费者之间共享一个队列时,给队列起个名字是很重要的。
但是我们的日志系统不需要如此。我们希望了解所有的消息,而不是其中的一个子集。而且我们只对当前正在流动的消息感兴趣,而不是那些老的消息。所以我们需要做两件事情来解决这个问题。
首先,如论我们何时连接到Rabbit,我们需要的是一个新鲜的空队列。想要做到这点,我们可以创建一个随机命名的队列,或者更简单一点——让服务器为我们选择一个随机队列名称。
其次,一旦消费者断开连接,队列需要被自动删除。
在.NET客户端中,当我们不给QueueDeclare()提供参数的情况下,就可以创建一个非持久化、独享的、可自动删除的拥有生成名称的队列。
此时,queueName包含的是一个随机的队列名称。看起来可能会类似于amq.gen-JzTY20BRgKO-HjmUJj0wLg这样。var queueName = channel.QueueDeclare().QueueName;
绑定队列

当我们创建一个扇形交换机和一个队列。现在我们需要通知交换机将消息发送给我们的队列。交换机和队列之间的这种关系称为绑定(binding)。
这样,logs交换机会将消息追加到我们的队列当中。var queueName = channel.QueueDeclare().QueueName;channel.QueueBind(queue: queueName,exchange: "logs",routingKey: "");
公共类库
```csharp using Microsoft.Extensions.Configuration; using RabbitMQ.Client;Install-Package RabbitMQ.ClientInstall-Package Microsoft.Extensions.ConfigurationInstall-Package Microsoft.Extensions.Configuration.Json
namespace My.RabbitMQ.Config;
public class MQConnection { public static IConnection CreateConnection() { var config = new ConfigurationBuilder() .AddInMemoryCollection() //缓存 .SetBasePath(Directory.GetCurrentDirectory()) .AddJsonFile(“appsettings.json”, optional: true, reloadOnChange: true) .Build(); var appSetting = config.GetSection(“AppSetting:RabbitMQSetting”);
var factory = new ConnectionFactory{HostName = appSetting.GetSection("HostName").Value,UserName = appSetting.GetSection("UserName").Value,Password = appSetting.GetSection("Password").Value,Port = 5672,//VirtualHost= "myRabbit"};return factory.CreateConnection();}
}
<a name="OIcdN"></a>## 公共部分```csharp{"Logging": {"LogLevel": {"Default": "Information","Microsoft.AspNetCore": "Warning"}},"AllowedHosts": "*","AppSetting": {"SqlServerSetting": {"Connection": "Server=192.168.3.40;Database=webdemo;User=sa;Password=longfuchu;"},"MysqlSetting": {"Connection": "server=192.168.3.40;port=3306;user=root;password=123456;database=webdemo;charset=utf8;Allow Zero Datetime=true;sslmode=none;Old Guids=true;Allow User Variables=True"},"RedisSetting": {"Connection": "192.168.3.40:6379,password=","Database": "test"},"RabbitMQSetting": {"HostName": "localhost","UserName": "admin","Password": "admin","Port": 5672},"MongoDbSetting": {"Connection": "mongodb://192.168.3.40:2717", ///?replicaSet=rs0"Database": "test"}}}
日志消息示例
消息模型

用来发送日志消息的生产者程序看起来跟上个教程中的没多大区别。最重大的改变是,现在我们希望将消息发布到logs交换机而不是未命名的那个。发送的时候我们需要提供一个routingKey,但是它的值会被扇形交换机忽略掉。我们创建一个My.MQ.EmitLog.Demo的控制台项目:
Install-Package RabbitMQ.Client
声明交换机
// 日志消息示例using My.RabbitMQ.Config;using RabbitMQ.Client;using System.Text;//建立连接using (var connection = MQConnection.CreateConnection())//创建信道using (var channel = connection.CreateModel()){//声明交换机类型为Fanout,交换机名称为"logs"的交换机channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);//构建byte消息数据包var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message);//发送数据包channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);Console.WriteLine(" [x] Sent {0}", message);}Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();static string GetMessage(string[] args){return ((args.Length > 0) ? string.Join(" ", args) : "info: Hello World!");}
运行程序后,我们在MQ控制台可以查看到,在交换机列表里多了一个名称为”logs”的交换机。
如果还没有队列绑定到交换,消息将丢失,但这对我们来说没关系;如果没有消费者在听,我们可以安全地丢弃消息。
绑定交换机
我们继续创建一个控制台名为My.MQ.ReceiveLogs.Demo的项目,将队列绑定到指定的交换机上.
Install-Package RabbitMQ.Client
// 队列绑定交换机//实例化连接using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Text;using My.RabbitMQ.Config;//建立连接using (var connection = MQConnection.CreateConnection())using (var channel = connection.CreateModel()){//声明交换机类型为Fanout,交换机名称为"logs"的交换机channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);// 声明一个以服务器命名的队列var queueName = channel.QueueDeclare(queue: "").QueueName;//交换机"logs"绑定一个队列channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");Console.WriteLine(" [*] Waiting for logs.");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{byte[] body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine(" [x] {0}", message);};//启动一个基本内容类消费者channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}
测试
分别运行My.MQ.ReceiveLogs.Demo和My.MQ.EmitLog.Demo,
如果你想要将日志保存到一个文件中,先运行My.MQ.EmitLog.Demo,需要在控制台中输入:
logs_from_rabbit.log
几个核心概念:
- Publisher:生产者,消息的发送方。
- Connection:网络连接。
- Channel:信道,多路复用连接中的一条独立的双向数据流通道。
- Exchange:交换器(路由器),负责消息的路由到相应队列。
- Binding:队列与交换器间的关联绑定。消费者将关注的队列绑定到指定交换器上,以便Exchange能准确分发消息到指定队列。
- Queue:队列,消息的缓冲存储区。
- Virtual Host:虚拟主机,虚拟主机提供资源的逻辑分组和分离。包含连接,交换,队列,绑定,用户权限,策略等。
- Broker:消息队列的服务器实体。
- Consumer:消费者,消息的接收方。
