Kafka 中的消息是以主题为基本单位进行归类的,各个主题在逻辑上是相互独立的。每个主题又可以分为一个或多个分区,每条消息在发送时会根据分区规则被追加到指定的分区中,分区中的每条消息都会被分配一个唯一序号,即偏移量(offset)。

日志文件布局

如果分区规则设置得合理,那么所有的消息可以均匀地分布到不同的分区中,这样就可以实现水平扩展。不考虑多副本的话,一个分区对应一个日志(Log)文件。为了防止单个日志文件过大,Kafka 又将 Log 切分为多个日志分段(LogSegment),相当于一个巨型文件被平均分配为多个相对较小的文件,便于消息维护和清理。

事实上,Log 在物理上只以文件夹的形式存储,而每个 LogSegment 对应磁盘上的一个日志文件和两个索引文件以及可能的其他文件,比如以 .txnindex 为后缀的事物索引文件。日志文件整体布局如下图所示:
image.png
举个例子,假设有一个名为 demo-topic 的主题,此主题中具有 6 个分区,那么在实际物理存储上表现为如下图所示的这几个文件夹:
image.png
向 Log 中追加消息时是顺序写入的,只有最后一个 LogSegment 才能执行写入操作,在此之前所有的 LogSegment 都不能写入数据。通常我们将最后一个 LogSegment 称为 activeSegment,即表示当前活跃的日志分段。随着消息的不断写入,当 activeSegment 满足一定条件时,就需要创建新的 activeSegment,之后追加的消息将写入新的 activeSegment。

为了便于消息的检索,每个 LogSegment 中的日志文件(以 .log 为文件后缀)都有对应的两个索引文件:

  • 偏移量索引文件(以 .index 为文件后缀)
  • 时间戳索引文件(以 .timeindex 为文件后缀)

每个 LogSegment 都有一个基准偏移量 baseOffset,用来表示当前 LogSegment 中第一条消息的 offset。偏移量是一个 64 位的长整数型,日志文件和两个索引文件都是根据基准偏移量命名的,名称固定为 20 位数字,没有达到的位数则用 0 填充。比如第一个 LogSegment 的基准偏移量为 0,对应的日志文件布局如下:
image.png
注意每个 LogSegment 中不只包含 .log、.index、.timeindex 这三种文件,还可能包含 .delete.cleaned.swap 等临时文件,以及可能的 .snapshot.txnindexleader-epoch-checkpoint 等文件。其中,.snapshot 是 Kafka 为幂等或事务型 Producer 所做的快照文件。.deleted 是删除日志段操作创建的文件,删除日志时 Broker 会把日志段文件从 .log 后缀修改为 .deleted 后缀,然后在后台异步删除。.cleaned 和 .swap 都是 Compaction 操作的产物。

从更宏观的视角看,Kafka 中的文件不只上面提及的这些。每一个根目录都会包含最基本的 4 个检查点文件(xxx-checkpoint)和 meta.properties 文件。在某一时刻,Kafka 中的文件目录如下图所示:
image.png

日志索引

上面讲到每个日志分段文件对应了两个索引文件,主要用来提高查找消息的效率。偏移量索引文件用来建立消息偏移量(offset)到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置;时间戳索引文件则根据指定的时间戳(timestamp)来查找对应的偏移量信息。

Kafka 中的索引文件以稀疏索引(sparse index)的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。每当写入一定量(由 broker 端参数 log.index.interval.bytes 指定,默认为 4KB)的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项。

稀疏索引通过 MappedByteBuffer 将索引文件映射到内存中,以加快索引的查询速度。偏移量索引文件中的偏移量是单调递増的,査询指定偏移量时,使用二分査找法来快速定位偏移量的位置,如果指定的偏移量不在索引文件中,则会返回小于指定偏移量的最大偏移量。时间戳索引文件中的时间戳也保持严格的单调递增,查询指定时间戳时,也根据二分查找法来查找不大于该时间戳的最大偏移量,至于要找到对应的物理文件位置还需要根据偏移量索引文件来进行再次定位。稀疏索引的方式是在磁盘空间、内存空间、查找时间等多方面的一个折中。

