1. Project repository
https://github.com/GuardFTC/Kafka-test.git
2. Manually committing offsets
In "Integrating Kafka with Spring Boot" we covered basic Kafka usage, including how to send messages and how to consume them.
In real projects, the consumer offset sometimes needs to be committed manually in code. This section explains how to do that.
Create the topic
Create a topic named auto-commit-false-topic with 1 partition.
创建项目
具体创建项目的流程这里不做过多累述,参考Springboot整合Kafka即可
Write the configuration
The key setting is enable-auto-commit: false
```yaml
# Server config
server:
  port: 8083

# Spring config
spring:
  kafka:
    bootstrap-servers: 120.48.107.224:9092
    producer:
      acks: -1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3
      properties:
        enable.idempotence: true
    consumer:
      group-id: auto-commit-false-group
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
```
Write the send-result callback class
```java
import cn.hutool.log.StaticLog;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaSendCallback;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-07 16:13:57
 * @describe: Callback handling for Kafka message publishing
 */
@Component
public class KafkaSendCallBackForString implements KafkaSendCallback<String, String> {

    @Override
    public void onSuccess(SendResult<String, String> result) {

        //1. Read the message attributes
        ProducerRecord<String, String> producerRecord = result.getProducerRecord();
        String topic = producerRecord.topic();
        Integer partition = producerRecord.partition();
        String key = producerRecord.key();
        String value = producerRecord.value();

        //2. Log the result
        StaticLog.info(LogTemplateEnum.KAFKA_SEND_SUCCESS_LOG.getTemplate(), topic, partition, key, value);
    }

    @Override
    public void onFailure(KafkaProducerException e) {

        //1. Read the message attributes
        ProducerRecord<String, String> failedProducerRecord = e.getFailedProducerRecord();
        String topic = failedProducerRecord.topic();
        Integer partition = failedProducerRecord.partition();
        String key = failedProducerRecord.key();
        String value = failedProducerRecord.value();

        //2. Log the result
        StaticLog.error(LogTemplateEnum.KAFKA_SEND_ERROR_LOG.getTemplate(), topic, partition, key, value);

        //3. Print the stack trace
        e.printStackTrace();

        //4. TODO custom failure handling, e.g. resending the message
    }
}
```
```java
import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-07 16:24:20
 * @describe: Log template enum
 */
@Getter
@AllArgsConstructor
public enum LogTemplateEnum {

    /**
     * Template for successful Kafka sends
     */
    KAFKA_SEND_SUCCESS_LOG("Kafka producer send success! topic:{} partition:{} key:{} value:{}"),

    /**
     * Template for failed Kafka sends
     */
    KAFKA_SEND_ERROR_LOG("Kafka producer send error! topic:{} partition:{} key:{} value:{}"),
    ;

    private final String template;
}
```
Producer
The producer needs no further explanation; it simply sends one message to the topic.
```java
import com.ftc.autocommitfalse.config.KafkaSendCallBackForString;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootTest
class ProducerTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private KafkaSendCallBackForString kafkaSendCallBackForString;

    @Test
    void sendMessage() {

        //1. Send the message
        kafkaTemplate.send("auto-commit-false-topic", "auto-commit-false-message")
                .addCallback(kafkaSendCallBackForString);
    }
}
```
Consumer
The consumer code declares two listeners in different consumer groups, so that both receive the same message.
Manual commits come in two flavors: synchronous and asynchronous.
- Synchronous commit: specify a maximum blocking time so the consumer thread does not block indefinitely
- Asynchronous commit: pass a callback and handle commit failures according to the scenario
```java
import cn.hutool.core.util.ObjectUtil;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 10:01:26
 * @describe: Consumer with manual offset commit
 */
@Component
public class Consumer {

    @KafkaListener(topics = {"auto-commit-false-topic"}, groupId = "auto-commit-false-topic-v1")
    public void listenerBySyncCommit(ConsumerRecord<String, String> record, org.apache.kafka.clients.consumer.Consumer<String, String> consumer) {

        //1. Process the message
        System.out.println("sync receive message:" + record.value());

        //2. Commit the offset synchronously, blocking at most 2000 ms
        consumer.commitSync(Duration.of(2000, ChronoUnit.MILLIS));
    }

    @KafkaListener(topics = {"auto-commit-false-topic"}, groupId = "auto-commit-false-topic-v2")
    public void listenerByAsyncCommit(ConsumerRecord<String, String> record, org.apache.kafka.clients.consumer.Consumer<String, String> consumer) {

        //1. Process the message
        System.out.println("async receive message:" + record.value());

        //2. Commit the offset asynchronously
        consumer.commitAsync((offsets, exception) -> {
            if (ObjectUtil.isNotNull(exception)) {

                //3. Print the exception
                exception.printStackTrace();

                //4. TODO retry/resend logic as needed
            }
        });
    }
}
```
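Because the message is processed before the offset is committed, a crash between the two steps means the message is redelivered after restart. A toy sketch of that at-least-once behavior (plain Java, no broker involved; the names are illustrative, not part of the project):

```java
import java.util.Arrays;
import java.util.List;

public class OffsetCommitSketch {

    // Toy model: a consumer resumes from the last committed offset, so any
    // message that was processed but not yet committed before a crash is
    // redelivered. This is why manual commit gives at-least-once delivery,
    // not exactly-once.
    static List<String> resumeFrom(List<String> log, int committedOffset) {
        return log.subList(committedOffset, log.size());
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("m0", "m1", "m2");

        // m0 and m1 were processed, but the consumer crashed after committing
        // only offset 1 (i.e. after m0) -- m1 is seen again on restart.
        System.out.println(resumeFrom(log, 1)); // [m1, m2]
    }
}
```

Downstream processing therefore has to tolerate duplicates (e.g. by being idempotent) when offsets are committed manually.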
Verification
Producer
Consumer
3. Sending and receiving JSON-serialized messages
Create the topic
Create a topic named json-message-topic.
Create the project
The project setup is not repeated here; see "Integrating Kafka with Spring Boot".
Write the JSON serializer and deserializer
JSON serializer
```java
import cn.hutool.json.JSONObject;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.charset.StandardCharsets;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 11:27:20
 * @describe: JSON serializer
 */
public class JsonSerialize implements Serializer<JSONObject> {

    @Override
    public byte[] serialize(String topic, JSONObject data) {
        return data.toString().getBytes(StandardCharsets.UTF_8);
    }
}
```
JSON deserializer
```java
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import org.apache.kafka.common.serialization.Deserializer;

import java.nio.charset.StandardCharsets;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 11:18:14
 * @describe: JSON deserializer
 */
public class JsonDeserialize implements Deserializer<JSONObject> {

    @Override
    public JSONObject deserialize(String topic, byte[] data) {
        return JSONUtil.parseObj(StrUtil.str(data, StandardCharsets.UTF_8));
    }
}
```
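The serializer and deserializer are symmetric: one turns the JSON text into UTF-8 bytes, the other turns the bytes back into text before parsing. A minimal round-trip sketch of that symmetry, with plain strings standing in for hutool's JSONObject:

```java
import java.nio.charset.StandardCharsets;

public class JsonRoundTrip {

    // Mirrors JsonSerialize: JSON text -> UTF-8 bytes
    static byte[] serialize(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Mirrors JsonDeserialize (minus the JSONUtil.parseObj step):
    // UTF-8 bytes -> JSON text
    static String deserialize(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String json = "{\"id\":1,\"name\":\"ftc\"}";
        // Round trip: what the producer writes is exactly what the consumer reads
        System.out.println(deserialize(serialize(json)).equals(json)); // true
    }
}
```

As long as both sides agree on the charset, the value survives the trip unchanged, which is what makes the pair safe to plug into the producer and consumer configs below.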
Write the configuration
The key settings are value-serializer and value-deserializer
```yaml
# Server config
server:
  port: 8084

# Spring config
spring:
  kafka:
    bootstrap-servers: 120.48.107.224:9092
    producer:
      acks: -1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.ftc.jsonmessage.config.serialize.JsonSerialize
      retries: 3
      properties:
        enable.idempotence: true
    consumer:
      group-id: json-message-group
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.ftc.jsonmessage.config.serialize.JsonDeserialize
      auto-offset-reset: earliest
```
### Write the send-result callback class
```java
import cn.hutool.json.JSONObject;
import cn.hutool.log.StaticLog;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaSendCallback;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-07 16:13:57
 * @describe: Callback handling for Kafka message publishing
 */
@Component
public class KafkaSendCallBackForJson implements KafkaSendCallback<String, JSONObject> {

    @Override
    public void onSuccess(SendResult<String, JSONObject> result) {

        //1. Read the message attributes
        ProducerRecord<String, JSONObject> producerRecord = result.getProducerRecord();
        String topic = producerRecord.topic();
        Integer partition = producerRecord.partition();
        String key = producerRecord.key();
        JSONObject value = producerRecord.value();

        //2. Log the result
        StaticLog.info(LogTemplateEnum.KAFKA_SEND_SUCCESS_LOG.getTemplate(), topic, partition, key, value.toString());
    }

    @Override
    public void onFailure(KafkaProducerException e) {

        //1. Read the message attributes
        ProducerRecord<String, JSONObject> failedProducerRecord = e.getFailedProducerRecord();
        String topic = failedProducerRecord.topic();
        Integer partition = failedProducerRecord.partition();
        String key = failedProducerRecord.key();
        JSONObject value = failedProducerRecord.value();

        //2. Log the result
        StaticLog.error(LogTemplateEnum.KAFKA_SEND_ERROR_LOG.getTemplate(), topic, partition, key, value.toString());

        //3. Print the stack trace
        e.printStackTrace();

        //4. TODO custom failure handling, e.g. resending the message
    }
}
```
```java
import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-07 16:24:20
 * @describe: Log template enum
 */
@Getter
@AllArgsConstructor
public enum LogTemplateEnum {

    /**
     * Template for successful Kafka sends
     */
    KAFKA_SEND_SUCCESS_LOG("Kafka producer send success! topic:{} partition:{} key:{} value:{}"),

    /**
     * Template for failed Kafka sends
     */
    KAFKA_SEND_ERROR_LOG("Kafka producer send error! topic:{} partition:{} key:{} value:{}"),
    ;

    private final String template;
}
```
Producer
```java
import cn.hutool.json.JSONObject;
import com.ftc.jsonmessage.config.KafkaSendCallBackForJson;
import lombok.SneakyThrows;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.concurrent.TimeUnit;

@SpringBootTest
class ProducerTests {

    @Autowired
    private KafkaTemplate<String, JSONObject> kafkaTemplate;

    @Autowired
    private KafkaSendCallBackForJson kafkaSendCallBackForJson;

    @AfterEach
    @SneakyThrows({InterruptedException.class})
    void stopSeconds() {
        TimeUnit.MILLISECONDS.sleep(500);
    }

    @Test
    void sendJson() {

        //1. Build the message
        JSONObject message = new JSONObject(true);
        message.set("id", 1);
        message.set("name", "ftc");

        //2. Send the message
        kafkaTemplate.send("json-message-topic", message)
                .addCallback(kafkaSendCallBackForJson);
    }
}
```
Consumer
```java
import cn.hutool.json.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-08 16:06:48
 * @describe: Listener for the JSON message topic
 */
@Component
public class JsonConsumer {

    @KafkaListener(topics = {"json-message-topic"}, groupId = "json-message-group")
    public void listener(ConsumerRecord<String, JSONObject> record) {
        System.out.println("JSON listener received:" + record.value().toString());
    }
}
```
Verification
Producer
Consumer
4. Consumer rule verification
Producer code
- Five messages are sent in total. The first does not specify a partition; the other four all target partition 0
```java
import com.ftc.single_kafka_producer.config.KafkaSendCallBackForString;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootTest
class SingleKafkaProducerApplicationTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private KafkaSendCallBackForString kafkaSendCallBackForString;

    /**
     * Test topic
     */
    private static final String TEST_TOPIC = "test_topic";

    /**
     * Test key
     */
    private static final String TEST_KEY = "test_key";

    @Test
    void sendMessage() {

        //1. Send a message specifying only the topic
        kafkaTemplate.send(TEST_TOPIC, "topic_message")
                .addCallback(kafkaSendCallBackForString);

        //2. Send a message specifying topic and key | key hash mod partition count = partition 0
        kafkaTemplate.send(TEST_TOPIC, TEST_KEY, "topic_key_message")
                .addCallback(kafkaSendCallBackForString);

        //3. Send a message specifying topic, partition, and key
        kafkaTemplate.send(TEST_TOPIC, 0, TEST_KEY, "topic_partition_key_message")
                .addCallback(kafkaSendCallBackForString);

        //4. Send a message specifying topic, partition, key, and timestamp
        kafkaTemplate.send(TEST_TOPIC, 0, System.currentTimeMillis(), TEST_KEY, "topic_partition_key_timestamp_message")
                .addCallback(kafkaSendCallBackForString);

        //5. Send a message via a ProducerRecord
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TEST_TOPIC, 0, TEST_KEY, "record_message");
        kafkaTemplate.send(producerRecord).addCallback(kafkaSendCallBackForString);
    }
}
```
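The comment on message 2 above relies on the key's hash mapping to partition 0 modulo the partition count. A simplified sketch of that mod step (the helper name is hypothetical; Kafka's default partitioner actually applies murmur2 to the serialized key bytes, not String.hashCode):

```java
public class PartitionSketch {

    // Simplified stand-in for Kafka's default partitioner: hash the key, mask
    // the sign bit so the value is non-negative, then take it modulo the
    // partition count. (Kafka really uses murmur2 on the key bytes;
    // String.hashCode here only illustrates the hash-then-mod idea.)
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // With 2 partitions, every keyed message lands on partition 0 or 1,
        // and the same key always lands on the same partition
        System.out.println(partitionFor("test_key", 2));
    }
}
```

Because the mapping is deterministic, all messages with the same key preserve their relative order within one partition.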
### Consumer code
1. There are five listeners in total
2. Listeners 1 and 2 are in the same group and are pinned to the same partition
3. Listener 3 is in the same group as 1 and 2, but pinned to the other partition
4. Listeners 4 and 5 are in a different group from 1, 2, and 3
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-07 18:16:21
 * @describe: Listener rule test demo
 */
@Component
public class ConsumerRuleDemo {

    //topics and topicPartitions are mutually exclusive on @KafkaListener,
    //so the pinned listeners declare only topicPartitions
    @KafkaListener(groupId = "${spring.kafka.consumer.group-id}",
            topicPartitions = {@TopicPartition(topic = "test_topic", partitions = {"0"})}
    )
    public void listenerDemoV2(ConsumerRecord<String, String> record) {
        System.out.println("receive message from group test_group partition 0 N1:" + record.value());
    }

    @KafkaListener(groupId = "${spring.kafka.consumer.group-id}",
            topicPartitions = {@TopicPartition(topic = "test_topic", partitions = {"0"})}
    )
    public void listenerDemoV3(ConsumerRecord<String, String> record) {
        System.out.println("receive message from group test_group partition 0 N2:" + record.value());
    }

    @KafkaListener(groupId = "${spring.kafka.consumer.group-id}",
            topicPartitions = {@TopicPartition(topic = "test_topic", partitions = {"1"})}
    )
    public void listenerDemoV4(ConsumerRecord<String, String> record) {
        System.out.println("receive message from group test_group partition 1:" + record.value());
    }

    @KafkaListener(topics = {"test_topic"}, groupId = "test_group_bak")
    public void listenerDemoV5(ConsumerRecord<String, String> record) {
        System.out.println("receive message from group test_group_bak N1:" + record.value());
    }

    @KafkaListener(topics = {"test_topic"}, groupId = "test_group_bak")
    public void listenerDemoV6(ConsumerRecord<String, String> record) {
        System.out.println("receive message from group test_group_bak N2:" + record.value());
    }
}
```
Summary of the run
The console output is shown below:
- Listener 3 consumed message 1
- Listeners 1 and 2 both consumed messages 2, 3, 4, and 5
- Listener 4 also consumed message 1
- Listener 5 also consumed messages 2, 3, 4, and 5
From this we can conclude:
- Consumers in the same group, without pinned partitions, never consume the same message more than once
- Consumers in the same group, when pinned to the same partition, can consume the same message simultaneously
- Consumers in different groups can consume the same message simultaneously, whether or not partitions are pinned
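The first and third rules can be illustrated with a toy assignment model. This is not Kafka's actual range/sticky assignor, just a sketch of "within one group, each partition belongs to at most one consumer, while every group independently sees all partitions":

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupAssignmentSketch {

    // Toy model of group assignment: hand out partitions round-robin style.
    // Each partition goes to exactly one consumer in the group; with more
    // consumers than partitions, the extras sit idle.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String c : consumers) {
            assignment.put(c, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(consumers.get(p % consumers.size())).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Three consumers in one group, two partitions: the third consumer
        // gets nothing, which is why same-group consumers (without pinned
        // partitions) never share a message
        System.out.println(assign(Arrays.asList("c1", "c2", "c3"), 2));
    }
}
```

A second group would run `assign` independently over the same two partitions, which is why different groups each receive every message.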