概述

Controller 是 Kafka 极为重要的组件,它核心职责是管理整个集群元数据信息。集群的元数据是不断变更的,截止 Kafka 2.8 Kafka 还是通过 ZK 感知集群元数据变更(比如 Broker 上线、下线),使用 ZK 存储集群元数据。虽然在 kafak 2.8 推出基于 Raft 版本的 Controller,但是仍有许多功能不可用,目前在生产环境中还是得依托 ZK 才能完成相关感知和元数据存储,这是事件驱动模型
Controller 自身也需要实现高可用,集群任意时刻最多只会存在一个 Controller(当然可能也会出现脑裂情况,kafka 通过版本号解决脑裂问题),当 Controller 所在的 Broker 节点出现各种问题导致 Controller 离线或崩溃,这些就会进行 Controller 选举,这种行为称为 Controller 故障转移(failover)。
因此,本篇文章会对以下两点做源码级讲解:

  1. Controller 在 Kafka 的定位和功能,其中包含:
    1. 集群元数据缓存管理,比如增删主题、分区等操作。
    2. 领导者选举
  2. Controller 故障转移触发时机以及核心步骤。

    Controller 相关组件一览

    | 组件名称 | 说明 | | —- | —- | | ControllerChannelManager | 管理 Controller 与其它 Broker 的 Socket 连接,Controller 通过
    ControllerChannelManager 来与其它Broker进行数据交互。 | | ControllerContext | 集群元数据缓存对象,提供便捷的API获取不同维度的集群元数据 | | ControllerEventManager | Contoller事件管理器,整个 Controller 是基于事件驱动模型。
    利用 Zookeeper 的 watcher 机制关注某些节点,一旦这些节点或其子节点发生数据变更事件,Controller 就会收到该通知事件。而 ControllerEventManager 就是处理这些事件的对象。 | | KafkaController | Controller 核心类,几乎实现了 Controller 事件的所有处理逻辑。
    核心方法是 processor(ControllerEvent) | | PartitionStateMachine | 分区状态管理机,管理集群所有分区的状态 | | ReplicaStateMachine | 副本状态管理机,管理集群所有副本的状态 | | TopicDeletionManager | 主题删除管理器,用来删除主题 |

我们会一一对上面的组件进行源码级的讲解。

集群元数据缓存 ControllerContext

要想了解 Controller 具体工作原理,我们首先要清楚它管理了哪些数据。毕竟,这是 Controller 在Kafka 存在的意义。Zookeeper 作为 Kafka 元数据的真理之原,Controller 当然会和它打交道,那 Zookeeper 具体保存了哪些数据呢? 请看下图:
Zookeeper 数据.png
其实存在 Zookeeper 的元数据并不多,顶层结点有以下几个核心路径: 【kafka】KafkaController 组件(一) - 图2上面只提到 Kafka 核心的元数据节点,至于其它比如权限校验等相关的节点暂不涉及。

注意,Kafka 2.8 版本有一个重磅更新,就是移除 Zookeeper。基于 Raft 共识算法选举 Controller 并维护集群元数据信息。当前,基于 Raft 共识算法的集群并不稳定,有许多特性还未支持,比如事务。但是,随着时间推移,去 Zookeeper 势在必行。 Raft 共识算法入门 推荐这篇文章 Raft 动画演示 推荐看这里 Kafka KIP-500 讨论在这里

