Kafka 创建 Topic 时如何将分区放置到不同的 Broker 中 Kafka 分区 分区数与 Broker 数

在 kafka 中, topic 是一个存储消息的逻辑概念,partition分区是 topic 的进一步拆分,每个 topic 可以拆分为多个 partition 分区,类似于数据库中表的水平拆分。

  • 一个 Partition 只对应一个 Broker,一个 Broker 可以管理多个 Partition。
  • 同一个 Topic 下,不同的 Partition 里的消息是不同的。
  • 在一个 Partition 里消息是有序的,Kafka 不保证跨分区的消息有序性。

——> 如果要保证消息的全局有序,那么可以将 Partition 设置为 1。

  • 可以动态的增加 Partition 的数量,但是不支持减少。
  • 对于同一个消费组,一个 Partition 只能被一个消费者消费。即分区数决定了同组消费者个数的上限。一般不建议同组消费者的数量大于分区数。
  • 分区越多,理论上整个集群所能达到的吞吐量就越大。
  • 一个 Partition 对应着一个日志文件夹,(文件夹的命名规则为 topic 名称+分区序号,比如 test1-0),该文件夹下会有多个分段的 log 日志文件,即分 segment 存储。
  • partition 的数量上限是没有限制的。

kafka 中关于topic、broker 及 partition 相关参数的几点说明:
1:num.partitions 参数
默认值为1,可增加 topic 的 partition 数量,不可减少其个数。
kafka 集群通过 partition 对 topic 进行横向扩展,当有新 broker 加入 kafka 集群,可通过 hash 调用 partition 个数负载均衡。
partition 数量选定标准:

  • 主题吞吐量。
  • 单个 partition 读取数据的最大吞吐量。
  • 每个 broker 包含的 partition 个数、可用磁盘空间和网络带宽。
  • 单个 broker 对 partition 个数有限制,partition 越多,占用的内存越多,完成选举所需时间越长。


Partition 分配算法

为了更好的做负载均衡,Kafka 尽量将所有的 Partition 均匀分配到整个集群上。Kafka 分配 Replica 的算法如下:

  • 将所有存活的 N 个 Brokers 和待分配的 Partition 排序;
  • 将第 i 个 Partition 分配到第 (i mod n) 个 Broker 上,这个 Partition 的第一个 Replica 存在于这个分配的 Broker 上,并且会作为partition 的优先副本;
  • 将第 i 个 Partition 的第 j 个Replica分配到第 ((i + j) mod n) 个 Broker 上。

假设现在有 5 个 Broker,分区数为 5,副本为 3 的主题,按照上面的说法,主题最终分配在整个集群的样子如下: 分区 - 图1 但事实真的是这样的吗?实际上如果真按照这种算法,会存在以下明显几个问题:

  • 所有主题的第一个分区都是存放在第一个 Broker 上,这样会造成第一个 Broker 上的分区总数多于其他的 Broker,这样就失去了负载均衡的目的;
  • 如果主题的分区数多于 Broker 的个数,多于的分区都是倾向于将分区发放置在前几个 Broker 上,同样导致负载不均衡。

