初识Kafka

分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。

三大角色:消息系统、存储系统、流式处理平台。

基本概念

体系架构

一个典型的Kafka体系架构包括若干 Producer、若干 Broker、若干Consumer,以及一个ZooKeeper集群。

1.png

主题和分区

生产者将消息写入主题,消费者从主题中消费消息。一个主题可以有多个分区,每个分区包含的消息是不同的。同一主题的不同分区可以分布在不同的broker上。分区在存储层可以看成一个可顺序追加的日志文件。

2.png

分区多副本机制

一个分区包含多个副本,提升容灾能力。Leader副本负责读写,Follower副本只负责从Leader副本同步消息。当Leader副本出现故障时,Follower副本中的一个会被选举成Leader副本。

3.png

消费者位移

消费者会记录分区的消费位置,重启后可以从之前记录的消费位置开始消费。

ISR集合

分区中的所有副本统称为AR(Assigned Replicas)。所有与leader副本保持一定程度同步的副本(包括leader副本在内)组成ISR(In-Sync Replicas)。“一定程度的同步”是指可忍受的滞后范围,这个范围可以通过参数进行配置。

HW和LEO

HW是High Watermark的缩写,俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息。

LEO是Log End Offset的缩写,它标识当前日志文件中下一条待写入消息的offset。分区ISR集合中的每个副本都会维护自身的LEO,而ISR集合中最小的LEO即为分区的HW。

4.png

重要的服务端参数

参数 说明
zookeeper.connect broker要连接的ZooKeeper集群的服务地址(包含端口号)。
listeners broker监听客户端连接的地址列表。
broker.id Kafka集群中broker的唯一标识,默认值为-1。
log.dir和log.dirs Kafka 把所有的消息都保存在磁盘上,而这两个参数用来配置 Kafka 日志文件存放的根目录。
message.max.bytes 该参数用来指定broker所能接收消息的最大值,默认值为1000012(B),约等于976.6KB。

生产者

客户端开发

发送消息的三种模式:发后即忘(fire-and-forget)、同步(sync)及异步(async)。发后即忘不能处理发送失败的情况,可靠性最差。

发送消息时可以通过分区器控制消息发到哪个分区。

原理分析

整体架构

5.png

RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。

请求在从Sender线程发往Kafka之前还会保存到InFlightRequests中,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId 是一个String 类型,表示节点的 id 编号)。

元数据的更新

通过InFlightRequests可以获得leastLoadedNode,即所有Node中负载最小的那一个。这里的负载最小是通过每个Node在InFlightRequests中还未确认的请求决定的,未确认的请求越多则认为负载越大。

Kafka集群的元数据记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分配在哪个节点上,follower副本分配在哪些节点上,哪些副本在AR、ISR等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。

Kafka客户端需要更新元数据时,会先挑选出leastLoadedNode,然后向这个Node发送MetadataRequest请求来获取具体的元数据信息。

重要的生产者参数

参数 说明
acks 指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。
max.request.size 限制生产者客户端能发送的消息的最大值,默认值为 1048576B,即1MB。
retries 配置生产者重试的次数,默认值为0。
retry.backoff.ms 设定两次重试之间的时间间隔,默认值为100。
compression.type 指定消息的压缩方式,默认值为“none”。
linger.ms 指定生产者发送 ProducerBatch 之前等待更多消息(ProducerRecord)加入ProducerBatch 的时间,默认值为 0。

消费者

消费者与消费组

一个消费组可以包含多个消费者。Kafka以消费组为单位订阅主题,主题下的分区会分配给消费组下的消费者,一个分区只能被一个消费者消费。

6.png

一个消费组中的消费者增加或减少,会触发消费者再均衡,所有分区会重新分配给消费者。

消费位移提交

消费位移存储在Kafka内部的主题__consumer_offsets中。

消费位移可以同步提交,也可以异步提交。可以自动提交,也可以手动提交。

消息异步消费,位移提交时机,都可能会造成重复消费和消息丢失的现象。

兼顾性能和可靠性的位移提交方式:

如果消费者正常退出或发生再均衡的情况,那么可以在此之前使用同步提交的方式做最后的把关。

7.png

主题和分区

主题的管理

主题的管理可以通过kafka-topics.sh 脚本、KafkaAdminClient API、直接操纵日志文件和ZooKeeper节点来实现。

Kafka支持增加分区,但是增加分区会影响消息的写入分区,从而影响消息的消费顺序,最好在一开始就设置好分区数量。Kafka不支持减少分区,如果需要减少,需要新建一个有更少分区的主题,把数据复制过去。

