1.项目地址
https://github.com/GuardFTC/Kafka-test.git
2.单数据源
创建Topic
通常来说,不建议自动创建主题,因为这样可能会出现很多命名不规范、且完全无意义的主题。因此,在接下来的所有例子中,我们都需要手动创建Topic。
如图,创建一个名为single-kafka-topic的Topic,该Topic包含2个Partition。
添加依赖
<!--Kafka依赖-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!--web依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--lombok依赖-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!--Hutool依赖-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.6</version>
</dependency>
<!--单元测试依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
Producer
yml配置
# Server configuration
server:
  port: 8081
# Spring configuration
spring:
  kafka:
    bootstrap-servers: 120.48.107.224:9092
    producer:
      # -1 (all): wait for the full in-sync replica set to acknowledge
      acks: -1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3
      # Native producer properties passed through as-is
      properties:
        enable.idempotence: true
        retry.backoff.ms: 100
添加发送结果处理类
import cn.hutool.log.StaticLog;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaSendCallback;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
/**
* @author: 冯铁城 [17615007230@163.com]
* @date: 2022-09-07 16:13:57
* @describe: Kafka发布消息回调处理逻辑实现类
*/
@Component
public class KafkaSendCallBackForString implements KafkaSendCallback<String, String> {
@Override
public void onSuccess(SendResult<String, String> result) {
//1.获取消息属性
ProducerRecord<String, String> producerRecord = result.getProducerRecord();
String topic = producerRecord.topic();
Integer partition = producerRecord.partition();
String key = producerRecord.key();
String value = producerRecord.value();
//2.打印日志
StaticLog.info(LogTemplateEnum.KAFKA_SEND_SUCCESS_LOG.getTemplate(), topic, partition, key, value);
}
@Override
public void onFailure(KafkaProducerException e) {
//1.获取消息属性
ProducerRecord<String, String> failedProducerRecord = e.getFailedProducerRecord();
String topic = failedProducerRecord.topic();
Integer partition = failedProducerRecord.partition();
String key = failedProducerRecord.key();
String value = failedProducerRecord.value();
//2.打印日志
StaticLog.error(LogTemplateEnum.KAFKA_SEND_ERROR_LOG.getTemplate(), topic, partition, key, value);
//3.异常堆栈信息输出
e.printStackTrace();
//4.TODO 可进行自定义的异常逻辑,比如重新发送消息等操作
}
}
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
 * Log message templates used when reporting Kafka send results.
 * Each template has four {} placeholders: topic, partition, key and value.
 *
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-07 16:24:20
 */
@Getter
@AllArgsConstructor
public enum LogTemplateEnum {
    /**
     * Template for a successfully sent Kafka message
     */
    KAFKA_SEND_SUCCESS_LOG("Kafka producer send success! topic:{} partition:{} key:{} value:{}"),
    /**
     * Template for a Kafka message that failed to send
     */
    KAFKA_SEND_ERROR_LOG("Kafka producer send error! topic:{} partition:{} key:{} value:{}"),
    ;
    // Raw template text; callers read it through getTemplate()
    private final String template;
}
发布消息
如下,可以看到,发送消息时,可以指定Topic,Partition,Key,Timestamp,或通过ProducerRecord形式发送消息
import com.ftc.single.config.KafkaSendCallBackForString;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
/**
 * Demonstrates the different {@code KafkaTemplate.send} overloads against the single data source.
 */
