原文: https://howtodoinjava.com/kafka/spring-boot-with-kafka/

学习创建一个 SpringBoot 应用,该应用能够连接给定的 Apache Kafka Borker 实例。 另外,从 Kafka 主题中学习产生和消费消息。

我们将遵循的步骤:

  • 创建具有 Kafka 依赖项的 Spring Boot 应用
  • application.yaml中配置 kafka Borker 实例
  • 使用KafkaTemplate将消息发送到主题
  • 使用@KafkaListener实时收听发送到主题的消息

1. 先决条件

  • 请按照本指南在您的机器上设置 Kafka
  • 我们正在创建一个基于 Maven 的 Spring 引导应用,因此您的计算机应至少安装 Java8Maven

2. Spring Boot 应用

打开 spring 初始化器并创建具有以下依赖项的 spring boot 应用:

  • Spring Apache Kafka
  • Spring Web

Kafka 的 Spring Boot – HelloWorld 示例 - 图1

创建 Spring boot kafka 应用

生成的项目在pom.xml中具有以下依赖项。

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-web</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.kafka</groupId>
  8. <artifactId>spring-kafka</artifactId>
  9. </dependency>
  10. <dependency>
  11. <groupId>org.springframework.boot</groupId>
  12. <artifactId>spring-boot-starter-test</artifactId>
  13. <scope>test</scope>
  14. <exclusions>
  15. <exclusion>
  16. <groupId>org.junit.vintage</groupId>
  17. <artifactId>junit-vintage-engine</artifactId>
  18. </exclusion>
  19. </exclusions>
  20. </dependency>
  21. <dependency>
  22. <groupId>org.springframework.kafka</groupId>
  23. <artifactId>spring-kafka-test</artifactId>
  24. <scope>test</scope>
  25. </dependency>
  26. </dependencies>

在首选的 IDE 中导入项目。

3. 配置 Kafka Borker

application.yaml文件中,添加 kafka Borker 地址以及与消费者和生产者相关的配置。

  1. server:
  2. port: 9000
  3. spring:
  4. kafka:
  5. consumer:
  6. bootstrap-servers: localhost:9092
  7. group-id: group-id
  8. auto-offset-reset: earliest
  9. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  10. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  11. producer:
  12. bootstrap-servers: localhost:9092
  13. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  14. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

4. KafKaProducerServiceKafKaConsumerService

KafKaProducerService类使用自动连接的KafkaTemplate将消息发送到已配置的主题名称。 同样,KafKaConsumerService类使用@KafkaListener来接收来自已配置主题名称的消息。

  1. import org.slf4j.Logger;
  2. import org.slf4j.LoggerFactory;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.kafka.core.KafkaTemplate;
  5. import org.springframework.stereotype.Service;
  6. import com.howtodoinjava.kafka.demo.common.AppConstants;
  7. @Service
  8. public class KafKaProducerService
  9. {
  10. private static final Logger logger =
  11. LoggerFactory.getLogger(KafKaProducerService.class);
  12. @Autowired
  13. private KafkaTemplate<String, String> kafkaTemplate;
  14. public void sendMessage(String message)
  15. {
  16. logger.info(String.format("Message sent -> %s", message));
  17. this.kafkaTemplate.send(AppConstants.TOPIC_NAME, message);
  18. }
  19. }
  1. import org.slf4j.Logger;
  2. import org.slf4j.LoggerFactory;
  3. import org.springframework.kafka.annotation.KafkaListener;
  4. import org.springframework.stereotype.Service;
  5. import com.howtodoinjava.kafka.demo.common.AppConstants;
  6. @Service
  7. public class KafKaConsumerService
  8. {
  9. private final Logger logger =
  10. LoggerFactory.getLogger(KafKaConsumerService.class);
  11. @KafkaListener(topics = AppConstants.TOPIC_NAME,
  12. groupId = AppConstants.GROUP_ID)
  13. public void consume(String message)
  14. {
  15. logger.info(String.format("Message recieved -> %s", message));
  16. }
  17. }
  1. public class AppConstants
  2. {
  3. public static final String TOPIC_NAME = "test";
  4. public static final String GROUP_ID = "group_id";
  5. }

5. 控制器

控制器负责使用 REST API 从用户获取消息,并将消息移交给生产者服务以将其发布到 kafka 主题。

  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.web.bind.annotation.PostMapping;
  3. import org.springframework.web.bind.annotation.RequestMapping;
  4. import org.springframework.web.bind.annotation.RequestParam;
  5. import org.springframework.web.bind.annotation.RestController;
  6. import com.howtodoinjava.kafka.demo.service.KafKaProducerService;
  7. @RestController
  8. @RequestMapping(value = "/kafka")
  9. public class KafkaProducerController
  10. {
  11. private final KafKaProducerService producerService;
  12. @Autowired
  13. public KafkaProducerController(KafKaProducerService producerService)
  14. {
  15. this.producerService = producerService;
  16. }
  17. @PostMapping(value = "/publish")
  18. public void sendMessageToKafkaTopic(@RequestParam("message") String message)
  19. {
  20. this.producerService.sendMessage(message);
  21. }
  22. }

6. 测试

使用任何 REST API 测试器并将少量消息发布到查询参数"message"中的 API http://localhost:9000/kafka/publish

留言栏:http://localhost:9000/kafka/publish?message=Alphabet

观察控制台日志:

  1. 2020-05-24 23:36:47.132 INFO 2092 --- [nio-9000-exec-4]
  2. c.h.k.demo.service.KafKaProducerService : Message sent -> Alphabet
  3. 2020-05-24 23:36:47.138 INFO 2092 --- [ntainer#0-0-C-1]
  4. c.h.k.demo.service.KafKaConsumerService : Message recieved -> Alphabet

如果您已经在命令提示符下打开了 Kafka 控制台用户,那么您也会看到该消息。

Kafka 的 Spring Boot – HelloWorld 示例 - 图2

Kafka 控制台消费者

7. 总结

spring boot kafka 教程中,我们学习了如何创建 spring boot 应用和配置 Kafka 服务器。 另外,我们通过使用KafkaTemplate发布一些消息,然后使用@KafkaListener使用这些消息来验证该应用。

学习愉快!

源码下载