前面我们看过集群元数据在 Zookeeper 是什么样的,Controller 本质就是对这些元数据进行维护并感知。在 Controller 中,存储 Zookeeper 的数据的类是由 kafka.controller.ControllerContext 类承载,核心源码如下所示:

  1. // kafka.controller.ControllerContext
  2. class ControllerContext {
  3. // Controller统计信息类
  4. val stats = new ControllerStats
  5. // 离线分区计数器(统计类参数)
  6. var offlinePartitionCount = 0
  7. // 满足Preferred Leader选举条件的总分区数
  8. // Preferred Leader是指Leader ID 是否和ISR集合的第一个ID相等
  9. var preferredReplicaImbalanceCount = 0
  10. // 处于关闭中Broker的ID集合
  11. val shuttingDownBrokerIds = mutable.Set.empty[Int]
  12. // 处于正常运行中Broker集合
  13. private val liveBrokers = mutable.Set.empty[Broker]
  14. // 处于正常运行中Broker的Epoch列表,
  15. private val liveBrokerEpochs = mutable.Map.empty[Int, Long]
  16. // Controller当前Epoch值,实际就是Zookeeper节点/controller_epoch节点的值
  17. // 可以认为是Controller版本号
  18. var epoch: Int = KafkaController.InitialControllerEpoch
  19. // Controller对应Zookeeper节点的Epoch,
  20. // 实际上是Zookeeper节点/controller_epoch节点的dataVersion值
  21. // Kafka 使用 epochZkVersion 来判断和防止 Zombie Controller。
  22. // 这也就是说,原先在老 Controller 任期内的 Controller 操作在新 Controller 不能成功执行,
  23. // 因为新 Controller 的 epochZkVersion 要比老 Controller 的大。
  24. var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion
  25. // 集群中所有主题列表
  26. val allTopics = mutable.Set.empty[String]
  27. // 集群中所有主题的ID
  28. var topicIds = mutable.Map.empty[String, Uuid]
  29. // 集群中所有主题的名称
  30. var topicNames = mutable.Map.empty[Uuid, String]
  31. // 主题分区的副本列表<主题名称,<分区ID, 该分区所有副本详情>>
  32. val partitionAssignments = mutable.Map.empty[String, mutable.Map[Int, ReplicaAssignment]]
  33. // 分区的Leader/ISR副本信息 <分区, Leader副本、ISR集合、Controller版本号>
  34. private val partitionLeadershipInfo = mutable.Map.empty[TopicPartition, LeaderIsrAndControllerEpoch]
  35. // 集群正处于「分区重平衡」过程的主题分区列表
  36. val partitionsBeingReassigned = mutable.Set.empty[TopicPartition]
  37. // 主题分区状态列表
  38. val partitionStates = mutable.Map.empty[TopicPartition, PartitionState]
  39. // key:broker id,value:分区详情。表示某个broker中不可用的分区列表
  40. val replicasOnOfflineDirs = mutable.Map.empty[Int, Set[TopicPartition]]
  41. // 主题分区的副本状态列表
  42. val replicaStates = mutable.Map.empty[PartitionAndReplica, ReplicaState]
  43. // 待删除主题列表
  44. val topicsToBeDeleted = mutable.Set.empty[String]
  45. // 已开启删除操作的主题列表
  46. val topicsWithDeletionStarted = mutable.Set.empty[String]
  47. // 暂时无法执行删除操作的主题列表
  48. val topicsIneligibleForDeletion = mutable.Set.empty[String]
  49. // ...(method)
  50. }

【kafka】KafkaController 组件(一) - 图3kafka.controller.ControllerContext 除了定义以上这些重要的字段,这些字段个人认为最重要的是 partitionAssignments,它是一个 Map 集合类型(<主题名称,<分区ID, 该分区所有副本详情>>),存储集群中所有主题分区的副本分配情况。Kafka 通过这个字段获取各种维度的数据。比如,我想获取某个 Broker 的所有分区:

  1. // 获取某个Broker上的所有分区详情
  2. def partitionsOnBroker(brokerId: Int): Set[TopicPartition] = {
  3. partitionAssignments.flatMap {
  4. case (topic, topicReplicaAssignment) => topicReplicaAssignment.filter {
  5. case (_, partitionAssignment) => partitionAssignment.replicas.contains(brokerId)
  6. }.map {
  7. case (partition, _) => new TopicPartition(topic, partition)
  8. }
  9. }.toSet
  10. }

再比如,我想获取某个主题的所有分区对象,可以这么写:

  1. def partitionsForTopic(topic: String): collection.Set[TopicPartition] = {
  2. partitionAssignments.getOrElse(topic, mutable.Map.empty).map {
  3. case (partition, _) => new TopicPartition(topic, partition)
  4. }.toSet
  5. }

