问题
:::tips 当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致RabbitMQ的压力飙升
可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到队列 :::
开启本地重试
:::tips 开启本地重试后,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试,重试次数达到设定的次数后,Spring会返回ack,消息就会被丢弃 :::
添加配置
:::tips 在消息消费者的配置文件中添加配置 :::
spring:
rabbitmq:
listener:
simple:
retry:
#开启消费者失败重试
enabled: true
#配置第一次失败重试的等待时长,单位:毫秒
initial-interval: 1000
#失败后等待时长倍数,下次等待时长=multiplier*last-interval
multiplier: 1
#配置最大重试次数
max-attempts: 3
#配置是否是无状态:true无状态,false有状态。如果业务中包含事务,这里改为false
stateless: true
配置失败策略
:::tips 开启本地重试后,消息处理过程中出现异常,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的,这样显然是不行的,我们需要定义MessageRecovery接口来处理,它包含三种不同的实现:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
一般采用的处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的专门存放异常消息的队列,后续由人工集中处理 :::
编写代码
:::tips 在消息消费者服务中的配置类中声明处理失败消息的交换机和队列,同时声明绑定交换机和队列
将自定义的MessageRecovery接口对象注册到Spring容器中,并关联声明的交换机和路由key,消息消费失败后就会被路由到声明的交换机中,交换机会通过路由key将消息路由给所有绑定了路由key的消息队列中 :::
@Configuration
public class XxxConfig{
//声明处理失败消息的交换机
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
//声明存放失败消息的队列
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
//声明绑定存放失败消息的交换机和队列
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
//将自定义的MessageRecovery接口对象注册到Spring容器中,并关联声明的交换机和路由key,消息消费失败后就会被路由到声明的交换机中,交换机会通过路由key将消息路由给所有绑定了路由key的消息队列中
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}