场景
假设在MQ使用过程当中,并未出现消息丢失和消息重复的问题,但是因为消费者系统(优惠券系统)出现了数据库宕机的情况,此时,就会导致优惠券系统从MQ中获取的消息没办法进行业务处理写入优惠券系统数据库。
当数据库发生宕机时,监听器回调函数返回的消息状态并不是 CONSUME_SUCCESS
,而应该是在捕获异常后的 RECOMSUME_LATER
状态。
comsumer.registerMessageListener(
new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
// 在这里对获取到的msgs订单进行处理
// 比如增加积分、发送优惠券、通知发货等等
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch(Exception e) {
// 如果因为数据库宕机等问题,对消息处理失败了
// 此时返回一个稍后重试消费的状态
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
)
}
)
RocketMQ是如何让你进行消费重试的
当消费者系统返回 RECONSUME_LATER
状态,MQ会将这批消息放到消费组的重试队列中去。
比如优惠券系统的消费组 VoucherConsumerGroup
,会有一个 %RETRY%VoucherConsumerGroup
名称的重试队列。消息就会存入到这个重试队列当中。然后过一段时间之后,重试队列的消息会再次被消费者系统获取到,然后继续进行处理,如果再次失败,就又会返回 RECONSUME_LATER
,然后再过一段时间再让消费者系统重试处理。默认最多重试16次,每次的重试之间的间隔是不一样的,间隔时间可以按照如下配置:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
如果连续重试了16次还是无法处理消息,这时候就会将这批消息存入 死信队列(顾名思义:就是死掉的消息放入这个队列)—— 如果重试了16次还一直没有处理成功,就不要继续重试这批消息了,就认为这批消息死掉了,然后这批消息就会自动进入死信队列(类似名称 %DLQ%VoucherConsumerGroup
)。
针对死信队列的消息的处理,可以根据使用场景,比如专门开一个后台线程,然后订阅死信队列,对死信队列的信息进行不断的重试。
消息处理失败场景的方案总结
消费者底层的一些依赖在发生故障的时候,比如数据库宕机,缓存宕机之类的,此时就无法完成对MQ消息的处理了,那么就可以通过一些返回状态去让消息进入RocketMQ自带的重试队列 %RETRY%
。
同时,如果反复重试还是不行,可以让消息进入RocketMQ自带的死信队列 %DLQ%
,后续再针对死信队列中的消息进行单独的处理就可以了。