Kafka 0.9版本之前,offset存储在zookeeper,0.9版本之后,默认将offset存储在Kafka的一个内置的topic中。除此之外,Kafka还可以选择自定义存储offset。
offset的维护是相当繁琐的,因为需要考虑到消费者的Rebalace。
当有新的消费者加入消费者组、已有的消费者推出消费者组或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做Rebalance。
消费者发生Rebalance之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的offset位置继续消费。
要实现自定义存储offset,需要借助ConsumerRebalanceListener,以下为示例代码,其中提交和获取offset的方法,需要根据所选的offset存储系统自行实现。
package com.consumer;import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.TopicPartition;import java.util.*;public class CustomConsumer4 {// 记录分区的信息,这个也可以用redis等等存储. key是主体的名字和分区号private static Map<TopicPartition, Long> currentOffset = new HashMap<>();public static void main(String[] args) {//创建配置信息Properties props = new Properties();//Kafka集群props.put("bootstrap.servers", "zjj101:9092");//消费者组,只要group.id相同,就属于同一个消费者组props.put("group.id", "test");//关闭自动提交offsetprops.put("enable.auto.commit", "false");//Key和Value的反序列化类props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//创建一个消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//消费者订阅主题consumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() {/*该方法会在分区之前调用分区之前各个消费者应该把offset提交*/@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {// 提交旧的offsetcommitOffset(currentOffset);}/*该方法会在分区之后调用分区之后应该给你offset信息重新给你*/@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {//获取新的offsetcurrentOffset.clear();for (TopicPartition partition : partitions) {consumer.seek(partition, getOffset(partition));//定位到最近提交的offset位置继续消费}}});while (true) {ConsumerRecords<String, String> records = consumer.poll(100);//消费者拉取数据for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset());}commitOffset(currentOffset);//异步提交}}//获取某分区的最新offsetprivate static long getOffset(TopicPartition partition) {return 0;}//提交该消费者所有分区的offsetprivate static void commitOffset(Map<TopicPartition, Long> currentOffset) {}}
