消息可靠性

  1. 消息发送时丢失:
  • 生产者发送的消息未送达exchange
  • 消息到达exchange后未到达queue
  1. MQ宕机:
  • 队列queue将消息丢失
  1. 消费者接收到消息未消费就宕机

解决方案:

  • 生产者确认机制
  • mq持久化
  • 消费者确认机制
  • 失败重试机制

    1.1publisher confirm机制

    这种机制必须给每个消息指定一个唯一ID(用以区分不同消息,避免ack冲突)。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。
  • publisher-confirm,发送者确认
    • 消息成功投递到交换机,返回ack
    • 消息未投递到交换机,返回nack
  • publisher-return,发送者回执
    • 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

      1.1.1修改配置

      修改publisher服务中的application.yml文件,添加下面的内容
  1. spring:
  2. rabbitmq:
  3. publisher-confirm-type: correlated
  4. publisher-returns: true
  5. template:
  6. mandatory: true
  7. - publish-confirm-type:开启publisher-confirm,这里支持两种类型:
  8. - simple:同步等待confirm结果,直到超时
  9. - correlated:异步回调,定义ConfirmCallbackMQ返回结果时会回调这个ConfirmCallback
  10. - publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
  11. - template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallbackfalse:则直接丢弃消息

1.1.2定义return回调

每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时配置: 修改publisher服务,添加一个:

package cn.itcast.mq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
    // 为RabbitTemplate设置路由到队列失败时调用的方法
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        rabbitTemplate.setReturnCallback((message, replyCode, replyTest, exchange, routingKey) -> {
            log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
                    replyCode,replyTest,exchange,routingKey,message.toString());
        });
    }
}

1.1.3定义ConfirmCallback

ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同。 在publisher服务的cn.itcast.mq.spring.SpringAmqpTest类中,定义一个单元测试方法.

@Test
public void testSendMessage2SimpleQueue1() throws InterruptedException {
    //1.设置发送的消息内容
    String message = "hello, spring amqp!";
    //2.设置回调函数策略
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    correlationData.getFuture().addCallback(
        result -> {
            //连接MQ正常
            // 在连接正常的情况下接收MQ队列返回的应答信息
            if (result.isAck()){
                // 正常ACK情况
                log.debug("消息发送成功ACK,ID:{}",correlationData.getId());
            }else {
                // NACK情况
                log.info("消息发送失败-NACK,ID:{}",correlationData.getId());
            }
        }, ex -> {
            //连接MQ异常
            log.error("消息发送失败-连接MQ异常,ID:{}",correlationData.getId());
        }
    );
    //3.发送消息
    rabbitTemplate.convertAndSend("amq.topic", "simple.test", message,correlationData);

    Thread.sleep(5000);
}

1.2消息持久化

  • 交换机持久化
    • RabbitMQ中交换机默认是非持久化的,mq重启后就丢失
    • 默认情况下,由SpringAMQP声明的交换机都是持久化的
//SpringAMQP中可以通过代码指定交换机是否持久化:
@Bean
public DirectExchange simpleExchange(){
    // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
    return new DirectExchange("simple.direct", true, false);
}
  • 队列持久化 (和交换机一样)
@Bean
public Queue simpleQueue(){
    // 使用QueueBuilder构建队列,durable就是持久化的
    return QueueBuilder.durable("simple.queue").build();
}
  • 消息持久化 (SpringAMQP发出的任何消息都是持久化的,不用特意指定)
@Test
    public void testSendDurableMessage() throws InterruptedException {
        // 1.消息体
        Message message = MessageBuilder.
                withBody("hello, spring amqp!".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .build();
        // 2.发送消息
        rabbitTemplate.convertAndSend("simple.queue", message);

    }

1.3消费者消息确认

RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。 RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。 可能出现一下场景:

  1. RabbitMQ投递消息给消费者
  2. 消费者获取消息后返回ACK给RabbitMQ
  3. RabbitMQ删除消息
  4. 消费者宕机,消息未处理

到此消息就丢失了,返回ACK的时机需要把控. SpringAMQP允许配置三种确认模式

  • manual:手动ack,需要在业务代码结束后,调用API发送ack.
    • 自己根据业务情况,判断什么时候该ack
  • auto:自动ack,由spring监测listenr代码是否出现异常,没有异常返回ack,抛出异常则返回nack.
    • auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
    • none模式下,消息投递是不可靠的,可能丢失

      1.4失败重试机制

      当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力.

      1.4.1本地重试

      利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。 修改consumer服务的application.yml文件,添加内容:
spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初始的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试. 重试达到最大次数后,Spring会返回ack,消息会被丢弃.

1.4.2失败策略

用MessageRecovery接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

package cn.itcast.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RepublishConfig {
    /**
     * 创建错误的交换机: 用于路由消费不了的消息
     * @return
     */
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }

    /**
     * 创建错误队列: 用于存放消费不了的消息
     * @return
     */
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }

    /**
     * 将错误队列绑定到错误交换机上,并设置路由规则
     * @param errorQueue
     * @param errorMessageExchange
     * @return
     */
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    /**
     * 当本地重试次数耗尽时,调用此对象
     * 此对象可以将处理不了的消息投递到错误的交换机上,并路由到错误队列中进行存储
     */
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

1.5总结

如何确保RabbitMQ消息的可靠性?

  • 开启生产者确认机制,确保生产者的消息能到达队列
  • 开启持久化功能,确保消息未消费前在队列中不会丢失
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

延迟队列