分区的管理

优先副本的选举

在创建主题的时候,该主题的分区及副本会尽可能均匀地分布到 Kafka 集群的各个broker节点上,对应的leader副本的分配也比较均匀。

随着时间的更替,Kafka 集群的broker 节点不可避免地会遇到宕机或崩溃的问题,当分区的leader节点发生故障时,其中一个follower节点就会成为新的leader节点,这样就会导致集群的负载不均衡,从而影响整体的健壮性和稳定性。

为了能够有效地治理负载失衡的情况,Kafka引入了优先副本(preferredreplica)的概念。所谓的优先副本是指在 AR 集合列表中的第一个副本。Kafka要确保所有主题的优先副本在Kafka集群中均匀分布,这样就保证了所有分区的leader均衡分布。

所谓的优先副本的选举是指通过一定的方式促使优先副本选举为leader副本。

在 Kafka 中可以提供分区自动平衡的功能,但是在生产环境中不建议开启,为这可能引起负面的性能问题,也有可能引起客户端一定时间的阻塞。优先副本的选举时间需要人工控制。

分区重分配

当集群中新增broker节点时,只有新创建的主题分区才有可能被分配到这个节点上,而之前的主题分区并不会自动分配到新加入的节点中,因为在它们被创建时还没有这个新节点,这样新节点的负载和原先节点的负载之间严重不均衡。

为了解决上述问题,需要让分区副本再次进行合理的分配,也就是所谓的分区重分配。Kafka提供了 kafka-reassign-partitions.sh 脚本来执行分区重分配的工作,它可以在集群扩容、broker节点失效的场景下对分区进行迁移。

修改副本因子

修改副本因子的功能也是通过重分配所使用的 kafka-reassign-partition.sh 脚本实现的。重分配的方案可以通过scale代码生成。

如何选择合适的分区数

性能测试工具

通过生产者性能测试脚本 kafka-producer-perf-test.sh,和消费者性能测试脚本kafka-consumer-perf-test.sh,测试相同硬件不同分区数量的实际性能表现。

分区数越多吞吐量就越高吗

测试环境为一个由3台普通云主机组成的3节点的Kafka集群,每台云主机的内存大小为8GB、磁盘大小为40GB、4核CPU的主频为2600MHz。JVM版本为1.8.0_112,Linux系统版本为2.6.32-504.23.4.el6.x86_64。

创建分区数为1、20、50、100、200、500、1000的主题,对应的主题名称分别为topic-1、topic-20、topic-50、topic-100、topic-200、topic-500、topic-1000,所有主题的副本因子都设置为1。

使用kafka-producer-perf-test.sh脚本分别向这些主题中发送100万条消息体大小为1KB的消息。测试结果如下:

8.png

分区数的上限

增加一个分区,对应会增加一个文件描述符,而操作系统的文件描述符是有上限的。

考量因素

从吞吐量方面考虑,增加合适的分区数可以在一定程度上提升整体吞吐量,但超过对应的阈值之后吞吐量不升反降。

分区数的多少会影响系统的可用性。如果分区数很多,集群中的某个broker节点宕机,那么就会有大量的分区需要同时进行leader角色切换,这个切换的过程会耗费一笔可观的时间,并且在这个时间窗口内这些分区也会变得不可用。

分区数越多也会让Kafka的正常启动和关闭的耗时变得越长,与此同时,主题的分区数越多不仅会增加日志清理的耗时,而且在被删除时也会耗费更多的时间。

如果一定要给一个准则,则建议将分区数设定为集群中broker的倍数,即假定集群中有3个broker节点,可以设定分区数为3、6、9等,至于倍数的选定可以参考预估的吞吐量。不过,如果集群中的broker 节点数有很多,比如大几十或上百、上千,那么这种准则也不太适用。

日志存储

文件目录布局

不考虑多副本的情况,一个分区对应一个日志(Log)。为了防止 Log 过大,Kafka又引入了日志分段(LogSegment)的概念,将Log切分为多个LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件,这样也便于消息的维护和清理。

9.png

为了便于消息的检索,每个LogSegment中的日志文件(以“.log”为文件后缀)都有对应的两个索引文件:偏移量索引文件(以“.index”为文件后缀)和时间戳索引文件(以“.timeindex”为文件后缀)。每个 LogSegment 都有一个基准偏移量 baseOffset,用来表示当前 LogSegment中第一条消息的offset。偏移量是一个64位的长整型数,日志文件和两个索引文件都是根据基准偏移量(baseOffset)命名的,名称固定为20位数字,没有达到的位数则用0填充。

