概述

当集群没有可用的 Controller 时,每个 Broker 都可以参与 Controller 竞选,但只能有一个 Broker 竞选成功。

实现逻辑

谁能创建 /controller 临时节点成功,谁就成当选成为集群的 Controller。其它 Broker 会监听 /controller 节点,一旦发现该节节点创建、删除和数据变更等情况,就会注册相关 Controller 事件。

  1. /**
  2. * 这个处理器监听Zookeeper节点「/controller」的变化:创建、删除、修改都会被监听到,
  3. * 并配合具体的变化向 {@link ControllerEventManager} 事件管理器中放入对应的Controller事件。
  4. * 事件驱动模型
  5. *
  6. * @param eventManager
  7. */
  8. class ControllerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChangeHandler {
  9. /**
  10. * Controller所关闭的Zookeeper节点:/controller
  11. */
  12. override val path: String = ControllerZNode.path
  13. /**
  14. * 当节点创建时,将 {@link ControllerChange} 事件提交给
  15. * 事件管理器 {@link ControllerEventManager} 处理
  16. */
  17. override def handleCreation(): Unit = eventManager.put(ControllerChange)
  18. /**
  19. * 当节点被删除时,将 {@link Reelect} 事件提交给事件管理器处理。
  20. * 节点被删除意味着接下来需要进行Controller选举
  21. */
  22. override def handleDeletion(): Unit = eventManager.put(Reelect)
  23. /**
  24. * 当节点被修改时,将 {@link ControllerChange} 事件交给事件管理器处理
  25. */
  26. override def handleDataChange(): Unit = eventManager.put(ControllerChange)
  27. }

本章节主要关注 Reelct 选举事件。

Controller 选举

Controller 选举流程图如下所示:
Controller 选举流程.png

  1. /**
  2. * 执行Controller选举操作
  3. */
  4. private def processReelect(): Unit = {
  5. // #1 由于已经触发新的Controller选举,
  6. // 旧的Controller就需要退位了
  7. maybeResign()
  8. // #2 尝试竞选Controller
  9. elect()
  10. }

