MQ消费重试的常见手段
在使用MQ时,必不可少的需要结合当前场景,考虑消费失败时如何处理。对于消息不重要的场景,失败就失败了,继续往下消费就行。但对于消息重要的场景,就需要有一定的机制去保证消息最终处理成功。
机制的手段有很多,大体思路都是,先进行重试,重试一定次数之后就走兜底逻辑。
- 对于重试,可进行固定时间间隔的重试或做间隔时间递增的重试等
- 对于兜底逻辑,包括消息投递到死信队列、告警、人工补偿、定时对账补偿等
下面主要讲如何使用好kafka和pulsar自带的消费重试功能,对于兜底逻辑来说只做异常日志的记录,其它手段较灵活,不在本文讨论范围内。另外本文使用的spring-kafka版本为2.3.0,pulsar版本为2.7.0。
kafka重试
消费失败重试
这里使用的是spring-kafka客户端
一个spring-kakfa consumer的使用例子如下:
@Component
@Slf4j
public class DemoListener {
@KafkaListener(id = "groupId", topics = "demo-topic",
containerFactory = "defaultKafkaListenerContainerFactory")
public void handleMsg(ConsumerRecord<?, String> record, Acknowledgment ack) {
try {
doHandleMsg(record);
//流程正常结束的手动ack。若中间有流程抛异常,会进入kafka的重试流程进行自动重试
ack.acknowledge();
} catch (Exception e) {
log.error("[DemoListener.handleMsg] error, record:{}, exception:{}", record, Throwables.getStackTraceAsString(e));
}
}
}
对于consumer的配置,定义在了bean defaultKafkaListenerContainerFactory
@Bean("defaultKafkaListenerContainerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
factory.setErrorHandler(defaultErrorHandler());
//省略部分配置
return factory;
}
private ErrorHandler defaultErrorHandler() {
FixedBackOff fixedBackOff = new FixedBackOff(3 * 1000, 3);
return new SeekToCurrentErrorHandler((record, exception) -> {
log.error("consume fail after retry, record:{}, exception:{}", record, Throwables.getStackTraceAsString(exception));
}, fixedBackOff);
}
可以看到,先通过设置手动ACK,再配置了ErrorHandler
,该ErrorHandler
实际为SeekToCurrentErrorHandler
,第一个参数设置了当达到重试次数后的处理策略,这里为打印了一条错误日志;第二个参数为重试频率,这里通过FixedBackOff设置了3秒重试一次,总共重试3次。
这是一种通过SeekToCurrentErrorHandler
配置失败重试的方式,还有一种方式是设置RetryTemplate
, RecoveryCallback
,如下:
@Bean("defaultKafkaListenerContainerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setRetryTemplate(kafkaRetry());
factory.setRecoveryCallback(retryContext -> {
ConsumerRecord consumerRecord = (ConsumerRecord) retryContext.getAttribute("record");
log.info("Recovery is called for message {} ", consumerRecord.value());
return Optional.empty();
});
//省略部分配置
return factory;
}
public RetryTemplate kafkaRetry(){
RetryTemplate retryTemplate = new RetryTemplate();
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
fixedBackOffPolicy.setBackOffPeriod(3*1000l);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
对于两者来说,都是先通过配置的重试策略,比如3s重试一次,超过3次之后就进入兜底的方法打一条日志。
那么这两种方式有什么区别的?
后者的重试方式是,直接在客户端内存中进行重试,这时不会调用Consumer.poll()
去拉取新的数据,直到重试结束。我们直到,kafka对consumer做健康检查,是通过两次poll的时间间隔,与max.poll.interval.ms
配置做比较,如果超过了这个时间,会认为消费者节点有问题,从而进行partition与consumer的rebalance,导致消费者消费不到消息。
对于前者SeekToCurrentErrorHandler
来说,实现机制是这样的,当消费失败时,将调用Consumer#seek(partition, offset)
方法,offset为当前消费失败消息的offset,这样下次去poll消息时还是可以拿到该条记录。这种方式就不会出现阻塞poll,从而导致rebalance的情况出现。
死信队列
死信队列的使用方式如下
@Bean
public ErrorHandler kafkaErrorHandler(KafkaTemplate<?, ?> template) {
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
BackOff backOff = new FixedBackOff(3 * 1000L, 3L);
return new SeekToCurrentErrorHandler(recoverer, backOff);
}
经过定义的重试间隔后,若还是没有消费成功,消息会发送到DeadLetterPublishingRecoverer
中定义的死信队列,topic为原有topic加上.DLT
后缀。因此如果使用死信队列的处理方式,意味着需要定义多一个topic。
Pulsar重试
消费失败重试
Pulsar消费失败的处理分两种:
- 没有进行ack
- 调用
consumer.negativeAcknowledge(message)
(即nack)来表示消费失败
对于没有进行ack的场景,消费者通过ConsumerBuilder#ackTimeout(ackTimeout, timeUnit)
指定的时间进行重新消费,即在过指定时间后该条消息会重新消费,若不指定则不会进行重新投递。
对于nack的场景,则通过ConsumerBuilder#negativeAckRedeliveryDelay(redeliveryDelay, timeUnit)
指定的时间进行重新消费,默认是1分钟。
死信队列
默认情况下消费重试会无限次进行,但一些场景下可能并不希望重试无限进行,这个场景下可以使用死信队列
。如下为Pulsar死信队列配置类:
public class DeadLetterPolicy {
//最大的重新消费次数,超过此次数的消息会被放到deadLetterTopic中
private int maxRedeliverCount;
//死信队列名
private String deadLetterTopic;
//重试队列名,下文会讲
private String retryLetterTopic;
}
重试队列
Pulsar支持重试队列的概念,意思是当消费失败时,消息会投递在重试topic中。如果设置了enableRetry(true)
,该重试会在当前消费者上进行。
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)
.receiverQueueSize(100)
//配置重试队列
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
.build())
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
如果把重试队列设置成当前topic本身,即可实现在当前topic上进行重试,不需要再定义一个重试队列。
对比
相似点
Kafka与Pulsar都有配置重试间隔、重试次数、死信队列等能力
差异点
- Kafka除了支持固定重试时间间隔外,通过
ExponentialBackOff
还可支持递增时间的重试间隔,而Pulsar目前看下来应该是只支持固定时间间隔的重试 - Pulsar自带支持重试队列,可在额外的队列中进行重试,而kafka自带的功能是在当前topic上进行重试
- kafka支持达到重试次数之后的兜底逻辑处理,而Pulsar看起来是不支持,即达到次数之后就什么也不做