添加依赖
<!-- Maven dependencies for the Kafka demo. No <version> is declared on either
     artifact; presumably versions are managed by the Spring Boot parent/BOM -
     TODO confirm against the project's pom.xml. -->
<dependencies>
<!-- Spring for Apache Kafka: provides KafkaTemplate and @KafkaListener. -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Kafka test support; test scope only, so it is not on the runtime classpath. -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
编写配置文件
# Kafka client settings read from the spring.kafka.* namespace.
# Indentation is significant in YAML: every key below must be nested as shown,
# otherwise the properties are not bound under spring.kafka.
spring:
  kafka:
    # Broker address as host:port; replace with the address of your own broker.
    bootstrap-servers: 47.96.188.26:9092
    producer:
      # Keys and values are sent as plain strings.
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # Keys and values are read back as plain strings.
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Consumer group used by listeners that do not declare their own group id.
      group-id: myGroupId
Spring Boot 生产者 & 消费者
/**
 * Demo controller that both publishes to and consumes from a single Kafka topic.
 *
 * <p>{@code GET /msg} forwards the {@code message} value to the topic through the
 * injected {@link KafkaTemplate}; the {@link KafkaListener} method in this class
 * prints the value of every record it receives from the same topic.
 */
@RestController
public class TestController {

    /** Topic shared by the producer endpoint and the listener, kept in one place so they cannot drift apart. */
    private static final String TOPIC = "myFirstTopic";

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Producer endpoint: publishes {@code message} as the record value (no key) to {@link #TOPIC}.
     *
     * <p>The result of the send call is not inspected, so {@code "ok"} means the record was
     * handed to the template, not that delivery was confirmed.
     *
     * @param message text to publish, e.g. supplied as {@code /msg?message=hello}
     * @return {@code "ok"} once the record has been handed to the template, or
     *         {@code "message is required"} when no message was supplied
     */
    @GetMapping("/msg")
    public String send(String message) {
        // Without this guard a missing parameter would be published as a record with a
        // null value (a tombstone), which the plain String listener below is not set up
        // to receive.
        if (message == null) {
            return "message is required";
        }
        kafkaTemplate.send(TOPIC, message);
        return "ok";
    }

    /**
     * Consumer: invoked for each record received from {@link #TOPIC}.
     *
     * @param value the value of the received record
     */
    @KafkaListener(topics = TOPIC)
    public void consumeTopic(String value) {
        System.out.println("拉取消息" + value);
    }
}
在浏览器中给 /msg 接口发送数据
http://localhost:8080/msg?message=hello,world!