@SpringBootTest
class ProducerTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private KafkaSendCallBackForString kafkaSendCallBackForString;

    /**
     * Topic used by the test
     */
    private static final String TEST_TOPIC = "single-kafka-topic";

    /**
     * Key used by the test
     */
    private static final String TEST_KEY = "single-kafka-key";

    /**
     * Sends one message per supported addressing style and waits for all of them to complete.
     */
    @Test
    void sendMessage() {
        //1. Send with topic only
        kafkaTemplate.send(TEST_TOPIC, "topic_message")
                .addCallback(kafkaSendCallBackForString);

        //2. Send with topic and key (per the original note, this key maps to partition 0)
        kafkaTemplate.send(TEST_TOPIC, TEST_KEY, "topic_key_message")
                .addCallback(kafkaSendCallBackForString);

        //3. Send with topic, partition and key
        kafkaTemplate.send(TEST_TOPIC, 0, TEST_KEY, "topic_partition_key_message")
                .addCallback(kafkaSendCallBackForString);

        //4. Send with topic, partition, timestamp and key
        kafkaTemplate.send(TEST_TOPIC, 0, System.currentTimeMillis(), TEST_KEY, "topic_partition_key_timestamp_message")
                .addCallback(kafkaSendCallBackForString);

        //5. Send a pre-built ProducerRecord
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TEST_TOPIC, 0, TEST_KEY, "record_message");
        kafkaTemplate.send(producerRecord)
                .addCallback(kafkaSendCallBackForString);

        //6. The sends above are asynchronous; block until they have completed so the
        //   callbacks have run before the test method returns
        kafkaTemplate.flush();
    }
}
Consumer
yml配置
# Server configuration
server:
  port: 8081
# Spring configuration
spring:
  kafka:
    bootstrap-servers: 120.48.107.224:9092
    consumer:
      group-id: single-kafka-group
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      auto-commit-interval: 5s
监听消息
如下,在监听消息时,可以指定Topic,GroupId以及Partition
package com.ftc.single.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;
/**
 * Consumer demo showing three ways to declare a listener: by topic, by topic with an explicit
 * group id, and by explicitly assigned partition.
 *
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 10:01:26
 */
@Component
public class Consumer {

    /**
     * Listens by topic only; the annotation itself sets no group id.
     *
     * @param record the received record
     */
    @KafkaListener(topics = {"single-kafka-topic"})
    public void listenerDemo(ConsumerRecord<String, String> record) {
        System.out.println("receive message:" + record.value());
    }

    /**
     * Listens by topic with the group id taken from {@code spring.kafka.consumer.group-id}.
     *
     * @param record the received record
     */
    @KafkaListener(topics = {"single-kafka-topic"}, groupId = "${spring.kafka.consumer.group-id}")
    public void listenerDemoV2(ConsumerRecord<String, String> record) {
        System.out.println("receive message:" + record.value());
    }

    /**
     * Listens on partition 0 only.
     * {@code topics} and {@code topicPartitions} are mutually exclusive on {@code @KafkaListener},
     * so the topic is named solely inside {@code @TopicPartition}.
     *
     * @param record the received record
     */
    @KafkaListener(groupId = "${spring.kafka.consumer.group-id}",
            topicPartitions = {@TopicPartition(topic = "single-kafka-topic", partitions = {"0"})}
    )
    public void listenerDemoV3(ConsumerRecord<String, String> record) {
        System.out.println("receive message from partition 0:" + record.value());
    }
}
验证
注意
验证时,需要将消费者代码修改成如下状态(只保留一个监听器),不然会提示偏移量提交异常。原因是上面的多个监听器使用同一个消费者组,同时包含订阅Topic和手动指定Partition两种消费方式,二者在同一个消费者组内同时提交偏移量会发生冲突。
package com.ftc.single.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
 * Minimal consumer used for the verification run: a single listener on the demo topic.
 *
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-13 10:01:26
 */
@Component
public class Consumer {

    /**
     * Topic the demo listener subscribes to
     */
    private static final String TOPIC = "single-kafka-topic";

