1. Project Repository

https://github.com/GuardFTC/kafka-test.git

2. Introduction

In the previous article we looked at how to keep Kafka from losing messages. A problem that comes along with those guarantees, however, is duplicate consumption of Kafka messages. This article walks through that problem in detail.

3. Creating the Topic

As shown below, create a topic named not-repeat-message-topic.

(screenshot)

4. Reproducing Duplicate Consumption

Configuration

As shown below, the main changes are on the consumer side: the heartbeat interval, the session timeout, and the maximum poll interval.

(screenshot)

Note that session.timeout.ms must be set within the range the broker allows (group.min.session.timeout.ms to group.max.session.timeout.ms), shown in the figure above; heartbeat.interval.ms is generally kept at no more than one third of the session timeout.

```yaml
# Server configuration
server:
  port: 8088

# Spring configuration
spring:
  kafka:
    bootstrap-servers: 120.48.107.224:9092
    producer:
      acks: -1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        enable.idempotence: true
    consumer:
      group-id: not-repeat-message-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: false
      properties:
        heartbeat.interval.ms: 1000
        session.timeout.ms: 7000
        max.poll.interval.ms: 10000
```
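For reference, here is a minimal sketch of the same three timing properties set directly on a raw Kafka client, using the client's ConsumerConfig constants. This class is not part of the project; it only illustrates what the YAML above configures:

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TimingConfigSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.48.107.224:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "not-repeat-message-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        //Heartbeat every 1s; keep this at no more than one third of session.timeout.ms
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);

        //The group coordinator evicts the member if no heartbeat arrives within 7s
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 7000);

        //The member leaves the group if poll() is not called again within 10s
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10000);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            //subscribe() and the poll loop would go here
        }
    }
}
```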

Producer

The producer needs no special configuration: it sends messages as usual, with a retry mechanism added for failures.

```java
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import cn.hutool.log.StaticLog;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaSendCallback;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-07 16:13:57
 * @describe: Callback handler for Kafka message publishing
 */
@Component
@RequiredArgsConstructor
public class KafkaSendCallBackForString implements KafkaSendCallback<String, String> {

    private final KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public void onSuccess(SendResult<String, String> result) {

        //1. Extract the message attributes
        ProducerRecord<String, String> producerRecord = result.getProducerRecord();
        String topic = producerRecord.topic();
        Integer partition = producerRecord.partition();
        String key = producerRecord.key();
        String value = producerRecord.value();

        //2. Log the success
        StaticLog.info(LogTemplateEnum.KAFKA_SEND_SUCCESS_LOG.getTemplate(), topic, partition, key, value);
    }

    @Override
    public void onFailure(KafkaProducerException e) {

        //1. Extract the message attributes
        ProducerRecord<String, String> failedProducerRecord = e.getFailedProducerRecord();
        String topic = failedProducerRecord.topic();
        Integer partition = failedProducerRecord.partition();
        String key = failedProducerRecord.key();
        String value = failedProducerRecord.value();

        //2. Log the failure
        StaticLog.error(LogTemplateEnum.KAFKA_SEND_ERROR_LOG.getTemplate(), topic, partition, key, value);

        //3. Print the stack trace
        e.printStackTrace();

        //4. Resend the message
        resendFailMessage(failedProducerRecord);
    }

    /**
     * Resend a failed message
     *
     * @param producerRecord the failed record
     */
    private void resendFailMessage(ProducerRecord<String, String> producerRecord) {

        //1. Get the current retry count (TODO: could be read from a cache)
        int retryTime = 0;

        //2. Get the maximum retry count (TODO: could be moved into configuration)
        int maxRetryTime = 10;

        //3. Increment the retry count
        retryTime++;

        //4. Give up once the limit is exceeded
        if (retryTime > maxRetryTime) {
            String errorMessage = StrUtil.format(LogTemplateEnum.KAFKA_SEND_OVER_TIME_LIMIT.getTemplate(), JSONUtil.toJsonStr(producerRecord));
            throw new RuntimeException(errorMessage);
        }

        //5. Otherwise resend, reusing this callback
        kafkaTemplate.send(producerRecord).addCallback(this);
    }
}
```
A simple test sends one message with this callback attached:

```java
import com.ftc.notrepeat.config.KafkaSendCallBackForString;
import lombok.SneakyThrows;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.concurrent.TimeUnit;

@SpringBootTest
class ProducerTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private KafkaSendCallBackForString kafkaSendCallBackForString;

    @AfterEach
    @SneakyThrows(InterruptedException.class)
    void sleep() {
        TimeUnit.SECONDS.sleep(2);
    }

    @Test
    void sendMessage() {

        //1. Send a message, attaching the callback
        kafkaTemplate.send("not-repeat-message-topic", "not-repeat-message")
                .addCallback(kafkaSendCallBackForString);
    }
}
```

Consumer

1. After consuming a message, the consumer thread sleeps for 30s; the sleep must be longer than max.poll.interval.ms and session.timeout.ms.
2. Two consumers are created, both belonging to the same consumer group.
3. The consumer offset is committed manually after consumption.
4. The consumer's only task is to print the message, as shown below:

```java
import cn.hutool.core.util.ObjectUtil;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 10:01:26
 * @describe: Consumer demo
 */
@Component
public class ConsumerDemo {

    @KafkaListener(topics = {"not-repeat-message-topic"}, groupId = "${spring.kafka.consumer.group-id}")
    public void consumer1(ConsumerRecord<String, String> record, Consumer<String, String> consumer) throws InterruptedException {

        //1. Consume the message
        System.out.println("receive message:" + record.value());

        //2. Sleep for 30s
        TimeUnit.SECONDS.sleep(30);

        //3. Commit the offset manually and asynchronously
        consumer.commitAsync((offsets, exception) -> {

            //4. A non-null exception means the commit failed; fall back to a synchronous commit blocking at most 100ms
            if (ObjectUtil.isNotNull(exception)) {
                consumer.commitSync(Duration.of(100, ChronoUnit.MILLIS));
            }
        });
    }

    @KafkaListener(topics = {"not-repeat-message-topic"}, groupId = "${spring.kafka.consumer.group-id}")
    public void consumer2(ConsumerRecord<String, String> record, Consumer<String, String> consumer) throws InterruptedException {

        //1. Consume the message
        System.out.println("receive message:" + record.value());

        //2. Sleep for 30s
        TimeUnit.SECONDS.sleep(30);

        //3. Commit the offset manually and asynchronously
        consumer.commitAsync((offsets, exception) -> {

            //4. A non-null exception means the commit failed; fall back to a synchronous commit blocking at most 100ms
            if (ObjectUtil.isNotNull(exception)) {
                consumer.commitSync(Duration.of(100, ChronoUnit.MILLIS));
            }
        });
    }
}
```

Verification

Producer

As shown below, the message is sent successfully.

(screenshots)

Consumer

As shown below, the session expires while the listener thread is blocked, which triggers a rebalance; the offset commit fails and the message is consumed repeatedly.

(screenshots)

A visualization tool confirms that the consumer offset commit keeps failing.

(screenshot)

5. Solution

Producer

For duplicate sends on the producer side, enabling idempotence plus a redundancy check on successfully sent data ensures the same message is not sent twice. The verification above already enabled producer idempotence; for the redundancy check, understanding the concept is enough, so this article does not build it into the example (a rough sketch follows below).
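As a minimal sketch of that send-side redundancy check: before sending, the producer looks up the message's business key in a store of already-sent keys and skips the send on a hit. The sentIds set below is an in-memory stand-in for whatever shared store (a database table, a Redis set) a real system would query, and RedundantSendGuard and recordId are hypothetical names:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.kafka.core.KafkaTemplate;

public class RedundantSendGuard {

    //In-memory stand-in for a shared store of already-sent business keys
    private final Set<String> sentIds = ConcurrentHashMap.newKeySet();

    private final KafkaTemplate<String, String> kafkaTemplate;

    public RedundantSendGuard(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendOnce(String topic, String recordId, String value) {

        //add() returns false when the key was already recorded, so the duplicate send is skipped
        if (sentIds.add(recordId)) {
            kafkaTemplate.send(topic, recordId, value);
        }
    }
}
```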
That said, the producer still needs some changes to make the next test easier:

1. Send two batches of messages: the first batch contains the numbers 1~4, the second the number 5.
2. Wait 10s between the batches, so that the five messages are not pulled in a single poll and committed together, which would muddy the test.

Code

```java
import com.ftc.notrepeat.config.KafkaSendCallBackForString;
import lombok.SneakyThrows;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.concurrent.TimeUnit;

@SpringBootTest
class ProducerTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private KafkaSendCallBackForString kafkaSendCallBackForString;

    @AfterEach
    @SneakyThrows(InterruptedException.class)
    void sleep() {
        TimeUnit.SECONDS.sleep(2);
    }

    @Test
    void sendMessage() throws InterruptedException {

        //1. Send the first batch (numbers 1~4)
        for (int i = 1; i <= 4; i++) {
            kafkaTemplate.send("not-repeat-message-topic", i + "")
                    .addCallback(kafkaSendCallBackForString);
        }

        //2. Wait 10s, then send the second batch (number 5)
        TimeUnit.SECONDS.sleep(10);
        kafkaTemplate.send("not-repeat-message-topic", "5")
                .addCallback(kafkaSendCallBackForString);
    }
}
```

Consumer

Configuration

Make the following changes to the consumer configuration:

1. Increase session.timeout.ms somewhat: 7000ms -> 10000ms
2. Increase max.poll.interval.ms somewhat: 10000ms -> 15000ms

```yaml
# Spring configuration
spring:
  kafka:
    bootstrap-servers: 120.48.107.224:9092
    consumer:
      group-id: not-repeat-message-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: false
      properties:
        heartbeat.interval.ms: 1000
        session.timeout.ms: 10000
        max.poll.interval.ms: 15000
```

Code

Make the following changes to the consumer code:

1. For the first batch, shorten the thread sleep, simulating the real-world optimization of business-logic execution time.
2. For the second batch, keep the thread sleep longer than session.timeout.ms and max.poll.interval.ms.
3. Use an isConsole flag to control console output: after the second-batch message is printed, set isConsole=false, simulating the idempotent handling that real systems do at the code and data layers. The full listener is shown below:

```java
import cn.hutool.core.util.ObjectUtil;
import lombok.SneakyThrows;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 10:01:26
 * @describe: Consumer demo
 */
@Component
public class ConsumerDemo {

    @KafkaListener(topics = {"not-repeat-message-topic"}, groupId = "${spring.kafka.consumer.group-id}")
    public void consumer1(ConsumerRecord<String, String> record, Consumer<String, String> consumer) {
        doMessage(record, consumer);
    }

    @KafkaListener(topics = {"not-repeat-message-topic"}, groupId = "${spring.kafka.consumer.group-id}")
    public void consumer2(ConsumerRecord<String, String> record, Consumer<String, String> consumer) {
        doMessage(record, consumer);
    }

    /**
     * Whether to print the normal-consumption output
     */
    boolean isConsole = true;

    /**
     * Consume a message
     *
     * @param record   the message
     * @param consumer the consumer, used to commit the offset manually
     */
    @SneakyThrows(InterruptedException.class)
    private void doMessage(ConsumerRecord<String, String> record, Consumer<String, String> consumer) {

        //1. Consume the message
        if (isConsole) {
            System.out.println("receive message:" + record.value());
        } else {
            System.out.println("message is repeat:" + record.value());
        }

        //2. First-batch messages sleep 100ms, the second-batch message sleeps 30s
        Integer longTimeMessage = 5;
        if (!longTimeMessage.equals(Integer.parseInt(record.value()))) {
            TimeUnit.MILLISECONDS.sleep(100);
        } else {
            isConsole = false;
            TimeUnit.SECONDS.sleep(30);
        }

        //3. Commit the offset manually and asynchronously
        consumer.commitAsync((offsets, exception) -> {

            //4. A non-null exception means the commit failed; fall back to a synchronous commit blocking at most 2000ms
            if (ObjectUtil.isNotNull(exception)) {
                consumer.commitSync(Duration.of(2000, ChronoUnit.MILLIS));
            }
        });
    }
}
```
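Note that isConsole is a plain field on a single bean instance, so this stand-in for idempotence only works within one process, which is all the demo needs; a real consumer would key the check on a business ID held in a shared store, along the lines of the sketches after the verification and summary sections below.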

Verification

Producer

As shown below, both batches are sent successfully.

(screenshots)

Consumer

As shown below, the first batch is consumed normally and printed as usual.

(screenshot)

For the second batch, the first consumption prints normally; then the listener thread blocks, the session expires, a rebalance is triggered, and the offset commit fails, so the message is delivered again. Because the code now handles duplicates idempotently, the repeats produce the duplicate-message notice instead of the normal output.

(screenshots)

With the right configuration and handling on the consumer side, duplicate consumption is avoided as far as possible, and even when it does occur, anomalous data never reaches the data layer.
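To make the data-layer half of that concrete, here is a minimal sketch of deduplicating on write with a unique constraint. The consumed_message table and its message_id column are hypothetical, and drivers that do not throw SQLIntegrityConstraintViolationException may surface the violation as a generic SQLException instead:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;

public class MessageWriter {

    /**
     * Inserts a message into a table whose message_id column carries a UNIQUE constraint,
     * so a redelivered message violates the constraint instead of creating a second row.
     *
     * @return true if the row was inserted, false if it already existed
     */
    public boolean saveIfAbsent(Connection conn, String messageId, String payload) throws SQLException {
        String sql = "INSERT INTO consumed_message (message_id, payload) VALUES (?, ?)";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, messageId);
            ps.setString(2, payload);
            ps.executeUpdate();
            return true;
        } catch (SQLIntegrityConstraintViolationException e) {
            //Duplicate delivery: the row already exists, so skip the business write
            return false;
        }
    }
}
```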

6. Summary

Producer

1. Enable producer idempotence.
2. Optionally keep a redundant record of sent data and check it before sending.

Consumer

To avoid duplicate consumption:

1. Tune heartbeat.interval.ms sensibly.
2. Tune session.timeout.ms sensibly.
3. Tune max.poll.interval.ms sensibly.
4. Optimize the business logic so each message is processed as quickly as possible.

When duplicate messages do arrive (a sketch of the code-layer approach follows this list):

1. Data layer: rely on a unique property (such as a unique index) so duplicate data cannot exist at the data layer.
2. Code layer: use a pre-query plus a locking mechanism to ensure data safety, so that anomalous data does not reach the data layer in the first place.
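As a minimal sketch of that code-layer approach: the processedIds set below stands in for the pre-query against real storage, and IdempotentHandler is a hypothetical name.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

public class IdempotentHandler {

    //Stand-in for the data the pre-query would consult (e.g. a SELECT on a unique key)
    private final Set<String> processedIds = new HashSet<>();

    //Lock so that the check and the business write happen atomically
    private final ReentrantLock lock = new ReentrantLock();

    public void handleOnce(String messageId, Runnable businessLogic) {
        lock.lock();
        try {
            //Pre-query: skip messages that were already processed
            if (processedIds.contains(messageId)) {
                return;
            }

            //First delivery: run the business logic, then record the id
            businessLogic.run();
            processedIds.add(messageId);
        } finally {
            lock.unlock();
        }
    }
}
```

In a multi-instance deployment the lock and the id store would have to be shared (a database unique key or a distributed lock) for the guarantee to hold across processes.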