添加依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.kafka</groupId>
  4. <artifactId>spring-kafka</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.kafka</groupId>
  8. <artifactId>spring-kafka-test</artifactId>
  9. <scope>test</scope>
  10. </dependency>
  11. </dependencies>

编写配置文件

  1. spring:
  2. kafka:
  3. bootstrap-servers: 47.96.188.26:9092
  4. producer:
  5. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  6. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  7. consumer:
  8. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  9. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  10. group-id: myGroupId

SpringBoot 生产者 & 消费者

  1. @RestController
  2. public class TestController {
  3. @Autowired
  4. KafkaTemplate<String, String> kafkaTemplate;
  5. // 生产者接口
  6. @GetMapping("/msg")
  7. public String send(String message) {
  8. String topic = "myFirstTopic";
  9. kafkaTemplate.send(topic, message);
  10. return "ok";
  11. }
  12. // 消费者程序
  13. // 指定要监听的 topic
  14. @KafkaListener(topics = "myFirstTopic")
  15. public void consumeTopic(String value) { // 参数: 收到的 value(自动赋值)
  16. System.out.println("拉取消息" + value);
  17. }
  18. }

在浏览器中给 /msg 接口发送数据
http://localhost:8080/msg?message=hello,world!

image.png