Log End Offset:日志末端位移,记录该副本对象底层日志的下一条位移值。Leader副本会维护关于其它副本的 Log End Offset 值,用于消息同步。
High Watermark:HW 值也是指向下一条消息。小于HW或在HW以下的消息被认为是”已备份的”。
这些值更新时机:
Log End Offset
Follower:使用 ReplicaFetcherThread 线程不断向 Leader 副本发送 FETCH 请求并拉取消息,Follower 得到响应并解析,获得日志数据,再写入本地日志文件后更新本地缓存的 LEO 值。
Leader副本:Leader 副本第一时间接收来自 Producer 的请求,解析请求体,再写入本地日志文件后更新本地缓存的 LEO 值。同时,Leader副本会维护其它 Follower 的 LEO 值,主要目的是帮助 Leader 副本确定整个分区的 HW 值。Follower 的 LEO 值是 Leader 副本处理 FETCH 请求时被更新的。因为 Follower 发送的 FETCH 请求中包含从哪个起始位移值拉取消息,即当前 Follower 的 LEO 值,Leader 会将该值设置为这个 Follower 副本的 LEO 值。
High Watermarker
Follower 自身也会维护自己的 HW 值,这个值是需要和 Leader 副本比较的。Leader 在 FETCH 响应中会写入集群的 HW 值,记作 HW(L),Follower 收到 FETCH 响应,解析数据并写入日志文件后就会尝试更新自己缓存中的 HW 值,通过公式 Math.min(Log End Offset, HW(L)) 计算得到。
Leader 维护的 HW 值表示分区的 HW,对消费者来说,这是它可见的最大消息位移值。以下 4 种情况 Leader 会尝试更新 HW :
- Follower 晋升 Leader 副本成功。
- Broker 离线导致在该 Broker 上的副本从 ISR 集群中被剔除,Kafka 会执行 ISR 缩减操作,需要检查分区 HW 是否需要更新。
- 生产者向 Leader 发送 Producer Request 请求,Leader 写入消息后会更新 LEO,然后检查 HW 是否需要修改。
- Leader 处理 FETCH 请求,更新完其它 Follower 的 LEO 后,再检查 HW 是否需要更新。
Leader 更新 HW 的算法:
- 获取该分区所有符合一定条件的集合副本(包含 Leader 副本)的 LEO 值
- 取它们的最小值作为分区的 HW 值。
注意,这里一定条件是指
- 该副本在 ISR 集合中
- 该 Follower LEO 落后于 Leader LEO 的时间 <=
replica.lag.time.max.ms
条件 2 是防止:Follower 已经追赶上 Leader 的进度,但却不在 ISR 中。
每个 Broker 都会有 ReplicaManager 副本管理器,其实也可以称为分区管理器,用来管理存在该 Broker 的所有分区。这个副本管理器做以下事情:
- 接收并处理来自 Controller 的
LeaderAndIsr请求,根据该请求的元数据对副本进行晋升或降级处理,此外还包括 FETCH 线程的创建与关闭、相关元数据的创建与清理。 - 管理 Leader 的主题的 ISR,如果 ISR 中的副本的
当前时间-上次FETCH时间>replicaLagTimeMaxMs(默认值:30S),那么该副本就会被剔除 ISR 集合。对于 Kafka 2.7,是由 Broker 组装好 AlterIsr 请求并发往 Controller,由 Controller 更新 Zookeeper 数据和向其它 Broker 广播相关元数据。而低于 Kafka 2.7 版本是直接由 Broker 完成这两项工作。 - 副本管理器还会维护分区的状态,如果 Leader 的日志目录出现问题,那么该 Leader 所表示的分区就会被置为
Offline状态。 - 副本管理器接收来自 Controller、Producer、Consumer的请求并响应。如果遇到集群元数据的错误,会先告知 Controller,由 Controller 进行相关逻辑处理。
// 每15秒就会对ISR集合进行检查scheduler.schedule("isr-expiration", maybeShrinkIsr _, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
/*** 判断副本是否脱离的ISR集合* 副本的LEO不等于Leader的LEO 且** @param replicaId 副本ID* @param leaderEndOffset Leader的LEO值* @param currentTimeMs 当前时间戳* @param maxLagMs 最大滞后时间间隔,默认值:30S* @return*/private def isFollowerOutOfSync(replicaId: Int,leaderEndOffset: Long,currentTimeMs: Long,maxLagMs: Long): Boolean = {val followerReplica = getReplicaOrException(replicaId)followerReplica.logEndOffset != leaderEndOffset &&(currentTimeMs - followerReplica.lastCaughtUpTimeMs) > maxLagMs}
