副本(Replica)是分布式系统中常见的概念之一,指的是分布式系统对数据和服务提供的一种冗余方式。在常见的分布式系统中,为了对外提供可用的服务,我们往往会对数据和服务进行副本处理。数据副本是指在不同的节点上持久化同一份数据,当某一个节点上存储的数据丢失时,可以从副本上读取该数据,这是解决分布式系统数据丢失问题最有效的手段。另一类副本是服务副本,指多个节点提供同样的服务,每个节点都有能力接收来自外部的请求并进行相应的处理。

多副本机制

Kafka 从 0.8 版本开始为分区引入了多副本机制,通过增加副本数量来提升容灾能力。同一分区的不同副本中保存的是相同的消息(在同一时刻,副本之间并非完全一样),副本之间是一主多从的关系,其中 leader 副本负责处理读写请求,follower 副本只负责与 leader 副本的消息同步。同时,Kafka 通过多副本机制实现故障自动转移,在 Kafka 集群中某个 broker 节点失效的情况下仍然保证服务可用。
image.png
当 leader 副本挂掉或者 leader 副本所在的 broker 宕机时,Kafka 依托于 ZooKeeper 提供的监控功能能够实时感知到,并立即开启新一轮的领导者选举,从追随者副本中选一个作为新的领导者。老 leader 副本重启回来后,只能作为追随者副本加入到集群中。

ISR 集合

下图展示的是一个有 3 台 broker 的 Kafka 集群上的副本分布情况。从图中可以看到,主题 1 分区 0 的 3 个副本分散在 3 台 Broker 上,其他主题分区的副本也都散落在不同的 Broker 上,从而实现数据冗余。
image.png
分区中的所有副本统称为 AR(Assigned Replicas)。所有与 leader 副本保持一定程度同步的副本(包括 leader 副本在内)组成 ISR(In-Sync Replicas),ISR 集合是 AR 集合中的一个子集。消息会先发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步,同步期间内 follower 副本相对于 leader 副本而言会有一定程度的滞后。与 leader 副本同步滞后过多的副本组成 OSR(Out-of-Sync Replicas),由此可见,AR = ISR + OSR。正常情况下,所有 follower 副本都应该与 leader 副本保持一定程度的同步,即 AR = ISR,OSR 集合为空。

1. 失效副本

在 ISR 集合之外,也就是处于同步失效或功能失效(比如副本处于非存活状态)的副本统称为失效副本。leader 副本负责维护和跟踪 ISR 集合中所有 follower 副本的滞后状态,当 follower 副本落后太多时,leader 副本会把它从 ISR 集合中剔除。broker 端提供了 replica.lag.time.max.ms 参数,默认值为 30 秒,这个参数表示 follower 副本能够落后 leader 副本的最长时间间隔,如果 follower 副本异步拉取消息的速度持续的慢于 leader 副本的消息写入速度,那么在 30 秒后,此 follower 副本就被认为是与 leader 副本不同步的,此时 Kafka 会自动收缩 ISR 集合,将该副本踢出。

具体实现原理为:当 follower 副本将 leader 副本 LEO(LogEndOffset)之前的日志全部同步时,则认为该 follower 副本已经追赶上 leader 副本,此时更新该副本的 lastCaughtUpTimeMs 标识。Kafka 的副本管理器会启动一个副本过期检测的定时任务,而这个定时任务会定时检查当前时间与副本的 lastCaughtUpTimeMs 差值是否大于参数 replica.lag.time.max.ms 指定的值。千万不要错误地认为 follower 副本只要拉取 leader 副本的数据就会更新 lastCaughtUpTimeMs。试想一下,当 leader 副本中消息的流入速度大于 follower 副本中拉取的速度时,就算 follower 副本一直不断地拉取 leader 副本的消息也不能与leader 副本同步。如果此时还将该 follower 副本置于 ISR 集合中,那么当 leader 副本下线而选取此 follower 副本为新的 leader 副本时就会造成消息丢失。

Kafka 源码注释中说明了一般有两种情况会导致副本失效:

  • follower 副本进程卡住,在一段时间内根本没有向 leader 副本发起同步请求,比如频繁 Full GC。
  • follower 副本进程同步过慢,在一段时间内都无法追赶上 leader 副本,比如 I/O 开销过大。

这里再补充一点,如果通过工具增加了副本因子,那么新增加的副本在赶上 leader 副本之前也都是处于失效状态的。如果一个 follower 副本由于某些原因(比如异常宕机)而下线,之后又上线,在追赶上 leader 副本之前也处于失效状态。

