1.项目地址
https://github.com/GuardFTC/Kafka-test.git
2.单数据源
创建Topic
通常来说,不建议自动创建主题,因为这样可能会出现很多命名不规范,且完全无意义的主题。因此,接下来的所有例子,我们需要手动创建Topic
如图,创建一个名为single-kafka-topic的Topic,该Topic包含2个Partition。
添加依赖
<!-- Spring for Apache Kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

<!-- Spring Boot web starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- Lombok (optional dependency) -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>

<!-- Hutool utility library -->
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.8.6</version>
</dependency>

<!-- Unit-test starter (test scope only) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
Producer
yml配置
# Server configuration
server:
  port: 8081

# Spring configuration
spring:
  kafka:
    bootstrap-servers: 120.48.107.224:9092
    producer:
      acks: -1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3
      # Extra native producer properties
      properties:
        enable.idempotence: true
        retry.backoff.ms: 100
添加发送结果处理类
import cn.hutool.log.StaticLog;import org.apache.kafka.clients.producer.ProducerRecord;import org.springframework.kafka.core.KafkaProducerException;import org.springframework.kafka.core.KafkaSendCallback;import org.springframework.kafka.support.SendResult;import org.springframework.stereotype.Component;/*** @author: 冯铁城 [17615007230@163.com]* @date: 2022-09-07 16:13:57* @describe: Kafka发布消息回调处理逻辑实现类*/@Componentpublic class KafkaSendCallBackForString implements KafkaSendCallback<String, String> {@Overridepublic void onSuccess(SendResult<String, String> result) {//1.获取消息属性ProducerRecord<String, String> producerRecord = result.getProducerRecord();String topic = producerRecord.topic();Integer partition = producerRecord.partition();String key = producerRecord.key();String value = producerRecord.value();//2.打印日志StaticLog.info(LogTemplateEnum.KAFKA_SEND_SUCCESS_LOG.getTemplate(), topic, partition, key, value);}@Overridepublic void onFailure(KafkaProducerException e) {//1.获取消息属性ProducerRecord<String, String> failedProducerRecord = e.getFailedProducerRecord();String topic = failedProducerRecord.topic();Integer partition = failedProducerRecord.partition();String key = failedProducerRecord.key();String value = failedProducerRecord.value();//2.打印日志StaticLog.error(LogTemplateEnum.KAFKA_SEND_ERROR_LOG.getTemplate(), topic, partition, key, value);//3.异常堆栈信息输出e.printStackTrace();//4.TODO 可进行自定义的异常逻辑,比如重新发送消息等操作}}
import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-07 16:24:20
 * @describe: Log message templates; each constant carries one template string with {} placeholders
 */
@Getter
@AllArgsConstructor
public enum LogTemplateEnum {

    /**
     * Template for a successful Kafka send (topic, partition, key, value)
     */
    KAFKA_SEND_SUCCESS_LOG("Kafka producer send success! topic:{} partition:{} key:{} value:{}"),

    /**
     * Template for a failed Kafka send (topic, partition, key, value)
     */
    KAFKA_SEND_ERROR_LOG("Kafka producer send error! topic:{} partition:{} key:{} value:{}"),
    ;

    /**
     * The template text of this constant
     */
    private final String template;
}
发布消息
如下,可以看到,发送消息时,可以指定Topic,Partition,Key,Timestamp,或通过ProducerRecord形式发送消息
import com.ftc.single.config.KafkaSendCallBackForString;import org.apache.kafka.clients.producer.ProducerRecord;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.kafka.core.KafkaTemplate;@SpringBootTestclass ProducerTests {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@Autowiredprivate KafkaSendCallBackForString kafkaSendCallBackForString;/*** 测试主题*/private static final String TEST_TOPIC = "single-kafka-topic";/*** 测试Key*/private static final String TEST_KEY = "single-kafka-key";@Testvoid sendMessage() {//1.发送消息,指定主题kafkaTemplate.send(TEST_TOPIC, "topic_message").addCallback(kafkaSendCallBackForString);//2.发送消息,指定主题、Key | 取模之后分区编号=0kafkaTemplate.send(TEST_TOPIC, TEST_KEY, "topic_key_message").addCallback(kafkaSendCallBackForString);//3.发送消息,指定主题、分区、KeykafkaTemplate.send(TEST_TOPIC, 0, TEST_KEY, "topic_partition_key_message").addCallback(kafkaSendCallBackForString);//4.发送消息,指定主题、分区、Key、时间戳kafkaTemplate.send(TEST_TOPIC, 0, System.currentTimeMillis(), TEST_KEY, "topic_partition_key_timestamp_message").addCallback(kafkaSendCallBackForString);//5.发送消息,通过ProducerRecord形式ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TEST_TOPIC, 0, TEST_KEY, "record_message");kafkaTemplate.send(producerRecord).addCallback(kafkaSendCallBackForString);}}
Consumer
yml配置
# Server configuration
server:
  port: 8081

# Spring configuration
spring:
  kafka:
    bootstrap-servers: 120.48.107.224:9092
    consumer:
      group-id: single-kafka-group
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      auto-commit-interval: 5s
监听消息
如下,在监听消息时,可以指定Topic,GroupId以及Partition
package com.ftc.single.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 10:01:26
 * @describe: Consumer demo showing listeners bound by topic, by group id and by explicit partition
 */
@Component
public class Consumer {

    /**
     * Listens on the topic; no group id is given on the annotation.
     *
     * @param record the received record
     */
    @KafkaListener(topics = {"single-kafka-topic"})
    public void listenerDemo(ConsumerRecord<String, String> record) {
        System.out.println("receive message:" + record.value());
    }

    /**
     * Listens on the topic with the group id taken from {@code spring.kafka.consumer.group-id}.
     *
     * @param record the received record
     */
    @KafkaListener(topics = {"single-kafka-topic"}, groupId = "${spring.kafka.consumer.group-id}")
    public void listenerDemoV2(ConsumerRecord<String, String> record) {
        System.out.println("receive message:" + record.value());
    }

    /**
     * Listens on partition 0 of the topic only.
     * <p>
     * {@code topics} and {@code topicPartitions} are mutually exclusive on {@code @KafkaListener},
     * so only {@code topicPartitions} is set here.
     * <p>
     * NOTE(review): this listener uses the same group id as the listeners above while assigning
     * its partition explicitly; that combination is presumably what triggers the offset-commit
     * error mentioned in the article - confirm before running all three together.
     *
     * @param record the received record
     */
    @KafkaListener(groupId = "${spring.kafka.consumer.group-id}",
            topicPartitions = {@TopicPartition(topic = "single-kafka-topic", partitions = {"0"})})
    public void listenerDemoV3(ConsumerRecord<String, String> record) {
        System.out.println("receive message from partition 0:" + record.value());
    }
}
验证
注意
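验证时,需要将消费者代码修改成如下状态(只保留一个监听器),不然会提示偏移量提交异常。原因是上面的示例中,多个监听器使用了同一个GroupId,其中既有通过订阅Topic由消费者组自动分配分区的监听器,也有通过topicPartitions手动指定分区的监听器,两种方式在同一个消费者组内混用时,提交偏移量会发生冲突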
package com.ftc.single.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 10:01:26
 * @describe: Consumer demo reduced to a single listener for verification
 */
@Component
public class Consumer {

    /**
     * Prints the value of every record received from {@code single-kafka-topic}.
     *
     * @param record the received record
     */
    @KafkaListener(topics = {"single-kafka-topic"})
    public void listenerDemo(ConsumerRecord<String, String> record) {
        final String payload = record.value();
        System.out.println("receive message:" + payload);
    }
}
消息发送
消息接收
3.多数据源
创建Topic
如图,创建一个名为multi-kafka-topic的Topic,该Topic包含2个Partition。
添加依赖
<!-- Spring for Apache Kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

<!-- Spring Boot web starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- Lombok (optional dependency) -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>

<!-- Hutool utility library -->
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.8.6</version>
</dependency>

<!-- Unit-test starter (test scope only) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
Producer
yml配置
# Server configuration
server:
  port: 8082

# Custom multi-data-source Kafka configuration
# (bound by the kafka.primary.producer / kafka.secondary.producer prefixes)
kafka:
  # Primary data source
  primary:
    producer:
      bootstrap-servers: 120.48.107.224:9092
      ack: "-1"
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3
      idempotence: true
      transaction-id-prefix: primary-transaction-producer-
  # Secondary data source
  secondary:
    producer:
      bootstrap-servers: 120.48.107.224:9092
      ack: "-1"
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3
      idempotence: true
配置类
import lombok.Data;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-08 13:39:47
 * @describe: Producer configuration properties for one Kafka data source
 */
@Data
public class ProducerProperties {

    /**
     * Broker cluster address(es)
     */
    private String bootstrapServers;

    /**
     * ACK setting
     */
    private String ack;

    /**
     * Key serializer class name
     */
    private String keySerializer;

    /**
     * Value serializer class name
     */
    private String valueSerializer;

    /**
     * Number of retries
     */
    private int retries;

    /**
     * Whether idempotence is enabled
     */
    private boolean idempotence;

    /**
     * Transaction id prefix; a non-empty value means Kafka transactions are enabled
     */
    private String transactionIdPrefix;
}
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-08 15:28:45
 * @describe: Declares one ProducerProperties bean per Kafka data source
 */
@Configuration
public class ProducerConfig {

    /**
     * Producer properties of the primary data source, bound from {@code kafka.primary.producer}.
     * <p>
     * Marked {@code @Primary} so that an unqualified injection of {@link ProducerProperties}
     * resolves to the primary data source, matching the primary/secondary naming.
     *
     * @return a new properties holder to be populated by property binding
     */
    @Primary
    @Bean(name = "primaryProducerProperties")
    @ConfigurationProperties(prefix = "kafka.primary.producer")
    public ProducerProperties primaryProperties() {
        return new ProducerProperties();
    }

    /**
     * Producer properties of the secondary data source, bound from {@code kafka.secondary.producer}.
     *
     * @return a new properties holder to be populated by property binding
     */
    @Bean(name = "secondaryProducerProperties")
    @ConfigurationProperties(prefix = "kafka.secondary.producer")
    public ProducerProperties secondaryProperties() {
        return new ProducerProperties();
    }
}
import lombok.RequiredArgsConstructor;import org.apache.kafka.clients.producer.ProducerConfig;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.core.DefaultKafkaProducerFactory;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.transaction.KafkaTransactionManager;import java.util.HashMap;import java.util.Map;/*** @author: 冯铁城 [17615007230@163.com]* @date: 2022-09-08 13:42:00* @describe: Producer客户端配置*/@Configuration@RequiredArgsConstructorpublic class ProducerClientConfig {private final com.ftc.multi.config.producer.ProducerConfig producerConfig;@Bean("primaryProducerTemplate")public KafkaTemplate<String, String> primaryTemplate() {return new KafkaTemplate<>(primaryFactory());}@Bean("primaryTransactionManager")public KafkaTransactionManager<String, String> primaryTransactionManager() {return new KafkaTransactionManager<>(primaryFactory());}@Bean("secondaryProducerTemplate")public KafkaTemplate<String, String> secondaryTemplate() {return new KafkaTemplate<>(secondaryFactory());}@Bean("primaryFactory")public DefaultKafkaProducerFactory<String, String> primaryFactory() {//1.获取配置ProducerProperties primaryProperties = producerConfig.primaryProperties();//2.获取配置Map<String, Object> props = getProps(primaryProperties);//3.创建工厂DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props);//4.开启事务producerFactory.setTransactionIdPrefix(primaryProperties.getTransactionIdPrefix());//5.返回return producerFactory;}@Bean("secondaryFactory")public DefaultKafkaProducerFactory<String, String> secondaryFactory() {//1.获取配置ProducerProperties secondaryProperties = producerConfig.secondaryProperties();//2.获取配置Map<String, Object> props = getProps(secondaryProperties);//3.创建工厂,返回return new DefaultKafkaProducerFactory<>(props);}/*** 封装不同的配置** @param producerProperties Producer属性* @return 封装后的配置*/private Map<String, Object> 
getProps(ProducerProperties producerProperties) {//1.生成配置Map<String, Object> props = new HashMap<>(10);props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerProperties.getBootstrapServers());props.put(ProducerConfig.ACKS_CONFIG, producerProperties.getAck());props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, producerProperties.getKeySerializer());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, producerProperties.getValueSerializer());props.put(ProducerConfig.RETRIES_CONFIG, producerProperties.getRetries());props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, producerProperties.isIdempotence());//2.返回return props;}}
添加发送结果处理类
import cn.hutool.log.StaticLog;import org.apache.kafka.clients.producer.ProducerRecord;import org.springframework.kafka.core.KafkaProducerException;import org.springframework.kafka.core.KafkaSendCallback;import org.springframework.kafka.support.SendResult;import org.springframework.stereotype.Component;/*** @author: 冯铁城 [17615007230@163.com]* @date: 2022-09-07 16:13:57* @describe: Kafka发布消息回调处理逻辑实现类*/@Componentpublic class KafkaSendCallBackForString implements KafkaSendCallback<String, String> {@Overridepublic void onSuccess(SendResult<String, String> result) {//1.获取消息属性ProducerRecord<String, String> producerRecord = result.getProducerRecord();String topic = producerRecord.topic();Integer partition = producerRecord.partition();String key = producerRecord.key();String value = producerRecord.value();//2.打印日志StaticLog.info(LogTemplateEnum.KAFKA_SEND_SUCCESS_LOG.getTemplate(), topic, partition, key, value);}@Overridepublic void onFailure(KafkaProducerException e) {//1.获取消息属性ProducerRecord<String, String> failedProducerRecord = e.getFailedProducerRecord();String topic = failedProducerRecord.topic();Integer partition = failedProducerRecord.partition();String key = failedProducerRecord.key();String value = failedProducerRecord.value();//2.打印日志StaticLog.error(LogTemplateEnum.KAFKA_SEND_ERROR_LOG.getTemplate(), topic, partition, key, value);//3.异常堆栈信息输出e.printStackTrace();//4.TODO 可进行自定义的异常逻辑,比如重新发送消息等操作}}
import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-07 16:24:20
 * @describe: Log message templates; each constant carries one template string with {} placeholders
 */
@Getter
@AllArgsConstructor
public enum LogTemplateEnum {

    /**
     * Template for a successful Kafka send (topic, partition, key, value)
     */
    KAFKA_SEND_SUCCESS_LOG("Kafka producer send success! topic:{} partition:{} key:{} value:{}"),

    /**
     * Template for a failed Kafka send (topic, partition, key, value)
     */
    KAFKA_SEND_ERROR_LOG("Kafka producer send error! topic:{} partition:{} key:{} value:{}"),
    ;

    /**
     * The template text of this constant
     */
    private final String template;
}
发送消息demo
import com.ftc.multi.config.KafkaSendCallBackForString;import lombok.SneakyThrows;import org.junit.jupiter.api.AfterEach;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.kafka.core.KafkaTemplate;import javax.annotation.Resource;import java.util.concurrent.TimeUnit;@SpringBootTestclass ProducerTests {@Resource@Qualifier("primaryProducerTemplate")private KafkaTemplate<String, String> primaryTemplate;@Resource@Qualifier("secondaryProducerTemplate")private KafkaTemplate<String, String> secondaryTemplate;@Autowiredprivate KafkaSendCallBackForString kafkaSendCallBackForString;@AfterEach@SneakyThrows({InterruptedException.class})void stopSeconds() {TimeUnit.MILLISECONDS.sleep(500);}@Testvoid send() {//1.主数据源发送消息primaryTemplate.send("multi-kafka-topic",0,"primary","primary_message").addCallback(kafkaSendCallBackForString);//2.备数据源发送消息secondaryTemplate.send("multi-kafka-topic",1,"secondary","secondary_message").addCallback(kafkaSendCallBackForString);}}
Consumer
yml配置
# Server configuration
server:
  port: 8082

# Kafka configuration
kafka:
  # Primary data source
  primary:
    consumer:
      bootstrap-servers: 120.48.107.224:9092
      group-id: multi-kafka-group-primary
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      poll-time: 3000
  # Secondary data source
  secondary:
    consumer:
      bootstrap-servers: 120.48.107.224:9092
      group-id: multi-kafka-group-secondary
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      poll-time: 3000
配置类
import lombok.Data;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-08 19:09:37
 * @describe: Consumer configuration properties for one Kafka data source
 */
@Data
public class ConsumerProperties {

    /**
     * Broker cluster address(es)
     */
    private String bootstrapServers;

    /**
     * Consumer group id
     */
    private String groupId;

    /**
     * Whether offsets are committed automatically
     */
    private boolean enableAutoCommit;

    /**
     * Key deserializer class name
     */
    private String keyDeserializer;

    /**
     * Value deserializer class name
     */
    private String valueDeserializer;

    /**
     * Offset reset policy
     */
    private String autoOffsetReset;

    /**
     * Poll timeout in milliseconds
     */
    private int pollTime;
}
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-08 19:14:05
 * @describe: Declares one ConsumerProperties bean per Kafka data source
 */
@Configuration
public class ConsumerConfig {

    /**
     * Consumer properties of the primary data source, bound from {@code kafka.primary.consumer}.
     *
     * @return a new properties holder to be populated by property binding
     */
    @Bean(name = "primaryConsumerProperties")
    @ConfigurationProperties(prefix = "kafka.primary.consumer")
    public ConsumerProperties primaryProperties() {
        ConsumerProperties holder = new ConsumerProperties();
        return holder;
    }

    /**
     * Consumer properties of the secondary data source, bound from {@code kafka.secondary.consumer}.
     *
     * @return a new properties holder to be populated by property binding
     */
    @Bean(name = "secondaryConsumerProperties")
    @ConfigurationProperties(prefix = "kafka.secondary.consumer")
    public ConsumerProperties secondaryProperties() {
        ConsumerProperties holder = new ConsumerProperties();
        return holder;
    }
}
import lombok.RequiredArgsConstructor;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.annotation.EnableKafka;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.config.KafkaListenerContainerFactory;import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import java.util.HashMap;import java.util.Map;/*** @author: 冯铁城 [17615007230@163.com]* @date: 2022-09-08 16:00:24* @describe: Consumer客户端配置*/@EnableKafka@Configuration@RequiredArgsConstructorpublic class ConsumerClientConfig {private final com.ftc.multi.config.consumer.ConsumerConfig consumerConfig;@Bean("primaryConsumerFactory")KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> primaryConsumerFactory() {//1.获取属性ConsumerProperties consumerProperties = consumerConfig.primaryProperties();//2.获取配置Map<String, Object> props = getProps(consumerProperties);//3.创建工厂DefaultKafkaConsumerFactory<Object, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(props);//4.创建最终工厂ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.getContainerProperties().setPollTimeout(consumerProperties.getPollTime());//5.返回return factory;}@Bean("secondaryConsumerFactory")KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> secondaryConsumerFactory() {//1.获取属性ConsumerProperties secondaryProperties = consumerConfig.secondaryProperties();//2.获取配置Map<String, Object> props = getProps(secondaryProperties);//3.创建工厂DefaultKafkaConsumerFactory<Object, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(props);//4.创建最终工厂ConcurrentKafkaListenerContainerFactory<Integer, String> 
factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.getContainerProperties().setPollTimeout(secondaryProperties.getPollTime());//5.返回return factory;}/*** 封装不同的配置** @param consumerProperties Consumer属性* @return 封装后的配置*/private Map<String, Object> getProps(ConsumerProperties consumerProperties) {//.1生成配置Map<String, Object> props = new HashMap<>(10);props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerProperties.getBootstrapServers());props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerProperties.getGroupId());props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerProperties.isEnableAutoCommit());props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, consumerProperties.getKeyDeserializer());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, consumerProperties.getValueDeserializer());//2.返回return props;}}
监听消息demo
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

/**
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-08 16:06:48
 * @describe: Listeners for the multi-data-source demo, one per container factory
 */
@Component
public class Consumer {

    /**
     * Consumes partition 0 of {@code multi-kafka-topic} through the primary container factory.
     * <p>
     * {@code topics} and {@code topicPartitions} are mutually exclusive on {@code @KafkaListener},
     * so only {@code topicPartitions} is set.
     *
     * @param record the received record
     */
    @KafkaListener(containerFactory = "primaryConsumerFactory",
            topicPartitions = {@TopicPartition(topic = "multi-kafka-topic", partitions = {"0"})})
    public void primaryListener(ConsumerRecord<String, String> record) {
        System.out.println("主数据源_监听获取数据:" + record.value());
    }

    /**
     * Consumes partition 1 of {@code multi-kafka-topic} through the secondary container factory.
     * <p>
     * {@code topics} and {@code topicPartitions} are mutually exclusive on {@code @KafkaListener},
     * so only {@code topicPartitions} is set.
     *
     * @param record the received record
     */
    @KafkaListener(containerFactory = "secondaryConsumerFactory",
            topicPartitions = {@TopicPartition(topic = "multi-kafka-topic", partitions = {"1"})})
    public void secondaryListener(ConsumerRecord<String, String> record) {
        System.out.println("备数据源_监听获取数据:" + record.value());
    }
}
验证
消息发送
消息接收
如图,消息接收成功
