生产者分区机制

概念

Kafka 有主题(Topic)的概念,它是承载真实数据的逻辑容器,而在主题之下还分为若干个分区,也就是说 Kafka 的消息组织方式实际上是三级结构:主题 - 分区 - 消息。主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份.
image.png

分区的作用

  • 提供负载均衡的能力
  • 实现高伸缩性, 可以增加新的节点机器来增加整体系统的吞吐量

分区策略

Kafka 默认分区策略实际上同时实现了两种策略:如果指定了 Key,那么默认实现按消息键保序策略;如果没有指定 Key,则使用轮询策略。

轮询

  • Round-robin, 顺序分配
  • 轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一

image.png

随机

  • Randomness, 实现示例
  • 均匀分布不如随机, 随机策略是老版本生产者使用的分区策略,在新版本中已经改为轮询了。 ```java

List partitions = cluster.partitionsForTopic(topic); return ThreadLocalRandom.current().nextInt(partitions.size());

  1. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/281275/1636552976241-6730f66a-c278-4376-a3e4-1abf151e944a.png#clientId=u618fab6c-aea0-4&from=paste&id=u67b79b0b&margin=%5Bobject%20Object%5D&name=image.png&originHeight=948&originWidth=3414&originalType=url&ratio=1&size=166100&status=done&style=none&taskId=u7e20844a-e2e1-4ccd-b15f-4f4479528ce)
  2. <a name="dGWex"></a>
  3. #### 按消息键保序
  4. - Kafka 允许为每条消息定义消息键,简称为 Key, 它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等;也可以用来表征消息元数据
  5. - 一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略
  6. - 实现示例
  7. ```java
  8. List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
  9. return Math.abs(key.hashCode()) % partitions.size();

image.png

自定义分区策略

  1. 实现org.apache.kafka.clients.producer.Partitioner接口的partition()和close()方法, 通常只需要实现最重要的 partition 方法
  2. 设置partitioner.class参数为你自己实现类的 Full Qualified Name

示例: 按Broker 所在的 IP 地址实现定制化的分区策略

  1. List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
  2. return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();

生产者压缩算法

消息版本

对于每个版本的消息结构的细节,可以参考kafka官方文档的5.3 Message Format 章

目前 Kafka 共有两大类消息格式,社区分别称之为 V1 版本和 V2 版本。V2 版本是 Kafka 0.11.0.0 中正式引入的。

消息(v1叫message,v2叫record)是分批次(batch)读写的,batch是kafka读写(网络传输和文件读写)的基本单位,不同版本,对相同(或者叫相似)的概念,叫法不一样。
v1(kafka 0.11.0之前):message set, message
v2(kafka 0.11.0以后):record batch,record

v2版本的升级:

  • 不再对每条消息进行CRC校验, 而在消息集合上进行校验
  • v1版本是将多条消息压缩后保存到外层消息的消息体字段中, 而v2是对整个消息集合进行压缩

压缩过程

Producer 端压缩、Broker 端保持、Consumer 端解压缩。

  • producer开启压缩示例, kafka会将压缩算法封装进消息集合中, consumer读取时会知道使用哪种算法解压

    1. Properties props = new Properties();
    2. props.put("bootstrap.servers", "localhost:9092");
    3. props.put("acks", "all");
    4. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    5. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    6. // 开启GZIP压缩
    7. props.put("compression.type", "gzip");
    8. Producer<String, String> producer = new KafkaProducer<>(props);
  • Broker端也会解压, 目的是为了验证消息正确性

  • 注意, 若Broker端指定与Producer端不同的算法, 则会先解压再压缩, 使得Broker端CPU飙升; 若存在多个消息版本(存在老版本的消费者程序), Broker需要对版本进行转换, 无法使用”Zero Copy”技术, 对性能有很大影响

压缩算法的对比

吞吐量上, LZ4>Snappy>zstd>GZIP
压缩比上, zstd>LZ4>GZIP>Snappy

最佳实践

如果Producer机器上CPU资源充足, 环境中带宽资源有限, 则建议开启zstd压缩, 能极大节省网络资源消耗

幂等Producer

在0.11后, 可通过以下配置指定producer的幂等性, 配置后kafka将自动做消息的重复去重, 实现原理是在broker端多保存些字段, 当producer发送了具有相同字段值的消息后, Broker即丢弃掉, 但注意幂等性 Producer 的作用范围

  • 仅保证单分区的幂等性(ps, 仅这一条我就觉得没啥卵用)
  • 仅实现单会话的幂等性, 当重启Producer进程后, 即丧失
    1. props.put(“enable.idempotence”, ture)
    2. props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG true)

事务型Producer

事务型Producer在发送消息时, 能保证一批消息全部操作成功或失败, 配置如下

  • 同幂等Producer, 设置enable.idempotence = true
  • transactional. id, 最好为有意义的业务名 ```java

producer.initTransactions(); try { producer.beginTransaction(); producer.send(record1); producer.send(record2); producer.commitTransaction(); } catch (KafkaException e) { producer.abortTransaction(); } ``` 消费端业务配置隔离级别, isolation.level

  • read_uncommitted:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。
  • read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。

补充: 原理参考: https://www.jianshu.com/p/f77ade3f41fd