In practice, we sometimes run into consumption failures, and it is not feasible to hunt down every failed message one by one. In that situation we can spin up a new consumer that replays the messages within a given time window, starting from a specified timestamp, to backfill the data. This requires the consumer side to be idempotent, so that messages can safely be consumed more than once.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerDemo {

    public static void main(String[] args) throws ParseException {
        String topic = "test_topic01";
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        Date start = sdf.parse("2022-04-09 12:10:45.617");
        long end = sdf.parse("2022-04-09 13:10:45.617").getTime();
        // Replay every message whose timestamp falls between start and end
        reconsume(topic, start, end);
    }

    // Consume Kafka messages starting from the given timestamp
    private static void reconsume(String topic, Date start, long end) {
        // Consumer configuration
        Properties prop = clientConf();
        // This consumer is only used to fetch metadata and resolve offsets
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        // Fetch the partition list for the topic
        List<PartitionInfo> partitions = consumer.partitionsFor(topic);
        // Business thread pool: one dedicated thread per partition
        ExecutorService executorService = Executors.newFixedThreadPool(partitions.size());
        // Resolve the offset corresponding to the start timestamp for each partition
        Map<TopicPartition, Long> map = new HashMap<>();
        for (PartitionInfo partition : partitions) {
            map.put(new TopicPartition(topic, partition.partition()), start.getTime());
        }
        Map<TopicPartition, OffsetAndTimestamp> timestampMap = consumer.offsetsForTimes(map);
        // One future per partition
        List<CompletableFuture<ConsumerRecord<String, String>>> list = new ArrayList<>(partitions.size());
        try {
            // Iterate over all partitions
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : timestampMap.entrySet()) {
                // Partitions with no message at or after the start timestamp map to null; skip them
                if (entry.getValue() == null) {
                    continue;
                }
                // Consume this partition from the resolved offset on its own thread
                CompletableFuture<ConsumerRecord<String, String>> future =
                        run(prop, end, executorService, entry);
                list.add(future);
            }
            // Wait for every partition to finish
            CompletableFuture.allOf(list.toArray(new CompletableFuture[0])).join();
            System.out.println(topic + " consumption finished");
        } finally {
            executorService.shutdown();
            consumer.close();
        }
    }

    private static CompletableFuture<ConsumerRecord<String, String>> run(Properties prop, long end, ExecutorService executorService,
                                                                          Map.Entry<TopicPartition, OffsetAndTimestamp> entry) {
        return CompletableFuture.supplyAsync(() -> {
            // One consumer per partition; try-with-resources closes it when the task ends
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop)) {
                // Manually assign the single partition (no consumer-group rebalancing)
                consumer.assign(Collections.singleton(entry.getKey()));
                // Seek to the offset resolved from the start timestamp
                consumer.seek(entry.getKey(), entry.getValue().offset());
                int emptyPolls = 0;
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
                    // Guard: stop if we reach the log end before seeing a record past `end`
                    if (records.isEmpty()) {
                        if (++emptyPolls >= 5) {
                            System.out.printf("partition [%d] reached log end before the end timestamp\n", entry.getKey().partition());
                            return null;
                        }
                        continue;
                    }
                    emptyPolls = 0;
                    for (ConsumerRecord<String, String> record : records) {
                        // Stop once the record timestamp passes the end of the window
                        if (record.timestamp() > end) {
                            System.out.printf("partition [%d] finished [offset=%d]...............\n", record.partition(), record.offset());
                            return record;
                        }
                        System.out.printf("offset=%d,key=%s,val=%s,partition=%d\n", record.offset(), record.key(), record.value(), record.partition());
                    }
                }
            }
        }, executorService);
    }

    private static Properties clientConf() {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.127.157:9092");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group0x");
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return prop;
    }
}
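As the opening paragraph notes, replaying a time window will re-deliver messages that may already have been processed, so the business handler must be idempotent. Here is a minimal sketch of one way to achieve that; the IdempotentHandler class and its in-memory seen-set are illustrative assumptions, and a production system would usually rely on a database unique constraint or a Redis SETNX instead:

import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: deduplicate on the record key so reprocessing is harmless.
// Assumption: the key uniquely identifies a business event; swap the in-memory
// set for a database unique constraint or Redis SETNX in production.
public class IdempotentHandler {
    private final Set<String> processed = ConcurrentHashMap.newKeySet();

    public void handle(ConsumerRecord<String, String> record) {
        // add() returns false when the key was already seen, making replays no-ops
        if (!processed.add(record.key())) {
            return;
        }
        // ... real business logic goes here ...
        System.out.printf("processed key=%s, offset=%d\n", record.key(), record.offset());
    }
}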
Why can messages be fetched by timestamp at all? Because the Kafka broker maintains an index that maps timestamps to offsets.
On the broker's storage side, each log segment keeps a time index file (.timeindex): given a timestamp, the broker looks up the corresponding offset through this index, and the client then consumes from that offset. This relies on the broker servers' clocks being correct (note that when messages are stamped with producer-side CreateTime, the default, it is the producer clocks that matter instead).
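To see the time index lookup on its own, the sketch below asks the broker for the earliest offset whose message timestamp is at or after a given time; the broker address, topic, and partition are the same example values used above:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class TimeIndexLookupDemo {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.127.157:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop)) {
            TopicPartition tp = new TopicPartition("test_topic01", 0);
            long oneHourAgo = System.currentTimeMillis() - 3600_000L;
            // The broker answers this from its .timeindex files: the result is the
            // earliest offset whose message timestamp is >= the queried timestamp
            Map<TopicPartition, OffsetAndTimestamp> result =
                    consumer.offsetsForTimes(Collections.singletonMap(tp, oneHourAgo));
            OffsetAndTimestamp oat = result.get(tp);
            if (oat == null) {
                System.out.println("no message at or after the given timestamp");
            } else {
                System.out.printf("offset=%d, timestamp=%d\n", oat.offset(), oat.timestamp());
            }
        }
    }
}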