概述
只有 Controller 才会拥有副本状态机,管理当前集群所有副本的状态。比如当 Broker 上线时,将该 Broker 上的所有副本状态变更为在线状态,当 Broker 下线时,将该 Broker 上的所有副本状态变更为离线状态。更多状态下面会详细讨论。
组件
副本状态
Kafka 定义的状态如下表所示:
| 状态 | 说明 |
|---|---|
| NewReplica | 创建新 Topic 或进行副本重分配时,新创建的副本就处于这个状态。处于此状态的副本只能成功Follower副本 |
| OnlineReplica | 副本开始正常工作时处于此状态,处在这个状态的副本可以成为 Leader副本,也可以成为Follower副本 |
| OfflineRplica | 副本所在的Broker下线后,会转换为此状态 |
| ReplicaDeletionStarted | 刚开始删除副本时,会先将副本转换为此状态,然后开始删除操作。 标志着副本删除操作开始。 |
| ReplicaDeletionSuccessful | 副本被到成功删除扣,副本状态会处于此状态。 标志着副本删除操作成功完成。 |
| ReplicaDeletionIneligible | 副本删除操作失败,会将副本转换为此状态 |
| NonExistentReplica | 副本被成功删除后最终转换为此状态,副本已经不存在物理文件中 |

副本状态转换操作
NonExistentReplica => NewReplica
- Controller 向此副本所在的 Broker 发送
LeaderAndIsrRequest请求,该请求包含此副本所对应的分区详情。 将副本状态设置为
NewReplica。// #4-1 目标状态为「NewReplica」,它的前序状态是「NonExistentReplica」case NewReplica =>validReplicas.foreach { replica =>// 获取副本的分区对象val partition = replica.topicPartition// 从缓存中获取副本的当前状态val currentState = controllerContext.replicaState(replica)// 获取分区的「副本元数据」信息controllerContext.partitionLeadershipInfo(partition) match {// #4-2 缓存存在该分区的元数据信息,那么就需要进行更新缓存操作case Some(leaderIsrAndControllerEpoch) =>// 判断所要变更的副本是否为Leader副本if (leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId) {// 如果是Leader副本,记录错误日志,因为Leader副本是不能被设置成NewReplica状态val exception = new StateChangeFailedException(s"Replica $replicaId for partition $partition cannot be moved to NewReplica state as it is being requested to become leader")logFailedStateChange(replica, currentState, OfflineReplica, exception)} else {// #4-3 构建「LeaderAndIsrRequest」请求,然后放入缓冲队列中等待网络I/O执行才会发送到Broker端controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId), // 副本IDreplica.topicPartition, // 分区详情leaderIsrAndControllerEpoch, // 分区对应的副本元数据信息controllerContext.partitionFullReplicaAssignment(replica.topicPartition), // 副本所对应主题的元数据(所有分区以及分区对应的所有副本详情)isNew = true) // 副本是新创建的if (traceEnabled)logSuccessfulTransition(stateLogger, replicaId, partition, currentState, NewReplica)// #4-4 更新本地缓存的副本状态,将副本状态设置为「NewReplica」controllerContext.putReplicaState(replica, NewReplica)}// #4-5 Controller缓存没有数据,那么将它的状态设置为「NewReplica」即可(因为其它Broker也没有数据,所以不需要通知)case None =>if (traceEnabled) {logSuccessfulTransition(stateLogger, replicaId, partition, currentState, NewReplica)}controllerContext.putReplicaState(replica, NewReplica)}}
NewReplica => OnlineReplica
将副本加入到 AR 集合中。
- 将此副本状态设置为
OnlineReplica。 向集群广播 UpdateMetadataRequest。 ```scala // #5-1 副本当前状态为「NewReplica」 case NewReplica => // 从本地缓存获取主题的所有元数据信息 val assignment = controllerContext.partitionFullReplicaAssignment(partition) // #5-2 判断缓存中是否包含当前副本ID if (!assignment.replicas.contains(replicaId)) { // #5-3 不包含,输出错误日志 error(s”Adding replica ($replicaId) that is not part of the assignment $assignment”)
// #5-3 再将该副本加入到副本列表中,并更新缓存 val newAssignment = assignment.copy(replicas = assignment.replicas :+ replicaId) controllerContext.updatePartitionFullReplicaAssignment(partition, newAssignment) }
// #5-7 将副本状态设置为「OnlineReplica」 controllerContext.putReplicaState(replica, OnlineReplica)
<a name="YpnLT"></a>## OnlineReplica、OfflineReplica、ReplicaDeletionIneligible=>OnlineReplica1. 向此副本所在的 Broker 发送 LEaderAndIsrRequest,并附带此副本对应的分区详情。1. 将此副本状态设置为 `OnlineReplica`。1. 向集群广播 UpdateMetadataRequest。```scala// #5-4 其它状态case _ =>// #5-5 从缓存中获取分区详情,详情包括Leader副本、ISR集合等controllerContext.partitionLeadershipInfo(partition) match {// #5-6 如果存在分区信息,向该副本对象所在的Broker发送请求,令其同步该分区数据case Some(leaderIsrAndControllerEpoch) =>// #5-6 如果缓存存在,构建「LeaderAndIsrRequest」请求,然后放入缓冲队列中等待网络I/O执行才会发送到Broker端controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId), // 副本IDreplica.topicPartition, // 分区详情leaderIsrAndControllerEpoch, // 分区对应的副本元数据信息controllerContext.partitionFullReplicaAssignment(partition), // 副本所对应主题的元数据(所有分区以及分区对应的所有副本详情)isNew = false) // 副本是新创建的case None =>}// #5-7 将副本状态设置为「OnlineReplica」controllerContext.putReplicaState(replica, OnlineReplica)
NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionIneligible →OfflineReplica
- 向副本所在的 Broker 发送
StopReplicaRequest请求。 - 对于存在 Leader 的分区,将此副本从 ISR 集合中移除。更新 Zookeeper
/brokers/topics/<topic name>/partitions/<replicaId>/state节点。 - 向该分区其它副本所在的 Broker 发送
LeaderAndIsrRequest请求,并附带此副本对应的分区详情。 - 向集群广播 UpdateMetadataRequest。
将副本状态更新为 OfflineReplica。
// #6 目标状态设置「OfflineReplica」,意味着副本下线case OfflineReplica =>validReplicas.foreach { replica =>// #6-1 向副本所在的Broker发送「StopReplicaRequest」请求:表示停止副本controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), replica.topicPartition, deletePartition = false)}// #6-2 将副本对象集合划分成有Leader副本和无Leader副本val (replicasWithLeadershipInfo, replicasWithoutLeadershipInfo) = validReplicas.partition { replica =>controllerContext.partitionLeadershipInfo(replica.topicPartition).isDefined}// #6-3 如果分区有Leader副本,那么相应ISR集合也会存在。那首先就批量从各自ISR集合中移除replicaId// 更新ZK元数据和本地缓存val updatedLeaderIsrAndControllerEpochs =removeReplicasFromIsr(replicaId, replicasWithLeadershipInfo.map(_.topicPartition))// #6-4 遍历每个已完成更新的分区updatedLeaderIsrAndControllerEpochs.forKeyValue { (partition, leaderIsrAndControllerEpoch) =>stateLogger.info(s"Partition $partition state changed to $leaderIsrAndControllerEpoch after removing replica $replicaId from the ISR as part of transition to $OfflineReplica")// #6-5 分区对应主题不处于「删除中」状态if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) {// #6-6 获取该分区除给定副本以外的其它副本所在的Brokerval recipients = controllerContext.partitionReplicaAssignment(partition).filterNot(_ == replicaId)// #6-7 向这些Broker发送「LeaderAndIsrRequest」请求以更新该分区元数据controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipients, // 待发送请求的broker id列表partition, // 分区leaderIsrAndControllerEpoch, // 此刻分区最新的副本元数据controllerContext.partitionFullReplicaAssignment(partition), // 副本所对应主题的元数据(所有分区以及分区对应的所有副本详情)isNew = false)}val replica = PartitionAndReplica(partition, replicaId)val currentState = controllerContext.replicaState(replica)if (traceEnabled)logSuccessfulTransition(stateLogger, replicaId, partition, currentState, OfflineReplica)// #6-8 变更副本的状态为「OfflineReplica」controllerContext.putReplicaState(replica, OfflineReplica)}// #6-9 遍历无Leader的所有副本对象replicasWithoutLeadershipInfo.foreach { replica =>val currentState = controllerContext.replicaState(replica)if (traceEnabled)logSuccessfulTransition(stateLogger, replicaId, replica.topicPartition, currentState, OfflineReplica)// #6-10 向集群所有Broker发送请求,更新对应分区的元数据controllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(replica.topicPartition))// #6-11 变更副本的状态为「OfflineReplica」controllerContext.putReplicaState(replica, OfflineReplica)}
OfflineReplica => ReplicaDeletionStarted
向副本所在的 Broker 发送 StopReplicaRequest。
case ReplicaDeletionStarted =>validReplicas.foreach { replica =>val currentState = controllerContext.replicaState(replica)if (traceEnabled)logSuccessfulTransition(stateLogger, replicaId, replica.topicPartition, currentState, ReplicaDeletionStarted)controllerContext.putReplicaState(replica, ReplicaDeletionStarted)controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), replica.topicPartition, deletePartition = true)}
副本状态机初始化
/*** 初始化集群内所有副本的状态*/private def initializeReplicaState(): Unit = {// #1 遍历缓存中所有分区详情controllerContext.allPartitions.foreach { partition =>// #2 获取该副本列表val replicas = controllerContext.partitionReplicaAssignment(partition)replicas.foreach { replicaId =>// #3 构建副本状态䭴val partitionAndReplica = PartitionAndReplica(partition, replicaId)// #4 判断副本是否在线:1.副本所在的Broker在线 且 2.离线路径并没有包含该分区if (controllerContext.isReplicaOnline(replicaId, partition)) {// #4-1 将副本状态设置为OnlineReplicacontrollerContext.putReplicaState(partitionAndReplica, OnlineReplica)} else {// #4-1 如果Broker宕机,将状态设置为ReplicaDeletionIneligible。// 这是一个更安全的状态。如果Broker重新恢复,ReplicaDeletionIneligible还可以回到OnlineReplicacontrollerContext.putReplicaState(partitionAndReplica, ReplicaDeletionIneligible)}}}}