方法 #1 移除 Zookeeper 注销相关 ChangeHandler 处理器、关闭调度器、关闭副本、分区状态机、重置 ControllerContext 的注册节点等。
方法 #2 是 controller 选举的核心方法:

  1. /**
  2. * Controller选举核心方法:
  3. * 1.首先确认「/controller」的broker id 是否>=0,
  4. * 如果是,则说明Controller已存在,如果为-1,则需要进行接下来的选举操作(注意,并非100%竞选成功)
  5. * 2.再尝试获取两个版本号,分别是/controller_epoch的值和这个节点对应的ZK的version的值,称为zkVersion
  6. * 3.尝试创建/controller临时节点,并将版本号+1
  7. * 4.执行成功当选controller的后续逻辑(failover:故障转移)
  8. */
  9. private def elect(): Unit = {
  10. // #1 从Zookeeper获取Controller Id,即所在的Broker ID(/controller)。
  11. // 如果没有则为-1,意味着当前此刻集群没有选出确切的controller
  12. activeControllerId = zkClient.getControllerId.getOrElse(-1)
  13. // #2 id 不等于-1,表示集群已经选出Controller,直接返回
  14. if (activeControllerId != -1) {
  15. debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
  16. return
  17. }
  18. try {
  19. // #3 尝试创建/controller临时节点,并将版本号+1
  20. val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)
  21. controllerContext.epoch = epoch
  22. controllerContext.epochZkVersion = epochZkVersion
  23. activeControllerId = config.brokerId
  24. info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} " +
  25. s"and epoch zk version is now ${controllerContext.epochZkVersion}")
  26. // #4 执行成功当选controller的后续逻辑(failover:故障转移)
  27. onControllerFailover()
  28. } catch {
  29. case e: ControllerMovedException =>
  30. maybeResign()
  31. if (activeControllerId != -1)
  32. debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}", e)
  33. else
  34. warn("A controller has been elected but just resigned, this will result in another round of election", e)
  35. case t: Throwable =>
  36. error(s"Error while electing or becoming controller on broker ${config.brokerId}. " +
  37. s"Trigger controller movement immediately", t)
  38. triggerControllerMove()
  39. }
  40. }
  1. /**
  2. * 创建/controller临时节点,并更新相关版本号。
  3. * 1. 从ZK中获取/controller_epoch详情,包括controller版本号和节点版本号(zkVersion),这存在以下两种情况
  4. * ① 如果/controller_epoch节点存在,直接返回
  5. * ② 如果节点不存在,则尝试创建,这一步又会出现以下情况:
  6. * 1.成功创建,则正常返回获取到的数据
  7. * 2.创建失败,说明有其它Broker抢先创建,那就就再次发送获取数据的请求并返回
  8. * 2. 创建/controller
  9. * ① 版本号+1
  10. *
  11. *
  12. * Registers a given broker in zookeeper as the controller and increments controller epoch.
  13. * 将Broker ID向Zookeeper注册为Controller,且更新controller epoch
  14. * ① 获取/controller_epoch节点数据,得到(Controller版本号,ZNode版本号)。如果节点不存在,则尝试创建新的,
  15. * 如果创建新的过程中发现已经被别的Broker抢先创建,那么再次向Zookeeper发送请求获取最新的/controller_epoch节点的数据
  16. * ②
  17. *
  18. *
  19. *
  20. *
  21. * @param controllerId Broker ID
  22. * @return Tuple变量(已更新的Controller版本号, zkVersion版本号)
  23. * @throws ControllerMovedException 创建 /controller 节点失败或增加controller epoch失败,那么抛出异常
  24. */
  25. def registerControllerAndIncrementControllerEpoch(controllerId: Int): (Int, Int) = {
  26. val timestamp = time.milliseconds()
  27. // #1 从ZK中获取节点/controller_epoch的值和zkVersion版本号(一定可以得到确切值)。
  28. // 如果节点不存在则创建并初始化
  29. val (curEpoch, curEpochZkVersion) = getControllerEpoch
  30. .map(e => (e._1, e._2.getVersion))
  31. .getOrElse(maybeCreateControllerEpochZNode())
  32. // #2 尝试创建「/controller」节点并更新「/controller_epoch」的值,这两步是原子操作
  33. val newControllerEpoch = curEpoch + 1 // 节点「/controller_epoch」的值表示controller版本号
  34. val expectedControllerEpochZkVersion = curEpochZkVersion
  35. debug(s"Try to create ${ControllerZNode.path} and increment controller epoch to $newControllerEpoch " +
  36. s"with expected controller epoch zkVersion $expectedControllerEpochZkVersion")
  37. /**
  38. * 之前尝试创建/controller,但是遇到NODEEXISTS或BADVERSION错误,可能/controller已经被创建了
  39. *
  40. * 检查Controller和版本号
  41. * @return
  42. */
  43. def checkControllerAndEpoch(): (Int, Int) = {
  44. // #1 从ZK中获取Controller ID,如果不存在,则抛出异常
  45. val curControllerId = getControllerId.getOrElse(throw new ControllerMovedException(
  46. s"The ephemeral node at ${ControllerZNode.path} went away while checking whether the controller election succeeds. " +
  47. s"Aborting controller startup procedure"))
  48. // #2 判断controller id和controller epoch的值是否和本地broker缓存相等,
  49. // 只要发现其中一个不相等,说明出现数据不一致,抛出异常
  50. if (controllerId == curControllerId) {
  51. // #2-1 获取「/controller_epoch」节点的值
  52. val (epoch, stat) = getControllerEpoch.getOrElse(
  53. throw new IllegalStateException(s"${ControllerEpochZNode.path} existed before but goes away while trying to read it"))
  54. // #2-2 如果和newControllerEpoch相等,说明本轮选举事务一致,
  55. // 否则是其它轮次的Controller选举向/controller_epoch写入一个更大的值,那么低于该值的其它轮次会抛出异常
  56. if (epoch == newControllerEpoch)
  57. return (newControllerEpoch, stat.getVersion)
  58. }
  59. // 抛出异常
  60. throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")
  61. }

onControllerFailover

Broker 竞选 controller 成功,接下来需要做以下事情:

  1. 向 Zookeeper 注册各类的监听处理器,包括:
    1. BrokerChangeHandler
    2. TopicChangeHandler
    3. TopicDeletionHandler
    4. LogDirEventNotificationHandler
    5. IsrChangeNotificationHandler
  2. 删除日志路径变更事件和 ISR 副本变更通知事件
  3. 启动 ControllerChannelManager,用来管理和其它 Broker 的网络连接
  4. 启动副本状态机 ReplicaStateMachine,用来管理集群所有副本状态
  5. 启动分区状态机 PartitionStateMachine,用来管理集群所有分区状态
  6. 初始化分区重分区管理器
  7. 尝试为给定的分区选出一个 Leader 副本
  8. 启动 Controller 调度器
  9. 定时清理过期 token
  1. /**
  2. * 成功当选Controller后所执行的方法逻辑,主要包含:
  3. * ① 注册各类ZK监听器
  4. * ② 删除日志路径变更和ISR副本变更通知事件
  5. * ③ 启动Controller通道管理器(即 {@link ControllerChannelManager})
  6. * ③ 启动副本状态机{@link ReplicaStateMachine}和分区状态机 {@link PartitionStateMachine}
  7. *
  8. * This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller.
  9. * It does the following things on the become-controller state change -
  10. *
  11. * 如果这个方法出现任何异常,那么会放弃controller身份。这确保整个集群有可用的controller对象。
  12. * 1. Initializes the controller's context object that holds cache objects for current topics, live brokers and
  13. * leaders for all existing partitions.
  14. * 2. Starts the controller's channel manager
  15. * 3. Starts the replica state machine
  16. * 4. Starts the partition state machine
  17. * If it encounters any unexpected exception/error while becoming controller, it resigns as the current controller.
  18. * This ensures another controller election will be triggered and there will always be an actively serving controller
  19. */
  20. private def onControllerFailover(): Unit = {
  21. maybeSetupFeatureVersioning()
  22. info("Registering handlers")
  23. // #1 将以下Handler添加到ZkClient本地缓存中,
  24. // 后续在初始化ControllerContext根据这个集合判断是否需要对节点注册Zookeeper监听器
  25. val childChangeHandlers = Seq(brokerChangeHandler,
  26. topicChangeHandler,
  27. topicDeletionHandler,
  28. logDirEventNotificationHandler,
  29. isrChangeNotificationHandler)
  30. childChangeHandlers.foreach(zkClient.registerZNodeChildChangeHandler)
  31. val nodeChangeHandlers = Seq(preferredReplicaElectionHandler, partitionReassignmentHandler)
  32. nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence)
  33. // #2 删除ZK中的日志路径变更通知事件
  34. info("Deleting log dir event notifications")
  35. zkClient.deleteLogDirEventNotifications(controllerContext.epochZkVersion)
  36. // #3 删除ZK中ISR副本变更通知事件
  37. info("Deleting isr change notifications")
  38. zkClient.deleteIsrChangeNotifications(controllerContext.epochZkVersion)
  39. // #4 初始化ControllerContext上下文,里面包含Controller关于集群的一切元数据
  40. // 注册Broker监听器,一旦Broker发生变更,当前Controller可以第一时间感知并执行handler处理逻辑
  41. initializeControllerContext()
  42. // #5 从Zookeeper获取正在删除的主题列表
  43. info("Fetching topic deletions in progress")
  44. val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress()
  45. // #6 初始化主题删除管理器
  46. info("Initializing topic deletion manager")
  47. topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)
  48. // #7 向集群广播「UpdateMetadataRequest」为什么要做这一步呢?目的是接下来副本状态机和分区状态机启动做铺垫,这两个状态机会向
  49. // 有关Broker发送「LeaderAndIsrRequests」,这里Broker就因为有集群元数据就可以处理这个请求了
  50. info("Sending update metadata request")
  51. sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
  52. // #8 启动副本状态机
  53. replicaStateMachine.startup()
  54. // #9 启动分区状态机
  55. partitionStateMachine.startup()
  56. // #10 初始化分区重分配管理器
  57. info(s"Ready to serve as the new controller with epoch $epoch")
  58. initializePartitionReassignments()
  59. // #11 恢复主题删除操作
  60. topicDeletionManager.tryTopicDeletion()
  61. // #12
  62. val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
  63. // #13 尝试为给定的分区选出一个副本Leader
  64. onReplicaElection(pendingPreferredReplicaElections, ElectionType.PREFERRED, ZkTriggered)
  65. // #14 启动controller调度器
  66. info("Starting the controller scheduler")
  67. kafkaScheduler.startup()
  68. if (config.autoLeaderRebalanceEnable) {
  69. // #15 每5秒钟自动执行Leader重平衡
  70. scheduleAutoLeaderRebalanceTask(delay = 5, unit = TimeUnit.SECONDS)
  71. }
  72. // #16 定时清理过期的token
  73. if (config.tokenAuthEnabled) {
  74. info("starting the token expiry check scheduler")
  75. tokenCleanScheduler.startup()
  76. tokenCleanScheduler.schedule(name = "delete-expired-tokens",
  77. fun = () => tokenManager.expireTokens(),
  78. period = config.delegationTokenExpiryCheckIntervalMs,
  79. unit = TimeUnit.MILLISECONDS)
  80. }
  81. }

