1.概述

Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),
常见可以用于web/nginx日志、访问日志,消息服务

2.消息系统介绍

一个消息系统负责将数据从一个应用传递到另外一个应用,应用只需关注于数据,无需关注数据在两个或多个应用间是如何传递的。分布式消息传
递基于可靠的消息队列,在客户端应用和消息系统之间异步传递消息。有两种主要的消息传递模式:点对点传递模式、发布-订阅模式。大部分的消
息系统选用发布-订阅模式。Kafka就是一种发布-订阅模式

Kafka主要设计目标如下:

  • 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。
  • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
  • 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。
  • 同时支持离线数据处理和实时数据处理。
  • Scale out:支持在线水平扩展

image.png

3.名词解释

image.png

  • producer:发布消息的对象称之为主题生产者(Kafka topic producer)
  • topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
  • consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
  • broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

4.kafka入门

image.png

(1)创建kafka-demo项目,导入依赖

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. </dependency>

(2)生产者发送消息

  1. package com.heima.kafka.sample;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerConfig;
  4. import org.apache.kafka.clients.producer.ProducerRecord;
  5. import java.util.Properties;
  6. /**
  7. * 生产者
  8. */
  9. public class ProducerQuickStart {
  10. public static void main(String[] args) {
  11. //1.kafka的配置信息
  12. Properties properties = new Properties();
  13. //kafka的连接地址
  14. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
  15. //发送失败,失败的重试次数
  16. properties.put(ProducerConfig.RETRIES_CONFIG,5);
  17. //消息key的序列化器
  18. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
  19. //消息value的序列化器
  20. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
  21. //2.生产者对象
  22. KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
  23. //封装发送的消息
  24. ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");
  25. //3.发送消息
  26. producer.send(record);
  27. //4.关闭消息通道,必须关闭,否则消息发送不成功
  28. producer.close();
  29. }
  30. }

(3)消费者接收消息

  1. package com.heima.kafka.sample;
  2. import org.apache.kafka.clients.consumer.ConsumerConfig;
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.apache.kafka.clients.consumer.ConsumerRecords;
  5. import org.apache.kafka.clients.consumer.KafkaConsumer;
  6. import java.time.Duration;
  7. import java.util.Collections;
  8. import java.util.Properties;
  9. /**
  10. * 消费者
  11. */
  12. public class ConsumerQuickStart {
  13. public static void main(String[] args) {
  14. //1.添加kafka的配置信息
  15. Properties properties = new Properties();
  16. //kafka的连接地址
  17. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
  18. //消费者组
  19. properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
  20. //消息的反序列化器
  21. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  22. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  23. //2.消费者对象
  24. KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
  25. //3.订阅主题
  26. consumer.subscribe(Collections.singletonList("itheima-topic"));
  27. //当前线程一直处于监听状态
  28. while (true) {
  29. //4.获取消息
  30. ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
  31. for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  32. System.out.println(consumerRecord.key());
  33. System.out.println(consumerRecord.value());
  34. }
  35. }
  36. }
  37. }

总结

  • 生产者发送消息,多个消费者订阅同一个主题,只能有一个消费者收到消息(一对一)
  • 生产者发送消息,多个消费者订阅同一个主题,所有消费者都能收到消息(一对多)

5.SpringBoot集成kafka

1.导入spring-kafka依赖信息

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-web</artifactId>
  5. </dependency>
  6. <!-- kafkfa -->
  7. <dependency>
  8. <groupId>org.springframework.kafka</groupId>
  9. <artifactId>spring-kafka</artifactId>
  10. <exclusions>
  11. <exclusion>
  12. <groupId>org.apache.kafka</groupId>
  13. <artifactId>kafka-clients</artifactId>
  14. </exclusion>
  15. </exclusions>
  16. </dependency>
  17. <dependency>
  18. <groupId>org.apache.kafka</groupId>
  19. <artifactId>kafka-clients</artifactId>
  20. </dependency>
  21. <dependency>
  22. <groupId>com.alibaba</groupId>
  23. <artifactId>fastjson</artifactId>
  24. </dependency>
  25. </dependencies>

2.在resources下创建文件application.yml

  1. server:
  2. port: 9991
  3. spring:
  4. application:
  5. name: kafka-demo
  6. kafka:
  7. bootstrap-servers: 192.168.200.130:9092
  8. producer:
  9. retries: 10
  10. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  11. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  12. consumer:
  13. group-id: ${spring.application.name}-test
  14. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  15. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3.消息生产者

  1. package com.heima.kafka.controller;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.kafka.core.KafkaTemplate;
  4. import org.springframework.web.bind.annotation.GetMapping;
  5. import org.springframework.web.bind.annotation.RestController;
  6. @RestController
  7. public class HelloController {
  8. @Autowired
  9. private KafkaTemplate<String,String> kafkaTemplate;
  10. @GetMapping("/hello")
  11. public String hello(){
  12. kafkaTemplate.send("itcast-topic","黑马程序员");
  13. return "ok";
  14. }
  15. }

4.消息消费者

  1. package com.heima.kafka.listener;
  2. import org.springframework.kafka.annotation.KafkaListener;
  3. import org.springframework.stereotype.Component;
  4. import org.springframework.util.StringUtils;
  5. @Component
  6. public class HelloListener {
  7. @KafkaListener(topics = "itcast-topic")
  8. public void onMessage(String message){
  9. if(!StringUtils.isEmpty(message)){
  10. System.out.println(message);
  11. }
  12. }
  13. }

传递消息为对象

目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式
方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,本章节不介绍
方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,本项目采用这种方式

发送消息

  1. @GetMapping("/hello")
  2. public String hello(){
  3. User user = new User();
  4. user.setUsername("xiaowang");
  5. user.setAge(18);
  6. kafkaTemplate.send("user-topic", JSON.toJSONString(user));
  7. return "ok";
  8. }

接收消息

  1. package com.heima.kafka.listener;
  2. import com.alibaba.fastjson.JSON;
  3. import com.heima.kafka.pojo.User;
  4. import org.springframework.kafka.annotation.KafkaListener;
  5. import org.springframework.stereotype.Component;
  6. import org.springframework.util.StringUtils;
  7. @Component
  8. public class HelloListener {
  9. @KafkaListener(topics = "user-topic")
  10. public void onMessage(String message){
  11. if(!StringUtils.isEmpty(message)){
  12. User user = JSON.parseObject(message, User.class);
  13. System.out.println(user);
  14. }
  15. }
  16. }