1. Project Repository
https://github.com/GuardFTC/kafka-test.git
2. Introduction
In the previous article we looked at how to keep Kafka messages from being lost. A follow-up problem comes with that: duplicate consumption of Kafka messages. This article walks through that problem in detail.
3. Creating the Topic
As shown in the figure, create a Topic named not-repeat-message-topic.
4. Reproducing Duplicate Consumption
Configuration
The main changes, shown below, are on the Consumer side: the heartbeat interval, the session timeout, and the maximum poll interval are adjusted.
The session timeout must fall within the broker-side range shown in the figure above.
```yaml
#Server configuration
server:
  port: 8088

#Spring configuration
spring:
  kafka:
    bootstrap-servers: 120.48.107.224:9092
    producer:
      acks: -1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        enable.idempotence: true
    consumer:
      group-id: not-repeat-message-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: false
      properties:
        heartbeat.interval.ms: 1000
        session.timeout.ms: 7000
        max.poll.interval.ms: 10000
```
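The three timing properties are related: Kafka recommends keeping heartbeat.interval.ms at no more than one third of session.timeout.ms, and in this setup session.timeout.ms is also kept below max.poll.interval.ms. A minimal sketch of that sanity check (the KafkaTimingCheck class is hypothetical, not part of the project):

```java
public class KafkaTimingCheck {

    /**
     * Checks the relationships used in this article's configuration:
     * heartbeat.interval.ms <= session.timeout.ms / 3 (Kafka's recommendation),
     * and session.timeout.ms below max.poll.interval.ms.
     */
    public static boolean isConsistent(int heartbeatMs, int sessionTimeoutMs, int maxPollIntervalMs) {
        return heartbeatMs <= sessionTimeoutMs / 3
                && sessionTimeoutMs < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        // The values used above: 1000 / 7000 / 10000
        System.out.println(KafkaTimingCheck.isConsistent(1000, 7000, 10000));
    }
}
```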
Producer
The producer needs no special configuration: it sends messages as usual, with a retry mechanism for failures.
```java
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import cn.hutool.log.StaticLog;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaSendCallback;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-07 16:13:57
 * @describe: Callback for Kafka message publishing
 */
@Component
@RequiredArgsConstructor
public class KafkaSendCallBackForString implements KafkaSendCallback<String, String> {

    private final KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public void onSuccess(SendResult<String, String> result) {

        //1.Read the message attributes
        ProducerRecord<String, String> producerRecord = result.getProducerRecord();
        String topic = producerRecord.topic();
        Integer partition = producerRecord.partition();
        String key = producerRecord.key();
        String value = producerRecord.value();

        //2.Log the result
        StaticLog.info(LogTemplateEnum.KAFKA_SEND_SUCCESS_LOG.getTemplate(), topic, partition, key, value);
    }

    @Override
    public void onFailure(KafkaProducerException e) {

        //1.Read the message attributes
        ProducerRecord<String, String> failedProducerRecord = e.getFailedProducerRecord();
        String topic = failedProducerRecord.topic();
        Integer partition = failedProducerRecord.partition();
        String key = failedProducerRecord.key();
        String value = failedProducerRecord.value();

        //2.Log the failure
        StaticLog.error(LogTemplateEnum.KAFKA_SEND_ERROR_LOG.getTemplate(), topic, partition, key, value);

        //3.Print the stack trace
        e.printStackTrace();

        //4.Resend the message
        resendFailMessage(failedProducerRecord);
    }

    /**
     * Resend a failed message
     *
     * @param producerRecord the message
     */
    private void resendFailMessage(ProducerRecord<String, String> producerRecord) {

        //1.Get the retry count TODO could later be read from a cache
        int retryTime = 0;

        //2.Get the maximum retry count TODO could later be moved into the configuration file
        int maxRetryTime = 10;

        //3.Increment the retry count
        retryTime++;

        //4.Check whether the limit is exceeded
        if (retryTime > maxRetryTime) {
            String errorMessage = StrUtil.format(LogTemplateEnum.KAFKA_SEND_OVER_TIME_LIMIT.getTemplate(), JSONUtil.toJsonStr(producerRecord));
            throw new RuntimeException(errorMessage);
        }

        //5.Within the limit, resend the message
        kafkaTemplate.send(producerRecord).addCallback(this);
    }
}
```
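Note that retryTime in resendFailMessage is a local variable that restarts at 0 on every call, so the max-retry check as written never fires; the TODO hints at keeping the count in a cache. A possible sketch of that idea (the RetryCounter class and its names are assumptions, not project code):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Hypothetical retry counter: tracks how many times each message key has
 * been resent, so the max-retry limit actually accumulates across calls.
 */
public class RetryCounter {

    private static final int MAX_RETRY = 10;
    private final ConcurrentMap<String, Integer> retries = new ConcurrentHashMap<>();

    /**
     * Increments the retry count for the key and returns true
     * while the message is still allowed to be resent.
     */
    public boolean allowRetry(String key) {
        int count = retries.merge(key, 1, Integer::sum);
        return count <= MAX_RETRY;
    }

    /** Clears the counter after a successful send. */
    public void reset(String key) {
        retries.remove(key);
    }
}
```

The callback could consult allowRetry(key) before resending and call reset(key) in onSuccess.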
```java
import com.ftc.notrepeat.config.KafkaSendCallBackForString;
import lombok.SneakyThrows;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.concurrent.TimeUnit;

@SpringBootTest
class ProducerTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private KafkaSendCallBackForString kafkaSendCallBackForString;

    @AfterEach
    @SneakyThrows(InterruptedException.class)
    void sleep() {
        TimeUnit.SECONDS.sleep(2);
    }

    @Test
    void sendMessage() {

        //1.Send the message
        kafkaTemplate.send("not-repeat-message-topic", "not-repeat-message")
                .addCallback(kafkaSendCallBackForString);
    }
}
```
Consumer
- After consuming a message, the Consumer sleeps for 30s; the sleep must be longer than both max.poll.interval.ms and session.timeout.ms
- Create 2 consumers that belong to the same consumer group
- Commit the consumer offset manually after consumption completes
- The consumer's only task is to print the message

```java
import cn.hutool.core.util.ObjectUtil;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 10:01:26
 * @describe: Consumer demo
 */
@Component
public class ConsumerDemo {

    @KafkaListener(topics = {"not-repeat-message-topic"}, groupId = "${spring.kafka.consumer.group-id}")
    public void consumer1(ConsumerRecord<String, String> record, Consumer<String, String> consumer) throws InterruptedException {

        //1.Consume the message
        System.out.println("receive message:" + record.value());

        //2.Sleep for 30s
        TimeUnit.SECONDS.sleep(30);

        //3.Commit the offset asynchronously
        consumer.commitAsync((offsets, exception) -> {

            //4.A non-null exception means the commit failed; fall back to a synchronous commit with a max block time of 100ms
            if (ObjectUtil.isNotNull(exception)) {
                consumer.commitSync(Duration.of(100, ChronoUnit.MILLIS));
            }
        });
    }

    @KafkaListener(topics = {"not-repeat-message-topic"}, groupId = "${spring.kafka.consumer.group-id}")
    public void consumer2(ConsumerRecord<String, String> record, Consumer<String, String> consumer) throws InterruptedException {

        //1.Consume the message
        System.out.println("receive message:" + record.value());

        //2.Sleep for 30s
        TimeUnit.SECONDS.sleep(30);

        //3.Commit the offset asynchronously
        consumer.commitAsync((offsets, exception) -> {

            //4.A non-null exception means the commit failed; fall back to a synchronous commit with a max block time of 100ms
            if (ObjectUtil.isNotNull(exception)) {
                consumer.commitSync(Duration.of(100, ChronoUnit.MILLIS));
            }
        });
    }
}
```
Verification
Producer
Consumer
As the figure below shows, the session timeout triggers the rebalance mechanism, the offset commit fails, and the message is consumed repeatedly.
The visualization tool shows that the consumer offset commit keeps failing.
5. Solution
Producer
On the Producer side, duplicate sends can be prevented by enabling idempotence plus a redundancy check of already-sent data. The verification case above already enables Producer idempotence; for the redundancy check it is enough to know the concept, so this example does not implement it.
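For intuition, the idea behind enable.idempotence is that the broker remembers a sequence number per producer and drops retransmitted batches it has already appended. A simplified illustration (the SequenceDedup class is hypothetical; the real protocol tracks sequences per partition and handles out-of-order cases):

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative sketch of broker-side deduplication for an idempotent
 * producer: remember the last sequence number seen per producer ID and
 * reject any batch with a sequence that was already appended.
 */
public class SequenceDedup {

    private final Map<Long, Integer> lastSeq = new HashMap<>();

    /** Returns true if the batch is new and should be appended. */
    public boolean accept(long producerId, int sequence) {
        Integer last = lastSeq.get(producerId);
        if (last != null && sequence <= last) {
            return false; // duplicate caused by a producer retry
        }
        lastSeq.put(producerId, sequence);
        return true;
    }
}
```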
We still make a few changes to the Producer to support the upcoming test:
- Send two batches of messages: the first batch contains the numbers 1~4, the second batch the number 5
- Wait 10s between batches, so the 5 messages are not pulled by the Consumer in a single poll and their offsets committed together, which would make the test hard to observe
Code
```java
import com.ftc.notrepeat.config.KafkaSendCallBackForString;
import lombok.SneakyThrows;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.concurrent.TimeUnit;

@SpringBootTest
class ProducerTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private KafkaSendCallBackForString kafkaSendCallBackForString;

    @AfterEach
    @SneakyThrows(InterruptedException.class)
    void sleep() {
        TimeUnit.SECONDS.sleep(2);
    }

    @Test
    void sendMessage() throws InterruptedException {

        //1.Send the first batch: numbers 1~4
        for (int i = 1; i <= 4; i++) {
            kafkaTemplate.send("not-repeat-message-topic", i + "")
                    .addCallback(kafkaSendCallBackForString);
        }

        //2.Wait 10s, then send the second batch: number 5
        TimeUnit.SECONDS.sleep(10);
        kafkaTemplate.send("not-repeat-message-topic", "5")
                .addCallback(kafkaSendCallBackForString);
    }
}
```
Consumer
Configuration
Make the following changes to the Consumer configuration:
1. Increase session.timeout.ms moderately: 7000ms -> 10000ms
2. Increase max.poll.interval.ms moderately: 10000ms -> 15000ms
```yaml
#Spring configuration
spring:
  kafka:
    bootstrap-servers: 120.48.107.224:9092
    consumer:
      group-id: not-repeat-message-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: false
      properties:
        heartbeat.interval.ms: 1000
        session.timeout.ms: 10000
        max.poll.interval.ms: 15000
```
Code
Make the following changes to the Consumer code:
- For the first batch of messages, shorten the thread sleep time, simulating an optimized business-logic path in a real system
- For the second batch, keep the sleep longer than session.timeout.ms and max.poll.interval.ms
- Use isConsole to control console output; after the first print, set isConsole=false, simulating the idempotent handling a real system would do at the code and data layers

```java
import cn.hutool.core.util.ObjectUtil;
import lombok.SneakyThrows;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 10:01:26
 * @describe: Consumer demo
 */
@Component
public class ConsumerDemo {

    @KafkaListener(topics = {"not-repeat-message-topic"}, groupId = "${spring.kafka.consumer.group-id}")
    public void consumer1(ConsumerRecord<String, String> record, Consumer<String, String> consumer) {
        doMessage(record, consumer);
    }

    @KafkaListener(topics = {"not-repeat-message-topic"}, groupId = "${spring.kafka.consumer.group-id}")
    public void consumer2(ConsumerRecord<String, String> record, Consumer<String, String> consumer) {
        doMessage(record, consumer);
    }

    /**
     * Whether to print the normal console output
     */
    boolean isConsole = true;

    /**
     * Consume a message
     *
     * @param record   the message
     * @param consumer the consumer, used for manual offset commits
     */
    @SneakyThrows(InterruptedException.class)
    private void doMessage(ConsumerRecord<String, String> record, Consumer<String, String> consumer) {

        //1.Consume the message
        if (isConsole) {
            System.out.println("receive message:" + record.value());
        } else {
            System.out.println("message is repeat:" + record.value());
        }

        //2.Sleep 100ms for the first batch, 30s for the second
        Integer longTimeMessage = 5;
        if (!longTimeMessage.equals(Integer.parseInt(record.value()))) {
            TimeUnit.MILLISECONDS.sleep(100);
        } else {
            isConsole = false;
            TimeUnit.SECONDS.sleep(30);
        }

        //3.Commit the offset asynchronously
        consumer.commitAsync((offsets, exception) -> {

            //4.A non-null exception means the commit failed; fall back to a synchronous commit with a max block time of 2000ms
            if (ObjectUtil.isNotNull(exception)) {
                consumer.commitSync(Duration.of(2000, ChronoUnit.MILLIS));
            }
        });
    }
}
```
Verification
Producer
Consumer
As the figure below shows, the first batch of messages is consumed and printed normally.
The second batch prints normally on its first consumption. Afterwards the blocked thread lets the session expire, the Rebalance mechanism is triggered, and the offset commit fails, so the message is consumed again. Because the code handles it idempotently, the duplicate delivery prints the duplicate-message notice instead of the normal output.
This shows that with the right configuration and handling on the consumer side, duplicate consumption is avoided as far as possible, and even when it does occur, bad data never reaches the data layer.
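The isConsole flag only simulates idempotency for this demo. In a real system the consumer would typically record processed message IDs (for example in a database or Redis) and skip duplicates. A minimal in-memory sketch of that pattern (the IdempotentProcessor class is an assumption for illustration):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical idempotent-processing sketch: records every processed
 * message ID and skips any record that was already seen.
 */
public class IdempotentProcessor {

    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();

    /**
     * Runs the business logic the first time an ID appears and returns true;
     * returns false (doing nothing) for any duplicate delivery.
     */
    public boolean processOnce(String messageId, Runnable businessLogic) {
        // add() returns false when the ID was already present
        if (!processedIds.add(messageId)) {
            return false;
        }
        businessLogic.run();
        return true;
    }
}
```

A listener could derive messageId from the record key or a business field, so the duplicate delivery after a rebalance never touches the data layer.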