SpringBoot与Kafka整合起来十分简单,使用也相当容易
Kafka启动
启动zk
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
启动kafka
bin\windows\kafka-server-start.bat config\server.properties
SpringBoot项目搭建
引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
生产者使用
- application配置文件 ```xml spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
2. 代码
```xml
@RestController
public class Producer {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@GetMapping("producer")
public String testProducer(String msg){
ListenableFuture<SendResult<String, String>> test = kafkaTemplate.send("test", msg);
return "ok";
}
}
消费者
- application配置文件 ```xml spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.group-id=cg1
2. 代码
```xml
@Component
public class Consumer {
@KafkaListener(topics = "test")
public void testConsumer(String msg){
System.err.println(msg);
}
}