指定offset消费

消费模式

在flink的kafka source中有以下5种模式指定offset消费

  1. public enum StartupMode {
  2. /** Start from committed offsets in ZK / Kafka brokers of a specific consumer group (default). */
  3. GROUP_OFFSETS(KafkaTopicPartitionStateSentinel.GROUP_OFFSET),
  4. /** Start from the earliest offset possible. */
  5. EARLIEST(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET),
  6. /** Start from the latest offset. */
  7. LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET),
  8. /**
  9. * Start from user-supplied timestamp for each partition.
  10. * Since this mode will have specific offsets to start with, we do not need a sentinel value;
  11. * using Long.MIN_VALUE as a placeholder.
  12. */
  13. TIMESTAMP(Long.MIN_VALUE),
  14. /**
  15. * Start from user-supplied specific offsets for each partition.
  16. * Since this mode will have specific offsets to start with, we do not need a sentinel value;
  17. * using Long.MIN_VALUE as a placeholder.
  18. */
  19. SPECIFIC_OFFSETS(Long.MIN_VALUE);
  20. }

默认为GROUP_OFFSETS,表示根据上一次group id提交的offset位置开始消费。每个枚举的值其实是一个long型的负数,根据不同的模式,在每个partition初始化的时候会默认将offset设置为这个负数。其他的方式和kafka本身的语义类似,就不在赘述。

指定offset

本文只讨论默认的GROUP_OFFSETS方式,下文所有分析都是基于这种模式。但是还是需要区分是否开启了checkpoint。在开始分析之前需要对几个重要的变量进行说明:

  • subscribedPartitionsToStartOffsets

    • 所属类:FlinkKafkaConsumerBase.java
    • 定义:
      1. /** The set of topic partitions that the source will read, with their initial offsets to start reading from. */
      2. private Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets;
    • 说明:保存订阅topic的所有partition以及初始消费的offset

  • subscribedPartitionStates

    • 所属类:AbstractFetcher.java
    • 定义:
      1. /** All partitions (and their state) that this fetcher is subscribed to. */
      2. private final List<KafkaTopicPartitionState<KPH>> subscribedPartitionStates;
    • 说明:保存了所有订阅的partition的offset等详细信息,例如

    • ```java /* The offset within the Kafka partition that we already processed. / private volatile long offset;

/* The offset of the Kafka partition that has been committed. / private volatile long committedOffset;

  1. <br />每次消费完数据之后都会更新这些值,这个变量非常的重要,在做checkpoint的时候,保存的offset等信息都是来自于这个变量。这个变量的初始化如下:
  2. ```java
  3. // initialize subscribed partition states with seed partitions
  4. this.subscribedPartitionStates = createPartitionStateHolders(
  5. seedPartitionsWithInitialOffsets,
  6. timestampWatermarkMode,
  7. watermarksPeriodic,
  8. watermarksPunctuated,
  9. userCodeClassLoader);


消费之后更新相应的offset主要在KafkaFetcher#runFetchLoop方法while循环中调用emitRecord(value, partition, record.offset(), record);

  • restoredState

    • 所属类:FlinkKafkaConsumerBase.java
    • 定义:
      1. /**
      2. * The offsets to restore to, if the consumer restores state from a checkpoint.
      3. *
      4. * <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)} method.
      5. *
      6. * <p>Using a sorted map as the ordering is important when using restored state
      7. * to seed the partition discoverer.
      8. */
      9. private transient volatile TreeMap<KafkaTopicPartition, Long> restoredState;
    • 说明:如果指定了恢复的checkpoint路径,启动时候将会读取这个变量里面的内容获取起始offset,而不再是使用StartupMode中的枚举值作为初始的offset

  • unionOffsetStates

    • 所属类:FlinkKafkaConsumerBase.java
    • 定义:
      1. /** Accessor for state in the operator state backend. */
      2. private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;
    • 说明:保存了checkpoint要持久化存储的内容,例如每个partition已经消费的offset等信息

非checkpoint模式

在没有开启checkpoint的时候,消费kafka中的数据,其实就是完全依靠kafka自身的机制进行消费。

checkpoint模式

开启checkpoint模式以后,会将offset等信息持久化存储以便恢复时使用。但是作业重启以后如果由于某种原因读不到checkpoint的结果,例如checkpoint文件丢失或者没有指定恢复路径等。

第一种情况,如果读取不到checkpoint的内容

subscribedPartitionsToStartOffsets会初始化所有partition的起始offset为-915623761773L这个值就表示了当前为GROUP_OFFSETS模式。

  1. default:
  2. for (KafkaTopicPartition seedPartition : allPartitions) {
  3. subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
  4. }

第一次消费之前,指定读取offset位置的关键方法是KafkaConsumerThread#reassignPartitions代码片段如下:

  1. for (KafkaTopicPartitionState<TopicPartition> newPartitionState : newPartitions) {
  2. if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
  3. consumerTmp.seekToBeginning(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
  4. newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
  5. } else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
  6. consumerTmp.seekToEnd(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
  7. newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
  8. } else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
  9. // the KafkaConsumer by default will automatically seek the consumer position
  10. // to the committed group offset, so we do not need to do it.
  11. newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
  12. } else {
  13. consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);
  14. }
  15. }

因为是GROUP_OFFSET模式 ,所以会调用newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);需要说明的是,在state里面需要存储的是成功消费的最后一条数据的offset,但是通过position这个方法返回的是下一次应该消费的起始offset,所以需要减1。这里更新这个的目的是为了checkpoint的时候可以正确的拿到offset。

这种情况由于读取不到上次checkpoint的结果,所以依旧是依靠kafka自身的机制,即根据__consumer_offsets记录的内容消费。

第二种情况,checkpoint可以读取到

这种情况下,subscribedPartitionsToStartOffsets初始的offset就是具体从checkpoint中恢复的内容,这样KafkaConsumerThread#reassignPartitions实际走的分支就是

  1. consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);

这里加1的原理同上,state保存的是最后一次成功消费数据的offset,所以加1才是现在需要开始消费的offset。

总结

本文介绍了程序启动时,如何确定从哪个offset开始消费,这也是《Flink kafka source源码解析》系列的最后一篇。后续会继续分析flink kafka sink的相关源码以及flink结合kafka端到端的EXACTLY_ONCE是如何实现的。

注:本文基于flink 1.9.0和kafka 2.3