值得注意的是,如果 OSR 集合中有 follower 副本追上了 leader 副本,那么 leader 副本会把它从 OSR 集合转移至 ISR 集合,这也表明了 ISR 是一个动态调整的集合。追赶上 leader 副本的判定准则是此副本的 LEO 是否不小于 leader 副本的 HW,注意这里并不是和 leader 副本的 LEO 相比。

2. Unclean 领导者选举

默认情况下,当 leader 副本发生故障时,只有在 ISR 集合中的副本才有资格被选举为新的 leader 副本,而在 OSR 集合中的副本则没有任何机会。但是 ISR 有可能出现为空的情况,比如 Leader 副本也挂掉了,Kafka 需要重新选举一个新的 Leader。可是当前 ISR 集合是空的,如何选举新 Leader 呢?

Kafka 把所有不在 ISR 中的存活副本都称为失效副本。通常失效副本落后 Leader 是很多的,因此如果选择这些副本作为新 Leader 的话就可能出现数据丢失。在 Kafka 中,选举这种副本的过程称为 Unclean 领导者选举。broker 端提供了 unclean.leader.election.enable 参数控制是否允许 Unclean 领导者选举,默认为 false。

开启 Unclean 领导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止 Unclean 领导者选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。

我们可以根据实际业务场景决定是否开启 Unclean 领导者选举。不过建议你不要开启,毕竟我们还可以通过其他的方式来提升高可用性。如果为了这点儿高可用性的改善而牺牲了数据一致性,就非常不值了。

高水位机制

ISR 与 HW 和 LEO 也有紧密的关系。HW 是 High Watermark 的缩写,俗称高水位,它标识了一个特定的消息偏移量,消费者只能拉取到这个 offset 之前的消息。
image.png
假设上图是某个分区 Leader 副本的高水位图。在分区高水位以下的消息被认为是已提交消息,反之就是未提交消息消费者只能消费已提交消息,即图中位移小于 8 的所有消息。这里我们不讨论 Kafka 事务,因为事务机制会影响消费者所能看到的消息的范围,它不只是简单依赖高水位来判断的。

日志末端位移(Log End Offset)的简写是 LEO,它表示副本写入下一条消息的位移值,LEO 的大小相当于当前日志分区中最后一条消息的 offset 值加 1。分区 ISR 集合中的每个副本都会维护自身的 LEO,而 ISR 集合中最小的 LEO 即为分区的 HW,对消费者而言只能消费分区 HW 之前的消息。注意,数字 15 所在的方框是虚线,即下一条新消息的位移是 15。显然,介于高水位和 LEO 之间的消息就属于未提交消息。这也从侧面反映了:同一个副本对象,其高水位值不会大于 LEO 值。

高水位和 LEO 是副本对象的两个重要属性。Kafka 所有副本都有对应的高水位和 LEO 值,而不仅仅是 Leader 副本。只不过 Leader 副本比较特殊,Kafka 使用 Leader 副本的高水位来定义所在分区的高水位。换句话说,分区的高水位就是其 Leader 副本的高水位

1. HW 更新机制

对于副本而言,还有两个概念:本地副本(Local Replica)和远程副本(Remote Replica),本地副本是指对应的 Log 分配在当前的 broker 节点上,远程副本是指对应的 Log 分配在其他的 broker 节点上。在 Kafka 中,同一个分区的信息会存在多个 broker 节点上,并被其上的副本管理器所管理,这样在逻辑层面每个 broker 节点上的分区就有了多个副本,但是只有本地副本才有对应的日志。

在一个分区中,Leader 副本所在的节点会记录所有副本的 LEO,而 Follower 副本所在的节点只会记录自身的LEO,而不会记录其他副本的 LEO。对 HW 而言,各个副本所在的节点都只记录它自身的 HW。如下所示:
image.png
为什么要在 broker 0 上保存这些远程副本的 LEO 值呢?其实它们的主要作用是:帮助 Leader 副本确定其高水位值,也就是分区高水位。下图展示了这些值被更新的时机:
image.png
下面,我们分别从 Leader 副本和 Follower 副本两个维度,来总结一下高水位和 LEO 的更新机制。当 Leader 副本处理生产者请求时,在将消息写入本地磁盘后,会获取 Leader 副本所在 Broker 端保存的所有远程副本的 LEO 值(LEO1、LEO2 …… LEOn),然后与 Leader 副本当前的高水位值比较更新:

  1. currentHW = max{currentHW, min(LEO1LEO2 …… LEOn)}

