1. Project repository
https://github.com/GuardFTC/Kafka-test.git
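2. Committing offsets manually
The earlier post, Integrating Spring Boot with Kafka, covered the basics of using Kafka: how to send messages and how to receive and consume them.
In real projects, consumer offsets sometimes have to be committed explicitly in code instead of automatically. This section shows how to commit offsets manually.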
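To make it clearer what a committed offset is, here is a minimal in-memory sketch. It does not use the Kafka client at all: `OffsetCommitSketch`, `run`, and `getCommittedOffset` are hypothetical names invented for this illustration. It models one partition as a list and the committed offset as the index of the next record to read, and shows that a record processed but not committed is delivered again after a restart.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** In-memory model of one partition and one group's committed offset (illustration only, not the Kafka API). */
class OffsetCommitSketch {

    private final List<String> partitionLog;

    // Offset of the next record the group should read; this is the value a commit stores.
    private long committedOffset = 0;

    OffsetCommitSketch(List<String> partitionLog) {
        this.partitionLog = partitionLog;
    }

    /**
     * Simulates one consumer run that starts at the committed offset.
     * It processes up to maxRecords records but commits only after the first commitCount of them.
     */
    List<String> run(int maxRecords, int commitCount) {
        List<String> processed = new ArrayList<>();
        long position = committedOffset;
        while (processed.size() < maxRecords && position < partitionLog.size()) {
            processed.add(partitionLog.get((int) position));
            position++;
            if (processed.size() <= commitCount) {
                // A commit stores the offset of the last processed record plus one
                committedOffset = position;
            }
        }
        return processed;
    }

    long getCommittedOffset() {
        return committedOffset;
    }

    public static void main(String[] args) {
        OffsetCommitSketch sketch = new OffsetCommitSketch(Arrays.asList("m0", "m1", "m2", "m3"));

        // First run: three records are processed, but the consumer stops after committing only two
        System.out.println(sketch.run(3, 2));

        // Second run: consumption resumes at the committed offset, so m2 is delivered a second time
        System.out.println(sketch.run(10, 10));
    }
}
```

The design point is that committing only after a record has been processed trades possible duplicates for not losing records, which is why the consumer code has to be able to decide when the commit happens.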
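Create the topic
Create a topic named auto-commit-false-topic with 1 partition.
Create the project
The project setup steps are not repeated here; follow Integrating Spring Boot with Kafka.
Write the configuration
The key setting is enable-auto-commit: false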
```yaml
# Server configuration
server:
  port: 8083

# Spring configuration
spring:
  kafka:
    bootstrap-servers: 120.48.107.224:9092
    producer:
      acks: -1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3
      properties:
        enable.idempotence: true
    consumer:
      group-id: auto-commit-false-group
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
```
Write the send-result callback class
```java
import cn.hutool.log.StaticLog;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaSendCallback;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-07 16:13:57
 * @describe: Callback that handles the result of publishing a Kafka message
 */
@Component
public class KafkaSendCallBackForString implements KafkaSendCallback<String, String> {

    @Override
    public void onSuccess(SendResult<String, String> result) {

        //1. Read the message attributes
        ProducerRecord<String, String> producerRecord = result.getProducerRecord();
        String topic = producerRecord.topic();
        // partition() is null when the sender did not specify a partition
        Integer partition = producerRecord.partition();
        String key = producerRecord.key();
        String value = producerRecord.value();

        //2. Log the result
        StaticLog.info(LogTemplateEnum.KAFKA_SEND_SUCCESS_LOG.getTemplate(), topic, partition, key, value);
    }

    @Override
    public void onFailure(KafkaProducerException e) {

        //1. Read the message attributes
        ProducerRecord<String, String> failedProducerRecord = e.getFailedProducerRecord();
        String topic = failedProducerRecord.topic();
        Integer partition = failedProducerRecord.partition();
        String key = failedProducerRecord.key();
        String value = failedProducerRecord.value();

        //2. Log the result
        StaticLog.error(LogTemplateEnum.KAFKA_SEND_ERROR_LOG.getTemplate(), topic, partition, key, value);

        //3. Print the exception stack trace
        e.printStackTrace();

        //4. TODO add custom failure handling here, such as resending the message
    }
}
```
```java
import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-07 16:24:20
 * @describe: Log template enum
 */
@Getter
@AllArgsConstructor
public enum LogTemplateEnum {

    /**
     * Log template for a successful Kafka send
     */
    KAFKA_SEND_SUCCESS_LOG("Kafka producer send success! topic:{} partition:{} key:{} value:{}"),

    /**
     * Log template for a failed Kafka send
     */
    KAFKA_SEND_ERROR_LOG("Kafka producer send error! topic:{} partition:{} key:{} value:{}"),
    ;

    private final String template;
}
```
Producer
The producer code needs little explanation: it sends a single message to the topic.
```java
import com.ftc.autocommitfalse.config.KafkaSendCallBackForString;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootTest
class ProducerTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private KafkaSendCallBackForString kafkaSendCallBackForString;

    @Test
    void sendMessage() {

        //1. Send the message
        kafkaTemplate.send("auto-commit-false-topic", "auto-commit-false-message").addCallback(kafkaSendCallBackForString);
    }
}
```
Consumer
The consumer code declares two consumers that belong to different consumer groups, so both of them receive every message.
Manual commits come in two forms, synchronous and asynchronous:
- Synchronous commit: pass a maximum blocking time so that the consumer thread is not blocked for long
- Asynchronous commit: pass a callback and handle a failed commit there, according to the scenario
```java
import cn.hutool.core.util.ObjectUtil;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 10:01:26
 * @describe: Consumer that commits offsets manually
 */
@Component
public class Consumer {

    @KafkaListener(topics = {"auto-commit-false-topic"}, groupId = "auto-commit-false-topic-v1")
    public void listenerBySyncCommit(ConsumerRecord<String, String> record, org.apache.kafka.clients.consumer.Consumer<?, ?> consumer) {

        //1. Process the message
        System.out.println("sync receive message:" + record.value());

        //2. Commit the offset synchronously, blocking for at most 2 seconds
        consumer.commitSync(Duration.of(2000, ChronoUnit.MILLIS));
    }

    @KafkaListener(topics = {"auto-commit-false-topic"}, groupId = "auto-commit-false-topic-v2")
    public void listenerByAsyncCommit(ConsumerRecord<String, String> record, org.apache.kafka.clients.consumer.Consumer<?, ?> consumer) {

        //1. Process the message
        System.out.println("async receive message:" + record.value());

        //2. Commit the offset asynchronously
        consumer.commitAsync((offsets, exception) -> {
            if (ObjectUtil.isNotNull(exception)) {

                //3. Print the exception
                exception.printStackTrace();

                //4. TODO add recovery handling here, such as a retry
            }
        });
    }
}
```
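The difference between the two commit styles can be sketched without a broker. The following is only a model, not the Kafka client: `CommitStyleSketch`, `sendCommit`, and the artificial `brokerDelayMillis` parameter are assumptions made for illustration, and a `CompletableFuture` stands in for the round trip to the broker. The synchronous variant blocks the caller up to a maximum time, while the asynchronous variant returns at once and reports the outcome to a callback.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

/** Models synchronous and asynchronous commits with a simulated broker delay (illustration only). */
class CommitStyleSketch {

    // Hypothetical stand-in for the broker round trip that stores an offset
    static CompletableFuture<Long> sendCommit(long offset, long brokerDelayMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(brokerDelayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return offset;
        });
    }

    /** Synchronous style: the caller waits for the result, but never longer than maxBlock. */
    static long commitSync(long offset, long brokerDelayMillis, Duration maxBlock) {
        try {
            return sendCommit(offset, brokerDelayMillis).get(maxBlock.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("commit not confirmed within " + maxBlock, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    /** Asynchronous style: returns immediately; the callback later receives the offset or the error. */
    static CompletableFuture<Void> commitAsync(long offset, long brokerDelayMillis, BiConsumer<Long, Throwable> callback) {
        return sendCommit(offset, brokerDelayMillis).<Void>handle((committed, error) -> {
            callback.accept(committed, error);
            return null;
        });
    }

    public static void main(String[] args) {
        // Blocks until the simulated broker answers, or fails after 2 seconds
        System.out.println("sync committed offset " + commitSync(1L, 10L, Duration.ofSeconds(2)));

        // Does not block on the commit itself; join() here only keeps the demo alive until the callback ran
        commitAsync(2L, 10L, (committed, error) -> {
            if (error != null) {
                error.printStackTrace();
            } else {
                System.out.println("async committed offset " + committed);
            }
        }).join();
    }
}
```

The synchronous form is simpler to reason about because a failure surfaces at the call site, at the cost of blocking the listener thread. The asynchronous form keeps the listener thread free, but a failure only shows up later inside the callback.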
Verification
Producer
Consumer
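3. Sending and receiving JSON-serialized messages
Create the topic
Create a topic named json-message-topic.
Create the project
The project setup steps are not repeated here; follow Integrating Spring Boot with Kafka.
Write the JSON serializer and deserializer
JSON serializer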
```java
import cn.hutool.json.JSONObject;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.charset.StandardCharsets;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 11:27:20
 * @describe: JSON serializer
 */
public class JsonSerialize implements Serializer<JSONObject> {

    @Override
    public byte[] serialize(String topic, JSONObject data) {

        // Pass a null value through instead of failing with a NullPointerException
        if (data == null) {
            return null;
        }
        return data.toString().getBytes(StandardCharsets.UTF_8);
    }
}
```
JSON deserializer
```java
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import org.apache.kafka.common.serialization.Deserializer;

import java.nio.charset.StandardCharsets;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 11:18:14
 * @describe: JSON deserializer
 */
public class JsonDeserialize implements Deserializer<JSONObject> {

    @Override
    public JSONObject deserialize(String topic, byte[] data) {

        // A record with a null value arrives here as null bytes
        if (data == null) {
            return null;
        }
        return JSONUtil.parseObj(StrUtil.str(data, StandardCharsets.UTF_8));
    }
}
```
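Both classes rely on the same contract: the value becomes UTF-8 bytes on the way out and is rebuilt from UTF-8 bytes on the way in. The standalone sketch below checks that round trip using a plain JSON `String` in place of hutool's `JSONObject`. `JsonBytesRoundTrip` and the sample payload are hypothetical and exist only for this illustration.

```java
import java.nio.charset.StandardCharsets;

/** Round trip of a JSON text through UTF-8 bytes, mirroring the serializer/deserializer pair (illustration only). */
class JsonBytesRoundTrip {

    // Producer side: JSON text to bytes; null stays null
    static byte[] serialize(String json) {
        return json == null ? null : json.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: bytes back to JSON text; null stays null
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // \u00e9 is a non-ASCII character that takes two bytes in UTF-8
        String json = "{\"id\":1,\"name\":\"caf\u00e9\"}";
        byte[] bytes = serialize(json);

        // The byte count differs from the character count, yet the text survives unchanged
        System.out.println(json.length() + " chars, " + bytes.length + " bytes");
        System.out.println(json.equals(deserialize(bytes)));
    }
}
```

Naming the charset explicitly on both sides matters: relying on the platform default could produce different bytes on the producer and consumer hosts.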
Write the configuration
The key setting is value-serializer, together with the matching value-deserializer on the consumer side.
```yaml
# Server configuration
server:
  port: 8084

# Spring configuration
spring:
  kafka:
    bootstrap-servers: 120.48.107.224:9092
    producer:
      acks: -1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.ftc.jsonmessage.config.serialize.JsonSerialize
      retries: 3
      properties:
        enable.idempotence: true
    consumer:
      group-id: json-message-group
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.ftc.jsonmessage.config.serialize.JsonDeserialize
      auto-offset-reset: earliest
```
Write the send-result callback class
```java
import cn.hutool.json.JSONObject;
import cn.hutool.log.StaticLog;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaSendCallback;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-07 16:13:57
 * @describe: Callback that handles the result of publishing a Kafka message
 */
@Component
public class KafkaSendCallBackForJson implements KafkaSendCallback<String, JSONObject> {

    @Override
    public void onSuccess(SendResult<String, JSONObject> result) {

        //1. Read the message attributes
        ProducerRecord<String, JSONObject> producerRecord = result.getProducerRecord();
        String topic = producerRecord.topic();
        Integer partition = producerRecord.partition();
        String key = producerRecord.key();
        JSONObject value = producerRecord.value();

        //2. Log the result
        StaticLog.info(LogTemplateEnum.KAFKA_SEND_SUCCESS_LOG.getTemplate(), topic, partition, key, value.toString());
    }

    @Override
    public void onFailure(KafkaProducerException e) {

        //1. Read the message attributes
        ProducerRecord<String, JSONObject> failedProducerRecord = e.getFailedProducerRecord();
        String topic = failedProducerRecord.topic();
        Integer partition = failedProducerRecord.partition();
        String key = failedProducerRecord.key();
        JSONObject value = failedProducerRecord.value();

        //2. Log the result
        StaticLog.error(LogTemplateEnum.KAFKA_SEND_ERROR_LOG.getTemplate(), topic, partition, key, value.toString());

        //3. Print the exception stack trace
        e.printStackTrace();

        //4. TODO add custom failure handling here, such as resending the message
    }
}
```
```java
import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-07 16:24:20
 * @describe: Log template enum
 */
@Getter
@AllArgsConstructor
public enum LogTemplateEnum {

    /**
     * Log template for a successful Kafka send
     */
    KAFKA_SEND_SUCCESS_LOG("Kafka producer send success! topic:{} partition:{} key:{} value:{}"),

    /**
     * Log template for a failed Kafka send
     */
    KAFKA_SEND_ERROR_LOG("Kafka producer send error! topic:{} partition:{} key:{} value:{}"),
    ;

    private final String template;
}
```
Producer
```java
import cn.hutool.json.JSONObject;
import com.ftc.jsonmessage.config.KafkaSendCallBackForJson;
import lombok.SneakyThrows;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.concurrent.TimeUnit;

@SpringBootTest
class ProducerTests {

    @Autowired
    private KafkaTemplate<String, JSONObject> kafkaTemplate;

    @Autowired
    private KafkaSendCallBackForJson kafkaSendCallBackForJson;

    // Pause for 500 ms after each test
    @AfterEach
    @SneakyThrows({InterruptedException.class})
    void stopSeconds() {
        TimeUnit.MILLISECONDS.sleep(500);
    }

    @Test
    void sendJson() {

        //1. Build the message
        JSONObject message = new JSONObject(true);
        message.set("id", 1);
        message.set("name", "ftc");

        //2. Send the message
        kafkaTemplate.send("json-message-topic", message).addCallback(kafkaSendCallBackForJson);
    }
}
```
Consumer
```java
import cn.hutool.json.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-08 16:06:48
 * @describe: Listener for the JSON topic
 */
@Component
public class JsonConsumer {

    @KafkaListener(topics = {"json-message-topic"}, groupId = "json-message-group")
    public void listener(ConsumerRecord<String, JSONObject> record) {
        System.out.println("JSON listener received data:" + record.value().toString());
    }
}
```
Verification
Producer
Consumer
4. Verifying consumer rules
Producer code
- Five messages are sent in total. The first one does not specify a partition; the other four all end up in partition 0.
```java
import com.ftc.single_kafka_producer.config.KafkaSendCallBackForString;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootTest
class SingleKafkaProducerApplicationTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private KafkaSendCallBackForString kafkaSendCallBackForString;

    /**
     * Test topic
     */
    private static final String TEST_TOPIC = "test_topic";

    /**
     * Test key
     */
    private static final String TEST_KEY = "test_key";

    @Test
    void sendMessage() {

        //1. Send with the topic only
        kafkaTemplate.send(TEST_TOPIC, "topic_message").addCallback(kafkaSendCallBackForString);

        //2. Send with topic and key | the key hash modulo the partition count gives partition 0
        kafkaTemplate.send(TEST_TOPIC, TEST_KEY, "topic_key_message").addCallback(kafkaSendCallBackForString);

        //3. Send with topic, partition, and key
        kafkaTemplate.send(TEST_TOPIC, 0, TEST_KEY, "topic_partition_key_message").addCallback(kafkaSendCallBackForString);

        //4. Send with topic, partition, timestamp, and key
        kafkaTemplate.send(TEST_TOPIC, 0, System.currentTimeMillis(), TEST_KEY, "topic_partition_key_timestamp_message").addCallback(kafkaSendCallBackForString);

        //5. Send using a ProducerRecord
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TEST_TOPIC, 0, TEST_KEY, "record_message");
        kafkaTemplate.send(producerRecord).addCallback(kafkaSendCallBackForString);
    }
}
```
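The comment on the second send says that the key maps to partition 0 after a modulo. The sketch below illustrates that principle only: hash the key bytes, clear the sign bit, and take the remainder by the partition count. `KeyPartitionSketch` and `partitionFor` are hypothetical names, and `Arrays.hashCode` is a deliberately simple stand-in. Kafka's built-in partitioner hashes the key bytes with murmur2, so the partition numbers produced here will generally not match a real cluster.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Shows how a keyed record can be mapped to a partition by hash and modulo (illustration only). */
class KeyPartitionSketch {

    static int partitionFor(String key, int partitionCount) {
        // Stand-in hash; Kafka itself applies murmur2 to the serialized key
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));

        // Clearing the sign bit keeps the remainder non-negative
        return (hash & 0x7fffffff) % partitionCount;
    }

    public static void main(String[] args) {
        // The same key always yields the same partition for a fixed partition count
        System.out.println(partitionFor("test_key", 2));
        System.out.println(partitionFor("test_key", 2));
    }
}
```

Because the mapping depends only on the key and the partition count, all records that share a key stay in one partition, which is what keeps them in order relative to each other.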
Consumer code
1. There are five consumers in total
2. Consumers 1 and 2 are in the same group and are both assigned the same partition
3. Consumer 3 is in the same group as consumers 1 and 2 but is assigned the other partition
4. Consumers 4 and 5 are in a different group from consumers 1, 2, and 3
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-07 18:16:21
 * @describe: Demo for testing listener rules
 */
@Component
public class ConsumerRuleDemo {

    // The @KafkaListener Javadoc describes topics and topicPartitions as mutually exclusive,
    // so the listeners that name partitions explicitly set only topicPartitions
    @KafkaListener(groupId = "${spring.kafka.consumer.group-id}",
            topicPartitions = {@TopicPartition(topic = "test_topic", partitions = {"0"})})
    public void listenerDemoV2(ConsumerRecord<String, String> record) {
        System.out.println("receive message from group test_group partition 0 N1:" + record.value());
    }

    @KafkaListener(groupId = "${spring.kafka.consumer.group-id}",
            topicPartitions = {@TopicPartition(topic = "test_topic", partitions = {"0"})})
    public void listenerDemoV3(ConsumerRecord<String, String> record) {
        System.out.println("receive message from group test_group partition 0 N2:" + record.value());
    }

    @KafkaListener(groupId = "${spring.kafka.consumer.group-id}",
            topicPartitions = {@TopicPartition(topic = "test_topic", partitions = {"1"})})
    public void listenerDemoV4(ConsumerRecord<String, String> record) {
        System.out.println("receive message from group test_group partition 1:" + record.value());
    }

    @KafkaListener(topics = {"test_topic"}, groupId = "test_group_bak")
    public void listenerDemoV5(ConsumerRecord<String, String> record) {
        System.out.println("receive message from group test_group_bak N1:" + record.value());
    }

    @KafkaListener(topics = {"test_topic"}, groupId = "test_group_bak")
    public void listenerDemoV6(ConsumerRecord<String, String> record) {
        System.out.println("receive message from group test_group_bak N2:" + record.value());
    }
}
```
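Summary of results
The console output shows the following:
- Consumer 3 consumed message 1
- Consumers 1 and 2 both consumed messages 2, 3, 4, and 5
- Consumer 4 also consumed message 1
- Consumer 5 also consumed messages 2, 3, 4, and 5

From this we can conclude:
- Consumers in the same group that do not specify partitions never consume the same message, because each partition is handed to only one of them
- Consumers in the same group that are explicitly assigned the same partition can both consume the same message, because explicit assignment bypasses the group's partition balancing
- Consumers in different groups can consume the same message whether or not partitions are specified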
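These rules can be reproduced with a small in-memory model. The sketch below is not Kafka code: `ConsumerGroupSketch`, `balance`, and `receivers` are hypothetical names, and the round-robin split is an assumption standing in for whatever assignor the group actually uses, so which member owns which partition may differ from the run above. It contrasts a subscribed group, where every partition has exactly one owner, with explicit assignment, where two consumers can name the same partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Models which consumers receive a record under group balancing versus explicit assignment (illustration only). */
class ConsumerGroupSketch {

    /** Spreads partitions over the subscribed members of one group so that each partition has exactly one owner. */
    static Map<String, List<Integer>> balance(List<String> members, int partitionCount) {
        Map<String, List<Integer>> owned = new LinkedHashMap<>();
        for (String member : members) {
            owned.put(member, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            owned.get(members.get(partition % members.size())).add(partition);
        }
        return owned;
    }

    /** Returns every consumer that receives a record written to the given partition. */
    static List<String> receivers(int partition, Map<String, List<Integer>> ownership) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> entry : ownership.entrySet()) {
            if (entry.getValue().contains(partition)) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Subscribed group with two members: the two partitions are split between them
        Map<String, List<Integer>> subscribed = balance(Arrays.asList("bak-1", "bak-2"), 2);

        // Explicit assignment: nothing prevents two consumers from naming partition 0
        Map<String, List<Integer>> assigned = new LinkedHashMap<>();
        assigned.put("c1", Arrays.asList(0));
        assigned.put("c2", Arrays.asList(0));
        assigned.put("c3", Arrays.asList(1));

        // A record in partition 0 reaches one subscribed member but both explicitly assigned consumers
        System.out.println(receivers(0, subscribed));
        System.out.println(receivers(0, assigned));
    }
}
```

Each group is modeled as its own ownership map, which mirrors the third conclusion: groups are independent, so a record is delivered once per group regardless of how partitions are handled inside each one.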


