MQ消费重试的常见手段
在使用MQ时,必不可少的需要结合当前场景,考虑消费失败时如何处理。对于消息不重要的场景,失败就失败了,继续往下消费就行。但对于消息重要的场景,就需要有一定的机制去保证消息最终处理成功。
机制的手段有很多,大体思路都是,先进行重试,重试一定次数之后就走兜底逻辑。
- 对于重试,可进行固定时间间隔的重试或做间隔时间递增的重试等
- 对于兜底逻辑,包括消息投递到死信队列、告警、人工补偿、定时对账补偿等
下面主要讲如何使用好kafka和pulsar自带的消费重试功能,对于兜底逻辑来说只做异常日志的记录,其它手段较灵活,不在本文讨论范围内。另外本文使用的spring-kafka版本为2.3.0,pulsar版本为2.7.0。
kafka重试
消费失败重试
这里使用的是spring-kafka客户端
一个spring-kakfa consumer的使用例子如下:
@Component@Slf4jpublic class DemoListener {@KafkaListener(id = "groupId", topics = "demo-topic",containerFactory = "defaultKafkaListenerContainerFactory")public void handleMsg(ConsumerRecord<?, String> record, Acknowledgment ack) {try {doHandleMsg(record);//流程正常结束的手动ack。若中间有流程抛异常,会进入kafka的重试流程进行自动重试ack.acknowledge();} catch (Exception e) {log.error("[DemoListener.handleMsg] error, record:{}, exception:{}", record, Throwables.getStackTraceAsString(e));}}}
对于consumer的配置,定义在了bean defaultKafkaListenerContainerFactory
@Bean("defaultKafkaListenerContainerFactory")public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);factory.setErrorHandler(defaultErrorHandler());//省略部分配置return factory;}private ErrorHandler defaultErrorHandler() {FixedBackOff fixedBackOff = new FixedBackOff(3 * 1000, 3);return new SeekToCurrentErrorHandler((record, exception) -> {log.error("consume fail after retry, record:{}, exception:{}", record, Throwables.getStackTraceAsString(exception));}, fixedBackOff);}
可以看到,先通过设置手动ACK,再配置了ErrorHandler,该ErrorHandler实际为SeekToCurrentErrorHandler,第一个参数设置了当达到重试次数后的处理策略,这里为打印了一条错误日志;第二个参数为重试频率,这里通过FixedBackOff设置了3秒重试一次,总共重试3次。
这是一种通过SeekToCurrentErrorHandler配置失败重试的方式,还有一种方式是设置RetryTemplate, RecoveryCallback,如下:
@Bean("defaultKafkaListenerContainerFactory")public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setRetryTemplate(kafkaRetry());factory.setRecoveryCallback(retryContext -> {ConsumerRecord consumerRecord = (ConsumerRecord) retryContext.getAttribute("record");log.info("Recovery is called for message {} ", consumerRecord.value());return Optional.empty();});//省略部分配置return factory;}public RetryTemplate kafkaRetry(){RetryTemplate retryTemplate = new RetryTemplate();FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();fixedBackOffPolicy.setBackOffPeriod(3*1000l);retryTemplate.setBackOffPolicy(fixedBackOffPolicy);SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();retryPolicy.setMaxAttempts(3);retryTemplate.setRetryPolicy(retryPolicy);return retryTemplate;}
对于两者来说,都是先通过配置的重试策略,比如3s重试一次,超过3次之后就进入兜底的方法打一条日志。
那么这两种方式有什么区别的?
后者的重试方式是,直接在客户端内存中进行重试,这时不会调用Consumer.poll()去拉取新的数据,直到重试结束。我们直到,kafka对consumer做健康检查,是通过两次poll的时间间隔,与max.poll.interval.ms配置做比较,如果超过了这个时间,会认为消费者节点有问题,从而进行partition与consumer的rebalance,导致消费者消费不到消息。
对于前者SeekToCurrentErrorHandler来说,实现机制是这样的,当消费失败时,将调用Consumer#seek(partition, offset)方法,offset为当前消费失败消息的offset,这样下次去poll消息时还是可以拿到该条记录。这种方式就不会出现阻塞poll,从而导致rebalance的情况出现。
死信队列
死信队列的使用方式如下
@Beanpublic ErrorHandler kafkaErrorHandler(KafkaTemplate<?, ?> template) {DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);BackOff backOff = new FixedBackOff(3 * 1000L, 3L);return new SeekToCurrentErrorHandler(recoverer, backOff);}
经过定义的重试间隔后,若还是没有消费成功,消息会发送到DeadLetterPublishingRecoverer中定义的死信队列,topic为原有topic加上.DLT后缀。因此如果使用死信队列的处理方式,意味着需要定义多一个topic。
Pulsar重试
消费失败重试
Pulsar消费失败的处理分两种:
- 没有进行ack
- 调用
consumer.negativeAcknowledge(message)(即nack)来表示消费失败
对于没有进行ack的场景,消费者通过ConsumerBuilder#ackTimeout(ackTimeout, timeUnit)指定的时间进行重新消费,即在过指定时间后该条消息会重新消费,若不指定则不会进行重新投递。
对于nack的场景,则通过ConsumerBuilder#negativeAckRedeliveryDelay(redeliveryDelay, timeUnit)指定的时间进行重新消费,默认是1分钟。
死信队列
默认情况下消费重试会无限次进行,但一些场景下可能并不希望重试无限进行,这个场景下可以使用死信队列。如下为Pulsar死信队列配置类:
public class DeadLetterPolicy {//最大的重新消费次数,超过此次数的消息会被放到deadLetterTopic中private int maxRedeliverCount;//死信队列名private String deadLetterTopic;//重试队列名,下文会讲private String retryLetterTopic;}
重试队列
Pulsar支持重试队列的概念,意思是当消费失败时,消息会投递在重试topic中。如果设置了enableRetry(true),该重试会在当前消费者上进行。
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES).topic(topic).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).enableRetry(true).receiverQueueSize(100)//配置重试队列.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry").build()).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
如果把重试队列设置成当前topic本身,即可实现在当前topic上进行重试,不需要再定义一个重试队列。
对比
相似点
Kafka与Pulsar都有配置重试间隔、重试次数、死信队列等能力
差异点
- Kafka除了支持固定重试时间间隔外,通过
ExponentialBackOff还可支持递增时间的重试间隔,而Pulsar目前看下来应该是只支持固定时间间隔的重试 - Pulsar自带支持重试队列,可在额外的队列中进行重试,而kafka自带的功能是在当前topic上进行重试
- kafka支持达到重试次数之后的兜底逻辑处理,而Pulsar看起来是不支持,即达到次数之后就什么也不做
