第一部分 Kafka架构与实战

Kafka介绍

  1. Kafka在一个或多个可以跨越多个数据中心的服务器上作为集群运行。
    2. Kafka集群中按照主题分类管理,一个主题可以有多个分区,一个分区可以有多个副本分区。
    3. 每个记录由一个键,一个值和一个时间戳组成

Kafka优势

  • 高吞吐量:单机每秒处理几十上百万的消息量。即使存储了许多TB的消息,它也保持稳定的性能
  • 高性能:单节点支持上千个客户端,并保证零停机和零数据丢失
  • 持久化数据存储:将消息持久化到磁盘。通过将数据持久化到硬盘以及replication防止数据丢失。
  • 分布式系统,易于向外扩展。所有的Producer、Broker和Consumer都会有多个,均为分布式的。无需停机即可扩展机器。多个Producer、Consumer可能是不同的应用
  • 可靠性 - Kafka是分布式,分区,复制和容错的
  • 客户端状态维护:消息被处理的状态是在Consumer端维护,而不是由server端维护。当失败时能自动平衡
  • 支持online和offline的场景
  • 支持多种客户端语言。Kafka支持Java、.NET、PHP、Python等多种语言

基本架构

这部分内容较多,详见讲义部分。
image.png

核心概念【面试】

1)Producer

生产者创建消息。
该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的 segment 文件中。

一般情况下,一个消息会被发布到一个特定的主题上。
1. 默认情况下通过轮询把消息均衡地分布到主题的所有分区上。
2. 在某些情况下,生产者会把消息直接写到指定的分区。这通常是通过消息键和分区器来实现的,分区器为键生成一个散列值,并将其映射到指定的分区上。这样可以保证包含同一个键的消息会被写到同一个分区上。
3. 生产者也可以使用自定义的分区器,根据不同的业务规则将消息映射到分区。

2)Consumer

消费者读取消息。
1. 消费者订阅一个或多个主题,并按照消息生成的顺序读取它们。
2. 消费者通过检查消息的偏移量来区分已经读取过的消息。偏移量是另一种元数据,它是一个不断递增的整数值,在创建消息时,Kafka 会把它添加到消息里。在给定的分区里,每个消息的偏移量都是唯一的。消费者把每个分区最后读取的消息偏移量保存在Zookeeper 或Kafka上,如果消费者关闭或重启,它的读取状态不会丢失。
3.消费者是消费组的一部分。群组保证每个分区只能被一个消费者使用。(如果多个消费者,则无法确定每个分区的偏移量,因为在给定的分区里,每个消息的偏移量都是唯一的)
4. 如果一个消费者失效,消费组里的其他消费者可以接管失效消费者的工作,再平衡,分区重新分配。
image.png

3)Broker

一个独立的Kafka 服务器被称为broker。
broker 是集群的组成部分。每个集群都有一个broker 同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来)。
分区首领:拥有某个leader分区的broker,比如broker1就是分区0的分区首领,因为分区0的leader在broker1上。

image.png

4)Topic

每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。
物理上不同Topic的消息分开存储。
主题就好比数据库的表,尤其是分库分表之后的逻辑表。

5)Partition

  1. 主题可以被分为若干个分区,一个分区就是一个提交日志。
    2. 消息以追加的方式写入分区,然后以先入先出的顺序读取
    3. 无法在整个主题范围内保证消息的顺序,但可以保证消息在单个分区内的顺序。
    4. Kafka 通过分区来实现数据冗余和伸缩性
    5. 在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。(分区内部有序)image.png

6)Replicas

Kafka 使用主题来组织数据,每个主题被分为若干个分区,每个分区有多个副本。那些副本被保存在broker 上,每个broker 可以保存成百上千个属于不同主题和分区的副本。

  • 首领副本
    • 每个分区都有一个首领副本。为了保证一致性,所有生产者请求和消费者请求都会经过这个副本。
  • 跟随者副本
    • 首领以外的副本都是跟随者副本。跟随者副本不处理来自客户端的请求,它们唯一的任务就是从首领那里复制消息,保持与首领一致的状态。如果首领发生崩溃,其中的一个跟随者会被提升为新首领。

7)Offset

生产者Offset

消息写入的时候,每一个分区都有一个offset,这个offset就是生产者的offset,同时也是这个分区的最新最大的offset。
image.png

消费者Offset

