生产端消息确认

tx机制

tx机制叫做事务机制,RabbitMQ中有三个与tx机制的方法:txSelect()txCommit()txRollback()

  • channel.txSelect() 用于将当前channel设置成transaction模式
  • channel.txCommit() 提交事务
  • channel.txRollback() 回滚事务

使用tx机制,首先要通过txSelect方法开启事务,然后发布消息给broker服务器,如果txCommit提交成功,则说明消息成功被broker接收;如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候可以捕获异常,通过txRollback回滚事务

  1. // 创建名称为hello的队列
  2. channel.QueueDeclare("hello", false, false, false, null);
  3. for (int i = 0; i < 5; i++)
  4. {
  5. // 构建消息数据包
  6. byte[] body = Encoding.UTF8.GetBytes(i.ToString());
  7. try
  8. {
  9. // 开启事务机制
  10. channel.TxSelect();
  11. // 消息发送
  12. channel.BasicPublish(
  13. exchange: "",
  14. routingKey: "hello",
  15. basicProperties: null,
  16. body: body);
  17. // 事务提交
  18. channel.TxCommit();
  19. }
  20. catch (Exception ex)
  21. {
  22. channel.TxRollback();
  23. Assert.Fail(ex.Message);
  24. }
  25. }

Confirm模式

C#的RabbitMQ API中,有三个与Confirm相关的方法:ConfirmSelect()WaitForConfirms()WaitForConfirmOrDie

  • channel.ConfirmSelect() 表示开启Confirm模式
  • channel.WaitForConfirms() 等待所有消息确认,如果所有的消息都被服务端成功接收返回true,只要有一条没有被成功接收就返回false
  • channel.WaitForConfirmsOrDie()WaitForConfirms作用类型,也是等待所有消息确认。区别在于该方法没有返回值(Void),如果有任意一条消息没有被成功接收,该方法会立即抛出OperationInterrupedException类型异常
  1. channel.QueueDeclare(
  2. queue: "hello",
  3. durable: false,
  4. exclusive: false,
  5. autoDelete: false,
  6. arguments: null);
  7. string message = "hello world";
  8. byte[] messageBody = Encoding.UTF8.GetBytes(message);
  9. // 开启Confirm模式
  10. channel.ConfirmSelect();
  11. // 消息发送
  12. channel.BasicPublish(
  13. exchange: "",
  14. routingKey: "hello",
  15. basicProperties: null,
  16. body: messageBody);
  17. // WaitForConfirms确认消息(可以同时确认多条消息)是否发送成功
  18. if (channel.WaitForConfirms())
  19. {
  20. Console.WriteLine($"Message发送成功");
  21. }
  22. else
  23. {
  24. Assert.Fail();
  25. }

消费端消息确认

自动确认

当RabbbitMQ将消息发送给消费者后,消费者接收到消息后,不等待消息处理结束,立即自动回送一个确认回执。自动确认的用法十分简单,设置消费方法的参数autoAck为true即可

  1. EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
  2. consumer.Received += (model, ea) =>
  3. {
  4. string message =
  5. Encoding.UTF8.GetString(ea.Body.ToArray());
  6. };
  7. channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);

可能存在的问题

  1. 丢失数据:Broker会在接收到确认回执时删除消息,如果消费者接收到消息并返回了确认回执,然后这个消费者在处理消息时挂了,那么这条消息就再也找不回来了
  2. 只要队列不空,RabbitMQ会源源不断的把消息推送给客户端,而不管客户端能否消费的完,如果其中一个消费端消费的较慢,会极大的浪费性能

手动确认(BasicAck)

消费从队列中获取消息后,服务器会将该消息处于不可用状态,等待消费者反馈。Resume方法的参数autoAck设置为false,然后在消费端使用代码 channel.BasicAck()/BasicReject()等方法来确认和拒绝消息即可实现手动确认

  1. EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
  2. consumer.Received += (model, ea) =>
  3. {
  4. string message =
  5. Encoding.wUTF8.GetString(ea.Body.ToArray());
  6. // 手动ack
  7. channel.BasicAck(
  8. deliveryTag: ea.DeliveryTag,
  9. multiple: false);
  10. };
  11. channel.BasicConsume(queue: "hello",
  12. autoAck: false,
  13. consumer: consumer);

