在 Kafka 集群中会有一个或多个 broker 节点,其中有一个 broker 会被选举为控制器(Controller),它负责管理整个集群中所有分区和副本的状态。当某个分区的 leader 副本出现故障时,由控制器负责为该分区选举出新的 leader 副本。当检测到某个分区的 ISR 集合发生变化时,由控制器负责通知所有的 broker 更新其元数据信息。当使用 kafka-topics.sh 脚本为某个 topic 增加分区数时,同样还是由控制器负责分区的重新分配。

控制器的选举

Kafka 中的控制器选举的工作依赖于 Zookeeper,成功竞选为控制器的 broker 会在 Zookeeper 中创建 /controller 这个临时节点,此临时节点的内容参考如下:

  1. {
  2. "version":1,
  3. "brokerid":0,
  4. "timestamp":"1529210278988"
  5. }
  • version 在目前版本中固定为 1
  • brokerid 表示称为控制器的 broker 节点的 id 编号
  • timestamp 表示竞选成为控制器时的时间戳。

在任意时刻,集群中有且仅有一个控制器。每个 broker 启动的时候都会去尝试读取 /controller 节点的 brokerid的值,如果读取到 brokerid 的值不为 -1 则表示已有其它 broker 节点成功竞选为控制器,所以当前 broker 会放弃竞选;如果 Zookeeper 中不存在 /controller 这个节点或这个节点中的数据异常,那么就会尝试去创建这个节点,此时也有可能其他 broker 同时去尝试创建这个节点,只有创建成功的那个 broker 才会成为控制器。每个 broker 都会在内存中保存当前控制器的 brokerid 值,这个值可以标识为 activeControllerId。

由于是临时节点,一旦 Broker 与 ZooKeeper 的会话终止,该节点就会消失。集群上所有的 Broker 都在实时监听 ZooKeeper 上的这个 /controller 节点。这里的监听分为两个含义:监听这个节点是否存在,倘若发现这个节点不存在,Broker 会立即抢注该节点,即创建 /controller 节点。创建成功的那个 Broker 当选为新一届的 Controller。监听这个节点数据是否发生了变更。同样,一旦发现该节点的内容发生了变化,Broker 也会立即启动新一轮的 Controller 选举。

Zookeeper 中还有一个与控制器有关的 /controller_epoch 节点,这个节点是持久节点,节点中存放的是一个整型的 controller_epoch 值。controller_epoch 用于记录控制器发生变更的次数,又称为控制器的纪元。

controller_epoch 的初始值为 1,当控制器发生变更时,每选出一个新的控制器就将该字段值加 1。每个和控制器交互的请求都会带上 controller_epoch 字段,如果请求的 controller_epoch 值小于内存中的 controller_epoch 值,则认为这个请求是向已经过期的控制器所发送的请求,那这个请求会被认定为无效的请求。如果请求的 controller_epoch 值大于内存中的 controller_epoch 值,那么说明已经有新的控制器当选了。由此可见,Kafka 通过 controller_epoch 来保证控制器的唯一性,进而保证相关操作的一致性。

控制器的职责

具备控制器身份的 broker 需要比其他普通的 broker 多一份职责,具体细节如下:

  • 监听分区相关的变化。包括:处理 ISR 集合变更的动作,处理优先副本的选举动作。
  • 监听主题相关的变化。
  • 监听 broker 相关的变化。
  • 监听主题中的分区分配变化。
  • 更新集群的元数据信息。

下图说明了控制器中到底保存了哪些数据:
image.png
这里面比较重要的数据有:

  • 所有主题信息及具体的分区信息,比如领导者副本是谁,ISR 集合中有哪些副本等。
  • 所有 Broker 信息。包括当前运行中的 Broker 及关闭中的 Broker 等。
  • 所有涉及运维任务的分区。包括当前正在进行 Preferred 领导者选举以及分区重分配的分区列表。

这些数据其实在 ZooKeeper 中也保存了一份。每当控制器初始化时,它都会从 ZooKeeper 上读取对应的元数据并填充到自己的缓存中。有了这些数据,控制器就能对外提供数据服务了。这里的对外主要指对其他 Broker 而言,控制器通过向这些 Broker 发送请求的方式将这些数据同步到其他 Broker 上。

控制器的原理

控制器在选举成功之后会读取 Zookeeper 中各个节点的数据来初始化上下文信息(ControllerContext),并且也需要管理这些上下文信息。比如为某个 topic 增加了若干个分区,控制器在负责创建这些分区的同时也要更新上下文信息,并且需要将这些变更信息同步到其他普通的 broker 节点中。