随机的从 broker 列表里取一个 broker 当做起始位置,由此顺序的分配 partition;然后随机获取 nextReplicaShift,通过该参数确定副本 partition 的相对于 partition0 的偏移量。然后顺序的分配副本 Partition 的 broker。

  1. def assignReplicasToBrokers(brokerList: Seq[Int],
  2. nPartitions: Int,
  3. replicationFactor: Int,
  4. fixedStartIndex: Int = -1,
  5. startPartitionId: Int = -1)
  6. : Map[Int, Seq[Int]] = {
  7. if (nPartitions <= 0)
  8. throw new AdminOperationException("number of partitions must be larger than 0")
  9. if (replicationFactor <= 0)
  10. throw new AdminOperationException("replication factor must be larger than 0")
  11. if (replicationFactor > brokerList.size)
  12. throw new AdminOperationException("replication factor: " + replicationFactor +
  13. " larger than available brokers: " + brokerList.size)
  14. val ret = new mutable.HashMap[Int, List[Int]]()
  15. // 随机获取 startIndex
  16. val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
  17. var currentPartitionId = if (startPartitionId >= 0) startPartitionId else 0
  18. // 随机获取副本的偏移位置
  19. var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
  20. for (i <- 0 until nPartitions) {
  21. if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0))
  22. nextReplicaShift += 1
  23. val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size
  24. var replicaList = List(brokerList(firstReplicaIndex))
  25. for (j <- 0 until replicationFactor - 1)
  26. replicaList ::= brokerList(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size))
  27. ret.put(currentPartitionId, replicaList.reverse)
  28. currentPartitionId = currentPartitionId + 1
  29. }
  30. ret.toMap
  31. }
  32. private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
  33. val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
  34. (firstReplicaIndex + shift) % nBrokers
  35. }
  • Partition 数量不能小于等于 0。
  • 副本因子不能大于 Broker 的个数;
  • 第一个分区(编号为0)的第一个副本放置位置是随机从 brokerList 选择的;
  • 其他分区的第一个副本放置位置相对于第 0 个分区依次往后移。也就是如果我们有 5 个 Broker,5 个分区,假设第一个分区放在第四个 Broker 上,那么第二个分区将会放在第五个 Broker 上;第三个分区将会放在第一个 Broker 上;第四个分区将会放在第二个 Broker 上,依次类推;
  • 剩余的副本相对于第一个副本放置位置其实是由 nextReplicaShift 决定的,而这个数也是随机产生的;

    分区 leader 选举

    从 ISR 列表里选第一个 broker 作为 Leader

Controller 感知到分区 Leader 所在的 broker 挂了(Controller 监听了很多 zk 节点可以感知到 broker 存活),Controller 会从 ISR 列表(参数 unclean.leader.election.enable=false 的前提下)里挑第一个 broker 作为 Leader (第一个 broker 最先放进 ISR 列表,可能是同步数据最多的副本),如果参数 unclean.leader.election.enable 为 true,代表在 ISR 列表里所有副本都挂了的时候可以在 ISR 列表以外的副本中选 Leader,这种设置,可以提高可用性,但是选出的新 Leader 有可能数据少很多。

消费者分区分配策略

消费者 Rebalance

Kafka 里一个分区只能被一个消费组里的一个消费者消费。 Rebalance 就是将一个 Topic 下的 Partition 交由 Consumer Group 里的指定的消费者消费的过程。就是建立 Partition 与 Consumer 的对应关系。

Rebalance 就是说如果消费组里的消费者数量有变化或消费的分区数有变化,Kafka 会重新分配消费者消费分区的关系。比如 Consumer Group 中某个消费者挂了,此时会自动把分配给他的分区交给其他的消费者,如果他又重启了,那么又会把一些分区重新交还给他。
注意:Rebalance 只针对 subscribe 这种不指定分区消费的情况,如果通过 assign 这种消费方式指定了分区,Kafka 不会进行 Rebanlance。
如下情况可能会触发消费者 Rebalance:

  1. 消费组里的 Consumer 增加或减少了;
  2. 动态给 topic 增加了分区;
  3. 消费组订阅了更多的 topic。

Rebalance 过程中,消费者无法从 Kafka 消费消息,这对 Kafka 的 TPS 会有影响,如果 Kafka 集群内节点较多,比如数百个,那重平衡可能会耗时极多,所以应尽量避免在系统高峰期的 Rebalance 发生。

消费者 Rebalance 分区分配策略

一个 Consumer Group 中有多个 Consumer,一个 topic 有多个 Partition,所以必然会涉及到 Partition 的分配问题,即确定那个 Partition 由哪个 Consumer 来消费。
主要有三种 Rebalance的策略:range、round-robin、sticky。
Kafka 提供了消费者客户端参数 partition.assignment.strategy 来设置消费者与订阅主题之间的分区分配策略。默认情况为 range 分配策略
假设一个主题有 10 个分区(0-9),现在有三个 Consumer 消费:

