1.前言
有的时候,可能在部分环境下,没有条件去搭建一个Kafka。或者说,时间不允许去现搭建一个Kafka,我们需要在最短的时间内测试出Kafka的相关配置对功能的影响。
Spring官方提供了一款内嵌式的Kafka,无需搭建外部环境,只需要引入相应依赖即可
不过需要注意的是,内嵌式Kafka仅仅适用于单元测试环境。也就是说,我们可以基于内嵌式Kafka快速的进行Kafka相关的单元测试
接下来开始学习如何使用内嵌式Kafka
2.引入依赖
<!--Kafka测试依赖-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
3.编写Config配置
注意以下几点
- 该Config类,只能存在于test文件夹下。EmbeddedKafkaBroker只能出现在单元测试环境中
- 可以通过producerProps变更Producer配置
- 可以通过consumerProps变更Consumer配置 ```java import lombok.RequiredArgsConstructor; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.utils.KafkaTestUtils;
import java.util.Map;
/**
- @author: 冯铁城 [17615007230@163.com]
- @date: 2022-09-19 09:54:31
@describe: 嵌入式Kafka配置 */ @Configuration @RequiredArgsConstructor public class EmbeddedKafkaConfig {
private final EmbeddedKafkaBroker broker;
@Bean(“embeddedProducer”) public KafkaTemplate
embeddedProducer() { //1.获取Producer配置
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
//2.返回Producer实例
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerProps));
}
@Bean(“embeddedConsumerFactory”) KafkaListenerContainerFactory
> embeddedConsumerFactory() { //1.定义基础属性
String group = "embeddedGroup";
String autoCommit = "true";
//2.获取Consumer配置
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(group, autoCommit, broker);
//3.创建工厂
DefaultKafkaConsumerFactory<Object, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);
//4.创建Consumer工厂
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
//5.返回
return factory;
4.编写单元测试
注意以下几点
- @EmbeddedKafka用来声明当前使用嵌入式Kafka
- 可以通过@EmbeddedKafka来变更Broker的配置
- Producer与Consumer的代码与常规使用无任何差异
- 使用@AfterEach,确保Consumer线程执行完成后,主线程再结束 ```java import cn.hutool.log.StaticLog; import lombok.SneakyThrows; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.util.concurrent.ListenableFutureCallback;
import javax.annotation.Resource; import java.util.concurrent.TimeUnit;
@SpringBootTest @EmbeddedKafka( topics = {“embedded-topic”}, brokerProperties = { “log.dirs=D:\Code\Projects\Java\kafka-test\spring-kafka-test\logs”, “log.flush.interval.messages=1000”, “log.flush.interval.ms=1000”, } ) class ProducerAndConsumerTests {
@Resource
@Qualifier("embeddedProducer")
KafkaTemplate<String, String> embeddedProducer;
@AfterEach
@SneakyThrows(InterruptedException.class)
void sleep() {
TimeUnit.SECONDS.sleep(5);
}
@Test
void kafka() {
//1.发送消息
embeddedProducer
.send("embedded-topic", "test-embedded-message")
.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
String message = result.getProducerRecord().value();
StaticLog.info("send kafka success message:" + message);
}
@Override
public void onFailure(Throwable e) {
e.printStackTrace();
}
});
}
@KafkaListener(topics = "embedded-topic", containerFactory = "embeddedConsumerFactory")
public void listen(ConsumerRecord<String, String> record) {
StaticLog.info("receive kafka message:" + record.value());
}
5.验证
运行单元测试,如下图,消息的发送与接收都成功了
由此可以分析出,在不方便搭建Kafka的时候,可以通过SpringKafkaTest快速在单元测试环境,进行Kafka相关功能的模拟