• .消息可靠性
    • mq的发送消息的时候可能会在发送给exchange交换机的时候丢失 可能在交换机发送给队列的时候丢失 也可能在监听消息的时候丢失
  • 解决方案:
    • 生产则确认机制:
    • mq持久化
    • 消费者确认机制
    • 失败重试机制
  • 生产者确认消息
    • 发送到交换机的阶段:发送成功返回ack 不成功返回nack
    • 交换机发送到队列的阶段: 发送者回执,消息发送到交换机了 如果没有发送到队列,就会返回失败的原因和ack
    • 发送消息的时候,必须个每个消息设置一个唯一的id,以区分不同的消息
    • 实现步骤:
      • 需要在rabbit配置 publisher-confirm-type:correlated //simple 代表同步 correlated代表异步
        publisher-returns:true 开启回执
        template:
        mandatory:true ```xml spring: rabbitmq: publisher-confirm-type: correlated publisher-returns: true template: mandatory: true
  1. - 在消息的生产者定义一个配置类实现ApplicationContextAware接口 重写setApplicationcontext方法 通过application对象获取rabbittenplate对象
  2. - rabbtitemplate.setConfirm()定义实现确认逻辑 .setRentruncallback定义回调逻辑
  3. ```xml
  4. package cn.itcast.mq.config;
  5. @Slf4j
  6. @Configuration
  7. public class CommonConfig implements ApplicationContextAware {
  8. @Override
  9. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  10. // 获取RabbitTemplate
  11. RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
  12. // 设置ReturnCallback
  13. rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
  14. // 投递失败,记录日志
  15. log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
  16. replyCode, replyText, exchange, routingKey, message.toString());
  17. // 如果有业务需要,可以重发消息
  18. });
  19. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  20. /**
  21. * @param correlationData 自定义的数据
  22. * @param ack 是否确认
  23. * @param cause 原因
  24. */
  25. @Override
  26. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  27. if(ack){
  28. // 3.1.ack,消息成功
  29. log.debug("消息发送成功, ID:{}", correlationData.getId());
  30. }else{
  31. // 3.2.nack,消息失败
  32. log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), cause);
  33. }
  34. }
  35. });
  36. }
  37. @Bean
  38. public DirectExchange simpleExchange(){
  39. // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
  40. return new DirectExchange("simple.direct", false, false);
  41. }
  42. @Bean
  43. public Queue simpleQueue(){
  44. return new Queue("simple.queue",false);
  45. }
  46. @Bean
  47. public Binding binding(){
  48. return BindingBuilder.bind(simpleQueue()).to(simpleExchange()).with("simple");
  49. }
  50. }
  • 消息没有发送到交换机:就会触发confirm方法 返回ack为false 发送成功则为turn 消息发送到交换机 但是没有路由到队列 就会触发returncallback方法 返回失败的原因 以及状态,路由等消息
  • 消息持久化 :

    • 交换机持久化:mq中交换机默认数不持久化的如果宕机重启就会丢失消息 所以需要持久化: (事实上,默认情况下,由SpringAMQP声明的交换机都是持久化的。)

      @Bean
      public DirectExchange simpleExchange(){
      // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
      return new DirectExchange("simple.direct", true, false);
      }
      
    • 队列持久化:在rabbitmq中队列默认是不持久化的,需要在声明的时候持久化

      @Bean
      public Queue simpleQueue(){
         return new Queue("simple.queue",true);
      }
      //事实上,默认情况下,由SpringAMQP声明的队列都是持久化的
      
    • 消息持久化

