1. Project Repository

https://github.com/GuardFTC/Kafka-test.git

2. Manually Committing Offsets

In the earlier Spring Boot + Kafka article we covered the basics: sending messages and consuming them. In real projects you sometimes need to commit consumer offsets manually from code. This section walks through how to do that.

Create the Topic

Create a topic named auto-commit-false-topic with a single partition.
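If you prefer creating the topic from code rather than a management UI, the kafka-clients AdminClient can do it. A minimal sketch (broker address taken from the configuration below):

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopic {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "120.48.107.224:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 1 (single-broker setup)
            admin.createTopics(Collections.singletonList(
                    new NewTopic("auto-commit-false-topic", 1, (short) 1)))
                    .all().get();
        }
    }
}
```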

Create the Project

The project setup itself is the same as before; see the earlier Spring Boot + Kafka article.

Write the Configuration

The key line is enable-auto-commit: false (see also the note after the config):

```yaml
# Server configuration
server:
  port: 8083

# Spring configuration
spring:
  kafka:
    bootstrap-servers: 120.48.107.224:9092
    producer:
      acks: -1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3
      properties:
        enable.idempotence: true
    consumer:
      group-id: auto-commit-false-group
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
```
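One caveat worth knowing: with spring-kafka, disabling enable-auto-commit by itself does not make committing fully manual. By default the listener container still commits offsets after each batch it processes (AckMode.BATCH). The listeners below sidestep that by calling commitSync/commitAsync on the injected Consumer directly; if you instead want to acknowledge through spring-kafka's Acknowledgment parameter, you would also set the ack mode, for example:

```yaml
spring:
  kafka:
    listener:
      # manual / manual_immediate hand the commit decision to the listener code
      ack-mode: manual_immediate
```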

Write the Send-Result Callback

```java
import cn.hutool.log.StaticLog;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaSendCallback;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-07 16:13:57
 * @describe: Callback handling for Kafka publish results
 */
@Component
public class KafkaSendCallBackForString implements KafkaSendCallback<String, String> {

    @Override
    public void onSuccess(SendResult<String, String> result) {

        //1. Read the message attributes
        ProducerRecord<String, String> producerRecord = result.getProducerRecord();
        String topic = producerRecord.topic();
        Integer partition = producerRecord.partition();
        String key = producerRecord.key();
        String value = producerRecord.value();

        //2. Log the success
        StaticLog.info(LogTemplateEnum.KAFKA_SEND_SUCCESS_LOG.getTemplate(), topic, partition, key, value);
    }

    @Override
    public void onFailure(KafkaProducerException e) {

        //1. Read the message attributes
        ProducerRecord<String, String> failedProducerRecord = e.getFailedProducerRecord();
        String topic = failedProducerRecord.topic();
        Integer partition = failedProducerRecord.partition();
        String key = failedProducerRecord.key();
        String value = failedProducerRecord.value();

        //2. Log the failure
        StaticLog.error(LogTemplateEnum.KAFKA_SEND_ERROR_LOG.getTemplate(), topic, partition, key, value);

        //3. Print the stack trace
        e.printStackTrace();

        //4. TODO add custom failure handling here, e.g. resending the message
    }
}
```
```java
import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-07 16:24:20
 * @describe: Log template enum
 */
@Getter
@AllArgsConstructor
public enum LogTemplateEnum {

    /**
     * Template for successful Kafka sends
     */
    KAFKA_SEND_SUCCESS_LOG("Kafka producer send success! topic:{} partition:{} key:{} value:{}"),

    /**
     * Template for failed Kafka sends
     */
    KAFKA_SEND_ERROR_LOG("Kafka producer send error! topic:{} partition:{} key:{} value:{}"),
    ;

    private final String template;
}
```

Producer

The producer code needs little explanation: it just sends one message to the topic.

```java
import com.ftc.autocommitfalse.config.KafkaSendCallBackForString;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootTest
class ProducerTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private KafkaSendCallBackForString kafkaSendCallBackForString;

    @Test
    void sendMessage() {

        //1. Send one message
        kafkaTemplate.send("auto-commit-false-topic", "auto-commit-false-message")
                .addCallback(kafkaSendCallBackForString);
    }
}
```
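Note that addCallback() hangs off the ListenableFuture returned by spring-kafka 2.x. If you are on spring-kafka 3.x, where send() returns a CompletableFuture instead, the equivalent wiring would look roughly like this (a sketch, not part of this project):

```java
// spring-kafka 3.x sketch: send() returns CompletableFuture<SendResult<K, V>>
kafkaTemplate.send("auto-commit-false-topic", "auto-commit-false-message")
        .whenComplete((result, ex) -> {
            if (ex != null) {
                ex.printStackTrace(); // send failed
            } else {
                System.out.println("sent to " + result.getRecordMetadata().topic());
            }
        });
```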

Consumer

The consumer code declares two listeners in different consumer groups, so both receive the same message.
Manual committing comes in two flavors: synchronous and asynchronous.

1. Synchronous commit: pass a maximum blocking time so the consumer thread cannot block indefinitely.
2. Asynchronous commit: pass a callback and handle commit failures there as the scenario requires (see also the shutdown pattern sketched after this block).

```java
import cn.hutool.core.util.ObjectUtil;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 10:01:26
 * @describe: Consumers that commit offsets manually
 */
@Component
public class Consumer {

    @KafkaListener(topics = {"auto-commit-false-topic"}, groupId = "auto-commit-false-topic-v1")
    public void listenerBySyncCommit(ConsumerRecord<String, String> record, org.apache.kafka.clients.consumer.Consumer<String, String> consumer) {

        //1. Process the message
        System.out.println("sync receive message:" + record.value());

        //2. Commit the offset synchronously, blocking for at most 2 seconds
        consumer.commitSync(Duration.of(2000, ChronoUnit.MILLIS));
    }

    @KafkaListener(topics = {"auto-commit-false-topic"}, groupId = "auto-commit-false-topic-v2")
    public void listenerByAsyncCommit(ConsumerRecord<String, String> record, org.apache.kafka.clients.consumer.Consumer<String, String> consumer) {

        //1. Process the message
        System.out.println("async receive message:" + record.value());

        //2. Commit the offset asynchronously
        consumer.commitAsync((offsets, exception) -> {
            if (ObjectUtil.isNotNull(exception)) {
                //3. Log the failure
                exception.printStackTrace();
                //4. TODO add retry / resend handling here
            }
        });
    }
}
```
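Beyond the Spring listeners above, a common pattern with the raw kafka-clients API (standard practice, not specific to this project) is to combine both styles: fast, non-blocking commitAsync calls on the hot path, plus one final blocking commitSync on shutdown so the last offsets are not lost. A minimal sketch reusing the topic and group from this section:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AsyncThenSyncCommit {

    private static final AtomicBoolean RUNNING = new AtomicBoolean(true);

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.48.107.224:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "auto-commit-false-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("auto-commit-false-topic"));
        Runtime.getRuntime().addShutdownHook(new Thread(() -> RUNNING.set(false)));

        try {
            while (RUNNING.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                records.forEach(record -> System.out.println(record.value()));

                // Fast, non-blocking commit on the hot path
                consumer.commitAsync((offsets, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                    }
                });
            }
        } finally {
            try {
                // One final blocking commit so the latest offsets survive shutdown
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }
}
```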

Verification

Producer

As the logs show, the message is sent successfully.

Consumer

Both listeners consume the message successfully, and checking the two consumers' offsets confirms that both commits went through.

3. Sending and Receiving JSON-Serialized Messages

Create the Topic

Create a topic named json-message-topic.

Create the Project

Project setup is the same as before; see the earlier Spring Boot + Kafka article.

Write the JSON Serializer and Deserializer

JSON Serializer

```java
import cn.hutool.json.JSONObject;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.charset.StandardCharsets;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 11:27:20
 * @describe: JSON serializer
 */
public class JsonSerialize implements Serializer<JSONObject> {

    @Override
    public byte[] serialize(String topic, JSONObject data) {
        return data.toString().getBytes(StandardCharsets.UTF_8);
    }
}
```

JSON Deserializer

```java
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import org.apache.kafka.common.serialization.Deserializer;

import java.nio.charset.StandardCharsets;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 11:18:14
 * @describe: JSON deserializer
 */
public class JsonDeserialize implements Deserializer<JSONObject> {

    @Override
    public JSONObject deserialize(String topic, byte[] data) {
        return JSONUtil.parseObj(StrUtil.str(data, StandardCharsets.UTF_8));
    }
}
```
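Before wiring the pair into Kafka, a quick sanity check is to round-trip a message through them. A minimal sketch (plain main method, no broker required; assumes both classes above are on the classpath):

```java
import cn.hutool.json.JSONObject;

public class JsonRoundTripCheck {

    public static void main(String[] args) {
        JSONObject original = new JSONObject();
        original.set("id", 1);
        original.set("name", "ftc");

        // Serialize to bytes, then parse the bytes back; neither implementation uses the topic argument
        byte[] bytes = new JsonSerialize().serialize("json-message-topic", original);
        JSONObject restored = new JsonDeserialize().deserialize("json-message-topic", bytes);

        // Should print true: the round trip preserves the JSON content
        System.out.println(original.toString().equals(restored.toString()));
    }
}
```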

Write the Configuration

The key lines are value-serializer and value-deserializer, which point at the custom JSON classes:

```yaml
# Server configuration
server:
  port: 8084

# Spring configuration
spring:
  kafka:
    bootstrap-servers: 120.48.107.224:9092
    producer:
      acks: -1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.ftc.jsonmessage.config.serialize.JsonSerialize
      retries: 3
      properties:
        enable.idempotence: true
    consumer:
      group-id: json-message-group
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.ftc.jsonmessage.config.serialize.JsonDeserialize
      auto-offset-reset: earliest
```
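As an aside: if your messages map to your own POJOs rather than hutool JSONObjects, spring-kafka already ships generic JSON (de)serializers, so custom classes like the ones above are optional. A sketch of the equivalent configuration (trusted-packages is required for deserialization; the package pattern here is an assumption based on this project's layout):

```yaml
spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "com.ftc.jsonmessage.*"
```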

Write the Send-Result Callback

```java
import cn.hutool.json.JSONObject;
import cn.hutool.log.StaticLog;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaSendCallback;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-07 16:13:57
 * @describe: Callback handling for Kafka publish results
 */
@Component
public class KafkaSendCallBackForJson implements KafkaSendCallback<String, JSONObject> {

    @Override
    public void onSuccess(SendResult<String, JSONObject> result) {

        //1. Read the message attributes
        ProducerRecord<String, JSONObject> producerRecord = result.getProducerRecord();
        String topic = producerRecord.topic();
        Integer partition = producerRecord.partition();
        String key = producerRecord.key();
        JSONObject value = producerRecord.value();

        //2. Log the success
        StaticLog.info(LogTemplateEnum.KAFKA_SEND_SUCCESS_LOG.getTemplate(), topic, partition, key, value.toString());
    }

    @Override
    public void onFailure(KafkaProducerException e) {

        //1. Read the message attributes
        ProducerRecord<String, JSONObject> failedProducerRecord = e.getFailedProducerRecord();
        String topic = failedProducerRecord.topic();
        Integer partition = failedProducerRecord.partition();
        String key = failedProducerRecord.key();
        JSONObject value = failedProducerRecord.value();

        //2. Log the failure
        StaticLog.error(LogTemplateEnum.KAFKA_SEND_ERROR_LOG.getTemplate(), topic, partition, key, value.toString());

        //3. Print the stack trace
        e.printStackTrace();

        //4. TODO add custom failure handling here, e.g. resending the message
    }
}
```
LogTemplateEnum is identical to the version shown in section 2 and is reused here unchanged.

Producer

```java
import cn.hutool.json.JSONObject;
import com.ftc.jsonmessage.config.KafkaSendCallBackForJson;
import lombok.SneakyThrows;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.concurrent.TimeUnit;

@SpringBootTest
class ProducerTests {

    @Autowired
    private KafkaTemplate<String, JSONObject> kafkaTemplate;

    @Autowired
    private KafkaSendCallBackForJson kafkaSendCallBackForJson;

    @AfterEach
    @SneakyThrows({InterruptedException.class})
    void stopSeconds() {
        // Give the async send callback time to fire before the test JVM exits
        TimeUnit.MILLISECONDS.sleep(500);
    }

    @Test
    void sendJson() {

        //1. Build the message
        JSONObject message = new JSONObject(true);
        message.set("id", 1);
        message.set("name", "ftc");

        //2. Send it
        kafkaTemplate.send("json-message-topic", message)
                .addCallback(kafkaSendCallBackForJson);
    }
}
```

Consumer

```java
import cn.hutool.json.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-08 16:06:48
 * @describe: Listener for the JSON message topic
 */
@Component
public class JsonConsumer {

    @KafkaListener(topics = {"json-message-topic"}, groupId = "json-message-group")
    public void listener(ConsumerRecord<String, JSONObject> record) {
        System.out.println("JSON listener received:" + record.value().toString());
    }
}
```
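Because the custom deserializer hands the listener a ready-made JSONObject, fields can be read with hutool's typed getters and no extra parsing. For example, inside the listener body (field names taken from the producer test above):

```java
// Fragment for inside the listener: typed access on the deserialized value
JSONObject value = record.value();
Integer id = value.getInt("id");    // 1
String name = value.getStr("name"); // "ftc"
```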

Verification

Producer

The message is sent successfully.

Consumer

The JSON message is received and deserialized successfully.

4. Verifying Consumer Rules

Producer Code

Five messages are sent in total: the first does not specify a partition; the other four all target partition 0.

```java
import com.ftc.single_kafka_producer.config.KafkaSendCallBackForString;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootTest
class SingleKafkaProducerApplicationTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private KafkaSendCallBackForString kafkaSendCallBackForString;

    /**
     * Test topic
     */
    private static final String TEST_TOPIC = "test_topic";

    /**
     * Test key
     */
    private static final String TEST_KEY = "test_key";

    @Test
    void sendMessage() {

        //1. Send with topic only
        kafkaTemplate.send(TEST_TOPIC, "topic_message")
                .addCallback(kafkaSendCallBackForString);

        //2. Send with topic and key | the key hashes to partition 0 (see the sketch after this block)
        kafkaTemplate.send(TEST_TOPIC, TEST_KEY, "topic_key_message")
                .addCallback(kafkaSendCallBackForString);

        //3. Send with topic, partition, and key
        kafkaTemplate.send(TEST_TOPIC, 0, TEST_KEY, "topic_partition_key_message")
                .addCallback(kafkaSendCallBackForString);

        //4. Send with topic, partition, key, and timestamp
        kafkaTemplate.send(TEST_TOPIC, 0, System.currentTimeMillis(), TEST_KEY, "topic_partition_key_timestamp_message")
                .addCallback(kafkaSendCallBackForString);

        //5. Send via a ProducerRecord
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TEST_TOPIC, 0, TEST_KEY, "record_message");
        kafkaTemplate.send(producerRecord).addCallback(kafkaSendCallBackForString);
    }
}
```
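The comment on send #2 ("the key hashes to partition 0") refers to the default partitioner's rule for keyed records: hash the key bytes with murmur2 and take the result modulo the partition count. The computation can be reproduced with the client's own utility class; a sketch (assumes test_topic has 2 partitions, as in this demo):

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public class DefaultPartitionSketch {

    public static void main(String[] args) {
        byte[] keyBytes = "test_key".getBytes(StandardCharsets.UTF_8);
        int numPartitions = 2;

        // Same formula the default partitioner applies to records that carry a key
        int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

        // Per the producer comment above, this prints 0 for "test_key"
        System.out.println(partition);
    }
}
```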

Consumer Code

1. There are five listeners in total.
2. Listeners 1 and 2 are in the same group and are both pinned to the same partition (0).
3. Listener 3 is in the same group as 1 and 2 but pinned to the other partition (1).
4. Listeners 4 and 5 are in a different group from listeners 1, 2, and 3.

Note that topics and topicPartitions are mutually exclusive on @KafkaListener, so the partition-pinned listeners specify only topicPartitions.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-07 18:16:21
 * @describe: Demo of the listener rules
 */
@Component
public class ConsumerRuleDemo {

    @KafkaListener(groupId = "${spring.kafka.consumer.group-id}",
            topicPartitions = {@TopicPartition(topic = "test_topic", partitions = {"0"})}
    )
    public void listenerDemoV2(ConsumerRecord<String, String> record) {
        System.out.println("receive message from group test_group partition 0 N1:" + record.value());
    }

    @KafkaListener(groupId = "${spring.kafka.consumer.group-id}",
            topicPartitions = {@TopicPartition(topic = "test_topic", partitions = {"0"})}
    )
    public void listenerDemoV3(ConsumerRecord<String, String> record) {
        System.out.println("receive message from group test_group partition 0 N2:" + record.value());
    }

    @KafkaListener(groupId = "${spring.kafka.consumer.group-id}",
            topicPartitions = {@TopicPartition(topic = "test_topic", partitions = {"1"})}
    )
    public void listenerDemoV4(ConsumerRecord<String, String> record) {
        System.out.println("receive message from group test_group partition 1:" + record.value());
    }

    @KafkaListener(topics = {"test_topic"}, groupId = "test_group_bak")
    public void listenerDemoV5(ConsumerRecord<String, String> record) {
        System.out.println("receive message from group test_group_bak N1:" + record.value());
    }

    @KafkaListener(topics = {"test_topic"}, groupId = "test_group_bak")
    public void listenerDemoV6(ConsumerRecord<String, String> record) {
        System.out.println("receive message from group test_group_bak N2:" + record.value());
    }
}
```

Summary of the Results

The console output shows:

1. Listener 3 consumed message 1.
2. Listeners 1 and 2 both consumed messages 2 through 5.
3. Listener 4 consumed message 1.
4. Listener 5 consumed messages 2 through 5.

From this we can conclude:

1. Consumers in the same group that do not pin a partition never consume the same message twice.
2. Consumers in the same group that explicitly pin the same partition can consume the same message simultaneously. This is because @TopicPartition uses manual partition assignment, which bypasses the group's partition balancing.
3. Consumers in different groups always each receive the same message, whether or not partitions are pinned.