这是某一个分区的offset情况,生产者写入的offset是最新最大的值是12,
而当Consumer A进行消费时,从0开始消费,一直消费到了9,消费者的offset就记录在9,
而Consumer B就纪录在了11。(A和B 不是一个消费组??)
等下一次他们再来消费时,他们可以选择接着上一次的位置消费,当然也可以选择从头消费,或者跳到最近的记录并从“现在”开始消费。
image.png

8)副本

Kafka通过副本保证高可用。副本分为首领副本(Leader)和跟随者副本(Follower)。
跟随者副本包括同步副本和不同步副本,在发生首领副本切换的时候,只有同步副本可以切换为首领副本。

AR

分区中的所有副本统称为AR(Assigned Repllicas)。
AR=ISR(“同步”)+OSR(不同步)

ISR

所有与leader副本保持一定程度同步的副本(包括Leader)组成ISR(In-Sync Replicas),ISR集合是AR集合中的一个子集。
消息会先发送到leader副本,然后follower副本才能从leader副本中拉取消息进行同步,同步期间内follower副本相对于leader副本而言会有一定程度的滞后。前面所说的“一定程度”是指可以忍受的滞后范围,这个范围可以通过参数进行配置。

OSR

与leader副本同步滞后过多的副本(不包括leader)副本,组成OSR(Out-Sync Relipcas)。在正常情况下,所有的follower副本都应该与leader副本保持一定程度的同步,即AR=ISR,OSR集合为空。

HW

HW是High Watermak的缩写, 俗称高水位,它表示了一个特定消息的偏移量(offset),消费之只能拉取到这个offset之前的消息。
【重点】HW和LEO的更新?

LEO

LEO是Log End Offset的缩写,它表示了当前日志文件中下一条待写入消息的offset(也就是未来的消息的偏移量)。

image.png

第二部分 Kafka高级特性

生产者

1、kafka中acks有几种选项?

  1. acks=0,不等待确认
    1. 生产者不等待broker对消息的确认,只要将消息放到缓冲区,就认为消息已经发送完成。 该情形不能保证broker是否真的收到了消息,retries配置也不会生效。发送的消息的返回的消息偏移量永远是-1。
  2. acks=1,等待leader分区确认
    1. 表示消息只需要写到主分区(leader分区)即可,然后就响应客户端,而不等待副本分区的确认。 在该情形下,如果主分区收到消息确认之后就宕机了,而副本分区还没来得及同步该消息,则该消息丢失。
  3. acks=all,等待所有ISR分区确认
    1. 首领分区会等待所有的ISR副本分区确认记录。 该处理保证了只要有一个ISR副本分区存活,消息就不会丢失。 这是Kafka最强的可靠性保证,等效于 acks=-1
  4. acks=-1

    2、描述一下Kafka数据生成流程(如图)

  • Producer创建时,会创建一个sender线程并者只为守护线程
  • 生产消息时,内部其实是异步流程;生产的消息会先经过拦截器、序列化器、分区器,然后将消息缓存在缓冲区(该缓冲区也是在Producer创建时创建)
  • 批次发送的条件为:缓冲区大小达到batch.size或者linger.ms上限,哪个先达到就算哪个
  • 批次发送后,发往指定分区,然后落盘到broker;(如果生产者配置了retrires参数大于0并且失败原因允许重试,那么客户端内部会对消息重试。)
  • 落盘到broker成功,返回生产元数据给生产者
  • 元数据返回有两种方式:一同是通过阻塞直接返回;另外一种是通过回调返回。

image.png

3、Kafka生产者序列化器的作用以及如何自定义序列化器?

a. 由于kafka中的数据都是字节数组,在将消息发送到Kafak之前需要先将数据序列化为字节数组。序列化器的作用就是用于序列化要发送的消息。(持久传输?)
b. 自定义序列化器需要实现org.apache.kafka.common.serialization.Serializer接口,并实现其中serialize方法。

4、Kafka的生产者分区器的作用以及如何自定义分区器?

  1. 分区计算?
    1. 使用record提供的分区号
    2. 使用key的序列化后的值hash值对分区数量取模
    3. 使用轮询的方式分配区号

消费者

消费消息的偏移量保存在Kafka的名字是__consumer_offsets的主题中。
消费组均衡地给消费者分配分区,每个分区只由消费组中一个消费者消费。(避免重复消费,分区中会保存消费着 消费消息 的offsets?)

