# Specifying the offset to consume from

## Consumption modes

Flink's Kafka source supports the following five startup modes for choosing the offset to consume from:
```java
public enum StartupMode {

	/** Start from committed offsets in ZK / Kafka brokers of a specific consumer group (default). */
	GROUP_OFFSETS(KafkaTopicPartitionStateSentinel.GROUP_OFFSET),

	/** Start from the earliest offset possible. */
	EARLIEST(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET),

	/** Start from the latest offset. */
	LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET),

	/**
	 * Start from user-supplied timestamp for each partition.
	 * Since this mode will have specific offsets to start with, we do not need a sentinel value;
	 * using Long.MIN_VALUE as a placeholder.
	 */
	TIMESTAMP(Long.MIN_VALUE),

	/**
	 * Start from user-supplied specific offsets for each partition.
	 * Since this mode will have specific offsets to start with, we do not need a sentinel value;
	 * using Long.MIN_VALUE as a placeholder.
	 */
	SPECIFIC_OFFSETS(Long.MIN_VALUE);
}
```
The default is GROUP_OFFSETS, which means consumption starts from the offset last committed by the given group id. Each enum value is in fact a negative long; depending on the mode, every partition's offset is initialized to that negative sentinel. The other modes follow Kafka's own semantics and are not repeated here.
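The sentinel scheme can be illustrated with a minimal, self-contained sketch. The class below is a stand-in, not Flink's actual code; the constants mirror Flink 1.9's `KafkaTopicPartitionStateSentinel` (the GROUP_OFFSET value also appears later in this article; the other three are quoted from the same class and may differ in other Flink versions):

```java
// Stand-in for Flink's KafkaTopicPartitionStateSentinel: each startup mode
// is encoded as a distinct "magic" negative long, so a partition whose
// offset equals one of these values is known to be uninitialized and is
// handled by the corresponding branch in reassignPartitions.
public class SentinelDemo {

    // Constants as they appear in Flink 1.9's KafkaTopicPartitionStateSentinel.
    public static final long OFFSET_NOT_SET = -915623761776L;
    public static final long EARLIEST_OFFSET = -915623761775L;
    public static final long LATEST_OFFSET = -915623761774L;
    public static final long GROUP_OFFSET = -915623761773L;

    /** Returns true if the given value is a sentinel rather than a real Kafka offset. */
    public static boolean isSentinel(long offset) {
        return offset == OFFSET_NOT_SET
                || offset == EARLIEST_OFFSET
                || offset == LATEST_OFFSET
                || offset == GROUP_OFFSET;
    }

    public static void main(String[] args) {
        System.out.println(isSentinel(GROUP_OFFSET)); // true
        System.out.println(isSentinel(42L));          // false
    }
}
```

Because real Kafka offsets are never negative, these sentinels can safely share the offset field with genuine positions.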
## Specifying the offset

This article only discusses the default GROUP_OFFSETS mode; all analysis below assumes it. We still need to distinguish whether checkpointing is enabled. Before diving in, a few important variables deserve explanation:
**subscribedPartitionsToStartOffsets**

- Owning class: `FlinkKafkaConsumerBase.java`
- Definition:

  ```java
  /** The set of topic partitions that the source will read, with their initial offsets to start reading from. */
  private Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets;
  ```

- Description: holds every partition of the subscribed topics together with the initial offset it starts consuming from.
**subscribedPartitionStates**

- Owning class: `AbstractFetcher.java`
- Definition:

  ```java
  /** All partitions (and their state) that this fetcher is subscribed to. */
  private final List<KafkaTopicPartitionState<KPH>> subscribedPartitionStates;
  ```

- Description: holds detailed per-partition state such as the offsets, for example:

  ```java
  /** The offset within the Kafka partition that we already processed. */
  private volatile long offset;

  /** The offset of the Kafka partition that has been committed. */
  private volatile long committedOffset;
  ```

  These values are updated after every record is consumed. This variable is very important: the offsets saved during a checkpoint all come from it. It is initialized as follows:

  ```java
  // initialize subscribed partition states with seed partitions
  this.subscribedPartitionStates = createPartitionStateHolders(
  		seedPartitionsWithInitialOffsets,
  		timestampWatermarkMode,
  		watermarksPeriodic,
  		watermarksPunctuated,
  		userCodeClassLoader);
  ```

  After consumption, the offset is updated mainly inside the while loop of KafkaFetcher#runFetchLoop, via the call emitRecord(value, partition, record.offset(), record);
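The bookkeeping done by that fetch loop can be sketched in a minimal, self-contained way. The `PartitionState` and `Record` classes below are stand-ins for Flink's `KafkaTopicPartitionState` and Kafka's `ConsumerRecord`, not the real types:

```java
import java.util.List;

// Stand-in sketch of the fetch loop's offset bookkeeping: after each record
// is emitted downstream, the partition's "offset" field is advanced to that
// record's offset, so a checkpoint taken at any point sees the offset of
// the last fully processed record.
public class FetchLoopDemo {

    static class PartitionState {
        // GROUP_OFFSET sentinel until the first fetch assigns a real offset.
        volatile long offset = -915623761773L;
    }

    static class Record {
        final long offset;
        Record(long offset) { this.offset = offset; }
    }

    /** Mirrors the emitRecord(...) pattern: process the record, then store its offset. */
    static void runFetchLoop(PartitionState state, List<Record> records) {
        for (Record r : records) {
            // ... deserialize and emit the record downstream ...
            state.offset = r.offset; // a checkpoint would persist this value
        }
    }

    public static void main(String[] args) {
        PartitionState state = new PartitionState();
        runFetchLoop(state, List.of(new Record(7), new Record(8), new Record(9)));
        System.out.println(state.offset); // 9
    }
}
```

The field is `volatile` for the same reason as in Flink: the checkpointing thread reads it concurrently with the fetch thread that writes it.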
**restoredState**

- Owning class: `FlinkKafkaConsumerBase.java`
- Definition:

  ```java
  /**
   * The offsets to restore to, if the consumer restores state from a checkpoint.
   *
   * <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)} method.
   *
   * <p>Using a sorted map as the ordering is important when using restored state
   * to seed the partition discoverer.
   */
  private transient volatile TreeMap<KafkaTopicPartition, Long> restoredState;
  ```

- Description: if a checkpoint path to restore from is specified, the starting offsets are read from this variable at startup, instead of using the StartupMode sentinel values as the initial offsets.
**unionOffsetStates**

- Owning class: `FlinkKafkaConsumerBase.java`
- Definition:

  ```java
  /** Accessor for state in the operator state backend. */
  private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;
  ```

- Description: holds the content the checkpoint persists, e.g. the consumed offset of each partition.
## Non-checkpoint mode

When checkpointing is not enabled, consuming data from Kafka relies entirely on Kafka's own offset-management mechanism.
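In that case, which offsets get committed and when is governed by the Kafka client's standard configuration. A minimal sketch of the relevant consumer properties (the bootstrap address and group id are placeholders; the property names are standard Kafka consumer settings):

```java
import java.util.Properties;

// Without Flink checkpointing, offset persistence falls back to the Kafka
// client's periodic auto-commit, driven by these standard properties.
public class KafkaPropsDemo {

    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", "my-group");                // placeholder group id
        // Auto-commit is what makes __consumer_offsets the source of truth here.
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "5000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("enable.auto.commit")); // true
    }
}
```

Because commits happen on a timer rather than at processing boundaries, this mode can replay or skip records after a failure; that is exactly the gap checkpointing closes.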
## Checkpoint mode

Once checkpointing is enabled, the offsets and related information are persisted so they can be used on recovery. After a restart, however, the job may for some reason be unable to read the checkpoint, e.g. the checkpoint files were lost or no restore path was specified.
**Case 1: the checkpoint content cannot be read**

subscribedPartitionsToStartOffsets initializes the starting offset of every partition to -915623761773L, the sentinel value indicating GROUP_OFFSETS mode:
```java
default:
	for (KafkaTopicPartition seedPartition : allPartitions) {
		subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
	}
```
Before the first fetch, the key method that decides where to start reading is KafkaConsumerThread#reassignPartitions; the relevant snippet:
```java
for (KafkaTopicPartitionState<TopicPartition> newPartitionState : newPartitions) {
	if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
		consumerTmp.seekToBeginning(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
		newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
	} else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
		consumerTmp.seekToEnd(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
		newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
	} else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
		// the KafkaConsumer by default will automatically seek the consumer position
		// to the committed group offset, so we do not need to do it.
		newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
	} else {
		consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);
	}
}
```
Because the mode is GROUP_OFFSET, the branch calling newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1) is taken. Note that the state must store the offset of the last successfully consumed record, while position() returns the offset the next fetch would start from, hence the minus 1. Updating the state here ensures a checkpoint can capture the correct offset.
Since the previous checkpoint cannot be read in this case, consumption still relies on Kafka's own mechanism, i.e. on the offsets recorded in __consumer_offsets.
**Case 2: the checkpoint can be read**

In this case, subscribedPartitionsToStartOffsets is initialized with the concrete offsets restored from the checkpoint, so the branch that KafkaConsumerThread#reassignPartitions actually takes is:
```java
consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);
```
The plus 1 follows the same reasoning as above: the state stores the offset of the last record successfully consumed, so that offset plus 1 is where consumption must now resume.
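The minus 1 at startup and the plus 1 on restore form a consistent round trip. A minimal sketch of the invariant (plain Java, not Flink code; the method names are illustrative):

```java
// Demonstrates the off-by-one convention used by the Flink Kafka source:
// - state stores the offset of the LAST record successfully processed;
// - Kafka's position() returns the NEXT offset to fetch;
// so reassignPartitions stores position() - 1 at startup, and a restored
// job seeks to (stored offset) + 1.
public class OffsetRoundTripDemo {

    /** What gets stored in state for GROUP_OFFSET mode, given Kafka's position(). */
    static long stateOffsetFromPosition(long kafkaPosition) {
        return kafkaPosition - 1; // last processed record
    }

    /** Where a restored job seeks to, given the offset saved in the checkpoint. */
    static long seekTargetFromState(long stateOffset) {
        return stateOffset + 1; // first record not yet processed
    }

    public static void main(String[] args) {
        long position = 100L; // next record Kafka would hand us
        long stored = stateOffsetFromPosition(position); // 99: last processed
        long resume = seekTargetFromState(stored);       // 100: resume point
        System.out.println(resume == position); // true: no loss, no duplication
    }
}
```

The two adjustments cancel out, which is what guarantees that a restore neither skips nor re-reads a record relative to the checkpointed state.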
## Summary

This article explained how the starting offset is determined at job startup; it is the last installment of the "Flink Kafka source source-code analysis" series. Follow-up posts will analyze the Flink Kafka sink source code and how end-to-end EXACTLY_ONCE is achieved with Flink and Kafka together.
Note: this article is based on Flink 1.9.0 and Kafka 2.3.