image.png

  • 消费者确认:mq在收到消费则确认会删除消息 但是如果消费者发送ack回执给mq后 出现异常导致没有处理消息,消息就会丢失了 索引需要设置消费者的回复时机

    • springAMQP可以有3种确认模式

      • none :关闭ack, mq会默认假定消费者在收到消息会马上处理,消息会被删除掉
      • manual: 手动ack, 代码逻辑需要我们自己完成’
      • auto:自动ack, 由spring检测listener代码的异常,没有异常则返回ack 否则nack(推荐使用)
        配置:spring:
        rabbitmq:
        listener:
        simple:
        acknowledge-mode: none # 关闭ack auto 自动 manual手动ack
        默认我们使用auto
        
    • 失败重试机制:(消费者消费失败的机制 :会会发起重试机制 就是隔一段时间在次处理消息)

      spring:
      rabbitmq:
      listener:
       simple:
         retry:
           enabled: true # 开启消费者失败重试
           initial-interval: 1000ms # 初识的失败等待时长为1秒
           multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
           max-attempts: 3 # 最大重试次数
           stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
      
    • 重试失败策略:在重试失败后,有3种处理消息的策略

      • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
      • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
      • RepublishMessageRecoverer:重试耗尽之后,发送到指定的交换机:需要定义一个republishemassagerecover 来绑定回收的交换机和队列
        @Configuration
        public class ErrorMessageConfig {
        @Bean
        public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
        }
        @Bean
        public Queue errorQueue(){
        return new Queue("error.queue", true);
        }
        @Bean
        public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
        }
        //2)定义一个RepublishMessageRecoverer,关联队列和交换机
        @Bean
        public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
        }
        }
        

        .总结

        如何确保RabbitMQ消息的可靠性?
  • 开启生产者确认机制,确保生产者的消息能到达队列

  • 开启持久化功能,确保消息未消费前在队列中不会丢失
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理
  • 死信交换机

    • 死信:
      • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false(消息会拒绝,且重新入列为false)
      • 消息是一个过期消息,超时无人消费(消息超时了)
      • 要投递的队列消息满了,无法投递(队列满了)
    • 死信交换机:在定义交换机的时候,
      @Bean
      public Queue simpleQueue(){
         return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化
                 .deadLetterExchange("dl.direct") // 指定死信交换机
                 .build();
      }
      // 声明死信交换机 dl.direct
      @Bean
      public DirectExchange dlExchange(){
         return new DirectExchange("dl.direct", true, false);
      }
      // 声明存储死信的队列 dl.queue
      @Bean
      public Queue dlQueue(){
         return new Queue("dl.queue", true);
      }
      // 将死信队列 与 死信交换机绑定
      @Bean
      public Binding dlBinding(){
         return BindingBuilder.bind(simpleQueue()).to(dlExchange()).with("simple");
      }
      
  • TTL:消息超时分2种情况:一是消息设置超时,而是队列设置了超时时间,如果都设置 了则按时间最短的为标准

  • 利用死信交换机来实现延迟消息的发送:原理,声明一个死信队列来保存死信(给队列或者消息设置超时时间)并声明绑定死信交换机(消息过期会发送到这个交换机上),在声明一个交换机(死信交换机是direct),再声明一个队列来和死信交换机绑定就可以接收超时了的死信, 消费者监听这个队列就行 当消息到达时间 就可以收到消息,了; ```java @Bean public Queue ttlQueue(){ return QueueBuilder.durable(“ttl.queue”) // 指定队列名称,并持久化
      .ttl(10000) // 设置队列的超时时间,10秒
      .deadLetterExchange("dl.ttl.direct") // 指定死信交换机
      .build();
    
    }

//消息也可以指定超时时间 @Test public void testTTLMsg() { // 创建消息 Message message = MessageBuilder .withBody(“hello, ttl message”.getBytes(StandardCharsets.UTF_8)) .setExpiration(“5000”) .build(); // 消息ID,需要封装到CorrelationData中 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 发送消息 rabbitTemplate.convertAndSend(“ttl.direct”, “ttl”, message, correlationData); log.debug(“发送消息成功”); }


- **延迟队列(延迟交换机):**
   - 延迟队列可以更好的发送延迟短信,
   - 应用场景:延迟发短信,处理超时没有支付的订单,预约工作会议等..
   - 延迟队列需要安装一个插件delay queue(DelayExchange)
   - 原理:
      - 需要声明一个交换机 属性为delayed类型
      - 发送消息的时候 使用messagebuilder 设置消息的消息头setHeader("x-delay",超时时间) 必须设置消息头携带:""x-delay" 延迟时间
      - 延迟交换机 接收到消息头的时候会去检查是否携带x-delay 如果携带就会持久化到硬盘,读取x-delay的属性值作为延迟时间
      - 返回routing not found结果给消息发送者
      - x-delay时间到期后,重新投递消息到指定队列
      - 代码实现如下
```java
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct",delayed = "true"),
        key = "delay"
))


或者
 @Bean
    public DirectExchange exchange(){
          //  return new DirectExchange("dead.direct",true,false);
            return ExchangeBuilder.directExchange("dead.direct").delayed().build();
    }
  • 惰性队列:

    • 消息堆积问题:当发送消息的速度大于消费者处理消息的速度,就会造成消息堆积,直到到达队列的上限,之后在发的消息就会变成死信,可能造成消息的丢失
    • 解决消息堆积方案:
      • 1增加消费者的数量,提高消费速度,也就是work queue模式,
      • 2或扩大队列的容积,提高堆积的上限,
      • 3或者使用惰性队列
    • 惰性队列:
      • 消息不存储在内存,而是存储在硬盘;
      • 消费者需要消息的时候才会读取到内存
      • 支持海量消息的存储
      • 优点:
        • 基于磁盘存储,储存量高,
        • 没有间歇性的page_out操作 性能比较稳定(普通队列持久化的时候需要先到内存,然后page_out操作到磁盘)
      • 缺点:
        • 性能依赖磁盘的io能力,
        • 基于磁盘存储,时效性会降低
    • 基于命名声明惰性队列:略…
    • 基于注解声明惰性队列:

      @bean
      public Queue queue(){
      return  QueueBuilde.durable("lazy.queue")//持久化并声明队列名字
         .lazy()//声明为惰性队列
         .build();
      }
      
    • 基于@RabbitListener注解声明惰性队列

      @RabbitListener(queuesToDeclare = @Queue(
             name = "lazy.queue",durable = "true",
             arguments = @Argument(name = "x-queue-mode",value = "lazy")
      ))
      
  • MQ集群:RabbitMQ的是基于Erlang语言编写,而Erlang又是一个面向并发的语言,天然支持集群模式 集群的rabbit需要有相同的erlang cookie 才可以实现集群。RabbitMQ的集群有两种模式:

  • 普通集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力。
  • 镜像集群:是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性。
  • 普通集群 也叫标准集群
    • 会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息。
    • 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
    • 队列所在节点宕机,队列中的消息就会丢失(实际上队列只保存在创建队列的mq节点)
  • 镜像集群 :本质上就是 主从模式
    • 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份。
    • 创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点
    • 一个队列的主节点可能是另一个队列的镜像节点
    • 所有操作都是主节点完成,然后同步给镜像节点
    • 主宕机后,镜像节点会替代成新的主节点(如果在主从同步完成前,主就已经宕机,可能出现数据丢失)
    • 会有数据丢失的风险
  • 仲裁队列:
    • 仲裁队列是3.8版本以后才有的新功能,用来替代镜像队列,具备下列特征:
      • 与镜像队列一样,都是主从模式,支持主从数据同步
      • 使用非常简单,没有复杂的配置
      • 主从同步基于Raft协议,强一致