1. 索引文件切分

上面讲到日志分段文件达到一定条件时需要进行切分,那么其对应的索引文件也需要进行切分。日志分段文件切分包含以下几个条件,满足其一即可。

  • 当前日志分段文件的大小超过了 broker 端参数 log.segment.bytes 的值,该参数默认为 1GB。
  • 当前日志分段中消息的最大时间戳与当前时间戳的差值大于 log.roll.hours 参数值,该参数默认为 7 天。
  • 偏移量索引或时间戳索引文件的大小达到 broker 端参数 log.index.size.max.bytes 值,该参数默认为 10MB。
  • 追加的消息的偏移量与当前日志分段的偏移量之间的差值大于 Integer.MAX_VALUE,即要追加的消息的偏移量不能转变为相对偏移量(offset - baseoffset > Integer.MAX_VALUE)。

对非活跃的日志分段而言,其对应的索引文件内容已经固定而不需要再写入索引项,所以会被设定为只读。而对当前活跃的日志分段而言,索引文件还会追加更多的索引项,所以被设定为可读写。在索引文件切分时,Kafka 会关闭当前正在写入的索引文件并设置为只读模式,同时以可读写的模式创建新的索引文件,索引文件的大小由 broker 端参数 log.index.size.max.bytes 配置。Kafka 在创建索引文件时会为其预分配该参数对应的空间大小,这一点与日志分段文件不同。只有当索引文件进行切分时,Kafka 才会把该索引文件裁剪到实际的数据大小。

2. 偏移量索引

偏移量索引项的格式如下所示。每个索引项占用 8 个字节,共分为两个部分:

  • relativeOffset:相对偏移量,表示消息相对于 baseOffset 的偏移量,占 4 个字节,当前索引文件的文件名即为 baseOffset 的值。这也是为什么相对偏移量不能超过 Integer.MAX_VALUE 值的原因。

  • position:物理地址,也就是消息在日志分段文件中对应的物理位置,占 4 个字节。

消息的偏移量(offset)占用 8 个字节,也可以称为绝对偏移量。索引项中没有直接使用绝对偏移量而是改为只占用 4 个字节的相对偏移量(relativeOffset=offset-baseOffset),是因为这样可以减小索引文件占用的空间。比如一个日志分段的 baseOffset 为 32,那么其文件名就是 00000000000000000032.log,offset 为 35 的消息在索引文件中的 relativeOffset 的值就为 3,这样 relativeOffset 的值就相对会小,节省空间。

这些索引文件都是以 16 进制保存的,如果我们想观察文件内容可以通过 Kafka 提供的 kafka-dump-log.sh 脚本进行解析,如下示例所示:
image.png
我们举个例子来分析下索引文件是怎么工作的?如下图所示,如果我们要查找偏移量为 23 的消息,首先要通过二分法在偏移量索引文件中找到不大于 23 的最大索引项,即 [22,656] 这个索引项,然后从日志分段文件中的物理位置 656 开始顺序查找偏移量为 23 的消息。
image.png
如果有多个日志文件,如下图所示,我们如果想要查找偏移量为 268 的消息,首先肯定是定位到 baseOffset 为 251 的日志分段,然后计算相对偏移量 relativeOffset 为 17,之后再在对应的索引文件中找到不大于 17 的索引项,最后根据索引项中的 position 定位到具体的日志分段文件位置开始查找目标消息。
image.png
这里查找指定的日志分段并不是顺序查找,而是采用了跳跃表的结构。Kafka 的每个日志对象中使用了 ConcurrentSkipListMap 来保存各个日志分段,每个日志分段的 baseOffset 作为 key,这样可以根据指定偏移量来快速定位到消息所在的日志分段。

