官网:https://www.rabbitmq.com/getstarted.html
学习资料:https://github.com/sheng-jie/RabbitMQ

发送和接收消息

消息模型

RabbitMQ 学习一 - 图1
消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,队列再将消息发送到监听的消费者。
下面我们我们通过demo来了解RabbitMQ的基本用法。分别创建两个控制台项目My.RabbitMQ.Send、My.RabbitMQ.Receive。

公共类库

  1. Install-Package RabbitMQ.Client
  2. Install-Package Microsoft.Extensions.Configuration
  3. Install-Package Microsoft.Extensions.Configuration.Json
  1. using Microsoft.Extensions.Configuration;
  2. using RabbitMQ.Client;
  3. namespace My.RabbitMQ.Config;
  4. public class MQConnection
  5. {
  6. public static IConnection CreateConnection() {
  7. var config = new ConfigurationBuilder()
  8. .AddInMemoryCollection() //缓存
  9. .SetBasePath(Directory.GetCurrentDirectory())
  10. .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true)
  11. .Build();
  12. var appSetting = config.GetSection("AppSetting:RabbitMQSetting");
  13. var factory = new ConnectionFactory
  14. {
  15. HostName = appSetting.GetSection("HostName").Value,
  16. UserName = appSetting.GetSection("UserName").Value,
  17. Password = appSetting.GetSection("Password").Value,
  18. Port = 5672,
  19. //VirtualHost= "myRabbit"
  20. };
  21. return factory.CreateConnection();
  22. }
  23. }

公共部分

  1. {
  2. "Logging": {
  3. "LogLevel": {
  4. "Default": "Information",
  5. "Microsoft.AspNetCore": "Warning"
  6. }
  7. },
  8. "AllowedHosts": "*",
  9. "AppSetting": {
  10. "SqlServerSetting": {
  11. "Connection": "Server=192.168.3.40;Database=webdemo;User=sa;Password=longfuchu;"
  12. },
  13. "MysqlSetting": {
  14. "Connection": "server=192.168.3.40;port=3306;user=root;password=123456;database=webdemo;charset=utf8;Allow Zero Datetime=true;sslmode=none;Old Guids=true;Allow User Variables=True"
  15. },
  16. "RedisSetting": {
  17. "Connection": "192.168.3.40:6379,password=",
  18. "Database": "test"
  19. },
  20. "RabbitMQSetting": {
  21. "HostName": "localhost",
  22. "UserName": "admin",
  23. "Password": "admin",
  24. "Port": 5672
  25. },
  26. "MongoDbSetting": {
  27. "Connection": "mongodb://192.168.3.40:2717", ///?replicaSet=rs0
  28. "Database": "test"
  29. }
  30. }
  31. }

消息发送