不管是监听器触发的事件,还是定时任务触发的事件都会读取或更新控制器中的上下文信息,那么这样就会涉及到多线程间的同步。如果单纯使用锁机制来实现,那么整体的性能会大打折扣。针对这一现象,Kafka 的控制器使用单线程基于事件队列的模型,将每个事件都做一层封装,然后按照事件发生的先后顺序暂存到 LinkedBlockingQueue 中,最后使用一个专用的线程(ControllerEventThread)按照 FIFO 的原则顺序处理各个事件,这样不需要锁机制就可以在多线程间维护线程安全。
image.png

1. Controller 选举

在 Kafka 的早期版本中,并没有采用 Kafka Controller 这样一个概念来对分区和副本的状态进行管理,而是依赖于 Zookeeper,每个 broker 都会在 Zookeeper 上为分区和副本注册大量的监听器。当分区或副本状态变化时会唤醒很多不必要的监听器,这种严重依赖于 Zookeeper 的设计会有脑裂、羊群效应以及造成 Zookeeper 过载的隐患。而在目前新版设计中,只有 Kafka Controller 会在 Zookeeper 上注册相应的监听器,其他的 broker 极少需要再监听 Zookeeper 中的数据变化,这样省去了很多不必要的麻烦。不过每个 broker 还是会对 /controller 节点添加监听器,以此来监听此节点的数据变化(ControllerChangeHandler)。

当 /controller 节点数据发生变化时,每个 broker 都会更新自身内存中保存的 activeControllerId。如果 broker 在数据变更前是控制器,那么如果在数据变更后自身的 brokerid 值与新的 activeControllerId 值不一致,那么就需要退位,关闭相应的资源,比如关闭状态机、注销相应的监听器等。节点数据发生变化有可能是控制器由于异常而下线,造成 /controller 这个临时节点会被自动删除;也有可能是其他原因将此节点删除了。

当 /controller 节点被删除时,每个 broker 都会参与新一轮的选举。如果有需要,可以手动删除 /controller 节点来触发新一轮的选举。当然关闭控制器所对应的 broker 以及手动向 /controller 节点写入新的 brokerid 的所对应的数据同样可以触发新一轮的选举。

2. 分区 leader 选举

分区 leader 副本的选举由控制器负责具体实施。当创建分区(创建主题或增加分区都有创建分区的动作)或分区上线(比如分区中原先的 leader 副本下线,此时分区需要选举一个新的 leader 上线来对外提供服务)的时候都需要执行 leader 的选举动作,对应的选举策略为 OfflinePartitionLeaderElectionStrategy

这种策略的基本思路是按照 AR 集合中副本的顺序查找第一个存活的副本,并且这个副本在 ISR 集合中。一个分区的 AR 集合在分配的时候就被指定,并且只要不发生重分配的情况,集合内部副本的顺序是保持不变的,而分区的 ISR 集合中副本的顺序可能会改变。如果 ISR 集合中没有可用的副本,那么此时还要再检查一下配置的 unclean.leader.election.enable 参数,该参数默认为 false。如果这个参数配置为 true,那么表示允许从非 ISR 列表中选举 leader,那么从 AR 列表中找到第一个存活的副本即为 leader。

当分区进行重分配时也需要执行 leader 的选举动作,对应的选举策略为 ReassignPartitionLeaderElectionStrategy。这个选举策略的思路比较简单:从重分配的 AR 列表中找到第一个存活的副本,且这个副本在目前的 ISR 列表中。当发生优先副本的选举时,直接将优先副本设置为 leader 即可,而 AR 集合中的第一个副本即为优先副本。

还有一种情况会发生 leader 的选举,当某节点被优雅地关闭时,位于这个节点上的 leader 副本都会下线,所以与此对应的分区需要执行 leader 的选举。与此对应的选举策略为ControlledShutdownPartitionLeaderElectionStrategy:从 AR 列表中找到第一个存活的副本,且这个副本在目前的 ISR 列表中,与此同时还要确保这个副本不处于正在被关闭的节点上。

控制器的故障转移

在 Kafka 集群运行过程中,只有一台 broker 充当控制器的角色,这必然就存在单点失效的风险。而 Kafka 为控制器提供了故障转移功能,也就是说所谓的 Failover。

故障转移是指:当运行中的控制器突然宕机或意外终止时,Kafka 能够快速感知到并立即启用备用控制器来代替之前失败的控制器。该过程是自动完成的,无需手动干预。下图简单地展示了控制器故障转移的过程:
image.png
最开始时,Broker 0 是控制器。当 Broker 0 宕机后,ZooKeeper 通过 Watch 机制感知并删除了 /controller 临时节点。之后所有存活的 Broker 开始竞选新的控制器身份。最终 Broker 3 赢得了选举,并成功地在 ZooKeeper 上重建了 /controller 节点。之后 Broker 3 会从 ZooKeeper 中读取集群元数据信息并初始化到本地缓存。至此控制器的 Failover 完成,可以行使正常的工作职责了。