1. Project Repository

https://github.com/GuardFTC/Kafka-test.git

2. Single Data Source

Creating a Topic

Generally speaking, automatic topic creation is not recommended, because it tends to litter the cluster with poorly named and meaningless topics. All of the examples that follow therefore use manually created Topics.
Create a Topic named single-kafka-topic with 2 Partitions.
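The Topic can be created through a Kafka management UI or, equivalently, with Kafka's AdminClient. Below is a minimal sketch (not part of the original repo), assuming the broker address used throughout this post and a replication factor of 1:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateTopic {

    public static void main(String[] args) throws Exception {
        //Connect to the broker
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "120.48.107.224:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            //Topic name, 2 partitions, replication factor 1 (an assumption for a single-broker setup)
            NewTopic topic = new NewTopic("single-kafka-topic", 2, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}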

Adding Dependencies

<!--Kafka dependency-->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!--Web dependency-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--Lombok dependency-->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<!--Hutool dependency-->
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.8.6</version>
</dependency>
<!--Unit-test dependency-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

Producer

YML Configuration

#Server configuration
server:
  port: 8081
#Spring configuration
spring:
  kafka:
    bootstrap-servers: 120.48.107.224:9092
    producer:
      acks: -1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3
      properties:
        enable.idempotence: true
        retry.backoff.ms: 100
Adding a Send-Result Callback Class

import cn.hutool.log.StaticLog;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaSendCallback;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-07 16:13:57
 * @describe: Callback handler for Kafka message publishing
 */
@Component
public class KafkaSendCallBackForString implements KafkaSendCallback<String, String> {

    @Override
    public void onSuccess(SendResult<String, String> result) {

        //1.Get the message attributes
        ProducerRecord<String, String> producerRecord = result.getProducerRecord();
        String topic = producerRecord.topic();
        Integer partition = producerRecord.partition();
        String key = producerRecord.key();
        String value = producerRecord.value();

        //2.Log the result
        StaticLog.info(LogTemplateEnum.KAFKA_SEND_SUCCESS_LOG.getTemplate(), topic, partition, key, value);
    }

    @Override
    public void onFailure(KafkaProducerException e) {

        //1.Get the message attributes
        ProducerRecord<String, String> failedProducerRecord = e.getFailedProducerRecord();
        String topic = failedProducerRecord.topic();
        Integer partition = failedProducerRecord.partition();
        String key = failedProducerRecord.key();
        String value = failedProducerRecord.value();

        //2.Log the failure
        StaticLog.error(LogTemplateEnum.KAFKA_SEND_ERROR_LOG.getTemplate(), topic, partition, key, value);

        //3.Print the stack trace
        e.printStackTrace();

        //4.TODO Custom failure handling can go here, e.g. re-sending the message
    }
}

import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-07 16:24:20
 * @describe: Log template enum
 */
@Getter
@AllArgsConstructor
public enum LogTemplateEnum {

    /**
     * Log template for successful Kafka sends
     */
    KAFKA_SEND_SUCCESS_LOG("Kafka producer send success! topic:{} partition:{} key:{} value:{}"),

    /**
     * Log template for failed Kafka sends
     */
    KAFKA_SEND_ERROR_LOG("Kafka producer send error! topic:{} partition:{} key:{} value:{}"),
    ;

    private final String template;
}

Publishing Messages

As the test below shows, a message can be sent with just a Topic, with an explicit Partition, Key, and Timestamp, or wrapped in a ProducerRecord.

import com.ftc.single.config.KafkaSendCallBackForString;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootTest
class ProducerTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private KafkaSendCallBackForString kafkaSendCallBackForString;

    /**
     * Test topic
     */
    private static final String TEST_TOPIC = "single-kafka-topic";

    /**
     * Test key
     */
    private static final String TEST_KEY = "single-kafka-key";

    @Test
    void sendMessage() {

        //1.Send with the topic only
        kafkaTemplate.send(TEST_TOPIC, "topic_message")
                .addCallback(kafkaSendCallBackForString);

        //2.Send with topic and key | this key hashes to partition 0
        kafkaTemplate.send(TEST_TOPIC, TEST_KEY, "topic_key_message")
                .addCallback(kafkaSendCallBackForString);

        //3.Send with topic, partition, and key
        kafkaTemplate.send(TEST_TOPIC, 0, TEST_KEY, "topic_partition_key_message")
                .addCallback(kafkaSendCallBackForString);

        //4.Send with topic, partition, key, and timestamp
        kafkaTemplate.send(TEST_TOPIC, 0, System.currentTimeMillis(), TEST_KEY, "topic_partition_key_timestamp_message")
                .addCallback(kafkaSendCallBackForString);

        //5.Send as a ProducerRecord
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TEST_TOPIC, 0, TEST_KEY, "record_message");
        kafkaTemplate.send(producerRecord)
                .addCallback(kafkaSendCallBackForString);
    }
}
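addCallback handles results asynchronously. When a test needs to block until the broker acknowledges, the returned future can also be awaited; a small sketch that would sit in the same test class (the 5-second timeout is an arbitrary choice):

import org.springframework.kafka.support.SendResult;

import java.util.concurrent.TimeUnit;

@Test
void sendMessageSync() throws Exception {
    //Block until the broker acknowledges instead of using a callback
    SendResult<String, String> result = kafkaTemplate
            .send(TEST_TOPIC, TEST_KEY, "sync_message")
            .get(5, TimeUnit.SECONDS);
    System.out.println("acked offset:" + result.getRecordMetadata().offset());
}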

Consumer

YML Configuration

#Server configuration
server:
  port: 8081
#Spring configuration
spring:
  kafka:
    bootstrap-servers: 120.48.107.224:9092
    consumer:
      group-id: single-kafka-group
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      auto-commit-interval: 5s
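As with the producer, these keys map onto raw consumer client properties. A sketch of the equivalent map (illustrative only; constants from org.apache.kafka.clients.consumer.ConsumerConfig):

import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.HashMap;
import java.util.Map;

public class ConsumerPropsSketch {

    public static Map<String, Object> props() {
        Map<String, Object> props = new HashMap<>(8);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.48.107.224:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "single-kafka-group");
        //Commit offsets automatically every 5 seconds
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //With no committed offset, start from the beginning of the partition
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }
}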

Listening for Messages

As shown below, a listener can specify its Topics, its GroupId, and even the exact Partitions it consumes.

package com.ftc.single.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 10:01:26
 * @describe: Consumer demo
 */
@Component
public class Consumer {

    @KafkaListener(topics = {"single-kafka-topic"})
    public void listenerDemo(ConsumerRecord<String, String> record) {
        System.out.println("receive message:" + record.value());
    }

    @KafkaListener(topics = {"single-kafka-topic"}, groupId = "${spring.kafka.consumer.group-id}")
    public void listenerDemoV2(ConsumerRecord<String, String> record) {
        System.out.println("receive message:" + record.value());
    }

    //topics and topicPartitions are mutually exclusive on @KafkaListener, so the topic is named only inside topicPartitions
    @KafkaListener(groupId = "${spring.kafka.consumer.group-id}",
            topicPartitions = {@TopicPartition(topic = "single-kafka-topic", partitions = {"0"})}
    )
    public void listenerDemoV3(ConsumerRecord<String, String> record) {
        System.out.println("receive message from partition 0:" + record.value());
    }
}
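Beyond taking a ConsumerRecord parameter, spring-kafka can inject individual message attributes via annotations. A sketch of another listener method for the same class (KafkaHeaders.RECEIVED_PARTITION_ID is the standard header name in spring-kafka 2.x):

import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

//Let spring-kafka inject the payload and selected headers directly
@KafkaListener(topics = {"single-kafka-topic"})
public void listenerWithHeaders(@Payload String value,
                                @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
    System.out.println("receive message:" + value + " from partition:" + partition);
}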

Verification

Note

To verify, trim the consumer down to the single listener below; otherwise offset-commit exceptions are reported. The likely cause: listenerDemo and listenerDemoV2 subscribe through group management while listenerDemoV3 claims partition 0 by manual assignment, all under the same single-kafka-group, so the group sees conflicting owners for partition 0 and rejects commits.

package com.ftc.single.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 10:01:26
 * @describe: Consumer demo
 */
@Component
public class Consumer {

    @KafkaListener(topics = {"single-kafka-topic"})
    public void listenerDemo(ConsumerRecord<String, String> record) {
        System.out.println("receive message:" + record.value());
    }
}
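If the manually assigned listener is worth keeping alongside the subscribing ones, giving it a consumer group of its own avoids the clash. A sketch of such a method for the Consumer class above (the group name single-kafka-group-manual is an arbitrary choice):

//A manually assigned listener in its own consumer group
@KafkaListener(groupId = "single-kafka-group-manual",
        topicPartitions = {@TopicPartition(topic = "single-kafka-topic", partitions = {"0"})}
)
public void manualAssignListener(ConsumerRecord<String, String> record) {
    System.out.println("receive message from partition 0:" + record.value());
}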

Message Sending

The producer log confirms that the messages are sent successfully.

Message Receiving

The consumer output confirms that the messages are received successfully.

3. Multiple Data Sources

Creating a Topic

Create a Topic named multi-kafka-topic with 2 Partitions, just as in the single data source example.

Adding Dependencies

<!--Kafka dependency-->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!--Web dependency-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--Lombok dependency-->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<!--Hutool dependency-->
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.8.6</version>
</dependency>
<!--Unit-test dependency-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

Producer

YML Configuration

#Server configuration
server:
  port: 8082
#Kafka configuration (custom prefix, bound manually below rather than by Spring Boot)
kafka:
  #Primary data source
  primary:
    producer:
      bootstrap-servers: 120.48.107.224:9092
      ack: "-1"
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3
      idempotence: true
      transaction-id-prefix: primary-transaction-producer-
  #Secondary data source
  secondary:
    producer:
      bootstrap-servers: 120.48.107.224:9092
      ack: "-1"
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3
      idempotence: true
Configuration Classes

import lombok.Data;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-08 13:39:47
 * @describe: Producer properties
 */
@Data
public class ProducerProperties {

    /**
     * Cluster bootstrap address
     */
    private String bootstrapServers;

    /**
     * ACK setting
     */
    private String ack;

    /**
     * Key serializer class
     */
    private String keySerializer;

    /**
     * Value serializer class
     */
    private String valueSerializer;

    /**
     * Number of retries
     */
    private int retries;

    /**
     * Whether idempotence is enabled
     */
    private boolean idempotence;

    /**
     * Transaction ID prefix; a non-empty value enables Kafka transactions
     */
    private String transactionIdPrefix;
}
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-08 15:28:45
 * @describe: Producer configuration
 */
@Configuration
public class ProducerConfig {

    @Primary
    @Bean(name = "primaryProducerProperties")
    @ConfigurationProperties(prefix = "kafka.primary.producer")
    public ProducerProperties primaryProperties() {
        return new ProducerProperties();
    }

    @Bean(name = "secondaryProducerProperties")
    @ConfigurationProperties(prefix = "kafka.secondary.producer")
    public ProducerProperties secondaryProperties() {
        return new ProducerProperties();
    }
}
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.transaction.KafkaTransactionManager;

import java.util.HashMap;
import java.util.Map;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-08 13:42:00
 * @describe: Producer client configuration
 */
@Configuration
@RequiredArgsConstructor
public class ProducerClientConfig {

    private final com.ftc.multi.config.producer.ProducerConfig producerConfig;

    @Bean("primaryProducerTemplate")
    public KafkaTemplate<String, String> primaryTemplate() {
        return new KafkaTemplate<>(primaryFactory());
    }

    @Bean("primaryTransactionManager")
    public KafkaTransactionManager<String, String> primaryTransactionManager() {
        return new KafkaTransactionManager<>(primaryFactory());
    }

    @Bean("secondaryProducerTemplate")
    public KafkaTemplate<String, String> secondaryTemplate() {
        return new KafkaTemplate<>(secondaryFactory());
    }

    @Bean("primaryFactory")
    public DefaultKafkaProducerFactory<String, String> primaryFactory() {

        //1.Get the properties
        ProducerProperties primaryProperties = producerConfig.primaryProperties();

        //2.Build the config map
        Map<String, Object> props = getProps(primaryProperties);

        //3.Create the factory
        DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props);

        //4.Enable transactions
        producerFactory.setTransactionIdPrefix(primaryProperties.getTransactionIdPrefix());

        //5.Return
        return producerFactory;
    }

    @Bean("secondaryFactory")
    public DefaultKafkaProducerFactory<String, String> secondaryFactory() {

        //1.Get the properties
        ProducerProperties secondaryProperties = producerConfig.secondaryProperties();

        //2.Build the config map
        Map<String, Object> props = getProps(secondaryProperties);

        //3.Create and return the factory
        return new DefaultKafkaProducerFactory<>(props);
    }

    /**
     * Assemble the config map for a data source
     *
     * @param producerProperties producer properties
     * @return assembled config map
     */
    private Map<String, Object> getProps(ProducerProperties producerProperties) {

        //1.Build the config map
        Map<String, Object> props = new HashMap<>(10);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerProperties.getBootstrapServers());
        props.put(ProducerConfig.ACKS_CONFIG, producerProperties.getAck());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, producerProperties.getKeySerializer());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, producerProperties.getValueSerializer());
        props.put(ProducerConfig.RETRIES_CONFIG, producerProperties.getRetries());
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, producerProperties.isIdempotence());

        //2.Return
        return props;
    }
}
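The KafkaTransactionManager bean exists so that ordinary Spring @Transactional methods can drive Kafka transactions against the primary data source. A hypothetical service (not part of the original repo) sketching that usage:

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;

@Service
public class PrimaryProducerService {

    @Resource
    @Qualifier("primaryProducerTemplate")
    private KafkaTemplate<String, String> primaryTemplate;

    @Transactional(transactionManager = "primaryTransactionManager")
    public void sendInTransaction(String value) {
        //Both sends commit or abort together
        primaryTemplate.send("multi-kafka-topic", 0, "primary", value);
        primaryTemplate.send("multi-kafka-topic", 1, "primary", value + "-copy");
    }
}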

Adding a Send-Result Callback Class

import cn.hutool.log.StaticLog;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaSendCallback;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-07 16:13:57
 * @describe: Callback handler for Kafka message publishing
 */
@Component
public class KafkaSendCallBackForString implements KafkaSendCallback<String, String> {

    @Override
    public void onSuccess(SendResult<String, String> result) {

        //1.Get the message attributes
        ProducerRecord<String, String> producerRecord = result.getProducerRecord();
        String topic = producerRecord.topic();
        Integer partition = producerRecord.partition();
        String key = producerRecord.key();
        String value = producerRecord.value();

        //2.Log the result
        StaticLog.info(LogTemplateEnum.KAFKA_SEND_SUCCESS_LOG.getTemplate(), topic, partition, key, value);
    }

    @Override
    public void onFailure(KafkaProducerException e) {

        //1.Get the message attributes
        ProducerRecord<String, String> failedProducerRecord = e.getFailedProducerRecord();
        String topic = failedProducerRecord.topic();
        Integer partition = failedProducerRecord.partition();
        String key = failedProducerRecord.key();
        String value = failedProducerRecord.value();

        //2.Log the failure
        StaticLog.error(LogTemplateEnum.KAFKA_SEND_ERROR_LOG.getTemplate(), topic, partition, key, value);

        //3.Print the stack trace
        e.printStackTrace();

        //4.TODO Custom failure handling can go here, e.g. re-sending the message
    }
}

import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-07 16:24:20
 * @describe: Log template enum
 */
@Getter
@AllArgsConstructor
public enum LogTemplateEnum {

    /**
     * Log template for successful Kafka sends
     */
    KAFKA_SEND_SUCCESS_LOG("Kafka producer send success! topic:{} partition:{} key:{} value:{}"),

    /**
     * Log template for failed Kafka sends
     */
    KAFKA_SEND_ERROR_LOG("Kafka producer send error! topic:{} partition:{} key:{} value:{}"),
    ;

    private final String template;
}

Message-Sending Demo

import com.ftc.multi.config.KafkaSendCallBackForString;
import lombok.SneakyThrows;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

@SpringBootTest
class ProducerTests {

    @Resource
    @Qualifier("primaryProducerTemplate")
    private KafkaTemplate<String, String> primaryTemplate;

    @Resource
    @Qualifier("secondaryProducerTemplate")
    private KafkaTemplate<String, String> secondaryTemplate;

    @Autowired
    private KafkaSendCallBackForString kafkaSendCallBackForString;

    @AfterEach
    @SneakyThrows({InterruptedException.class})
    void stopSeconds() {
        //Give the async callbacks time to fire before the test context shuts down
        TimeUnit.MILLISECONDS.sleep(500);
    }

    @Test
    void send() {

        //1.Send through the primary data source
        primaryTemplate.send(
                "multi-kafka-topic",
                0,
                "primary",
                "primary_message"
        ).addCallback(kafkaSendCallBackForString);

        //2.Send through the secondary data source
        secondaryTemplate.send(
                "multi-kafka-topic",
                1,
                "secondary",
                "secondary_message"
        ).addCallback(kafkaSendCallBackForString);
    }
}
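One caveat: because primaryFactory sets a transaction-id-prefix, the primary template is transactional, and recent spring-kafka versions reject a plain send() outside a transaction unless setAllowNonTransactional(true) is configured on the template. A sketch of an explicitly transactional send from the same test class:

@Test
void sendInTransaction() {
    //All sends inside the callback commit atomically, or none are visible to read_committed consumers
    primaryTemplate.executeInTransaction(operations -> {
        operations.send("multi-kafka-topic", 0, "primary", "tx_message_1");
        operations.send("multi-kafka-topic", 0, "primary", "tx_message_2");
        return true;
    });
}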

Consumer

YML Configuration

#Server configuration
server:
  port: 8082
#Kafka configuration
kafka:
  #Primary data source
  primary:
    consumer:
      bootstrap-servers: 120.48.107.224:9092
      group-id: multi-kafka-group-primary
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      poll-time: 3000
  #Secondary data source
  secondary:
    consumer:
      bootstrap-servers: 120.48.107.224:9092
      group-id: multi-kafka-group-secondary
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      poll-time: 3000

Configuration Classes

import lombok.Data;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-08 19:09:37
 * @describe: Consumer properties
 */
@Data
public class ConsumerProperties {

    /**
     * Cluster bootstrap address
     */
    private String bootstrapServers;

    /**
     * Consumer group ID
     */
    private String groupId;

    /**
     * Whether offsets are committed automatically
     */
    private boolean enableAutoCommit;

    /**
     * Key deserializer class
     */
    private String keyDeserializer;

    /**
     * Value deserializer class
     */
    private String valueDeserializer;

    /**
     * Offset reset policy
     */
    private String autoOffsetReset;

    /**
     * Poll timeout in ms
     */
    private int pollTime;
}

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-08 19:14:05
 * @describe: Consumer configuration
 */
@Configuration
public class ConsumerConfig {

    @Bean("primaryConsumerProperties")
    @ConfigurationProperties("kafka.primary.consumer")
    public ConsumerProperties primaryProperties() {
        return new ConsumerProperties();
    }

    @Bean("secondaryConsumerProperties")
    @ConfigurationProperties("kafka.secondary.consumer")
    public ConsumerProperties secondaryProperties() {
        return new ConsumerProperties();
    }
}
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-08 16:00:24
 * @describe: Consumer client configuration
 */
@EnableKafka
@Configuration
@RequiredArgsConstructor
public class ConsumerClientConfig {

    private final com.ftc.multi.config.consumer.ConsumerConfig consumerConfig;

    @Bean("primaryConsumerFactory")
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> primaryConsumerFactory() {

        //1.Get the properties
        ConsumerProperties consumerProperties = consumerConfig.primaryProperties();

        //2.Build the config map
        Map<String, Object> props = getProps(consumerProperties);

        //3.Create the consumer factory
        DefaultKafkaConsumerFactory<Object, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(props);

        //4.Create the listener container factory
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setPollTimeout(consumerProperties.getPollTime());

        //5.Return
        return factory;
    }

    @Bean("secondaryConsumerFactory")
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> secondaryConsumerFactory() {

        //1.Get the properties
        ConsumerProperties secondaryProperties = consumerConfig.secondaryProperties();

        //2.Build the config map
        Map<String, Object> props = getProps(secondaryProperties);

        //3.Create the consumer factory
        DefaultKafkaConsumerFactory<Object, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(props);

        //4.Create the listener container factory
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setPollTimeout(secondaryProperties.getPollTime());

        //5.Return
        return factory;
    }

    /**
     * Assemble the config map for a data source
     *
     * @param consumerProperties consumer properties
     * @return assembled config map
     */
    private Map<String, Object> getProps(ConsumerProperties consumerProperties) {

        //1.Build the config map
        Map<String, Object> props = new HashMap<>(10);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerProperties.getBootstrapServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerProperties.getGroupId());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerProperties.isEnableAutoCommit());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, consumerProperties.getKeyDeserializer());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, consumerProperties.getValueDeserializer());
        //Apply the offset reset policy declared in the YAML (earliest)
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerProperties.getAutoOffsetReset());

        //2.Return
        return props;
    }
}

Message-Listening Demo

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-08 16:06:48
 * @describe: Multi data source listeners
 */
@Component
public class Consumer {

    //topics and topicPartitions are mutually exclusive, so the topic is named only inside topicPartitions
    @KafkaListener(
            containerFactory = "primaryConsumerFactory",
            topicPartitions = {@TopicPartition(topic = "multi-kafka-topic", partitions = {"0"})}
    )
    public void primaryListener(ConsumerRecord<String, String> record) {
        System.out.println("primary data source received:" + record.value());
    }

    @KafkaListener(
            containerFactory = "secondaryConsumerFactory",
            topicPartitions = {@TopicPartition(topic = "multi-kafka-topic", partitions = {"1"})}
    )
    public void secondaryListener(ConsumerRecord<String, String> record) {
        System.out.println("secondary data source received:" + record.value());
    }
}

Verification

Message Sending

The producer log confirms that both messages are sent successfully.

Message Receiving

Each listener receives the message from its own data source successfully.