接口和方法

  • 所谓分区策略是决定生产者将消息发送到哪个分区的算法。
  • 实现接口 org.apache.kafka.clients.producer.Partitioner 即可修改分区策略

    1. public interface Partitioner extends Configurable, Closeable {
    2. int partition(String var1, Object var2, byte[] var3, Object var4, byte[] var5, Cluster var6);
    3. void close();
    4. }
  • 需要实现的方法

    1. int partition(String topic,
    2. Object key, byte[]
    3. keyBytes, Object value,
    4. byte[] valueBytes,
    5. Cluster cluster);
    • topickeykeyBytesvaluevalueBytes都属于消息数据
    • cluster则是集群信息(比如当前 Kafka 集群共有多少 topic 、多少 Broker 等)

分类

轮询 (Round-robin)

  • 默认情况下它是最合理的分区策略

随机(Randomness )

  • 并不随机
  • 需要自行实现
    1. @Override
    2. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster){
    3. List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    4. return ThreadLocalRandom.current().nextInt(partitions.size());
    5. }

    按消息键分区

    1. @Override
    2. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster){
    3. List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    4. return Math.abs(key.hashCode()) % partitions.size();
    5. }

自定义分区

  • 拿到上面的参数自行分区

框架实现

  • 实际上同时实现了两种策略
    • 如果指定了 Key,那么默认实现按消息键分区策略
    • 如果没有指定 Key,则使用轮询策略。