当 Leader 副本处理 Follower 副本拉取消息的请求时,会使用 Follower 副本发送拉取请求中的 fetchOffset 值更新远程副本 LEO 值,更新分区高水位值的步骤与处理生产者请求的步骤相同。而 Follower 副本从 Leader 副本拉取到消息后,会更新自己的 LEO 值,并根据 Leader 副本返回的高水位值与更新后的 LEO 值比较更新:

  1. currentHW = min(currentHWcurrentLEO)

2. 副本同步机制

下面举例说明一下 Kafka 副本同步的全流程,该例子使用一个单分区且有两个副本的主题。当生产者发送一条消息时,Leader 和 Follower 副本对应的高水位是怎么被更新的呢?

首先是初始状态,下图中的 remote LEO 就是远程副本的 LEO 值。在初始状态时所有值都是 0。
image.png
当生产者给主题分区发送一条消息后,状态变更为:
image.png
此时 Leader 副本成功将消息写入了本地磁盘,故 LEO 值被更新为 1。Follower 副本再次尝试从 Leader 副本拉取消息。和之前不同的是,这次有消息可以拉取了,Follower 副本在拉取的请求中会带有自身的 LEO 信息,即 fetchOffset,然后 Leader 副本返回给 Follower 副本相应的消息,并携带自身 HW 信息:
image.png
此时 Follower 副本拉取到了消息,并成功更新自己的 LEO 为 1。与此同时,Follower 副本还会更新自己的 HW 值,更新 HW 的算法是比较当前 LEO 和 Leader 副本中传送过来的 HW 值,取较小值作为自己的 HW 值。当前 Follower 副本的 HW 等于0,因为 min(0,1)=0。

此时,Leader 和 Follower 副本的 LEO 都是 1,但各自的高水位依然是 0,还没有被更新。它们需要在下一轮的拉取中被更新,接下来 Follower 副本再次请求拉取 Leader 副本中的消息,如下图所示:
image.png
由于位移值是 0 的消息已经拉取成功,因此 Follower 副本这次请求拉取的是位移值等于 1 的消息。Leader 副本收到来自 Follower 副本的拉取请求,其中带有 LEO 的相关信息,Leader 副本会选取多个副本的拉取请求中最小的 fetchOffset 值作为新的 HW 值,然后更新对应远程副本的 Remote LEO 值为 1。最后连同消息和已更新过的 HW 值一起返回给 Follower 副本。Follower 副本接收到以后,比较当前 LEO 与接收到的 HW 值后也将自己的高水位值更新成 1。至此,一次完整的消息同步周期就结束了。

3. LEO 与 HW 的维护

在 Kafka 的根目录下有 cleaner-offset-checkpoint、log-start-offset-checkpoint、recovery-point-offset-checkpoint 和 replication-offset-checkpoint 四个检查点文件,其中 cleaner-offset-checkpoint 用于开启日志压缩时记录每个主题的每个分区中已清理的偏移量。
image.png
而 recovery-point-offset-checkpoint 和 replication-offset-checkpoint 这两个文件则分别保存了分区的 LEO 和 HW。Kafka 中会有一个定时任务负责将所有分区的 LEO 刷写到恢复点文件 recovery-point­-offset-checkpoint 中,定时周期由 broker 端参数 log.flush.offset.checkpoint.interval.ms 配置,默认 1 分钟。还有一个定时任务负责将所有分区的 HW 刷写到复制点文件 replication-offset-checkpoint 中,定时周期由 broker 端参数 replica.high.watermark.checkpoint.interval.ms 来配置,默认 5 秒。

log-start-offset-checkpoint 文件则用来保存分区的 logStartOffset 值,它用来标识日志的起始偏移量。各个副本在变动 LEO 和 HW 的过程中,logStartOffset 也有可能随之而动。Kafka 也有一个定时任务来负责将所有分区的 logStartOffset 刷写到起始点文件 log-start-offset-checkpoint 中,定时周期由 broker 端参数 log.flush.start.offset.checkpoint.interval.ms 来配置,默认 1 分钟。

Leader Epoch

依托于高水位机制,Kafka 既界定了消息的对外可见性,又实现了异步的副本同步机制。不过,我们还是要思考一下这里面存在的问题。从刚才的分析中可以看到,Follower 副本的高水位更新需要一轮额外的拉取请求才能实现。因此 Leader 副本高水位值的更新和 Follower 副本高水位值的更新在时间上是错开的。而这种错配正是很多数据丢失或数据不一致问题的根源。

1. HW 机制缺陷

