在SpringBoot整合Redis之前,先来简单认识下Kafka。
1. Kafka 简介
KafKa是一款由Apache软件基金会开源,由Scala编写的一个分布式发布订阅消息系统。
- Kafka 是一个分布式的流媒体平台。
-
2. Kafka 特点
高吞吐量
- 可以满足每秒百万级别消息的生产和消费。
- 持久性
- 有一套完善的消息存储机制,确保数据的高效安全的持久化。
分布式
Broker:对于 KafKa 集群来说,每一个 KafKa 实例都被称为一个broker。
- Topic(主题):在 KafKa 中每一条消息都所属一个 Topic 下,Topic之间是完全物理隔离的。
- Partition(分区):一个 Topic 下面可以拥有一个到多个 Partitine,Partition 也是物理层面的隔离。
- Producer(生产者):消息的生产者,向 kafka 的 Topic 发布消息
- Consumer(消费者):消息的消费者,向 Topic 注册,并且拉取(接收)发布到这些 Topic 的消息。
- Consumer Group:消费者组,消费者组是一组中存在多个消费者,消费者消费 Broker 中当前 Topic 的不同分区中的消息,消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。某一个分区中的消息只能够一个消费者组中的一个消费者组所消费。
- Replica:副本Replication,为保证集群中某个节点发生故障,节点上的 Partition(分区)的数据不丢失,Kafka 可以正常的工作。Kafka 提供了副本机制,一个 Topic 的每个分区有若干个副本,一个 Leader 和多个 Follower。
- Leader:每个分区多个副本的主角色,生产者发送数据的对象,以及消费者消费消息的对象都是 Leader。
Follower:每个分区多个副本的从角色,实时的从 Leader 中同步数据,保持和 Leader 数据的同步,Leader 发生故障的时候,某个 Follower 就会变成新的 Leader。
5. 使用 Kafka 的好处
削峰填谷:缓冲上下游瞬时突发流量,保护 “脆弱” 的下游系统不被压垮,避免引发全链路服务 “雪崩”。
- 系统解耦:发送方和接收方的松耦合,一定程度简化了开发成本,减少了系统间不必要的直接依赖。
- 异步通信:消息队列允许用户把消息放入队列但不立即处理它。
可恢复性:即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
6. 使用 SpringBoot 整合 Kafka
6.1 导入依赖:在 pom.xml 文件中导入对应的依赖
<!-- 集成kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.3</version>
</dependency>
6.2 配置 Kafka
在 application.properties 或 application.yml 中配置:
# KafkaProperties
# Kafka所在服务器的ip地址+端口号
spring.kafka.bootstrap-servers=59.xx.xx.xx:9092
# 消费者分组的id
spring.kafka.consumer.group-id=test-consumer-group
# 是否自动提交
spring.kafka.consumer.enable-auto-commit=true
# 自动提交的频率(单位:毫秒)
spring.kafka.consumer.auto-commit-interval=3000
# 消费者密钥的反序列化器类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费者值的反序列化器类
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 生产者密钥的反序列化器类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# 生产者值的反序列化器类
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
对应的配置类 org.springframework.boot.autoconfigure.kafka.KafkaProperties,来初始化kafka相关的bean 实例对象,并注册到 spring 容器中。
6.3 访问 Kafka
生产者:
Spring Boot 作为一款支持快速开发的集成性框架,同样提供了一批以 -Template 命名的模板工具类用于实现消息通信。对于 Kafka 而言,这个工具类就是 KafkaTemplate。
@Component
public class KafkaProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMessage(String topic, String content) {
// KafkaTemplate 提供了一系列 send 方法用来发送消息
kafkaTemplate.send(topic, content);
}
}
- 消费者:
在 Kafka 中消息通过服务器推送给各个消费者,而 Kafka 的消费者在消费消息时,需要提供一个监听器(Listener)对某个 Topic 实现监听,从而获取消息,这也是 Kafka 消费消息的唯一方式。
@Component
public class KafkaConsumer {
// 对 test 主题进行监听
@KafkaListener(topics = {"test"})
public void handleMessage(ConsumerRecord record) {
// 在读取消息的时候,会把消息封装成Record,用于接收消息
System.out.println(record.value());
}
}
测试:
@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTests {
@Autowired
private KafkaProducer kafkaProducer;
@Test
public void testKafka() {
kafkaProducer.sendMessage("test","Hello");
kafkaProducer.sendMessage("test","How are you?");
try {
Thread.sleep(1000*10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}