概述

在上一节对 Controller 的网络组件、网络模型以及管理的元数据进行讲解,也看了部分重要的源码。本章着重对 Controller 功能进行讲解,包括:

  • Controller 是如何感知集群事件的?
  • Controller 是如何处理集群事件的?
  • Controller 故障转移

    Controller 如何感知事件

    这个问题回答起来十分简单:通过 Zookeeper 的 watch 机制感知集群元数据的变化。假设现在集群已经有一个稳定的 Controller,此刻集群新增了一个 Broker 节点,有趣的事情发生了:
  1. 新增的 Broker 节点会在 Zookeeper 的 /brokers/ids/ 节点下创建一个临时子节点。
  2. Zookeeper 通知 Controller:有人创建了一个临时节点 /brokers/ids/0,节点数据是 {"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.217.128:9092"],"jmx_port":-1,"port":9092,"host":"192.168.217.128","version":5,"timestamp":"1623963938447"}
  3. Controller 将 Zookeeper 事件转换为 Controller 内部事件 BrokerChange,然后再交给 KakfaChannel 完成处理逻辑。

那么问题又来了,到底什么时候向 Zookeeper 注册相关数据节点的 watch 呢? 这一部分是和 Controller 选举相关。当竞选 Controller 成功后,就会执行 Controller 上任逻辑,其中就包括向 Zookeeper 注册数据节点的 watch。
当 Controller 收到来自 Zookeeper 的 watch 事件后,需要转换为 Controller 的内部事件,转换逻辑在 ZookeeperClient#process(WatchedEvnet) 方法中。

组件

好了,在说完 Controller 为会么会有这么强大的能力后,现在我们熟悉与 Controller 相关的组件吧。和 Controller 有关的源码位于文件夹 kafka.controller 下,如下图所示:
Controller源码目录.png
文件看起来不多,但是 Scala 语言在一个文件内会定义多个类。ControllerContext 组件上一节已经讲过了,它是存储整个集群无数据的地方。当集群发送变动,这个缓存会第一时间更新。其它 Broker 也是从这个缓存对象中获取所需要的元数据信息。
在早期,其实并没有 Controller 组件的:每个 Broker 和 Zookeeper 直接打交道,包括元数据获取、感知其它 Broker 变化情况等。当集群数量较小时工作很好,但是当数据量大了的时候就出现严重问题:

  1. 惊群效应。一旦某个 Broker 宕机,集群内其它 Broker 都会收到该 Watch 事件。比较网络 I/O 存在较大延迟,因此,也增加集群处于不稳定状态的时间。
  2. Zookeeper 脑裂。这会造成数据严重不一致。

于是,kafka 提出使用 Controller 来做集群元数据管理器,所有的 Broker 和 Controller 打交道:向 Controller 获取元数据。Controller 和 Zookeeper 打交道。这样,Controller 相当于是集群推选出来的一个总代理,借助 Zookeeper 的 watch 机制拥有迅速感知集群的元数据变化,然后主动向相关 Broker 发送 Controller 请求。这一架构减轻了 Zookeeper 压力,降低集群的延迟。但是,当整个集群拥有百万 Topic 还是会存在以下问题:

  1. 首先一点,Zookeeper 不适合大量的写操作,而 Zookeeper 存储整个集群所有的元数据,可想而知 Zookeeper 的写压力很变得非常大。
  2. 其次,Controller 故障转移后需要和 Zookeeper 进行大量的 I/O 交互,拉取集群所有元数据,导致集群不可用时间变长。虽然现在社区已经优化了,但是招架不了这庞大的数量呀。

而基于 Raft 算法就可以解决前面的问题。一是每个 Broker 相当于持有账本,Controller 往上面写了什么,其它人跟着写什么(同步)。当发生 Controller 故障转移后,新的 Controller 在历史记录的基础上添加新的 Controller 记录。这样就可以不用全量同步数据,集群的不可用时间大大缩短。