1、心跳机制?

  1. Kafka 的心跳是 Kafka Consumer 和 Broker 之间的健康检查,只有当 Broker Coordinator(协调器) 正常时,Consumer 才会发送心跳。

2、主题和分区,消费组?

  1. 主题Topic
    1. Kafka用于分类管理消息的逻辑单元,类似与MySQL的数据库。
  2. 分区Partition
    1. 是Kafka下数据存储的基本单元,这个是物理上的概念。
    2. 同一个topic的数据,会被分散的存储到多个partition中,这些partition可以在同一台机器上,也可以是在多台机器上。
    3. 优势在于:有利于水平扩展,避免单台机器在磁盘空间和性能上的限制,同时可以通过复制来增加数据冗余性,提高容灾能力。为了做到均匀分布,通常partition的数量通常是Broker Server数量的整数倍。
  3. 消费组Consumer Group
    1. 同样是逻辑上的概念,是Kafka实现单播和广播两种消息模型的手段。
    2. 保证一个消费组获取到特定主题的全部的消息。
    3. 在消费组内部,若干个消费者消费主题分区的消息,消费组可以保证一个主题的每个分区只被消费组中的一个消费者消费。

3、消费者位移提交?

  1. consumer需要向Kafka记录自己的位移数据,这个汇报过程称为位移提交
  2. consumer需要为而分配给它的每个分区提交各自的位移数据(向分区提交自己的偏移量)
  3. 位移提交的由consumer端负责,Kafka只负责保管
  4. 位移提交分为自动提交手动提交
  5. 位移提交又分为同步提交异步提交

4、消费者位移管理

  • Kafka中,消费者根据消息的位移顺序消费消息。
  • 消费者的位移由消费者管理,可以存储于zookeeper中,也可以存储于Kafka主题 __consumer_offsets中

5、再均衡

触发条件:

  • 消费者组内成员发生变更(消费者的增加或减少)
  • 主题的分区数发生变更(只支持增加分区)
  • 订阅的主题发生变化

6、心跳机制image.png

7、消费组管理(Consumer Group)

组协调器 Group Coordinator

特性

  1. 消费组有一个或多个消费者,消费者可以是一个进程,也可以是一个线程
  2. group.id是一个字符串,唯一标识一个消费组
  3. 消费组订阅的主题每个分区只能分配给消费组一个消费者。

Group Coordinator来执行对于消费组的管理。
Group Coordinator——每个消费组分配一个消费组协调器用于组管理和位移管理。当消费组的第一个消费者启动的时候,它会去和Kafka Broker确定谁是它们组的组协调器。之后该消费组内所有消费者和该组协调器协调通信。

主题

分区

1、副本机制

当集群中的一个broker宕机后系统可以自动故障转移到其他可用的副本上,不会造成数据丢失。
—replication-factor 3 = 1leader+2follower
所有读取和写入都由Leader副本负责。
通常,分区比broker(主机数,3台)多,并且Leader分区在broker之间平均分配。
image.png

2、Leadder选举

Kafka中Leader分区选举,通过维护一个动态变化的ISR集合来实现,一旦Leader分区丢掉,则从ISR中随机挑选一个副本做新的Leader分区。
如果ISR中的副本都丢失了,则:
1. 可以等待ISR中的副本任何一个恢复,接着对外提供服务,需要时间等待。
2. 从OSR中选出一个副本做Leader副本,此时会造成数据丢失。

3、 分区分配策略

1.RangeAssignor-均分

Kafka默认采用RangeAssignor的分配算法
对分区按照分区ID进行数值排序,然后订阅这个Topic的消费组的消费者再进行字典排序,
image.png

存在问题:
字典序靠前的消费组中的消费者比较“贪婪”。随着消费者订阅的Topic的数量的增加,不均衡的问题会越来越严重。
image.png

2.RoundRobinAssignor-轮询

image.png
如果消费组内,消费者订阅的Topic列表是相同的(每个消费者都订阅了相同的Topic),那么分配结果是尽量均衡的。
如果订阅的Topic列表是不同的,那么分配结果是不保证“尽量均衡”的,因为某些消费者不参与一些Topic的分配。
image.png

3.StickyAssignor

分区的分配尽量的均衡
每一次重分配的结果尽量与上一次分配结果保持一致

磁盘存储(速度快)

1、零拷贝

