下面几天就集中来详细聊聊,当消息消费失败之后该如何处理的几种方式。不过不论哪种方式,都需要与具体业务结合,解决不同业务场景可能出现的问题。
今天第一节,介绍一下Spring Cloud Stream中默认就已经配置了的一个异常解决方案:重试!

应用场景

依然要明确一点,任何解决方案都要结合具体的业务实现来确定,不要有了锤子看什么问题都是钉子。那么重试可以解决什么问题呢?由于重试的基础逻辑并不会改变,所以通常重试只能解决因环境不稳定等外在因素导致的失败情况,比如:当我们接收到某个消息之后,需要调用一个外部的Web Service做一些事情,这个时候如果与外部系统的网络出现了抖动,导致调用失败而抛出异常。这个时候,通过重试消息消费的具体逻辑,可能在下一次调用的时候,就能完成整合业务动作,从而解决刚才所述的问题。

动手试试

先通过一个小例子来看看Spring Cloud Stream默认的重试机制是如何运作的。之前在如何消费自己生产的消息一文中的例子,我们可以继续沿用,或者也可以精简一些,都写到一个主类中,比如下面这样:

  1. @SpringBootApplication
  2. @EnableBinding(MqAttemptApplication.TestTopic.class)
  3. public class MqAttemptApplication {
  4. public static void main(String[] args) {
  5. SpringApplication.run(MqAttemptApplication.class, args);
  6. }
  7. @RestController
  8. static class TestController {
  9. @Autowired
  10. private TestTopic testTopic;
  11. @GetMapping("/sendMessage")
  12. public String messageWithMQ(@RequestParam String message) {
  13. testTopic.output().send(MessageBuilder.withPayload(message).build());
  14. return "ok";
  15. }
  16. }
  17. @Slf4j
  18. @Component
  19. static class TestListener {
  20. @StreamListener(TestTopic.INPUT)
  21. public void receive(String payload) {
  22. log.info("Received: " + payload);
  23. throw new RuntimeException("Message consumer failed!");
  24. }
  25. }
  26. interface TestTopic {
  27. String OUTPUT = "example-topic-output";
  28. String INPUT = "example-topic-input";
  29. @Output(OUTPUT)
  30. MessageChannel output();
  31. @Input(INPUT)
  32. SubscribableChannel input();
  33. }
  34. }

内容很简单,既包含了消息的生产,也包含了消息消费。与之前例子不同的就是在消息消费逻辑中,主动的抛出了一个异常来模拟消息的消费失败。

在启动应用之前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名),比如:

  1. spring.cloud.stream.bindings.example-topic-input.destination=test-topic
  2. spring.cloud.stream.bindings.example-topic-output.destination=test-topic

完成了上面配置之后,就可以启动应用,并尝试访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中了。此时可以看到类似下面的日志:

  1. 2021-07-09 13:18:31.050 INFO 31288 --- [SOCeVNfyVnFWg-1] c.s.MqAttemptApplication$TestListener : Received: hello
  2. 2021-07-09 13:18:32.064 INFO 31288 --- [SOCeVNfyVnFWg-1] c.s.MqAttemptApplication$TestListener : Received: hello
  3. 2021-07-09 13:18:34.079 INFO 31288 --- [SOCeVNfyVnFWg-1] c.s.MqAttemptApplication$TestListener : Received: hello
  4. 2021-07-09 13:18:34.081 ERROR 31288 --- [SOCeVNfyVnFWg-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Exception thrown while invoking com.snow.MqAttemptApplication$TestListener#receive[1 args]; nested exception is java.lang.RuntimeException: Message consumer failed!, failedMessage=GenericMessage [payload=byte[5], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=test-topic, amqp_receivedExchange=test-topic, amqp_deliveryTag=1, deliveryAttempt=3, amqp_consumerQueue=test-topic.anonymous.tSTuDxIbRSOCeVNfyVnFWg, amqp_redelivered=false, id=d53c1920-182f-fb02-e0f9-9b5b93f52cf8, amqp_consumerTag=amq.ctag-9DCR9EY8oC-JnH4BFoKlIQ, contentType=application/json, timestamp=1625807911046}]
  5. at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:63)
  6. at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
  7. at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
  8. at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
  9. at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
  10. at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
  11. at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
  12. at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
  13. at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
  14. at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
  15. at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
  16. at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
  17. at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
  18. at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203)
  19. at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1100(AmqpInboundChannelAdapter.java:60)
  20. at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:214)
  21. at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
  22. at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180)
  23. at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:211)
  24. at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414)
  25. at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337)
  26. at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324)
  27. at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303)
  28. at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817)
  29. at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801)
  30. at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77)
  31. at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042)
  32. at java.lang.Thread.run(Thread.java:748)
  33. Caused by: java.lang.RuntimeException: Message consumer failed!
  34. at com.snow.MqAttemptApplication$TestListener.receive(MqAttemptApplication.java:47)
  35. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  36. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  37. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  38. at java.lang.reflect.Method.invoke(Method.java:498)
  39. at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:181)
  40. at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:114)
  41. at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
  42. ... 27 more

从日志中可以看到,一共输出了三次Received: hello,也就是说消息消费逻辑执行了3次,然后抛出了最终执行失败的异常(三次)。

设置重复次数

默认情况下Spring Cloud Stream会重试3次,我们也可以通过配置的方式修改这个默认配置,比如下面的配置可以将重试次数调整为1次:

spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1

对于一些纯内部计算逻辑,不需要依赖外部环境,如果出错通常是代码逻辑错误的情况下,不论我们如何重试都会继续错误的业务逻辑可以将该参数设置为0,避免不必要的重试影响消息处理的速度。

深入思考

完成了上面的基础尝试之后,再思考下面两个问题:
问题一:如果在重试过程中消息处理成功了,还会有异常信息吗?
答案是不会。因为重试过程是消息处理的一个整体,如果某一次重试成功了,会任务对所收到消息的消费成功了。
这个问题可以在上述例子中做一些小改动来验证,比如:

@Slf4j
@Component
static class TestListener {

    int count = 1;
    @StreamListener(TestTopic.INPUT)
    public void receive(String payload) {
        log.info("Received: " + payload + " : count" + count);
        if (count == 3) {
            count = 1;
            return;
        } else {
            throw new RuntimeException("Message consumer failed !" + count++);
        }
    }
}

通过加入一个计数器,当重试是第3次的时候,不抛出异常来模拟消费逻辑处理成功了。此时重新运行程序,并调用接口localhost:8080/sendMessage?message=hello,可以获得如下日志结果,并没有异常打印出来。

2021-07-09 14:26:40.510  INFO 57284 --- [gSQCpUsxoP2Ow-1] c.s.MqAttemptApplication$TestListener    : Received: hello : count1
2021-07-09 14:26:41.516  INFO 57284 --- [gSQCpUsxoP2Ow-1] c.s.MqAttemptApplication$TestListener    : Received: hello : count2
2021-07-09 14:26:43.521  INFO 57284 --- [gSQCpUsxoP2Ow-1] c.s.MqAttemptApplication$TestListener    : Received: hello : count3

也就是,虽然前两次消费抛出了异常,但是并不影响最终的结果,也不会打印中间过程的异常,避免了对日志告警产生误报等问题。

问题二:如果重试都失败之后应该怎么办呢?
如果消息在重试了还是失败之后,目前的配置唯一能做的就是将异常信息记录下来,进行告警。由于日志中有消息的消息信息描述,所以应用维护者可以根据这些信息来做一些补救措施。
当然,这样的做法显然不是最好的,因为太过麻烦。那么怎么做才好呢?且听下回分解!