生产者分区机制
概念
Kafka 有主题(Topic)的概念,它是承载真实数据的逻辑容器,而在主题之下还分为若干个分区,也就是说 Kafka 的消息组织方式实际上是三级结构:主题 - 分区 - 消息。主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份.
分区的作用
- 提供负载均衡的能力
- 实现高伸缩性, 可以增加新的节点机器来增加整体系统的吞吐量
分区策略
Kafka 默认分区策略实际上同时实现了两种策略:如果指定了 Key,那么默认实现按消息键保序策略;如果没有指定 Key,则使用轮询策略。
轮询
- Round-robin, 顺序分配
- 轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一
随机
- Randomness, 实现示例
- 均匀分布不如随机, 随机策略是老版本生产者使用的分区策略,在新版本中已经改为轮询了。 ```java
List
![image.png](https://cdn.nlark.com/yuque/0/2021/png/281275/1636552976241-6730f66a-c278-4376-a3e4-1abf151e944a.png#clientId=u618fab6c-aea0-4&from=paste&id=u67b79b0b&margin=%5Bobject%20Object%5D&name=image.png&originHeight=948&originWidth=3414&originalType=url&ratio=1&size=166100&status=done&style=none&taskId=u7e20844a-e2e1-4ccd-b15f-4f4479528ce)
<a name="dGWex"></a>
#### 按消息键保序
- Kafka 允许为每条消息定义消息键,简称为 Key, 它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等;也可以用来表征消息元数据
- 一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略
- 实现示例
```java
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();
自定义分区策略
- 实现org.apache.kafka.clients.producer.Partitioner接口的partition()和close()方法, 通常只需要实现最重要的 partition 方法
- 设置partitioner.class参数为你自己实现类的 Full Qualified Name
示例: 按Broker 所在的 IP 地址实现定制化的分区策略
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();
生产者压缩算法
消息版本
对于每个版本的消息结构的细节,可以参考kafka官方文档的5.3 Message Format 章
目前 Kafka 共有两大类消息格式,社区分别称之为 V1 版本和 V2 版本。V2 版本是 Kafka 0.11.0.0 中正式引入的。
消息(v1叫message,v2叫record)是分批次(batch)读写的,batch是kafka读写(网络传输和文件读写)的基本单位,不同版本,对相同(或者叫相似)的概念,叫法不一样。
v1(kafka 0.11.0之前):message set, message
v2(kafka 0.11.0以后):record batch,record
v2版本的升级:
- 不再对每条消息进行CRC校验, 而在消息集合上进行校验
- v1版本是将多条消息压缩后保存到外层消息的消息体字段中, 而v2是对整个消息集合进行压缩
压缩过程
Producer 端压缩、Broker 端保持、Consumer 端解压缩。
producer开启压缩示例, kafka会将压缩算法封装进消息集合中, consumer读取时会知道使用哪种算法解压
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 开启GZIP压缩
props.put("compression.type", "gzip");
Producer<String, String> producer = new KafkaProducer<>(props);
Broker端也会解压, 目的是为了验证消息正确性
- 注意, 若Broker端指定与Producer端不同的算法, 则会先解压再压缩, 使得Broker端CPU飙升; 若存在多个消息版本(存在老版本的消费者程序), Broker需要对版本进行转换, 无法使用”Zero Copy”技术, 对性能有很大影响
压缩算法的对比
吞吐量上, LZ4>Snappy>zstd>GZIP
压缩比上, zstd>LZ4>GZIP>Snappy
最佳实践
如果Producer机器上CPU资源充足, 环境中带宽资源有限, 则建议开启zstd压缩, 能极大节省网络资源消耗
幂等Producer
在0.11后, 可通过以下配置指定producer的幂等性, 配置后kafka将自动做消息的重复去重, 实现原理是在broker端多保存些字段, 当producer发送了具有相同字段值的消息后, Broker即丢弃掉, 但注意幂等性 Producer 的作用范围
- 仅保证单分区的幂等性(ps, 仅这一条我就觉得没啥卵用)
- 仅实现单会话的幂等性, 当重启Producer进程后, 即丧失
props.put(“enable.idempotence”, ture)
或props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)
事务型Producer
事务型Producer在发送消息时, 能保证一批消息全部操作成功或失败, 配置如下
- 同幂等Producer, 设置enable.idempotence = true
- transactional. id, 最好为有意义的业务名 ```java
producer.initTransactions(); try { producer.beginTransaction(); producer.send(record1); producer.send(record2); producer.commitTransaction(); } catch (KafkaException e) { producer.abortTransaction(); } ``` 消费端业务配置隔离级别, isolation.level
- read_uncommitted:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。
- read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。
补充: 原理参考: https://www.jianshu.com/p/f77ade3f41fd