传统的IO,需要多次copy数据
1、第一次:将磁盘文件,读取到操作系统内核缓冲区;
2、第二次:将内核缓冲区的数据,copy到application应用程序的buffer;
3、第三步:将application应用程序buffer中的数据,copy到socket网络发送缓冲区(属于操作系统
内核的缓冲区);
4、第四次:将socket buffer的数据,copy到网络协议栈,由网卡进行网络传输。
image.png

kafka的两个过程:
1、网络数据持久化到磁盘 (Producer 到 Broker)
2、磁盘文件通过网络发送(Broker 到 Consumer)
数据落盘通常都是非实时的,Kafka的数据并不是实时的写入硬盘,它充分利用了现代操作系统分页存储来利用内存提高I/O效率。

2、页缓存

概念:就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。

Memory Mapped Files(mmap):将磁盘文件映射到内存, 用户通过修改内存就能修改磁盘文件。

  • 不可靠,写到mmap中的数据并没有被真正的写到硬盘,操作系统会在程序主动调用flush的时候才把数据真正的写到硬盘。
  • 如果Kafka写入到mmap之后就立即flush然后再返回Producer叫同步(sync);
  • 写入mmap之后立即返回Producer不调用flush叫异步(async)。

当一个进程准备读取磁盘上的文件内容时:
1. 操作系统会先查看待读取的数据所在的页 (page)是否在页缓存(pagecache)中,如果存在(命中)则直接返回数据,从而避免了对物理磁盘的 I/O 操作;
2. 如果没有命中,则操作系统会向磁盘发起读取请求并将读取的数据页存入页缓存,之后再将数据返回给进程。

如果一个进程需要将数据写入磁盘:
1. 操作系统也会检测数据对应的页是否在页缓存中,如果不存在,则会先在页缓存中添加相应的页,最后将数据写入对应的页。
2. 被修改过后的页也就变成了脏页,操作系统会在合适的时间把脏页中的数据写入磁盘(flush),以保持数据的一致性。

Kafka中大量使用了页缓存,这是 Kafka 实现高吞吐的重要因素之一。
消息先被写入页缓存,由操作系统负责刷盘任务。

3、顺序写入

Kafka 在设计时采用了文件追加的方式来写入消息,即只能在日志文件的尾部追加新的消 息,并且也不允许修改已写入的消息,这种方式属于典型的顺序写盘的操作,所以就算 Kafka 使用磁盘作为存储介质,也能承载非常大的吞吐量。

mmap和sendfile:
1. Linux内核提供、实现零拷贝的API;
2. sendfile 是将读到内核空间的数据,转到socket buffer,进行网络发送;
3. mmap将磁盘文件映射到内存,支持读和写,对内存的操作会反映在磁盘文件上。
4. RocketMQ 在消费消息时,使用了 mmap。kafka 使用了 sendFile。

Kafka速度快是因为:
1. partition顺序读写,充分利用磁盘特性,这是基础;
2. Producer生产的数据持久化到broker,采用mmap文件映射,实现顺序的快速写入;
3. Customer从broker读取数据,采用sendfile,将磁盘文件读到OS内核缓冲区后,直接转到socket buffer进行网络发送。

稳定性(事务)

幂等性

保证在消息重发的时候,消费者不会重复处理。即使在消费者收到重复消息的时候,重复处理,也要保证最终结果的一致性。
比如,银行转账,如果失败,需要重试。不管重试多少次,都要保证最终结果一定是一致的。

幂等性实现:
在底层设计架构中引入了ProducerID和SequenceNumber。

  • ProducerID:在每个新的Producer初始化时,会被分配一个唯一的ProducerID,这个ProducerID对客户端使用者是不可见的。
  • SequenceNumber:对于每个ProducerID,Producer发送数据的每个Topic和Partition都对应一个从0开始单调递增的SequenceNumber值。

异常发送情况:
image.png
ack失败时,对于Producer来说,会触发重试机制,将消息(x2,y2)再次发送,但是,由于引入了幂等性,在每条消息中附带了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber发送给Broker,而之前Broker缓存过之前发送的相同的消息,那么在消息流中的消息就只有一条(x2,y2),不会出现重复发送的情况。

事务操作

常见的三种情况:

  1. 只有Producer生产消息,这种场景需要事务的介入;
  2. 消费消息和生产消息并存,比如Consumer&Producer模式,这种场景是一般Kafka项目中比较常见的模式,需要事务介入;
  3. 只有Consumer消费消息,这种操作在实际项目中意义不大,和手动Commit Offsets的结果一样,而且这种场景不是事务的引入目的。

