Kafka 0.9版本之前,offset存储在zookeeper,0.9版本之后,默认将offset存储在Kafka的一个内置的topic中。除此之外,Kafka还可以选择自定义存储offset。
offset的维护是相当繁琐的,因为需要考虑到消费者的Rebalace。
当有新的消费者加入消费者组、已有的消费者推出消费者组或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做Rebalance。
消费者发生Rebalance之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的offset位置继续消费。
要实现自定义存储offset,需要借助ConsumerRebalanceListener,以下为示例代码,其中提交和获取offset的方法,需要根据所选的offset存储系统自行实现。
package com.consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
public class CustomConsumer4 {
// 记录分区的信息,这个也可以用redis等等存储. key是主体的名字和分区号
private static Map<TopicPartition, Long> currentOffset = new HashMap<>();
public static void main(String[] args) {
//创建配置信息
Properties props = new Properties();
//Kafka集群
props.put("bootstrap.servers", "zjj101:9092");
//消费者组,只要group.id相同,就属于同一个消费者组
props.put("group.id", "test");
//关闭自动提交offset
props.put("enable.auto.commit", "false");
//Key和Value的反序列化类
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//创建一个消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//消费者订阅主题
consumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() {
/*该方法会在分区之前调用
分区之前各个消费者应该把offset提交*/
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 提交旧的offset
commitOffset(currentOffset);
}
/*该方法会在分区之后调用
分区之后应该给你offset信息重新给你*/
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
//获取新的offset
currentOffset.clear();
for (TopicPartition partition : partitions) {
consumer.seek(partition, getOffset(partition));//定位到最近提交的offset位置继续消费
}
}
});
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);//消费者拉取数据
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset());
}
commitOffset(currentOffset);//异步提交
}
}
//获取某分区的最新offset
private static long getOffset(TopicPartition partition) {
return 0;
}
//提交该消费者所有分区的offset
private static void commitOffset(Map<TopicPartition, Long> currentOffset) {
}
}