场景

假设在MQ使用过程当中,并未出现消息丢失和消息重复的问题,但是因为消费者系统(优惠券系统)出现了数据库宕机的情况,此时,就会导致优惠券系统从MQ中获取的消息没办法进行业务处理写入优惠券系统数据库。

当数据库发生宕机时,监听器回调函数返回的消息状态并不是 CONSUME_SUCCESS ,而应该是在捕获异常后的 RECOMSUME_LATER 状态。

  1. comsumer.registerMessageListener(
  2. new MessageListenerConcurrently() {
  3. @Override
  4. public ConsumeConcurrentlyStatus consumeMessage(
  5. List<MessageExt> msgs,
  6. ConsumeConcurrentlyContext context) {
  7. try {
  8. // 在这里对获取到的msgs订单进行处理
  9. // 比如增加积分、发送优惠券、通知发货等等
  10. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  11. } catch(Exception e) {
  12. // 如果因为数据库宕机等问题,对消息处理失败了
  13. // 此时返回一个稍后重试消费的状态
  14. return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  15. }
  16. }
  17. )
  18. }
  19. )

RocketMQ是如何让你进行消费重试的

当消费者系统返回 RECONSUME_LATER 状态,MQ会将这批消息放到消费组的重试队列中去。
比如优惠券系统的消费组 VoucherConsumerGroup ,会有一个 %RETRY%VoucherConsumerGroup 名称的重试队列。消息就会存入到这个重试队列当中。然后过一段时间之后,重试队列的消息会再次被消费者系统获取到,然后继续进行处理,如果再次失败,就又会返回 RECONSUME_LATER ,然后再过一段时间再让消费者系统重试处理。默认最多重试16次,每次的重试之间的间隔是不一样的,间隔时间可以按照如下配置:

  1. messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

如果连续重试了16次还是无法处理消息,这时候就会将这批消息存入 死信队列(顾名思义:就是死掉的消息放入这个队列)—— 如果重试了16次还一直没有处理成功,就不要继续重试这批消息了,就认为这批消息死掉了,然后这批消息就会自动进入死信队列(类似名称 %DLQ%VoucherConsumerGroup )。

针对死信队列的消息的处理,可以根据使用场景,比如专门开一个后台线程,然后订阅死信队列,对死信队列的信息进行不断的重试。

消息处理失败场景的方案总结

消费者底层的一些依赖在发生故障的时候,比如数据库宕机,缓存宕机之类的,此时就无法完成对MQ消息的处理了,那么就可以通过一些返回状态去让消息进入RocketMQ自带的重试队列 %RETRY%
同时,如果反复重试还是不行,可以让消息进入RocketMQ自带的死信队列 %DLQ% ,后续再针对死信队列中的消息进行单独的处理就可以了。