1.前言

有的时候,可能在部分环境下,没有条件去搭建一个Kafka。或者说,时间不允许去现搭建一个Kafka,我们需要在最短的时间内测试出Kafka的相关配置对功能的影响。
Spring官方提供了一款内嵌式的Kafka,无需搭建外部环境,只需要引入相应依赖即可
不过需要注意的是,内嵌式Kafka仅仅适用于单元测试环境。也就是说,我们可以基于内嵌式Kafka快速的进行Kafka相关的单元测试
接下来开始学习如何使用内嵌式Kafka

2.引入依赖

  1. <!--Kafka测试依赖-->
  2. <dependency>
  3. <groupId>org.springframework.kafka</groupId>
  4. <artifactId>spring-kafka-test</artifactId>
  5. <scope>test</scope>
  6. </dependency>

3.编写Config配置

注意以下几点

  1. 该Config类,只能存在于test文件夹下。EmbeddedKafkaBroker只能出现在单元测试环境中
  2. 可以通过producerProps变更Producer配置
  3. 可以通过consumerProps变更Consumer配置 ```java import lombok.RequiredArgsConstructor; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.utils.KafkaTestUtils;

import java.util.Map;

/**

  • @author: 冯铁城 [17615007230@163.com]
  • @date: 2022-09-19 09:54:31
  • @describe: 嵌入式Kafka配置 */ @Configuration @RequiredArgsConstructor public class EmbeddedKafkaConfig {

    private final EmbeddedKafkaBroker broker;

    @Bean(“embeddedProducer”) public KafkaTemplate embeddedProducer() {

    1. //1.获取Producer配置
    2. Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
    3. //2.返回Producer实例
    4. return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerProps));

    }

    @Bean(“embeddedConsumerFactory”) KafkaListenerContainerFactory> embeddedConsumerFactory() {

    1. //1.定义基础属性
    2. String group = "embeddedGroup";
    3. String autoCommit = "true";
    4. //2.获取Consumer配置
    5. Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(group, autoCommit, broker);
    6. //3.创建工厂
    7. DefaultKafkaConsumerFactory<Object, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);
    8. //4.创建Consumer工厂
    9. ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    10. factory.setConsumerFactory(consumerFactory);
    11. //5.返回
    12. return factory;

    } } ```

    4.编写单元测试

    注意以下几点

  1. @EmbeddedKafka用来声明当前使用嵌入式Kafka
  2. 可以通过@EmbeddedKafka来变更Broker的配置
  3. Producer与Consumer的代码与常规使用无任何差异
  4. 使用@AfterEach,确保Consumer线程执行完成后,主线程再结束 ```java import cn.hutool.log.StaticLog; import lombok.SneakyThrows; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.util.concurrent.ListenableFutureCallback;

import javax.annotation.Resource; import java.util.concurrent.TimeUnit;

@SpringBootTest @EmbeddedKafka( topics = {“embedded-topic”}, brokerProperties = { “log.dirs=D:\Code\Projects\Java\kafka-test\spring-kafka-test\logs”, “log.flush.interval.messages=1000”, “log.flush.interval.ms=1000”, } ) class ProducerAndConsumerTests {

  1. @Resource
  2. @Qualifier("embeddedProducer")
  3. KafkaTemplate<String, String> embeddedProducer;
  4. @AfterEach
  5. @SneakyThrows(InterruptedException.class)
  6. void sleep() {
  7. TimeUnit.SECONDS.sleep(5);
  8. }
  9. @Test
  10. void kafka() {
  11. //1.发送消息
  12. embeddedProducer
  13. .send("embedded-topic", "test-embedded-message")
  14. .addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
  15. @Override
  16. public void onSuccess(SendResult<String, String> result) {
  17. String message = result.getProducerRecord().value();
  18. StaticLog.info("send kafka success message:" + message);
  19. }
  20. @Override
  21. public void onFailure(Throwable e) {
  22. e.printStackTrace();
  23. }
  24. });
  25. }
  26. @KafkaListener(topics = "embedded-topic", containerFactory = "embeddedConsumerFactory")
  27. public void listen(ConsumerRecord<String, String> record) {
  28. StaticLog.info("receive kafka message:" + record.value());
  29. }

} ```

5.验证

运行单元测试,如下图,消息的发送与接收都成功了
image.png
由此可以分析出,在不方便搭建Kafka的时候,可以通过SpringKafkaTest快速在单元测试环境,进行Kafka相关功能的模拟