Integrating Spring Boot with Kafka is simple, and using the integration is just as easy.

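Starting Kafka

Run both commands from the Kafka installation directory (Windows scripts shown).

  1. Start ZooKeeper

    1. bin\windows\zookeeper-server-start.bat config\zookeeper.properties
  2. Start Kafka

    1. bin\windows\kafka-server-start.bat config\server.properties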
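
Before moving on, it can help to confirm that both processes are accepting connections. The following is a minimal sketch using only the JDK; `PortCheck` and `isListening` are hypothetical names introduced for illustration. Port 2181 assumes the stock `config\zookeeper.properties`, and 9092 matches the broker address used later in this post. An open port only shows that something is listening there, not that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of Kafka or Spring): checks whether a TCP port accepts connections.
class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unresolved: treat all as "not listening".
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed ports: 2181 for ZooKeeper (stock config), 9092 for the Kafka broker.
        System.out.println("ZooKeeper (2181): " + isListening("localhost", 2181, 1000));
        System.out.println("Kafka     (9092): " + isListening("localhost", 9092, 1000));
    }
}
```

If either line reports `false`, check the console window of the corresponding script for startup errors before starting the Spring Boot application.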

Setting up the Spring Boot project

  1. Add the dependencies (Maven pom.xml)

         <dependencies>
             <dependency>
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-starter-web</artifactId>
             </dependency>
             <dependency>
                 <groupId>org.springframework.kafka</groupId>
                 <artifactId>spring-kafka</artifactId>
             </dependency>
             <dependency>
                 <groupId>org.projectlombok</groupId>
                 <artifactId>lombok</artifactId>
                 <optional>true</optional>
             </dependency>
         </dependencies>

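Using the producer

  1. application configuration file

```properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
```

  2. Code

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class Producer {
    // Auto-configured by Spring Boot from the spring.kafka.* properties above
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // GET /producer?msg=... publishes msg to the "test" topic
    @GetMapping("producer")
    public String testProducer(String msg) {
        // send() is asynchronous and returns a future (ListenableFuture in spring-kafka 2.x,
        // CompletableFuture since 3.0); it is ignored here, so "ok" does not confirm delivery
        kafkaTemplate.send("test", msg);
        return "ok";
    }
}
```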
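
To try the endpoint, any HTTP client will do. The sketch below uses the JDK's built-in `java.net.http.HttpClient` (Java 11+). `ProducerClient`, `BASE_URL`, and `buildUri` are hypothetical names introduced for illustration, and the base URL assumes the application is running locally on Spring Boot's default port 8080.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical client for the /producer endpoint defined in the controller above.
class ProducerClient {

    // Assumption: the application listens on Spring Boot's default port 8080 on this machine.
    static final String BASE_URL = "http://localhost:8080";

    // Builds the request URI; the message is URL-encoded so spaces and symbols survive.
    static URI buildUri(String msg) {
        return URI.create(BASE_URL + "/producer?msg=" + URLEncoder.encode(msg, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(buildUri("hello kafka")).GET().build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // Print the HTTP status and body returned by the controller.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Because the controller returns as soon as `send()` has been called, a successful HTTP response only means the message was handed to the producer, not that the broker has acknowledged it.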

Consumer

  1. application configuration file

```properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=cg1
```

  2. Code

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {
    // Called once per record on the "test" topic; the consumer group (cg1) comes from
    // spring.kafka.consumer.group-id
    @KafkaListener(topics = "test")
    public void testConsumer(String msg) {
        System.err.println(msg);
    }
}
```