10.png

日志索引

每个日志分段文件对应了两个索引文件,主要用来提高查找消息的效率。偏移量索引文件用来建立消息偏移量(offset)到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置;时间戳索引文件则根据指定的时间戳(timestamp)来查找对应的偏移量信息。

Kafka 中的索引文件以稀疏索引(sparse index)的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。每当写入一定量(由 broker 端参数 log.index.interval.bytes指定,默认值为4096,即4KB)的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项。

日志分段文件达到一定的条件时需要进行切分,那么其对应的索引文件也需要进行切分。

日志清理

Kafka提供了两种日志清理策略:

(1)日志删除(Log Retention):按照一定的保留策略直接删除不符合条件的日志分段。保留策略有三种,基于时间的保留策略、基于日志大小的保留策略和基于日志起始偏移量的保留策略。

(2)日志压缩(Log Compaction):针对每个消息的key进行整合,对于有相同key的不同value值,只保留最后一个版本。

磁盘存储

Kafka 依赖于文件系统(更底层地来说就是磁盘)来存储和缓存消息。

有关测试结果表明,一个由6块7200r/min的RAID-5阵列组成的磁盘簇的线性(顺序)写入速度可以达到600MB/s,而随机写入速度只有100KB/s,两者性能相差6000倍。操作系统可以针对线性读写做深层次的优化,比如预读(read-ahead,提前将一个比较大的磁盘块读入内存)和后写(write-behind,将很多小的逻辑写操作合并起来组成一个大的物理写操作)技术。顺序写盘的速度不仅比随机写盘的速度快,而且也比随机写内存的速度快。

11.png

Kafka 在设计时采用了文件追加的方式来写入消息,即只能在日志文件的尾部追加新的消息,并且也不允许修改已写入的消息,这种方式属于典型的顺序写盘的操作,所以就算 Kafka使用磁盘作为存储介质,它所能承载的吞吐量也不容小觑。

Kafka在性能上具备足够竞争力的其他因素:页缓存、磁盘I/O流程、零拷贝。

深入服务端

时间轮

Kafka中存在大量的延时操作,比如延时生产、延时拉取和延时删除等。Kafka基于时间轮的概念自定义实现了一个用于延时功能的定时器(SystemTimer),将插入和删除操作的时间复杂度都降为O(1)。

Kafka中的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。TimerTaskList是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务(TimerTask)。

12.png

当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中。这就是Kafka层级时间轮的概念。

13.png

在Kafka中怎么推进时间呢?Kafka中的定时器借了JDK中的DelayQueue来协助推进时间轮。具体做法是对于每个使用到的TimerTaskList都加入DelayQueue,是TimerTaskList,不是TimerTaskEntry,通过ExpiredOperationReaper线程获取DelayQueue 中到期的任务列表。

以上,Kafka 中的 TimingWheel 专门用来执行插入和删除TimerTaskEntry的操作,而 DelayQueue 专门负责时间推进的任务。

延时操作

使用生产者客户端发送消息的时候将 acks 参数设置为-1,需要等待ISR集合中的所有副本都确认收到消息之后才能正确地收到响应的结果,或者捕获超时异常。返回响应结果给客户端的动作是由谁来执行的呢?在将消息写入 leader 副本的本地日志文件之后,Kafka会创建一个延时的生产操作(DelayedProduce),用来处理消息正常写入所有副本或超时的情况,以返回相应的响应结果给客户端。

延时操作创建之后会被加入延时操作管理器(DelayedOperationPurgatory)来做专门的处理。延时操作有可能会超时,每个延时操作管理器都会配备一个定时器(SystemTimer)来做超时管理,定时器的底层就是采用时间轮(TimingWheel)实现的。延时操作还需要支持外部事件的触发。

深入客户端

分区分配策略

Kafka设置消费者与订阅主题之间的分区分配策略有三种:RangeAssignor、RoundRobinAssignor、StickyAssignor。

StickyAssignor主要有两个目的:(1)分区的分配要尽可能均匀。(2)分区的分配尽可能与上次分配的保持相同。

消费者协调器和组协调器

如果多个消费者设置了不同的分区分配策略,应该以哪个为准?这就需要消费者协调器(ConsumerCoordinator)和组协调器(GroupCoordinator)。

