- kafka 是基于日志结构(log-based)的消息引擎,消费者在消费消息时,仅仅是从磁盘文件上读取数据而已,是只读的操作
- Kafka 的消费者读取消息是可以重演的(replayable)
**
选项
- 重设位移大致可以从两个维度来进行
- 位移维度。这是指根据位移值来重设。也就是说,直接把消费者的位移值重设成我们给
定的位移值
2. 时间维度。我们可以给定一个时间,让消费者把位移调整成大于该时间的最小位移;也
可以给出一段时间间隔,比如 30 分钟前,然后让消费者直接将位移调回 30 分钟之前
的位移值
- Earliest。表示将位移调整到主题当前最早位移处。这个最早位移不一定就是 0,因为在生产环境中,很久远的消息会被 Kafka 自动删除
- 用于重新消费主题的所有消息
- Latest。略表示把位移重设成最新末端位移
- 跳过所有历史消息,打算从最新的消息处开始消费
- Current。将位移调整成消费者当前提交的最新位移
- 要把位移重设到消费者重启时的位置
- Specified-Offset。表示消费者把位移值调整到你指定的位移处
- 通用的策略。
- 消费者程序在处理某条错误消息时,你可以手动地“跳过”此消息的处理
- Shift-By-N 。手动地“跳过”此消息的相对数值
- DateTime 允许你指定一个时间,然后将位移重置到该时间之后的最早位移处。
- 常见的使用场景是,你想重新消费昨天的数据,那么你可以使用该策略重设位移到昨天 0 点
- Duration 策略则是指给定相对的时间间隔,然后将位移调整到距离当前给定时间间隔的位移处
- 具体格式是 PnDTnHnMnS
设置
通过消费者 API 来实现
- 0.11 版本之前只能这样实现
不推荐
void seek(TopicPartition partition, long offset);
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
void seekToBeginning(Collection<TopicPartition> partitions);
void seekToEnd(Collection<TopicPartition> partitions);
seek
方法只能重设一个分区的位移- OffsetAndMetadata 类是一个封装了 Long 型的位移和自定义元数据的复合类,只是一般情况下,自定义元数据为空,因此你基本上可以认为这个类表征的主要是消息的位移值
seekToBeginning
和seekToEnd
则拥有一次重设多个分区的能力
案例
Earliest
```java Properties consumerProperties = new Properties(); consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID); consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, “earliest”); consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserialize consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
String topic = “test”; // 要重设位移的 Kafka 主题
try (final KafkaConsumer
1. 你要创建的消费者程序,要禁止自动提交位移<br />2. 组 ID 要设置成你要重设的消费者组的组 ID<br />3. 调用 seekToBeginning 方法时,需要一次性构造主题的所有分区对象<br />4. 最重要的是,一定要调用带长整型的 poll 方法,而不要调用 `consumer.poll(Duration.ofSecond(0))`
- `Latest`
- 要使用 `seekToEnd` 方法即可
```java
consumer.seekToEnd(
consumer.partitionsFor(topic).stream().map(partitionInfo ->
new TopicPartition(topic, partitionInfo.partition()))
.collect(Collectors.toList()));
Current
consumer.partitionsFor(topic).stream().map(info ->
new TopicPartition(topic, info.partition())).forEach(tp -> {
long committedOffset = consumer.committed(tp).offset();
consumer.seek(tp, committedOffset);
});
partitionsFor
方法获取给定主题的所有分区,然后依次获取对应分区上的已提交位移- 最后通过 seek 方法重设位移到已提交位移处
Specified-Offset
long targetOffset = 1234L;
for (PartitionInfo info : consumer.partitionsFor(topic)) {
TopicPartition tp = new TopicPartition(topic, info.partition());
consumer.seek(tp, targetOffset);
}
Shift-By-N
for (PartitionInfo info : consumer.partitionsFor(topic)) {
TopicPartition tp = new TopicPartition(topic, info.partition());
// 假设向前跳 123 条消息
long targetOffset = consumer.committed(tp).offset() + 123L;
consumer.seek(tp, targetOffset);
}
DateTime
KafkaConsumer.offsetsForTimes()
long ts = LocalDateTime.of(
2019, 6, 20, 20, 0).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
Map<TopicPartition, Long> timeToSearch =
consumer.partitionsFor(topic).stream().map(info ->
new TopicPartition(topic, info.partition()))
.collect(Collectors.toMap(Function.identity(), tp -> ts));
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :
consumer.offsetsForTimes(timeToSearch).entrySet()) {
consumer.seek(entry.getKey(), entry.getValue().offset());
}
构造了 LocalDateTime 实例,然后利用它去查找对应的位移值,最后调用seek,实现了重设位移
Duration
Map<TopicPartition, Long> timeToSearch = consumer.partitionsFor(topic).stream()
.map(info -> new TopicPartition(topic, info.partition()))
.collect(Collectors.toMap(Function.identity(), tp -> System.currentTimeMillis(
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :
consumer.offsetsForTimes(timeToSearch).entrySet()) {
consumer.seek(entry.getKey(), entry.getValue().offset());
}
通过 kafka-consumer-groups 命令行脚本来实现
Kafka 0.11 版本中新引入的
- 推荐
案例
Earliest 策略直接指定 **--to-earliest**
kafka-consumer-groups.sh --bootstrap-server kafka-host:port \
--group test-group --reset-offsets --all-topics \
--to-earliest \
–execute
Latest 策略直接指定**--to-latest**
kafka-consumer-groups.sh --bootstrap-server kafka-host:port \
--group test-group --reset-offsets --all-topics \
--to-latest \
–execute
Current 策略直接指定`—to-current**`
kafka-consumer-groups.sh --bootstrap-server kafka-host:port \
--group test-group --reset-offsets --all-topics \
--to-current \
–execute
Specified-Offset 策略直接指定**--to-offset**
kafka-consumer-groups.sh --bootstrap-server kafka-host:port \
--group test-group --reset-offsets --all-topics \
--to-offset <offset> \
–execute
Shift-By-N 策略直接指定**--shift-by N**
kafka-consumer-groups.sh --bootstrap-server kafka-host:port \
--group test-group --reset-offsets --all-topics \
--shift-by <offset_N> \
–execute
DateTime 策略直接指定**--to-datetime**
kafka-consumer-groups.sh --bootstrap-server kafka-host:port \
--group test-group --reset-offsets --all-topics \
--to-datetime 2019-06-20T20:00:00.000 \
–execute
Duration 策略们直接指定**--by-duration**
kafka-consumer-groups.sh --bootstrap-server kafka-host:port \
--group test-group --reset-offsets --all-topics \
--by-duration PT0H30M0S \
–execute