Broker启动

这个事件也比较重要,需要关注以下几点:

  1. Startup Event 是在什么时候向事件管理器注册的?
  2. 如何处理 Startup Event?

首先,Startup 是在启动 KafkaController 时向事件管理器注册,源码如下:

  1. // kafka.server.KafkaServer#startup
  2. // 首先是入口方法,即KafkaServer
  3. kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, featureCache, threadNamePrefix)
  4. kafkaController.startup()
  1. // kafka.controller.Startup$
  2. def startup() = {
  3. // #1 注册用于session过期后触发重新选举的handler
  4. zkClient.registerStateChangeHandler(new StateChangeHandler {
  5. override val name: String = StateChangeHandlers.ControllerHandler
  6. /**
  7. * 在重新初始化session后做的操作:添加「RegisterBrokerAndReelect」事件
  8. */
  9. override def afterInitializingSession(): Unit = {
  10. eventManager.put(RegisterBrokerAndReelect)
  11. }
  12. /**
  13. * 在重新初始化session前做的操作:添加「Expire」事件
  14. */
  15. override def beforeInitializingSession(): Unit = {
  16. val queuedEvent = eventManager.clearAndPut(Expire)
  17. // Block initialization of the new session until the expiration event is being handled,
  18. // which ensures that all pending events have been processed before creating the new session
  19. // 阻塞等待时间被处理结束,session过期触发重新选举,必须等待选举这个时间完成Controller才能正常工作
  20. queuedEvent.awaitProcessing()
  21. }
  22. })
  23. // #2 将启动事件注册到事件管理器中
  24. eventManager.put(Startup)
  25. // #3 启动事件管理器
  26. eventManager.start()
  27. }

