MQ消费重试的常见手段

在使用MQ时,必不可少的需要结合当前场景,考虑消费失败时如何处理。对于消息不重要的场景,失败就失败了,继续往下消费就行。但对于消息重要的场景,就需要有一定的机制去保证消息最终处理成功。
机制的手段有很多,大体思路都是,先进行重试,重试一定次数之后就走兜底逻辑。

  • 对于重试,可进行固定时间间隔的重试或做间隔时间递增的重试等
  • 对于兜底逻辑,包括消息投递到死信队列、告警、人工补偿、定时对账补偿等

下面主要讲如何使用好kafka和pulsar自带的消费重试功能,对于兜底逻辑来说只做异常日志的记录,其它手段较灵活,不在本文讨论范围内。另外本文使用的spring-kafka版本为2.3.0,pulsar版本为2.7.0。

kafka重试

消费失败重试

这里使用的是spring-kafka客户端
一个spring-kakfa consumer的使用例子如下:

  1. @Component
  2. @Slf4j
  3. public class DemoListener {
  4. @KafkaListener(id = "groupId", topics = "demo-topic",
  5. containerFactory = "defaultKafkaListenerContainerFactory")
  6. public void handleMsg(ConsumerRecord<?, String> record, Acknowledgment ack) {
  7. try {
  8. doHandleMsg(record);
  9. //流程正常结束的手动ack。若中间有流程抛异常,会进入kafka的重试流程进行自动重试
  10. ack.acknowledge();
  11. } catch (Exception e) {
  12. log.error("[DemoListener.handleMsg] error, record:{}, exception:{}", record, Throwables.getStackTraceAsString(e));
  13. }
  14. }
  15. }

对于consumer的配置,定义在了bean defaultKafkaListenerContainerFactory

  1. @Bean("defaultKafkaListenerContainerFactory")
  2. public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
  3. ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  4. factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
  5. factory.setErrorHandler(defaultErrorHandler());
  6. //省略部分配置
  7. return factory;
  8. }
  9. private ErrorHandler defaultErrorHandler() {
  10. FixedBackOff fixedBackOff = new FixedBackOff(3 * 1000, 3);
  11. return new SeekToCurrentErrorHandler((record, exception) -> {
  12. log.error("consume fail after retry, record:{}, exception:{}", record, Throwables.getStackTraceAsString(exception));
  13. }, fixedBackOff);
  14. }

可以看到,先通过设置手动ACK,再配置了ErrorHandler,该ErrorHandler实际为SeekToCurrentErrorHandler,第一个参数设置了当达到重试次数后的处理策略,这里为打印了一条错误日志;第二个参数为重试频率,这里通过FixedBackOff设置了3秒重试一次,总共重试3次。

这是一种通过SeekToCurrentErrorHandler配置失败重试的方式,还有一种方式是设置RetryTemplate, RecoveryCallback,如下:

  1. @Bean("defaultKafkaListenerContainerFactory")
  2. public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
  3. ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  4. factory.setRetryTemplate(kafkaRetry());
  5. factory.setRecoveryCallback(retryContext -> {
  6. ConsumerRecord consumerRecord = (ConsumerRecord) retryContext.getAttribute("record");
  7. log.info("Recovery is called for message {} ", consumerRecord.value());
  8. return Optional.empty();
  9. });
  10. //省略部分配置
  11. return factory;
  12. }
  13. public RetryTemplate kafkaRetry(){
  14. RetryTemplate retryTemplate = new RetryTemplate();
  15. FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
  16. fixedBackOffPolicy.setBackOffPeriod(3*1000l);
  17. retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
  18. SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
  19. retryPolicy.setMaxAttempts(3);
  20. retryTemplate.setRetryPolicy(retryPolicy);
  21. return retryTemplate;
  22. }

对于两者来说,都是先通过配置的重试策略,比如3s重试一次,超过3次之后就进入兜底的方法打一条日志。
那么这两种方式有什么区别的?
后者的重试方式是,直接在客户端内存中进行重试,这时不会调用Consumer.poll()去拉取新的数据,直到重试结束。我们直到,kafka对consumer做健康检查,是通过两次poll的时间间隔,与max.poll.interval.ms配置做比较,如果超过了这个时间,会认为消费者节点有问题,从而进行partition与consumer的rebalance,导致消费者消费不到消息。
对于前者SeekToCurrentErrorHandler来说,实现机制是这样的,当消费失败时,将调用Consumer#seek(partition, offset)方法,offset为当前消费失败消息的offset,这样下次去poll消息时还是可以拿到该条记录。这种方式就不会出现阻塞poll,从而导致rebalance的情况出现。

死信队列

死信队列的使用方式如下

  1. @Bean
  2. public ErrorHandler kafkaErrorHandler(KafkaTemplate<?, ?> template) {
  3. DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
  4. BackOff backOff = new FixedBackOff(3 * 1000L, 3L);
  5. return new SeekToCurrentErrorHandler(recoverer, backOff);
  6. }

经过定义的重试间隔后,若还是没有消费成功,消息会发送到DeadLetterPublishingRecoverer中定义的死信队列,topic为原有topic加上.DLT后缀。因此如果使用死信队列的处理方式,意味着需要定义多一个topic。

Pulsar重试

消费失败重试

Pulsar消费失败的处理分两种:

  • 没有进行ack
  • 调用consumer.negativeAcknowledge(message) (即nack)来表示消费失败

对于没有进行ack的场景,消费者通过ConsumerBuilder#ackTimeout(ackTimeout, timeUnit)指定的时间进行重新消费,即在过指定时间后该条消息会重新消费,若不指定则不会进行重新投递。
对于nack的场景,则通过ConsumerBuilder#negativeAckRedeliveryDelay(redeliveryDelay, timeUnit)指定的时间进行重新消费,默认是1分钟。

死信队列

默认情况下消费重试会无限次进行,但一些场景下可能并不希望重试无限进行,这个场景下可以使用死信队列。如下为Pulsar死信队列配置类:

  1. public class DeadLetterPolicy {
  2. //最大的重新消费次数,超过此次数的消息会被放到deadLetterTopic中
  3. private int maxRedeliverCount;
  4. //死信队列名
  5. private String deadLetterTopic;
  6. //重试队列名,下文会讲
  7. private String retryLetterTopic;
  8. }

重试队列

Pulsar支持重试队列的概念,意思是当消费失败时,消息会投递在重试topic中。如果设置了enableRetry(true),该重试会在当前消费者上进行。

  1. Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
  2. .topic(topic)
  3. .subscriptionName("my-subscription")
  4. .subscriptionType(SubscriptionType.Shared)
  5. .enableRetry(true)
  6. .receiverQueueSize(100)
  7. //配置重试队列
  8. .deadLetterPolicy(DeadLetterPolicy.builder()
  9. .maxRedeliverCount(maxRedeliveryCount)
  10. .retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
  11. .build())
  12. .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
  13. .subscribe();

如果把重试队列设置成当前topic本身,即可实现在当前topic上进行重试,不需要再定义一个重试队列。

对比

相似点

Kafka与Pulsar都有配置重试间隔、重试次数、死信队列等能力

差异点

  • Kafka除了支持固定重试时间间隔外,通过ExponentialBackOff还可支持递增时间的重试间隔,而Pulsar目前看下来应该是只支持固定时间间隔的重试
  • Pulsar自带支持重试队列,可在额外的队列中进行重试,而kafka自带的功能是在当前topic上进行重试
  • kafka支持达到重试次数之后的兜底逻辑处理,而Pulsar看起来是不支持,即达到次数之后就什么也不做

参考链接