十、发布确认高级(SpringBoot版)

在生产环境中由于一些不明原因,导致 RabbitMQ 重启,在 RabbitMQ 重启期间生产者消息投递失败, 导致消息丢失,需要手动处理和恢复。

于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢?
image.png

1、ConfirmCallback 发布端到交换机的回调

ConfirmCallback 是一个回调接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中。
我们需要在生产者的配置中添加下面配置,表示开启发布者确认。

  1. spring.rabbitmq.publisher-confirm-type=correlated #新版本
  2. spring.rabbitmq.publisher-confirms=true #老版本

确认机制方案

5_发布确认高级 - 图2

代码架构图

5_发布确认高级 - 图3

配置文件

  1. spring.rabbitmq.host=*.*.*.*
  2. spring.rabbitmq.port=5672
  3. spring.rabbitmq.username=admin
  4. spring.rabbitmq.password=123456
  5. spring.rabbitmq.publisher-confirm-type=correlated #确认消息已发送到交换机(Exchange)
  • NONE 禁用发布确认模式,是默认值
  • CORRELATED 发布消息成功到交换器后会触发回调方法
  • SIMPLE 经测试有两种效果,
    • 其一效果和 CORRELATED 值一样会触发回调方法,
    • 其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker;

添加配置类

  1. package com.atguigu.rabbitmq.springbootrabbitmq.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. /**
  7. * @author: like
  8. * @Date: 2021/07/20 8:07
  9. */
  10. @Configuration
  11. public class ConfirmConfig {
  12. //交换机
  13. public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
  14. //队列
  15. public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
  16. //RoutingKey
  17. public static final String CONFIRM_ROUTING_KEY = "key1";
  18. //声明交换机
  19. @Bean("confirmExchange")
  20. public DirectExchange confirmExchange() {
  21. return new DirectExchange(CONFIRM_EXCHANGE_NAME);
  22. }
  23. //声明队列
  24. @Bean("confirmQueue")
  25. public Queue confirmQueue() {
  26. return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
  27. }
  28. //绑定
  29. @Bean
  30. public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,
  31. @Qualifier("confirmExchange") DirectExchange confirmExchange) {
  32. return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
  33. }
  34. }

消息生产者

  1. package com.atguigu.rabbitmq.springbootrabbitmq.controller;
  2. import com.atguigu.rabbitmq.springbootrabbitmq.config.ConfirmConfig;
  3. import com.atguigu.rabbitmq.springbootrabbitmq.config.MyCallBack;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.amqp.rabbit.connection.CorrelationData;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.web.bind.annotation.GetMapping;
  9. import org.springframework.web.bind.annotation.PathVariable;
  10. import org.springframework.web.bind.annotation.RequestMapping;
  11. import org.springframework.web.bind.annotation.RestController;
  12. import javax.annotation.PostConstruct;
  13. /**
  14. * @author: like
  15. * @Date: 2021/07/20 8:20
  16. */
  17. @Slf4j
  18. @RestController
  19. @RequestMapping("/confirm")
  20. public class ProducerController {
  21. @Autowired
  22. private RabbitTemplate rabbitTemplate;
  23. @Autowired
  24. private MyCallBack myCallBack;
  25. //依赖注入 rabbitTemplate 之后再设置它的回调对象
  26. @PostConstruct
  27. public void init() {
  28. //注入
  29. rabbitTemplate.setConfirmCallback(myCallBack);
  30. }
  31. /**
  32. * 发消息 消息回调和退回
  33. *
  34. * @param message
  35. */
  36. @GetMapping("/sendMessage/{message}")
  37. public void sendMsg(@PathVariable String message) {
  38. //指定消息 id 为 1
  39. CorrelationData correlationData1 = new CorrelationData("1");
  40. rabbitTemplate.convertAndSend(
  41. ConfirmConfig.CONFIRM_EXCHANGE_NAME,
  42. ConfirmConfig.CONFIRM_ROUTING_KEY,
  43. message + "key1",
  44. correlationData1);
  45. log.info("发送消息内容:{}", message + "key1");
  46. //指定消息 id 为 2
  47. CorrelationData correlationData2 = new CorrelationData("2");
  48. rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
  49. ConfirmConfig.CONFIRM_ROUTING_KEY + "2", //故意写错RoutingKey
  50. message + "key2",
  51. correlationData2);
  52. log.info("发送消息内容:{}", message + "key2");
  53. }
  54. }

