发送端确认
事务机制
Spring Boot 中开启 RabbitMQ 事务机制的方式如下,提供一个事务管理器
@BeanRabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}
在消息生产者上面做两件事:添加事务注解并设置通信信道为事务模式
@Servicepublic class MsgService {@AutowiredRabbitTemplate rabbitTemplate;@Transactionalpublic void send() {rabbitTemplate.setChannelTransacted(true);rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,RabbitConfig.JAVABOY_QUEUE_NAME,"hello rabbitmq!".getBytes());int i = 1 / 0;}}
注意两点:
- 发送消息的方法上添加 @Transactional 注解标记事务。
- 调用 setChannelTransacted 方法设置为 true 开启事务模式。
发送方确认机制
spring.rabbitmq.publisher-confirm-type=correlatedspring.rabbitmq.publisher-returns=true
第一行是配置消息到达交换器的确认回调,
第二行则是配置消息到达队列的回调。
第一行属性的配置有三个取值:
- none:表示禁用发布确认模式,默认即此。
- correlated:表示成功发布消息到交换器后会触发的回调方法。
- simple:类似 correlated,并且支持 waitForConfirms() 和 waitForConfirmsOrDie() 方法的调用。
@Configurationpublic class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {public static final String JAVABOY_EXCHANGE_NAME = "javaboy_exchange_name";public static final String JAVABOY_QUEUE_NAME = "javaboy_queue_name";private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);@AutowiredRabbitTemplate rabbitTemplate;@BeanQueue queue() {return new Queue(JAVABOY_QUEUE_NAME);}@BeanDirectExchange directExchange() {return new DirectExchange(JAVABOY_EXCHANGE_NAME);}@BeanBinding binding() {return BindingBuilder.bind(queue()).to(directExchange()).with(JAVABOY_QUEUE_NAME);}@PostConstructpublic void initRabbitTemplate() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {logger.info("{}:消息成功到达交换器",correlationData.getId());}else{logger.error("{}:消息发送失败", correlationData.getId());}}@Overridepublic void returnedMessage(ReturnedMessage returned) {logger.error("{}:消息未成功路由到队列",returned.getMessage().getMessageProperties().getMessageId());}}
- 定义配置类,实现 RabbitTemplate.ConfirmCallback 和 RabbitTemplate.ReturnsCallback 两个接口,这两个接口,前者的回调用来确定消息到达交换器,后者则会在消息路由到队列失败时被调用。
- 定义 initRabbitTemplate 方法并添加 @PostConstruct 注解,在该方法中为 rabbitTemplate 分别配置这两个 Callback。
失败重试
带重试机制
前面所说的事务机制和发送方确认机制,都是发送方确认消息发送成功的办法。如果发送方一开始就连不上 MQ,那么 Spring Boot 中也有相应的重试机制,但是这个重试机制就和 MQ 本身没有关系了,这是利用 Spring 中的 retry 机制来完成的,具体配置如下:
spring.rabbitmq.template.retry.enabled=truespring.rabbitmq.template.retry.initial-interval=1000msspring.rabbitmq.template.retry.max-attempts=10spring.rabbitmq.template.retry.max-interval=10000msspring.rabbitmq.template.retry.multiplier=2
从上往下配置含义依次是:
- 开启重试机制。
- 重试起始间隔时间。
- 最大重试次数。
- 最大重试间隔时间。
- 间隔时间乘数。(这里配置间隔时间乘数为 2,则第一次间隔时间 1 秒,第二次重试间隔时间 2 秒,第三次 4 秒,以此类推)
