学习创建一个 SpringBoot 应用,该应用能够连接给定的 Apache Kafka Borker 实例。 另外,从 Kafka 主题中学习产生和消费消息。
我们将遵循的步骤:
- 创建具有 Kafka 依赖项的 Spring Boot 应用
- 在
application.yaml
中配置 kafka Borker 实例 - 使用
KafkaTemplate
将消息发送到主题 - 使用
@KafkaListener
实时收听发送到主题的消息
1. 先决条件
2. Spring Boot 应用
打开 spring 初始化器并创建具有以下依赖项的 spring boot 应用:
- Spring Apache Kafka
- Spring Web
创建 Spring boot kafka 应用
生成的项目在pom.xml
中具有以下依赖项。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
在首选的 IDE 中导入项目。
3. 配置 Kafka Borker
在application.yaml
文件中,添加 kafka Borker 地址以及与消费者和生产者相关的配置。
server:
port: 9000
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: group-id
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
bootstrap-servers: localhost:9092
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
4. KafKaProducerService
和KafKaConsumerService
KafKaProducerService
类使用自动连接的KafkaTemplate
将消息发送到已配置的主题名称。 同样,KafKaConsumerService
类使用@KafkaListener
来接收来自已配置主题名称的消息。
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import com.howtodoinjava.kafka.demo.common.AppConstants;
@Service
public class KafKaProducerService
{
private static final Logger logger =
LoggerFactory.getLogger(KafKaProducerService.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message)
{
logger.info(String.format("Message sent -> %s", message));
this.kafkaTemplate.send(AppConstants.TOPIC_NAME, message);
}
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import com.howtodoinjava.kafka.demo.common.AppConstants;
@Service
public class KafKaConsumerService
{
private final Logger logger =
LoggerFactory.getLogger(KafKaConsumerService.class);
@KafkaListener(topics = AppConstants.TOPIC_NAME,
groupId = AppConstants.GROUP_ID)
public void consume(String message)
{
logger.info(String.format("Message recieved -> %s", message));
}
}
public class AppConstants
{
public static final String TOPIC_NAME = "test";
public static final String GROUP_ID = "group_id";
}
5. 控制器
控制器负责使用 REST API 从用户获取消息,并将消息移交给生产者服务以将其发布到 kafka 主题。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.howtodoinjava.kafka.demo.service.KafKaProducerService;
@RestController
@RequestMapping(value = "/kafka")
public class KafkaProducerController
{
private final KafKaProducerService producerService;
@Autowired
public KafkaProducerController(KafKaProducerService producerService)
{
this.producerService = producerService;
}
@PostMapping(value = "/publish")
public void sendMessageToKafkaTopic(@RequestParam("message") String message)
{
this.producerService.sendMessage(message);
}
}
6. 测试
使用任何 REST API 测试器并将少量消息发布到查询参数"message"
中的 API http://localhost:9000/kafka/publish
。
留言栏:http://localhost:9000/kafka/publish?message=Alphabet
观察控制台日志:
2020-05-24 23:36:47.132 INFO 2092 --- [nio-9000-exec-4]
c.h.k.demo.service.KafKaProducerService : Message sent -> Alphabet
2020-05-24 23:36:47.138 INFO 2092 --- [ntainer#0-0-C-1]
c.h.k.demo.service.KafKaConsumerService : Message recieved -> Alphabet
如果您已经在命令提示符下打开了 Kafka 控制台用户,那么您也会看到该消息。
Kafka 控制台消费者
7. 总结
在 spring boot kafka 教程中,我们学习了如何创建 spring boot 应用和配置 Kafka 服务器。 另外,我们通过使用KafkaTemplate
发布一些消息,然后使用@KafkaListener
使用这些消息来验证该应用。
学习愉快!