消息发送在My.RabbitMQ.Send

  1. Install-Package RabbitMQ.Client
  1. //发送消息
  2. using My.RabbitMQ.Config;
  3. using RabbitMQ.Client;
  4. using System.Text;
  5. //建立连接
  6. using (var connection = MQConnection.CreateConnection())
  7. //创建信道
  8. using (var channel = connection.CreateModel())
  9. {
  10. //申明队列,队列名称(queue)为"hello"
  11. channel.QueueDeclare("hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
  12. //构建byte消息数据包
  13. string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!";
  14. var body = Encoding.UTF8.GetBytes(message);
  15. //发送数据包
  16. channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
  17. Console.WriteLine(" [x] Sent {0}", message);
  18. }

消息接收

消息接收端在My.RabbitMQ.Receive

  1. Install-Package RabbitMQ.Client
  1. //消息接收
  2. //1.实例化连接工厂
  3. using RabbitMQ.Client.Events;
  4. using RabbitMQ.Client;
  5. using System.Text;
  6. using My.RabbitMQ.Config;
  7. //建立连接
  8. using (var connection = MQConnection.CreateConnection())
  9. //3. 创建信道
  10. using (var channel = connection.CreateModel())
  11. {
  12. //4. 申明队列,队列名称(queue)为"hello"
  13. channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
  14. //5. 构造消费者实例
  15. var consumer = new EventingBasicConsumer(channel);
  16. //6. 绑定消息接收后的事件委托
  17. consumer.Received += (model, ea) =>
  18. {
  19. //报错
  20. //var message = Encoding.UTF8.GetString(ea.Body);
  21. var body = ea.Body.ToArray(); // 将内存区域的内容复制到一个新的数组中
  22. //var body = ea.Body.Span; // 从内存区域获取一个跨度
  23. var message = Encoding.UTF8.GetString(body);
  24. Console.WriteLine(" [x] Received {0}", message);
  25. Thread.Sleep(6000);//模拟耗时
  26. Console.WriteLine(" [x] Done");
  27. };
  28. //7. 启动消费者
  29. channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
  30. Console.WriteLine(" Press [enter] to exit.");
  31. Console.ReadLine();
  32. }

测试

先运行消息接收端,再运行消息发送端(ctrl+F5),结果如下图。
1663317400991.png
从上面的代码中可以看出,发送端和消费端的代码前4步都是一样的。主要的区别在于发送端调用channel.BasicPublish方法发送消息;而接收端需要实例化一个EventingBasicConsumer实例来进行消息处理逻辑。另外一点需要注意的是:消息接收端和发送端的队列名称(queue)必须保持一致,这里指定的队列名称为hello。

工作队列

工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务并等待其完成。相反,我们将任务安排在以后完成。我们将任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当您运行多个工作线程时,任务将在它们之间共享。
这个概念在 Web 应用程序中特别有用,因为在 Web 应用程序中,不可能在较短的 HTTP 请求窗口中处理复杂的任务。
使用工作队列的好处就是它能够并行的处理队列。如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以了。我们先启动两个接收端,等待消息接收,再启动一个发送端进行消息发送。

消息模型

RabbitMQ 学习一 - 图3
我们需要生成两个项目,My.RabbitMQNewTask,My.RabbitMQ.Worker

My.RabbitMQNewTask

  1. Install-Package RabbitMQ.Client
  1. //发送消息
  2. using My.RabbitMQ.Config;
  3. using RabbitMQ.Client;
  4. using System.Text;
  5. //建立连接
  6. using (var connection = MQConnection.CreateConnection())
  7. //创建信道
  8. using (var channel = connection.CreateModel())
  9. {
  10. //申明队列,队列名称(queue)为"hello"
  11. //channel.QueueDeclare(queue: "hello",
  12. // durable: false,
  13. // exclusive: false,
  14. // autoDelete: false,
  15. // arguments: null);
  16. //申明队列,队列名称(queue)为"hello",指定durable:true,告知rabbitmq对消息进行持久化
  17. channel.QueueDeclare(queue: "durable_task",
  18. durable: true,
  19. exclusive: false,
  20. autoDelete: false,
  21. arguments: null);
  22. //构建byte消息数据包
  23. //string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!";
  24. //消息从控制台输入后
  25. var message = GetMessage(args);
  26. var body = Encoding.UTF8.GetBytes(message);
  27. //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
  28. var properties = channel.CreateBasicProperties();
  29. properties.Persistent = true;
  30. //发送数据包
  31. channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
  32. Console.WriteLine(" [x] Sent {0}", message);
  33. }
  34. //获取消息
  35. string GetMessage(string[] args)
  36. {
  37. return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
  38. }

My.RabbitMQ.Worker

  1. Install-Package RabbitMQ.Client
  1. //消息接收
  2. //1.实例化连接工厂
  3. using RabbitMQ.Client.Events;
  4. using RabbitMQ.Client;
  5. using System.Text;
  6. using My.RabbitMQ.Config;
  7. //建立连接
  8. using (var connection = MQConnection.CreateConnection())
  9. //创建信道
  10. using (var channel = connection.CreateModel())
  11. {
  12. //申明队列,队列名称(queue)为"hello"
  13. //channel.QueueDeclare(queue: "hello",
  14. // durable: false,
  15. // exclusive: false,
  16. // autoDelete: false,
  17. // arguments: null);
  18. channel.QueueDeclare(queue: "durable_task",
  19. durable: true,
  20. exclusive: false,
  21. autoDelete: false,
  22. arguments: null);
  23. //设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息,也就确保了当消费端处于忙碌状态时
  24. channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
  25. //构造消费者实例
  26. var consumer = new EventingBasicConsumer(channel);
  27. //绑定消息接收后的事件委托
  28. consumer.Received += (model, ea) =>
  29. {
  30. //报错
  31. //var message = Encoding.UTF8.GetString(ea.Body);
  32. var body = ea.Body.ToArray(); // 将内存区域的内容复制到一个新的数组中
  33. //var body = ea.Body.Span; // 从内存区域获取一个跨度
  34. var message = Encoding.UTF8.GetString(body);
  35. Console.WriteLine(" [x] Received {0}", message);
  36. //添加假任务以模拟执行时间
  37. int dots = message.Split('.').Length - 1;
  38. Thread.Sleep(dots * 6000);
  39. Console.WriteLine(" [x] Done");
  40. // 发送消息确认信号(手动消息确认)
  41. channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
  42. };
  43. //启动消费者
  44. //autoAck:true;自动进行消息确认,当消费端接收到消息后,就自动发送ack信号,不管消息是否正确处理完毕
  45. //autoAck:false;关闭自动消息确认,通过调用BasicAck方法手动进行消息确认
  46. channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);
  47. Console.WriteLine(" Press [enter] to exit.");
  48. Console.ReadLine();
  49. }

