Kafka生产者分区策略
Kafka对于生产者生产消息的分区策略,默认提供三种策略供选择。
1、DefaultPartitioner:默认策略
- 如果ProducerRecord指定分区,则直接使用分区。
- 如果没指定分区,指定key,则对key hash对分区数求余,即为分区。
- 如果既没指定分区,也没指定key,采用粘滞分区策略。

2、RoundRobinPartitioner:轮询策略
- 采用轮询方式进行分区
3、UniformStickyPartitioner:沾滞分区策略
- 如果ProducerRecord指定分区,则直接使用分区。
- 如果topic在本地存在默认Partition,则直接使用。
- 如果不存在则取所有可用的Partiton
- 可用的Partition数量小于1,从所有Partition中随机选取
- 可用的Partition数量为1,直接选取。
- 前面两个条件都不满足则直接,随机选取一个与之前topic使用的 Partition不同的Partition。
4、 自定义分区策略:实现Partitioner类 并在生产者端配置partitioner.class
消费者offset 机制
auto.commit.enable: true — 自动提交位移参数 auto.commit.interval.ms: 5000 — 如果enable.auto.commit设置为true,则消费者偏移量自动提交给Kafka的频率(以毫秒为单位)。
consumer默认采用自动提交位移策略,但是自动提交位移非常容易造成消息丢失,,在实际的生产环境中一般都是关闭自动提交位移,采用手动提交的方式,手动提交的方式非常灵活,会依据消费者提交的offset自动设置最新的偏移量。例如:我们实际消费的offset为15,但是手动提交了20,那么broker端会记录改消费者最新的offset为20,而不是15,并且不会报错。
手动提交策略
手动提交api有两种方式
- 同步 ```java void commitSync();
void commitSync(Duration timeout);
void commitSync(Map
void commitSync(final Map
在调用commitSync()时,Consumer会处于阻塞状态,直到远端的Broker返回提交结果,这个状态才会结束。在任何系统中,因为程序而非系统资源限制导致的阻塞都可能是系统的瓶颈,会影响整个系统系统的TPS。2. 异步```javavoid commitAsync();void commitAsync(OffsetCommitCallback callback);void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);
调用commitAsync之后,它会立即返回,不会阻塞,因此不会影响Consumer应用的TPS。由于此方法是异步的,Kafka提供了回调函数(callback),可以在此函数内实现提交之后的逻辑,比如记录日志或处理异常等。
注:commitAsync不能自动重试,commitAsync是异步操作,倘若提交失败后自动重试,此时重试时提交的offset可能早已经过期或不是最新值了。因此异步提交的重试没有意义,所以commitAsync不会重试。
细粒度提交offset (推荐)
Kafka无论是同步提交还是异步提交默认都是提交所有poll下来的消息的offset,如果我们一次性拉取大量的消息,那么我们可以分段提交offset,减少出错时的恢复时间。
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();AtomicInteger count = new AtomicInteger();try {ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofMillis(100));records.forEach(record -> {handlerRecord(record);offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()));count.getAndIncrement();// 分区数处理的消息达到100后,进行提交if (count.get() % 100 == 0) {kafkaConsumer.commitAsync(offsets, null);}});} catch (Exception e) {// 业务处理失败处理或异步提交失败处理errHandler(e);} finally {try {kafkaConsumer.commitSync();} catch (Exception e) {// 提交失败最终处理}}
