在 Kafka 的历史中,消费者客户端同生产者客户端一样也经历了两个大版本:第一个是于 Kafka 开源之初使用 Scala 语言编写的客户端,第二个是从 Kafka 0.9.x 版本开始推出的使用 Java 编写的客户端。旧消费者客户端已经被淘汰,故不再介绍。

客户端开发

  1. public class ConsumerDemo {
  2. private static final String BROKER_LIST = "127.0.0.1:9092";
  3. private static final String TOPIC = "TestTopic";
  4. private static final String GROUP_NAME = "TestGroup";
  5. private static final AtomicBoolean RUNNING = new AtomicBoolean(true);
  6. public static void main(String[] args) {
  7. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(initConsumerConfig());
  8. consumer.subscribe(Collections.singletonList(TOPIC));
  9. try {
  10. while (RUNNING.get()) {
  11. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  12. for (ConsumerRecord<String, String> record : records) {
  13. System.out.println("topic is:" + record.topic() + ", partition is:" + record.partition() + ", offset is:" + record.offset());
  14. System.out.println("key is:" + record.key() + ", value is:" + record.value());
  15. }
  16. }
  17. } catch (Exception e) {
  18. e.printStackTrace();
  19. } finally {
  20. consumer.close();
  21. }
  22. }
  23. private static Properties initConsumerConfig() {
  24. Properties properties = new Properties();
  25. properties.setProperty("bootstrap.servers", BROKER_LIST);
  26. properties.setProperty("key.deserializer", StringDeserializer.class.getName());
  27. properties.setProperty("value.deserializer", StringDeserializer.class.getName());
  28. properties.setProperty("group.id", GROUP_NAME);
  29. return properties;
  30. }
  31. }

订阅主题与分区

在创建好消费者后,我们就需要为该消费者订阅相关的主题了,一个消费者可以订阅一个或多个主题。上面代码示例中用 subscribe() 方法订阅了一个主题,对于这个方法而言,既可以以集合的形式订阅多个主题,也可以以正则表达式的形式订阅特定模式的主题。subscribe 的几个重载方法如下:

  1. void subscribe(Collection<String> topics);
  2. void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
  3. void subscribe(Pattern pattern);
  4. void subscribe(Pattern pattern, ConsumerRebalanceListener callback);

如果消费者采用的是正则表达式的方式订阅,在这之后的过程中,如果有人又创建了新的主题,并且主题的名字与正则表达式相匹配,那么这个消费者就可以消费到新添加的主题中的消息。但同时也会立即触发一次消费者的重平衡动作。ConsumerRebalanceListener 是用来设置相应的重平衡监听器的。

  1. public interface ConsumerRebalanceListener {
  2. void onPartitionsRevoked(Collection<TopicPartition> partitions);
  3. void onPartitionsAssigned(Collection<TopicPartition> partitions);
  4. }

其中,onPartitionsRevoked 方法会在重平衡开始之前、消费者停止读取消息之后被调用,可通过该回调方法来处理消费位移的提交,以避免一些不必要的重复消费现象的发生。onPartitionsAssigned 方法在重新分配分区之后、消费者开始读取消息之前被调用,可通过该方法让新消费者从特定的偏移量处开始处理记录。

消费者不仅可以通过 subscribe 方法订阅主题,还可以直接订阅某些主题的特定分区,在 KafkaConsumer 中还提供了一个 assign() 方法来实现该功能,方法的具体定义如下:

  1. void assign(Collection<TopicPartition> partitions);

TopicPartition 类只有两个属性:topic 和 partition,分别表示分区所属的主题和自身的分区编号。如果我们事先不知道主题中有多少分区的话,KafkaConsumer 还提供了一个 partitionsFor() 方法可以用来查询指定主题的元数据信息,方法的具体定义如下:

  1. List<PartitionInfo> partitionsFor(String topic);
  2. List<PartitionInfo> partitionsFor(String topic, Duration timeout);

