官网:https://www.rabbitmq.com/getstarted.html
学习资料:https://github.com/sheng-jie/RabbitMQ
发送和接收消息
消息模型
消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,队列再将消息发送到监听的消费者。
下面我们我们通过demo来了解RabbitMQ的基本用法。分别创建两个控制台项目My.RabbitMQ.Send、My.RabbitMQ.Receive。
公共类库
Install-Package RabbitMQ.ClientInstall-Package Microsoft.Extensions.ConfigurationInstall-Package Microsoft.Extensions.Configuration.Json
using Microsoft.Extensions.Configuration;using RabbitMQ.Client;namespace My.RabbitMQ.Config;public class MQConnection{public static IConnection CreateConnection() {var config = new ConfigurationBuilder().AddInMemoryCollection() //缓存.SetBasePath(Directory.GetCurrentDirectory()).AddJsonFile("appsettings.json", optional: true, reloadOnChange: true).Build();var appSetting = config.GetSection("AppSetting:RabbitMQSetting");var factory = new ConnectionFactory{HostName = appSetting.GetSection("HostName").Value,UserName = appSetting.GetSection("UserName").Value,Password = appSetting.GetSection("Password").Value,Port = 5672,//VirtualHost= "myRabbit"};return factory.CreateConnection();}}
公共部分
{"Logging": {"LogLevel": {"Default": "Information","Microsoft.AspNetCore": "Warning"}},"AllowedHosts": "*","AppSetting": {"SqlServerSetting": {"Connection": "Server=192.168.3.40;Database=webdemo;User=sa;Password=longfuchu;"},"MysqlSetting": {"Connection": "server=192.168.3.40;port=3306;user=root;password=123456;database=webdemo;charset=utf8;Allow Zero Datetime=true;sslmode=none;Old Guids=true;Allow User Variables=True"},"RedisSetting": {"Connection": "192.168.3.40:6379,password=","Database": "test"},"RabbitMQSetting": {"HostName": "localhost","UserName": "admin","Password": "admin","Port": 5672},"MongoDbSetting": {"Connection": "mongodb://192.168.3.40:2717", ///?replicaSet=rs0"Database": "test"}}}
消息发送
消息发送在My.RabbitMQ.Send
Install-Package RabbitMQ.Client
//发送消息using My.RabbitMQ.Config;using RabbitMQ.Client;using System.Text;//建立连接using (var connection = MQConnection.CreateConnection())//创建信道using (var channel = connection.CreateModel()){//申明队列,队列名称(queue)为"hello"channel.QueueDeclare("hello", durable: false, exclusive: false, autoDelete: false, arguments: null);//构建byte消息数据包string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!";var body = Encoding.UTF8.GetBytes(message);//发送数据包channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);Console.WriteLine(" [x] Sent {0}", message);}
消息接收
消息接收端在My.RabbitMQ.Receive
Install-Package RabbitMQ.Client
//消息接收//1.实例化连接工厂using RabbitMQ.Client.Events;using RabbitMQ.Client;using System.Text;using My.RabbitMQ.Config;//建立连接using (var connection = MQConnection.CreateConnection())//3. 创建信道using (var channel = connection.CreateModel()){//4. 申明队列,队列名称(queue)为"hello"channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);//5. 构造消费者实例var consumer = new EventingBasicConsumer(channel);//6. 绑定消息接收后的事件委托consumer.Received += (model, ea) =>{//报错//var message = Encoding.UTF8.GetString(ea.Body);var body = ea.Body.ToArray(); // 将内存区域的内容复制到一个新的数组中//var body = ea.Body.Span; // 从内存区域获取一个跨度var message = Encoding.UTF8.GetString(body);Console.WriteLine(" [x] Received {0}", message);Thread.Sleep(6000);//模拟耗时Console.WriteLine(" [x] Done");};//7. 启动消费者channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}
测试
先运行消息接收端,再运行消息发送端(ctrl+F5),结果如下图。
从上面的代码中可以看出,发送端和消费端的代码前4步都是一样的。主要的区别在于发送端调用channel.BasicPublish方法发送消息;而接收端需要实例化一个EventingBasicConsumer实例来进行消息处理逻辑。另外一点需要注意的是:消息接收端和发送端的队列名称(queue)必须保持一致,这里指定的队列名称为hello。
工作队列
工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务并等待其完成。相反,我们将任务安排在以后完成。我们将任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当您运行多个工作线程时,任务将在它们之间共享。
这个概念在 Web 应用程序中特别有用,因为在 Web 应用程序中,不可能在较短的 HTTP 请求窗口中处理复杂的任务。
使用工作队列的好处就是它能够并行的处理队列。如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以了。我们先启动两个接收端,等待消息接收,再启动一个发送端进行消息发送。
消息模型

我们需要生成两个项目,My.RabbitMQNewTask,My.RabbitMQ.Worker
My.RabbitMQNewTask
Install-Package RabbitMQ.Client
//发送消息using My.RabbitMQ.Config;using RabbitMQ.Client;using System.Text;//建立连接using (var connection = MQConnection.CreateConnection())//创建信道using (var channel = connection.CreateModel()){//申明队列,队列名称(queue)为"hello"//channel.QueueDeclare(queue: "hello",// durable: false,// exclusive: false,// autoDelete: false,// arguments: null);//申明队列,队列名称(queue)为"hello",指定durable:true,告知rabbitmq对消息进行持久化channel.QueueDeclare(queue: "durable_task",durable: true,exclusive: false,autoDelete: false,arguments: null);//构建byte消息数据包//string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!";//消息从控制台输入后var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message);//将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为truevar properties = channel.CreateBasicProperties();properties.Persistent = true;//发送数据包channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);Console.WriteLine(" [x] Sent {0}", message);}//获取消息string GetMessage(string[] args){return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");}
My.RabbitMQ.Worker
Install-Package RabbitMQ.Client
//消息接收//1.实例化连接工厂using RabbitMQ.Client.Events;using RabbitMQ.Client;using System.Text;using My.RabbitMQ.Config;//建立连接using (var connection = MQConnection.CreateConnection())//创建信道using (var channel = connection.CreateModel()){//申明队列,队列名称(queue)为"hello"//channel.QueueDeclare(queue: "hello",// durable: false,// exclusive: false,// autoDelete: false,// arguments: null);channel.QueueDeclare(queue: "durable_task",durable: true,exclusive: false,autoDelete: false,arguments: null);//设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息,也就确保了当消费端处于忙碌状态时channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);//构造消费者实例var consumer = new EventingBasicConsumer(channel);//绑定消息接收后的事件委托consumer.Received += (model, ea) =>{//报错//var message = Encoding.UTF8.GetString(ea.Body);var body = ea.Body.ToArray(); // 将内存区域的内容复制到一个新的数组中//var body = ea.Body.Span; // 从内存区域获取一个跨度var message = Encoding.UTF8.GetString(body);Console.WriteLine(" [x] Received {0}", message);//添加假任务以模拟执行时间int dots = message.Split('.').Length - 1;Thread.Sleep(dots * 6000);Console.WriteLine(" [x] Done");// 发送消息确认信号(手动消息确认)channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);};//启动消费者//autoAck:true;自动进行消息确认,当消费端接收到消息后,就自动发送ack信号,不管消息是否正确处理完毕//autoAck:false;关闭自动消息确认,通过调用BasicAck方法手动进行消息确认channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}
轮循机制调度测试
使用任务队列的优点之一是能够轻松并行化工作。如果我们正在建立积压的工作,我们可以添加更多的Worker,这样,就可以轻松扩展。
首先,让我们尝试同时运行两个 Worker 实例。它们都会从队列中获取消息。切换到My.RabbitMQ.Worker,按ctrl+F5两次,运行两个 Worker 实例。
再切换到My.RabbitMQNewTask,按多次ctrl+F5,发布一些消息。两个 Worker 实例接收到的消息。
默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个使用者。平均而言,每个消费者将收到相同数量的消息。这种分发消息的方式称为轮循机制。与三个或更多 Worker一起尝试。
消息确认
按照我们上面的demo,一旦RabbitMQ将消息发送到消费端,消息就会立即从内存中移出,无论消费端是否处理完成。在这种情况下,消息就会丢失。
执行任务可能需要几秒钟。您可能想知道,如果其中一个消费者开始一项长期任务,并且只完成了部分任务,会发生什么。使用我们当前的代码,一旦 RabbitMQ 向使用者发送消息,它就会立即将其标记为删除。在这种情况下,如果您杀死一个Worker,我们将丢失它正在处理的消息。我们还将丢失已分派给此特定Worker但尚未处理的所有消息。
但我们不想失去任何任务。如果一个工人死亡,我们希望将任务交付给另一个Worker。
为了确保一个消息永远不会丢失,RabbitMQ支持消息确认(message acknowledgments)。当消费端接收消息并且处理完成后,会发送一个ack(消息确认)信号到RabbitMQ,RabbitMQ接收到这个信号后,就可以删除掉这条已经处理的消息任务。
如果消费端挂掉了(比如,通道关闭、连接丢失等)没有发送ack信号。RabbitMQ就会明白某个消息没有正常处理,RabbitMQ将会重新将消息入队,如果有另外一个消费端在线,就会快速的重新发送到另外一个消费端。通过这种方式,您可以确保即使Worker偶尔死亡,也不会丢失任何消息。
对消费者传递确认强制执行超时(默认为 30 分钟)。这有助于检测从不确认交货的错误(卡住)消费者。您可以增加此超时,如送达确认超时中所述。
默认情况下,手动消息确认处于打开状态。在前面的示例中,我们通过将自动确认模式(“自动确认模式”)参数设置为 true 来显式关闭它们。完成任务后,是时候删除此标志并手动发送来自工作线程的适当确认了。
在现有的写内联之后,添加对基本确认的调用,并使用autoAck: false更新基本消费:
Console.WriteLine(" [x] Done");// Note: it is possible to access the channel via// ((EventingBasicConsumer)sender).Model herechannel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: "hello",autoAck: false,consumer: consumer);
示例代码
以第二个示例代码为基础代码,微调下My.RabbitMQ.Worker中的代码逻辑:
//消息接收//1.实例化连接工厂using RabbitMQ.Client.Events;using RabbitMQ.Client;using System.Text;using My.RabbitMQ.Config;//建立连接using (var connection = MQConnection.CreateConnection()){//3. 创建信道using (var channel = connection.CreateModel()){//4. 申明队列,队列名称(queue)为"hello"channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);//5. 构造消费者实例var consumer = new EventingBasicConsumer(channel);//6. 绑定消息接收后的事件委托consumer.Received += (model, ea) =>{//报错//var message = Encoding.UTF8.GetString(ea.Body);var body = ea.Body.ToArray(); // 将内存区域的内容复制到一个新的数组中//var body = ea.Body.Span; // 从内存区域获取一个跨度var message = Encoding.UTF8.GetString(body);Console.WriteLine(" [x] Received {0}", message);//添加假任务以模拟执行时间int dots = message.Split('.').Length - 1;Thread.Sleep(dots * 1000);Console.WriteLine(" [x] Done");// 发送消息确认信号(手动消息确认)channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);};//7. 启动消费者//autoAck:true;自动进行消息确认,当消费端接收到消息后,就自动发送ack信号,不管消息是否正确处理完毕//autoAck:false;关闭自动消息确认,通过调用BasicAck方法手动进行消息确认channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}}
主要改动的是将 autoAck:true修改为autoAck:fasle,以及在消息处理完毕后手动调用BasicAck方法进行手动消息确认。
必须在接收传递的同一通道上发送确认。尝试使用其他通道进行确认将导致通道级协议异常。请参阅有关确认的文档指南以了解更多信息。
测试
使用任务队列的优点之一是能够轻松并行化工作。如果我们正在建立积压的工作,我们可以添加更多的Worker,这样,就可以轻松扩展。
首先,让我们尝试同时运行两个 Worker 实例。它们都会从队列中获取消息。切换到My.RabbitMQ.Worker,按ctrl+F5两次,运行两个 Worker 实例。
再切换到My.RabbitMQNewTask,按多次ctrl+F5,发布一些消息。两个 Worker 实例接收到的消息。
消息持久性
我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是,如果 RabbitMQ 服务器停止,我们的任务仍将丢失。
当 RabbitMQ 退出或崩溃时,它会忘记队列和消息,除非您告诉它不要这样做。要确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久。
首先,我们需要确保队列在 RabbitMQ 节点重新启动后仍能存活下来。为此,我们需要将其声明为持久:
- 指定durable:true,告知rabbitmq对消息进行持久化
- 将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
在现有的 GetBytes 之后,将“properties.Persistent”设置为 true: ```csharp var body = Encoding.UTF8.GetBytes(message);channel.QueueDeclare(queue: "durable_task",durable: true,exclusive: false,autoDelete: false,arguments: null);
var properties = channel.CreateBasicProperties(); properties.Persistent = true;
<a name="uvKmn"></a>### 示例在第二个示例的基础代码上修改。<br />修改My.RabbitMQNewTask项目的Program.cs,因为原来的hello队列名称已经在使用,所以需要更改一个新的队列名称,新的队列名称更改为durable_task。```csharp//发送消息using My.RabbitMQ.Config;using RabbitMQ.Client;using System.Text;//建立连接using (var connection = MQConnection.CreateConnection()){//创建信道using (var channel = connection.CreateModel()){//申明队列,队列名称(queue)为"hello"//channel.QueueDeclare(queue: "hello",// durable: false,// exclusive: false,// autoDelete: false,// arguments: null);//申明队列,队列名称(queue)为"hello",指定durable:true,告知rabbitmq对消息进行持久化channel.QueueDeclare(queue: "durable_task",durable: true,exclusive: false,autoDelete: false,arguments: null);//构建byte消息数据包//string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!";//消息从控制台输入后var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message);//将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为truevar properties = channel.CreateBasicProperties();properties.Persistent = true;//发送数据包channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);Console.WriteLine(" [x] Sent {0}", message);}}//获取消息string GetMessage(string[] args){return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");}
修改My.RabbitMQ.Worker项目的Program.cs,因为原来的hello队列名称已经在使用,所以需要更改一个新的队列名称,新的队列名称更改为durable_task。
//消息接收//1.实例化连接工厂using RabbitMQ.Client.Events;using RabbitMQ.Client;using System.Text;//实例化连接var factory = new ConnectionFactory{HostName = "localhost",UserName = "admin",Password = "admin",Port = 5672,//VirtualHost= "myRabbit"};//建立连接using (var connection = factory.CreateConnection()){//创建信道using (var channel = connection.CreateModel()){//申明队列,队列名称(queue)为"hello"//channel.QueueDeclare(queue: "hello",// durable: false,// exclusive: false,// autoDelete: false,// arguments: null);channel.QueueDeclare(queue: "durable_task",durable: true,exclusive: false,autoDelete: false,arguments: null);//构造消费者实例var consumer = new EventingBasicConsumer(channel);//绑定消息接收后的事件委托consumer.Received += (model, ea) =>{//报错//var message = Encoding.UTF8.GetString(ea.Body);var body = ea.Body.ToArray(); // 将内存区域的内容复制到一个新的数组中//var body = ea.Body.Span; // 从内存区域获取一个跨度var message = Encoding.UTF8.GetString(body);Console.WriteLine(" [x] Received {0}", message);//添加假任务以模拟执行时间int dots = message.Split('.').Length - 1;Thread.Sleep(dots * 1000);Console.WriteLine(" [x] Done");// 发送消息确认信号(手动消息确认)channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);};//启动消费者//autoAck:true;自动进行消息确认,当消费端接收到消息后,就自动发送ack信号,不管消息是否正确处理完毕//autoAck:false;关闭自动消息确认,通过调用BasicAck方法手动进行消息确认channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}}
关于消息持久性的说明
将邮件标记为持久性并不能完全保证邮件不会丢失。尽管它告诉 RabbitMQ 将消息保存到磁盘,但当 RabbitMQ 接受消息但尚未保存消息时,仍有一个较短的时间窗口。另外, RabbitMQ 不会对每条消息都执行 fsync(2) — 它可能只是保存到缓存中,而不是真正写入磁盘。持久性保证并不强,但对于我们的简单任务队列来说已经足够了。如果您需要更强的保证,则可以使用发布者确认。
测试
我们先运行两次My.RabbitMQNewTask,发布两条消息,然后关闭My.RabbitMQNewTask。
再运行My.RabbitMQ.Worker,开始消费持久性的消息。
公平调度
RabbitMQ的消息分发默认按照消费端的数量,按顺序循环分发。这样仅是确保了消费端被平均分发消息的数量,但却忽略了消费端的闲忙情况。这就可能出现某个消费端一直处理耗时任务处于阻塞状态,某个消费端一直处理一般任务处于空置状态,而只是它们分配的任务数量一样。
为了更改此行为,我们可以将 BasicQos 方法与预取计数 = 1 设置结合使用。这告诉 RabbitMQ 不要一次向一个工作线程提供多个消息。或者,换句话说,在工作人员处理并确认前一条消息之前,不要向该工作人员发送新消息。相反,它会将其分派给下一个不忙的工作线程。
在工作线程中的现有队列声明之后添加对BasicQos的调用:
channel.QueueDeclare(queue: "durable_task",durable: true,exclusive: false,autoDelete: false,arguments: null);channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
这时你需要注意的是如果所有的消费端都处于忙碌状态,你的队列可能会被塞满。你需要注意这一点,要么添加更多的消费端,要么采取其他策略。
示例
在上面的基础代码下,修改My.RabbitMQ.Worker
//消息接收//1.实例化连接工厂using RabbitMQ.Client.Events;using RabbitMQ.Client;using System.Text;using My.RabbitMQ.Config;//建立连接using (var connection = MQConnection.CreateConnection()){//创建信道using (var channel = connection.CreateModel()){//申明队列,队列名称(queue)为"hello"//channel.QueueDeclare(queue: "hello",// durable: false,// exclusive: false,// autoDelete: false,// arguments: null);channel.QueueDeclare(queue: "durable_task",durable: true,exclusive: false,autoDelete: false,arguments: null);//设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息,也就确保了当消费端处于忙碌状态时channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);//构造消费者实例var consumer = new EventingBasicConsumer(channel);//绑定消息接收后的事件委托consumer.Received += (model, ea) =>{//报错//var message = Encoding.UTF8.GetString(ea.Body);var body = ea.Body.ToArray(); // 将内存区域的内容复制到一个新的数组中//var body = ea.Body.Span; // 从内存区域获取一个跨度var message = Encoding.UTF8.GetString(body);Console.WriteLine(" [x] Received {0}", message);//添加假任务以模拟执行时间int dots = message.Split('.').Length - 1;Thread.Sleep(dots * 3000);Console.WriteLine(" [x] Done");// 发送消息确认信号(手动消息确认)channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);};//启动消费者//autoAck:true;自动进行消息确认,当消费端接收到消息后,就自动发送ack信号,不管消息是否正确处理完毕//autoAck:false;关闭自动消息确认,通过调用BasicAck方法手动进行消息确认channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}}
测试
我们先运行多次My.RabbitMQ.Worker,如同时运行3个Worker实例。
再开始多次My.RabbitMQNewTask,发布多条消息。
问题1
参数 1: 无法从“System.ReadOnlyMemory
var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body;var message = Encoding.UTF8.GetString(body); // <------错误点Console.WriteLine(" [x] Received {0}", message);};
解决办法
var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray(); // 将内存区域的内容复制到一个新的数组中var message = Encoding.UTF8.GetString(body);Console.WriteLine(" [x] Received {0}", message);};
var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.Span; // 从内存区域获取一个跨度var message = Encoding.UTF8.GetString(body);Console.WriteLine(" [x] Received {0}", message);};
