接口和方法
- 所谓分区策略是决定生产者将消息发送到哪个分区的算法。
实现接口
org.apache.kafka.clients.producer.Partitioner
即可修改分区策略public interface Partitioner extends Configurable, Closeable {
int partition(String var1, Object var2, byte[] var3, Object var4, byte[] var5, Cluster var6);
void close();
}
需要实现的方法
int partition(String topic,
Object key, byte[]
keyBytes, Object value,
byte[] valueBytes,
Cluster cluster);
topic
、key
、keyBytes
、value
和valueBytes
都属于消息数据cluster
则是集群信息(比如当前 Kafka 集群共有多少 topic 、多少 Broker 等)
分类
轮询 (Round-robin)
- 默认情况下它是最合理的分区策略
随机(Randomness )
- 并不随机
- 需要自行实现
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster){
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
}
按消息键分区
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster){
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();
}
自定义分区
- 拿到上面的参数自行分区
框架实现
- 实际上同时实现了两种策略
- 如果指定了 Key,那么默认实现按消息键分区策略
- 如果没有指定 Key,则使用轮询策略。