轮循机制调度测试

使用任务队列的优点之一是能够轻松并行化工作。如果我们正在建立积压的工作,我们可以添加更多的Worker,这样,就可以轻松扩展。
首先,让我们尝试同时运行两个 Worker 实例。它们都会从队列中获取消息。切换到My.RabbitMQ.Worker,按ctrl+F5两次,运行两个 Worker 实例。
1663319140628.png
再切换到My.RabbitMQNewTask,按多次ctrl+F5,发布一些消息。两个 Worker 实例接收到的消息。
1663319396097.png
默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个使用者。平均而言,每个消费者将收到相同数量的消息。这种分发消息的方式称为轮循机制。与三个或更多 Worker一起尝试。

消息确认

按照我们上面的demo,一旦RabbitMQ将消息发送到消费端,消息就会立即从内存中移出,无论消费端是否处理完成。在这种情况下,消息就会丢失。
执行任务可能需要几秒钟。您可能想知道,如果其中一个消费者开始一项长期任务,并且只完成了部分任务,会发生什么。使用我们当前的代码,一旦 RabbitMQ 向使用者发送消息,它就会立即将其标记为删除。在这种情况下,如果您杀死一个Worker,我们将丢失它正在处理的消息。我们还将丢失已分派给此特定Worker但尚未处理的所有消息。
但我们不想失去任何任务。如果一个工人死亡,我们希望将任务交付给另一个Worker。
为了确保一个消息永远不会丢失,RabbitMQ支持消息确认(message acknowledgments)。当消费端接收消息并且处理完成后,会发送一个ack(消息确认)信号到RabbitMQ,RabbitMQ接收到这个信号后,就可以删除掉这条已经处理的消息任务。
如果消费端挂掉了(比如,通道关闭、连接丢失等)没有发送ack信号。RabbitMQ就会明白某个消息没有正常处理,RabbitMQ将会重新将消息入队,如果有另外一个消费端在线,就会快速的重新发送到另外一个消费端。通过这种方式,您可以确保即使Worker偶尔死亡,也不会丢失任何消息。
对消费者传递确认强制执行超时(默认为 30 分钟)。这有助于检测从不确认交货的错误(卡住)消费者。您可以增加此超时,如送达确认超时中所述。
默认情况下,手动消息确认处于打开状态。在前面的示例中,我们通过将自动确认模式(“自动确认模式”)参数设置为 true 来显式关闭它们。完成任务后,是时候删除此标志并手动发送来自工作线程的适当确认了。
在现有的写内联之后,添加对基本确认的调用,并使用autoAck: false更新基本消费:

  1. Console.WriteLine(" [x] Done");
  2. // Note: it is possible to access the channel via
  3. // ((EventingBasicConsumer)sender).Model here
  4. channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
  5. };
  6. channel.BasicConsume(queue: "hello",
  7. autoAck: false,
  8. consumer: consumer);

示例代码