Controller 单线程事件队列处理模型

下图是对 Broker 离线事件的流程处理示意图。
Broker 离线事件处理流程示意图.png
这一幅图已经将 Controller 单线程处理相关的组件都描述出来了。步骤 4 是将 Zookeeper 事件转换为 Controller 内部事件。步骤 7 是 Controller 事件的核心处理逻辑,比如 Controller 选举、Broker 离线等处理逻辑都由这个类完成。

ZookeeperClient

封装了对 Zookeeper 相关 API,比如创建节点、创建节点并添加 Watch、删除节点和判断节点是否存在等等便捷的方法。它是 Kafka 和 Zookeeper 交流的类。我们只需要知道这一点就好了。这里只贴出重要的变量和方法。

  1. // kafka.zookeeper.ZooKeeperClient
  2. /**
  3. * 节点发生变化时触发的handler集合
  4. * key:Zookeeper对应的节点路径,value:路径变更后调用的处理方法
  5. */
  6. private val zNodeChangeHandlers = new ConcurrentHashMap[String, ZNodeChangeHandler]().asScala
  7. /**
  8. * 子节点发生变化时触发的handler集合
  9. */
  10. private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]().asScala
  11. // 状态处理器,主要用于Zookeeper session 过期
  12. private val stateChangeHandlers = new ConcurrentHashMap[String, StateChangeHandler]().asScala

ZookeeperClient 有三个变量缓存不同类型的 ChangeHandlers。zNodeChangeHandlers 缓存节点变化的 Handlers,zNodeChildChangeHandlers 缓存子节点变化的 handlers,而 stateChangeHandlers 缓存 Zookeeper 状态处理器。比如和 Zookeeper Session 会话过期就会调用缓存的 stateChangeHandlers 做处理。
当创建 Zookeeper 对象时,需要传入一个 Watcher 实现类:

  1. zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher, clientConfig)

当 Zookeeper 节点发生变更时,就会执行 ZookeeperClientWatcher#process(WatchedEvent) 回调函数:

  1. /**
  2. * Zookeeper监听器,继承Zookeeper提供的 {@link Watcher} 接口
  3. * 实现 {@link Watcher.processor} 方法
  4. */
  5. private[zookeeper] object ZooKeeperClientWatcher extends Watcher {
  6. /**
  7. * 处理Zookeeper的事件,从相应集合中找到合适的Handler处理响应
  8. *
  9. * @param event Zookeeper事件
  10. */
  11. override def process(event: WatchedEvent): Unit = {
  12. debug(s"Received event: $event")
  13. // 核心步骤是根据 路径+EventType匹配Handler
  14. Option(event.getPath) match {
  15. case None =>
  16. // 没有路径,表明是状态改变,比如认证失败,但最常见的是会话过期
  17. val state = event.getState
  18. stateToMeterMap.get(state).foreach(_.mark())
  19. inLock(isConnectedOrExpiredLock) {
  20. isConnectedOrExpiredCondition.signalAll()
  21. }
  22. if (state == KeeperState.AuthFailed) {
  23. // 认证失败
  24. error("Auth failed.")
  25. stateChangeHandlers.values.foreach(_.onAuthFailure())
  26. // If this is during initial startup, we fail fast. Otherwise, schedule retry.
  27. val initialized = inLock(isConnectedOrExpiredLock) {
  28. isFirstConnectionEstablished
  29. }
  30. if (initialized)
  31. scheduleReinitialize("auth-failed", "Reinitializing due to auth failure.", RetryBackoffMs)
  32. } else if (state == KeeperState.Expired) {
  33. // 会话过期
  34. scheduleReinitialize("session-expired", "Session expired.", delayMs = 0L)
  35. }
  36. case Some(path) =>
  37. // 根据事件类型从缓存中获取对应的handler处理器并回调相关方法
  38. (event.getType: @unchecked) match {
  39. // 子节点数据变更事件
  40. case EventType.NodeChildrenChanged => zNodeChildChangeHandlers.get(path).foreach(_.handleChildChange())
  41. // 节点被创建事件
  42. case EventType.NodeCreated => zNodeChangeHandlers.get(path).foreach(_.handleCreation())
  43. // 节点被删除事件
  44. case EventType.NodeDeleted => zNodeChangeHandlers.get(path).foreach(_.handleDeletion())
  45. // 节点数据变更事件
  46. case EventType.NodeDataChanged => zNodeChangeHandlers.get(path).foreach(_.handleDataChange())
  47. }
  48. }
  49. }
  50. }

