1. 定义
本质:Kafka本质是一个MQ(Message Queue,消息队列)
使用消息队列的好处?
- 解耦:允许独立扩展或修改队列两边的处理过程
- 可恢复性:一个处理消息的进程挂掉了,加入队列的消息仍然可以系统恢复后被处理
- 缓冲:解决生产消息和消费消息的处理速度不一致的情况
- 灵活性&削峰:不会因为突发的超负荷请求而崩溃,可以顶住突发的访问压力
- 异步通信:允许用户把消息放入队列但不立即处理
发布订阅模式:
一对多,生产者将消息发布到Topic中,有多个消费者订阅该主题,发布到Topic的消息会被所有订阅者消费,被消费的数据不会立即从Topic清除。
2. 架构
- Kafka存储的消息来源于Producer,一个Producer向一个Topic中生产数据;
- 每个Topic由多个分区构成,每个分区有多个副本;
- 数据存储在Topic的不同分区,存储利用了分片索引机制,在一个分区内,消息内容和索引连同时间戳存储在一起;
- 分区的副本分为leader和follower之分,leader副本负责读写数据,follower副本负责从leader同步数据,并在leader挂掉时成为新的leader;
- Consumer进程从分区中订阅消息。
组件 | 功能 |
---|---|
Producer | 消息生产者,向 Kafka Broker 发消息的客户端 |
Consumer | 消息消费者,从 Kafka Broker 取消息的客户端 |
Consumer Group | 消费者组(CG),消费者组内每个消费者负责消费不同分区的数据,提高消费能力。一个分区只能由组内一个消费者消费,消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。 |
Broker | 一台 Kafka 机器就是一个 Broker。一个集群由多个 Broker 组成。一个 Broker 可以容纳多个 Topic。 |
Topic | 可以理解为一个队列,Topic 将消息分类,生产者和消费者面向的是同一个 Topic。 |
Partition | 为了实现扩展性,提高并发能力,一个非常大的 Topic 可以以多个Partition的形式分布到多个 Broker (即服务器)上,每个 Partition 是一个 有序的队列。 |
Replication | 副本,为实现备份的功能,保证集群中的某个节点发生故障时,该节点上的 Partition 数据不丢失,且 Kafka 仍然能够继续工作,Kafka 提供了副本机制,一个 Topic 的每个分区都有若干个副本,每个副本位于不同的brocker上。其中一个副本为leader,其他的副本为follower,负责同步数据。 |
Leader | 每个分区多个副本的“主”副本,生产者发送数据的对象,以及消费者消费数据的对象,都是 Leader。 |
Follower | 每个分区多个副本的“从”副本,实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 发生故障时,某个 Follower 还会成为新的 Leader |
Offset | 消费者消费的位置信息,监控数据消费到什么位置,当消费者挂掉再重新恢复的时候,可以从消费位置继续消费。 |
Zookeeper | Kafka 集群能够正常工作,需要依赖于 Zookeeper,Zookeeper 帮助 Kafka 存储和管理集群信息。 |
3. Kafka生产者
- 创建一个Kafka生产者对象(KafkaProducer
)需要配置如下信息: - 指定连接的Kafka集群
- ack应答级别 acks
- 重试次数 retries
- 批大小 batch.size:数据积累到batch.size之后,sender会发送数据
- 等待时间 linger.ms:sender在上一次发送数据后等待linger.time,即使数据量未达到batch.size,也会发送数据
- 缓冲区大小 buffer.memory:共享变量区域RecordAccumulator中存放了很多个RecordBatch(批)。
- 指定key、value的序列化类
- 生产者将要发送到数据封装成一个ProducerRecord对象然后再发送
- producer.send(new ProducerRecord<>(…)).get();
- ProducerRecord类的有参构造有多种重载方法,用这些方法创建生产者对象
- 必须包含的参数是topic和value;
- 可选的参数是partition、key、timestamp、headers
public class MyProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1.创建Kafka生产者的配置信息
Properties properties = new Properties();
//2.指定连接的Kafka集群,ProducerConfig里面有所有的键变量。
//properties.put("bootstrap.servers","hadoopstudy102:9092");
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG ,"hadoopstudy102:9092");
//3.ack应答级别
properties.put("acks", "all");
//4.重试次数
properties.put("retries", 3);
//5.批次大小 16384=16k
properties.put("batch.size", 16384);
//6.等待时间
properties.put("linger.ms", 1);
//7.RecordAccumulator 缓冲区大小 33554432=32m
properties.put("buffer.memory", 33554432);
//8.指定key,value的序列化类
properties.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
//9.创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
//10.发送数据
for (int i = 0; i < 10; i++) {
//加一个.get()后变成同步
producer.send(new ProducerRecord<>("first","mhj","mhj--"+i)).get();
}
//11.关闭资源
producer.close();
}
}
3.1. 分区策略
- 分区原因:
- 方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个 topic又可以有多个 Partition 组成,因此整个集群就可以适应任意大小的数据了;
- 提高并发,因为可以以Partition为单位读写了。
- 分区的原则:
- ProducerRecord类的构造方法中指定了partition分区号,则直接将该值作为分区值;
- ProducerRecord类的构造方法中没有指定partition,但指定了key,则按照key的哈希值分区(hashcode%numOfPartition);
- 如果partition和key都么有指定,则轮询分区。
3.2 数据发送方式
同步发送:发送一个message,收到返回值后才发送下一个。安全但效率低。
producer.send(new ProducerRecord<>(“first”,”mhj”,”mhj—“+i)).get(); 加get()为同步发送
异步发送:发送一个message,无需受到来自Broker的返回值就发送下一个。数据易丢失但效率高。
producer.send(new ProducerRecord<>(“first”,”mhj”,”mhj—“+i));
4. Kafka broker
4.1. Controller 与 ZooKeeper
Kafka 集群中有一个broker会被选举为Controller,负责
- 管理集群broker的上下线;
- 所有topic的分区副本分配;
- Topic分区的leader选举等工作。
Kafka broker 会在ZooKeeper中注册临时节点 /brokers/ids
4.1.1. 选举过程
该选举过程产生于运行时有broker节点挂掉了。
- 所有的broker都向zk注册临时节点,Controller监听zk的/broker/ids目录;
- 假设当broker1节点挂掉以后,在该broker1内的TopicA的分区leader也相应的挂掉了;
- Kafka Controller从每个Topic的每个分区获取ISR,然后将broker1从ISR中剔除;
- 通过相应的算法从isr中选举出新的leader并更新leader。
4.2. 存储机制
5. Kafka 消费者
consumer采用pull(拉)模式从broker中读取数据。
push(推)模式消息发送速率由broker决定,很难适应消费速度不同的消费者,容易产生拒绝服务和网络拥塞问题。
pull模式的不足:如果Kafka没有数据,消费者会陷入到循环中,一直返回空数据。针对这种不足,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可以消费,consumer会等待一段时间再拉取,这段时间即为timeout。
consumer group:一个consumer group中有多个 consumer,topic中的多个partition分配给消费者组中的多个consumer就涉及到了partition的分配问题。
5.1. 分区分配策略
同一消费者组的消费者不能消费相同的分区;
不同消费者组的消费者可以消费一个Topic相同的分区;
一个消费者组可以同时消费多个Topic;
5.1.1. RoundRobin 轮询
轮询:就是将partition依次分给消费者中的每个消费者。
这种方式会将多个topic的分区作为一个整体进行Hash排序,消费者组内分配分区个数的最大差别为1
优点:多个消费者消费数据均衡性好;
缺点:当消费者组内的消费者订阅不同主题的时候,会产生消费混乱问题;
如下图所示,Comsumer-0订阅了TopicA,Consumer-1和Consumer-2订阅了TopicB,但他们都会受到来自未订阅主题的消息。
5.1.2. Range(默认)
面向topic的默认分区分配策略;
优点:不会产生轮询方式的消费混乱问题
缺点:当多个consumer同时订阅了相同主题,会产生分配不均衡的问题。而且组内消费者订阅的主题越多,分区分配就可能越不均衡。
partitionNum = 10; consumerNum = 3; avgNum = 10 / 3 = 3; restNum = 10 % 3 = 1;
每个consumer分配到 avgNum个分区,前restNum个consumer 在分配到avgNum个分区的基础上多分一个。(先计算每个消费者分配的分区数,再给每个消费者一次性分配分区)
5.2. 消费者offset的维护
由于consumer消费过程中可能会出出现宕机等故障,consumer恢复后,需要从故障前的位置继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。
- 在Kafka 0.9 版本之前,consumer默认将offset保存在ZooKeeper中 /consumers/consumer_group_N/offsets/topic_N/partition_N
- 从Kafka 0.9 版本开始,consumer默认将offset保存在Kafka 内置的Topic __consumer_offsets 中
6. 数据可靠性
6.1. 数据丢失问题:ack应答机制,设置ack等级
ack=0 | 不等待任何返回值就继续发送 |
---|---|
ack=1 | leader接收到后给返回值 |
ack=all 或 ack=-1 | ISR中所有副本都接收到后才给返回值 |
注意:将ack设置为0的时候才使Kafka可以异步通信并具有高吞吐量,这样有可能产生数据丢失问题。所以Kafka的高吞吐量和数据安全性是呈反比的。
**
6.2. 数据重复问题:幂等性
幂等性指Producer不论向Server发送多少次重复数据,Server端都只会持久化一条。
幂等性的实现:每条消息附加了Sequence和PID,相同的PID和SequenceNumber发送给Broker,只会保存一条。
如图所示:
- 第一次发送message(C,D),并附加Sequence=1和PID=1000;
- Broker将该数据添加到Streaming Message中;
- Broker返回的ack没能成功的发送到Producer;
- Producer重发了message(C,D),并附加Sequence=1和PID=1000;
- Broker比较PID和Sequence后没有将该Message放入Streaming Message中;
- Broker返回ack给Producer。
6.3. 数据不丢失不重复:exactly Once = At Least Once + 幂等性
幂等性 ——> 数据不重复;
At Least Once ——> 数据不丢失(ack=-1);
6.4. 副本数据同步策略
方案 | 优点 | 缺点 |
---|---|---|
半数以上完成同步,就发送ack | 延迟低 | 选举新的leader时,如果要容忍n台节点的故障,就需要2n+1个副本 |
全部完成同步,才发送ack | 选举新的leader时,如果要容忍n台节点的故障,只需要n+1个副本 | 延迟高 |
Kafka选择第二种方案,原因:
- 第一种方案需要2n+1个副本,产生的冗余数据太多,故选择第二种;
- 虽然第二种方案的网络延迟比较高,但网络延迟对Kafka的影响较小。
6.5. ISR(in-sync replica set)
ISR即一个和leader保持同步的follower集合,每个Partition都会有一个ISR,由leader动态维护。
- 当ISR中的follower完成数据的同步之后,follower就会给leader发送ack。
- 根据副本数据同步策略,当ISR中的所有replic都向leader发送ack后,leader才会给Producer发送ack。
- 如果follower长时间未向leader同步数据,则该follower将被提出ISR。该时间由
replica.lag.time.max.ms
参数来设定。 - leader发生故障之后,就会从ISR中选举新的leader。
6.6. 副本间数据一致性 HW和LEO
LEO:指的是每个副本最大的offset;
HW:指的是消费者能见到的最大的offset,ISR中各个副本的LEO的最小值。
- follower故障
follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。等该follower的LEO大于等于该Partition的HW,就可以重新加入ISR了。
- leader故障
leader发生故障之后,会从ISR中选出一个新的leader,之后,为保证多个副本之间的数据一致性,其余的follower会将各自的log文件高于HW的部分截掉,然后从新的leader中同步数据。
7. Kafka 的高效读写
- 顺序写磁盘
Kafka的producer生产数据要写入log文件中,写入的过程是一直追加到文件末端,为顺序写。
- 零复制技术
用于优化网络传输的性能,减少复制拷贝次数,减小上下文切换和缓存行污染。
参考文献
https://baijiahao.baidu.com/s?id=1660161962545490659&wfr=spider&for=pc&isFailFlag=1
幂等性:https://www.cnblogs.com/smartloli/p/11922639.html