    /**
     * Prints the value of every record received from the demo topic.
     *
     * @param record the received record
     */
    @KafkaListener(topics = TOPIC)
    public void listenerDemo(ConsumerRecord<String, String> record) {
        String payload = record.value();
        System.out.println("receive message:" + payload);
    }
}
消息发送
消息接收
3.多数据源
创建Topic
如图,创建一个名为multi-kafka-topic的Topic,该Topic包含2个Partition。
添加依赖
<!--Kafka依赖-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!--web依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--lombok依赖-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!--Hutool依赖-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.6</version>
</dependency>
<!--单元测试依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
Producer
yml配置
# Server configuration
server:
  port: 8082
# Kafka configuration (custom root, bound by the project's own properties classes)
kafka:
  # Primary data source
  primary:
    producer:
      bootstrap-servers: 120.48.107.224:9092
      ack: "-1"
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3
      idempotence: true
      transaction-id-prefix: primary-transaction-producer-
  # Secondary data source
  secondary:
    producer:
      bootstrap-servers: 120.48.107.224:9092
      ack: "-1"
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3
      idempotence: true
配置类
import lombok.Data;
/**
 * Producer settings for one Kafka data source.
 *
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-08 13:39:47
 */
@Data
public class ProducerProperties {
    /**
     * Address(es) of the Kafka cluster
     */
    private String bootstrapServers;
    /**
     * Producer acks setting
     */
    private String ack;
    /**
     * Class name of the key serializer
     */
    private String keySerializer;
    /**
     * Class name of the value serializer
     */
    private String valueSerializer;
    /**
     * Number of send retries
     */
    private int retries;
    /**
     * Whether producer idempotence is enabled
     */
    private boolean idempotence;
    /**
     * Transaction id prefix; a non-empty value means Kafka transactions are enabled
     */
    private String transactionIdPrefix;
}
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
/**
 * Declares one {@link ProducerProperties} bean per data source, each bound from its own
 * configuration prefix.
 *
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-08 15:28:45
 */
@Configuration
public class ProducerConfig {
    /**
     * Producer properties of the primary data source, bound from {@code kafka.primary.producer}.
     *
     * @return the properties holder to be populated by configuration binding
     */
    @Bean(name = "primaryProducerProperties")
    @ConfigurationProperties(prefix = "kafka.primary.producer")
    public ProducerProperties primaryProperties() {
        return new ProducerProperties();
    }
    /**
     * Producer properties of the secondary data source, bound from {@code kafka.secondary.producer}.
     * NOTE(review): {@code @Primary} is placed on the secondary bean, so injection by type
     * resolves to the secondary settings — confirm this is intended.
     *
     * @return the properties holder to be populated by configuration binding
     */
    @Primary
    @Bean(name = "secondaryProducerProperties")
    @ConfigurationProperties(prefix = "kafka.secondary.producer")
    public ProducerProperties secondaryProperties() {
        return new ProducerProperties();
    }
}
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import java.util.HashMap;
import java.util.Map;
/**
 * Builds the producer factories, templates and transaction manager for both data sources.
 *
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-08 13:42:00
 */
@Configuration
@RequiredArgsConstructor
public class ProducerClientConfig {

    private final com.ftc.multi.config.producer.ProducerConfig producerConfig;

    /**
     * @return template bound to the primary producer factory
     */
    @Bean("primaryProducerTemplate")
    public KafkaTemplate<String, String> primaryTemplate() {
        DefaultKafkaProducerFactory<String, String> factory = primaryFactory();
        return new KafkaTemplate<>(factory);
    }

    /**
     * @return transaction manager bound to the primary producer factory
     */
    @Bean("primaryTransactionManager")
    public KafkaTransactionManager<String, String> primaryTransactionManager() {
        DefaultKafkaProducerFactory<String, String> factory = primaryFactory();
        return new KafkaTransactionManager<>(factory);
    }

    /**
     * @return template bound to the secondary producer factory
     */
    @Bean("secondaryProducerTemplate")
    public KafkaTemplate<String, String> secondaryTemplate() {
        DefaultKafkaProducerFactory<String, String> factory = secondaryFactory();
        return new KafkaTemplate<>(factory);
    }

