生产者组件图

image.png

摘要

Kafka定义了一个分区策略接口, 通过具体的实现策略来指定消息发送的分区。

  1. public interface Partitioner extends Configurable, Closeable {
  2. /**
  3. * Compute the partition for the given record.
  4. *
  5. * @param topic The topic name
  6. * @param key The key to partition on (or null if no key)
  7. * @param keyBytes The serialized key to partition on( or null if no key)
  8. * @param value The value to partition on or null
  9. * @param valueBytes The serialized value to partition on or null
  10. * @param cluster The current cluster metadata
  11. */
  12. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
  13. /**
  14. * This is called when partitioner is closed.
  15. */
  16. public void close();
  17. }

Kafka内置了多种分区策略,同时也支持自定义分区策略,通过实现Partitioner接口,并在配置中指定partitioner.class 属性等于自定义策略Class的全限定名称。

默认策略(粘性分区策略+随机)

Kafka2.7.0版本里,默认策略命名为: DefaultPartitioner 主要逻辑为:

  • 如果消息记录里指定了分区,则使用
  • 如果消息记录里未指定分区,但是包含Key, 则基于Key做Hash,然后取hash值与分区数量取模
  • 如果消息里未指定分区,且Key为空,则采用粘性分区策略(消息记录对应的消息批次满了后,通过随机策略更换新的分区)

代码里通过ThreadLocalRandom.current().nextInt()来获取随机数,然后转为正数后与分区数量取模,
所以这里明显就是采用的随机算法!

  1. if (availablePartitions.size() < 1) {
  2. Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
  3. newPart = random % partitions.size();
  4. } else if (availablePartitions.size() == 1) {
  5. newPart = availablePartitions.get(0).partition();
  6. } else {
  7. while (newPart == null || newPart.equals(oldPart)) {
  8. Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
  9. newPart = availablePartitions.get(random % availablePartitions.size()).partition();
  10. }
  11. }

随机策略

所谓随机就是我们随意地将消息放置到任意一个分区上,如下面这张图所示
image.png

策略实现

未指定Key,委托到内部的StickyPartitionCache 来实现分区。

  1. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
  2. int numPartitions) {
  3. if (keyBytes == null) {
  4. return stickyPartitionCache.partition(topic, cluster);
  5. }
  6. // hash the keyBytes to choose a partition
  7. return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
  8. }

轮询策略(RoundRobinPartitioner)

即顺序分配。比如一个主题下有3个分区,那么第一条消息被发送到分区0,第二条被发送到分区1,第三条被发送到分区2,以此类推。当生产第4条消息时又会重新开始,即将其分配到分区0,就像下面这张图展示的那样.
image.png
轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一.

策略实现

关键逻辑:内部在内存里维持了一个Topic到AtomicInteger的ConcurrentHashMap, 计算分区时,通过AtomicInteger.getAndIncrement() 与主题分区数取模。如果AomicInteger自增溢出后,通过与Integer.MAX_VALUE取与,转换为整数, 继续与分区数取模,得到分区的轮询值

  1. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  2. List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
  3. int numPartitions = partitions.size();
  4. int nextValue = nextValue(topic);
  5. List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
  6. if (!availablePartitions.isEmpty()) {
  7. int part = Utils.toPositive(nextValue) % availablePartitions.size();
  8. return availablePartitions.get(part).partition();
  9. } else {
  10. // no partitions are available, give a non-available partition
  11. return Utils.toPositive(nextValue) % numPartitions;
  12. }
  13. }