以第二个示例代码为基础代码,微调下My.RabbitMQ.Worker中的代码逻辑:

  1. //消息接收
  2. //1.实例化连接工厂
  3. using RabbitMQ.Client.Events;
  4. using RabbitMQ.Client;
  5. using System.Text;
  6. using My.RabbitMQ.Config;
  7. //建立连接
  8. using (var connection = MQConnection.CreateConnection())
  9. {
  10. //3. 创建信道
  11. using (var channel = connection.CreateModel())
  12. {
  13. //4. 申明队列,队列名称(queue)为"hello"
  14. channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
  15. //5. 构造消费者实例
  16. var consumer = new EventingBasicConsumer(channel);
  17. //6. 绑定消息接收后的事件委托
  18. consumer.Received += (model, ea) =>
  19. {
  20. //报错
  21. //var message = Encoding.UTF8.GetString(ea.Body);
  22. var body = ea.Body.ToArray(); // 将内存区域的内容复制到一个新的数组中
  23. //var body = ea.Body.Span; // 从内存区域获取一个跨度
  24. var message = Encoding.UTF8.GetString(body);
  25. Console.WriteLine(" [x] Received {0}", message);
  26. //添加假任务以模拟执行时间
  27. int dots = message.Split('.').Length - 1;
  28. Thread.Sleep(dots * 1000);
  29. Console.WriteLine(" [x] Done");
  30. // 发送消息确认信号(手动消息确认)
  31. channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
  32. };
  33. //7. 启动消费者
  34. //autoAck:true;自动进行消息确认,当消费端接收到消息后,就自动发送ack信号,不管消息是否正确处理完毕
  35. //autoAck:false;关闭自动消息确认,通过调用BasicAck方法手动进行消息确认
  36. channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);
  37. Console.WriteLine(" Press [enter] to exit.");
  38. Console.ReadLine();
  39. }
  40. }

主要改动的是将 autoAck:true修改为autoAck:fasle,以及在消息处理完毕后手动调用BasicAck方法进行手动消息确认。
必须在接收传递的同一通道上发送确认。尝试使用其他通道进行确认将导致通道级协议异常。请参阅有关确认的文档指南以了解更多信息。

测试

使用任务队列的优点之一是能够轻松并行化工作。如果我们正在建立积压的工作,我们可以添加更多的Worker,这样,就可以轻松扩展。
首先,让我们尝试同时运行两个 Worker 实例。它们都会从队列中获取消息。切换到My.RabbitMQ.Worker,按ctrl+F5两次,运行两个 Worker 实例。
再切换到My.RabbitMQNewTask,按多次ctrl+F5,发布一些消息。两个 Worker 实例接收到的消息。

消息持久性

我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是,如果 RabbitMQ 服务器停止,我们的任务仍将丢失。
当 RabbitMQ 退出或崩溃时,它会忘记队列和消息,除非您告诉它不要这样做。要确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久
首先,我们需要确保队列在 RabbitMQ 节点重新启动后仍能存活下来。为此,我们需要将其声明为持久:

  1. 指定durable:true,告知rabbitmq对消息进行持久化
  2. 将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
    1. channel.QueueDeclare(queue: "durable_task",
    2. durable: true,
    3. exclusive: false,
    4. autoDelete: false,
    5. arguments: null);
    在现有的 GetBytes 之后,将“properties.Persistent”设置为 true: ```csharp var body = Encoding.UTF8.GetBytes(message);

var properties = channel.CreateBasicProperties(); properties.Persistent = true;

  1. <a name="uvKmn"></a>
  2. ### 示例
  3. 在第二个示例的基础代码上修改。<br />修改My.RabbitMQNewTask项目的Program.cs,因为原来的hello队列名称已经在使用,所以需要更改一个新的队列名称,新的队列名称更改为durable_task。
  4. ```csharp
  5. //发送消息
  6. using My.RabbitMQ.Config;
  7. using RabbitMQ.Client;
  8. using System.Text;
  9. //建立连接
  10. using (var connection = MQConnection.CreateConnection())
  11. {
  12. //创建信道
  13. using (var channel = connection.CreateModel())
  14. {
  15. //申明队列,队列名称(queue)为"hello"
  16. //channel.QueueDeclare(queue: "hello",
  17. // durable: false,
  18. // exclusive: false,
  19. // autoDelete: false,
  20. // arguments: null);
  21. //申明队列,队列名称(queue)为"hello",指定durable:true,告知rabbitmq对消息进行持久化
  22. channel.QueueDeclare(queue: "durable_task",
  23. durable: true,
  24. exclusive: false,
  25. autoDelete: false,
  26. arguments: null);
  27. //构建byte消息数据包
  28. //string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!";
  29. //消息从控制台输入后
  30. var message = GetMessage(args);
  31. var body = Encoding.UTF8.GetBytes(message);
  32. //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
  33. var properties = channel.CreateBasicProperties();
  34. properties.Persistent = true;
  35. //发送数据包
  36. channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
  37. Console.WriteLine(" [x] Sent {0}", message);
  38. }
  39. }
  40. //获取消息
  41. string GetMessage(string[] args)
  42. {
  43. return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
  44. }