Zookeeper 提供的 WatchedEvent 对象包含丰富的监听信息:

  1. // org.apache.zookeeper.WatchedEvent
  2. public class WatchedEvent {
  3. // 连接状态:过期(Expired)、关闭(Closed)等等
  4. final private KeeperState keeperState;
  5. // 事件类型:创建节点(NodeCreated)、删除节点(NodeDeleted)、
  6. // 节点数据发生变化(NodeDataChanged)、子节点发生变化(NodeChildrenChanged)等等
  7. final private EventType eventType;
  8. // 产生事件的路径
  9. private String path;
  10. }

可以根据 KeeperState 判断当前会话状态,遇到认证失败或会话过期则执行相关逻辑,这里就不在细说。根据路径 pathEventType 事件类型从对应的 zNodeChildChangeHandlerszNodeChangeHandlers 集合中获取处理器并调用相关方法。比如子节点 /broker/ids 数据发生删除事件,那么根据 /brokers/ids 从 zNodeChildChangeHandlers 获取 BrokerChangeHandler 处理器,然后根据事件类型 NodeChildrenChanged 调用 BrokerChangeHandler#handleChildChange() 方法。

StateChangeHandler

它是一个接口,定义了和状态相关的处理方法。

  1. /**
  2. * Zookeeper节点变更就会触发相关回调方法
  3. */
  4. trait ZNodeChangeHandler {
  5. /**
  6. * 对应Zookeeper的路径
  7. */
  8. val path: String
  9. /**
  10. * 处理创建节点事件
  11. */
  12. def handleCreation(): Unit = {}
  13. /**
  14. * 处理节点被删除事件
  15. */
  16. def handleDeletion(): Unit = {}
  17. /**
  18. * 处理节点数据改变事件
  19. */
  20. def handleDataChange(): Unit = {}
  21. }

一般会创建一个匿名内部类并实现方法方法,比如下面就是当启用 KafkaController 组件的时候就向 ZookeeperClient 注册一个 StateChangeHandler,不管当前 Broker 有没有当选 Controller,它都需要和 Broker 保持心跳连接,Broker 根据这个心跳连接判断是否离线。如果遇到 Zookeeper 会话过期了,Kafka 会重新创建一个 Zookeeper 客户端并建立新的连接。

  1. // kafka.controller.KafkaController#startup
  2. def startup() = {
  3. // #1 注册用于session过期后触发重新选举的handler
  4. zkClient.registerStateChangeHandler(new StateChangeHandler {
  5. override val name: String = StateChangeHandlers.ControllerHandler
  6. /**
  7. * 在重新初始化session后做的操作:添加「RegisterBrokerAndReelect」事件
  8. */
  9. override def afterInitializingSession(): Unit = {
  10. eventManager.put(RegisterBrokerAndReelect)
  11. }
  12. /**
  13. * 在重新初始化session前做的操作:添加「Expire」事件
  14. */
  15. override def beforeInitializingSession(): Unit = {
  16. val queuedEvent = eventManager.clearAndPut(Expire)
  17. // Block initialization of the new session until the expiration event is being handled,
  18. // which ensures that all pending events have been processed before creating the new session
  19. // 阻塞等待时间被处理结束,session过期触发重新选举,必须等待选举这个时间完成Controller才能正常工作
  20. queuedEvent.awaitProcessing()
  21. }
  22. })
  23. // ...
  24. }

