概述

只有 Controller 才会拥有副本状态机,管理当前集群所有副本的状态。比如当 Broker 上线时,将该 Broker 上的所有副本状态变更为在线状态,当 Broker 下线时,将该 Broker 上的所有副本状态变更为离线状态。更多状态下面会详细讨论。

组件

【Kafka】副本状态机 ReplicaStateMachine - 图1

副本状态

Kafka 定义的状态如下表所示:

状态 说明
NewReplica 创建新 Topic 或进行副本重分配时,新创建的副本就处于这个状态。处于此状态的副本只能成功Follower副本
OnlineReplica 副本开始正常工作时处于此状态,处在这个状态的副本可以成为 Leader副本,也可以成为Follower副本
OfflineRplica 副本所在的Broker下线后,会转换为此状态
ReplicaDeletionStarted 刚开始删除副本时,会先将副本转换为此状态,然后开始删除操作。
标志着副本删除操作开始。
ReplicaDeletionSuccessful 副本被到成功删除扣,副本状态会处于此状态。
标志着副本删除操作成功完成。
ReplicaDeletionIneligible 副本删除操作失败,会将副本转换为此状态
NonExistentReplica 副本被成功删除后最终转换为此状态,副本已经不存在物理文件中

副本状态转换示意图.png

副本状态转换操作

NonExistentReplica => NewReplica

  1. Controller 向此副本所在的 Broker 发送 LeaderAndIsrRequest 请求,该请求包含此副本所对应的分区详情。
  2. 将副本状态设置为 NewReplica

    1. // #4-1 目标状态为「NewReplica」,它的前序状态是「NonExistentReplica」
    2. case NewReplica =>
    3. validReplicas.foreach { replica =>
    4. // 获取副本的分区对象
    5. val partition = replica.topicPartition
    6. // 从缓存中获取副本的当前状态
    7. val currentState = controllerContext.replicaState(replica)
    8. // 获取分区的「副本元数据」信息
    9. controllerContext.partitionLeadershipInfo(partition) match {
    10. // #4-2 缓存存在该分区的元数据信息,那么就需要进行更新缓存操作
    11. case Some(leaderIsrAndControllerEpoch) =>
    12. // 判断所要变更的副本是否为Leader副本
    13. if (leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId) {
    14. // 如果是Leader副本,记录错误日志,因为Leader副本是不能被设置成NewReplica状态
    15. val exception = new StateChangeFailedException(s"Replica $replicaId for partition $partition cannot be moved to NewReplica state as it is being requested to become leader")
    16. logFailedStateChange(replica, currentState, OfflineReplica, exception)
    17. } else {
    18. // #4-3 构建「LeaderAndIsrRequest」请求,然后放入缓冲队列中等待网络I/O执行才会发送到Broker端
    19. controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(
    20. Seq(replicaId), // 副本ID
    21. replica.topicPartition, // 分区详情
    22. leaderIsrAndControllerEpoch, // 分区对应的副本元数据信息
    23. controllerContext.partitionFullReplicaAssignment(replica.topicPartition), // 副本所对应主题的元数据(所有分区以及分区对应的所有副本详情)
    24. isNew = true) // 副本是新创建的
    25. if (traceEnabled)
    26. logSuccessfulTransition(stateLogger, replicaId, partition, currentState, NewReplica)
    27. // #4-4 更新本地缓存的副本状态,将副本状态设置为「NewReplica」
    28. controllerContext.putReplicaState(replica, NewReplica)
    29. }
    30. // #4-5 Controller缓存没有数据,那么将它的状态设置为「NewReplica」即可(因为其它Broker也没有数据,所以不需要通知)
    31. case None =>
    32. if (traceEnabled) {
    33. logSuccessfulTransition(stateLogger, replicaId, partition, currentState, NewReplica)
    34. }
    35. controllerContext.putReplicaState(replica, NewReplica)
    36. }
    37. }

    NewReplica => OnlineReplica

  3. 将副本加入到 AR 集合中。

  4. 将此副本状态设置为 OnlineReplica
  5. 向集群广播 UpdateMetadataRequest。 ```scala // #5-1 副本当前状态为「NewReplica」 case NewReplica => // 从本地缓存获取主题的所有元数据信息 val assignment = controllerContext.partitionFullReplicaAssignment(partition) // #5-2 判断缓存中是否包含当前副本ID if (!assignment.replicas.contains(replicaId)) { // #5-3 不包含,输出错误日志 error(s”Adding replica ($replicaId) that is not part of the assignment $assignment”)

    // #5-3 再将该副本加入到副本列表中,并更新缓存 val newAssignment = assignment.copy(replicas = assignment.replicas :+ replicaId) controllerContext.updatePartitionFullReplicaAssignment(partition, newAssignment) }

// #5-7 将副本状态设置为「OnlineReplica」 controllerContext.putReplicaState(replica, OnlineReplica)

  1. <a name="YpnLT"></a>
  2. ## OnlineReplica、OfflineReplica、ReplicaDeletionIneligible=>OnlineReplica
  3. 1. 向此副本所在的 Broker 发送 LEaderAndIsrRequest,并附带此副本对应的分区详情。
  4. 1. 将此副本状态设置为 `OnlineReplica`。
  5. 1. 向集群广播 UpdateMetadataRequest。
  6. ```scala
  7. // #5-4 其它状态
  8. case _ =>
  9. // #5-5 从缓存中获取分区详情,详情包括Leader副本、ISR集合等
  10. controllerContext.partitionLeadershipInfo(partition) match {
  11. // #5-6 如果存在分区信息,向该副本对象所在的Broker发送请求,令其同步该分区数据
  12. case Some(leaderIsrAndControllerEpoch) =>
  13. // #5-6 如果缓存存在,构建「LeaderAndIsrRequest」请求,然后放入缓冲队列中等待网络I/O执行才会发送到Broker端
  14. controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(
  15. Seq(replicaId), // 副本ID
  16. replica.topicPartition, // 分区详情
  17. leaderIsrAndControllerEpoch, // 分区对应的副本元数据信息
  18. controllerContext.partitionFullReplicaAssignment(partition), // 副本所对应主题的元数据(所有分区以及分区对应的所有副本详情)
  19. isNew = false) // 副本是新创建的
  20. case None =>
  21. }
  22. // #5-7 将副本状态设置为「OnlineReplica」
  23. controllerContext.putReplicaState(replica, OnlineReplica)

NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionIneligible →OfflineReplica

  1. 向副本所在的 Broker 发送 StopReplicaRequest 请求。
  2. 对于存在 Leader 的分区,将此副本从 ISR 集合中移除。更新 Zookeeper /brokers/topics/<topic name>/partitions/<replicaId>/state 节点。
  3. 向该分区其它副本所在的 Broker 发送 LeaderAndIsrRequest 请求,并附带此副本对应的分区详情。
  4. 向集群广播 UpdateMetadataRequest。
  5. 将副本状态更新为 OfflineReplica。

    1. // #6 目标状态设置「OfflineReplica」,意味着副本下线
    2. case OfflineReplica =>
    3. validReplicas.foreach { replica =>
    4. // #6-1 向副本所在的Broker发送「StopReplicaRequest」请求:表示停止副本
    5. controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), replica.topicPartition, deletePartition = false)
    6. }
    7. // #6-2 将副本对象集合划分成有Leader副本和无Leader副本
    8. val (replicasWithLeadershipInfo, replicasWithoutLeadershipInfo) = validReplicas.partition { replica =>
    9. controllerContext.partitionLeadershipInfo(replica.topicPartition).isDefined
    10. }
    11. // #6-3 如果分区有Leader副本,那么相应ISR集合也会存在。那首先就批量从各自ISR集合中移除replicaId
    12. // 更新ZK元数据和本地缓存
    13. val updatedLeaderIsrAndControllerEpochs =
    14. removeReplicasFromIsr(replicaId, replicasWithLeadershipInfo.map(_.topicPartition))
    15. // #6-4 遍历每个已完成更新的分区
    16. updatedLeaderIsrAndControllerEpochs.forKeyValue { (partition, leaderIsrAndControllerEpoch) =>
    17. stateLogger.info(s"Partition $partition state changed to $leaderIsrAndControllerEpoch after removing replica $replicaId from the ISR as part of transition to $OfflineReplica")
    18. // #6-5 分区对应主题不处于「删除中」状态
    19. if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) {
    20. // #6-6 获取该分区除给定副本以外的其它副本所在的Broker
    21. val recipients = controllerContext.partitionReplicaAssignment(partition).filterNot(_ == replicaId)
    22. // #6-7 向这些Broker发送「LeaderAndIsrRequest」请求以更新该分区元数据
    23. controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(
    24. recipients, // 待发送请求的broker id列表
    25. partition, // 分区
    26. leaderIsrAndControllerEpoch, // 此刻分区最新的副本元数据
    27. controllerContext.partitionFullReplicaAssignment(partition), // 副本所对应主题的元数据(所有分区以及分区对应的所有副本详情)
    28. isNew = false)
    29. }
    30. val replica = PartitionAndReplica(partition, replicaId)
    31. val currentState = controllerContext.replicaState(replica)
    32. if (traceEnabled)
    33. logSuccessfulTransition(stateLogger, replicaId, partition, currentState, OfflineReplica)
    34. // #6-8 变更副本的状态为「OfflineReplica」
    35. controllerContext.putReplicaState(replica, OfflineReplica)
    36. }
    37. // #6-9 遍历无Leader的所有副本对象
    38. replicasWithoutLeadershipInfo.foreach { replica =>
    39. val currentState = controllerContext.replicaState(replica)
    40. if (traceEnabled)
    41. logSuccessfulTransition(stateLogger, replicaId, replica.topicPartition, currentState, OfflineReplica)
    42. // #6-10 向集群所有Broker发送请求,更新对应分区的元数据
    43. controllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(replica.topicPartition))
    44. // #6-11 变更副本的状态为「OfflineReplica」
    45. controllerContext.putReplicaState(replica, OfflineReplica)
    46. }

    OfflineReplica => ReplicaDeletionStarted

  6. 向副本所在的 Broker 发送 StopReplicaRequest。

    1. case ReplicaDeletionStarted =>
    2. validReplicas.foreach { replica =>
    3. val currentState = controllerContext.replicaState(replica)
    4. if (traceEnabled)
    5. logSuccessfulTransition(stateLogger, replicaId, replica.topicPartition, currentState, ReplicaDeletionStarted)
    6. controllerContext.putReplicaState(replica, ReplicaDeletionStarted)
    7. controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), replica.topicPartition, deletePartition = true)
    8. }

    其实状态变更只涉及缓存状态修改,因此不做过多叙述。

    副本状态机初始化

    1. /**
    2. * 初始化集群内所有副本的状态
    3. */
    4. private def initializeReplicaState(): Unit = {
    5. // #1 遍历缓存中所有分区详情
    6. controllerContext.allPartitions.foreach { partition =>
    7. // #2 获取该副本列表
    8. val replicas = controllerContext.partitionReplicaAssignment(partition)
    9. replicas.foreach { replicaId =>
    10. // #3 构建副本状态䭴
    11. val partitionAndReplica = PartitionAndReplica(partition, replicaId)
    12. // #4 判断副本是否在线:1.副本所在的Broker在线 且 2.离线路径并没有包含该分区
    13. if (controllerContext.isReplicaOnline(replicaId, partition)) {
    14. // #4-1 将副本状态设置为OnlineReplica
    15. controllerContext.putReplicaState(partitionAndReplica, OnlineReplica)
    16. } else {
    17. // #4-1 如果Broker宕机,将状态设置为ReplicaDeletionIneligible。
    18. // 这是一个更安全的状态。如果Broker重新恢复,ReplicaDeletionIneligible还可以回到OnlineReplica
    19. controllerContext.putReplicaState(partitionAndReplica, ReplicaDeletionIneligible)
    20. }
    21. }
    22. }
    23. }