1. Project Repository

https://github.com/GuardFTC/Kafka-test.git

2. Preface

When using a message queue, the question of how to avoid losing messages is unavoidable. In Kafka, we likewise need to handle message loss from three angles: the Producer, the Consumer, and the Broker.
The implementations below are based on the theory behind some common Kafka questions.

3. Creating the Topic

From the Broker's perspective, a Topic's replication factor should be >= 3. Since I deployed a single-node Kafka, I cannot actually configure multiple replicas, so just keep the concept in mind.
As shown below, create a Topic named not-loss-message-topic with 3 partitions and a replication factor of 3.
(image: topic creation result)

4. Simulating Message Loss

Configuration

  1. Producer: enable automatic retries, with 3 retries at an interval of 100ms.
  2. Consumer: enable auto-commit, with a commit interval of 1ms.

```yaml
#Server configuration
server:
  port: 8087

#Spring configuration
spring:
  kafka:
    bootstrap-servers: 120.48.107.224:9092
    producer:
      acks: -1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3
      properties:
        enable.idempotence: true
        retry.backoff.ms: 100
    consumer:
      group-id: not-loss-message-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 1ms
```
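To see why this configuration can lose a message, the failure mode can be sketched without Kafka at all: auto-commit records the offset on a timer, so a crash between the commit and the actual processing means the message is skipped on restart. A minimal, illustrative simulation (the class and message names here are made up, not from the repository):

```java
import java.util.ArrayList;
import java.util.List;

//Minimal simulation (no Kafka): the offset is auto-committed before
//processing completes, then the process crashes - the in-flight message
//is never processed, not even after a restart.
public class AutoCommitLossDemo {

    public static List<String> run() {
        List<String> processed = new ArrayList<>();
        List<String> log = List.of("m0", "m1", "m2");
        int committedOffset = 0;

        //Poll m0 and process it; auto-commit advances the offset to 1.
        processed.add(log.get(0));
        committedOffset = 1;

        //Poll m1; the 1ms auto-commit fires (offset -> 2) BEFORE processing...
        committedOffset = 2;
        //...and the consumer crashes right here: m1 never reaches `processed`.

        //Restart: consumption resumes from the committed offset, i.e. at m2.
        for (int i = committedOffset; i < log.size(); i++) {
            processed.add(log.get(i));
        }
        return processed; //m1 is missing
    }

    public static void main(String[] args) {
        System.out.println(run()); //prints [m0, m2]
    }
}
```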

### Producer

The Producer publishes a message to not-loss-message-topic:

```java
import com.ftc.notlossmessage.config.KafkaSendCallBackForString;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootTest
class ProducerTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private KafkaSendCallBackForString kafkaSendCallBackForString;

    @Test
    void sendMessage() {

        //1.Send the message
        kafkaTemplate.send("not-loss-message-topic", "not-loss-message")
                .addCallback(kafkaSendCallBackForString);
    }
}
```

### Consumer

  1. Let the Consumer thread sleep for 5ms so that the offset commit succeeds (the consuming thread and the offset-commit thread are not the same thread).
  2. Stop the main thread to simulate the application crashing abruptly.

```java
import cn.hutool.core.util.ObjectUtil;
import lombok.SneakyThrows;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 10:01:26
 * @describe: Consumer demo
 */
@Component
public class Consumer {

    @SneakyThrows(Exception.class)
    @KafkaListener(topics = {"not-loss-message-topic"}, groupId = "${spring.kafka.consumer.group-id}")
    public void failListener(ConsumerRecord<String, String> record) {

        //1.Sleep 5ms so the offset commit can complete
        TimeUnit.MILLISECONDS.sleep(5);

        //2.Simulate an abrupt application crash
        if (ObjectUtil.isNotNull(record)) {
            Thread.currentThread().stop();
        }

        //3.Consume the message
        System.out.println("receive message:" + record.value());
    }
}
```

Verification

Producer

As shown below, the Producer sent the message successfully.
(image: send success log)

Consumer

As shown below, the Consumer process was interrupted and consumption failed, yet the offset was committed successfully. The message is lost!
(image: consumer interruption)
(image: committed offset)

5. Solution

Producer

From the Producer's perspective, preventing message loss requires the following:

  1. Enable Producer idempotence (when idempotence is enabled, the acks parameter must be set to -1 (ALL)).
  2. Disable automatic retries.
  3. Handle failure retries manually.

Configuration

Based on the approach above, the Producer configuration is as follows:

```yaml
#Spring configuration
spring:
  kafka:
    bootstrap-servers: 120.48.107.224:9092
    producer:
      acks: -1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        enable.idempotence: true
```
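What enable.idempotence buys can be sketched in a few lines: the broker remembers the last sequence number seen from each producer and discards retried duplicates, so a retry after a lost acknowledgement cannot append the same message twice. A simplified illustration of that idea (not Kafka's actual implementation; all names here are made up):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

//Sketch of the broker-side dedup idea behind enable.idempotence:
//each (producerId, sequence) pair is accepted at most once, so a retried
//send with an already-seen sequence number is discarded, not re-appended.
public class IdempotenceSketch {

    private final Map<Long, Long> lastSeqByProducer = new HashMap<>();
    private final List<String> log = new ArrayList<>();

    /** Returns true if the record was appended, false if it was a duplicate. */
    public boolean append(long producerId, long seq, String value) {
        Long last = lastSeqByProducer.get(producerId);
        if (last != null && seq <= last) {
            return false; //duplicate retry, discarded
        }
        lastSeqByProducer.put(producerId, seq);
        log.add(value);
        return true;
    }

    public List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        IdempotenceSketch broker = new IdempotenceSketch();
        System.out.println(broker.append(1L, 0L, "order-created")); //true: first delivery
        System.out.println(broker.append(1L, 0L, "order-created")); //false: retried duplicate
        System.out.println(broker.log().size());                    //1: no duplicate appended
    }
}
```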

Code

The onFailure method of the callback registered via addCallback implements the retry on send failure, with a capped retry count to keep unbounded retries from occupying resources indefinitely.

```java
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import cn.hutool.log.StaticLog;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaSendCallback;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-07 16:13:57
 * @describe: Callback handler for Kafka message publishing
 */
@Component
@RequiredArgsConstructor
public class KafkaSendCallBackForString implements KafkaSendCallback<String, String> {

    private final KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public void onSuccess(SendResult<String, String> result) {

        //1.Get the message attributes
        ProducerRecord<String, String> producerRecord = result.getProducerRecord();
        String topic = producerRecord.topic();
        Integer partition = producerRecord.partition();
        String key = producerRecord.key();
        String value = producerRecord.value();

        //2.Log
        StaticLog.info(LogTemplateEnum.KAFKA_SEND_SUCCESS_LOG.getTemplate(), topic, partition, key, value);
    }

    @Override
    public void onFailure(KafkaProducerException e) {

        //1.Get the message attributes
        ProducerRecord<String, String> failedProducerRecord = e.getFailedProducerRecord();
        String topic = failedProducerRecord.topic();
        Integer partition = failedProducerRecord.partition();
        String key = failedProducerRecord.key();
        String value = failedProducerRecord.value();

        //2.Log
        StaticLog.error(LogTemplateEnum.KAFKA_SEND_ERROR_LOG.getTemplate(), topic, partition, key, value);

        //3.Print the stack trace
        e.printStackTrace();

        //4.Resend the message
        resendFailMessage(failedProducerRecord);
    }

    /**
     * Resend a failed message
     *
     * @param producerRecord the message
     */
    private void resendFailMessage(ProducerRecord<String, String> producerRecord) {

        //1.Get the retry count TODO currently a local variable that resets on every call; could later be fetched from a cache
        int retryTime = 0;

        //2.Get the maximum retry count TODO could later be moved into the configuration file
        int maxRetryTime = 10;

        //3.Increment the retry count
        retryTime++;

        //4.Check whether the limit is exceeded
        if (retryTime > maxRetryTime) {
            String errorMessage = StrUtil.format(LogTemplateEnum.KAFKA_SEND_OVER_TIME_LIMIT.getTemplate(), JSONUtil.toJsonStr(producerRecord));
            throw new RuntimeException(errorMessage);
        }

        //5.Not over the limit: resend the message
        kafkaTemplate.send(producerRecord).addCallback(this);
    }
}
```

```java
import com.ftc.notlossmessage.config.KafkaSendCallBackForString;
import lombok.SneakyThrows;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.concurrent.TimeUnit;

@SpringBootTest
class ProducerTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private KafkaSendCallBackForString kafkaSendCallBackForString;

    @AfterEach
    @SneakyThrows(InterruptedException.class)
    void sleep() {
        TimeUnit.SECONDS.sleep(30);
    }

    @Test
    void sendMessage() {

        //1.Send the message
        kafkaTemplate.send("not-loss-message-topic", "not-loss-message")
                .addCallback(kafkaSendCallBackForString);
    }
}
```
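The TODOs above point at a real limitation: the retry count is a local variable that resets to 0 on every invocation, so the cap is never actually reached. A minimal sketch of the cache idea the TODO suggests, using an in-memory map keyed by a message identifier (the class, key scheme, and method names are illustrative assumptions, not code from the repository):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

//Hypothetical retry-count tracker: counts attempts per message key so the
//"max retries" check survives across callback invocations. A sketch of the
//TODO in resendFailMessage, not code from the repository.
public class RetryCounter {

    private final Map<String, AtomicInteger> retries = new ConcurrentHashMap<>();
    private final int maxRetryTime;

    public RetryCounter(int maxRetryTime) {
        this.maxRetryTime = maxRetryTime;
    }

    /** Returns true if another retry is allowed for this message key. */
    public boolean tryRetry(String messageKey) {
        int attempt = retries
                .computeIfAbsent(messageKey, k -> new AtomicInteger())
                .incrementAndGet();
        return attempt <= maxRetryTime;
    }

    /** Clear the counter once the message finally succeeds. */
    public void reset(String messageKey) {
        retries.remove(messageKey);
    }

    public static void main(String[] args) {
        RetryCounter counter = new RetryCounter(3);
        for (int i = 0; i < 5; i++) {
            System.out.println(counter.tryRetry("not-loss-message-topic:key"));
        }
        //prints true, true, true, false, false
    }
}
```

resendFailMessage could then call something like tryRetry(topic + ":" + key) and only resend while it returns true, with onSuccess calling reset.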

### Broker

From the Broker's perspective, ensuring messages are not lost means:

  1. Set the replication factor >= 3 when creating a Topic.
  2. Set the Producer acks parameter to -1 (ALL).
  3. Configure a sensible minimum number of in-sync replicas required to acknowledge a write (min.insync.replicas).
  4. Configure a sensible message-count flush threshold (log.flush.interval.messages).
  5. Configure a sensible time-based flush threshold (log.flush.interval.ms).

#### Broker Configuration Changes

As shown below, the relevant configuration has been changed (remember to restart after changing it, otherwise it does not take effect).
(image: changed broker configuration)
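For reference, the three Broker settings above live in server.properties. The values below are illustrative examples to tune per workload, not the exact values from the screenshot:

```properties
# server.properties (example values, not from the screenshot)

# With acks=-1, a write is only acknowledged once this many in-sync
# replicas have it. With a replication factor of 3, a value of 2 still
# tolerates one replica being down without blocking writes.
min.insync.replicas=2

# Force a flush to disk after this many accumulated messages...
log.flush.interval.messages=10000

# ...or after this many milliseconds, whichever comes first.
log.flush.interval.ms=1000
```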
#### Replica Configuration

Since my server runs a single-node deployment, I will not change this, but you must be aware of the concept.

#### Producer Configuration

See the previous section: the acks parameter in the Producer configuration is already set to -1 (ALL).

### Consumer

From the Consumer's perspective, ensuring messages are not lost means:

  1. Disable auto-commit.
  2. Commit the offset manually after the business logic has finished executing.

#### Configuration

Accordingly, the Consumer configuration is as follows:

```yaml
spring:
  kafka:
    bootstrap-servers: 120.48.107.224:9092
    consumer:
      group-id: not-loss-message-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: false
```

Code

Simulate the Consumer crashing before it has consumed the message.
The manual commit is done asynchronously first; only if the async commit fails is a synchronous commit performed, with a maximum blocking time.

```java
import cn.hutool.core.util.ObjectUtil;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 10:01:26
 * @describe: Consumer demo
 */
@Component
public class ConsumerDemo {

    @KafkaListener(topics = {"not-loss-message-topic"}, groupId = "${spring.kafka.consumer.group-id}")
    public void consumer(ConsumerRecord<String, String> record, Consumer<String, String> consumer) {

        //1.Simulate an abrupt application crash
        if (ObjectUtil.isNotNull(record)) {
            Thread.currentThread().stop();
        }

        //2.Consume the message
        System.out.println("receive message:" + record.value());

        //3.Manually commit the offset asynchronously
        consumer.commitAsync((offsets, exception) -> {

            //4.A non-null exception means the commit failed: fall back to a synchronous commit with a maximum blocking time of 100ms
            if (ObjectUtil.isNotNull(exception)) {
                consumer.commitSync(Duration.of(100, ChronoUnit.MILLIS));
            }
        });
    }
}
```
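The flip side of commit-after-process is worth stating: it turns message loss into possible duplicate processing (at-least-once delivery). A crash after processing but before the commit replays the record on restart, so the business logic should itself be idempotent. A minimal illustrative simulation (no Kafka; the names are made up):

```java
import java.util.ArrayList;
import java.util.List;

//Counterpart to the auto-commit demo: committing AFTER processing cannot
//lose a message, but a crash between processing and commit means the same
//message is processed again on restart (at-least-once delivery).
public class ManualCommitRedeliveryDemo {

    public static List<String> run() {
        List<String> processed = new ArrayList<>();
        List<String> log = List.of("m0", "m1");
        int committedOffset = 0;

        //Process m0, then crash BEFORE the manual commit runs.
        processed.add(log.get(0));
        //committedOffset is still 0 here.

        //Restart: resume from committedOffset = 0, so m0 is processed twice.
        for (int i = committedOffset; i < log.size(); i++) {
            processed.add(log.get(i));
        }
        return processed; //m0 appears twice, but nothing is lost
    }

    public static void main(String[] args) {
        System.out.println(run()); //prints [m0, m0, m1]
    }
}
```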

Verification

Publish a Message

As shown below, the Consumer crashed, so the message was not consumed. But as the Kafka GUI tool shows, the offset was not committed either.
(image: consumer crash)
(image: offset not committed)

Remove the stop Call to Simulate the Server Being Fixed

```java
import cn.hutool.core.util.ObjectUtil;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 10:01:26
 * @describe: Consumer demo
 */
@Component
public class ConsumerDemo {

    @KafkaListener(topics = {"not-loss-message-topic"}, groupId = "${spring.kafka.consumer.group-id}")
    public void consumer(ConsumerRecord<String, String> record, Consumer<String, String> consumer) {

        //1.Consume the message
        System.out.println("receive message:" + record.value());

        //2.Manually commit the offset asynchronously
        consumer.commitAsync((offsets, exception) -> {

            //3.A non-null exception means the commit failed: fall back to a synchronous commit with a maximum blocking time of 100ms
            if (ObjectUtil.isNotNull(exception)) {
                consumer.commitSync(Duration.of(100, ChronoUnit.MILLIS));
            }
        });
    }
}
```

Restart the Consumer

As shown below, the previous message was consumed again, and this time the offset was committed successfully.
Even though the Consumer crashed, it can still consume the message after restarting: no message loss occurred.
(image: message re-consumed)
(image: offset committed)

6. Summary

Producer

  1. Enable idempotence, which guarantees ordering and no loss at the protocol level.
  2. Disable automatic retries (leaving them on also works).
  3. Retry manually in the callback's onFailure method, with a sensible maximum retry count to keep endless redelivery from tying up threads.

Broker

  1. Set the replication factor >= 3 when creating a Topic.
  2. Set the Producer acks configuration to -1 (ALL).
  3. Configure a sensible minimum in-sync replica count for acknowledging writes (min.insync.replicas); with a replication factor of 3, this is commonly set to 2 so that one replica can be down without blocking writes.
  4. Configure a sensible message-count flush threshold (log.flush.interval.messages).
  5. Configure a sensible time-based flush threshold (log.flush.interval.ms).

Consumer

  1. Disable offset auto-commit.
  2. In code, commit the offset manually (asynchronously or synchronously) only after all business logic has finished executing.