步骤 #2 就是将 Startup 事件注册到事件管理器中,随后启动Controller事件处理线程。
所有的 Controller 事件都是由 KafkaController 完成的,这个类代码量有 3275 行,着实令人心惊肉跳。但是没有关系,核心入口其实就是 kafka.controller.KafkaController#process 方法,从这里出发,就可以逐一窥探 Controller 的世界。
通过一系列的 CASE 判断,最终来到了 processStartup() 方法:

  1. /**
  2. * Controller启动事件会触发此方法执行
  3. * ① 注册 {@link ControllerChangeHandler} 处理器
  4. * ② 执行选举
  5. */
  6. private def processStartup(): Unit = {
  7. // #1 向ZookeeperClient注册Controller变更处理器,
  8. zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
  9. // #2 执行Controller选举
  10. elect()
  11. }

BrokerChange事件

当集群中 Broker 上线/下线时,会产生 BrokerChange 事件。我们知道,一个 Broker 上存在多个分区的 Leader 副本,也存在 Follower 副本。当 Broker 下线时,会有:

  • 触发 Leader 重分配。需要在 ISR 集合中选出一个新的 Follower 作为 Leader。
  • ISR 集合扩缩容。Follower 副本下线,可能会导致部分分区的 ISR 扩缩容发生。
  1. /**
  2. * 处理brokers数量变更,比如新增、删除broker
  3. */
  4. private def processBrokerChange(): Unit = {
  5. if (!isActive) return
  6. // #1 从ZK中获取所有子节点/brokers/ids/<broker id>的详细数据
  7. // 返回现在存活的Broker列表
  8. val curBrokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster
  9. val curBrokerIdAndEpochs = curBrokerAndEpochs map { case (broker, epoch) => (broker.id, epoch) }
  10. val curBrokerIds = curBrokerIdAndEpochs.keySet
  11. val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
  12. // #2 本轮「新增」broker列表
  13. val newBrokerIds = curBrokerIds.diff(liveOrShuttingDownBrokerIds)
  14. // #3 本轮「离线」broker列表
  15. val deadBrokerIds = liveOrShuttingDownBrokerIds.diff(curBrokerIds)
  16. // #4 本轮「重启」的broker列表:通过判断Epoch值是否与controller缓存值相等,从而得出broker是否发生过重启
  17. val bouncedBrokerIds = (curBrokerIds & liveOrShuttingDownBrokerIds)
  18. .filter(brokerId => curBrokerIdAndEpochs(brokerId) > controllerContext.liveBrokerIdAndEpochs(brokerId))
  19. // #5 准备数据
  20. val newBrokerAndEpochs = curBrokerAndEpochs.filter { case (broker, _) => newBrokerIds.contains(broker.id) }
  21. val bouncedBrokerAndEpochs = curBrokerAndEpochs.filter { case (broker, _) => bouncedBrokerIds.contains(broker.id) }
  22. val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
  23. val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
  24. //val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
  25. val bouncedBrokerIdsSorted = bouncedBrokerIds.toSeq.sorted
  26. // #6 为新增的broker准备网络相关的组件,这些是由「ControllerChannelManager」管理的
  27. newBrokerAndEpochs.keySet.foreach(controllerChannelManager.addBroker)
  28. // #7 broker发生重启,需要先移除controller缓存数据,再新增该broker
  29. // 因为重启过的Broker可能修改了配置等信息,所以需要重新加入缓存
  30. bouncedBrokerIds.foreach(controllerChannelManager.removeBroker)
  31. bouncedBrokerAndEpochs.keySet.foreach(controllerChannelManager.addBroker)
  32. // #8 移除「离线」broker 相关缓存
  33. deadBrokerIds.foreach(controllerChannelManager.removeBroker)
  34. // #9 处理新增的Broker
  35. if (newBrokerIds.nonEmpty) {
  36. // #9-1 区分兼容的和不兼容的brokers
  37. val (newCompatibleBrokerAndEpochs, newIncompatibleBrokerAndEpochs) =
  38. partitionOnFeatureCompatibility(newBrokerAndEpochs)
  39. if (!newIncompatibleBrokerAndEpochs.isEmpty) {
  40. warn("Ignoring registration of new brokers due to incompatibilities with finalized features: " +
  41. newIncompatibleBrokerAndEpochs.map { case (broker, _) => broker.id }.toSeq.sorted.mkString(","))
  42. }
  43. // #9-2 只有新的可兼容的broker才能被添加进缓存中
  44. controllerContext.addLiveBrokers(newCompatibleBrokerAndEpochs)
  45. // #9-3 为新的broker启动相关组件(副本状态机、分区状态机、元数据更新操作)
  46. onBrokerStartup(newBrokerIdsSorted)
  47. }
  48. // #10 处理重启的Brokers
  49. if (bouncedBrokerIds.nonEmpty) {
  50. // #10-1 移除缓存
  51. controllerContext.removeLiveBrokers(bouncedBrokerIds)
  52. onBrokerFailure(bouncedBrokerIdsSorted)
  53. val (bouncedCompatibleBrokerAndEpochs, bouncedIncompatibleBrokerAndEpochs) =
  54. partitionOnFeatureCompatibility(bouncedBrokerAndEpochs)
  55. if (!bouncedIncompatibleBrokerAndEpochs.isEmpty) {
  56. warn("Ignoring registration of bounced brokers due to incompatibilities with finalized features: " +
  57. bouncedIncompatibleBrokerAndEpochs.map { case (broker, _) => broker.id }.toSeq.sorted.mkString(","))
  58. }
  59. // #10-2 重新加入
  60. controllerContext.addLiveBrokers(bouncedCompatibleBrokerAndEpochs)
  61. onBrokerStartup(bouncedBrokerIdsSorted)
  62. }
  63. // #11 处理「离线」的brokers
  64. if (deadBrokerIds.nonEmpty) {
  65. // 移除缓存、调用onBrokerFailure
  66. controllerContext.removeLiveBrokers(deadBrokerIds)
  67. onBrokerFailure(deadBrokerIdsSorted)
  68. }
  69. //
  70. //if (newBrokerIds.nonEmpty || deadBrokerIds.nonEmpty || bouncedBrokerIds.nonEmpty) {
  71. // info(s"Updated broker epochs cache: ${controllerContext.liveBrokerIdAndEpochs}")
  72. //}
  73. }

