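Committing offsets in checkpoint mode
The previous article, Flink Kafka Source Code Analysis (Part 2), covered how offsets are committed when checkpointing is disabled. This article focuses on how the Flink Kafka consumer commits offsets once checkpointing is enabled.
Initializing offsetCommitMode
As explained in Part 2, once env.enableCheckpointing has been called, offsetCommitMode is ON_CHECKPOINTS (unless committing on checkpoints has been turned off with setCommitOffsetsOnCheckpoints(false)). The method below then forcibly disables Kafka's own auto-commit. This value matters: many later code paths check it to decide how to behave.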
```java
/**
 * Make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS.
 * This overwrites whatever setting the user configured in the properties.
 * @param properties - Kafka configuration properties to be adjusted
 * @param offsetCommitMode offset commit mode
 */
static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {
    if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    }
}
```
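How the commit mode is chosen and how it then overrides the Kafka property can be sketched together. This is a simplified, self-contained model of the selection logic (in Flink this lives in OffsetCommitModes.fromConfiguration) combined with the override shown above. The class name OffsetCommitModeSketch and its main method are hypothetical, and Kafka's ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG is replaced by its literal key "enable.auto.commit" so the sketch needs no Kafka dependency.

```java
import java.util.Properties;

// Hypothetical, simplified model of Flink's offset-commit-mode selection; not the Flink classes.
class OffsetCommitModeSketch {

    enum OffsetCommitMode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    // Same key as Kafka's ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG.
    static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

    static OffsetCommitMode fromConfiguration(
            boolean enableAutoCommit, boolean commitOnCheckpoints, boolean checkpointingEnabled) {
        if (checkpointingEnabled) {
            // With checkpointing, only the commit-on-checkpoints flag matters.
            return commitOnCheckpoints ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
        }
        // Without checkpointing, rely on Kafka's periodic auto-commit if the user enabled it.
        return enableAutoCommit ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
    }

    static void adjustAutoCommitConfig(Properties props, OffsetCommitMode mode) {
        // Flink commits (or deliberately skips) offsets itself, so Kafka must not auto-commit.
        if (mode == OffsetCommitMode.ON_CHECKPOINTS || mode == OffsetCommitMode.DISABLED) {
            props.setProperty(ENABLE_AUTO_COMMIT, "false");
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ENABLE_AUTO_COMMIT, "true"); // the user asked for auto-commit
        boolean autoCommit = Boolean.parseBoolean(props.getProperty(ENABLE_AUTO_COMMIT));
        OffsetCommitMode mode = fromConfiguration(autoCommit, true, true);
        adjustAutoCommitConfig(props, mode);
        System.out.println(mode + " " + props.getProperty(ENABLE_AUTO_COMMIT)); // ON_CHECKPOINTS false
    }
}
```

The user's "true" is silently overwritten: with checkpointing enabled, the Kafka client never commits on its own.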
Saving offsets
When a checkpoint is taken, FlinkKafkaConsumerBase#snapshotState is called. There, pendingOffsetsToCommit stores the offsets to be committed, keyed by checkpoint ID:
```java
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
    // the map cannot be asynchronously updated, because only one checkpoint call can happen
    // on this function at a time: either snapshotState() or notifyCheckpointComplete()
    pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
}
```
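The snapshot-side bookkeeping can be illustrated with plain collections. This is a minimal sketch, assuming string partition names such as "orders-0" and a hypothetical liveOffsets map standing in for the fetcher's per-partition state; only the names pendingOffsetsToCommit and snapshotState come from the source. The point it shows is that each checkpoint ID maps to its own copy of the offsets, so records processed after the snapshot do not alter what that checkpoint will later commit.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of snapshot-time bookkeeping; not Flink's FlinkKafkaConsumerBase.
class PendingOffsetsSketch {
    // checkpoint ID -> offsets captured by that checkpoint; LinkedHashMap keeps checkpoint order
    final Map<Long, Map<String, Long>> pendingOffsetsToCommit = new LinkedHashMap<>();
    // hypothetical stand-in for the fetcher's live state: partition -> last processed offset
    final Map<String, Long> liveOffsets = new HashMap<>();

    void snapshotState(long checkpointId) {
        // Store a copy so progress made after this checkpoint does not leak into its entry.
        pendingOffsetsToCommit.put(checkpointId, new HashMap<>(liveOffsets));
    }

    public static void main(String[] args) {
        PendingOffsetsSketch source = new PendingOffsetsSketch();
        source.liveOffsets.put("orders-0", 41L);
        source.snapshotState(1L);
        source.liveOffsets.put("orders-0", 57L); // keeps consuming after checkpoint 1
        source.snapshotState(2L);
        System.out.println(source.pendingOffsetsToCommit); // {1={orders-0=41}, 2={orders-0=57}}
    }
}
```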
In addition, the following field is persisted as part of the checkpoint so that it can be used during recovery:
```java
/** Accessor for state in the operator state backend. */
private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;
```
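snapshotState also writes the offsets into this state. The loop below comes from the branch taken when the fetcher has not started yet, where the subscribed partitions and their start offsets are stored; once the fetcher is running, the offsets returned by fetcher.snapshotCurrentState() are added in the same way: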
```java
for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
    unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
}
```
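Because the state is a union list state, on restore every subtask receives the entries written by all subtasks and must keep only the partitions it now owns. The sketch below models that with plain collections: SimpleEntry stands in for Tuple2, lists stand in for ListState, and the set of assigned partitions is a hypothetical input (Flink computes the assignment itself). It is an illustration of the redistribution idea, not Flink's restore code.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of union list state semantics using plain collections.
class UnionOffsetStateSketch {

    // Snapshot: a subtask writes one (partition, offset) entry per partition it reads.
    static List<Map.Entry<String, Long>> snapshot(Map<String, Long> offsets) {
        List<Map.Entry<String, Long>> entries = new ArrayList<>();
        for (Map.Entry<String, Long> e : offsets.entrySet()) {
            entries.add(new SimpleEntry<>(e.getKey(), e.getValue()));
        }
        return entries;
    }

    // Restore: union redistribution gives every subtask the entries of ALL subtasks;
    // each keeps only the partitions assigned to it (assignment supplied by the caller here).
    static Map<String, Long> restore(List<List<Map.Entry<String, Long>>> allStates, Set<String> assigned) {
        Map<String, Long> restored = new HashMap<>();
        for (List<Map.Entry<String, Long>> state : allStates) {
            for (Map.Entry<String, Long> e : state) {
                if (assigned.contains(e.getKey())) {
                    restored.put(e.getKey(), e.getValue());
                }
            }
        }
        return restored;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> subtask0 = snapshot(Map.of("orders-0", 41L));
        List<Map.Entry<String, Long>> subtask1 = snapshot(Map.of("orders-1", 7L));
        // After a restart, this subtask happens to own only orders-1.
        System.out.println(restore(List.of(subtask0, subtask1), Set.of("orders-1"))); // {orders-1=7}
    }
}
```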
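Committing offsets
After the checkpoint completes, the task calls notifyCheckpointComplete. When offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS, this method takes the offsets recorded for that checkpoint ID out of pendingOffsetsToCommit (discarding entries for older checkpoints) and calls fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback). Eventually, KafkaFetcher#doCommitInternalOffsetsToKafka calls consumerThread.setOffsetsToCommit(offsetsToCommit, commitCallback), which stores the offsets in the nextOffsetsToCommit field of KafkaConsumerThread.java.
As a result, whenever offsets are waiting to be committed, the consumer thread executes the code below, which calls consumer.commitAsync and thereby commits the offsets to Kafka manually.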
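The lookup on checkpoint completion can be sketched as follows. This is a minimal model, assuming a LinkedHashMap ordered by checkpoint ID and a hypothetical commitFn callback in place of fetcher.commitInternalOffsetsToKafka. It removes the completed checkpoint's entry together with all older ones (a newer completed checkpoint subsumes them), leaves newer pending checkpoints in place, and commits nothing for an unknown ID or an empty offset map.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of the notifyCheckpointComplete bookkeeping; not Flink's code.
class NotifyCompleteSketch {
    // checkpoint ID -> offsets recorded by snapshotState, in checkpoint order
    final LinkedHashMap<Long, Map<String, Long>> pendingOffsetsToCommit = new LinkedHashMap<>();

    /** Returns the offsets passed to commitFn, or null if nothing was committed. */
    Map<String, Long> notifyCheckpointComplete(long checkpointId, Consumer<Map<String, Long>> commitFn) {
        if (!pendingOffsetsToCommit.containsKey(checkpointId)) {
            return null; // unknown checkpoint, e.g. already subsumed by a newer one
        }
        Map<String, Long> offsets = null;
        Iterator<Map.Entry<Long, Map<String, Long>>> it = pendingOffsetsToCommit.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Map<String, Long>> entry = it.next();
            long id = entry.getKey();
            Map<String, Long> value = entry.getValue();
            it.remove(); // drop the completed checkpoint and every older one
            if (id == checkpointId) {
                offsets = value;
                break; // newer checkpoints stay pending
            }
        }
        if (offsets == null || offsets.isEmpty()) {
            return null; // nothing to commit
        }
        commitFn.accept(offsets);
        return offsets;
    }

    public static void main(String[] args) {
        NotifyCompleteSketch source = new NotifyCompleteSketch();
        source.pendingOffsetsToCommit.put(1L, Map.of("orders-0", 10L));
        source.pendingOffsetsToCommit.put(2L, Map.of("orders-0", 20L));
        source.pendingOffsetsToCommit.put(3L, Map.of("orders-0", 30L));
        source.notifyCheckpointComplete(2L, offsets -> System.out.println("commit " + offsets));
        System.out.println(source.pendingOffsetsToCommit.keySet()); // [3]
    }
}
```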
```java
final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback = nextOffsetsToCommit.getAndSet(null);

if (commitOffsetsAndCallback != null) {
    log.debug("Sending async offset commit request to Kafka broker");

    // also record that a commit is already in progress
    // the order here matters! first set the flag, then send the commit command.
    commitInProgress = true;
    consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
}
```
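The handoff through nextOffsetsToCommit can be reduced to a single AtomicReference. In this sketch the task side publishes a request with getAndSet, so an older request the consumer thread has not yet picked up is simply replaced by the newer one, and the consumer side takes it with getAndSet(null) as in the snippet above. It also converts Flink's "last processed offset" into the "next offset to read" (last processed + 1), which is what Kafka expects a committed offset to mean; in Flink 1.9 that conversion happens in KafkaFetcher#doCommitInternalOffsetsToKafka. The class, its methods, and the committed map (a stub for consumer.commitAsync) are hypothetical, and the demo runs both sides on one thread for determinism.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the task-thread -> consumer-thread commit handoff; no real KafkaConsumer.
class CommitHandoffSketch {
    // Latest commit request not yet taken by the consumer thread (null = none pending).
    private final AtomicReference<Map<String, Long>> nextOffsetsToCommit = new AtomicReference<>();
    // Stub for what the real consumer thread would pass to consumer.commitAsync(...).
    final Map<String, Long> committed = new HashMap<>();

    // Task side: convert to "next offset to read" and publish.
    // Returns true if an older, still-unsent request was overwritten.
    boolean commitInternalOffsets(Map<String, Long> lastProcessedOffsets) {
        Map<String, Long> toCommit = new HashMap<>();
        lastProcessedOffsets.forEach((partition, offset) -> toCommit.put(partition, offset + 1));
        return nextOffsetsToCommit.getAndSet(toCommit) != null;
    }

    // Consumer side: one pass of its loop; takes the pending request, if any.
    void runOnce() {
        Map<String, Long> request = nextOffsetsToCommit.getAndSet(null);
        if (request != null) {
            committed.putAll(request); // the real thread calls consumer.commitAsync here
        }
    }

    public static void main(String[] args) {
        CommitHandoffSketch handoff = new CommitHandoffSketch();
        handoff.commitInternalOffsets(Map.of("orders-0", 41L));
        boolean replaced = handoff.commitInternalOffsets(Map.of("orders-0", 57L));
        handoff.runOnce();
        System.out.println(replaced + " " + handoff.committed); // true {orders-0=58}
    }
}
```

Overwriting a stale request is safe in this design because the newer offsets come from a later completed checkpoint, and Flink's recovery relies on checkpointed state rather than on offsets committed to Kafka.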
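Summary
This article described how the Flink Kafka source commits offsets in checkpoint mode. A follow-up article will cover how the consumer reads offsets.
Note: this article is based on Flink 1.9.0 and Kafka 2.3.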
