1. 定义

本质:Kafka本质是一个MQ(Message Queue,消息队列)

使用消息队列的好处?

  1. 解耦:允许独立扩展或修改队列两边的处理过程
  2. 可恢复性:一个处理消息的进程挂掉了,加入队列的消息仍然可以系统恢复后被处理
  3. 缓冲:解决生产消息和消费消息的处理速度不一致的情况
  4. 灵活性&削峰:不会因为突发的超负荷请求而崩溃,可以顶住突发的访问压力
  5. 异步通信:允许用户把消息放入队列但不立即处理

发布订阅模式:

一对多,生产者将消息发布到Topic中,有多个消费者订阅该主题,发布到Topic的消息会被所有订阅者消费,被消费的数据不会立即从Topic清除。


2. 架构

  1. Kafka存储的消息来源于Producer,一个Producer向一个Topic中生产数据;
  2. 每个Topic由多个分区构成,每个分区有多个副本;
  3. 数据存储在Topic的不同分区,存储利用了分片索引机制,在一个分区内,消息内容和索引连同时间戳存储在一起;
  4. 分区的副本分为leader和follower之分,leader副本负责读写数据,follower副本负责从leader同步数据,并在leader挂掉时成为新的leader;
  5. Consumer进程从分区中订阅消息。
组件 功能
Producer 消息生产者,向 Kafka Broker 发消息的客户端
Consumer 消息消费者,从 Kafka Broker 取消息的客户端
Consumer Group 消费者组(CG),消费者组内每个消费者负责消费不同分区的数据,提高消费能力。一个分区只能由组内一个消费者消费,消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
Broker 一台 Kafka 机器就是一个 Broker。一个集群由多个 Broker 组成。一个 Broker 可以容纳多个 Topic。
Topic 可以理解为一个队列,Topic 将消息分类,生产者和消费者面向的是同一个 Topic。
Partition 为了实现扩展性,提高并发能力,一个非常大的 Topic 可以以多个Partition的形式分布到多个 Broker (即服务器)上,每个 Partition 是一个 有序的队列。
Replication 副本,为实现备份的功能,保证集群中的某个节点发生故障时,该节点上的 Partition 数据不丢失,且 Kafka 仍然能够继续工作,Kafka 提供了副本机制,一个 Topic 的每个分区都有若干个副本,每个副本位于不同的brocker上。其中一个副本为leader,其他的副本为follower,负责同步数据。
Leader 每个分区多个副本的“主”副本,生产者发送数据的对象,以及消费者消费数据的对象,都是 Leader。
Follower 每个分区多个副本的“从”副本,实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 发生故障时,某个 Follower 还会成为新的 Leader
Offset 消费者消费的位置信息,监控数据消费到什么位置,当消费者挂掉再重新恢复的时候,可以从消费位置继续消费。
Zookeeper Kafka 集群能够正常工作,需要依赖于 Zookeeper,Zookeeper 帮助 Kafka 存储和管理集群信息。



3. Kafka生产者

  1. 创建一个Kafka生产者对象(KafkaProducer)需要配置如下信息:
    1. 指定连接的Kafka集群
    2. ack应答级别 acks
    3. 重试次数 retries
    4. 批大小 batch.size:数据积累到batch.size之后,sender会发送数据
    5. 等待时间 linger.ms:sender在上一次发送数据后等待linger.time,即使数据量未达到batch.size,也会发送数据
    6. 缓冲区大小 buffer.memory:共享变量区域RecordAccumulator中存放了很多个RecordBatch(批)。
    7. 指定key、value的序列化类
  2. 生产者将要发送到数据封装成一个ProducerRecord对象然后再发送
    1. producer.send(new ProducerRecord<>(…)).get();
  3. ProducerRecord类的有参构造有多种重载方法,用这些方法创建生产者对象
    1. 必须包含的参数是topicvalue
    2. 可选的参数是partitionkeytimestampheaders