3. 时间戳索引

时间戳索引项的格式如下所示。每个索引项占用 12 个字节,共分为两个部分。

  • timestamp:当前日志分段最大的时间戳,占 8 个字节。
  • relativeOffset:时间戳所对应的消息的相对偏移量,占 4 个字节。

时间戳索引文件中包含若干时间戳索引项,每个追加的时间戳索引项中的 timestamp 必须大于之前追加的索引项的 timestamp,否则不予追加。如果 broker 端参数 log.message.timestamp.type 设置为 LogAppendTime,那么消息的时间戳必定能够保持单调递增;相反如果是 CreateTime 类型则无法保证,因为如果两个不同时钟的生产者同时往一个分区中插入消息,那么就会造成当前分区的时间戳乱序。

我们已经知道每当写入一定量的消息时,就会在偏移量索引文件和时间戳索引文件中分别增加一个偏移量索引项和时间戳索引项。两个文件增加索引项的操作是同时进行的,但这并不意味着偏移量索引中的 relativeOffset 和时间戳索引项中的 relativeOffset 是同一个值,因为时间戳索引项有可能不会被追加。

那如何根据时间戳索引文件查找指定时间之后的消息呢?具体如下图所示:
image.png
如果要查找指定时间戳 targetTimeStamp = 1526384718288 开始的消息,首先是找到不小于指定时间戳的日志分段。但这里就无法使用跳跃表来快速定位到相应的日志分段了,我们需要将 targetTimeStamp 和每个日志分段中的最大时间戳 largestTimeStamp 逐一对比,直到找到不小于 targetTimeStamp 的 largestTimeStamp 所对应的日志分段。日志分段中的 largestTimeStamp 的计算是先查询该日志分段所对应的时间戳索引文件,找到最后一条索引项,若最后一条索引项的时间戳字段值大于 0,则取其值,否则取该日志分段的最近修改时间。

找到相应的时间戳日志分段后,在时间戳索引文件中使用二分查找算法查找到不大于 targetTimeStamp 的最大索引项,即 [1526384718283,28],如此便找到了一个相对偏移量 28。接着,在偏移量索引文件中使用二分法查找到不大于 28 的最大索引项,即 [26,838]。最后,从上面计算得到的日志分段文件中的 838 的物理位置开始查找不小于 targetTimeStamp 的消息。

日志清理

Kafka 将消息存储在磁盘中,为了控制磁盘占用空间的不断增加就需要对消息做一定的清理操作。Kafka 中每一个分区副本都对应一个 Log,而 Log 又可以分为多个日志分段,这样也便于日志的清理操作。Kafka 提供了两种日志清理策略。

  • LogRetention:按照一定的保留策略直接删除不符合条件的日志分段。
  • LogCompaction:对消息 key 进行整合,合并有相同 key 的不同 value 值,只保留最后一个版本。

我们可以通过 broker 端参数 log.cleanup.policy 来设置日志清理策略,默认值为 delete,即采用日志删除的清理策略。如果要采用日志压缩的淸理策略,需要将 log.cleanup.policy 设置为 compact,并且 log.cleaner.enable 参数也要设为 true(默认为 true)。通过将 log.cleanup.policy 参数设置为 “delete,compact”,还可以同时支持日志删除和日志压缩两种策略。日志清理的粒度可以控制到主题级别,比如与 log.cleanup.policy 对应的主题级别的参数为 cleanup.policy。

1. 日志删除

在 Kafka 的日志管理器中有专门的日志删除任务来周期性地检测和删除不符合保留条件的日志分段文件,这个周期可以通过 broker 端参数 log.retention.check.interval.ms 来配置,默认值为 5 分钟。当前日志分段的保留策略有 3 种:基于时间的保留策略、基于日志大小的保留策略和基于日志起始偏移量的保留策略。

