概述

本章节是对分区状态机的学习。分区状态机也是 Controller 特有的组件,负责管理 Kafka 分区状态的转换。和状态机设计理念高度相似,只不过分区状态数量只有 4 个,但是不代表分区状态机逻辑简单。

组件

【kafka】 分区状态机 PartitionStateMachine - 图1

副本状态

分区状态 说明
NewPartition 分区被创建后被设置成这个状态,意味着它是一个全新的分区。
处于这种状态的分区不能选举 Leader副本。
OnlinePartition 分区正式提供服务时所处的状态
OfflinePartition 分区下线后所处的状态
NonExistentPartition 分区被删除,并且从分区状态机移除后所处的状态

分区状态转换示意图.png

分区状态转换操作

NonExistentPartition => NewPartition

  1. 将分区状态变更为 NewPartition

    1. // #3 状态变更
    2. targetState match {
    3. // #3-1 目标状态为NewPartition
    4. case NewPartition =>
    5. validPartitions.foreach { partition =>
    6. stateChangeLog.info(s"Changed partition $partition state from ${partitionState(partition)} to $targetState with " +
    7. s"assigned replicas ${controllerContext.partitionReplicaAssignment(partition).mkString(",")}")
    8. controllerContext.putPartitionState(partition, NewPartition)
    9. }
    10. Map.empty

    NewPartition => OnlinePartition

  2. 确认该分区的 Leader 副本和 AR 以及 ISR 集合等元数据信息。并写入 Zookeeper 节点。存活的第一个副本就是 Leader 副本,ISR 集合为存活的副本列表。

  3. 将分区状态变更为 OnlinePartition
  4. 向所有可用的副本所在的 Broker 发送 LeaderAndIsrReqeust,指导这些 Borker 进行 Leader/Follower 的角色切换。
  5. 向集群广播 UpdateMetadataRequest 请求更新相关元数据。

    1. // #3-2 分区上线
    2. case OnlinePartition =>
    3. // ① 获取「NewPartition」状态下的所有分区
    4. val uninitializedPartitions = validPartitions.filter(partition => partitionState(partition) == NewPartition)
    5. // ② 获取可进行 Leader选举 的分区列表(当前状态为 OfflinePartition 或 OnlinePartition 才能进行分区Leader选举)
    6. val partitionsToElectLeader = validPartitions
    7. .filter(partition => partitionState(partition) == OfflinePartition || partitionState(partition) == OnlinePartition)
    8. // ③ 初始化「NewPartition」状态分区,在Zookeeper中写入Leader和ISR数据
    9. if (uninitializedPartitions.nonEmpty) {
    10. // 初始化Zookeeper节点/brokers/topics/<topic>/partitions/<partition>数据
    11. val successfulInitializations = initializeLeaderAndIsrForPartitions(uninitializedPartitions)
    12. successfulInitializations.foreach { partition =>
    13. stateChangeLog.info(s"Changed partition $partition from ${partitionState(partition)} to $targetState with state " +
    14. s"${controllerContext.partitionLeadershipInfo(partition).get.leaderAndIsr}")
    15. // NewPartition->OnlinePartition
    16. controllerContext.putPartitionState(partition, OnlinePartition)
    17. }
    18. }
    19. // ...
    20. }

    OnlinePartitio/OfflinePartition => OnlinePartition

  6. 根据分区选举策略对分区进行 Leader 选举,确认 Leader 和 ISR 集合,并将结果写入 Zookeeper 结点。

  7. 将分区状态变更为 OnlinePartition
  8. 向所有可用的副本所在的 Broker 发送 LeaderAndIsrReqeust,指导这些 Borker 进行 Leader/Follower 的角色切换。
  9. 向集群广播 UpdateMetadataRequest 请求更新相关元数据。

    1. // ④ 使用Leader副本选举策略给具备Leader选举资格的分区选举Leader副本
    2. if (partitionsToElectLeader.nonEmpty) {
    3. // 不断尝试为多个分区选举Leader,直到所有分区都成功选出Leader
    4. val electionResults = electLeaderForPartitions(
    5. partitionsToElectLeader,
    6. partitionLeaderElectionStrategyOpt.getOrElse(
    7. throw new IllegalArgumentException("Election strategy is a required field when the target state is OnlinePartition")
    8. )
    9. )
    10. // 遍历选举结果
    11. electionResults.foreach {
    12. case (partition, Right(leaderAndIsr)) =>
    13. stateChangeLog.info(
    14. s"Changed partition $partition from ${partitionState(partition)} to $targetState with state $leaderAndIsr"
    15. )
    16. // ⑤ 将成功选举Leader后的分区设置成OnlinePartition状态
    17. controllerContext.putPartitionState(partition, OnlinePartition)
    18. case (_, Left(_)) => // Ignore; no need to update partition state on election error
    19. }
    20. // ⑥ 返回Leader选举结果
    21. electionResults
    22. } else {
    23. Map.empty
    24. }

    其实状态变更只涉及到缓存状态修改,这里就不将代码贴出来了。

    分区 Leader 选举

    分区状态机除了维护各分区状态外,还有一个功能就是分区 Leader 选举。不同场景执行不同 Leader 选举策略,选举策略一共有 4 种:

