Log End Offset:日志末端位移,记录该副本对象底层日志的下一条位移值。Leader副本会维护关于其它副本的 Log End Offset 值,用于消息同步。
    High Watermark:HW 值也是指向下一条消息。小于HW或在HW以下的消息被认为是”已备份的”。

    这些值更新时机:
    Log End Offset
    Follower:使用 ReplicaFetcherThread 线程不断向 Leader 副本发送 FETCH 请求并拉取消息,Follower 得到响应并解析,获得日志数据,再写入本地日志文件后更新本地缓存的 LEO 值。
    Leader副本:Leader 副本第一时间接收来自 Producer 的请求,解析请求体,再写入本地日志文件后更新本地缓存的 LEO 值。同时,Leader副本会维护其它 Follower 的 LEO 值,主要目的是帮助 Leader 副本确定整个分区的 HW 值。Follower 的 LEO 值是 Leader 副本处理 FETCH 请求时被更新的。因为 Follower 发送的 FETCH 请求中包含从哪个起始位移值拉取消息,即当前 Follower 的 LEO 值,Leader 会将该值设置为这个 Follower 副本的 LEO 值。

    High Watermarker
    Follower 自身也会维护自己的 HW 值,这个值是需要和 Leader 副本比较的。Leader 在 FETCH 响应中会写入集群的 HW 值,记作 HW(L),Follower 收到 FETCH 响应,解析数据并写入日志文件后就会尝试更新自己缓存中的 HW 值,通过公式 Math.min(Log End Offset, HW(L)) 计算得到。

    Leader 维护的 HW 值表示分区的 HW,对消费者来说,这是它可见的最大消息位移值。以下 4 种情况 Leader 会尝试更新 HW :

    1. Follower 晋升 Leader 副本成功。
    2. Broker 离线导致在该 Broker 上的副本从 ISR 集群中被剔除,Kafka 会执行 ISR 缩减操作,需要检查分区 HW 是否需要更新。
    3. 生产者向 Leader 发送 Producer Request 请求,Leader 写入消息后会更新 LEO,然后检查 HW 是否需要修改。
    4. Leader 处理 FETCH 请求,更新完其它 Follower 的 LEO 后,再检查 HW 是否需要更新。

    Leader 更新 HW 的算法:

    1. 获取该分区所有符合一定条件的集合副本(包含 Leader 副本)的 LEO 值
    2. 取它们的最小值作为分区的 HW 值。

      注意,这里一定条件是指

      1. 该副本在 ISR 集合中
      2. 该 Follower LEO 落后于 Leader LEO 的时间 <= replica.lag.time.max.ms

      条件 2 是防止:Follower 已经追赶上 Leader 的进度,但却不在 ISR 中。

    每个 Broker 都会有 ReplicaManager 副本管理器,其实也可以称为分区管理器,用来管理存在该 Broker 的所有分区。这个副本管理器做以下事情:

    1. 接收并处理来自 Controller 的 LeaderAndIsr 请求,根据该请求的元数据对副本进行晋升或降级处理,此外还包括 FETCH 线程的创建与关闭、相关元数据的创建与清理。
    2. 管理 Leader 的主题的 ISR,如果 ISR 中的副本的 当前时间-上次FETCH时间 > replicaLagTimeMaxMs(默认值:30S) ,那么该副本就会被剔除 ISR 集合。对于 Kafka 2.7,是由 Broker 组装好 AlterIsr 请求并发往 Controller,由 Controller 更新 Zookeeper 数据和向其它 Broker 广播相关元数据。而低于 Kafka 2.7 版本是直接由 Broker 完成这两项工作。
    3. 副本管理器还会维护分区的状态,如果 Leader 的日志目录出现问题,那么该 Leader 所表示的分区就会被置为 Offline 状态。
    4. 副本管理器接收来自 Controller、Producer、Consumer的请求并响应。如果遇到集群元数据的错误,会先告知 Controller,由 Controller 进行相关逻辑处理。
    1. // 每15秒就会对ISR集合进行检查
    2. scheduler.schedule("isr-expiration", maybeShrinkIsr _, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
    1. /**
    2. * 判断副本是否脱离的ISR集合
    3. * 副本的LEO不等于Leader的LEO 且
    4. *
    5. * @param replicaId 副本ID
    6. * @param leaderEndOffset Leader的LEO值
    7. * @param currentTimeMs 当前时间戳
    8. * @param maxLagMs 最大滞后时间间隔,默认值:30S
    9. * @return
    10. */
    11. private def isFollowerOutOfSync(replicaId: Int,
    12. leaderEndOffset: Long,
    13. currentTimeMs: Long,
    14. maxLagMs: Long): Boolean = {
    15. val followerReplica = getReplicaOrException(replicaId)
    16. followerReplica.logEndOffset != leaderEndOffset &&
    17. (currentTimeMs - followerReplica.lastCaughtUpTimeMs) > maxLagMs
    18. }