控制器

  • 控制器就是一个broker。
  • 控制器除了一般broker的功能,还负责Leader分区的选举。

image.png
结论:
Kafka通过Zookeeper的分布式锁特性选举集群控制器
1.Kafka 使用 Zookeeper 的分布式锁选举控制器,并在节点加入集群或退出集群时通知控制器。
2. 控制器负责在节点加入或离开集群时进行分区Leader选举。
3. 控制器使用epoch 来避免“脑裂”。“脑裂”是指两个节点同时认为自己是当前的控制器。

zk实现分布式锁(kafka选举broker)

  1. 锁就是zk指定目录下序号最小的临时序列节点,多个系统的多个线程都要在此目录下创建临时的顺序节点,因为Zk会为我们保证节点的顺序性,所以可以利用节点的顺序进行锁的判断。
  2. 每个线程都是先创建临时顺序节点,然后获取当前目录下最小的节点(序号),判断最小节点是不是当前节点,如果是那么获取锁成功,如果不是那么获取锁失败。
  3. 获取锁失败的线程获取当前节点上一个临时顺序节点,并对对此节点进行监听,当该节点删除的时候(上一个线程执行结束删除或者是掉线zk删除临时节点)这个线程会获取到通知,代表获取到了锁。

可靠性

为了保证可靠性,可以设置 acks=all 。Follower收到消息后,会像Leader发送ACK。一旦Leader收到了ISR中所有Replica的ACK(leader+follower),Leader就commit,那么Leader就向Producer发送ACK。

一致性保证

1、Follower副本何时更新LEO

  1. Follower副本的本地LEO何时更新? Follower副本的LEO值就是日志的LEO值,每当新写入一条消息,LEO值就会被更新。当Follower发送FETCH请求后,Leader将数据返回给Follower,此时Follower开始Log写数据,从而自动更新LEO值。
    2. Leader端Follower的LEO何时更新? Leader端的Follower的LEO更新发生在Leader在处理Follower FETCH请求时。一旦Leader接收到Follower发送的FETCH请求,它先从Log中读取相应的数据,给Follower返回数据前,先更新Follower的LEO

2、Follower副本何时更新HW

Follower更新HW发生在其更新LEO之后,一旦Follower向Log写完数据,尝试更新自己的HW值。 比较当前LEO值与FETCH响应中Leader的HW值,取两者的小者作为新的HW值。

3、Leader副本何时更新LEO

和Follower更新LEO相同,Leader写Log时自动更新自己的LEO值。

4、Leader副本何时更新HW值

Leader的HW值就是分区HW值,直接影响分区数据对消费者的可见性 。

Leader如何更新自己的HW值?Leader broker上保存了一套Follower副本的LEO以及自己的LEO。当尝试确定分区HW时,它会选出所有满足条件的副本,比较它们的LEO(包括Leader的LEO),并选择最小的LEO值作为HW值

HW和LEO正常更新案例(详见讲义-暂不要求)

消息重复及解决

生产者发送重复解决方案

生产发送的消息没有收到正确的broke响应,导致生产者重试。

  1. 要启动kafka的幂等性,设置: enable.idempotence=true ,以及 ack=all 以及 retries > 1 。
  2. ack=0,不重试,可能会丢消息,适用于吞吐量指标重要性高于数据丢失,例如:日志收集。

生产者和broke阶段消息丢失解决方案

  1. 禁用unclean选举,ack=all
  2. 配置:min.insync.replicas > 1
  3. 失败的offset单独记录


消费者数据重复场景及决方案

数据消费完没有及时提交offset到broker。

  1. 取消自动提交
  2. 下游做幂等


延时队列

场景:
两个follower副本都已经拉取到了leader副本的最新位置,此时又向leader副本发送拉取请求,而leader副本并没有新的消息写入,那么此时leader副本该如何处理呢?可以直接返回空的拉取结果给follower副本,不过在leader副本一直没有新消息写入的情况下,follower副本会一直发送拉取请求,并且总收到空的拉取结果消耗资源

解决:
Kafka在处理拉取请求时,会先读取一次日志文件,如果收集不到足够多(fetchMinBytes,由参数fetch.min.bytes配置,默认值为1)的消息,那么就会创建一个延时拉取操作(DelayedFetch)以等待拉取到足够数量的消息。当延时拉取操作执行时,会再读取一次日志文件,然后将拉取结果返回给follower副本。