其中 PartitionInfo 类型即为主题的分区元数据信息,该类的主要结构如下:

  1. public class PartitionInfo {
  2. private final String topic;
  3. private final int partition;
  4. // leader副本所在位置
  5. private final Node leader;
  6. // 分区的AR集合
  7. private final Node[] replicas;
  8. // 分区的ISR集合
  9. private final Node[] inSyncReplicas;
  10. // 分区的OSR集合
  11. private final Node[] offlineReplicas;
  12. ......
  13. }

通过 partitionsFor() 方法的协助,我们可以通过 assign() 方法来实现订阅主题(全部分区)的功能:

  1. List<TopicPartition> partitions = new ArrayList<>();
  2. List<PartitionInfo> partitionInfos = consumer.partitionsFor("topic_demo");
  3. if (partitionInfos != null) {
  4. for (PartitionInfo partitionInfo : partitionInfos) {
  5. TopicPartition p = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
  6. partitions.add(p);
  7. }
  8. }
  9. consumer.assign(partitions);

通过 subscribe() 方法订阅主题具有消费者自动重平衡的功能,在多个消费者的情况下可以根据分区分配策略自动分配各个消费者与分区的关系。当消费者组内的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。而通过 assign() 方法订阅分区时,是不具备消费者自动重平衡的功能的。

消息消费

Kafka 中的消费是基于拉模式的,整个消费过程是一个不断轮询的过程,消费者所要做的就是重复地调用 poll() 方法。poll() 方法返回所订阅主题(分区)上的一组消息(ConsumerRecords),如果某些分区中没有可供消费的消息,那么此分区对应的消息拉取的结果就为空;如果订阅的所有分区中都没有可供消费的消息,那么 poll() 方法返回空的消息集合。

  1. ConsumerRecords<K, V> poll(Duration timeout);

传给 poll() 方法的参数是一个超时时间,如果该参数为 0,poll() 会立即返回而不管是否已经拉取到了消息,否则它会在指定时间内一直等待 broker 返回数据。超时时间的设置取决于应用程序对响应速度的要求,比如要在多长时间内把控制权归还给执行轮询的线程。

返回的 ConsumerRecords 表示一次拉取操作所获得的消息集,它内部包含了若干个 ConsumerRecord,这个和生产者发送的消息类型 ProducerRecord 相对应,不过内容更丰富,具体结构参考如下代码:

  1. public class ConsumerRecord<K, V> {
  2. // 消息所属主题
  3. private final String topic;
  4. // 消息所在分区
  5. private final int partition;
  6. // 消息在所属分区的偏移量
  7. private final long offset;
  8. // 时间戳
  9. private final long timestamp;
  10. // 时间戳类型:CreateTime表示消息的创建时间、LogAppendTime表示消息追加到日志文件的时间
  11. private final TimestampType timestampType;
  12. private final int serializedKeySize;
  13. private final int serializedValueSize;
  14. private final Headers headers;
  15. private final K key;
  16. private final V value;
  17. // CRC32 的校验值
  18. private volatile Long checksum;
  19. ......
  20. }

ConsumerRecords 类还提供了一个 records() 方法用来获取消息集中指定分区的消息,这样可以按照分区维度来进行消费,这一点在手动提交位移时很有用。具体定义如下:

  1. public List<ConsumerRecord<K, V>> records(TopicPartition partition);

消费指定分区的代码示例:

  1. try {
  2. while (true) {
  3. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  4. // 获取这一批消息所在的各个分区
  5. Set<TopicPartition> partitionSet = records.partitions();
  6. for (TopicPartition topicPartition : partitionSet) {
  7. // 获取指定分区的消息
  8. List<ConsumerRecord<String, String>> recordList = records.records(topicPartition);
  9. System.out.println("topic=" + topicPartition.topic() + ", partition=" + topicPartition.partition());
  10. for (ConsumerRecord<String, String> record : recordList) {
  11. System.out.println("key=" + record.key() + ", value=" + record.value());
  12. }
  13. }
  14. }
  15. } catch (Exception e) {
  16. e.printStackTrace();
  17. } finally {
  18. consumer.close();
  19. }