下图展示了单纯依靠高水位机制是怎么产生数据丢失的:
image.png
开始时,Leader 副本 A 和 Follower 副本 B 都处于正常状态。某个使用了默认 acks 设置的生产者程序向 A 发送了两条消息,A 全部写入成功,此时 Kafka 会通知生产者说两条消息全部发送成功。

现在我们假设 Leader 和 Follower 都写入了这两条消息,而且 Leader 副本的高水位也已经更新了,但 Follower 副本高水位还未更新——这是可能出现的,因为 Follower 端高水位的更新与 Leader 端有时间错配。若此时副本 B 所在的 Broker 宕机,当它重启回来后,副本 B 会执行日志截断操作,将 LEO 值调整为之前的高水位值,也就是 1。即位移值为 1 的那条消息就被副本 B 从磁盘中删除了,此时副本 B 的底层磁盘文件中只保存有 1 条消息,即位移值为 0 的那条消息。

当执行完截断操作后,副本 B 开始正常从 A 拉取消息。如果此时副本 A 所在的 Broker 也宕机了,那么 Kafka 就只能让副本 B 成为新的 Leader,当 A 重启回来后就成为了 Follower,由于 Follower 副本的 HW 不能比 Leader 副本的 HW 高,所以 A 也需要执行相同的日志截断操作,即将高水位调整为与 B 相同的值,也就是 1。这样操作之后,位移值为 1 的那条消息就从这两个副本中被永远地抹掉了。

严格来说,这个场景发生的前提是 Broker 端参数 min.insync.replicas 设置为 1。此时,一旦消息被写入到 Leader 副本的磁盘,就会被认为是已提交状态。而 Follower 端的高水位更新是有滞后的。如果在这个短暂的滞后时间窗口内,接连发生 Broker 宕机,那么这类数据的丢失就是不可避免的。

2. Leader Epoch 机制

为此,社区在 0.11 版本正式引入 Leader Epoch 的概念,来规避因高水位更新错配而导致数据不一致问题。在需要截断数据时使用 Leader Epoch 作为参考依据而不是原本的 HW 值。通常 Leader Epoch 由两部分组成:

  • Epoch:一个单调增加的版本号。每当Leader 副本发生变更时都会增加该版本号。小版本号的 Leader 被认为是过期 Leader,不能再行使 Leader 权力。

  • 起始位移(Start Offset):Leader 副本在该 Epoch 值上写入的首条消息的位移。

举个例子,假设现在有两个 Leader Epoch:<0,0> 和 <1,120>。第一个 Leader Epoch 的版本号是 0,这个版本的 Leader 从位移 0 开始保存消息,一共保存了 120 条消息后 Leader 发生了变更,版本号增加到 1,新版本的起始位移是 120。

Kafka Broker 会在内存中为每个分区都缓存 Leader Epoch 数据,同时它还会定期地将这些信息持久化到分区目录下的 leader-epoch-checkpoint 文件中。当 Leader 副本写入消息到磁盘时,Broker 会尝试更新缓存的 Leader Epoch 数据。如果该 Leader 是首次写入消息,那么 Broker 会向缓存中增加一个 Leader Epoch 条目,否则就不做更新。这样每当有 Leader 变更时,新的 Leader 副本会先查询这部分缓存,取出对应的 Leader Epoch 的起始位移,以避免数据丢失和不一致的情况。
image.png
下面我们来看下如何利用 Leader Epoch 机制来规避这种数据丢失:
image.png
场景和之前大致类似,只不过引用 Leader Epoch 机制后,Follower 副本 B 重启回来后,需要向 A 发送一个特殊的请求去获取 Leader 的 LEO 值。A 作为目前的 Leader 在收到请求之后会返回当前的 LEO。在这个例子中,该值为 2。

当获知到 Leader LEO 值为 2 后,B 发现该 LEO 值不比它自己的 LEO 值小,而且缓存中也没有保存任何起始位移值 > 2 的 Epoch 条目,因此 B 无需执行任何日志截断操作。这是对高水位机制的一个明显改进,即副本是否执行日志截断不再依赖于高水位进行判断。

之后副本 A 宕机,B 成为 Leader。同样地,当 A 重启回来后,执行与 B 相同的逻辑判断,发现也不用执行日志截断,至此位移值为 1 的那条消息在两个副本中均得到保留。

之后,当生产者程序向 B 写入新消息时,Leader 副本 B 首次写入消息,会在 Broker 缓存中生成新的 Leader Epoch 条目:[Epoch=1, Offset=2]。之后,副本 B 会使用这个条目帮助判断后续是否执行日志截断操作。这样通过 Leader Epoch 机制,Kafka 完美地规避了这种数据丢失场景。