在SpringBoot整合Redis之前,先来简单认识下Kafka。

1. Kafka 简介

KafKa是一款由Apache软件基金会开源,由Scala编写的一个分布式发布订阅消息系统。

  • Kafka 是一个分布式的流媒体平台。
  • 应用:消息系统、日志收集、用户行为追踪、流式处理。

    2. Kafka 特点

  • 高吞吐量

    • 可以满足每秒百万级别消息的生产和消费。
  • 持久性
    • 有一套完善的消息存储机制,确保数据的高效安全的持久化。
  • 分布式

    • 基于分布式的扩展和容错机制;
    • Kafka 的数据会复制到多台服务器上,当某一台发生故障失效后,生产者和消费者转而使用其他的服务器。

      3. 基本架构

      image.png

      4. Kafka 术语

  • Broker:对于 KafKa 集群来说,每一个 KafKa 实例都被称为一个broker。

  • Topic(主题):在 KafKa 中每一条消息都所属一个 Topic 下,Topic之间是完全物理隔离的。
  • Partition(分区):一个 Topic 下面可以拥有一个到多个 Partitine,Partition 也是物理层面的隔离。
  • Producer(生产者):消息的生产者,向 kafka 的 Topic 发布消息
  • Consumer(消费者):消息的消费者,向 Topic 注册,并且拉取(接收)发布到这些 Topic 的消息。
  • Consumer Group:消费者组,消费者组是一组中存在多个消费者,消费者消费 Broker 中当前 Topic 的不同分区中的消息,消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。某一个分区中的消息只能够一个消费者组中的一个消费者组所消费。
  • Replica:副本Replication,为保证集群中某个节点发生故障,节点上的 Partition(分区)的数据不丢失,Kafka 可以正常的工作。Kafka 提供了副本机制,一个 Topic 的每个分区有若干个副本,一个 Leader 和多个 Follower。
  • Leader:每个分区多个副本的主角色,生产者发送数据的对象,以及消费者消费消息的对象都是 Leader。
  • Follower:每个分区多个副本的从角色,实时的从 Leader 中同步数据,保持和 Leader 数据的同步,Leader 发生故障的时候,某个 Follower 就会变成新的 Leader。

    5. 使用 Kafka 的好处

  • 削峰填谷:缓冲上下游瞬时突发流量,保护 “脆弱” 的下游系统不被压垮,避免引发全链路服务 “雪崩”。

  • 系统解耦:发送方和接收方的松耦合,一定程度简化了开发成本,减少了系统间不必要的直接依赖。
  • 异步通信:消息队列允许用户把消息放入队列但不立即处理它。
  • 可恢复性:即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

    6. 使用 SpringBoot 整合 Kafka

    6.1 导入依赖:在 pom.xml 文件中导入对应的依赖

    1. <!-- 集成kafka -->
    2. <dependency>
    3. <groupId>org.springframework.kafka</groupId>
    4. <artifactId>spring-kafka</artifactId>
    5. <version>2.8.3</version>
    6. </dependency>

    6.2 配置 Kafka

  • 在 application.properties 或 application.yml 中配置:

    1. # KafkaProperties
    2. # Kafka所在服务器的ip地址+端口号
    3. spring.kafka.bootstrap-servers=59.xx.xx.xx:9092
    4. # 消费者分组的id
    5. spring.kafka.consumer.group-id=test-consumer-group
    6. # 是否自动提交
    7. spring.kafka.consumer.enable-auto-commit=true
    8. # 自动提交的频率(单位:毫秒)
    9. spring.kafka.consumer.auto-commit-interval=3000
    10. # 消费者密钥的反序列化器类
    11. spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    12. # 消费者值的反序列化器类
    13. spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    14. # 生产者密钥的反序列化器类
    15. spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    16. # 生产者值的反序列化器类
    17. spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
  • 对应的配置类 org.springframework.boot.autoconfigure.kafka.KafkaProperties,来初始化kafka相关的bean 实例对象,并注册到 spring 容器中。

    6.3 访问 Kafka

  • 生产者:

Spring Boot 作为一款支持快速开发的集成性框架,同样提供了一批以 -Template 命名的模板工具类用于实现消息通信。对于 Kafka 而言,这个工具类就是 KafkaTemplate。

  1. @Component
  2. public class KafkaProducer {
  3. @Autowired
  4. private KafkaTemplate kafkaTemplate;
  5. public void sendMessage(String topic, String content) {
  6. // KafkaTemplate 提供了一系列 send 方法用来发送消息
  7. kafkaTemplate.send(topic, content);
  8. }
  9. }
  • 消费者:

在 Kafka 中消息通过服务器推送给各个消费者,而 Kafka 的消费者在消费消息时,需要提供一个监听器(Listener)对某个 Topic 实现监听,从而获取消息,这也是 Kafka 消费消息的唯一方式。

  1. @Component
  2. public class KafkaConsumer {
  3. // 对 test 主题进行监听
  4. @KafkaListener(topics = {"test"})
  5. public void handleMessage(ConsumerRecord record) {
  6. // 在读取消息的时候,会把消息封装成Record,用于接收消息
  7. System.out.println(record.value());
  8. }
  9. }
  • 测试:

    1. @RunWith(SpringRunner.class)
    2. @SpringBootTest
    3. @ContextConfiguration(classes = CommunityApplication.class)
    4. public class KafkaTests {
    5. @Autowired
    6. private KafkaProducer kafkaProducer;
    7. @Test
    8. public void testKafka() {
    9. kafkaProducer.sendMessage("test","Hello");
    10. kafkaProducer.sendMessage("test","How are you?");
    11. try {
    12. Thread.sleep(1000*10);
    13. } catch (InterruptedException e) {
    14. e.printStackTrace();
    15. }
    16. }
    17. }