注意,在退出应用程序之前应当使用 close() 方法关闭消费者,这样网络连接和 socket 也会随之关闭,并且会立即触发一次分区重平衡,而不会等待 broker 控制器发现它不再发送心跳并认定它已死亡,因为那样需要更长的时间,导致整个群组在一段时间内无法读取消息。

位移提交

在旧消费者客户端中,消费位移是存储在 Zookeeper 中的。而在新消费者客户端中,消费位移存储在 Kafka 内部的主题 __consumer_offsets 中。提交位移主要是为了表征 Consumer 的消费进度,这样当 Consumer 宕机重启后,就能够从 Kafka 中读取之前提交的位移值,然后从原来的位移处继续消费,避免重新消费。因为 Consumer 能够同时消费多个分区的数据,所以 Consumer 需要为分配给它的每个分区提交各自的位移。

如下图所示,x 表示某一次拉取操作中此分区消息的最大偏移量,假设当前消费者已经消费了 x 位置的消息,那么我们就可以说消费者的消费位移为 x,即 lastConsumerOffset。不过要注意的是,当前消费者需要提交的消费位移并不是 x,而是 x+1,即 position,它表示下一条需要拉取的消息的位置。此外,在消费者中还有一个 committed offset 的概念,它表示已经提交过的消费位移。
image.png
KafkaConsumer 提供了 position() 和 committed() 方法来获取 position 和 committed offset 的值:

  1. long position(TopicPartition partition);
  2. long position(TopicPartition partition, final Duration timeout);
  3. Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions);
  4. Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions, final Duration timeout);

位移提交的时机也很有讲究,处理不当可能会造成重复消费和消息丢失的现象。如下图所示,当前一次 poll() 操作所拉取的消息集为 [x+2,x+7],x+2 代表上一次提交的消费位移,说明已经完成了 x+1 及之前的所有消息的消费,x+5 表示当前正在处理的位置。如果拉取到消息之后就进行了位移提交,即提交了x+8,那么当前消费 x+5 时遇到了异常,在故障恢复之后,我们重新拉取的消息是从 x+8 开始的。这便导致了消息丢失。

再考虑另一种情况,位移提交的动作是在消费完所有拉取到的消息后才执行的,那么当消费 x+5 时发生异常,在故障恢复后,我们重新拉取的消息是从 x+2 开始的,这便导致了消息重复消费的现象。
image.png
鉴于位移提交甚至是位移管理对 Consumer 端的巨大影响,Kafka 提供了多种提交位移的方法。从用户的角度主要分为自动提交和手动提交;从 Consumer 端的角度主要分为同步提交和异步提交。

1. 自动提交消费位移

在 Kafka 中默认的消费位移的提交方式是自动提交,这个由消费者客户端参数 enable.auto.commit 配置,默认值为 true。当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期提交的间隔由参数 auto.commit.interval.ms 来控制,默认值是 5 秒。

在默认情况下,消费者每隔 5s 会将拉取到的每个分区中最大的消息位移进行提交。自动位移提交是在 poll() 方法的逻辑里完成的,Kafka 会在每次真正向服务端发起拉取请求之前检查是否可以进行位移提交,如果可以则会提交从上一次轮询到的最大的消息位移。

自动提交消费位移的方式非常简便,它免去了复杂的位移提交逻辑,让编码更简洁。但是随之而来的是重复消费和消息丢失的问题。假设在自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现象。我们可以通过减少位移提交的时间间隔来减小重复消息的时间窗口,但这样并不能避免,而且也会使位移提交更加频繁。这是自动提交消费位移机制的一个缺陷。

2. 手动提交消费位移

在 Kafka 中还提供了手动位移提交的方式,这样可以使得开发人员对消费位移的管理控制更加灵活。开启手动提交功能的前提是消费者客户端参数 enable.auto.commit 配置为 false。手动提交可以再细分为同步提交和异步提交,对应于 KafkaConsumer 中的 commitSync() 和 commitAsync() 两种类型的方法。

2.1 同步提交