选举策略 说明
OfflinePartitionLeaderElectionStrategy Leader 副本离线引发分区Leader选举
ReassignPartitionLeaderElectionsStrategy 执行分区副本重分配操作而引发分区Leader选举
PreferredReplicaPartitionLeaderElectionStrategy 执行 Preferred 副本 Leader 选举
ControlledShutdownPartitionLeaderElectionStrategy 正常关闭Broker而引发分区Leader选举

对象 PartitionLeaderElectionAlgorithms 分别实现了这 4 种选举策略:

  1. /**
  2. * 针对4种触发的Leader选举的场景,这个对象分别定义了4个方法,负责为每种场景选举Leader副本
  3. */
  4. object PartitionLeaderElectionAlgorithms {
  5. /**
  6. * 场景一:由于Leader副本下线而引发分区Leader选举
  7. * 选举策略:AR集合第一个满足存在①副本在线②在ISR集合中,它就是Leader副本
  8. *
  9. * @param assignment 分区副本列表,即AR(Assigned Replicas,感觉也可以称为All Replicas)
  10. * 使用Seq表示AR是有序的,但顺序不一定和ISR相同,因为ISR可能会频繁进出
  11. * @param isr 同步副本集合。注意:Leader副本也在ISR集合中。
  12. * @param liveReplicas 该分区下所有存活的副本。
  13. * @param uncleanLeaderElectionEnabled 是否允许Unclean Leader副本参与Leader选举。这可能存在数据丢失的风险
  14. * @param controllerContext Controller上下文,里面保留集群所有元数据
  15. * @return
  16. */
  17. def offlinePartitionLeaderElection(assignment: Seq[Int],
  18. isr: Seq[Int],
  19. liveReplicas: Set[Int],
  20. uncleanLeaderElectionEnabled: Boolean,
  21. controllerContext: ControllerContext): Option[Int] = {
  22. // #1 按顺序搜索AR列表,如果同时满足①副本所在的Broker仍在运行;②副本在ISR列表中 这两个条件,则表明找到Leader副本。
  23. // 否则判断是否开启Unclean Leader选举,如果开启,则从Unclean Leader副本中选出一个作为Leader副本以保证分区可用性
  24. assignment.find(id => liveReplicas.contains(id) && isr.contains(id)).orElse {
  25. // #2 ISR集合没有可用的副本对象,查看是否允许Unclean Leader选举(即unclean.leader.election.enable=true)
  26. if (uncleanLeaderElectionEnabled) {
  27. // #3 选举当前副本列表中第一个存活副本作为Leader
  28. val leaderOpt = assignment.find(liveReplicas.contains)
  29. if (leaderOpt.isDefined)
  30. controllerContext.stats.uncleanLeaderElectionRate.mark()
  31. leaderOpt
  32. } else {
  33. // 如果不允许Unclean Leader选举,则返回None表示无法选举Leader
  34. None
  35. }
  36. }
  37. }
  38. /**
  39. * 场景二:由于执行分区重分配操作而引发的分区Leader选举
  40. *
  41. * @param reassignment
  42. * @param isr
  43. * @param liveReplicas
  44. * @return
  45. */
  46. def reassignPartitionLeaderElection(reassignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int]): Option[Int] = {
  47. reassignment.find(id => liveReplicas.contains(id) && isr.contains(id))
  48. }
  49. /**
  50. * 场景三:由于执行Preferred副本Leader选举而引发的分区Leader选举
  51. * 出现这个选举策略是因为解决Leader分配不均的问题。
  52. * 当所有集群正常并且同时重启时,副本Leader会均匀分布,但随着时间推移,避免不了某些Broker宕机,
  53. * 这样在其它Broker的follower成为Leader,即便后续后续原有的Leader重启成功,
  54. * 即便奋起直追,加入ISR列表,它的身份也只是Follower。久而久之,就会导致整个集群的流量不均匀,加大其它Broker宕机风险。
  55. * Kakfa提供kakfa-preferred-replica-election.sh脚本去均衡分区的Leader副本,实现思路是:
  56. * 1.引入preferred-replica概念,它是指ISR列表中第一个replicaid就是preferred-replica。
  57. * 最初的Leader肯定是排在ISR列表的首位,但是Broker宕机后变成follower,但是ISR中的preferred-replica不会改变,
  58. * 执行kakfa-preferred-replica-election.sh脚本就是让preferred-replica重新成为分区Leader副本。
  59. * 这是手动触发的,也可以配置「auto.leader.rebalance.enable=true」让Kafka满足一定条件时自动触发(Kafka监控集群不均衡度达到某个阈值时自动触发preferred-replica选举操作)。
  60. *
  61. * @param assignment
  62. * @param isr
  63. * @param liveReplicas
  64. * @return
  65. */
  66. def preferredReplicaPartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int]): Option[Int] = {
  67. // 判断第一个副本是否存活
  68. assignment.headOption.filter(id => liveReplicas.contains(id) && isr.contains(id))
  69. }
  70. /**
  71. * 场景四:因为正常关闭Broker而引发分区Leader选举
  72. *
  73. * @param assignment
  74. * @param isr
  75. * @param liveReplicas
  76. * @param shuttingDownBrokers
  77. * @return
  78. */
  79. def controlledShutdownPartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], shuttingDownBrokers: Set[Int]): Option[Int] = {
  80. assignment.find(id => liveReplicas.contains(id) && isr.contains(id) && !shuttingDownBrokers.contains(id))
  81. }
  82. }