消费者再均衡原理

  • 第一阶段(FIND_COORDINATOR)
    消费者需要确定它所属的消费组对应的GroupCoordinator所在的broker,并创建与该broker相互通信的网络连接。通过groupId确定所属__consumer_offsets的分区,确定该分区leader副本所在的broker,这个broker就是GroupCoordinator所在的broker。
  • 第二阶段(JOIN_GROUP)
    在成功找到消费组所对应的 GroupCoordinator 之后就进入加入消费组的阶段,在此阶段的消费者会向GroupCoordinator发送JoinGroupRequest请求,并处理响应。
    GroupCoordinator会为消费组内的消费者选举出一个消费组的leader,消费者会投票选出分区分配策略,leader消费者负责实施具体的分区分配。
  • 第三阶段(SYNC_GROUP)
    leader 消费者根据在第二阶段中选举出来的分区分配策略来实施具体的分区分配,在此之后需要将分配的方案同步给各个消费者,此时leader消费者并不是直接和其余的普通消费者同步分配方案,而是通过 GroupCoordinator 这个“中间人”来负责转发同步分配方案的。在第三阶段,也就是同步阶段,各个消费者会向GroupCoordinator发送SyncGroupRequest请求来同步分配方案。
  • 第四阶段(HEARTBEAT)
    进入这个阶段之后,消费组中的所有消费者就会处于正常工作状态。在正式消费之前,消费者还需要确定拉取消息的起始位置。

事务

幂等:Kafka的幂等只能保证单个生产者会话(session)中单分区的幂等。生产者投递一个消息,只会被写入一次。但是生产者投递两次内容相同的消息,会被写入两次。Kafka 并不会保证消息内容的幂等。

事务:Kafka的事务可以保证对多个分区写入操作的原子性。操作的原子性是指多个操作要么全部成功,要么全部失败,不存在部分成功、部分失败的可能。

可靠性探究

本章从副本的角度切入来深挖Kafka中的数据一致性、数据可靠性等问题,主要包括副本剖析、日志同步机制和可靠性分析等内容。

副本剖析

失效副本

正常情况下,分区的所有副本都处于ISR集合中,但是难免会有异常情况发生,从而某些副本被剥离出ISR集合中。在ISR集合之外,也就是处于同步失效或功能失效(比如副本处于非存活状态)的副本统称为失效副本,失效副本对应的分区也就称为同步失效分区,即under-replicated分区。

怎么判定一个分区是否有副本处于同步失效的状态呢?Kafka从0.9.x版本开始就通过唯一的broker端参数replica.lag.time.max.ms来抉择,当ISR集合中的一个follower副本滞后leader副本的时间超过此参数指定的值时则判定为同步失败,需要将此follower副本剔除出ISR集合。replica.lag.time.max.ms参数的默认值为10000。

Kafka又是何时扩充ISR的呢?随着follower副本不断与leader副本进行消息同步,follower副本的LEO也会逐渐后移,并最终追赶上leader副本,此时该follower副本就有资格进入ISR集合。追赶上leader副本的判定准则是此副本的LEO是否不小于leader副本的HW。

LEO与HW

生产者一直在往leader副本(带阴影的方框)中写入消息。某一时刻,leader副本的LEO增加至5,并且所有副本的HW还都为0。

14.png

之后follower副本(不带阴影的方框)向leader副本拉取消息,在拉取的请求中会带有自身的LEO信息,这个LEO信息对应的是FetchRequest请求中的fetch_offset。leader副本返回给follower副本相应的消息,并且还带有自身的HW信息,这个HW信息对应的是FetchResponse中的high_watermark。

15.png

此时两个follower副本各自拉取到了消息,并更新各自的LEO为3和4。与此同时,follower副本还会更新自己的HW,更新HW的算法是比较当前LEO和leader副本中传送过来的HW的值,取较小值作为自己的HW值。当前两个follower副本的HW都等于0(min(0,0)=0)。接下来follower副本再次请求拉取leader副本中的消息。

16.png

此时leader副本收到来自follower副本的FetchRequest请求,其中带有LEO的相关信息,选取其中的最小值作为新的HW,即min(15,3,4)=3。然后连同消息和HW一起返回FetchResponse给follower副本。

17.png

Leader Epoch

如果leader副本发生切换,Leader Epoch可以保证数据不丢失和数据一致性。

数据丢失的场景:

18.png

leader副本B将HW同步给副本A之前,副本A和副本B同时宕机,副本A先恢复,成为leader副本,根据HW进行日志截断,会丢掉消息m2。