commitSync() 方法会根据 poll() 方法拉取的最新位移来进行提交。它是一个同步操作,只要没有发生不可恢复的错误,它就会阻塞消费者线程直至位移提交完成。对于不可恢复的异常,如 CommitFailedException,我们可以将其捕获并做针对性的处理。

  1. while (true) {
  2. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  3. for (ConsumerRecord<String, String> record : records) {
  4. // ......
  5. }
  6. try {
  7. consumer.commitSync();
  8. } catch (CommitFailedException e) {
  9. handle(e); // 处理提交失败异常
  10. }
  11. }

对于采用 commitSync() 的无参方法而言,它提交消费位移的频率和拉取批次消息、处理批次消息的频率是一样的,如果想寻求更细粒度、更精确的位移提交,那么就需要使用 commitSync() 的另一个含参方法:

  1. void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)

该方法提供了一个 offsets 参数,用来提交指定分区的位移。无参的 commitSync() 方法只能提交当前拉取的消息批次对应的 position 值。如果需要提交一个中间值,那么就可以使用这种方式。比如我们可以按照分区的粒度划分提交位移的界限,代码示例如下:

  1. try {
  2. while (true) {
  3. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  4. for (TopicPartition topicPartition : records.partitions()) {
  5. List<ConsumerRecord<String, String>> recordList = records.records(topicPartition);
  6. for (ConsumerRecord<String, String> record : recordList) {
  7. // ......
  8. }
  9. long lastConsumerOffset = recordList.get(recordList.size() - 1).offset();
  10. consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(lastConsumerOffset + 1)));
  11. }
  12. }
  13. } finally {
  14. consumer.close();
  15. }

2.2 异步提交

与 commitSync() 方法相反,异步提交的方式 commitAsync() 在执行的时候消费者线程不会被阻塞,可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作。异步提交可以使消费者的性能得到一定增强。该方法有三个不同的重载方法,具体定义如下:

  1. void commitAsync();
  2. void commitAsync(OffsetCommitCallback callback);
  3. void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);

由于 commitAsync() 方法是异步的,所以 Kafka 提供了一个 OffsetCommitCallback 回调函数供你实现位移提交完成后的逻辑,比如记录日志或处理异常等。如下代码展示了 commitAsync() 的使用:

  1. try {
  2. while (true) {
  3. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  4. for (ConsumerRecord<String, String> record : records) {
  5. // ......
  6. }
  7. consumer.commitAsync(new OffsetCommitCallback() {
  8. @Override
  9. public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
  10. if (exception != null) {
  11. System.out.println("commit fail!");
  12. }
  13. }
  14. });
  15. }
  16. } finally {
  17. consumer.close();
  18. }

注意:
在成功提交或碰到无怯恢复的错误之前,commitSync() 会一直重试,但 commitAsync() 不会。它之所以不进行重试是因为在它收到服务器的响应时,可能有一个更大的偏移量已经提交成功。此时如果重试成功的话就会覆盖原偏移量,如果发生了再均衡就会出现重复消息,因此 commitAsync() 是不会重试的。

为此我们可以设置一个递增的序号来维护异步提交的顺序,每次位移提交之后就增加序号相对应的值。在遇到位移提交失败需要重试的时候,可以检查所提交的位移和序号值的大小,如果前者小于后者,则说明有更大的位移已经提交了,那就不需要再进行本次重试了,否则可以进行重试提交。

同步异步结合使用
如果是手动提交,我们需要将 commitSync() 和 commitAsync() 组合使用才能到达最理想的效果。因为我们可以利用 commitSync() 的自动重试来规避那些瞬时错误,比如网络抖动,Broker 端 GC 等。这些问题通过自动重试一般都会成功。同时我们也不希望程序阻塞太久影响应用 TPS。

  1. try {
  2. while(true) {
  3. ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
  4. process(records); // 处理消息
  5. commitAysnc(); // 使用异步提交规避阻塞
  6. }
  7. } finally {
  8. try {
  9. consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
  10. } finally {
  11. consumer.close();
  12. }
  13. }

对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据。将两者结合后,我们既实现了异步无阻塞式的位移管理,也确保了 Consumer 位移的正确性。

控制消费