ZNodeChangeHandler

一个接口,这个接口定义了当 Zookeeper 节点发生数据变更而对应的处理方法。比如当创建一个新的节点就会调用 ZNodeChangeHandler#handleCreation(),节点被删除了调用 ZNodeChangeHandler#handleDeletion()。具体定义如下:

  1. /**
  2. * Zookeeper节点变更就会触发相关回调方法
  3. */
  4. trait ZNodeChangeHandler {
  5. /**
  6. * 对应Zookeeper的路径
  7. */
  8. val path: String
  9. /**
  10. * 处理创建节点事件
  11. */
  12. def handleCreation(): Unit = {}
  13. /**
  14. * 处理节点被删除事件
  15. */
  16. def handleDeletion(): Unit = {}
  17. /**
  18. * 处理节点数据改变事件
  19. */
  20. def handleDataChange(): Unit = {}
  21. }

实现类

【kafak】KafkaController 组件(二) - 图3

ZNodeChildChangeHandler

定义相关节点子节点发生事件所对应的回调方法:

  1. /**
  2. * ZK中子节点变更触发器
  3. */
  4. trait ZNodeChildChangeHandler {
  5. val path: String
  6. /**
  7. * 处理子节点变更事件
  8. */
  9. def handleChildChange(): Unit = {}
  10. }

实现类

【kafak】KafkaController 组件(二) - 图4

ControllerEventThread

ControllerEventThread 不断从队列中获取待处理事件,最后委托 KafkaController 完成事件处理。核心方法是 doWor()

  1. /**
  2. * Controller 事件处理线程核心方法:
  3. * ① 从任务队列中取出Controller事件
  4. * ② 判断事件类型是否为ShutdownEventThread,如果是,什么也不做
  5. * ③
  6. */
  7. override def doWork(): Unit = {
  8. // #1 从事件队列中获取待处理的Controller事件(阻塞调用)
  9. val dequeued = pollFromEventQueue()
  10. dequeued.event match {
  11. // #2 如果是关闭线程事件,什么也不用做。关闭线程操作是由外部执行
  12. case ShutdownEventThread => // The shutting down of the thread has been initiated at this point. Ignore this event.
  13. case controllerEvent =>
  14. // #3 获取事件状态
  15. _state = controllerEvent.state
  16. eventQueueTimeHist.update(time.milliseconds() - dequeued.enqueueTimeMs)
  17. try {
  18. def process(): Unit = dequeued.process(processor)
  19. rateAndTimeMetrics.get(state) match {
  20. case Some(timer) => timer.time {
  21. // #4 任务处理
  22. process()
  23. }
  24. case None => process()
  25. }
  26. } catch {
  27. case e: Throwable => error(s"Uncaught error processing event $controllerEvent", e)
  28. }
  29. _state = ControllerState.Idle
  30. }
  31. }

KafkaController