回调接口

交换机如果收不到消息,可以进行回调接口

  1. package com.atguigu.rabbitmq.springbootrabbitmq.config;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit.connection.CorrelationData;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. @Slf4j
  8. public class MyCallBack implements RabbitTemplate.ConfirmCallback {
  9. /**
  10. * 交换机不管是否收到消息 都回调
  11. *
  12. * @param correlationData 保存回调消息的ID及相关信息
  13. * @param ack 交换机是否收到消息 true false
  14. * @param cause 未收到消息的原因
  15. */
  16. @Override
  17. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  18. String id = correlationData != null ? correlationData.getId() : "";
  19. if (ack) {
  20. log.info("交换机已经收到ID为:{}的消息", id);
  21. } else {
  22. log.info("交换机还未收到ID为:{}的消息,由于原因:{}", id, cause);
  23. }
  24. }
  25. }

消息消费者

  1. package com.atguigu.rabbitmq.springbootrabbitmq.consumer;
  2. import com.atguigu.rabbitmq.springbootrabbitmq.config.ConfirmConfig;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. /**
  8. * @author: like
  9. * @Date: 2021/07/20 8:23
  10. */
  11. @Component
  12. @Slf4j
  13. public class ConfirmConsumer {
  14. @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
  15. public void receiveConfirmMessage(Message message) {
  16. String msg = new String(message.getBody());
  17. log.info("接收到队列confirm.queue消息:{}", msg);
  18. }
  19. }

访问: http://localhost:8080/confirm/sendMessage/大家好

结果分析

5_发布确认高级 - 图4

可以看到,发送了两条消息,第一条消息的 RoutingKey 为 “key1”,第二条消息的 RoutingKey 为 “key2”,两条消息都成功被交换机接收,也收到了交换机的确认回调,

但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条消息被直接丢弃了。

丢弃的消息交换机是不知道的,需要解决告诉生产者消息传送失败

2、回退消息 ReturnCallback 交换机到队列的回调

通过实现 ReturnCallback 接口,启动消息失败返回,此接口是在交换器路由不到队列时触发回调,该方法可以不使用,因为交换器和队列是在代码里绑定的,如果消息成功投递到 Broker 后几乎不存在绑定队列失败,除非你代码写错了。

使用此接口需要在生产者配置中加入一下配置,表示发布者返回。

  1. spring.rabbitmq.publisher-returns=true #确认消息已发送到队列(Queue)

Mandatory 参数

  1. rabbitTemplate.setMandatory(true);
  2. rabbitTemplate.setReturnsCallback(myCallBack);

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。

那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。

通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

修改配置

  1. #消息退回
  2. spring.rabbitmq.publisher-returns=true

修改回调接口

  1. package com.atguigu.rabbitmq.springbootrabbitmq.config;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.connection.CorrelationData;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.stereotype.Component;
  7. @Component
  8. @Slf4j
  9. public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
  10. /**
  11. * 交换机确认回调方法
  12. * <p>
  13. * 不管是否收到消息 都回调
  14. *
  15. * @param correlationData 保存回调消息的ID及相关信息
  16. * @param ack 交换机是否收到消息 true false
  17. * @param cause 未收到消息的原因
  18. */
  19. @Override
  20. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  21. String id = correlationData != null ? correlationData.getId() : "";
  22. if (ack) {
  23. log.info("交换机已经收到ID为:{}的消息", id);
  24. } else {
  25. log.info("交换机还未收到ID为:{}的消息,由于原因:{}", id, cause);
  26. }
  27. }
  28. //当消息无法路由的时候的回调方法
  29. //只有不可达目的地的时候 才进行回退
  30. @Override
  31. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  32. log.error("消息{},被交换机{}退回,退回原因:{},路由key为:{}",
  33. new String(message.getBody()),
  34. exchange,
  35. replyText,
  36. routingKey);
  37. }
  38. }

修改消息生产者

  1. //依赖注入 rabbitTemplate 之后再设置它的回调对象
  2. @PostConstruct
  3. public void init() {
  4. //注入
  5. rabbitTemplate.setConfirmCallback(myCallBack);
  6. /**
  7. * true:交换机无法将消息进行路由时,会将该消息返回给生产者
  8. * false:如果发现消息无法进行路由,则直接丢弃
  9. */
  10. rabbitTemplate.setMandatory(true);
  11. //设置回退消息交给谁处理
  12. rabbitTemplate.setReturnCallback(myCallBack);
  13. }