在有些场景下我们可能需要暂停某些分区的消费,KafkaConsumer 中提供了对消费速率进行控制的方法,通过使用 pause() 和 resume() 方法来分别实现暂停某些分区在拉取操作时返回数据给客户端和恢复某些分区向客户端返回数据的操作,方法具体定义如下:

  1. void pause(Collection<TopicPartition> partitions);
  2. void resume(Collection<TopicPartition> partitions);

KafkaConsumer 还提供了一个无参的 paused() 方法来返回被暂停的分区集合,方法具体定义如下:

  1. Set<TopicPartition> paused();

指定位移消费

前面我们讲述了如何进行消费位移的提交,正是有了消费位移的持久化,才使得消费者在关闭、崩溃或者是重平衡的时候,可以让接替的消费者能够根据存储的消费位移继续进行消费。

试想一下,当一个新的消费组建立时,它根本没有可以查找的消费位移,或者消费组内的一个新消费者订阅了一个新的主题,它也没有可以查找的消费位移。当 __consumer_offsets 主题中有关这个消费组的位移信息过期而被删除后,它也没有可以查找的消费位移。

1. auto.offset.reset

在 Kafka 中每当消费者查找不到所记录的消费位移时,就会根据消费者客户端参数 auto.offset.reset 配置来决定从何处开始进行消费,该参数默认值为 latest,表示从分区末尾开始消费消息。除了查找不到消费位移,位移越界也会触发该参数的执行。如下图所示:按照默认的配置,消费者会从 9 开始拉取消息进行消费。如果将该参数配置为 earliest,那么消费者会从分区的起始处,也就是 0 开始消费。
image.png
auto.offset.reset 参数还有一个可配置的值—— none,该值表示当查到不到消费位移时,既不从最新的消息位置处开始消费,也不从最早的消息位置处开始消费,而是抛出 NoOffsetForPartitionException 异常。

2. seek 方法

消息的拉取是根据 poll() 方法中的逻辑来处理的,该方法无法精确掌控消费的起始位置。而 auto.offset.reset 参数也只能在找不到消费位移或位移越界的情况下粗粒度地从开头或末尾开始消费。有时我们需要从特定的位移处开始拉取消息,而 KafkaConsumer 中的 seek() 方法可以让我们得以追前消费或回溯消费:

  1. void seek(TopicPartition partition, long offset);

seek() 方法中的参数 partition 表示分区,而 offset 参数用来指定从分区的哪个位置开始消费。seek() 方法只能重置消费者分配到的分区的消费位置,而分区的分配是在 poll() 方法的调用过程中实现的。因此在执行 seek() 方法前需要先执行一次 poll() 方法,等分配到分区后才可以重置消费位置。seek() 方法的使用如下:

  1. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
  2. consumer.subscribe(Collections.singletonList("topic_demo"));
  3. Set<TopicPartition> assigment = new HashSet<>();
  4. // 保证通过poll方法已经为该消费者分配了分区
  5. while (assigment.size() == 0) {
  6. consumer.poll(Duration.ofMillis(100));
  7. assigment = consumer.assignment();
  8. }
  9. for (TopicPartition topicPartition : assigment) {
  10. // 指定从分区的第10个offset开始消费
  11. consumer.seek(topicPartition, 10);
  12. }
  13. while (true) {
  14. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000));
  15. for (ConsumerRecord<String, String> record : records) {
  16. // ......
  17. }
  18. }

如果对未分配到的分区执行 seek() 方法,那么会报出 IllegalStateExceptioon 的异常。如果消费组内的消费者在启动的时候能够找到消费位移,除非发生位移越界,否则 auto.offset.reset 参数并不会奏效,此时如果想指定从开头或末尾开始消费,就需要 seek() 方法指定从分区末尾开始消费,代码示例如下:

  1. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
  2. consumer.subscribe(Collections.singletonList("topic_demo"));
  3. Set<TopicPartition> assigment = new HashSet<>();
  4. while (assigment.size() == 0) {
  5. consumer.poll(Duration.ofMillis(100));
  6. assigment = consumer.assignment();
  7. }
  8. Map<TopicPartition, Long> partitionEndOffset = consumer.endOffsets(assigment);
  9. for (TopicPartition topicPartition : assigment) {
  10. consumer.seek(topicPartition, partitionEndOffset.get(topicPartition));
  11. }
  12. while (true) {
  13. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000));
  14. // ......
  15. }