整个 Controller 最最核心的类,封装了 Controller 事件处理逻辑。相关变量源码解析如下:

  1. /**
  2. * Controller 发送请求类型:它会给集群中所有Broker(包括它自己所在的Broker)机器发送网络请求。
  3. * Controller只会向Broker发送三类请求RPC:
  4. * ① LeaderAndIsr:告诉Broker相关主题各个分区的Leader副本位于哪台Broker、ISR位于哪些Broker。
  5. * 这是非常重要的且优先级最高的控制类请求。
  6. * ② StopReplica:告知指定的Broker停止它上面的副本对象,甚至还能删除副本底层的日志数据。
  7. * 主要的使用场景是:①分区副本迁移;②删除主题。这两个场景都涉及到停掉Broker上副本操作。
  8. * ③ UpdateMetadata:会更新Broker上的元数据缓存。集群上的所有元数据变更,都首先发生在Controller端,然后
  9. * 再经由请求广播给集群上的所有Broker。
  10. * Controller会为集群中的每个Broker创建一个对应的RequestSendTWhread线程,这个线程不断从阻塞队列中获取待发送的请求。
  11. *
  12. * Controller单线程事件队列处理模型及基础组件:
  13. * ZookeeperWatcher线程/KafkaReqeustHandler线程/定时任务线程/其它线程->事件队列(一个)->ControllerEventThread线程
  14. *
  15. * @param config 当前Broker配置信息
  16. * @param zkClient Zookeeper客户端
  17. * @param time 时间工具类
  18. * @param metrics 指标监控类
  19. * @param initialBrokerInfo 初始的Broker详情(从ZK中获取)
  20. * @param initialBrokerEpoch 初始的Broker版本号,用来隔离旧的Controller发送的数据,确
  21. * 保数据一致性
  22. * @param tokenManager
  23. * @param brokerFeatures
  24. * @param featureCache
  25. * @param threadNamePrefix
  26. */
  27. class KafkaController(val config: KafkaConfig,
  28. zkClient: KafkaZkClient,
  29. time: Time,
  30. metrics: Metrics,
  31. initialBrokerInfo: BrokerInfo,
  32. initialBrokerEpoch: Long,
  33. tokenManager: DelegationTokenManager,
  34. brokerFeatures: BrokerFeatures,
  35. featureCache: FinalizedFeatureCache,
  36. threadNamePrefix: Option[String] = None)
  37. extends ControllerEventProcessor with Logging with KafkaMetricsGroup {
  38. this.logIdent = s"[Controller id=${config.brokerId}] "
  39. /**
  40. * Broker相关信息
  41. */
  42. @volatile private var brokerInfo = initialBrokerInfo
  43. @volatile private var _brokerEpoch = initialBrokerEpoch
  44. private val isAlterIsrEnabled = config.interBrokerProtocolVersion.isAlterIsrSupported
  45. private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None)
  46. /**
  47. * Controller 上下文:保存Controller元数据的容器,所有的元数据信息都封装在这个类中
  48. * 特别重要的一个类
  49. */
  50. val controllerContext = new ControllerContext
  51. /**
  52. * 底层更接近网络层,可以理解为用于管理Controller和Broker连接的SocketChannel
  53. * 是一个微小的生产者-消费者模型(也就是有阻塞队列用于解耦)
  54. */
  55. var controllerChannelManager = new ControllerChannelManager(controllerContext, config, time, metrics, stateChangeLogger, threadNamePrefix)
  56. // have a separate scheduler for the controller to be able to start and stop independently of the kafka server
  57. /**
  58. * 线程调度器:当前唯一负责定期执行分区重平衡Leader选举
  59. * 有一个单独的调度程序,Controller 能够独立于 kafka 服务器启动和停止
  60. */
  61. private[controller] val kafkaScheduler = new KafkaScheduler(1)
  62. /**
  63. * Controller事件管理器,是事件处理环境所构建的生产者-消费者模型的一环
  64. * 主要是提供相关方法用于添加Controller事件(内部有一个队列存放Controller相关的事件)
  65. * 当其它线程(组件)感知到某个Controller事件发生,就会通过ControllerEventManager的API
  66. * 将事件放入到任务队列中,内部有一个单线程不断轮询并处理事件。
  67. */
  68. private[controller] val eventManager = new ControllerEventManager(config.brokerId, this, time,
  69. controllerContext.stats.rateAndTimeMetrics)
  70. /**
  71. *
  72. */
  73. private val brokerRequestBatch =
  74. new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger)
  75. /**
  76. * 副本状态机:负责副本状态转换
  77. */
  78. val replicaStateMachine: ReplicaStateMachine = new ZkReplicaStateMachine(config, stateChangeLogger, controllerContext, zkClient,
  79. new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))
  80. /**
  81. * 分区状态机:负责分区状态转换
  82. */
  83. val partitionStateMachine: PartitionStateMachine = new ZkPartitionStateMachine(config, stateChangeLogger, controllerContext, zkClient,
  84. new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))
  85. /**
  86. * 主题移除管理器:负责删除主题及日志文件
  87. */
  88. val topicDeletionManager = new TopicDeletionManager(config, controllerContext, replicaStateMachine,
  89. partitionStateMachine, new ControllerDeletionClient(this, zkClient))
  90. /**
  91. * 定义各种Handler,这些Handler会注册到Zookeeper的监听器。
  92. * Zookeeper第一时间感知节点发生变化,然后就会触发下面对应的Handler的执行
  93. */
  94. // controller变更 ZK监听器:监听 /controller节点,包括创建、删除和数据变更等情况
  95. private val controllerChangeHandler = new ControllerChangeHandler(eventManager)
  96. // 「/brokers/ids」节点变更处理器
  97. private val brokerChangeHandler = new BrokerChangeHandler(eventManager)
  98. /**
  99. * Broker信息变更 ZK监听器,每个Broker对应一个BrokerModificationsHandler
  100. * 比如Broker的配置信息发生变化就会触发相关事件执行
  101. */
  102. private val brokerModificationsHandlers: mutable.Map[Int, BrokerModificationsHandler] = mutable.Map.empty
  103. // 主题数量变更 ZK监听器
  104. private val topicChangeHandler = new TopicChangeHandler(eventManager)
  105. // 主题删除 ZK监听器:监听 /admin/delete_topics的子节点数量变更情况
  106. private val topicDeletionHandler = new TopicDeletionHandler(eventManager)
  107. // 分区变更 ZK监听器:监听主题分区数据变更的监听器,比如新增副本、分区Leader变更
  108. private val partitionModificationsHandlers: mutable.Map[String, PartitionModificationsHandler] = mutable.Map.empty
  109. /**
  110. * 分区重分配 ZK监听器:监听 /admin/reassign_partitions 节点是否被创建,如果创建则触发handler执行,
  111. * 当我们需要对Kakfa集群进行扩容以应对即将到来的大流量和业务尖峰,扩容后的Broker默认是不会有任何的分区副本,所以需要手动使用
  112. * kafka-reassign-partition.sh脚本将现有的分区副本挪一小部分到新创建的Broker中,使整个集群处于稳定的状态
  113. */
  114. private val partitionReassignmentHandler = new PartitionReassignmentHandler(eventManager)
  115. /**
  116. * Preferred Replica选举 ZK监听器
  117. * 一旦发现新提交的任务,就为目标主题执行Preferred Leader选举
  118. */
  119. private val preferredReplicaElectionHandler = new PreferredReplicaElectionHandler(eventManager)
  120. /**
  121. * ISR副本集合变更 ZK监听器:监听ISR副本集合变更,一旦触发,就需要获取ISR发生变更的分工列表,
  122. * 然后更新Controller端对应的Leader和ISR缓存元数据
  123. */
  124. private val isrChangeNotificationHandler = new IsrChangeNotificationHandler(eventManager)
  125. // 日志路径变更 ZK监听器
  126. private val logDirEventNotificationHandler = new LogDirEventNotificationHandler(eventManager)
  127. // 统计相关的字段,有些监控是非常重要的
  128. // 当前Controller所在的BrokerID
  129. @volatile private var activeControllerId = -1
  130. // 离线分区数量
  131. @volatile private var offlinePartitionCount = 0
  132. // 满足Preferred Leader选举条件的总分区数量
  133. @volatile private var preferredReplicaImbalanceCount = 0
  134. // 集群主题总数
  135. @volatile private var globalTopicCount = 0
  136. // 集群分区总数
  137. @volatile private var globalPartitionCount = 0
  138. // 集群中待删除的主题总数
  139. @volatile private var topicsToDeleteCount = 0
  140. // 集群中待删除的副本总数
  141. @volatile private var replicasToDeleteCount = 0
  142. // 集群中暂时无法删除的主题总数
  143. @volatile private var ineligibleTopicsToDeleteCount = 0
  144. // 集群中暂时无法删除的副本总数
  145. @volatile private var ineligibleReplicasToDeleteCount = 0
  146. /**
  147. * 单线程调度器:用来定时删除过期的tokens
  148. */
  149. private val tokenCleanScheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "delegation-token-cleaner")
  150. //..
  151. }

