1. Examining the kafka-reassign-partitions.sh script

cd kafka_home/bin
cat kafka-reassign-partitions.sh

  #!/bin/bash
  # Licensed to the Apache Software Foundation (ASF) under one or more
  # contributor license agreements. See the NOTICE file distributed with
  # this work for additional information regarding copyright ownership.
  # The ASF licenses this file to You under the Apache License, Version 2.0
  # (the "License"); you may not use this file except in compliance with
  # the License. You may obtain a copy of the License at
  #
  # http://www.apache.org/licenses/LICENSE-2.0
  #
  # Unless required by applicable law or agreed to in writing, software
  # distributed under the License is distributed on an "AS IS" BASIS,
  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  # See the License for the specific language governing permissions and
  # limitations under the License.
  exec $(dirname $0)/kafka-run-class.sh kafka.admin.ReassignPartitionsCommand "$@"

From the above: kafka-reassign-partitions.sh simply delegates to kafka-run-class.sh, which runs the kafka.admin.ReassignPartitionsCommand class.
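
For context, a typical invocation of the script looks like the sketch below. The flags are the standard options of this ZooKeeper-based version of the tool; the topic name, broker ids, ZooKeeper address and file names are assumptions made up for illustration:

  # 1) Describe which topics to move and ask the tool to propose a plan for brokers 4,5,6.
  echo '{"version":1,"topics":[{"topic":"demo-topic"}]}' > topics-to-move.json
  bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
    --topics-to-move-json-file topics-to-move.json \
    --broker-list "4,5,6" --generate

  # 2) Save the proposed assignment printed above as reassignment.json, then start it,
  #    optionally throttled (bytes per second of reassignment traffic).
  bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
    --reassignment-json-file reassignment.json \
    --execute --throttle 10000000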

2. Inside the ReassignPartitionsCommand class

  def executeAssignment(zkUtils: ZkUtils, opts: ReassignPartitionsCommandOptions) {
    // the JSON file containing the reassignment plan
    val reassignmentJsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
    // read the plan file into a string
    val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile)
    // if a throttle was supplied, read it and pass it on to executeAssignment
    val throttle = if (opts.options.has(opts.throttleOpt)) opts.options.valueOf(opts.throttleOpt) else -1
    executeAssignment(zkUtils, reassignmentJsonString, throttle)
  }

  def executeAssignment(zkUtils: ZkUtils, reassignmentJsonString: String, throttle: Long = -1) {
    val partitionsToBeReassigned = parseAndValidate(zkUtils, reassignmentJsonString)
    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, partitionsToBeReassigned.toMap)

    // If there is an existing rebalance running, attempt to change its throttle
    if (zkUtils.pathExists(ZkUtils.ReassignPartitionsPath)) {
      println("There is an existing assignment running.")
      reassignPartitionsCommand.maybeLimit(throttle)
    }
    else {
      printCurrentAssignment(zkUtils, partitionsToBeReassigned)
      if (throttle >= 0)
        println(String.format("Warning: You must run Verify periodically, until the reassignment completes, to ensure the throttle is removed. You can also alter the throttle by rerunning the Execute command passing a new value."))
      if (reassignPartitionsCommand.reassignPartitions(throttle)) {
        println("Successfully started reassignment of partitions.")
      } else
        println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
    }
  }

  def reassignPartitions(throttle: Long = -1): Boolean = {
    maybeThrottle(throttle)
    try {
      val validPartitions = proposedAssignment.filter { case (p, _) => validatePartition(zkUtils, p.topic, p.partition) }
      if (validPartitions.isEmpty) false
      else {
        val jsonReassignmentData = ZkUtils.formatAsReassignmentJson(validPartitions)
        zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
        true
      }
    } catch {
      case ze: ZkNodeExistsException =>
        val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned()
        throw new AdminCommandFailedException("Partition reassignment currently in " +
          "progress for %s. Aborting operation".format(partitionsBeingReassigned))
      case e: Throwable => error("Admin command failed", e); false
    }
  }

From the above:

  • executeAssignment(zkUtils: ZkUtils, opts: ReassignPartitionsCommandOptions) reads the reassignment plan file named by the ReassignPartitionsCommandOptions arguments
  • executeAssignment(zkUtils: ZkUtils, reassignmentJsonString: String, throttle: Long = -1) parses and validates the plan and applies the optional throttle (used so that reassignment traffic does not affect the leaders' network and disrupt normal producing and consuming)
  • In reassignPartitions(throttle: Long = -1), zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, jsonReassignmentData) creates a znode holding the plan at /admin/reassign_partitions in ZooKeeper. At this point the execute command is finished: the znode exists, and everything that follows is driven by the listener watching that path (see the sketch below)
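
To see what the execute step actually wrote, you can read the znode back with the zookeeper-shell.sh tool shipped with Kafka. A minimal sketch; the ZooKeeper address, topic name and broker ids are made up, while the JSON layout follows what ZkUtils.formatAsReassignmentJson produces:

  # Inspect the plan written by --execute (the payload shown is illustrative).
  bin/zookeeper-shell.sh zk1:2181 get /admin/reassign_partitions
  # e.g. {"version":1,"partitions":[{"topic":"demo-topic","partition":0,"replicas":[4,5,6]}]}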

3. How the controller listens for the change

When a broker is elected as the active controller, it invokes KafkaController.onControllerFailover. Among other things, this method registers the listener that watches the "/admin/reassign_partitions" path, as shown below:

  /**
   * This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller.
   * It does the following things on the become-controller state change -
   * 1. Register controller epoch changed listener
   * 2. Increments the controller epoch
   * 3. Initializes the controller's context object that holds cache objects for current topics, live brokers and
   *    leaders for all existing partitions.
   * 4. Starts the controller's channel manager
   * 5. Starts the replica state machine
   * 6. Starts the partition state machine
   * If it encounters any unexpected exception/error while becoming controller, it resigns as the current controller.
   * This ensures another controller election will be triggered and there will always be an actively serving controller
   */
  def onControllerFailover() {
    if(isRunning) {
      info("Broker %d starting become controller state transition".format(config.brokerId))
      // read controller epoch from zk
      readControllerEpochFromZookeeper()
      // increment the controller epoch
      incrementControllerEpoch(zkUtils.zkClient)
      // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
      registerReassignedPartitionsListener()
      registerIsrChangeNotificationListener()
      registerPreferredReplicaElectionListener()
      partitionStateMachine.registerListeners()
      replicaStateMachine.registerListeners()
      initializeControllerContext()
      replicaStateMachine.startup()
      partitionStateMachine.startup()
      // register the partition change listeners for all existing topics on failover
      controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
      info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
      maybeTriggerPartitionReassignment()
      maybeTriggerPreferredReplicaElection()
      /* send partition leadership info to all live brokers */
      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
      if (config.autoLeaderRebalanceEnable) {
        info("starting the partition rebalance scheduler")
        autoRebalanceScheduler.startup()
        autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
          5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
      }
      deleteTopicManager.start()
    }
    else
      info("Controller has been shut down, aborting startup/failover")
  }
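
Only the broker currently acting as controller registers this listener. Which broker that is can also be read from ZooKeeper; a quick sketch (the ZooKeeper address and the payload shown are assumptions for illustration):

  # The active controller is recorded under /controller; that broker is the one watching
  # /admin/reassign_partitions.
  bin/zookeeper-shell.sh zk1:2181 get /controller
  # e.g. {"version":1,"brokerid":2,"timestamp":"1500000000000"}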

4. How the controller handles the change

The data-change event on that path is handled by PartitionsReassignedListener.handleDataChange; the actual work is ultimately done by the onPartitionReassignment method.

  /**
   * Starts the partition reassignment process unless -
   * 1. Partition previously existed
   * 2. New replicas are the same as existing replicas
   * 3. Any replica in the new set of replicas are dead
   * If any of the above conditions are satisfied, it logs an error and removes the partition from list of reassigned
   * partitions.
   */
  class PartitionsReassignedListener(controller: KafkaController) extends IZkDataListener with Logging {
    this.logIdent = "[PartitionsReassignedListener on " + controller.config.brokerId + "]: "
    val zkUtils = controller.controllerContext.zkUtils
    val controllerContext = controller.controllerContext

    /**
     * Invoked when some partitions are reassigned by the admin command
     *
     * @throws Exception On any error.
     */
    @throws(classOf[Exception])
    def handleDataChange(dataPath: String, data: Object) {
      debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s"
        .format(dataPath, data))
      val partitionsReassignmentData = ZkUtils.parsePartitionReassignmentData(data.toString)
      val partitionsToBeReassigned = inLock(controllerContext.controllerLock) {
        partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
      }
      partitionsToBeReassigned.foreach { partitionToBeReassigned =>
        inLock(controllerContext.controllerLock) {
          if(controller.deleteTopicManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
            error("Skipping reassignment of partition %s for topic %s since it is currently being deleted"
              .format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
            controller.removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
          } else {
            val context = new ReassignedPartitionsContext(partitionToBeReassigned._2)
            // initiateReassignReplicasForTopicPartition eventually calls onPartitionReassignment,
            // which carries out the actual data movement described by the reassignment plan
            controller.initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
          }
        }
      }
    }

    /**
     * Called when the leader information stored in zookeeper has been delete. Try to elect as the leader
     *
     * @throws Exception On any error.
     */
    @throws(classOf[Exception])
    def handleDataDeleted(dataPath: String) {
    }
  }

controller.initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context) in turn calls onPartitionReassignment, which carries out the actual data movement described by the reassignment plan:

  def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
                                                reassignedPartitionContext: ReassignedPartitionsContext) {
    val newReplicas = reassignedPartitionContext.newReplicas
    val topic = topicAndPartition.topic
    val partition = topicAndPartition.partition
    val aliveNewReplicas = newReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
    try {
      val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
      assignedReplicasOpt match {
        case Some(assignedReplicas) =>
          if(assignedReplicas == newReplicas) {
            throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
              " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
          } else {
            info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
            // first register ISR change listener
            watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
            controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
            // mark topic ineligible for deletion for the partitions being reassigned
            deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
            // move the data according to the reassignment plan
            onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
          }
        case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
          .format(topicAndPartition))
      }
    } catch {
      case e: Throwable => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
        // remove the partition from the admin path to unblock the admin client
        removePartitionFromReassignedPartitions(topicAndPartition)
    }
  }
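
watchIsrChangesForReassignedPartition registers a watch on the partition's state znode, so the catch-up progress of the new replicas can also be observed directly in ZooKeeper. A small sketch; the topic, partition and payload are made up, while the path follows the standard /brokers/topics/&lt;topic&gt;/partitions/&lt;partition&gt;/state layout:

  # Watch the new replicas join the ISR of the partition being moved (payload shown is illustrative).
  bin/zookeeper-shell.sh zk1:2181 get /brokers/topics/demo-topic/partitions/0/state
  # e.g. {"controller_epoch":5,"leader":1,"version":1,"leader_epoch":7,"isr":[1,2,3,4,5,6]}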

5. The core method: onPartitionReassignment

  /**
   * This callback is invoked by the reassigned partitions listener. When an admin command initiates a partition
   * reassignment, it creates the /admin/reassign_partitions path that triggers the zookeeper listener.
   * Reassigning replicas for a partition goes through a few steps listed in the code.
   * RAR = Reassigned replicas
   * OAR = Original list of replicas for partition
   * AR = current assigned replicas
   *
   * 1. Update AR in ZK with OAR + RAR.
   * 2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR). We do this by forcing an update
   *    of the leader epoch in zookeeper.
   * 3. Start new replicas RAR - OAR by moving replicas in RAR - OAR to NewReplica state.
   * 4. Wait until all replicas in RAR are in sync with the leader.
   * 5. Move all replicas in RAR to OnlineReplica state.
   * 6. Set AR to RAR in memory.
   * 7. If the leader is not in RAR, elect a new leader from RAR. If new leader needs to be elected from RAR, a LeaderAndIsr
   *    will be sent. If not, then leader epoch will be incremented in zookeeper and a LeaderAndIsr request will be sent.
   *    In any case, the LeaderAndIsr request will have AR = RAR. This will prevent the leader from adding any replica in
   *    RAR - OAR back in the isr.
   * 8. Move all replicas in OAR - RAR to OfflineReplica state. As part of OfflineReplica state change, we shrink the
   *    isr to remove OAR - RAR in zookeeper and send a LeaderAndIsr ONLY to the Leader to notify it of the shrunk isr.
   *    After that, we send a StopReplica (delete = false) to the replicas in OAR - RAR.
   * 9. Move all replicas in OAR - RAR to NonExistentReplica state. This will send a StopReplica (delete = true) to
   *    the replicas in OAR - RAR to physically delete the replicas on disk.
   * 10. Update AR in ZK with RAR.
   * 11. Update the /admin/reassign_partitions path in ZK to remove this partition.
   * 12. After electing leader, the replicas and isr information changes. So resend the update metadata request to every broker.
   *
   * For example, if OAR = {1, 2, 3} and RAR = {4,5,6}, the values in the assigned replica (AR) and leader/isr path in ZK
   * may go through the following transition.
   * AR                 leader/isr
   * {1,2,3}            1/{1,2,3}            (initial state)
   * {1,2,3,4,5,6}      1/{1,2,3}            (step 2)
   * {1,2,3,4,5,6}      1/{1,2,3,4,5,6}      (step 4)
   * {1,2,3,4,5,6}      4/{1,2,3,4,5,6}      (step 7)
   * {1,2,3,4,5,6}      4/{4,5,6}            (step 8)
   * {4,5,6}            4/{4,5,6}            (step 10)
   *
   * Note that we have to update AR in ZK with RAR last since it's the only place where we store OAR persistently.
   * This way, if the controller crashes before that step, we can still recover.
   */
  def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
    // reassignedPartitionContext holds the plan entry for one partition; reassignedReplicas (RAR) is the
    // set of broker ids that partition's replicas should end up on.
    val reassignedReplicas = reassignedPartitionContext.newReplicas
    /*
     * areReplicasInIsr reads the partition's state znode in ZK and checks its ISR: it returns true only if
     * every reassigned replica is already in the ISR, and false if any of the new replicas is still missing
     * from it. In the false branch the partition's replica assignment is widened to include the new replicas
     * and the newly added replicas are put into the NewReplica state; in the true branch the reassignment is
     * actually completed (leader move, retiring the old replicas, and so on).
     */
    // {1,2,3} 1/{1,2,3} (initial state)
    if (!areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas)) {
      info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
        "reassigned not yet caught up with the leader")
      // replicas in the new assignment that are not in the current one (RAR - OAR)
      val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
      // the union of the new and the current replica sets (OAR + RAR)
      val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
      //1. Update AR in ZK with OAR + RAR.
      // Merge the newly assigned replicas with the existing ones, update partitionReplicaAssignment for this
      // partition with the merged set, and write the replica assignment of all of the topic's partitions back
      // to the /brokers/topics/<topicName> znode; the topic-change listener handles the follow-up processing
      // triggered by that znode change.
      updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
      // {1,2,3,4,5,6} 1/{1,2,3} (step 2)
      //2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR).
      // (1) Based on the partition's new replica set (the old replicas plus the newly assigned ones), build a
      //     LeaderAndIsrRequest for every broker hosting one of those replicas and store it in
      //     leaderAndIsrRequestMap; the requests are read back from that map when they are sent.
      // (2) Queue an UpdateMetadataRequest for every broker in updateMetadataRequestMap. If the topic has been
      //     deleted the request carries a leader id of -2; otherwise it is a normal request that simply
      //     reflects the changed replica set.
      // (3) Send both kinds of requests to the corresponding brokers, provided those brokers are up.
      updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
        newAndOldReplicas.toSeq)
      //3. replicas in RAR - OAR -> NewReplica
      // iterate over all newly added replicas and move them to the NewReplica state via replicaStateMachine
      startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
      info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
        "reassigned to catch up with the leader")
    } else {
      // Every reassigned replica is already in the ISR.
      // First work out the old replicas that are not part of the new assignment (OAR - RAR).
      //4. Wait until all replicas in RAR are in sync with the leader.
      val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
      // move every reassigned replica to the OnlineReplica state
      //5. replicas in RAR -> OnlineReplica
      reassignedReplicas.foreach { replica =>
        replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
          replica)), OnlineReplica)
      }
      //6. Set AR to RAR in memory.
      //7. Send LeaderAndIsr request with a potential new leader (if current leader not in RAR) and
      //   a new AR (using RAR) and same isr to every broker in RAR
      // (1) If the current leader is not in the new replica set, move the partition to OnlinePartition via
      //     partitionStateMachine and elect a new leader with ReassignedPartitionLeaderSelector.
      // (2) If the current leader is in the new replica set and still alive, send the updated LeaderAndIsr
      //     and metadata requests to all brokers.
      // (3) If the current leader has gone offline, move the partition to OnlinePartition via
      //     partitionStateMachine and elect a new leader with ReassignedPartitionLeaderSelector.
      moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
      // (1) Move the old replicas to the OfflineReplica state in replicaStateMachine and send a
      //     StopReplicaRequest to the brokers hosting them; remove those brokers from the isr stored in the
      //     partition's state znode and send a LeaderAndIsrRequest to the remaining isr members.
      // (2) Move those replicas to the ReplicaDeletionStarted state and send a StopReplicaRequest with
      //     deletePartition = true to the brokers that must delete the replica; the later steps only run
      //     after this completes.
      // (3) Move the replicas to the ReplicaDeletionSuccessful state.
      // (4) Drop the retired replicas from replicaStateMachine's replicaState map and from this partition's
      //     entry in partitionReplicaAssignment.
      //8. replicas in OAR - RAR -> Offline (force those replicas out of isr)
      //9. replicas in OAR - RAR -> NonExistentReplica (force those replicas to be deleted)
      stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
      // Update this partition's replica set in partitionReplicaAssignment and rewrite the topic's znode in ZK
      // (the assignment of every partition of the topic is rewritten); the topic-change listener handles the
      // follow-up processing of that znode change.
      //10. Update AR in ZK with RAR.
      updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
      // unregister the ISR-change listener for this partition and remove it from the set of partitions being reassigned
      //11. Update the /admin/reassign_partitions path in ZK to remove this partition.
      removePartitionFromReassignedPartitions(topicAndPartition)
      info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
      controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
      // send an UpdateMetadataRequest for this partition to every broker
      //12. After electing leader, the replicas and isr information changes, so resend the update metadata request to every broker
      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
      // if the topic was queued for deletion, let the delete-topic thread resume now that this reassignment is done
      // signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed
      deleteTopicManager.resumeDeletionForTopics(Set(topicAndPartition.topic))
    }
  }
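
In practice you follow these steps from the outside with the tool's --verify option, which the execute step's own warning tells you to run periodically; once every partition reports completion it also clears the replication throttle. A minimal sketch reusing the assumed file name and ZooKeeper address from the earlier example:

  # Run periodically until every partition reports that reassignment completed successfully;
  # --verify also removes the throttle configured by --execute.
  bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
    --reassignment-json-file reassignment.json --verify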