出自图灵学院 我整理了一下,把老师说的话,敲了一下记录下来发了个博客

概述

首先对于广播模式的消息, 是不存在消息重试的机制的,即消息消费失败后,不会再重新进行发送,而只是继续消费新的消息。

而对于普通的消息,只要消费者消费消息失败后,就会自动进行重试.

如何让消息进行重试

集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置。可以有三种配置方式:

  • 返回Action.ReconsumeLater-推荐
  • 返回null
  • 抛出异常
  1. public class MessageListenerImpl implements MessageListener {
  2. @Override
  3. public Action consume(Message message, ConsumeContext context) {
  4. //处理消息
  5. doConsumeMessage(message);
  6. //方式1:返回 Action.ReconsumeLater,消息将重试
  7. return Action.ReconsumeLater;
  8. //方式2:返回 null,消息将重试
  9. return null;
  10. //方式3:直接抛出异常, 消息将重试
  11. throw new RuntimeException("Consumer Message exceotion");
  12. }
  13. }

如何让消息失败不进行重试

如果希望消费失败后不重试,可以直接返回Action.CommitMessage。

  1. public class MessageListenerImpl implements MessageListener {
  2. @Override
  3. public Action consume(Message message, ConsumeContext context) {
  4. try {
  5. doConsumeMessage(message);
  6. } catch (Throwable e) {
  7. //捕获消费逻辑中的所有异常,并返回 Action.CommitMessage;
  8. return Action.CommitMessage;
  9. }
  10. //消息处理正常,直接返回 Action.CommitMessage;
  11. return Action.CommitMessage;
  12. }
  13. }

重试消息如何处理

如果你消息消费失败,就会自动进入重试队列里面. 重试队列是RocketMQ自动生成的,它会给你每个消费者组创建一个重试队列, 你消息一旦消费不成功,就会进入到这个队列里面进行不断的重试.

重试的消息会进入一个 “%RETRY%”+ConsumeGroup 的队列中。

RocketMQ消息重试机制使用以及注意事项原理 - 图1

消息重试次数以及间隔时间

然后RocketMQ默认允许每条消息最多重试16次,每次重试的间隔时间如下:

重试次数 与上次重试的间隔时间 重试次数 与上次重试的间隔时间
1 10 秒 9 7 分钟
2 30 秒 10 8 分钟
3 1 分钟 11 9 分钟
4 2 分钟 12 10 分钟
5 3 分钟 13 20 分钟
6 4 分钟 14 30 分钟
7 5 分钟 15 1 小时
8 6 分钟 16 2 小时

这个重试时间跟延迟消息的延迟级别是对应的。不过取的是延迟级别的后16级别。

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

这个重试时间可以将源码中的org.apache.rocketmq.example.quickstart.Consumer里的消息监听器返回状态改为RECONSUME_LATER测试一下。

自定义重试间隔时间

关于这个重试次数,RocketMQ可以进行定制。例如通过consumer.setMaxReconsumeTimes(20);将重试次数设定为20次。当定制的重试次数超过16次后,消息的重试时间间隔均为2小时。

需要注意配置覆盖问题:

消息最大重试次数的设置对相同GroupID下的所有Consumer实例有效。并且最后启动的Consumer会覆盖之前启动的Consumer的配置。

消息重试超过上限怎么办?

如果重试次数设置的是16,当消息重试16次后仍然失败,消息肯定是不能丢掉的,消息将不再投递。转为进入死信队列。

RocketMQ的死信队列要比RabbitMQ的死信队列简单的很多.不需要那么繁琐的配置.当你消息要放入死信队列的时候,RocketMQ默认就会给你创建死信队列.

死信队列的创建规则和重试队列的创建规则是差不多的, 死信队列topic创建规则是是把消费者组的前缀加了个 %DLQ%了.

RocketMQ消息重试机制使用以及注意事项原理 - 图2

关于MessageId的问题需要注意

在老版本的RocketMQ中,一条消息无论重试多少次,这些重试消息的MessageId始终都是一样的。

但是在4.7.1版本中,每次重试MessageId都会重建。