Kafka生产者分区策略

Kafka对于生产者生产消息的分区策略,默认提供三种策略供选择。
1、DefaultPartitioner:默认策略

  1. 如果ProducerRecord指定分区,则直接使用分区。
  2. 如果没指定分区,指定key,则对key hash对分区数求余,即为分区。
  3. 如果既没指定分区,也没指定key,采用粘滞分区策略。

生产者与消费者 - 图1

2、RoundRobinPartitioner:轮询策略

  1. 采用轮询方式进行分区

3、UniformStickyPartitioner:沾滞分区策略

  1. 如果ProducerRecord指定分区,则直接使用分区。
  2. 如果topic在本地存在默认Partition,则直接使用。
  3. 如果不存在则取所有可用的Partiton
    1. 可用的Partition数量小于1,从所有Partition中随机选取
    2. 可用的Partition数量为1,直接选取。
    3. 前面两个条件都不满足则直接,随机选取一个与之前topic使用的 Partition不同的Partition。

4、 自定义分区策略:实现Partitioner类 并在生产者端配置partitioner.class

消费者offset 机制

auto.commit.enable: true — 自动提交位移参数 auto.commit.interval.ms: 5000 — 如果enable.auto.commit设置为true,则消费者偏移量自动提交给Kafka的频率(以毫秒为单位)。

consumer默认采用自动提交位移策略,但是自动提交位移非常容易造成消息丢失,,在实际的生产环境中一般都是关闭自动提交位移,采用手动提交的方式,手动提交的方式非常灵活,会依据消费者提交的offset自动设置最新的偏移量。例如:我们实际消费的offset为15,但是手动提交了20,那么broker端会记录改消费者最新的offset为20,而不是15,并且不会报错。

手动提交策略

手动提交api有两种方式

  1. 同步 ```java void commitSync();

void commitSync(Duration timeout);

void commitSync(Map offsets);

void commitSync(final Map offsets, final Duration timeout);

  1. 在调用commitSync()时,Consumer会处于阻塞状态,直到远端的Broker返回提交结果,这个状态才会结束。在任何系统中,因为程序而非系统资源限制导致的阻塞都可能是系统的瓶颈,会影响整个系统系统的TPS
  2. 2. 异步
  3. ```java
  4. void commitAsync();
  5. void commitAsync(OffsetCommitCallback callback);
  6. void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);

调用commitAsync之后,它会立即返回,不会阻塞,因此不会影响Consumer应用的TPS。由于此方法是异步的,Kafka提供了回调函数(callback),可以在此函数内实现提交之后的逻辑,比如记录日志或处理异常等。

注:commitAsync不能自动重试,commitAsync是异步操作,倘若提交失败后自动重试,此时重试时提交的offset可能早已经过期或不是最新值了。因此异步提交的重试没有意义,所以commitAsync不会重试。

细粒度提交offset (推荐)

Kafka无论是同步提交还是异步提交默认都是提交所有poll下来的消息的offset,如果我们一次性拉取大量的消息,那么我们可以分段提交offset,减少出错时的恢复时间

  1. Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
  2. AtomicInteger count = new AtomicInteger();
  3. try {
  4. ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofMillis(100));
  5. records.forEach(record -> {
  6. handlerRecord(record);
  7. offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()));
  8. count.getAndIncrement();
  9. // 分区数处理的消息达到100后,进行提交
  10. if (count.get() % 100 == 0) {
  11. kafkaConsumer.commitAsync(offsets, null);
  12. }
  13. });
  14. } catch (Exception e) {
  15. // 业务处理失败处理或异步提交失败处理
  16. errHandler(e);
  17. } finally {
  18. try {
  19. kafkaConsumer.commitSync();
  20. } catch (Exception e) {
  21. // 提交失败最终处理
  22. }
  23. }