In day-to-day work we sometimes hit consumption failures, and it is impractical to dig out every failed message one by one and reprocess it. In that case we can stand up a new consumer that starts from a given timestamp and replays the messages of a time window to backfill the data. This requires the consuming side to be idempotent, i.e. safe to process the same message more than once (a minimal idempotency sketch follows the code below).

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ConsumerDemo {

        public static void main(String[] args) throws ParseException {
            String topic = "test_topic01";
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            // Start of the replay window
            Date start = sdf.parse("2022-04-09 12:10:45.617");
            // End of the replay window; must be later than the start
            Long end = sdf.parse("2022-04-09 12:20:45.617").getTime();
            reconsume(topic, start, end);
        }

        // Re-consume the messages of a topic starting from the given timestamp
        private static void reconsume(String topic, Date start, Long end) {
            // Consumer configuration
            Properties prop = clientConf();
            // Metadata consumer, used only to look up partitions and offsets
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
            // Fetch the partition list of the topic
            List<PartitionInfo> partitions = consumer.partitionsFor(topic);
            // Worker pool: one dedicated thread per partition
            ExecutorService executorService = Executors.newFixedThreadPool(partitions.size());
            // Resolve the start timestamp to an offset for every partition
            Map<TopicPartition, Long> map = new HashMap<>();
            for (PartitionInfo partition : partitions) {
                map.put(new TopicPartition(topic, partition.partition()), start.getTime());
            }
            Map<TopicPartition, OffsetAndTimestamp> timestampMap = consumer.offsetsForTimes(map);
            // One future per partition
            List<CompletableFuture<ConsumerRecord<String, String>>> list = new ArrayList<>(partitions.size());
            try {
                // Launch one consumer task per partition
                for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : timestampMap.entrySet()) {
                    // offsetsForTimes maps a partition to null when it holds no
                    // message at or after the start timestamp; skip such partitions
                    if (entry.getValue() == null) {
                        continue;
                    }
                    CompletableFuture<ConsumerRecord<String, String>> future =
                            run(prop, end, executorService, entry);
                    list.add(future);
                }
                // Wait until every partition has been consumed
                CompletableFuture.allOf(list.toArray(new CompletableFuture[0])).join();
                System.out.println(topic + " fully consumed");
            } finally {
                executorService.shutdown();
                consumer.close();
            }
        }

        private static CompletableFuture<ConsumerRecord<String, String>> run(Properties prop, Long end,
                ExecutorService executorService, Map.Entry<TopicPartition, OffsetAndTimestamp> entry) {
            return CompletableFuture.supplyAsync(() -> {
                // One consumer per partition: KafkaConsumer is not thread-safe
                KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
                try {
                    // Manually assign the single partition instead of subscribing
                    consumer.assign(Collections.singleton(entry.getKey()));
                    // Seek to the offset resolved from the start timestamp
                    consumer.seek(entry.getKey(), entry.getValue().offset());
                    // Snapshot of the log end, so the loop can stop if the partition
                    // runs out of data before reaching the end timestamp
                    long endOffset = consumer.endOffsets(Collections.singleton(entry.getKey()))
                            .get(entry.getKey());
                    while (true) {
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
                        for (ConsumerRecord<String, String> record : records) {
                            // Past the end timestamp: this partition is done
                            if (record.timestamp() > end) {
                                System.out.printf("partition[%d] done [offset=%d]%n", record.partition(), record.offset());
                                return record;
                            }
                            System.out.printf("offset=%d,key=%s,val=%s,partition=%d%n",
                                    record.offset(), record.key(), record.value(), record.partition());
                        }
                        // Log exhausted before the end timestamp was reached
                        if (records.isEmpty() && consumer.position(entry.getKey()) >= endOffset) {
                            System.out.printf("partition[%d] reached log end%n", entry.getKey().partition());
                            return null;
                        }
                    }
                } finally {
                    consumer.close();
                }
            }, executorService);
        }

        private static Properties clientConf() {
            Properties prop = new Properties();
            prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.127.157:9092");
            prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group0x");
            prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            return prop;
        }
    }
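Since replaying a time window re-delivers messages that may already have been processed, the handler on the consuming side has to be idempotent, as noted at the top. Here is a minimal sketch of one common approach, deduplicating by a unique business key; the class name and the in-memory set are illustrative stand-ins only, and in practice the seen-key check would hit a durable store such as a database unique index or Redis.

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    public class IdempotentHandler {
        // Stand-in for a durable dedup store (DB unique index, Redis SETNX, ...)
        private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();

        public void handle(String key, String value) {
            // add() returns false if the key was already seen: drop the replay
            if (!processedKeys.add(key)) {
                return;
            }
            // ... real business processing goes here ...
            System.out.println("processed " + key + " -> " + value);
        }
    }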

Why can messages be fetched by timestamp at all? Because the Kafka broker maintains an index that maps timestamps to offsets.
On the broker's storage side, each log segment has a companion time index file (*.timeindex) through which a timestamp can be resolved to the corresponding offset; once we have a concrete offset, we can consume from it. Note that this relies on the broker servers keeping correct clocks.
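To see the index at work, here is a small probe, assuming the same broker address and topic as in the example above: it asks the broker to resolve a timestamp to an offset via offsetsForTimes, seeks there, and prints the first record, whose timestamp should be at or after the queried one.

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    public class TimeIndexProbe {
        public static void main(String[] args) {
            Properties prop = new Properties();
            prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.127.157:9092");
            prop.put(ConsumerConfig.GROUP_ID_CONFIG, "probe");
            prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            TopicPartition tp = new TopicPartition("test_topic01", 0);
            long ts = System.currentTimeMillis() - 3600_000L; // one hour ago

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop)) {
                // The broker answers this lookup from its time index (*.timeindex)
                OffsetAndTimestamp oat = consumer
                        .offsetsForTimes(Collections.singletonMap(tp, ts))
                        .get(tp);
                if (oat == null) {
                    System.out.println("no message at or after the timestamp");
                    return;
                }
                System.out.printf("timestamp %d -> offset %d%n", ts, oat.offset());
                consumer.assign(Collections.singleton(tp));
                consumer.seek(tp, oat.offset());
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(2))) {
                    // The first record's timestamp is at or after the queried timestamp
                    System.out.printf("offset=%d ts=%d%n", record.offset(), record.timestamp());
                    break;
                }
            }
        }
    }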