修改My.RabbitMQ.Worker项目的Program.cs,因为原来的hello队列名称已经在使用,所以需要更改一个新的队列名称,新的队列名称更改为durable_task。

  1. //消息接收
  2. //1.实例化连接工厂
  3. using RabbitMQ.Client.Events;
  4. using RabbitMQ.Client;
  5. using System.Text;
  6. //实例化连接
  7. var factory = new ConnectionFactory
  8. {
  9. HostName = "localhost",
  10. UserName = "admin",
  11. Password = "admin",
  12. Port = 5672,
  13. //VirtualHost= "myRabbit"
  14. };
  15. //建立连接
  16. using (var connection = factory.CreateConnection())
  17. {
  18. //创建信道
  19. using (var channel = connection.CreateModel())
  20. {
  21. //申明队列,队列名称(queue)为"hello"
  22. //channel.QueueDeclare(queue: "hello",
  23. // durable: false,
  24. // exclusive: false,
  25. // autoDelete: false,
  26. // arguments: null);
  27. channel.QueueDeclare(queue: "durable_task",
  28. durable: true,
  29. exclusive: false,
  30. autoDelete: false,
  31. arguments: null);
  32. //构造消费者实例
  33. var consumer = new EventingBasicConsumer(channel);
  34. //绑定消息接收后的事件委托
  35. consumer.Received += (model, ea) =>
  36. {
  37. //报错
  38. //var message = Encoding.UTF8.GetString(ea.Body);
  39. var body = ea.Body.ToArray(); // 将内存区域的内容复制到一个新的数组中
  40. //var body = ea.Body.Span; // 从内存区域获取一个跨度
  41. var message = Encoding.UTF8.GetString(body);
  42. Console.WriteLine(" [x] Received {0}", message);
  43. //添加假任务以模拟执行时间
  44. int dots = message.Split('.').Length - 1;
  45. Thread.Sleep(dots * 1000);
  46. Console.WriteLine(" [x] Done");
  47. // 发送消息确认信号(手动消息确认)
  48. channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
  49. };
  50. //启动消费者
  51. //autoAck:true;自动进行消息确认,当消费端接收到消息后,就自动发送ack信号,不管消息是否正确处理完毕
  52. //autoAck:false;关闭自动消息确认,通过调用BasicAck方法手动进行消息确认
  53. channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);
  54. Console.WriteLine(" Press [enter] to exit.");
  55. Console.ReadLine();
  56. }
  57. }

关于消息持久性的说明

将邮件标记为持久性并不能完全保证邮件不会丢失。尽管它告诉 RabbitMQ 将消息保存到磁盘,但当 RabbitMQ 接受消息但尚未保存消息时,仍有一个较短的时间窗口。另外, RabbitMQ 不会对每条消息都执行 fsync(2) — 它可能只是保存到缓存中,而不是真正写入磁盘。持久性保证并不强,但对于我们的简单任务队列来说已经足够了。如果您需要更强的保证,则可以使用发布者确认

测试

我们先运行两次My.RabbitMQNewTask,发布两条消息,然后关闭My.RabbitMQNewTask。
再运行My.RabbitMQ.Worker,开始消费持久性的消息。
1663322403194.png

公平调度