改为手动确认方式只需改两处

  1. 开启监听时将autoAck参数改为false
  2. 消息消费成功后返回确认

这段代码中,先处理消息,完成后,再做ack响应,失败就不做ack响应,这样消息会储存在MQUnacked消息里,不会丢失,看起来没啥问题,但是如果其中一条消息在处理时抛出了异常,将导致后续所有消息都会无法消费

消息拒绝

BasicNack()

BasicReject()不同的是同时支持多个消息,可以nack该消费者先前接收未ack的所有消息

  1. EventingBasicConsumer consumer =
  2. new EventingBasicConsumer(channel);
  3. consumer.Received += (model, ea) =>
  4. {
  5. string message =
  6. Encoding.UTF8.GetString(ea.Body.ToArray());
  7. try
  8. {
  9. /* 消费到某条消息时出错
  10. * 导致Broker无法拿到正常回执信息引发后续消息都无法被正常消费
  11. * 如果MQ没得到ack响应,这些消息会堆积在Unacked消息里,不会丢弃,直至客户端断开重连时,才变回ready
  12. * 如果Consumer客户端不断开连接,这些Unacked消息,永远不会变回ready状态
  13. * Unacked消息多了,占用内存越来越大,就会异常
  14. */
  15. MessageConsumer(ea);
  16. channel.BasicAck(
  17. deliveryTag: ea.DeliveryTag,
  18. multiple: false);
  19. }
  20. catch (Exception ex)
  21. {
  22. // 出错了,发nack,并通知MQ把消息塞回的队列头部(不是尾部)
  23. channel.BasicNack(
  24. deliveryTag: ea.DeliveryTag,
  25. multiple: false,
  26. requeue: true);
  27. }
  28. };
  29. channel.BasicConsume(queue: "hello",
  30. autoAck: false,
  31. consumer: consumer);

这里将代码调整为消费正常就ack,不正常就nack,并等下一次重新消费。看起来没问题,但是如果某条消息在消费时又抛出异常,该消息将会被Nack机制重新扔回队列头部,下一步又消费这条会出异常的消息,又出错,塞回队列……进入死循环,所以要谨慎使用Nack机制。这里可以在catch中记录错误日志依旧使用ack确认消费

BasicReject()

消费端告诉服务器这个消息拒绝接收,不处理,可以设置是否放回到队列中还是丢掉(只能一次拒绝一个消息)

  1. MessagePublisher("hello", $"1");
  2. MessagePublisher("hello", $"2");
  3. MessagePublisher("hello", $"3");
  4. channel.QueueDeclare(
  5. queue: "hello",
  6. durable: false,
  7. exclusive: false,
  8. autoDelete: false,
  9. arguments: null);
  10. EventingBasicConsumer consumer =
  11. new EventingBasicConsumer(channel);
  12. channel.BasicQos(0, 1, false);
  13. consumer.Received += (model, ea) =>
  14. {
  15. string message =
  16. Encoding.UTF8.GetString(ea.Body.ToArray());
  17. if (message == "2")
  18. {
  19. Console.WriteLine($"Message:{message}");
  20. channel.BasicAck(
  21. deliveryTag: ea.DeliveryTag,
  22. multiple: false);
  23. }
  24. else
  25. {
  26. Console.WriteLine($"拒绝处理");
  27. /* BasicReject用于拒绝消息
  28. requeue参数指定了拒绝后是否重新放回queue
  29. 一次只能拒绝一条消息
  30. 设置为true: 消息会被重新仍回queue中
  31. 设置为false:消息将被丢弃
  32. */
  33. channel.BasicReject(
  34. deliveryTag: ea.DeliveryTag,
  35. requeue: true);
  36. }
  37. };
  38. channel.BasicConsume(queue: "hello",
  39. autoAck: false,
  40. consumer: consumer);

