介绍

:::tips RabbitMQ提供了生产者消息确认机制来避免消息发送到MQ过程中丢失,消息被投递出去以后,会返回一个结果给发送者,表示消息是否处理成功

返回结果有两种方式:

  • publish-confirm,生产者确认
    • 消息被生产者成功投递到交换机,返回ack
    • 消息未被生产者投递到交换机,返回nack
  • publish-return,生产者回调
    • 消息未被交换机路由到队列,返回失败原因

image.png

image.png :::

ConfirmCallback

:::tips ConfirmCallback是一个回调接口,消息被生产者投递出去后就会触发这个回调,确认消息是否到达RabbitMQ服务器,即判断消息是否成功被生产者投递到达交换机

  • 消息成功投递到交换机,返回ack,触发ConfirmCallback回调接口
  • 消息未成功投递到交换机,返回nack,触发ConfirmCallback回调接口
  • 投递过程中出现异常,触发ConfirmCallback回调接口 :::

    添加配置

    :::tips 在消息生产者的配置文件中添加配置 :::

    1. spring:
    2. rabbitmq:
    3. #设置消息生产者确认机制的类型:simple同步等待、correlated异步回调
    4. publisher-confirm-type: correlated

    编写代码

    :::tips 在发送消息时指定回调内容,因为每个业务的逻辑处理不一定相同 ::: ```java @SpringBootTest @Slf4j public class MyTest{

    public void test() throws InterruptedException {

    1. //指定交换机
    2. String exchangeName = "task.direct";
    3. //指定路由key
    4. String routingKey = "task";
    5. //构建消息
    6. String message = "这是一条测试消息";
    7. //构建CorrelationData对象,并为消息设置全局唯一的消息ID
    8. CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    9. //定义消息被生产者投递出去后触发的回调代码
    10. correlationData.getFuture().addCallback(
    11. result -> {
    12. if(result.isAck()){
    13. //ack:消息投递到交换机成功
    14. log.debug("消息发送成功,ID:{}", correlationData.getId());
    15. }else{
    16. //nack:消息投递到交换机失败
    17. log.error("消息发送失败,ID:{},原因{}", correlationData.getId(), result.getReason());
    18. }
    19. },
    20. ex -> {
    21. log.error("消息发送异常, ID:{}, 原因{}", correlationData.getId(), ex.getMessage());
    22. }
    23. );
    24. //发送消息
    25. rabbitTemplate.convertAndSend(exchangeName, routingKey, message, correlationData);
    26. //休眠,等待ack回执
    27. Thread.sleep(2000);

    }

}

  1. <a name="vhXXT"></a>
  2. # ReturnCallback
  3. :::tips
  4. ReturnCallback也是一个回调接口,消息未被交换机成功路由到队列时会触发这个回调,确认消息是否到达队列,即判断消息是否成功被交换机路由到队列
  5. - 消息被交换机成功路由到队列,不返回内容
  6. - 消息未被交换机成功路由到队列,返回错误信息,触发ReturnCallback回调接口
  7. :::
  8. <a name="WiJ6n"></a>
  9. ### 添加配置
  10. :::tips
  11. 在消息生产者的配置文件中添加配置
  12. :::
  13. ```yaml
  14. spring:
  15. rabbitmq:
  16. #开启消息生产者回调
  17. publisher-returns: true
  18. template:
  19. #设置消息路由到队列失败时是否返回回调
  20. mandatory: true

编写代码

:::tips 创建一个配置类,在类上打上@Configuration注解,同时实现ApplicationContextAware接口,重写接口中的setApplicationContext方法,方法中的ApplicationContext就是Spring容器对象,然后从Spring容器对象中获取RabbitTemplate对象,调用RabbitTemplate对象的setReturnCallback方法,定义消息未被交换机成功路由到队列时触发的回调内容 :::

  1. @Configuration
  2. public class XxxConfig implements ApplicationContextAware{
  3. @Override
  4. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  5. //获取RabbitTemplate对象
  6. RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
  7. //设置ReturnCallback
  8. rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
  9. //消息投递失败的日志
  10. System.out.println("消息发送失败!应答码:{" + replyCode + "},原因{" + replyText + "},交换机{" + exchange + "},路由键{" + routingKey + "},消息{" + message.toString() + "}");
  11. //如果有业务需要,可以重发消息
  12. });
  13. }
  14. }