image.png

  1. public class MyProducer {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. //1.创建Kafka生产者的配置信息
  4. Properties properties = new Properties();
  5. //2.指定连接的Kafka集群,ProducerConfig里面有所有的键变量。
  6. //properties.put("bootstrap.servers","hadoopstudy102:9092");
  7. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG ,"hadoopstudy102:9092");
  8. //3.ack应答级别
  9. properties.put("acks", "all");
  10. //4.重试次数
  11. properties.put("retries", 3);
  12. //5.批次大小 16384=16k
  13. properties.put("batch.size", 16384);
  14. //6.等待时间
  15. properties.put("linger.ms", 1);
  16. //7.RecordAccumulator 缓冲区大小 33554432=32m
  17. properties.put("buffer.memory", 33554432);
  18. //8.指定key,value的序列化类
  19. properties.put("key.serializer",
  20. "org.apache.kafka.common.serialization.StringSerializer");
  21. properties.put("value.serializer",
  22. "org.apache.kafka.common.serialization.StringSerializer");
  23. //9.创建生产者对象
  24. KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
  25. //10.发送数据
  26. for (int i = 0; i < 10; i++) {
  27. //加一个.get()后变成同步
  28. producer.send(new ProducerRecord<>("first","mhj","mhj--"+i)).get();
  29. }
  30. //11.关闭资源
  31. producer.close();
  32. }
  33. }

3.1. 分区策略

  1. 分区原因
    1. 方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个 topic又可以有多个 Partition 组成,因此整个集群就可以适应任意大小的数据了;
    2. 提高并发,因为可以以Partition为单位读写了。
  2. 分区的原则
    1. ProducerRecord类的构造方法中指定了partition分区号,则直接将该值作为分区值;
    2. ProducerRecord类的构造方法中没有指定partition,但指定了key,则按照key的哈希值分区(hashcode%numOfPartition);
    3. 如果partition和key都么有指定,则轮询分区。

3.2 数据发送方式

同步发送:发送一个message,收到返回值后才发送下一个。安全但效率低。

producer.send(new ProducerRecord<>(“first”,”mhj”,”mhj—“+i)).get(); 加get()为同步发送

异步发送:发送一个message,无需受到来自Broker的返回值就发送下一个。数据易丢失但效率高。

producer.send(new ProducerRecord<>(“first”,”mhj”,”mhj—“+i));


4. Kafka broker

4.1. Controller 与 ZooKeeper

Kafka 集群中有一个broker会被选举为Controller,负责

  1. 管理集群broker的上下线;
  2. 所有topic的分区副本分配;
  3. Topic分区的leader选举等工作。

Kafka broker 会在ZooKeeper中注册临时节点 /brokers/ids

4.1.1. 选举过程

该选举过程产生于运行时有broker节点挂掉了。

  1. 所有的broker都向zk注册临时节点,Controller监听zk的/broker/ids目录;
  2. 假设当broker1节点挂掉以后,在该broker1内的TopicA的分区leader也相应的挂掉了;
  3. Kafka Controller从每个Topic的每个分区获取ISR,然后将broker1从ISR中剔除;
  4. 通过相应的算法从isr中选举出新的leader并更新leader。

4.2. 存储机制


5. Kafka 消费者

consumer采用pull(拉)模式从broker中读取数据。
push(推)模式消息发送速率由broker决定,很难适应消费速度不同的消费者,容易产生拒绝服务和网络拥塞问题。

pull模式的不足:如果Kafka没有数据,消费者会陷入到循环中,一直返回空数据。针对这种不足,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可以消费,consumer会等待一段时间再拉取,这段时间即为timeout。

consumer group:一个consumer group中有多个 consumer,topic中的多个partition分配给消费者组中的多个consumer就涉及到了partition的分配问题。

5.1. 分区分配策略

同一消费者组的消费者不能消费相同的分区;
不同消费者组的消费者可以消费一个Topic相同的分区;
一个消费者组可以同时消费多个Topic;

5.1.1. RoundRobin 轮询

轮询:就是将partition依次分给消费者中的每个消费者。
这种方式会将多个topic的分区作为一个整体进行Hash排序,消费者组内分配分区个数的最大差别为1

优点:多个消费者消费数据均衡性好;
缺点:当消费者组内的消费者订阅不同主题的时候,会产生消费混乱问题;
如下图所示,Comsumer-0订阅了TopicA,Consumer-1和Consumer-2订阅了TopicB,但他们都会受到来自未订阅主题的消息。

5.1.2. Range(默认)

面向topic的默认分区分配策略;

优点:不会产生轮询方式的消费混乱问题
缺点:当多个consumer同时订阅了相同主题,会产生分配不均衡的问题。而且组内消费者订阅的主题越多,分区分配就可能越不均衡。

