分区分配策略
Kafka提供了消费者客户端参数partition.assignment.strategy来设置消费者与订阅主题之间的分区分配策略。默认情况下,此参数的值为org.apache.kafka.clients.consumer.RangeAssignor,即采用RangeAssignor分配策略。除此之外,Kafka还提供了另外两种分配策略:RoundRobinAssignor 和 StickyAssignor。消费者客户端参数 partition.assignment.strategy可以配置多个分配策略,彼此之间以逗号分隔。默认使用的是RangeAssignor

RangeAssignor分配策略

RangeAssignor 分配策略的原理是按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。对于每一个主题,RangeAssignor策略会将消费组内所有订阅这个主题的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。注意:可能出现部分消费者过载的情况。

  1. public class RangeAssignor extends AbstractPartitionAssignor {
  2. @Override
  3. public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
  4. Map<String, Subscription> subscriptions) {
  5. Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
  6. Map<String, List<TopicPartition>> assignment = new HashMap<>();
  7. for (String memberId : subscriptions.keySet())
  8. assignment.put(memberId, new ArrayList<>());
  9. for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
  10. String topic = topicEntry.getKey();
  11. List<MemberInfo> consumersForTopic = topicEntry.getValue();
  12. Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
  13. if (numPartitionsForTopic == null)
  14. continue;
  15. Collections.sort(consumersForTopic);
  16. int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
  17. int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
  18. List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
  19. for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
  20. int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
  21. int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
  22. assignment.get(consumersForTopic.get(i).memberId).addAll(partitions.subList(start, start + length));
  23. }
  24. }
  25. return assignment;
  26. }
  27. }

RoundRobinAssignor分配策略

RoundRobinAssignor分配策略的原理是将消费组内所有消费者及消费者订阅的所有主题的分区按照字典序排序,然后通过轮询方式逐个将分区依次分配给每个消费者。RoundRobinAssignor分配策略对应的 partition.assignment.strategy 参数值为org.apache.kafka.clients.consumer.RoundRobinAssignor。

如果同一个消费组内的消费者订阅的信息是不相同的,那么在执行分区分配的时候就不是完全的轮询分配,有可能导致分区分配得不均匀。如果某个消费者没有订阅消费组内的某个主题,那么在分配分区的时候此消费者将分配不到这个主题的任何分区。

  1. public class RoundRobinAssignor extends AbstractPartitionAssignor {
  2. @Override
  3. public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
  4. Map<String, Subscription> subscriptions) {
  5. Map<String, List<TopicPartition>> assignment = new HashMap<>();
  6. List<MemberInfo> memberInfoList = new ArrayList<>();
  7. for (Map.Entry<String, Subscription> memberSubscription : subscriptions.entrySet()) {
  8. assignment.put(memberSubscription.getKey(), new ArrayList<>());
  9. memberInfoList.add(new MemberInfo(memberSubscription.getKey(),
  10. memberSubscription.getValue().groupInstanceId()));
  11. }
  12. CircularIterator<MemberInfo> assigner = new CircularIterator<>(Utils.sorted(memberInfoList));
  13. for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
  14. final String topic = partition.topic();
  15. while (!subscriptions.get(assigner.peek().memberId).topics().contains(topic))
  16. assigner.next();
  17. assignment.get(assigner.next().memberId).add(partition);
  18. }
  19. return assignment;
  20. }
  21. }

StickyAssignor分配策略

它主要有两个目的:

(1)分区的分配要尽可能均匀。
(2)分区的分配尽可能与上次分配的保持相同。

当两者发生冲突时,第一个目标优先于第二个目标。鉴于这两个目标,StickyAssignor分配策略的具体实现要比RangeAssignor和RoundRobinAssignor这两种分配策略要复杂得多

  1. org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor;

自定义分区分配策略

自定义的分配策略必须要实现org.apache.kafka.clients.consumer.ConsumerPartitionAssignor接口。ConsumerPartitionAssignor接口的定义如下:
GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription);

消费者协调器和组协调器

了解了Kafka 中消费者的分区分配策略之后是否会有这样的疑问:如果消费者客户端中配置了两个分配策略,那么以哪个为准呢?如果有多个消费者,彼此所配置的分配策略并不完全相同,那么以哪个为准?多个消费者之间的分区分配是需要协同的,那么这个协同的过程又是怎样的呢?这一切都是交由消费者协调器(ConsumerCoordinator)和组协调器(GroupCoordinator)来完成的,它们之间使用一套组协调协议进行交互。

————————————————
版权声明:本文为CSDN博主「sunhyly」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/ashylya/article/details/106680737