In practice we sometimes run into consumption failures, and it is rarely feasible to hunt down every failed message one by one. Instead, we can spin up a new consumer that starts from a specified timestamp and replays the messages in that time window to backfill the data. This requires the consuming side to be idempotent, i.e. safe to process the same message more than once.
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerDemo {

    public static void main(String[] args) throws ParseException {
        String topic = "test_topic01";
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        Date start = sdf.parse("2022-04-09 12:10:45.617");
        // End of the replay window; one hour after the start, as an example
        long end = start.getTime() + TimeUnit.HOURS.toMillis(1);
        reconsume(topic, start, end);
    }

    // Replay Kafka messages from the given start time up to the end timestamp
    private static void reconsume(String topic, Date start, long end) {
        // Consumer configuration
        Properties prop = clientConf();
        // Metadata consumer, used only to look up partitions and offsets
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        // Fetch the partition list for the topic
        List<PartitionInfo> partitions = consumer.partitionsFor(topic);
        // Worker pool: one thread per partition
        ExecutorService executorService = Executors.newFixedThreadPool(partitions.size());
        // Ask the broker for the offset of the first message at or after the start timestamp
        Map<TopicPartition, Long> map = new HashMap<>();
        for (PartitionInfo partition : partitions) {
            map.put(new TopicPartition(topic, partition.partition()), start.getTime());
        }
        Map<TopicPartition, OffsetAndTimestamp> timestampMap = consumer.offsetsForTimes(map);
        // Futures, one per partition
        List<CompletableFuture<ConsumerRecord<String, String>>> list = new ArrayList<>(partitions.size());
        try {
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : timestampMap.entrySet()) {
                // Partitions with no message at or after the start timestamp map to null
                if (entry.getValue() == null) {
                    continue;
                }
                // Launch one consumer task per partition
                list.add(run(prop, end, executorService, entry));
            }
            // Wait until every partition has been replayed
            CompletableFuture.allOf(list.toArray(new CompletableFuture[0])).join();
            System.out.println(topic + " replay finished");
        } finally {
            executorService.shutdown();
            consumer.close();
        }
    }

    private static CompletableFuture<ConsumerRecord<String, String>> run(
            Properties prop, long end, ExecutorService executorService,
            Map.Entry<TopicPartition, OffsetAndTimestamp> entry) {
        return CompletableFuture.supplyAsync(() -> {
            TopicPartition tp = entry.getKey();
            // KafkaConsumer is not thread-safe, so each worker gets its own instance
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop)) {
                // assign() takes the partition directly, without joining a consumer group
                consumer.assign(Collections.singleton(tp));
                // Seek to the offset resolved from the start timestamp
                consumer.seek(tp, entry.getValue().offset());
                // Current log-end offset, so we can stop on partitions with no data past `end`
                long endOffset = consumer.endOffsets(Collections.singleton(tp)).get(tp);
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
                    for (ConsumerRecord<String, String> record : records) {
                        // Past the end of the window: this partition is done
                        if (record.timestamp() > end) {
                            System.out.printf("partition[%d] replay finished [offset=%d]%n",
                                    record.partition(), record.offset());
                            return record;
                        }
                        System.out.printf("offset=%d,key=%s,val=%s,partition=%d%n",
                                record.offset(), record.key(), record.value(), record.partition());
                    }
                    // No message with timestamp > end exists yet: stop once we reach the log end
                    if (consumer.position(tp) >= endOffset) {
                        return null;
                    }
                }
            }
        }, executorService);
    }

    private static Properties clientConf() {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.127.157:9092");
        // Use a dedicated group id so the backfill does not touch the live group's offsets
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group0x");
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return prop;
    }
}
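Because the replay window may overlap with messages the normal consumer already handled, each record can reach the business logic more than once. Below is a minimal sketch of offset-based deduplication; IdempotentHandler and its in-memory set are hypothetical placeholders, and in production the "already processed" check would usually be a database unique key or an upsert:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical helper: skips records that were already processed, keyed by
// topic-partition-offset, which is unique within a Kafka log
public class IdempotentHandler {

    private final Set<String> processed = ConcurrentHashMap.newKeySet();

    public void handle(ConsumerRecord<String, String> record) {
        String id = record.topic() + "-" + record.partition() + "-" + record.offset();
        // add() returns false if the id was already present: a duplicate delivery
        if (!processed.add(id)) {
            return;
        }
        // ... actual business processing goes here ...
        System.out.println("processed " + id);
    }
}

Note that an in-memory set only guards a single run; across restarts the same key would need to live in durable storage.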
Why can messages be fetched by timestamp at all? Because the Kafka broker maintains an index that maps timestamps to offsets.

On the storage side, each log segment carries a time index file (*.timeindex) that maps a timestamp to the offset of the first message at or after it. offsetsForTimes is answered from this index, and we then consume from the resolved offset. For this to be reliable the message timestamps must be trustworthy: with log.message.timestamp.type=LogAppendTime that means the broker's clock must be correct, and with the default CreateTime it is the producers' clocks that matter.
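To see this lookup in isolation, the sketch below resolves a timestamp to a per-partition offset with offsetsForTimes without consuming anything; it reuses the demo's broker address and topic, and OffsetLookupDemo is just a hypothetical name:

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OffsetLookupDemo {
    public static void main(String[] args) throws ParseException {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.127.157:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop)) {
            long ts = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
                    .parse("2022-04-09 12:10:45.617").getTime();
            // Query the same timestamp for every partition of the topic
            Map<TopicPartition, Long> query = new HashMap<>();
            for (PartitionInfo p : consumer.partitionsFor("test_topic01")) {
                query.put(new TopicPartition("test_topic01", p.partition()), ts);
            }
            // The broker answers this from the per-segment .timeindex files
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> e
                    : consumer.offsetsForTimes(query).entrySet()) {
                System.out.println(e.getKey() + " -> " + e.getValue());
            }
        }
    }
}

Partitions that have no message at or after the given timestamp map to null, which is why the replay demo above skips them before seeking.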
