Kafka 是一个高性能,高容错,多副本,可复制的分布式消息系统。在整个系统中,涉及到多处选举机制,被不少人搞混,这里总结一下,本篇文章大概会从三个方面来讲解。
- 控制器(Broker)选举机制
- 分区副本选举机制
- 消费组选举机制
如果对 Kafka 不了解的话,可以先看这篇博客《一文快速了解 Kafka》。
控制器选举
控制器是 Kafka 的核心组件,它的主要作用是在 Zookeeper 的帮助下管理和协调整个 Kafka 集群。集群中任意一个 Broker 都能充当控制器的角色,但在运行过程中,只能有一个 Broker 成为控制器。
控制器的作用可以查看文末
控制器选举可以认为是 Broker 的选举。
集群中第一个启动的 Broker 会通过在 Zookeeper 中创建临时节点 /controller 来让自己成为控制器,其他 Broker 启动时也会在 zookeeper 中创建临时节点,但是发现节点已经存在,所以它们会收到一个异常,意识到控制器已经存在,那么就会在 Zookeeper 中创建 watch 对象,便于它们收到控制器变更的通知。
那么如果控制器由于网络原因与 Zookeeper 断开连接或者异常退出,那么其他 broker 通过 watch 收到控制器变更的通知,就会去尝试创建临时节点 /controller,如果有一个 Broker 创建成功,那么其他 broker 就会收到创建异常通知,也就意味着集群中已经有了控制器,其他 Broker 只需创建 watch 对象即可。
如果集群中有一个 Broker 发生异常退出了,那么控制器就会检查这个 broker 是否有分区的副本 leader,如果有那么这个分区就需要一个新的 leader,此时控制器就会去遍历其他副本,决定哪一个成为新的 leader,同时更新分区的 ISR 集合。
如果有一个 Broker 加入集群中,那么控制器就会通过 Broker ID 去判断新加入的 Broker 中是否含有现有分区的副本,如果有,就会从分区副本中去同步数据。
防止控制器脑裂
如果控制器所在 broker 挂掉了或者 Full GC 停顿时间太长超过 zookeepersession timeout 出现假死,Kafka 集群必须选举出新的控制器,但如果之前被取代的控制器又恢复正常了,它依旧是控制器身份,这样集群就会出现两个控制器,这就是控制器脑裂问题。
解决方法:
为了解决 Controller 脑裂问题,ZooKeeper 中还有一个与 Controller 有关的持久节点 /controller_epoch,存放的是一个整形值的 epoch number(纪元编号,也称为隔离令牌),集群中每选举一次控制器,就会通过 Zookeeper 创建一个数值更大的 epoch number,如果有 broker 收到比这个 epoch 数值小的数据,就会忽略消息。
分区副本选举机制
由控制器执行。
- 从 Zookeeper 中读取当前分区的所有 ISR (in-sync replicas) 集合。
- 调用配置的分区选择算法选择分区的 leader。
Unclean leader 选举
ISR 是动态变化的,所以 ISR 列表就有为空的时候,ISR 为空说明 leader 副本也挂掉了。此时 Kafka 要重新选举出新的 leader。但 ISR 为空,怎么进行 leader 选举呢?
Kafka 把不在 ISR 列表中的存活副本称为 “非同步副本”,这些副本中的消息远远落后于 leader,如果选举这种副本作为 leader 的话就可能造成数据丢失。所以 Kafka broker 端提供了一个参数 unclean.leader.election.enable,用于控制是否允许非同步副本参与 leader 选举;如果开启,则当 ISR 为空时就会从这些副本中选举新的 leader,这个过程称为 Unclean leader 选举。
可以根据实际的业务场景选择是否开启 Unclean leader 选举。一般建议是关闭 Unclean leader 选举,因为通常数据的一致性要比可用性重要。消费组选主
在 Kafka 的消费端,会有一个消费者协调器以及消费组,组协调器(Group Coordinator)需要为消费组内的消费者选举出一个消费组的 leader。
如果消费组内还没有 leader,那么第一个加入消费组的消费者即为消费组的 leader,如果某一个时刻 leader 消费者由于某些原因退出了消费组,那么就会重新选举 leader,选举方式如下:
private val members = new mutable.HashMap[String, MemberMetadata]
leaderId = members.keys.headOption
在组协调器中消费者的信息是以 HashMap 的形式存储的,其中 key 为消费者的 member_id,而 value 是消费者相关的元数据信息。而 leader 的取值为 HashMap 中的第一个键值对的 key(等同于随机)。
消费组的 Leader 和 Coordinator 没有关联。消费组的 leader 负责 Rebalance 过程中消费分配方案的制定。
消费端 Rebalance 机制
就 Kafka 消费端而言,有一个难以避免的问题就是消费者的重平衡即 Rebalance。Rebalance 是让一个消费组的所有消费者就如何消费订阅 topic 的所有分区达成共识的过程,在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 的完成。因为要停止消费等待重平衡完成,因此 Rebalance 会严重影响消费端的 TPS,是应当尽量避免的。
触发 Rebalance 的时机
Rebalance 的触发条件有 3 个。
- 消费组成员个数发生变化。例如有新的 Consumer 实例加入或离开该消费组。
- 订阅的 Topic 个数发生变化。
- 订阅 Topic 的分区数发生变化。
Rebalance 发生时,Group 下所有 Consumer 实例都会协调在一起共同参与,kafka 能够保证尽量达到最公平的分配。但是 Rebalance 过程对 consumer group 会造成比较严重的影响。在 Rebalance 的过程中 consumer group 下的所有消费者实例都会停止工作,等待 Rebalance 过程完成。
Rebalance 过程
Rebalance 过程分为两步:Join 和 Sync。
- Join。所有成员都向 Group Coordinator 发送 JoinGroup 请求,请求加入消费组。一旦所有成员都发送了 JoinGroup 请求,Coordinator 会从中选择一个 Consumer 担任 leader 的角色,并把组成员信息以及订阅信息发给 leader—— 注意 leader 和 coordinator 不是一个概念。leader 负责消费分配方案的制定。
- Sync。这一步 leader 开始分配消费方案,即哪个 consumer 负责消费哪些 topic 的哪些 partition。一旦完成分配,leader 会将这个方案封装进 SyncGroup 请求中发给 coordinator,非 leader 也会发 SyncGroup 请求,只是内容为空。coordinator 接收到分配方案之后会把方案塞进 SyncGroup 的 response 中发给各个 consumer。这样组内的所有成员就都知道自己应该消费哪些分区了。
避免不必要的 Rebalance
前面说过 Rebalance 发生的时机有三个,后两个时机是可以人为避免的。发生 Rebalance 最常见的原因是消费组成员个数发生变化。
这其中消费者成员正常的添加和停掉导致 Rebalance,也是无法避免。但是在某些情况下,Consumer 实例会被 Coordinator 错误地认为已停止从而被踢出 Group。从而导致 rebalance。
这种情况可以通过 Consumer 端的参数 session.timeout.ms 和 max.poll.interval.ms 进行配置。
有关这种情况,可以查看博客《一文理解 Kafka 重复消费的原因和解决方案》
除了这个参数,Consumer 还提供了控制发送心跳请求频率的参数,就是 heartbeat.interval.ms。这个值设置得越小,Consumer 实例发送心跳请求的频率就越高。频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更快地知道是否开启 Rebalance,因为 Coordinator 通知各个 Consumer 实例是否开启 Rebalance 就是将 REBALANCE_NEEDED 标志封装进心跳请求的响应体中。
总之,要为业务处理逻辑留下充足的时间使 Consumer 不会因为处理这些消息的时间太长而引发 Rebalance,但也不能时间设置过长导致 Consumer 宕机但迟迟没有被踢出 Group。
补充
Kafka 控制器的作用
Kafka 控制器的作用是管理和协调 Kafka 集群,具体如下:
- 主题管理:创建、删除 Topic,以及增加 Topic 分区等操作都是由控制器执行。
- 分区重分配:执行 Kafka 的 reassign 脚本对 Topic 分区重分配的操作,也是由控制器实现。
- Preferred leader 选举。
因为在 Kafka 集群长时间运行中,broker 的宕机或崩溃是不可避免的,leader 就会发生转移,即使 broker 重新回来,也不会是 leader 了。在众多 leader 的转移过程中,就会产生 leader 不均衡现象,可能一小部分 broker 上有大量的 leader,影响了整个集群的性能,所以就需要把 leader 调整回最初的 broker 上,这就需要 Preferred leader 选举。
- 集群成员管理:控制器能够监控新 broker 的增加,broker 的主动关闭与被动宕机,进而做其他工作。这也是利用 Zookeeper 的 ZNode 模型和 Watcher 机制,控制器会监听 Zookeeper 中 /brokers/ids 下临时节点的变化。
数据服务:控制器上保存了最全的集群元数据信息,其他所有 broker 会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。
Kafka 协调器
Kafka 中主要有两种协调器:
组协调器(Group Coordinator)
- 消费者协调器(Consumer Coordinator)
Kafka 为了更好的实现消费组成员管理、位移管理以及 Rebalance 等,broker 服务端引入了组协调器(Group Coordinator),消费端引入了消费者协调器(Consumer Coordinator)。
每个 broker 启动时,都会创建一个组协调器实例,负责监控这个消费组里的所有消费者的心跳以及判断是否宕机,然后开启消费者 Rebalance。
每个 Consumer 启动时,会创建一个消费者协调器实例并会向 Kafka 集群中的某个节点发送 FindCoordinatorRequest 请求来查找对应的组协调器,并跟其建立网络连接。
客户端的消费者协调器和服务端的组协调器会通过心跳保持通信。
Kafka 舍弃 ZooKeeper 的理由
Kafka 目前强依赖于 ZooKeeper:ZooKeeper 为 Kafka 提供了元数据的管理,例如一些 Broker 的信息、主题数据、分区数据等等,还有一些选举、扩容等机制也都依赖 ZooKeeper。
- 运维复杂度
运维 Kafka 的同时需要保证一个高可用的 Zookeeper 集群,增加了运维和故障排查的复杂度。
- 性能差
- 在一些大公司,Kafka 集群比较大,分区数很多的时候,ZooKeeper 存储的元数据就会很多,性能就会变差。
- ZooKeeper 需要选举,选举的过程中是无法提供服务的。
- Zookeeper 节点如果频繁发生 Full Gc,与客户端的会话将超时,由于无法响应客户端的心跳请求,从而与会话相关联的临时节点也会被删除。
所以 Kafka 2.8 版本上支持内部的 quorum 服务来替换 ZooKeeper 的工作。