介绍
:::tips RabbitMQ提供了生产者消息确认机制来避免消息发送到MQ过程中丢失,消息被投递出去以后,会返回一个结果给发送者,表示消息是否处理成功
返回结果有两种方式:
- publish-confirm,生产者确认
- 消息被生产者成功投递到交换机,返回ack
- 消息未被生产者投递到交换机,返回nack
- publish-return,生产者回调
- 消息未被交换机路由到队列,返回失败原因
ConfirmCallback
:::tips ConfirmCallback是一个回调接口,消息被生产者投递出去后就会触发这个回调,确认消息是否到达RabbitMQ服务器,即判断消息是否成功被生产者投递到达交换机
- 消息成功投递到交换机,返回ack,触发ConfirmCallback回调接口
- 消息未成功投递到交换机,返回nack,触发ConfirmCallback回调接口
投递过程中出现异常,触发ConfirmCallback回调接口 :::
添加配置
:::tips 在消息生产者的配置文件中添加配置 :::
spring:
rabbitmq:
#设置消息生产者确认机制的类型:simple同步等待、correlated异步回调
publisher-confirm-type: correlated
编写代码
:::tips 在发送消息时指定回调内容,因为每个业务的逻辑处理不一定相同 ::: ```java @SpringBootTest @Slf4j public class MyTest{
public void test() throws InterruptedException {
//指定交换机
String exchangeName = "task.direct";
//指定路由key
String routingKey = "task";
//构建消息
String message = "这是一条测试消息";
//构建CorrelationData对象,并为消息设置全局唯一的消息ID
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//定义消息被生产者投递出去后触发的回调代码
correlationData.getFuture().addCallback(
result -> {
if(result.isAck()){
//ack:消息投递到交换机成功
log.debug("消息发送成功,ID:{}", correlationData.getId());
}else{
//nack:消息投递到交换机失败
log.error("消息发送失败,ID:{},原因{}", correlationData.getId(), result.getReason());
}
},
ex -> {
log.error("消息发送异常, ID:{}, 原因{}", correlationData.getId(), ex.getMessage());
}
);
//发送消息
rabbitTemplate.convertAndSend(exchangeName, routingKey, message, correlationData);
//休眠,等待ack回执
Thread.sleep(2000);
}
}
<a name="vhXXT"></a>
# ReturnCallback
:::tips
ReturnCallback也是一个回调接口,消息未被交换机成功路由到队列时会触发这个回调,确认消息是否到达队列,即判断消息是否成功被交换机路由到队列
- 消息被交换机成功路由到队列,不返回内容
- 消息未被交换机成功路由到队列,返回错误信息,触发ReturnCallback回调接口
:::
<a name="WiJ6n"></a>
### 添加配置
:::tips
在消息生产者的配置文件中添加配置
:::
```yaml
spring:
rabbitmq:
#开启消息生产者回调
publisher-returns: true
template:
#设置消息路由到队列失败时是否返回回调
mandatory: true
编写代码
:::tips 创建一个配置类,在类上打上@Configuration注解,同时实现ApplicationContextAware接口,重写接口中的setApplicationContext方法,方法中的ApplicationContext就是Spring容器对象,然后从Spring容器对象中获取RabbitTemplate对象,调用RabbitTemplate对象的setReturnCallback方法,定义消息未被交换机成功路由到队列时触发的回调内容 :::
@Configuration
public class XxxConfig implements ApplicationContextAware{
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
//获取RabbitTemplate对象
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
//设置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
//消息投递失败的日志
System.out.println("消息发送失败!应答码:{" + replyCode + "},原因{" + replyText + "},交换机{" + exchange + "},路由键{" + routingKey + "},消息{" + message.toString() + "}");
//如果有业务需要,可以重发消息
});
}
}