RabbitMQ的消息分发默认按照消费端的数量,按顺序循环分发。这样仅是确保了消费端被平均分发消息的数量,但却忽略了消费端的闲忙情况。这就可能出现某个消费端一直处理耗时任务处于阻塞状态,某个消费端一直处理一般任务处于空置状态,而只是它们分配的任务数量一样。
RabbitMQ 学习一 - 图7
为了更改此行为,我们可以将 BasicQos 方法与预取计数 = 1 设置结合使用。这告诉 RabbitMQ 不要一次向一个工作线程提供多个消息。或者,换句话说,在工作人员处理并确认前一条消息之前,不要向该工作人员发送新消息。相反,它会将其分派给下一个不忙的工作线程。
在工作线程中的现有队列声明之后添加对BasicQos的调用:

  1. channel.QueueDeclare(queue: "durable_task",
  2. durable: true,
  3. exclusive: false,
  4. autoDelete: false,
  5. arguments: null);
  6. channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

这时你需要注意的是如果所有的消费端都处于忙碌状态,你的队列可能会被塞满。你需要注意这一点,要么添加更多的消费端,要么采取其他策略。

示例

在上面的基础代码下,修改My.RabbitMQ.Worker

  1. //消息接收
  2. //1.实例化连接工厂
  3. using RabbitMQ.Client.Events;
  4. using RabbitMQ.Client;
  5. using System.Text;
  6. using My.RabbitMQ.Config;
  7. //建立连接
  8. using (var connection = MQConnection.CreateConnection())
  9. {
  10. //创建信道
  11. using (var channel = connection.CreateModel())
  12. {
  13. //申明队列,队列名称(queue)为"hello"
  14. //channel.QueueDeclare(queue: "hello",
  15. // durable: false,
  16. // exclusive: false,
  17. // autoDelete: false,
  18. // arguments: null);
  19. channel.QueueDeclare(queue: "durable_task",
  20. durable: true,
  21. exclusive: false,
  22. autoDelete: false,
  23. arguments: null);
  24. //设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息,也就确保了当消费端处于忙碌状态时
  25. channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
  26. //构造消费者实例
  27. var consumer = new EventingBasicConsumer(channel);
  28. //绑定消息接收后的事件委托
  29. consumer.Received += (model, ea) =>
  30. {
  31. //报错
  32. //var message = Encoding.UTF8.GetString(ea.Body);
  33. var body = ea.Body.ToArray(); // 将内存区域的内容复制到一个新的数组中
  34. //var body = ea.Body.Span; // 从内存区域获取一个跨度
  35. var message = Encoding.UTF8.GetString(body);
  36. Console.WriteLine(" [x] Received {0}", message);
  37. //添加假任务以模拟执行时间
  38. int dots = message.Split('.').Length - 1;
  39. Thread.Sleep(dots * 3000);
  40. Console.WriteLine(" [x] Done");
  41. // 发送消息确认信号(手动消息确认)
  42. channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
  43. };
  44. //启动消费者
  45. //autoAck:true;自动进行消息确认,当消费端接收到消息后,就自动发送ack信号,不管消息是否正确处理完毕
  46. //autoAck:false;关闭自动消息确认,通过调用BasicAck方法手动进行消息确认
  47. channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);
  48. Console.WriteLine(" Press [enter] to exit.");
  49. Console.ReadLine();
  50. }
  51. }

测试

我们先运行多次My.RabbitMQ.Worker,如同时运行3个Worker实例。
再开始多次My.RabbitMQNewTask,发布多条消息。
1663323453828.png

问题1

参数 1: 无法从“System.ReadOnlyMemory

  1. var consumer = new EventingBasicConsumer(channel);
  2. consumer.Received += (model, ea) =>
  3. {
  4. var body = ea.Body;
  5. var message = Encoding.UTF8.GetString(body); // <------错误点
  6. Console.WriteLine(" [x] Received {0}", message);
  7. };

解决办法

  1. var consumer = new EventingBasicConsumer(channel);
  2. consumer.Received += (model, ea) =>
  3. {
  4. var body = ea.Body.ToArray(); // 将内存区域的内容复制到一个新的数组中
  5. var message = Encoding.UTF8.GetString(body);
  6. Console.WriteLine(" [x] Received {0}", message);
  7. };
  1. var consumer = new EventingBasicConsumer(channel);
  2. consumer.Received += (model, ea) =>
  3. {
  4. var body = ea.Body.Span; // 从内存区域获取一个跨度
  5. var message = Encoding.UTF8.GetString(body);
  6. Console.WriteLine(" [x] Received {0}", message);
  7. };