partitionNum = 10; consumerNum = 3; avgNum = 10 / 3 = 3; restNum = 10 % 3 = 1;

每个consumer分配到 avgNum个分区,前restNum个consumer 在分配到avgNum个分区的基础上多分一个。(先计算每个消费者分配的分区数,再给每个消费者一次性分配分区)

5.2. 消费者offset的维护

由于consumer消费过程中可能会出出现宕机等故障,consumer恢复后,需要从故障前的位置继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。

  1. 在Kafka 0.9 版本之前,consumer默认将offset保存在ZooKeeper中 /consumers/consumer_group_N/offsets/topic_N/partition_N
  2. 从Kafka 0.9 版本开始,consumer默认将offset保存在Kafka 内置的Topic __consumer_offsets

6. 数据可靠性

6.1. 数据丢失问题:ack应答机制,设置ack等级

ack=0 不等待任何返回值就继续发送
ack=1 leader接收到后给返回值
ack=all 或 ack=-1 ISR中所有副本都接收到后才给返回值

注意:将ack设置为0的时候才使Kafka可以异步通信并具有高吞吐量,这样有可能产生数据丢失问题。所以Kafka的高吞吐量和数据安全性是呈反比的。
**

6.2. 数据重复问题:幂等性

幂等性指Producer不论向Server发送多少次重复数据,Server端都只会持久化一条。
幂等性的实现:每条消息附加了Sequence和PID,相同的PID和SequenceNumber发送给Broker,只会保存一条。
如图所示:

  • 第一次发送message(C,D),并附加Sequence=1和PID=1000;
  • Broker将该数据添加到Streaming Message中;
  • Broker返回的ack没能成功的发送到Producer;
  • Producer重发了message(C,D),并附加Sequence=1和PID=1000;
  • Broker比较PID和Sequence后没有将该Message放入Streaming Message中;
  • Broker返回ack给Producer。

6.3. 数据不丢失不重复:exactly Once = At Least Once + 幂等性

幂等性 ——> 数据不重复;
At Least Once ——> 数据不丢失(ack=-1);

6.4. 副本数据同步策略

方案 优点 缺点
半数以上完成同步,就发送ack 延迟低 选举新的leader时,如果要容忍n台节点的故障,就需要2n+1个副本
全部完成同步,才发送ack 选举新的leader时,如果要容忍n台节点的故障,只需要n+1个副本 延迟高

Kafka选择第二种方案,原因:

  1. 第一种方案需要2n+1个副本,产生的冗余数据太多,故选择第二种;
  2. 虽然第二种方案的网络延迟比较高,但网络延迟对Kafka的影响较小。

6.5. ISR(in-sync replica set)

ISR即一个和leader保持同步的follower集合,每个Partition都会有一个ISR,由leader动态维护。

  • 当ISR中的follower完成数据的同步之后,follower就会给leader发送ack。
  • 根据副本数据同步策略,当ISR中的所有replic都向leader发送ack后,leader才会给Producer发送ack。
  • 如果follower长时间未向leader同步数据,则该follower将被提出ISR。该时间由replica.lag.time.max.ms参数来设定。
  • leader发生故障之后,就会从ISR中选举新的leader。

6.6. 副本间数据一致性 HW和LEO

LEO:指的是每个副本最大的offset;
HW:指的是消费者能见到的最大的offset,ISR中各个副本的LEO的最小值。

  1. follower故障

follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。等该follower的LEO大于等于该Partition的HW,就可以重新加入ISR了。

  1. leader故障

leader发生故障之后,会从ISR中选出一个新的leader,之后,为保证多个副本之间的数据一致性,其余的follower会将各自的log文件高于HW的部分截掉,然后从新的leader中同步数据。


7. Kafka 的高效读写

  1. 顺序写磁盘

Kafka的producer生产数据要写入log文件中,写入的过程是一直追加到文件末端,为顺序写。

  1. 零复制技术

用于优化网络传输的性能,减少复制拷贝次数,减小上下文切换和缓存行污染。


参考文献

https://baijiahao.baidu.com/s?id=1660161962545490659&wfr=spider&for=pc&isFailFlag=1
幂等性:https://www.cnblogs.com/smartloli/p/11922639.html