区分

从用户的角度来说,位移提交分为自动提交和手动提交
从 Consumer 端的角度来说,位移提交分为同步提交和异步提交


代码

  1. Properties props = new Properties();
  2. props.put("bootstrap.servers", "localhost:9092");
  3. props.put("group.id", "test");
  4. props.put("enable.auto.commit", "true");
  5. props.put("auto.commit.interval.ms", "2000");
  6. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  7. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  8. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  9. consumer.subscribe(Arrays.asList("foo", "bar"));

用户端

自动提交

  • Consumer 端参数叫 enable.auto.commit,如果值是 true,则 Consumer 在后台默默地为你定期提交位移
  • 自动提交的提交间隔由一个 auto.commit.interval.ms 来控制。

缺点

  • rebalance 时, consumer 会重新从上一次 commit 的 offset 消费,可能会造成重复消费

手动提交

  • 设置 enable.auto.commit = false
  • 使用 Kafka Consumer API 提供的位移提交的方法,
    • 比如 KafkaConsumer#commitSync()

消费端

同步提交

  • KafkaConsumer#commitSync()
    • 方法堵塞,tps 减低

异步提交

  • KafkaConsumer#commitAsync()
    • 提供回调
    • 但是不会重试

最佳实践

结合优点

  1. 我们可以利用 commitSync 的自动重试来规避那些瞬时错误,比如网络的瞬时抖动Broker 端 GC 等。
    1. 因为这些问题都是短暂的,自动重试通常都会成功,因此,我们不想自己重试,而是希望 Kafka Consumer 帮我们做这件事。
  2. 我们不希望程序总处于阻塞状态,影响 TPS
    1. try {
    2. while (true) {
    3. ConsumerRecords<String, String> records =
    4. consumer.poll(Duration.ofSeconds(1));
    5. process(records); // 处理消息
    6. commitAysnc(); // 使用异步提交规避阻塞
    7. }
    8. } catch (Exception e) {
    9. handle(e); // 处理异常
    10. } finally {
    11. try {
    12. consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
    13. } finally {
    14. consumer.close();
    15. }
    16. }

    缺点

  • 上面如果 while 循环出现异常,最后调用 Consumer#commitSync() 如果是无参的,会提交最大的 offset,导致丢失消息

切分提交

  • commitSync(Map<TopicPartition, OffsetAndMetadata>)
  • commitAsync(Map<TopicPartition, OffsetAndMetadata>)
    • 键就是 TopicPartition,即消费的分区,而值是一个 OffsetAndMetadata 对象,保存的主要是位移数据
  1. private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
  2. int count = 0;
  3. try {
  4. while (true) {
  5. ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
  6. for (ConsumerRecord<String, String> record: records) {
  7. process(record); // 处理消息
  8. offsets.put(new TopicPartition(record.topic(), record.partition()),
  9. new OffsetAndMetadata(record.offset() + 1);
  10. // 累计 100 条消息就统一提交一次位移
  11. ifcount % 100 == 0
  12. consumer.commitAsync(offsets, null); // 回调处理逻辑是 null
  13. count++;
  14. }
  15. }
  16. } catch (Exception e) {
  17. // do
  18. } finally {
  19. ifcount % 100 == 0
  20. consumer.commitSync(offsets, null); // 回调处理逻辑是 null
  21. count++;
  22. }