轮询验证

  1. /**
  2. * @author xiele on 2021/2/19
  3. */
  4. public class RoundRobin {
  5. private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
  6. private static final String topic = "mockTopic";
  7. private static final int partitionNums = 10;
  8. public static void main(String[] args) {
  9. RoundRobin rr = new RoundRobin();
  10. rr.modForPartitionsWhenOverflow();
  11. }
  12. /**
  13. Case1: AtomicInteger在正数范围类自增,与分区数取模
  14. * 验证轮询取模算法
  15. nextValue=0, mod=0
  16. nextValue=1, mod=1
  17. nextValue=2, mod=2
  18. nextValue=3, mod=3
  19. nextValue=4, mod=4
  20. nextValue=5, mod=5
  21. nextValue=6, mod=6
  22. nextValue=7, mod=7
  23. nextValue=8, mod=8
  24. nextValue=9, mod=9
  25. nextValue=10, mod=0
  26. nextValue=11, mod=1
  27. ...
  28. nextValue=97, mod=7
  29. nextValue=98, mod=8
  30. nextValue=99, mod=9
  31. */
  32. private void modForPartitions() {
  33. int nextValue;
  34. while ((nextValue = nextValue(topic)) < 100) {
  35. int mod = nextValue % partitionNums;
  36. System.out.println("nextValue=" + nextValue + ", mod=" + mod);
  37. }
  38. }
  39. /**
  40. Case2: AtomicInteger自增到异常-转为正式继续自增,与分区数取模
  41. * 验证内存里的AtomicInteger轮询溢出的情况
  42. nextValue=2147483647,newNextValue=2147483647,mod=7
  43. nextValue=-2147483648,newNextValue=0,mod=0
  44. nextValue=-2147483647,newNextValue=1,mod=1
  45. nextValue=-2147483646,newNextValue=2,mod=2
  46. nextValue=-2147483645,newNextValue=3,mod=3
  47. nextValue=-2147483644,newNextValue=4,mod=4
  48. nextValue=-2147483643,newNextValue=5,mod=5
  49. nextValue=-2147483642,newNextValue=6,mod=6
  50. nextValue=-2147483641,newNextValue=7,mod=7
  51. nextValue=-2147483640,newNextValue=8,mod=8
  52. nextValue=-2147483639,newNextValue=9,mod=9
  53. nextValue=-2147483638,newNextValue=10,mod=0
  54. nextValue=-2147483637,newNextValue=11,mod=1
  55. */
  56. private void modForPartitionsWhenOverflow() {
  57. AtomicInteger count = new AtomicInteger(Integer.MAX_VALUE);
  58. int i = 0;
  59. while (i++ < 100) {
  60. int nextValue = count.getAndIncrement();
  61. int newNextValue = Utils.toPositive(nextValue);
  62. System.out.println("nextValue=" + nextValue + ",newNextValue=" + newNextValue + ",mod=" + (newNextValue % partitionNums));
  63. }
  64. }
  65. private int nextValue(String topic) {
  66. AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0));
  67. return counter.getAndIncrement();
  68. }
  69. }

**

消息Key保序策略

也称Key-ordering策略。Kafka允许为每条消息定义消息键,简称为Key。这个Key的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务ID等;也可以用来表征消息元数据。特别是在Kafka不支持时间戳的年代,在一些场景中,工程师们都是直接将消息创建时间封装进Key里面的。一旦消息被定义了Key,那么你就可以保证同一个Key的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略,如下图所示。
image.png
如相同类型的Key, 相关规则的Key, 位于同一分区. Kafka默认的分区策略中也包含了基于Key Hash的策略。

  1. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
  2. int numPartitions) {
  3. // hash the keyBytes to choose a partition
  4. return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
  5. }

其他/自定义

基于实际场景来抉择,例如地理位置,商家信息,大客户。
以下示例为某个大客户的数据单独指定固定的分区,避免与其他耦合在一起,隔离数据,提升处理性能。

  1. public class BananaPartitioner implements Partitioner {
  2. public void configure(Map<String, ?> configs) {}
  3. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  4. List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
  5. int numPartitions = partitions.size();
  6. if ((keyBytes == null) || (!(key instanceOf String)))
  7. throw new InvalidRecordException("We expect all messages to have customer name as key")
  8. if (((String) key).equals("Banana"))
  9. return numPartitions; // Banana总是被分配到最后一个分区
  10. // 其他记录被散列到其他分区
  11. return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1)) }
  12. public void close() {}
  13. }