    /**
     * Producer factory of the primary data source, with the configured transaction id prefix applied.
     *
     * @return the primary producer factory
     */
    @Bean("primaryFactory")
    public DefaultKafkaProducerFactory<String, String> primaryFactory() {
        ProducerProperties settings = producerConfig.primaryProperties();
        Map<String, Object> clientProps = toClientProps(settings);
        DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(clientProps);
        // Setting the prefix is what turns on transactions for this factory
        factory.setTransactionIdPrefix(settings.getTransactionIdPrefix());
        return factory;
    }

    /**
     * Producer factory of the secondary data source; no transaction id prefix is applied.
     *
     * @return the secondary producer factory
     */
    @Bean("secondaryFactory")
    public DefaultKafkaProducerFactory<String, String> secondaryFactory() {
        ProducerProperties settings = producerConfig.secondaryProperties();
        return new DefaultKafkaProducerFactory<>(toClientProps(settings));
    }

    /**
     * Translates the project's producer settings into Kafka client configuration entries.
     *
     * @param producerProperties producer settings of one data source
     * @return map keyed by Kafka producer configuration names
     */
    private Map<String, Object> toClientProps(ProducerProperties producerProperties) {
        Map<String, Object> clientProps = new HashMap<>(10);
        clientProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerProperties.getBootstrapServers());
        clientProps.put(ProducerConfig.ACKS_CONFIG, producerProperties.getAck());
        clientProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, producerProperties.getKeySerializer());
        clientProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, producerProperties.getValueSerializer());
        clientProps.put(ProducerConfig.RETRIES_CONFIG, producerProperties.getRetries());
        clientProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, producerProperties.isIdempotence());
        return clientProps;
    }
}
添加发送结果处理类
import cn.hutool.log.StaticLog;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaSendCallback;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
/**
* @author: 冯铁城 [17615007230@163.com]
* @date: 2022-09-07 16:13:57
* @describe: Kafka发布消息回调处理逻辑实现类
*/
@Component
public class KafkaSendCallBackForString implements KafkaSendCallback<String, String> {
@Override
public void onSuccess(SendResult<String, String> result) {
//1.获取消息属性
ProducerRecord<String, String> producerRecord = result.getProducerRecord();
String topic = producerRecord.topic();
Integer partition = producerRecord.partition();
String key = producerRecord.key();
String value = producerRecord.value();
//2.打印日志
StaticLog.info(LogTemplateEnum.KAFKA_SEND_SUCCESS_LOG.getTemplate(), topic, partition, key, value);
}
@Override
public void onFailure(KafkaProducerException e) {
//1.获取消息属性
ProducerRecord<String, String> failedProducerRecord = e.getFailedProducerRecord();
String topic = failedProducerRecord.topic();
Integer partition = failedProducerRecord.partition();
String key = failedProducerRecord.key();
String value = failedProducerRecord.value();
//2.打印日志
StaticLog.error(LogTemplateEnum.KAFKA_SEND_ERROR_LOG.getTemplate(), topic, partition, key, value);
//3.异常堆栈信息输出
e.printStackTrace();
//4.TODO 可进行自定义的异常逻辑,比如重新发送消息等操作
}
}
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
 * Log message templates used when reporting Kafka send results.
 * Each template has four {} placeholders: topic, partition, key and value.
 *
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-07 16:24:20
 */
@Getter
@AllArgsConstructor
public enum LogTemplateEnum {
    /**
     * Template for a successfully sent Kafka message
     */
    KAFKA_SEND_SUCCESS_LOG("Kafka producer send success! topic:{} partition:{} key:{} value:{}"),
    /**
     * Template for a Kafka message that failed to send
     */
    KAFKA_SEND_ERROR_LOG("Kafka producer send error! topic:{} partition:{} key:{} value:{}"),
    ;
    // Raw template text; callers read it through getTemplate()
    private final String template;
}
发送消息demo
import com.ftc.multi.config.KafkaSendCallBackForString;
import lombok.SneakyThrows;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
/**
 * Sends one message through each of the two configured data sources.
 */