代码示例中的 endOffsets() 方法用来获取指定分区的末尾的消息位置,参考上图中 9 的位置,注意这里获取的不是 8,而是将要写入最新消息的位置。与 endOffsets 对应的是 beginningOffsets() 方法,一个分区的起始位置起初是 0,但由于日志清理会清理旧的数据,所以分区的起始位置也会不断增加。但其实 KafkaConsumer 直接提供了 seekToBeginning() 和 seekToEnd() 方法来实现从分区开头或末尾开始消费。

  1. void seekToBeginning(Collection<TopicPartition> partitions);
  2. void seekToEnd(Collection<TopicPartition> partitions);

有时候我们并不知道特定的消费位置,却知道一个相关的时间点,比如我们想要消费昨天 8 点之后的消息。此时我们无法直接使用 seek() 方法来追溯到相应的位置。KafkaConsumer 同样考虑到了这种情况,它提供了一个 offsetsForTimes() 方法,通过 timestamp 来查询与此对应的分区位置。

  1. Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch);

参数 timestampsToSearch 是一个 Map 类型,key 为待查询的分区,而 value 为待查询的时间戳,该方法会返回时间戳大于等于待查询时间的第一条消息对应的位置和时间戳,代码示例如下:

  1. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
  2. consumer.subscribe(Collections.singletonList("topic_demo"));
  3. Set<TopicPartition> assigment = new HashSet<>();
  4. while (assigment.size() == 0) {
  5. consumer.poll(Duration.ofMillis(100));
  6. assigment = consumer.assignment();
  7. }
  8. // 消费一天前的数据
  9. Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
  10. for (TopicPartition topicPartition : assigment) {
  11. timestampToSearch.put(topicPartition, System.currentTimeMillis() - 24 * 3600 * 1000);
  12. }
  13. Map<TopicPartition, OffsetAndTimestamp> timestampMap = consumer.offsetsForTimes(timestampToSearch);
  14. for (TopicPartition topicPartition : assigment) {
  15. OffsetAndTimestamp offset = timestampMap.get(topicPartition);
  16. if (offset != null) {
  17. consumer.seek(topicPartition, offset.offset());
  18. }
  19. }
  20. while (true) {
  21. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000));
  22. // ......
  23. }

前面说过位移越界也会触发 auto.offset.reset 参数的执行,位移越界是指知道消费位置却无法在实际的分区中查找到,比如想要从上面那张图中的位置 10 处拉取消息时就会发生位移越界,此时则会根据 auto.offset.reset 参数的默认值来将拉取位置重置。但是要注意,如果拉取上面那张图中位置 9 处的消息时并未越界,因为这个位置代表特定的含义(LEO)。

消费者拦截器

消费者拦截器主要在消费到消息或在提交消费位移时进行一些定制化的操作。与生产者拦截器对应的,消费者拦截器需要自定义实现 ConsumerInterceptor 接口。接口定义如下:

  1. public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
  2. public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
  3. public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
  4. public void close();
  5. }

KafkaConsumer 会在 poll() 方法返回前调用 onConsumer() 方法来对消息进行相应的定制化操作,比如修改返回的消息内容、按照某种规则过滤消息等。如果在执行 onConsumer() 方法时抛出了异常,那么会被捕获并记录到日志中,但是异常不会再向上传递。KafkaConsumer 会在提交完消费位移之后调用拦截器的 onCommit() 方法,可以使用这个方法来记录跟踪所提交的位移信息,比如当使用 commitAsync 的无参方法时,通过该方法可以获取位移提交的细节。

在消费者中也有拦截器链的概念,和生产者的拦截器链一样,也是按照 interceptor.classes 参数配置的拦截器的顺序来一一执行的。注意,如果在拦截器链中某个拦截器执行失败,那么下一个拦截器会接着从上一个执行成功的拦截器继续执行。