BasicRecover()

路由不成功的消息可以使用recovery重新发送到队列中,参数是是否requeue,true则重新入队列,并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。false则消息会重新被投递给自己

消息持久化/优先级

Persistent

参数 重启RabbitMQ
exchange.durable=fasle/queue.durable=false exchange/queue将会被丢弃
exchange.durable=fasle exchange将会被丢弃
queue.durable=fasle queue将会被丢弃
exchange.durable=fasle/queue.durable=true exchange将会被丢弃,queue虽然会存在,但队列内消息会全部丢失
exchange.durable=true/queue.durable=true exchange/queue会存在,但队列内消息会全部丢失
exchange.durable=true&&queue.durable=true/消息发布时(persistent=true) 消息真正的持久化
  1. for (int i = 0; i < 100; i++)
  2. {
  3. byte[] messageBody = Encoding.UTF8.GetBytes(i.ToString());
  4. // 设置消息持久化
  5. var props = channel.CreateBasicProperties();
  6. props.Persistent = true;
  7. // 消息发送
  8. channel.BasicPublish(
  9. exchange: "TestExchange",
  10. routingKey: "",
  11. basicProperties: props,
  12. body: messageBody);
  13. }

Priority

queue是先进先出的,即先发送的消息,先被消费。但是在具体业务中可能会遇到要提前处理某些消息的需求,如一个常见的需求:普通客户的消息按先进先出的顺序处理,Vip客户的消息要提前处理。消息实现优先级控制的实现方式是:首先在声明queue是设置队列的x-max-priority属性,然后在publish消息时,设置消息的优先级等级即可

生产者

  1. // 声明交换机exchang
  2. channel.ExchangeDeclare(exchange: "myexchange",
  3. type: ExchangeType.Fanout,
  4. durable: true,
  5. autoDelete: false,
  6. arguments: null);
  7. // 声明队列queue
  8. channel.QueueDeclare(queue: "myqueue",
  9. durable: true,
  10. exclusive: false,
  11. autoDelete: false,
  12. arguments: new Dictionary<string, object>() {
  13. //队列优先级最高为10,不加x-max-priority的话,消息发布时设置了消息的优先级也不会生效
  14. {"x-max-priority",10 }
  15. });
  16. // 绑定exchange和queue
  17. channel.QueueBind(queue: "myqueue", exchange: "myexchange", routingKey: "mykey");
  18. Console.WriteLine("生产者准备就绪....");
  19. // 测试数据
  20. string[] msgs = { "vip1", "hello1", "hello2", "hello3", "vip5" };
  21. // 设置消息优先级
  22. var props = channel.CreateBasicProperties();
  23. foreach (string msg in msgs)
  24. {
  25. // vip开头的消息,优先级设置为9
  26. if (msg.StartsWith("vip"))
  27. {
  28. props.Priority = 9;
  29. channel.BasicPublish(exchange: "myexchange",
  30. routingKey: "mykey",
  31. basicProperties: props,
  32. body: Encoding.UTF8.GetBytes(msg));
  33. }
  34. // 其他消息优先级为1
  35. else
  36. {
  37. props.Priority = 1;
  38. channel.BasicPublish(exchange: "myexchange",
  39. routingKey: "mykey",
  40. basicProperties: props,
  41. body: Encoding.UTF8.GetBytes(msg));
  42. }

消费端

  1. EventingBasicConsumer consumer =
  2. new EventingBasicConsumer(channel);
  3. // 绑定消息接收后的事件委托
  4. consumer.Received += (model, ea) =>
  5. {
  6. string message =
  7. Encoding.UTF8.GetString(ea.Body.ToArray());
  8. Console.WriteLine($"Message:{message}");
  9. Assert.IsNotNull(message);
  10. };
  11. channel.BasicConsume(
  12. queue: "myqueue",
  13. autoAck: true,
  14. consumer: consumer);

image.png