区分
从用户的角度来说,位移提交分为自动提交和手动提交
从 Consumer 端的角度来说,位移提交分为同步提交和异步提交
代码
Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "2000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("foo", "bar"));
用户端
自动提交
- Consumer 端参数叫
enable.auto.commit,如果值是 true,则 Consumer 在后台默默地为你定期提交位移 - 自动提交的提交间隔由一个
auto.commit.interval.ms来控制。
缺点
- rebalance 时, consumer 会重新从上一次 commit 的 offset 消费,可能会造成重复消费
手动提交
- 设置
enable.auto.commit = false - 使用 Kafka Consumer API 提供的位移提交的方法,
- 比如
KafkaConsumer#commitSync()
- 比如
消费端
同步提交
KafkaConsumer#commitSync()- 方法堵塞,tps 减低
异步提交
KafkaConsumer#commitAsync()- 提供回调
- 但是不会重试
最佳实践
结合优点
- 我们可以利用 commitSync 的自动重试来规避那些瞬时错误,比如网络的瞬时抖动,Broker 端 GC 等。
- 因为这些问题都是短暂的,自动重试通常都会成功,因此,我们不想自己重试,而是希望 Kafka Consumer 帮我们做这件事。
- 我们不希望程序总处于阻塞状态,影响 TPS
try {while (true) {ConsumerRecords<String, String> records =consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息commitAysnc(); // 使用异步提交规避阻塞}} catch (Exception e) {handle(e); // 处理异常} finally {try {consumer.commitSync(); // 最后一次提交使用同步阻塞式提交} finally {consumer.close();}}
缺点
- 上面如果 while 循环出现异常,最后调用
Consumer#commitSync()如果是无参的,会提交最大的 offset,导致丢失消息
切分提交
commitSync(Map<TopicPartition, OffsetAndMetadata>)commitAsync(Map<TopicPartition, OffsetAndMetadata>)- 键就是 TopicPartition,即消费的分区,而值是一个 OffsetAndMetadata 对象,保存的主要是位移数据
private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();int count = 0;try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record: records) {process(record); // 处理消息offsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1);// 累计 100 条消息就统一提交一次位移if(count % 100 == 0)consumer.commitAsync(offsets, null); // 回调处理逻辑是 nullcount++;}}} catch (Exception e) {// do} finally {if(count % 100 == 0)consumer.commitSync(offsets, null); // 回调处理逻辑是 nullcount++;}