炸一看,KafkaController 内部定义了茫茫多的变量,其实最重的几个变量解析如下:

变量名 说明
controllerContext 保存集群元数据的容器
controllerChannelManager Controller 的网络 I/O 层
kafkaScheduler 线程调度器,负责定期执行分区重平衡等操作
eventManager 事件管理器
replicaStateMachine 副本状态机,负责副本状态转换
partitionStateMachine 分区状态机,负责分区状态转换

核心方法当属 process(ControllerEvent)

  1. // kafka.controller.KafkaController#process
  2. override def process(event: ControllerEvent): Unit = {
  3. try {
  4. event match {
  5. case event: MockEvent =>
  6. // Used only in test cases
  7. event.process()
  8. case ShutdownEventThread =>
  9. error("Received a ShutdownEventThread event. This type of event is supposed to be handle by ControllerEventThread")
  10. case AutoPreferredReplicaLeaderElection =>
  11. processAutoPreferredReplicaLeaderElection()
  12. case ReplicaLeaderElection(partitions, electionType, electionTrigger, callback) =>
  13. processReplicaLeaderElection(partitions, electionType, electionTrigger, callback)
  14. case UncleanLeaderElectionEnable =>
  15. processUncleanLeaderElectionEnable()
  16. case TopicUncleanLeaderElectionEnable(topic) =>
  17. processTopicUncleanLeaderElectionEnable(topic)
  18. // ...
  19. }

这是一个路由方法,根据不同的 ControllerEvent 调用不同的方法处理该事件。这个方法可讲的东西太多了,后续会着重讲解:

  • Broker 上线/下线。
  • Controller 选举。

    总结

    本章节最主要的目的是熟悉 KafkaController 事件处理模型,了解 controller 通过 Zookeeper 的 watch 机制感知集群元数据变化,并通过各种 ChangeHandler 将 Zookeeper 事件转换为 Controller 内部事件,并注册到 ControllerEventManger。最终经过简易版的生产者-消费者模型,由 ControllerEventThread 线程调用 KafkaChannel#process(ControllerEvent) 处理 Controller 事件。

    附录

    Zookeeper 节点数据

    使用 stat /controller 得到以下结果:
    1. cZxid = 0x59
    2. ctime = Thu Jun 03 16:44:56 CST 2021
    3. mZxid = 0x5c
    4. mtime = Fri Jun 04 07:51:03 CST 2021
    5. pZxid = 0x59
    6. cversion = 0
    7. dataVersion = 1
    8. aclVersion = 0
    9. ephemeralOwner = 0x1000a0997b90007
    10. dataLength = 54
    11. numChildren = 0
    | 字段名称 | 功能 | | —- | —- | | czxid | 创建该节点的事物ID | | ctime | 创建该节点的时间 | | mZxid | 更新该节点的事物ID | | mtime | 更新该节点的时间 | | pZxid | 操作当前节点的子节点列表的事物ID(这种操作包含增加子节点,删除子节点) | | cversion | 当前节点的子节点版本号 | | dataVersion | 当前节点的数据版本号 | | aclVersion | 当前节点的acl权限版本号 | | ephemeralOwner | 当前节点的如果是临时节点,该属性是临时节点的事物ID | | dataLength | 当前节点的d的数据长度 | | numChildren | 当前节点的子节点个数 |

使用 get [path] 命令获取节点数据内容:

  1. get /controller
  2. {"version":1,"brokerid":1,"timestamp":"1622569390846"}
字段名称 功能
version Controller 版本号
brokerid Controller所在的Broker ID
timestamp Controller 创建时间

ZK 上 /brokers/ids 节点状态:

  1. cZxid = 0x5
  2. ctime = Thu Jun 03 16:36:22 CST 2021
  3. mZxid = 0x5
  4. mtime = Thu Jun 03 16:36:22 CST 2021
  5. pZxid = 0x6d
  6. cversion = 15
  7. dataVersion = 0
  8. aclVersion = 0
  9. ephemeralOwner = 0x0
  10. dataLength = 0
  11. numChildren = 1

ZK 上 /brokers/ids/1 节点数据:

  1. {
  2. "features": {},
  3. "listener_security_protocol_map": {
  4. "PLAINTEXT": "PLAINTEXT"
  5. },
  6. "endpoints": ["PLAINTEXT://192.168.217.128:9092"],
  7. "jmx_port": -1,
  8. "port": 9092,
  9. "host": "192.168.217.128",
  10. "version": 5,
  11. "timestamp": "1622653209620"
  12. }

ZK 上 /brokers/ids/1 节点状态:

  1. cZxid = 0x6d
  2. ctime = Fri Jun 04 14:14:28 CST 2021
  3. mZxid = 0x6d
  4. mtime = Fri Jun 04 14:14:28 CST 2021
  5. pZxid = 0x6d
  6. cversion = 0
  7. dataVersion = 1
  8. aclVersion = 0
  9. ephemeralOwner = 0x1000a0997b90009
  10. dataLength = 214
  11. numChildren = 0

ZK 上 /brokers/topics 节点状态:

  1. cZxid = 0x6
  2. ctime = Thu Jun 03 16:36:22 CST 2021
  3. mZxid = 0x6
  4. mtime = Thu Jun 03 16:36:22 CST 2021
  5. pZxid = 0x21
  6. cversion = 1
  7. dataVersion = 0
  8. aclVersion = 0
  9. ephemeralOwner = 0x0
  10. dataLength = 0
  11. numChildren = 1

ZK 上 /brokers/topics 节点数据

  1. [james]

ZK 上 /brokers/topics/james 节点状态:

  1. cZxid = 0x21
  2. ctime = Thu Jun 03 16:37:00 CST 2021
  3. mZxid = 0x45
  4. mtime = Thu Jun 03 16:44:04 CST 2021
  5. pZxid = 0x22
  6. cversion = 1
  7. dataVersion = 1
  8. aclVersion = 0
  9. ephemeralOwner = 0x0
  10. dataLength = 116
  11. numChildren = 1

ZK 上 /brokers/topics/james 节点数据:

  1. {
  2. "partitions": {
  3. "0": [1]
  4. },
  5. "topic_id": "yR4JWjUESDexyuZ9yIIFVg",
  6. "adding_replicas": {},
  7. "removing_replicas": {},
  8. "version": 3
  9. }

当然,james 下面还包含子节点 /partitions/[分区ID号]/state,比如 /brokers/topics/james/partitions/0/state 节点数据为:

  1. {
  2. "controller_epoch": 4,
  3. "leader": 1,
  4. "version": 1,
  5. "leader_epoch": 2,
  6. "isr": [1]
  7. }

节点 /brokers/topics/james/partitions/0/state 节点详情

  1. cZxid = 0x24
  2. ctime = Thu Jun 03 16:37:00 CST 2021
  3. mZxid = 0x5a
  4. mtime = Thu Jun 03 16:44:57 CST 2021
  5. pZxid = 0x24
  6. cversion = 0
  7. dataVersion = 2
  8. aclVersion = 0
  9. ephemeralOwner = 0x0
  10. dataLength = 72
  11. numChildren = 0