关键方法:

  • onBrokerStartup(newBrokerIdsSorted)
  • onBrokerFailure(deadBrokerIdsSorted)

新增 Broker 逻辑:

  1. 向集群广播有一个新的 Broker 加入集群了,请大家同步该 Broker 的元数据。
  2. 向 Broker 发送 UpdateMetadata 请求,并附带集群所有分区信息。
  3. 将新增的 Broker 上所有副本设置为 Online 状态。
  4. 触发分区状态机修改相关分区状态。
  5. 重启之前暂停的副本迁移工作。
  6. 重启之前暂停主题删除操作。
  7. 为新增的Broker注册监听器。

    1. /**
    2. *
    3. *
    4. * 此回调是由副本状态机的broker改变监听器调用,入参是新的brokers id 列表。
    5. * 这个方法执行以下步骤:
    6. * ① 向所有存活的或正在关闭的broker发送「updatemetadata」请求
    7. * ② 触发所有新的/离线的分区的 OnlinePartition 状态更改。
    8. * ③ 检查是否重分配的副本分配给任何新启动的brokers。如果是这样的话,它会为每个每个主题/分区执行重分配逻辑。
    9. * 请注意,此时我们不需要为所有主题/分区刷新leader/isr缓存,原因有两点:
    10. * ① 当分区状态机触发在线状态变更时,将仅刷新当前新分区的leader和ISR或离线(而不是该控制器知道的每个分区)(rather than every partition this controller is aware of)
    11. * ② 即使我们确实刷新了缓存,也不能保证在领导者和 ISR 请求到达每个代理时它仍然有效。 broker通过检查leader epoch以推断请求的合法性。
    12. */
    13. /**
    14. * 首先,我们应该对入参「newBrokers」有一个清楚的认识:它不代表是新创建的Broker,有可能之前处于离线状态,然后经过一段时间重启并重新加入集群中,它也属于newBrokers。
    15. * 所以,为什么一开始需要从「replicasOnOfflineDirs」移除缓存就是因为它们之前存在过,所以这一步是有必要的。
    16. * #2 元数据更新请求「updatemetadata」的发送也是有技巧的,kakfa将broker分为两大类,
    17. * 一类是已存在的,和新增的。前者不需要携带完整的分区状态信息,只需要发个通知告知broker有新的成员加入,
    18. * 而后者需要携带完整信息,这样就可以以最快速度让所有的broker都有完整的元数据信息。
    19. * 「newBrokers」对象可能包含副本,我们需要将这些副本对象变更为「online」以对外提供服务。
    20. * 即Leader副本对外提供读写服务,follower副本自动向Leader副本拉取消息。
    21. *
    22. * @param newBrokers
    23. */
    24. private def onBrokerStartup(newBrokers: Seq[Int]): Unit = {
    25. info(s"New broker startup callback for ${newBrokers.mkString(",")}")
    26. // 首先从replicasOnOfflineDirs缓存中移除数据,因为broker现在处于「可用」状态了
    27. newBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
    28. val newBrokersSet = newBrokers.toSet
    29. // #1 获取「已存在」的brokers
    30. val existingBrokers = controllerContext.liveOrShuttingDownBrokerIds.diff(newBrokersSet)
    31. // #2 向已存在的brokers发送「updatemetadata」更新元数据信息请求,以至于让这些broker知道有新成员加入
    32. // 由于没有分区状态的变更,所以不需要在请求中包含任何分区状态
    33. // 这一步目的是让「已存在」的broker感知到有新的broker加入
    34. sendUpdateMetadataRequest(existingBrokers.toSeq, Set.empty)
    35. // #3 向新增的brokers发送「updatemetadata」更新元数据信息请求,请求携带集群完整的分区状态信息
    36. // 令这些新增的brokers同步集群当前所有分区数据
    37. // 当可控关闭的情况下,当一个新的broker出现时,leader不会被选出。因此,至少在觉的可控关闭情况下,元数据可以最快到达新的broker节点
    38. sendUpdateMetadataRequest(newBrokers, controllerContext.partitionsWithLeaders)
    39. // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
    40. // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
    41. val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
    42. // #4 将新增broker上的所有副本设置为「online」在线状态
    43. replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers.toSeq, OnlineReplica)
    44. // when a new broker comes up, the controller needs to trigger leader election for all new and offline partitions
    45. // to see if these brokers can become leaders for some/all of those
    46. // #5 分区状态机变更分区状态
    47. partitionStateMachine.triggerOnlinePartitionStateChange()
    48. // check if reassignment of some partitions need to be restarted
    49. // #6 重启之前暂停副本迁移工作
    50. maybeResumeReassignments { (_, assignment) =>
    51. assignment.targetReplicas.exists(newBrokersSet.contains)
    52. }
    53. // check if topic deletion needs to be resumed. If at least one replica that belongs to the topic being deleted exists
    54. // on the newly restarted brokers, there is a chance that topic deletion can resume
    55. val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
    56. // #7 重启之前暂停主题删除操作
    57. if (replicasForTopicsToBeDeleted.nonEmpty) {
    58. info(s"Some replicas ${replicasForTopicsToBeDeleted.mkString(",")} for topics scheduled for deletion " +
    59. s"${controllerContext.topicsToBeDeleted.mkString(",")} are on the newly restarted brokers " +
    60. s"${newBrokers.mkString(",")}. Signaling restart of topic deletion for these topics")
    61. topicDeletionManager.resumeDeletionForTopics(replicasForTopicsToBeDeleted.map(_.topic))
    62. }
    63. // #8 为新增Brokers注册「BrokerModificationsHandler」监听器,目的是监听「/brokers/ids/[broker id]」节点数据的变化
    64. registerBrokerModificationsHandler(newBrokers)
    65. }

    处理离线 Broker 逻辑:

  8. 更新 ControllerContext 缓存。

  9. 将副本的状态变更为离线状态。
  10. 取消该节点的监听器

    1. /**
    2. * 当一个broker关闭时,controller需要关注以下事情:
    3. * ① broker管理的副本对象 -- 置为「离线状态」
    4. * ② ZK监听器 -- 移除监听器
    5. * ③ 相关缓存 -- 移除相关缓存数据
    6. *
    7. * @param deadBrokers 已终止运行的Broker ID列表
    8. */
    9. private def onBrokerFailure(deadBrokers: Seq[Int]): Unit = {
    10. info(s"Broker failure callback for ${deadBrokers.mkString(",")}")
    11. // #1「更新Controller元数据」:将给定的Broker从「replicasOnOfflineDirs」缓存中移除
    12. deadBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
    13. // #2 从「正在关闭中(shuttingDownBrokerIds)」移除deadBrokers
    14. val deadBrokersThatWereShuttingDown =
    15. deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))
    16. if (deadBrokersThatWereShuttingDown.nonEmpty)
    17. info(s"Removed ${deadBrokersThatWereShuttingDown.mkString(",")} from list of shutting down brokers.")
    18. // #3 执行副本清理工作
    19. // #3-1 获取死亡brokers中所有的副本对象,由于副本所在的broker已被关闭,所以这些副本的状态需要变更为「离线」状态
    20. val allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokers.toSet)
    21. // #3-2 变更副本的状态为「离线」
    22. onReplicasBecomeOffline(allReplicasOnDeadBrokers)
    23. // #4 注销死亡brokers注册的「BrokerModificationsHandler」监听器,
    24. // 这个监听器用来监听「/brokers/ids/[broker id]」节点数据变更的,比如broker重启就会触发此handler相关回调方法
    25. unregisterBrokerModificationsHandler(deadBrokers)
    26. }