@SpringBootTest
class ProducerTests {

    @Resource
    @Qualifier("primaryProducerTemplate")
    private KafkaTemplate<String, String> primaryTemplate;

    @Resource
    @Qualifier("secondaryProducerTemplate")
    private KafkaTemplate<String, String> secondaryTemplate;

    @Autowired
    private KafkaSendCallBackForString kafkaSendCallBackForString;

    /**
     * Pauses briefly after each test so the asynchronous send callbacks have time to run.
     */
    @AfterEach
    @SneakyThrows({InterruptedException.class})
    void stopSeconds() {
        TimeUnit.MILLISECONDS.sleep(500);
    }

    /**
     * Sends to partition 0 through the primary template and to partition 1 through the secondary one.
     */
    @Test
    void send() {
        //1. Send through the primary data source. Its producer factory has a transaction id
        //   prefix, so the template is transactional and rejects a send() issued outside a
        //   transaction; run the send inside a local transaction instead.
        primaryTemplate.executeInTransaction(operations -> operations.send(
                "multi-kafka-topic",
                0,
                "primary",
                "primary_message"
        )).addCallback(kafkaSendCallBackForString);

        //2. Send through the secondary (non-transactional) data source
        secondaryTemplate.send(
                "multi-kafka-topic",
                1,
                "secondary",
                "secondary_message"
        ).addCallback(kafkaSendCallBackForString);
    }
}
Consumer
yml配置
# Server configuration
server:
  port: 8082
# Kafka configuration (custom root, bound by the project's own properties classes)
kafka:
  # Primary data source
  primary:
    consumer:
      bootstrap-servers: 120.48.107.224:9092
      group-id: multi-kafka-group-primary
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      # Poll timeout in milliseconds
      poll-time: 3000
  # Secondary data source
  secondary:
    consumer:
      bootstrap-servers: 120.48.107.224:9092
      group-id: multi-kafka-group-secondary
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      # Poll timeout in milliseconds
      poll-time: 3000
配置类
import lombok.Data;
/**
 * Consumer settings for one Kafka data source.
 *
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-08 19:09:37
 */
@Data
public class ConsumerProperties {
    /**
     * Address(es) of the Kafka cluster
     */
    private String bootstrapServers;
    /**
     * Consumer group id
     */
    private String groupId;
    /**
     * Whether offsets are committed automatically
     */
    private boolean enableAutoCommit;
    /**
     * Class name of the key deserializer
     */
    private String keyDeserializer;
    /**
     * Class name of the value deserializer
     */
    private String valueDeserializer;
    /**
     * Offset reset policy
     */
    private String autoOffsetReset;
    /**
     * Poll timeout in milliseconds
     */
    private int pollTime;
}
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares one {@link ConsumerProperties} bean per data source, each bound from its own
 * configuration prefix.
 *
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-08 19:14:05
 */
@Configuration
public class ConsumerConfig {

    /**
     * Consumer properties of the primary data source, bound from {@code kafka.primary.consumer}.
     *
     * @return the properties holder to be populated by configuration binding
     */
    @Bean(name = "primaryConsumerProperties")
    @ConfigurationProperties(prefix = "kafka.primary.consumer")
    public ConsumerProperties primaryProperties() {
        return new ConsumerProperties();
    }

    /**
     * Consumer properties of the secondary data source, bound from {@code kafka.secondary.consumer}.
     *
     * @return the properties holder to be populated by configuration binding
     */
    @Bean(name = "secondaryConsumerProperties")
    @ConfigurationProperties(prefix = "kafka.secondary.consumer")
    public ConsumerProperties secondaryProperties() {
        return new ConsumerProperties();
    }
}
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
/**
 * Builds one listener container factory per data source for use by {@code @KafkaListener}.
 *
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-08 16:00:24
 */
