问题

:::tips 当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致RabbitMQ的压力飙升

可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到队列 :::

开启本地重试

:::tips 开启本地重试后,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试,重试次数达到设定的次数后,Spring会返回ack,消息就会被丢弃 :::

添加配置

:::tips 在消息消费者的配置文件中添加配置 :::

  1. spring:
  2. rabbitmq:
  3. listener:
  4. simple:
  5. retry:
  6. #开启消费者失败重试
  7. enabled: true
  8. #配置第一次失败重试的等待时长,单位:毫秒
  9. initial-interval: 1000
  10. #失败后等待时长倍数,下次等待时长=multiplier*last-interval
  11. multiplier: 1
  12. #配置最大重试次数
  13. max-attempts: 3
  14. #配置是否是无状态:true无状态,false有状态。如果业务中包含事务,这里改为false
  15. stateless: true

配置失败策略

:::tips 开启本地重试后,消息处理过程中出现异常,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的,这样显然是不行的,我们需要定义MessageRecovery接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

一般采用的处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的专门存放异常消息的队列,后续由人工集中处理 :::

编写代码

:::tips 在消息消费者服务中的配置类中声明处理失败消息的交换机和队列,同时声明绑定交换机和队列

将自定义的MessageRecovery接口对象注册到Spring容器中,并关联声明的交换机和路由key,消息消费失败后就会被路由到声明的交换机中,交换机会通过路由key将消息路由给所有绑定了路由key的消息队列中 :::

  1. @Configuration
  2. public class XxxConfig{
  3. //声明处理失败消息的交换机
  4. @Bean
  5. public DirectExchange errorMessageExchange(){
  6. return new DirectExchange("error.direct");
  7. }
  8. //声明存放失败消息的队列
  9. @Bean
  10. public Queue errorQueue(){
  11. return new Queue("error.queue", true);
  12. }
  13. //声明绑定存放失败消息的交换机和队列
  14. @Bean
  15. public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
  16. return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
  17. }
  18. //将自定义的MessageRecovery接口对象注册到Spring容器中,并关联声明的交换机和路由key,消息消费失败后就会被路由到声明的交换机中,交换机会通过路由key将消息路由给所有绑定了路由key的消息队列中
  19. @Bean
  20. public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
  21. return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
  22. }
  23. }