访问: http://localhost:8080/confirm/sendMessage/大家好

结果分析

5_发布确认高级 - 图5

3、备份交换机

有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。

但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。

而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。

而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。

如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?

前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。

在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。

什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,

当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,

通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。

当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。

代码架构图

5_发布确认高级 - 图6

修改配置类

  1. package com.atguigu.rabbitmq.springbootrabbitmq.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. /**
  7. * @author: like
  8. * @Date: 2021/07/20 8:07
  9. */
  10. @Configuration
  11. public class ConfirmConfig {
  12. //交换机
  13. public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
  14. //队列
  15. public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
  16. //RoutingKey
  17. public static final String CONFIRM_ROUTING_KEY = "key1";
  18. ///////////////////////////关于备份的
  19. //备份交换机
  20. public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
  21. //备份队列
  22. public static final String BACKUP_QUEUE_NAME = "backup.queue";
  23. //报警队列
  24. public static final String WARNING_QUEUE_NAME = "warning.queue";
  25. // //声明交换机 Exchange
  26. // @Bean("confirmExchange")
  27. // public DirectExchange confirmExchange() {
  28. // return new DirectExchange(CONFIRM_EXCHANGE_NAME);
  29. // }
  30. //声明确认 Exchange 交换机的备份交换机
  31. @Bean("confirmExchange")
  32. public DirectExchange confirmExchange() {
  33. return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
  34. .durable(true)
  35. //设置该交换机的备份交换机
  36. .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME)
  37. .build();
  38. }
  39. //声明确认队列
  40. @Bean("confirmQueue")
  41. public Queue confirmQueue() {
  42. return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
  43. }
  44. //声明确认队列绑定关系
  45. @Bean
  46. public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,
  47. @Qualifier("confirmExchange") DirectExchange confirmExchange) {
  48. return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
  49. }
  50. //************************以下是关于备份的******************************
  51. //备份交换机
  52. @Bean("backupExchange")
  53. public FanoutExchange backupExchange() {
  54. return new FanoutExchange(BACKUP_EXCHANGE_NAME);
  55. }
  56. //备份队列
  57. @Bean("backupQueue")
  58. public Queue backupQueue() {
  59. return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
  60. }
  61. // 声明备份队列绑定关系
  62. @Bean
  63. public Binding backupQueueBindingBackupExchange(@Qualifier("backupQueue") Queue backupQueue,
  64. @Qualifier("backupExchange") FanoutExchange backupExchange) {
  65. return BindingBuilder.bind(backupQueue).to(backupExchange);
  66. }
  67. //报警队列
  68. @Bean("warningQueue")
  69. public Queue warningQueue() {
  70. return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
  71. }
  72. // 声明报警队列绑定关系
  73. @Bean
  74. public Binding warningQueueBindingBackupExchange(@Qualifier("warningQueue") Queue warningQueue,
  75. @Qualifier("backupExchange") FanoutExchange backupExchange) {
  76. return BindingBuilder.bind(warningQueue).to(backupExchange);
  77. }
  78. }

报警消费者

  1. package com.atguigu.rabbitmq.springbootrabbitmq.consumer;
  2. import com.atguigu.rabbitmq.springbootrabbitmq.config.ConfirmConfig;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. /**
  8. * @author: like
  9. * @Date: 2021/07/21 8:14
  10. */
  11. @Component
  12. @Slf4j
  13. public class WarningConsumer {
  14. //接收报警消息
  15. @RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
  16. public void receiveWarningMsg(Message message){
  17. String msg = new String(message.getBody());
  18. log.error("报警发现不可路由消息:{}",msg);
  19. }
  20. }

测试注意事项

之前已写过 confirm.exchange 交换机,由于更改配置,需要删掉,不然会报错

5_发布确认高级 - 图7

结果分析

访问: http://localhost:8080/confirm/sendMessage/大家好

5_发布确认高级 - 图8

mandatory 参数(回退消息)与备份交换机可以一起使用的时候,如果两者同时开启,消息究竟何去何从,谁优先级高?

经过上面结果显示答案是备份交换机优先级高