数据不一致的场景:

19.png

副本A和副本B同时宕机,副本B先恢复,成为leader副本。副本A根据HW进行日志截断,保留消息m2,此时副本A和副本B出现了消息不一致。

为了解决上述两种问题,Kafka从0.11.0.0开始引入了leader epoch的概念,在需要截断数据的时候使用leader epoch作为参考依据而不是原本的HW。leaderepoch代表leader的纪元信息(epoch),初始值为0。每当leader变更一次,leader epoch的值就会加1,相当于为leader增设了一个版本号。与此同时,每个副本中还会增设一个矢量<LeaderEpoch=>StartOffset>,其中StartOffset表示当前LeaderEpoch下写入的第一条消息的偏移量。

日志同步机制

在分布式系统中,日志同步机制既要保证数据的一致性,也要保证数据的顺序性。最简单高效的方式是从集群中选出一个leader来负责处理数据写入的顺序性,follower按照leader中的写入顺序来进行同步即可。

日志同步机制的一个基本原则就是:如果告知客户端已经成功提交了某条消息,那么即使 leader宕机,也要保证新选举出来的leader中能够包含这条消息。这里就有一个需要权衡(tradeoff)的地方,如果leader在消息被提交前需要等待更多的follower确认,那么在它宕机之后就可以有更多的follower替代它,不过这也会造成性能的下降。

对于这种tradeoff,一种常见的做法是“少数服从多数”,它可以用来负责提交决策和选举决策。在这种方式下,如果我们有2f+1个副本,那么在提交之前必须保证有f+1个副本同步完消息。它的劣势是,为了保证leader选举的正常进行,它所能容忍的失败follower数比较少。

在Kafka中动态维护着一个ISR集合,处于ISR集合内的节点保持与leader相同的高水位(HW),只有位列其中的副本(unclean.leader.election.enable配置为false)才有资格被选为新的 leader。写入消息时只有等到所有 ISR 集合中的副本都确认收到之后才能被认为已经提交。

在采用ISR模型和(2f+1)个副本数的配置下,一个Kafka分区能够容忍最大2f个节点失败,而“少数服从多数”的方式只能容忍f个节点失败。

可靠性

可靠性一般靠几个9来衡量。影响可靠性的因素包括:

  1. 副本数。
  2. 生产者客户端参数acks。
  3. 消息发送的模式,即发后即忘、同步和异步。
  4. 重试参数retries。
  5. ISR集合最小数量min.insync.replicas。
  6. 同步刷盘的策略参数log.flush.interval.messages和log.flush.interval.ms。
  7. 消费者提交位移的方式。
  8. 死信队列。
  9. 回溯消费的支持。

Kafka监控

消费滞后

消息堆积是消费滞后(Lag)的一种表现形式,消息中间件中留存的消息与消费的消息之间的差值即为消息堆积量,也称为消费滞后(Lag)量。对Kafka的使用者而言,消费Lag是他们非常关心的一个指标。

对每一个分区而言,它的 Lag 等于 HW-ConsumerOffset 的值,其中ConsumerOffset表示当前的消费位移。

要计算 Lag,首先得获取 ConsumerOffset 和 HW 的值,ConsumerOffset 保存在内部主题__consumer_offsets 中,HW 又时刻在变化,那么这两个变量该如何获取呢?可以参考kafka-consumer-groups.sh脚本的用法,这个脚本可以让我们很方便地查看消费组内每个分区所对应的Lag。

同步失效分区

消费Lag是Kafka的普通使用者特别关心的一项指标,而同步失效分区(under-replicated)的多少是Kafka运维人员非常关心的一项指标。通常情况下,在一个运行状况良好的Kafka集群中,失效分区的个数应该为0。

如果集群中有多个broker的UnderReplicatedPartitions保持一个大于0的稳定值,则一般暗示集群中有broker已经处于下线状态。

如果集群中存在broker的UnderReplicatedPartitions频繁变动,或者处于一个稳定的大于0的值(这里特指没有broker下线的情况)时,一般暗示集群出现了性能问题,通常这类问题很难诊断,不过我们可以一步将问题的范围缩小,比如先尝试确定这个性能问题是否只存在于集群的某个broker中,还是整个集群之上。

集群层面的问题一般也就是两个方面:资源瓶颈和负载不均衡。

监控模块

整个监控系统的模型架构:

20.png

Kafka应用生态:

21.png