应用场景

前两天我们已经介绍了两种Spring Cloud Stream对消息失败的处理策略:

  • 自动重试:对于一些因环境原因(如:网络抖动等不稳定因素)引发的问题可以起到比较好的作用,提高消息处理的成功率。
  • 自定义错误处理逻辑:如果业务上,消息处理失败之后有明确的降级逻辑可以弥补的,可以采用这种方式,但是2.0.x版本有Bug,2.1.x版本修复。

那么如果代码本身存在逻辑错误,无论重试多少次都不可能成功,也没有具体的降级业务逻辑,之前在深入思考中讨论过,可以通过日志,或者降级逻辑记录的方式把错误消息保存下来,然后事后分析、修复Bug再重新处理。但是很显然,这样做非常原始,并且太过笨拙,处理复杂度过高。所以,本文将介绍利用中间件特性来便捷地处理该问题的方案:使用RabbitMQ的DLQ(死信)队列。

动手试试

准备一个会消费失败的例子,可以直接沿用前文的工程。也可以新建一个,然后创建如下代码的逻辑:

  1. @SpringBootApplication
  2. @EnableBinding(DLQApplication.TestTopic.class)
  3. public class DLQApplication {
  4. public static void main(String[] args) {
  5. SpringApplication.run(DLQApplication.class, args);
  6. }
  7. @RestController
  8. static class TestController {
  9. @Autowired
  10. private TestTopic testTopic;
  11. @GetMapping("/sendMessage")
  12. public String messageWithMQ(@RequestParam String message) {
  13. testTopic.output().send(MessageBuilder.withPayload(message).build());
  14. return "ok";
  15. }
  16. }
  17. @Slf4j
  18. @Component
  19. static class TestListener {
  20. @StreamListener(TestTopic.INPUT)
  21. public void receive(String payload) throws InterruptedException {
  22. log.info("Received: " + payload);
  23. Thread.sleep(5000);
  24. throw new RuntimeException("Message consumer failed !");
  25. }
  26. }
  27. interface TestTopic {
  28. String OUTPUT = "example-topic-output";
  29. String INPUT = "example-topic-input";
  30. @Output(OUTPUT)
  31. MessageChannel output();
  32. @Input(INPUT)
  33. SubscribableChannel input();
  34. }
  35. }

内容很简单,既包含了消息的生产,也包含了消息消费。消息消费的时候主动抛出了一个异常来模拟消息的消费失败。
在启动应用之前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名)、并设置一下分组,比如:

  1. spring.cloud.stream.bindings.example-topic-output.destination=dlq-topic
  2. spring.cloud.stream.bindings.example-topic-input.destination=dlq-topic
  3. # 最大失败重试次数
  4. spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1
  5. spring.cloud.stream.bindings.example-topic-input.group=stream-dlq-handler
  6. # 开启死信队列
  7. spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.auto-bind-dlq=true
  8. # 将失败消息推送死信队列
  9. spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.republish-to-dlq=true
  10. # 设置死信队列消息存活时间ms
  11. spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.dlq-ttl=20000

这里加入了一个重要配置spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.auto-bind-dlq=true,用来开启DLQ(死信队列)。
完成了上面配置之后,启动应用并访问localhost:8080/send/errot/message?message=hello接口来发送一个消息到MQ中了,此时可以看到消费失败后抛出了异常,消息消费失败,记录了日志。此时,可以查看RabbitMQ的控制台如下:image.png
其中,dlq-topic.stream-dlq-handler.dlq队列就是dlq-topic.stream-dlq-handler的dlq(死信)队列,当dlq-topic.stream-dlq-handler队列中的消息消费时候之后,就会将这条消息原封不动的转存到dlq队列中。这样这些没有得到妥善处理的消息就通过简单的配置实现了存储,之后,我们还可以通过简单的操作对这些消息进行重新消费。我们只需要在控制台中点击dlq-topic.stream-dlq-handler.dlq队列的名字进入到详情页面之后,使用Move messages功能,直接将这些消息移动回dlq-topic.stream-dlq-handler队列,这样这些消息就能重新被消费一次。

更换配置时容易出现异常 2021-07-09 16:48:23.209 ERROR 40352 —- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg ‘x-message-ttl’ for queue ‘dlq-topic.stream-dlq-handler.dlq’ in vhost ‘/‘: received ‘20000’ but current is ‘10000’, class-id=50, method-id=10) 是因为两者队列不一致造成的,可以更换topic或者删除旧的队列

  1. 消费逻辑里面加了5秒延迟,可以看到消息先到 dlq-topic.stream-dlq-handler 队列

image.png

  1. 消费失败后转到了 dlq-topic.stream-dlq-handler.dlq死信队列

image.png

  1. 移动message,选择 dlq-topic.stream-dlq-handler 队列

image.png

  1. 消息被重新消费

image.png
如果Move messages功能中是如下内容:

To move messages, the shovel plugin must be enabled, try:

$ rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management

那是由于没有安装对应的插件,只需要根据提示的命令安装就能使用该命令了。

深入思考

先来总结一下在引入了RabbitMQ的DLQ之后,对于消息异常处理更为完整一些的基本思路:

  1. 瞬时的环境抖动引起的异常,利用重试功能提高处理成功率
  2. 如果重试依然失败的,日志报错,并进入DLQ队列
  3. 日志告警通知相关开发人员,分析问题原因
  4. 解决问题(修复程序Bug、扩容等措施)之后,DLQ队列中的消息移回重新处理

在这样的整体思路中,可能还涉及一些微调,这里举几个常见例子,帮助读者进一步了解一些特殊的场景和配置使用!
场景一:有些消息在业务上存在时效性,进入死信队列之后,过一段时间再处理已经没有意义,这个时候如何过滤这些消息呢?
只需要配置一个参数即可:

spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.dlq-ttl=10000

该参数可以控制DLQ队列中消息的存活时间,当超过配置时间之后,该消息会自动的从DLQ队列中移除。

场景二:可能进入DLQ队列的消息存在各种不同的原因(不同异常造成的),此时如果在做补救措施的时候,还希望根据这些异常做不同的处理时候,我们如何区分这些消息进入DLQ的原因呢?
再来看看这个参数:

spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.republish-to-dlq=true

该参数默认是false,如果设置了死信队列的时候,会将消息原封不动的发送到死信队列(也就是上面例子中的实现),此时大家可以在RabbitMQ控制台中通过Get message(s)功能来看看队列中的消息。
这样,不论我们是通过移回原通道处理还是新增订阅处理这些消息的时候就可以以此作为依据进行分类型处理了。