再比如,我想获取某个主题的所有副本对象,可以这么写:

  1. def replicasForTopic(topic: String): Set[PartitionAndReplica] = {
  2. partitionAssignments.getOrElse(topic, mutable.Map.empty).flatMap {
  3. case (partition, assignment) => assignment.replicas.map { r =>
  4. PartitionAndReplica(new TopicPartition(topic, partition), r)
  5. }
  6. }.toSet
  7. }

以上 API 都是定义在 kafka.controller.ControllerContext 类中,它还定义了非常多的方法对变量进行操作。

Controller 网络模型

支撑 Controller 网络模型相关组件

ControllerChannelManager组件.png
相关组件都定义在 ControllerChannelManager.scala 文件中,如上图所示。对这些组件进行简单概述:

组件名称 说明
ControllerChannelManager 相当于管理者,有序组织其它网络组件
QueueItem 一个 POJO 类,用来封装待发送的请求
RequestSendThread Controller 请求发送线程,将请求交给 NetworkCLient 对象发送。
Controller 会和集群所有的 Broker 建立连接,每个连接有独立的 RequestSendThread 线程
AbstractControllerBrokerRequestBatch 一个抽象类,定义了三个重要的构造 Controller 请求的方法。
调用以下方法生成 RPC 请求并放入缓冲队列中:
addLeaderAndIsrRequestForBrokers
addStopReplicaRequestForBrokers
addUpdateMetadataRequestForBrokers
ControllerBrokerRequestBatch 继承上面的抽象类,并实现抽象方法
ControllerBrokerStateInfo 一个 POJO 类,组合发送线程、请求队列、网络I/O类、监控指标等重要的组件

网络模型示意图

Controller 网络模型.png
Controller 网络模型其实是一个生产者-消费者模型,请求队列 BlockingQueue[QueueItem] 是连接生产者和消费者的桥梁。ControllerChannelManager#sendRequest() 方法会根据目标 Broker ID 从缓存中获取对应的 BlockingQueue,然后将待发送的请求插入到队列中。请求发送线程 ReqeustSendThread 不断调用 BlockingQueue#take() 方法,委托 NetworkClient 将请求发送给 Broker,并阻塞等待 Broker 的 ACK 响应,成功收到响应后再执行 callbacl(response) 函数函数。
请注意, Kafka 基于 Java NIO 的网络模型,底层的 SocketChannel 是非阻塞的,需要在 while 循环中调用多次才能完整接收数据包,所以这里阻塞并非指线程阻塞,而是指逻辑被阻塞了。

ControllerChannelManager

ControllerChannelManager 的核心变量只有一个:

  1. /**
  2. * Controller会和每个Broker建立TCP连接,这个字段就是维护当前Controller和其实Broker的连接信息。
  3. * 每个ControllerBrokerStateInfo对象中都会有一个任务队列,ControllerChannelManager对象是一个生产者,
  4. * 它产生的请求体会被放入指定的Broker的任务队列中,而每个Broker又会有独立的{@link RequestSendThread}线程,
  5. * 这个线程就属于消费者,因此,它们构成一个简单的消费者-生产者模型。
  6. * ControllerChannelManager->产生Request->通过「brokerStateInfo」找到对应的ControllerBrokerStateInfo
  7. * ->把请求放入任务队列中
  8. */
  9. protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]

后缀 Manager 就可以看出是管理和其它 Broker 的连接,而连接信息是使用一个 POJO 类 ControllerBrokerStateInfo 存储:

  1. /**
  2. * POJO类
  3. *
  4. * @param networkClient 网络层I/O操作对象,提供方便的API用于发送请求/接收响应
  5. * @param brokerNode Broker节点信息
  6. * @param messageQueue 阻塞的消息队列,存储待发送给Broker的请求对象
  7. * @param requestSendThread 发送线程。Controller与每个Broker都建立一条TCP连接,这个
  8. * @param queueSizeGauge 监控相关
  9. * @param requestRateAndTimeMetrics 监控相关
  10. * @param reconfigurableChannelBuilder 支持动态更改配置的类
  11. */
  12. case class ControllerBrokerStateInfo(networkClient: NetworkClient,
  13. brokerNode: Node,
  14. messageQueue: BlockingQueue[QueueItem],
  15. requestSendThread: RequestSendThread,
  16. queueSizeGauge: Gauge[Int],
  17. requestRateAndTimeMetrics: Timer,
  18. reconfigurableChannelBuilder: Option[Reconfigurable])