1.1 基于时间的保留策略

日志删除任务会检查当前日志文件中是否有保留时间超过设定的阈值(retentionMs)来寻找可删除的日志分段文件集合(deletableSegments),如下图所示。
image.png
其中 retentionMs 可以通过 broker 端参数 log.retention.{hours|minutes|ms} 来配置,这三个选项的优先级从低到高。默认情况下只配置了 log.retention.hours 参数,其值为 168,故默认情况下日志分段文件的保留时间为 7 天。

查找过期的日志分段文件,并不是简单地根据日志分段的 lastModifiedTime 来计算的,而是根据日志分段中最大的 largestTimeStamp 来计算的。因为日志分段的 lastModifiedTime 可以被有意或无意地修改,比如分区副本进行了重新分配,因此 lastModifiedTime 并不能真实地反映出日志分段在磁盘的保留时间。要获取日志分段中最大的 largestTimeStamp 的值,首先要查询该日志分段所对应的时间戳索引文件中的最后一条索引项,若最后一条索引项的时间戳字段值大于 0 则取其值,否则才取 lastModifiedTime。

删除日志分段时,首先会从 Log 对象中所维护日志分段的跳跃表中移除待删除的日志分段,以保证没有线程对这些日志分段进行读取操作。然后将日志分段所对应的所有文件(包括对应的索引文件)添加上 .deleted 后缀。最后交由延迟任务来删除这些以 .deleted 为后缀的文件,这个任务的延迟执行时间可通过 file.delete.delay.ms 参数来配置,默认为 1 分钟。

1.2 基于日志大小的保留策略

日志删除任务会检查当前日志文件的大小是否超过设定的阈值(retentionSize)来寻找可删除的日志分段的文件集合(deletableSegments),如下图所示。
image.png
其中的 retentionSize 可以通过 broker 端参数 log.retention.bytes 来配置,默认值为 -1,表示无穷大。注意这个参数配置的是 Log 中所有日志文件的总大小,而不是单个日志分段的大小。单个日志分段的大小由 broker 端参数 log.segment.bytes 来限制,默认值为 1GB。

基于日志大小的保留策略与基于时间的保留策略类似,首先计算日志文件的总大小和 retentionSize 的差值,即计算需要删除的日志总大小,然后从日志文件中的第一个日志分段开始查找可删除的日志分段文件集合。查出后就执行删除操作,这个删除操作和基于时间的保留策略的删除操作相同。

2. 日志压缩

Kafka 中的 Log Compaction 是指在默认的日志删除(LogRetention)规则之外提供的一种清理过时数据的方式。Log Compaction 对于有相同 key 的不同 value 值,只保留最后一个版本。如果应用只关心 key 对应的最新 vaue 值,则可以开启 Kafka 的日志清理功能,Kafka 会定期将相同 key 的消息进行合并,只保留最新的 value 值。具体如下图所示:
image.png

很多资料会把 Log Compaction 翻译为日志压缩,但这种叫法容易让人和消息压缩产生关联,其实这是两个不同的概念,英文 Compaction 有压紧、压实的意思,注意两者的区分。

Log Compaction 执行前后,日志分段中的每条消息的偏移量和写入时的偏移量保持一致。Log Compaction 会生成新的日志分段文件,日志分段中每条消息的物理位置会重新按照新文件来组织。Log Compaction 执行过后的偏移量不再是连续的,不过这并不影响日志的查询。

Kafka 中的 Log Compaction 可以类比于 Redis 中的 RDB 的持久化模式。系统异常崩溃恢复时通过读取 Kafka 中的消息来恢复其应有的状态,那么此系统关心的是它原本的最新状态而不是历史时刻中的每一个状态。如果 Kafka 的日志保存策略是 Log Deletion,那么系统势必要一股脑地读取 Kafka 中的所有数据来进行恢复,如果日志保存策略是 Log Compaction,那么可以减少数据的加载量进而加快系统的恢复速度。

