发送端确认

事务机制

Spring Boot 中开启 RabbitMQ 事务机制的方式如下,提供一个事务管理器

  1. @Bean
  2. RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {
  3. return new RabbitTransactionManager(connectionFactory);
  4. }

在消息生产者上面做两件事:添加事务注解并设置通信信道为事务模式

  1. @Service
  2. public class MsgService {
  3. @Autowired
  4. RabbitTemplate rabbitTemplate;
  5. @Transactional
  6. public void send() {
  7. rabbitTemplate.setChannelTransacted(true);
  8. rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,RabbitConfig.JAVABOY_QUEUE_NAME,"hello rabbitmq!".getBytes());
  9. int i = 1 / 0;
  10. }
  11. }

注意两点:

  1. 发送消息的方法上添加 @Transactional 注解标记事务。
  2. 调用 setChannelTransacted 方法设置为 true 开启事务模式。

    发送方确认机制

    1. spring.rabbitmq.publisher-confirm-type=correlated
    2. spring.rabbitmq.publisher-returns=true

第一行是配置消息到达交换器的确认回调,
第二行则是配置消息到达队列的回调。
第一行属性的配置有三个取值:

  1. none:表示禁用发布确认模式,默认即此。
  2. correlated:表示成功发布消息到交换器后会触发的回调方法。
  3. simple:类似 correlated,并且支持 waitForConfirms() 和 waitForConfirmsOrDie() 方法的调用。
  1. @Configuration
  2. public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
  3. public static final String JAVABOY_EXCHANGE_NAME = "javaboy_exchange_name";
  4. public static final String JAVABOY_QUEUE_NAME = "javaboy_queue_name";
  5. private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
  6. @Autowired
  7. RabbitTemplate rabbitTemplate;
  8. @Bean
  9. Queue queue() {
  10. return new Queue(JAVABOY_QUEUE_NAME);
  11. }
  12. @Bean
  13. DirectExchange directExchange() {
  14. return new DirectExchange(JAVABOY_EXCHANGE_NAME);
  15. }
  16. @Bean
  17. Binding binding() {
  18. return BindingBuilder.bind(queue())
  19. .to(directExchange())
  20. .with(JAVABOY_QUEUE_NAME);
  21. }
  22. @PostConstruct
  23. public void initRabbitTemplate() {
  24. rabbitTemplate.setConfirmCallback(this);
  25. rabbitTemplate.setReturnsCallback(this);
  26. }
  27. @Override
  28. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  29. if (ack) {
  30. logger.info("{}:消息成功到达交换器",correlationData.getId());
  31. }else{
  32. logger.error("{}:消息发送失败", correlationData.getId());
  33. }
  34. }
  35. @Override
  36. public void returnedMessage(ReturnedMessage returned) {
  37. logger.error("{}:消息未成功路由到队列",returned.getMessage().getMessageProperties().getMessageId());
  38. }
  39. }
  1. 定义配置类,实现 RabbitTemplate.ConfirmCallback 和 RabbitTemplate.ReturnsCallback 两个接口,这两个接口,前者的回调用来确定消息到达交换器,后者则会在消息路由到队列失败时被调用。
  2. 定义 initRabbitTemplate 方法并添加 @PostConstruct 注解,在该方法中为 rabbitTemplate 分别配置这两个 Callback。

失败重试

带重试机制

前面所说的事务机制和发送方确认机制,都是发送方确认消息发送成功的办法。如果发送方一开始就连不上 MQ,那么 Spring Boot 中也有相应的重试机制,但是这个重试机制就和 MQ 本身没有关系了,这是利用 Spring 中的 retry 机制来完成的,具体配置如下:

  1. spring.rabbitmq.template.retry.enabled=true
  2. spring.rabbitmq.template.retry.initial-interval=1000ms
  3. spring.rabbitmq.template.retry.max-attempts=10
  4. spring.rabbitmq.template.retry.max-interval=10000ms
  5. spring.rabbitmq.template.retry.multiplier=2

从上往下配置含义依次是:

  • 开启重试机制。
  • 重试起始间隔时间。
  • 最大重试次数。
  • 最大重试间隔时间。
  • 间隔时间乘数。(这里配置间隔时间乘数为 2,则第一次间隔时间 1 秒,第二次重试间隔时间 2 秒,第三次 4 秒,以此类推)

参考

https://mp.weixin.qq.com/s?__biz=MzI1NDY0MTkzNQ==&mid=2247494501&idx=1&sn=82de6d7ab3b18c5aa5ed59dcacff540a&chksm=e9c0b905deb73013b41018961604e94cede8a1b0d1439247f874f5dfa239c8fd869a9671bda4&scene=132