range 策略

Range 策略就是按照分区序号排序。
假设 n=分区数/消费者数量 = 3, m=分区数%消费者数量 = 1,那么前 m 个消费者每个分配 n+1 个分区,后面的(消费者数量-m )个消费者每个分配 n 个分区。
Consumer1:0,1,2,3
Consumer2:4,5,6
Consumer3:7,8,9

基本原则是按区间等分,如果没法均等分配的话,前面的分区在一开始分配的时候就会多分配

round-robin 策略

Consumer1:0,3,6,9
Consumer2:1,4,7
Consumer3:2,5,8

sticky 策略

sticky 策略初始时分配策略与 round-robin 类似,但是在 Rebalance 的时候,需要保证如下两个原则。
1)分区的分配要尽可能均匀 。
2)分区的分配尽可能与上次分配的保持相同。
当两者发生冲突时,第一个目标优先于第二个目标 。这样可以最大程度维持原来的分区分配的策略。
比如对于第一种 range 情况的分配,如果第三个 Consumer 挂了,那么重新用 sticky 策略分配的结果如下:
consumer1 除了原有的 0~3,会再分配一个 7;
consumer2 除了原有的 4~6,会再分配 8 和 9。

Rebalance 过程

分区方案由谁制定? 如果由 Kafka broker 来制定,那么每次新增消费者,broker 都需要进行 rebalance,broker 的压力会比较大,影响 Kafka 性能。因此是由消费者来制定的。分区方案制定完成后需要再同步给组内的所有的消费者,而每个消费者之间是没法直接通信的,因此这个通信交由 broker 来完成。 GroupCoordinate:broker 端的协调者,消费者对应的 __consumer_offsets 的分区的 Leader; ConsumerCoordinate:消费组的协调者,由第一个加入消费组的消费者担任。

消费组里的 Coordinator 指定分区方案,然后将分区方案同步给 Kafka 的 Coordinator,然后由 GroupCoordinate 将分区方案同步给所有的消费者。

当有消费者加入消费组时,消费者、消费组及组协调器之间会经历以下几个阶段。
image.png

第一阶段:选择组协调器

组协调器 GroupCoordinator:每个 Consumer Group 都会选择一个 broker 作为自己的组协调器 Coordinator,负责监控这个消费组里的所有消费者的心跳,以及判断是否宕机,然后开启消费者 Rebalance。
Consumer Group 中的每个 Consumer 启动时会向 Kafka 集群中的某个节点发送 FindCoordinatorRequest 请求来查找对应的组协调器 GroupCoordinator,并跟其建立网络连接。
组协调器选择方式
Consumer 消费的 offset 要提交到 __consumer_offsets 的哪个分区,这个分区 Leader 对应的 broker 就是这个Consumer Group 的 Coordinator。

第二阶段:加入消费组 JOIN GROUP

在成功找到消费组所对应的 GroupCoordinator 之后就进入加入消费组的阶段,在此阶段的消费者会向 GroupCoordinator 发送 JoinGroupRequest 请求,并处理响应。然后 GroupCoordinator 从一个 Consumer Group 中选择第一个加入 Group 的 Consumer 作为 Leader(消费组协调器),把 Consumer Group 情况发送给这个 Leader,接着这个 Leader 会负责制定分区方案。

第三阶段:SYNC GROUP

Consumer Leader 通过给 GroupCoordinator 发送 SyncGroupRequest,接着 GroupCoordinator 就把分区方案下发给各个 Consumer,他们会根据指定分区的 Leader broker 进行网络连接以及消息消费。

生产者分区策略(消息路由)

producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。其路由机制为:
(1)指明 partition 的情况下,直接将指明值作为 partiton 值;
(2)没有指明 partition 值,但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;
(3)既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin 算法。