1. Project Repository
https://github.com/GuardFTC/Kafka-test.git
2. Preface
When working with message queues, the question of how to prevent message loss is unavoidable. In Kafka, we likewise need to tackle message loss from three angles: the Producer, the Consumer, and the Broker.
The implementations below put into practice the theory covered in the earlier notes on common Kafka questions.
3. Creating the Topic
From the Broker's perspective, a Topic's replication factor should be >= 3. However, since I deployed a single-node Kafka, multiple replicas cannot be configured, so just keep the concept in mind.
As shown in the figure, create a Topic named not-loss-message-topic with 3 partitions and a replication factor of 3.
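The Topic above can also be created with the standard Kafka CLI; a minimal sketch, assuming the broker address used in the configuration later in this article:

```shell
# Create the topic with 3 partitions; on a real multi-node cluster a
# replication factor of 3 would be honored (a single broker only supports 1)
kafka-topics.sh --create \
  --bootstrap-server 120.48.107.224:9092 \
  --topic not-loss-message-topic \
  --partitions 3 \
  --replication-factor 3
```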
4. Simulating Message Loss
### Configuration
Spring configuration:
```yaml
spring:
  kafka:
    bootstrap-servers: 120.48.107.224:9092
    producer:
      acks: -1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3
      properties:
        enable.idempotence: true
        retry.backoff.ms: 100
    consumer:
      group-id: not-loss-message-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 1ms
```
<a name="cq1eJ"></a>
### Producer
The Producer publishes a message to not-loss-message-topic:
```java
import com.ftc.notlossmessage.config.KafkaSendCallBackForString;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootTest
class ProducerTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private KafkaSendCallBackForString kafkaSendCallBackForString;

    @Test
    void sendMessage() {

        //1.Send the message
        kafkaTemplate.send("not-loss-message-topic", "not-loss-message")
                .addCallback(kafkaSendCallBackForString);
    }
}
```
### Consumer
- Make the Consumer thread sleep 5 ms to ensure the offset commit succeeds (the consuming thread and the offset-commit thread are not the same thread)
- Stop the main thread to simulate the application crashing abruptly
```java
import cn.hutool.core.util.ObjectUtil;
import lombok.SneakyThrows;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 10:01:26
 * @describe: Consumer demo
 */
@Component
public class Consumer {

    @SneakyThrows(Exception.class)
    @KafkaListener(topics = {"not-loss-message-topic"}, groupId = "${spring.kafka.consumer.group-id}")
    public void failListener(ConsumerRecord<String, String> record) {

        //1.Sleep 5 ms so that the offset commit completes
        TimeUnit.MILLISECONDS.sleep(5);

        //2.Simulate the application being interrupted abruptly
        if (ObjectUtil.isNotNull(record)) {
            Thread.currentThread().stop();
        }

        //3.Consume the message
        System.out.println("receive message:" + record.value());
    }
}
```
### Verification
#### Producer
#### Consumer
As shown, the Consumer process was interrupted and the message failed to be consumed, yet the offset was committed successfully. The message is lost!
5. Solutions
### Producer
From the Producer's perspective, preventing message loss requires the following:

- Enable Producer idempotence (when idempotence is enabled, the acks parameter must be set to -1 (ALL))
- Disable automatic retries
- Handle failed sends with manual retries
#### Configuration
Based on the approach above, the Producer configuration is as follows:
```yaml
#Spring configuration
spring:
  kafka:
    bootstrap-servers: 120.48.107.224:9092
    producer:
      acks: -1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        enable.idempotence: true
```
#### Code
Implement the retry on send failure in the callback's onFailure method, and cap the number of retries to keep unbounded retries from tying up resources indefinitely.
```java
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import cn.hutool.log.StaticLog;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaSendCallback;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-07 16:13:57
 * @describe: Callback handling for Kafka message publishing
 */
@Component
@RequiredArgsConstructor
public class KafkaSendCallBackForString implements KafkaSendCallback<String, String> {

    private final KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public void onSuccess(SendResult<String, String> result) {

        //1.Get the message attributes
        ProducerRecord<String, String> producerRecord = result.getProducerRecord();
        String topic = producerRecord.topic();
        Integer partition = producerRecord.partition();
        String key = producerRecord.key();
        String value = producerRecord.value();

        //2.Log the success
        StaticLog.info(LogTemplateEnum.KAFKA_SEND_SUCCESS_LOG.getTemplate(), topic, partition, key, value);
    }

    @Override
    public void onFailure(KafkaProducerException e) {

        //1.Get the message attributes
        ProducerRecord<String, String> failedProducerRecord = e.getFailedProducerRecord();
        String topic = failedProducerRecord.topic();
        Integer partition = failedProducerRecord.partition();
        String key = failedProducerRecord.key();
        String value = failedProducerRecord.value();

        //2.Log the failure
        StaticLog.error(LogTemplateEnum.KAFKA_SEND_ERROR_LOG.getTemplate(), topic, partition, key, value);

        //3.Print the exception stack trace
        e.printStackTrace();

        //4.Resend the message
        resendFailMessage(failedProducerRecord);
    }

    /**
     * Resend a failed message
     *
     * @param producerRecord the message
     */
    private void resendFailMessage(ProducerRecord<String, String> producerRecord) {

        //1.Get the retry count TODO could later be fetched from a cache
        int retryTime = 0;

        //2.Get the maximum retry count TODO could later be moved into a configuration file
        int maxRetryTime = 10;

        //3.Increment the retry count
        retryTime++;

        //4.Check whether the limit has been exceeded
        if (retryTime > maxRetryTime) {
            String errorMessage = StrUtil.format(LogTemplateEnum.KAFKA_SEND_OVER_TIME_LIMIT.getTemplate(), JSONUtil.toJsonStr(producerRecord));
            throw new RuntimeException(errorMessage);
        }

        //5.Below the limit: resend the message
        kafkaTemplate.send(producerRecord).addCallback(this);
    }
}
```
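Note that retryTime above is a local variable reset to 0 on every invocation, so the limit can never actually be reached; the TODO hints at keeping the count in a cache instead. A minimal sketch of that idea, using a hypothetical RetryCounter helper backed by a ConcurrentHashMap (the name and structure are my own illustration, not part of the project):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

//Hypothetical helper: tracks retry attempts per record key in memory
public class RetryCounter {

    private final ConcurrentMap<String, Integer> counts = new ConcurrentHashMap<>();
    private final int maxRetryTime;

    public RetryCounter(int maxRetryTime) {
        this.maxRetryTime = maxRetryTime;
    }

    //Increment the attempt count for this key; return true while retries remain
    public boolean shouldRetry(String recordKey) {
        int attempts = counts.merge(recordKey, 1, Integer::sum);
        return attempts <= maxRetryTime;
    }

    //Reset the count once a send finally succeeds
    public void reset(String recordKey) {
        counts.remove(recordKey);
    }
}
```

In onFailure, the callback would consult shouldRetry(key) before resending and call reset(key) in onSuccess; in a multi-instance deployment the map would need to be replaced by an external cache.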
The test sleeps after sending, presumably so that the asynchronous callback and any retries can complete before the JVM exits:
```java
import com.ftc.notlossmessage.config.KafkaSendCallBackForString;
import lombok.SneakyThrows;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.concurrent.TimeUnit;

@SpringBootTest
class ProducerTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private KafkaSendCallBackForString kafkaSendCallBackForString;

    @AfterEach
    @SneakyThrows(InterruptedException.class)
    void sleep() {
        TimeUnit.SECONDS.sleep(30);
    }

    @Test
    void sendMessage() {

        //1.Send the message
        kafkaTemplate.send("not-loss-message-topic", "not-loss-message")
                .addCallback(kafkaSendCallBackForString);
    }
}
```
<a name="AQC7K"></a>
### Broker
From the Broker's perspective, ensuring messages are not lost requires:

1. Specifying a replication factor >= 3 when creating the Topic
2. Setting the Producer acks parameter to -1 (ALL)
3. Properly configuring the minimum number of in-sync replicas required to acknowledge a write (min.insync.replicas)
4. Properly configuring the flush message-count threshold (log.flush.interval.messages)
5. Properly configuring the flush time threshold (log.flush.interval.ms)
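As a sketch, the broker-side settings above live in server.properties; the values below are purely illustrative, not a recommendation tuned for any particular cluster:

```properties
# Minimum in-sync replicas that must acknowledge a write when acks=-1
min.insync.replicas=2
# Force a flush to disk after this many accumulated messages
log.flush.interval.messages=10000
# Force a flush to disk at least this often, in milliseconds
log.flush.interval.ms=1000
```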
<a name="eSomy"></a>
#### Broker Configuration Changes
As shown, the relevant settings have been changed (remember to restart the broker after changing the configuration, otherwise it will not take effect).<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/28218714/1663310636229-bb3972fd-01ee-45e5-8128-69bed47cdc3f.png#clientId=uaab80ed7-22c4-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=68&id=u26c70ea9&margin=%5Bobject%20Object%5D&name=image.png&originHeight=102&originWidth=1515&originalType=binary&ratio=1&rotation=0&showTitle=false&size=10816&status=done&style=none&taskId=u0d870845-fc19-49f0-ac48-bb4b90829ad&title=&width=1010)
<a name="yKr0X"></a>
#### Replica Configuration
Since my server runs a single-node deployment, I will not change this here, but be sure to keep the concept in mind.
<a name="vTAr8"></a>
#### Producer Configuration
See the previous section: the acks parameter in the Producer configuration is already set to -1 (ALL).
<a name="sjhYy"></a>
### Consumer
From the Consumer's perspective, ensuring messages are not lost requires:

1. Disabling auto-commit
2. Committing the offset manually after the business logic has finished executing
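Besides committing through the raw Consumer as this article does, Spring Kafka can also hand the listener an Acknowledgment when the container's ack mode is manual; a minimal sketch of that alternative (added here for illustration, not the approach used in this project):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class AckConsumerDemo {

    //Requires spring.kafka.listener.ack-mode: manual_immediate in the configuration
    @KafkaListener(topics = {"not-loss-message-topic"}, groupId = "${spring.kafka.consumer.group-id}")
    public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {

        //1.Run the business logic first
        System.out.println("receive message:" + record.value());

        //2.Acknowledge afterwards, which commits the offset for this record
        ack.acknowledge();
    }
}
```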
<a name="ry9K4"></a>
#### Configuration
In summary, the Consumer configuration is as follows:
```yaml
spring:
  kafka:
    bootstrap-servers: 120.48.107.224:9092
    consumer:
      group-id: not-loss-message-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: false
```
#### Code
Simulate the Consumer crashing: the Consumer goes down before the message has been consumed.
For the manual commit, try an asynchronous commit first; if it fails, fall back to a synchronous commit with a maximum blocking time.
```java
import cn.hutool.core.util.ObjectUtil;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 10:01:26
 * @describe: Consumer demo
 */
@Component
public class ConsumerDemo {

    @KafkaListener(topics = {"not-loss-message-topic"}, groupId = "${spring.kafka.consumer.group-id}")
    public void consumer(ConsumerRecord<String, String> record, Consumer<String, String> consumer) {

        //1.Simulate the application being interrupted abruptly
        if (ObjectUtil.isNotNull(record)) {
            Thread.currentThread().stop();
        }

        //2.Consume the message
        System.out.println("receive message:" + record.value());

        //3.Manually commit the offset asynchronously
        consumer.commitAsync((offsets, exception) -> {

            //4.A non-null exception means the commit failed: fall back to a synchronous commit with a maximum blocking time of 100 ms
            if (ObjectUtil.isNotNull(exception)) {
                consumer.commitSync(Duration.of(100, ChronoUnit.MILLIS));
            }
        });
    }
}
```
#### Verification
Publish the message:
As shown, because the Consumer crashed, the message was not consumed; and the Kafka UI shows that the offset was not committed either.
Remove the stop() call to simulate the server having been fixed:
```java
import cn.hutool.core.util.ObjectUtil;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 10:01:26
 * @describe: Consumer demo
 */
@Component
public class ConsumerDemo {

    @KafkaListener(topics = {"not-loss-message-topic"}, groupId = "${spring.kafka.consumer.group-id}")
    public void consumer(ConsumerRecord<String, String> record, Consumer<String, String> consumer) {

        //1.Consume the message
        System.out.println("receive message:" + record.value());

        //2.Manually commit the offset asynchronously
        consumer.commitAsync((offsets, exception) -> {

            //3.A non-null exception means the commit failed: fall back to a synchronous commit with a maximum blocking time of 100 ms
            if (ObjectUtil.isNotNull(exception)) {
                consumer.commitSync(Duration.of(100, ChronoUnit.MILLIS));
            }
        });
    }
}
```
Restart the Consumer:
As shown below, the previous message was consumed again, and this time the offset was committed successfully.
So even though the Consumer crashed, it could still consume the message after restarting: no message loss occurred.
6. Summary
### Producer
- Enable idempotence, which guarantees ordering and no loss at the underlying mechanism level
- Disable automatic retries (in fact, leaving them on is also fine)
- Retry manually in the callback's onFailure method, with a sensible maximum retry count to keep endless redelivery from tying up threads
### Broker
- Specify a replication factor >= 3 when creating the Topic
- Set the Producer acks configuration to -1 (ALL)
- Properly configure the minimum in-sync replicas required to acknowledge a write (min.insync.replicas); it is generally recommended to set it to 3
- Properly configure the flush message-count threshold (log.flush.interval.messages)
- Properly configure the flush time threshold (log.flush.interval.ms)
### Consumer
- Disable auto-commit of offsets
- In code, manually commit the offset (asynchronously/synchronously) only after all the business logic has finished executing