Leader副本离线

当副本离线时,根据 AR 的顺序在 ISR 中第一个出现的就作为 Leader 副本。如果ISR集合为空,则会判断是否支持 un-clean Leader 选举,如果支持的话,就选举当前AR中第一个存活的副本作为 Leader 副本。这是集群为了高可用的一种妥协,因为这会导致数据丢失。
Leader离线触发Leader选举.png

分区重分配

我们可以通过 kafka-reassign-partitions.sh 脚本对某些主题进行分区重分配,这也会导致分区进行内部的分区 Leader 选举。
和上面一样,根据 AR 的顺序在 ISR 中第一个出现的就作为 Leader 副本。但不支持 un-clean 作为 Leader 副本。

优先副本

出现这个选举策略是为了解决 Leader 分配不均的问题。当所有同时启动,副本 Leader 会均匀分布集群的 Broker 上。但随着时间推移,避免不了某些 Broker 出现意外而宕机,这样位于其它 Broker 的 follower 成为 Leader,即便后续旧 Leader 重启成功,哪怕奋起直追,加入 ISR,它的身份也只是 Follower。久而久之,就会导致整个集群的流量集中于某几台 Broker 节点,流量分配不均匀,而且还会加大其它 Broker 宕机风险。Kakfa 提供kakfa-preferred-replica-election.sh 脚本去均衡分区 Leader。
实现思路是:引入 preferred-replica 概念,它是指 AR 列表中第一个 replicaid 就是 preferred-replica。最初的 Leader 肯定是排在 AR 列表的首位,但是 Broker 宕机后变成 follower,虽然 ISR 顺序会变,数据也会变,但是 AR 集合顺序不会变,数量也不会变。执行 kakfa-preferred-replica-election.sh 脚本就是让 preferred-replica 重新成为分区Leader副本。前提条件是 preferred-replica 也要存在 ISR 集合中。

Controller关闭

Controller 关闭也会导致分区 Leader 选举,和上面一样,根据 AR 的顺序在 ISR 中第一个出现的就作为 Leader 副本。但不支持 un-clean 作为 Leader 副本。并且该 Broker 不存在正在关闭的 Broker 列表中。

总结

分区状态机只有 Controller 才会拥有,用来管理整个集群的分区状态。其中还包括非常重要的分区 Leader 选举功能。