• kafka 是基于日志结构(log-based)的消息引擎,消费者在消费消息时,仅仅是从磁盘文件上读取数据而已,是只读的操作
    • Kafka 的消费者读取消息是可以重演的(replayable)

**


选项

  • 重设位移大致可以从两个维度来进行
  1. 位移维度。这是指根据位移值来重设。也就是说,直接把消费者的位移值重设成我们给
    定的位移值
    2. 时间维度。我们可以给定一个时间,让消费者把位移调整成大于该时间的最小位移;也
    可以给出一段时间间隔,比如 30 分钟前,然后让消费者直接将位移调回 30 分钟之前
    的位移值

image.png

  • Earliest。表示将位移调整到主题当前最早位移处。这个最早位移不一定就是 0,因为在生产环境中,很久远的消息会被 Kafka 自动删除
    • 用于重新消费主题的所有消息
  • Latest。略表示把位移重设成最新末端位移
    • 跳过所有历史消息,打算从最新的消息处开始消费
  • Current。将位移调整成消费者当前提交的最新位移
    • 要把位移重设到消费者重启时的位置
  • Specified-Offset。表示消费者把位移值调整到你指定的位移处
    • 通用的策略
    • 消费者程序在处理某条错误消息时,你可以手动地“跳过”此消息的处理
  • Shift-By-N 。手动地“跳过”此消息的相对数值
  • DateTime 允许你指定一个时间,然后将位移重置到该时间之后的最早位移处。
    • 常见的使用场景是,你想重新消费昨天的数据,那么你可以使用该策略重设位移到昨天 0 点
  • Duration 策略则是指给定相对的时间间隔,然后将位移调整到距离当前给定时间间隔的位移处
    • 具体格式是 PnDTnHnMnS

设置

通过消费者 API 来实现

  • 0.11 版本之前只能这样实现
  • 不推荐

    1. void seek(TopicPartition partition, long offset);
    2. void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
    3. void seekToBeginning(Collection<TopicPartition> partitions);
    4. void seekToEnd(Collection<TopicPartition> partitions);
  • seek 方法只能重设一个分区的位移

    • OffsetAndMetadata 类是一个封装了 Long 型的位移和自定义元数据的复合类,只是一般情况下,自定义元数据为空,因此你基本上可以认为这个类表征的主要是消息的位移值
  • seekToBeginningseekToEnd 则拥有一次重设多个分区的能力

案例

  • Earliest ```java Properties consumerProperties = new Properties(); consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID); consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, “earliest”); consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserialize consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

String topic = “test”; // 要重设位移的 Kafka 主题 try (final KafkaConsumer consumer = new KafkaConsumer<>(consumerProperties)) { consumer.subscribe(Collections.singleton(topic)); consumer.poll(0); consumer.seekToBeginning( consumer.partitionsFor(topic).stream().map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition())) .collect(Collectors.toList())); }

  1. 1. 你要创建的消费者程序,要禁止自动提交位移<br />2. ID 要设置成你要重设的消费者组的组 ID<br />3. 调用 seekToBeginning 方法时,需要一次性构造主题的所有分区对象<br />4. 最重要的是,一定要调用带长整型的 poll 方法,而不要调用 `consumer.poll(Duration.ofSecond(0))`
  2. - `Latest`
  3. - 要使用 `seekToEnd` 方法即可
  4. ```java
  5. consumer.seekToEnd(
  6. consumer.partitionsFor(topic).stream().map(partitionInfo ->
  7. new TopicPartition(topic, partitionInfo.partition()))
  8. .collect(Collectors.toList()));
  • Current

    1. consumer.partitionsFor(topic).stream().map(info ->
    2. new TopicPartition(topic, info.partition())).forEach(tp -> {
    3. long committedOffset = consumer.committed(tp).offset();
    4. consumer.seek(tp, committedOffset);
    5. });
    • partitionsFor 方法获取给定主题的所有分区,然后依次获取对应分区上的已提交位移
    • 最后通过 seek 方法重设位移到已提交位移处
  • Specified-Offset

    1. long targetOffset = 1234L;
    2. for (PartitionInfo info : consumer.partitionsFor(topic)) {
    3. TopicPartition tp = new TopicPartition(topic, info.partition());
    4. consumer.seek(tp, targetOffset);
    5. }
  • Shift-By-N

    1. for (PartitionInfo info : consumer.partitionsFor(topic)) {
    2. TopicPartition tp = new TopicPartition(topic, info.partition());
    3. // 假设向前跳 123 条消息
    4. long targetOffset = consumer.committed(tp).offset() + 123L;
    5. consumer.seek(tp, targetOffset);
    6. }
  • DateTime

    • KafkaConsumer.offsetsForTimes()

      1. long ts = LocalDateTime.of(
      2. 2019, 6, 20, 20, 0).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
      3. Map<TopicPartition, Long> timeToSearch =
      4. consumer.partitionsFor(topic).stream().map(info ->
      5. new TopicPartition(topic, info.partition()))
      6. .collect(Collectors.toMap(Function.identity(), tp -> ts));
      7. for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :
      8. consumer.offsetsForTimes(timeToSearch).entrySet()) {
      9. consumer.seek(entry.getKey(), entry.getValue().offset());
      10. }
    • 构造了 LocalDateTime 实例,然后利用它去查找对应的位移值,最后调用seek,实现了重设位移

  • Duration

    1. Map<TopicPartition, Long> timeToSearch = consumer.partitionsFor(topic).stream()
    2. .map(info -> new TopicPartition(topic, info.partition()))
    3. .collect(Collectors.toMap(Function.identity(), tp -> System.currentTimeMillis(
    4. for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :
    5. consumer.offsetsForTimes(timeToSearch).entrySet()) {
    6. consumer.seek(entry.getKey(), entry.getValue().offset());
    7. }

    通过 kafka-consumer-groups 命令行脚本来实现

  • Kafka 0.11 版本中新引入的

  • 推荐

**

案例

Earliest 策略直接指定 **--to-earliest**

  1. kafka-consumer-groups.sh --bootstrap-server kafka-host:port \
  2. --group test-group --reset-offsets --all-topics \
  3. --to-earliest \
  4. execute

Latest 策略直接指定**--to-latest**

  1. kafka-consumer-groups.sh --bootstrap-server kafka-host:port \
  2. --group test-group --reset-offsets --all-topics \
  3. --to-latest \
  4. execute


Current 策略直接指定`
—to-current**`

  1. kafka-consumer-groups.sh --bootstrap-server kafka-host:port \
  2. --group test-group --reset-offsets --all-topics \
  3. --to-current \
  4. execute

Specified-Offset 策略直接指定**--to-offset**

  1. kafka-consumer-groups.sh --bootstrap-server kafka-host:port \
  2. --group test-group --reset-offsets --all-topics \
  3. --to-offset <offset> \
  4. execute

Shift-By-N 策略直接指定**--shift-by N**

  1. kafka-consumer-groups.sh --bootstrap-server kafka-host:port \
  2. --group test-group --reset-offsets --all-topics \
  3. --shift-by <offset_N> \
  4. execute

DateTime 策略直接指定**--to-datetime**

  1. kafka-consumer-groups.sh --bootstrap-server kafka-host:port \
  2. --group test-group --reset-offsets --all-topics \
  3. --to-datetime 2019-06-20T20:00:00.000 \
  4. execute

Duration 策略们直接指定**--by-duration**

  1. kafka-consumer-groups.sh --bootstrap-server kafka-host:port \
  2. --group test-group --reset-offsets --all-topics \
  3. --by-duration PT0H30M0S \
  4. execute