@EnableKafka
@Configuration
@RequiredArgsConstructor
public class ConsumerClientConfig {

    private final com.ftc.multi.config.consumer.ConsumerConfig consumerConfig;

    /**
     * @return listener container factory configured from the primary consumer properties
     */
    @Bean("primaryConsumerFactory")
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> primaryConsumerFactory() {
        return buildContainerFactory(consumerConfig.primaryProperties());
    }

    /**
     * @return listener container factory configured from the secondary consumer properties
     */
    @Bean("secondaryConsumerFactory")
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> secondaryConsumerFactory() {
        return buildContainerFactory(consumerConfig.secondaryProperties());
    }

    /**
     * Creates a listener container factory from the given consumer settings.
     * NOTE(review): the key type parameter is Integer although the configured key deserializer
     * and the listeners use String; it is kept to leave the bean signatures unchanged.
     *
     * @param consumerProperties consumer settings of one data source
     * @return the configured listener container factory
     */
    private KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> buildContainerFactory(
            ConsumerProperties consumerProperties) {
        //1. Build the Kafka client configuration
        Map<String, Object> props = getProps(consumerProperties);

        //2. Create the consumer factory
        DefaultKafkaConsumerFactory<Object, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(props);

        //3. Create the listener container factory and apply the poll timeout
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setPollTimeout(consumerProperties.getPollTime());

        //4. Return
        return factory;
    }

    /**
     * Translates the project's consumer settings into Kafka client configuration entries.
     *
     * @param consumerProperties consumer settings of one data source
     * @return map keyed by Kafka consumer configuration names
     */
    private Map<String, Object> getProps(ConsumerProperties consumerProperties) {
        //1. Build the configuration
        Map<String, Object> props = new HashMap<>(10);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerProperties.getBootstrapServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerProperties.getGroupId());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerProperties.isEnableAutoCommit());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, consumerProperties.getKeyDeserializer());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, consumerProperties.getValueDeserializer());

        // The configured auto-offset-reset was previously dropped; apply it only when present
        // so an unset value still falls back to the Kafka client default.
        if (consumerProperties.getAutoOffsetReset() != null) {
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerProperties.getAutoOffsetReset());
        }

        //2. Return
        return props;
    }
}
监听消息demo
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;
/**
 * Listeners for the multi data source demo: one per container factory, each assigned a single
 * partition of the demo topic.
 *
 * @author: 冯铁城 [17615007230@163.com]
 * @date: 2022-09-08 16:06:48
 */
@Component
public class Consumer {

    /**
     * Consumes partition 0 through the primary container factory.
     * {@code topics} and {@code topicPartitions} are mutually exclusive on {@code @KafkaListener},
     * so the topic is named solely inside {@code @TopicPartition}.
     *
     * @param record the received record
     */
    @KafkaListener(
            containerFactory = "primaryConsumerFactory",
            topicPartitions = {@TopicPartition(topic = "multi-kafka-topic", partitions = {"0"})}
    )
    public void primaryListener(ConsumerRecord<String, String> record) {
        System.out.println("主数据源_监听获取数据:" + record.value());
    }

    /**
     * Consumes partition 1 through the secondary container factory.
     * {@code topics} and {@code topicPartitions} are mutually exclusive on {@code @KafkaListener},
     * so the topic is named solely inside {@code @TopicPartition}.
     *
     * @param record the received record
     */
    @KafkaListener(
            containerFactory = "secondaryConsumerFactory",
            topicPartitions = {@TopicPartition(topic = "multi-kafka-topic", partitions = {"1"})}
    )
    public void secondaryListener(ConsumerRecord<String, String> record) {
        System.out.println("备数据源_监听获取数据:" + record.value());
    }
}
验证
消息发送
消息接收
如图,消息接收成功