区分
从用户的角度来说,位移提交分为自动提交和手动提交
从 Consumer 端的角度来说,位移提交分为同步提交和异步提交
代码
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
用户端
自动提交
- Consumer 端参数叫
enable.auto.commit
,如果值是 true,则 Consumer 在后台默默地为你定期提交位移 - 自动提交的提交间隔由一个
auto.commit.interval.ms
来控制。
缺点
- rebalance 时, consumer 会重新从上一次 commit 的 offset 消费,可能会造成重复消费
手动提交
- 设置
enable.auto.commit = false
- 使用 Kafka Consumer API 提供的位移提交的方法,
- 比如
KafkaConsumer#commitSync()
- 比如
消费端
同步提交
KafkaConsumer#commitSync()
- 方法堵塞,tps 减低
异步提交
KafkaConsumer#commitAsync()
- 提供回调
- 但是不会重试
最佳实践
结合优点
- 我们可以利用 commitSync 的自动重试来规避那些瞬时错误,比如网络的瞬时抖动,Broker 端 GC 等。
- 因为这些问题都是短暂的,自动重试通常都会成功,因此,我们不想自己重试,而是希望 Kafka Consumer 帮我们做这件事。
- 我们不希望程序总处于阻塞状态,影响 TPS
try {
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
commitAysnc(); // 使用异步提交规避阻塞
}
} catch (Exception e) {
handle(e); // 处理异常
} finally {
try {
consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
} finally {
consumer.close();
}
}
缺点
- 上面如果 while 循环出现异常,最后调用
Consumer#commitSync()
如果是无参的,会提交最大的 offset,导致丢失消息
切分提交
commitSync(Map<TopicPartition, OffsetAndMetadata>)
commitAsync(Map<TopicPartition, OffsetAndMetadata>)
- 键就是 TopicPartition,即消费的分区,而值是一个 OffsetAndMetadata 对象,保存的主要是位移数据
private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record: records) {
process(record); // 处理消息
offsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1);
// 累计 100 条消息就统一提交一次位移
if(count % 100 == 0)
consumer.commitAsync(offsets, null); // 回调处理逻辑是 null
count++;
}
}
} catch (Exception e) {
// do
} finally {
if(count % 100 == 0)
consumer.commitSync(offsets, null); // 回调处理逻辑是 null
count++;
}