核心方法

方法名称 功能
startup 启动所有请求发送线程
shutdown 关闭所有请求发送线程
sendRequest 将待发送的请求放入特定的Broker ArrayBlockingQueue中,请求发送线程会不断从该队列中获取请求并发送
addNewBroker 创建和Broker建立Socket连接所需的组件,并将映射关系 缓存起来
removeBroker 关闭和Broker建立的连接,包含缓存、请求发送线程
startRequestSendThread 启动请求发送线程

核心方法不多,我们主要看看 sendRequest 是如何发送请求发送的:

  1. /**
  2. * 将待发送的请求放入指定的Broker任务队列,等待相应的 {@link RequestSendThread} 线程消费
  3. *
  4. * @param brokerId 请求发往的目的地
  5. * @param request 待发送的请求
  6. * @param callback 发送成功后的回调方法
  7. */
  8. def sendRequest(brokerId: Int, request: AbstractControlRequest.Builder[_ <: AbstractControlRequest],
  9. callback: AbstractResponse => Unit = null): Unit = {
  10. brokerLock synchronized {
  11. // #1 根据Broker ID获取「ControllerBrokerStateInfo」对象
  12. val stateInfoOpt = brokerStateInfo.get(brokerId)
  13. stateInfoOpt match {
  14. case Some(stateInfo) =>
  15. // #2 将请求和回调等数据使用QueueItem对象封装,并加入到任务队列中等待发送到Broker端
  16. stateInfo.messageQueue.put(QueueItem(request.apiKey, request, callback, time.milliseconds()))
  17. case None =>
  18. // 找不到,目标Broker离线了
  19. warn(s"Not sending request $request to broker $brokerId, since it is offline.")
  20. }
  21. }

非常简单,根据 broker id 从 brokerStateInfo Map 集合中获取 ControllerBrokerStateInfo 对象,然后从这个对象中获取 messageQueue ,最后将待发送的请求使用 QueueItem 包装后放入队列中,是不是非常简单。

RequestSendThread

  1. /**
  2. * Control 请求发送线程
  3. *
  4. * @param controllerId Controller所在的Broker节点ID
  5. * @param controllerContext Controller上下文,内部保存Controller元数据信息
  6. * @param queue 请求阻塞队列
  7. * @param networkClient 执行网络I/O的类
  8. * @param brokerNode 目标节点ID,发送线程和目标节点建立一条TCP连接
  9. * @param config Kafka配置信息
  10. * @param time 时间工具类
  11. * @param requestRateAndQueueTimeMetrics 相关监控指标
  12. * @param stateChangeLogger
  13. * @param name
  14. */
  15. class RequestSendThread(val controllerId: Int,
  16. val controllerContext: ControllerContext,
  17. val queue: BlockingQueue[QueueItem],
  18. val networkClient: NetworkClient,
  19. val brokerNode: Node,
  20. val config: KafkaConfig,
  21. val time: Time,
  22. val requestRateAndQueueTimeMetrics: Timer,
  23. val stateChangeLogger: StateChangeLogger,
  24. name: String)
  25. extends ShutdownableThread(name = name) {
  26. // ...
  27. }

请求线程的I/O操作是依赖 NetworkClient 组件。

  1. /**
  2. * Controller请求发送线程的核心方法,
  3. * 我们说过,Controller属于消费者-生产者模型,而{@link RequestSendThread}属于消费者,
  4. * 它会从{@param queue}队列中获取任务(实际就是待发送的请求体),然后通过 {@param networkClient}网络层
  5. * 向{@link brokerNode}发送请求,并阻塞等待响应成功接收,最后再触发回调方法的执行,
  6. * 以上就是线程 {@link RequestSendThread} 和其它组件配合使用
  7. * 小结:①从任务队列中获取待发送的请求②发送并阻塞等待响应返回③执行回调方法
  8. */
  9. override def doWork(): Unit = {
  10. // 重试退避时间
  11. def backoff(): Unit = pause(100, TimeUnit.MILLISECONDS)
  12. // #1 从缓冲队列中获取待发送的请求
  13. val QueueItem(apiKey, requestBuilder, callback, enqueueTimeMs) = queue.take()
  14. var clientResponse: ClientResponse = null
  15. try {
  16. // 判断是否发送成功
  17. var isSendSuccessful = false
  18. while (isRunning && !isSendSuccessful) {
  19. // 如果一个Broker长时间处于宕机,那么在某个时间点Zookeeper的监听顺会触发一个「removeBroker」事件,这个事件将会调用这个线程的
  20. // shutdown()方法,这样我们就会停止重试
  21. try {
  22. // #2 判断TCP连接处于「可发送数据」状态
  23. if (!brokerReady()) {
  24. // #2-1 TCP连接不可用,阻塞线程直到超出重试退避时间(会不断尝试重新建立TCP连接)
  25. isSendSuccessful = false
  26. backoff()
  27. }
  28. else {
  29. // #2-2 TCP处于正常状态,可以发送数据,那么就构建新的请求体
  30. val clientRequest = networkClient.newClientRequest(brokerNode.idString, requestBuilder, time.milliseconds(), true)
  31. // #2-3 发送请求,等待接收Response(假阻塞,因为是使用while不断轮询)
  32. clientResponse = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time)
  33. isSendSuccessful = true
  34. }
  35. } catch {
  36. case e: Throwable =>
  37. // 如果发送失败,重连Broker并且重新发送数据
  38. warn(s"Controller $controllerId epoch ${controllerContext.epoch} fails to send request $requestBuilder " + s"to broker $brokerNode. Reconnecting to broker.", e)
  39. networkClient.close(brokerNode.idString)
  40. isSendSuccessful = false
  41. backoff()
  42. }
  43. }
  44. // #3 处理响应体
  45. if (clientResponse != null) {
  46. val requestHeader = clientResponse.requestHeader
  47. // #3-1 获取RPC请求类型
  48. val api = requestHeader.apiKey
  49. if (api != ApiKeys.LEADER_AND_ISR && api != ApiKeys.STOP_REPLICA && api != ApiKeys.UPDATE_METADATA) {
  50. // 如果RCP请求类型不属于LEADER_AND_ISR、STOP_REPLICA、UPDATE_METADATA,则抛出异常
  51. throw new KafkaException(s"Unexpected apiKey received: $apiKey")
  52. }
  53. // 获取响应体
  54. val response = clientResponse.responseBody
  55. // 触发方法回调
  56. if (callback != null) {
  57. callback(response)
  58. }
  59. }
  60. } catch {
  61. case e: Throwable =>
  62. error(s"Controller $controllerId fails to send a request to broker $brokerNode", e)
  63. // If there is any socket error (eg, socket timeout), the connection is no longer usable and needs to be recreated.
  64. networkClient.close(brokerNode.idString)
  65. }
  66. }

核心逻辑简单描述一下:请求发送线程不断调用 doWork() 方法将 ArrayBlockingQueue 存储的请求发往 Broker,并以”阻塞”方式接收响应,最后触发回调方法。

总结

Controller 会和集群中处于在线的 Broker 建立 Socket 连接,而 ControllerChannelManager 负责管理这件事情,并且暴露 sendRequest 方法给其它方法调用。总而言之,Controller 这一套网络模型属于迷你型的生产者-消费者模型,依靠 ArrayBlockingQueue 将生产者和消费者解耦。从请求的角度看,这是一个异步请求模型,上层方法调用 sendRequest 发送请求,可以继续执行下面逻辑,RequestSendThread 线程执行发送请求、阻塞并接收响应,然后触发回调方法。这个回调方法很有讲究,它会根据请求类型生成对应的事件并注册到事件管理器 ControllerEventManager 中,由事件管理器负责处理响应事件。从这个角度看 Controller ,它是一个事件模型。所以不同的事件看的角度不同,会有不同的理解,而且需要结合上下文才能准确定位。