介绍
:::tips RabbitMQ提供了生产者消息确认机制来避免消息发送到MQ过程中丢失,消息被投递出去以后,会返回一个结果给发送者,表示消息是否处理成功
返回结果有两种方式:
- publish-confirm,生产者确认
- 消息被生产者成功投递到交换机,返回ack
 - 消息未被生产者投递到交换机,返回nack
 
 - publish-return,生产者回调
- 消息未被交换机路由到队列,返回失败原因
 
 

ConfirmCallback
:::tips ConfirmCallback是一个回调接口,消息被生产者投递出去后就会触发这个回调,确认消息是否到达RabbitMQ服务器,即判断消息是否成功被生产者投递到达交换机
- 消息成功投递到交换机,返回ack,触发ConfirmCallback回调接口
 - 消息未成功投递到交换机,返回nack,触发ConfirmCallback回调接口
 投递过程中出现异常,触发ConfirmCallback回调接口 :::
添加配置
:::tips 在消息生产者的配置文件中添加配置 :::
spring:rabbitmq:#设置消息生产者确认机制的类型:simple同步等待、correlated异步回调publisher-confirm-type: correlated
编写代码
:::tips 在发送消息时指定回调内容,因为每个业务的逻辑处理不一定相同 ::: ```java @SpringBootTest @Slf4j public class MyTest{
public void test() throws InterruptedException {
//指定交换机String exchangeName = "task.direct";//指定路由keyString routingKey = "task";//构建消息String message = "这是一条测试消息";//构建CorrelationData对象,并为消息设置全局唯一的消息IDCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());//定义消息被生产者投递出去后触发的回调代码correlationData.getFuture().addCallback(result -> {if(result.isAck()){//ack:消息投递到交换机成功log.debug("消息发送成功,ID:{}", correlationData.getId());}else{//nack:消息投递到交换机失败log.error("消息发送失败,ID:{},原因{}", correlationData.getId(), result.getReason());}},ex -> {log.error("消息发送异常, ID:{}, 原因{}", correlationData.getId(), ex.getMessage());});//发送消息rabbitTemplate.convertAndSend(exchangeName, routingKey, message, correlationData);//休眠,等待ack回执Thread.sleep(2000);
}
}
<a name="vhXXT"></a># ReturnCallback:::tipsReturnCallback也是一个回调接口,消息未被交换机成功路由到队列时会触发这个回调,确认消息是否到达队列,即判断消息是否成功被交换机路由到队列- 消息被交换机成功路由到队列,不返回内容- 消息未被交换机成功路由到队列,返回错误信息,触发ReturnCallback回调接口:::<a name="WiJ6n"></a>### 添加配置:::tips在消息生产者的配置文件中添加配置:::```yamlspring:rabbitmq:#开启消息生产者回调publisher-returns: truetemplate:#设置消息路由到队列失败时是否返回回调mandatory: true
编写代码
:::tips 创建一个配置类,在类上打上@Configuration注解,同时实现ApplicationContextAware接口,重写接口中的setApplicationContext方法,方法中的ApplicationContext就是Spring容器对象,然后从Spring容器对象中获取RabbitTemplate对象,调用RabbitTemplate对象的setReturnCallback方法,定义消息未被交换机成功路由到队列时触发的回调内容 :::
@Configurationpublic class XxxConfig implements ApplicationContextAware{@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {//获取RabbitTemplate对象RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);//设置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {//消息投递失败的日志System.out.println("消息发送失败!应答码:{" + replyCode + "},原因{" + replyText + "},交换机{" + exchange + "},路由键{" + routingKey + "},消息{" + message.toString() + "}");//如果有业务需要,可以重发消息});}}
:::