2.1 清理检查点文件

每个分区日志目录下都有一个 cleaner-offset-checkpoint 文件,这个文件就是清理检查点文件,用来记录每个主题的每个分区中已清理的偏移量。通过清理检查点文件可将 Log 分为两部分,如下图所示。通过检查点 cleaner checkpoint 来划分出一个已经清理过的 clean 部分和一个还未清理过的 dirty 部分。在日志清理的同时,客户端也可以读取日志中的消息。diry 部分的消息偏移量是逐一递增的,而 clean 部分的消息偏移量是断续的。
image.png
注意 Log Compaction 是针对 key 的,所以在使用时应注意每个消息的 key 值不为 null。每个 broker 会启动log.cleaner.thread(默认值为 1)个日志清理线程负责执行清理任务,这些线程会选择 “污浊率” 最高的日志文件进行清理。用 cleanBytes 表示 clean 部分的日志占用大小,dirtyBytes 表示 dirty 部分的日志占用大小,那么这个日志的污浊率(dirtyRatio)为:

  1. dirtyRatio = dirtyBytes / (cleanBytes + dirtyBytes)

为了防止日志不必要的频繁清理操作,Kafka 还使用了参数 log.cleaner.min.cleanable.ratio(默认值为 0.5)来限定可进行清理操作的最小污浊率。Kafka 中用于保存消费者消费位移的主题 __consumer_offsets 使用的就是 Log Compaction 策略。

2.2 消息 key 的筛选

根据分区日志文件的污浊率选择了合适的待压缩日志文件后,我们还需要对该日志文件中消息的 key 进行筛选操作。这一过程 Kafka 会用一个名为 SkimpyoffsetMap 的对象来构建起 key 与 offset 映射关系的哈希表。
image.png
在日志清理过程中需要遍历两次日志文件,第一次遍历会把每个 key 的哈希值和最后出现的 offset 都保存在 SkimpyOffsetMap 中,映射模型如上图所示。第二次遍历会将日志文件中的每个消息的偏移量与 SkimpyOffsetMap 中保留的最大的偏移量进行比较,看是否符合保留条件,如果不符合保留条件则会被清理。假设一条消息的 offset 为 O1,这条消息的 key 在 SkimpyoffsetMap 中对应的 offset 为 O2,如果 O1 大于等于 O2 即满足保留条件。

2.3 日志分段分组

Log Compaction 执行过后的日志分段的大小会比原先的日志分段的要小,为了防止出现太多的小文件,Kafka 在实际清理过程中并不对单个的日志分段进行单独清理,而是将日志文件中 offset 从 0 至 firstUncleanableOffset 的所有日志分段进行分组,每个日志分段只属于一组。同一个组的多个日志分段清理过后,只会生成一个新的日志分段。

如下图所示,假设所有的参数配置都为默认值,在 Log Compaction 之前 checkpoint 的初始值为 0。执行第一次 Log Compaction 之后,每个非活跃的日志分段的大小都有所缩减,checkpoint 的值也有所变化。执行第二次 Log Compaction 时会组队成 [0.4GB,0.4GB]、[0.3GB,0.7GB]、[0.3GB]、[1GB] 这 4 个分组,并且从第二次 Log Compaction 开始还会涉及墓碑消息的清除。第三次 Log Compaction 同理。
image.png
Log Compaction 过程中会将每个日志分组中需要保留的消息复制到一个以 .clean 为后缀的临时文件中,此临时文件以当前日志分组中第一个日志分段的文件名命名,例如 00000000000000000000.log.clean。经过 Log Compaction 过后将 .clean 的文件修改为 .swap 后缀的文件,例如 00000000000000000000.log.swap。然后删除原本的日志文件,最后才把文件的 .swap 后缀去掉。整个过程中的索引文件的变换也是如此,至此一个完整 Log Compaction 操作才算完成。