






```java
// determine the offset commit mode
this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
        getIsAutoCommitEnabled(),
        enableCommitOnCheckpoints,
        ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
```


  • DISABLED: offset committing is disabled entirely.
  • ON_CHECKPOINTS: offsets are committed only when a checkpoint completes.
  • KAFKA_PERIODIC: offsets are committed periodically.


```java
/**
 * Determine the offset commit mode using several configuration values.
 *
 * @param enableAutoCommit whether or not auto committing is enabled in the provided Kafka properties.
 * @param enableCommitOnCheckpoint whether or not committing on checkpoints is enabled.
 * @param enableCheckpointing whether or not checkpoint is enabled for the consumer.
 *
 * @return the offset commit mode to use, based on the configuration values.
 */
public static OffsetCommitMode fromConfiguration(
        boolean enableAutoCommit,
        boolean enableCommitOnCheckpoint,
        boolean enableCheckpointing) {
    if (enableCheckpointing) {
        // if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled
        return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
    } else {
        // else, the mode depends only on whether auto committing is enabled in the provided Kafka properties
        return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
    }
}
```
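To see how the three flags interact, the decision can be reproduced outside Flink. The following is a minimal sketch rather than Flink's own class: `CommitModeSketch` and its nested `Mode` enum are hypothetical stand-ins that only mirror the branching of `fromConfiguration` shown above.

```java
/** Standalone sketch of the commit-mode decision; the names are illustrative, not Flink's. */
class CommitModeSketch {

    /** Hypothetical stand-in for Flink's OffsetCommitMode. */
    enum Mode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    /** Mirrors the branching of fromConfiguration. */
    static Mode decide(boolean autoCommit, boolean commitOnCheckpoint, boolean checkpointing) {
        if (checkpointing) {
            // with checkpointing on, the Kafka auto-commit flag is ignored
            return commitOnCheckpoint ? Mode.ON_CHECKPOINTS : Mode.DISABLED;
        }
        // without checkpointing, only the auto-commit flag matters
        return autoCommit ? Mode.KAFKA_PERIODIC : Mode.DISABLED;
    }

    public static void main(String[] args) {
        boolean[] flags = {false, true};
        // print the full truth table of the three flags
        for (boolean checkpointing : flags) {
            for (boolean commitOnCheckpoint : flags) {
                for (boolean autoCommit : flags) {
                    System.out.printf(
                            "checkpointing=%b commitOnCheckpoint=%b autoCommit=%b -> %s%n",
                            checkpointing, commitOnCheckpoint, autoCommit,
                            decide(autoCommit, commitOnCheckpoint, checkpointing));
                }
            }
        }
    }
}
```

The table makes one consequence easy to spot: once checkpointing is enabled, the auto-commit setting has no effect on the result.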


  1. If checkpointing is enabled and committing on checkpoints is enabled, return ON_CHECKPOINTS.
  2. If checkpointing is not enabled but auto-commit is enabled, return KAFKA_PERIODIC.
  3. In all other cases, return DISABLED.

```java
// create the partition discoverer
this.partitionDiscoverer = createPartitionDiscoverer(
        topicsDescriptor,
        getRuntimeContext().getIndexOfThisSubtask(),
        getRuntimeContext().getNumberOfParallelSubtasks());
this.partitionDiscoverer.open();

subscribedPartitionsToStartOffsets = new HashMap<>();
final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
```

  1. In this code, topicsDescriptor wraps fixedTopics and topicPattern. fixedTopics names the topics explicitly (fixed topics), while topicPattern is a regular expression matched against topic names and is used for partition discovery.
  2. The **createPartitionDiscoverer** method creates a KafkaPartitionDiscoverer object, which is responsible for Kafka partition discovery. partitionDiscoverer.open() creates a KafkaConsumer.
  3. **subscribedPartitionsToStartOffsets** holds the subscribed partitions and is initialized here. partitionDiscoverer.discoverPartitions() retrieves every partition of the fixedTopics and of the topics that match topicPattern. That code is analyzed later.

The rest of the **open** method is structured as follows:

```java
if (restoredState != null) {
    // restore from a snapshot ...
} else {
    // start directly ...
}
```


```java
// if restoredState holds no state for a partition,
// that partition has to be consumed from the beginning
for (KafkaTopicPartition partition : allPartitions) {
    if (!restoredState.containsKey(partition)) {
        restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
    }
}

for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) {
    // seed the partition discoverer with the union state while filtering out
    // restored partitions that should not be subscribed by this subtask
    // (this drops the Kafka partitions that this task is not responsible for)
    if (KafkaTopicPartitionAssigner.assign(
            restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
            == getRuntimeContext().getIndexOfThisSubtask()) {
        subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
    }
}

// filter partitions by the topic regular expression configured for partition discovery
if (filterRestoredPartitionsWithCurrentTopicsDescriptor) {
    // remove partitions whose topic name does not match the topicPattern of topicsDescriptor
    subscribedPartitionsToStartOffsets.entrySet().removeIf(entry -> {
        if (!topicsDescriptor.isMatchingTopic(entry.getKey().getTopic())) {
            LOG.warn(
                    "{} is removed from subscribed partitions since it is no longer associated with topics descriptor of current execution.",
                    entry.getKey());
            return true;
        }
        return false;
    });
}
```
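The filtering step above only works if `KafkaTopicPartitionAssigner.assign` maps every partition to exactly one subtask. The sketch below illustrates one scheme with that property. It is an assumption made for illustration (a start index derived from the topic name, then round-robin by partition id), and the class `AssignmentSketch` is hypothetical; check the Flink version you use for the exact formula.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative partition-to-subtask assignment; not Flink's class. */
class AssignmentSketch {

    /** Assumed scheme: the topic hash picks a start subtask, partition ids go round-robin from there. */
    static int assign(String topic, int partition, int numSubtasks) {
        // mask the sign bit so the start index is never negative
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numSubtasks;
        return (startIndex + partition) % numSubtasks;
    }

    /** Partitions (ids 0..partitionCount-1) of one topic that a given subtask keeps. */
    static List<Integer> ownedPartitions(String topic, int partitionCount, int numSubtasks, int subtaskIndex) {
        List<Integer> owned = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            if (assign(topic, p, numSubtasks) == subtaskIndex) {
                owned.add(p);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        // 8 partitions of a hypothetical topic spread over 3 subtasks
        for (int subtask = 0; subtask < 3; subtask++) {
            System.out.println("subtask " + subtask + " -> " + ownedPartitions("orders", 8, 3, subtask));
        }
    }
}
```

Because the function is deterministic, every subtask can evaluate it independently on the full restored state and keep only its own share, with no coordination between subtasks.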


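  • GROUP_OFFSETS: start from the offsets that the consumer group has committed to ZooKeeper or to the Kafka brokers. This is the default.
  • EARLIEST: start from the earliest offset possible.
  • LATEST: start from the latest offset.
  • TIMESTAMP: start from a user-supplied timestamp.
  • SPECIFIC_OFFSETS: start from user-supplied offsets.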



```java
case SPECIFIC_OFFSETS:
    // throw if no specific start offsets were configured
    if (specificStartupOffsets == null) {
        throw new IllegalStateException(
                "Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
                        ", but no specific offsets were specified.");
    }

    for (KafkaTopicPartition seedPartition : allPartitions) {
        // look up the start offset configured for this partition
        Long specificOffset = specificStartupOffsets.get(seedPartition);
        if (specificOffset != null) {
            // since the specified offsets represent the next record to read, we subtract
            // it by one so that the initial state of the consumer will be correct
            // (the partition has a configured offset: consume from it)
            subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
        } else {
            // default to group offset behaviour if the user-provided specific offsets
            // do not contain a value for this partition
            // (the partition has no configured offset: consume from GROUP_OFFSET)
            subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
        }
    }

    break;
```


```java
case TIMESTAMP:
    // fail if no timestamp was configured
    if (startupOffsetsTimestamp == null) {
        throw new IllegalStateException(
                "Startup mode for the consumer set to " + StartupMode.TIMESTAMP +
                        ", but no startup timestamp was specified.");
    }

    // look up the offset of each partition by timestamp
    // and iterate over the partitions
    for (Map.Entry<KafkaTopicPartition, Long> partitionToOffset
            : fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp).entrySet()) {
        // no offset found: use LATEST_OFFSET
        // offset found: consume from that offset
        subscribedPartitionsToStartOffsets.put(
                partitionToOffset.getKey(),
                (partitionToOffset.getValue() == null)
                        // if an offset cannot be retrieved for a partition with the given timestamp,
                        // we default to using the latest offset for the partition
                        ? KafkaTopicPartitionStateSentinel.LATEST_OFFSET
                        // since the specified offsets represent the next record to read, we subtract
                        // it by one so that the initial state of the consumer will be correct
                        : partitionToOffset.getValue() - 1);
    }

    break;
// in all other cases, use the matching KafkaTopicPartitionStateSentinel value as the offset
default:
    for (KafkaTopicPartition seedPartition : allPartitions) {
        subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
    }
```
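The SPECIFIC_OFFSETS branch combines two rules that are easy to miss: the stored value is the user's offset minus one, and partitions without a configured offset fall back to the group offset. The sketch below isolates these rules. It is a simplification under stated assumptions: partitions are plain strings, and `GROUP_OFFSET` is a hypothetical sentinel value chosen for this example, not the constant Flink uses.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Illustrative resolution of start offsets for SPECIFIC_OFFSETS mode; not Flink's code. */
class StartOffsetSketch {

    /** Hypothetical sentinel meaning "use the group's committed offset". */
    static final long GROUP_OFFSET = Long.MIN_VALUE;

    /**
     * Builds the initial state. A user offset names the next record to read,
     * while the state stores the last record already read, hence the minus one.
     */
    static Map<String, Long> resolve(Iterable<String> partitions, Map<String, Long> specificOffsets) {
        if (specificOffsets == null) {
            throw new IllegalStateException("SPECIFIC_OFFSETS requires specific offsets");
        }
        Map<String, Long> start = new HashMap<>();
        for (String partition : partitions) {
            Long next = specificOffsets.get(partition);
            // fall back to the group offset when the user gave no value for this partition
            start.put(partition, next != null ? next - 1 : GROUP_OFFSET);
        }
        return start;
    }

    public static void main(String[] args) {
        Map<String, Long> specific = new HashMap<>();
        specific.put("orders-0", 42L);
        // orders-1 has no configured offset and gets the sentinel
        System.out.println(resolve(Arrays.asList("orders-0", "orders-1"), specific));
    }
}
```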




```java
if (subscribedPartitionsToStartOffsets == null) {
    throw new Exception("The partitions were not set for the consumer");
}
```


```java
// initialize commit metrics and default offset callback method
// counter for successful commits
this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
// counter for failed commits
this.failedCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);
// index of this subtask
final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();

// register a callback that is invoked on commit
this.offsetCommitCallback = new KafkaCommitCallback() {
    @Override
    public void onSuccess() {
        // commit succeeded: increment the success counter
        successfulCommits.inc();
    }

    @Override
    public void onException(Throwable cause) {
        LOG.warn(String.format("Consumer subtask %d failed async Kafka commit.", subtaskIndex), cause);
        // commit failed: increment the failure counter
        failedCommits.inc();
    }
};
```


```java
if (subscribedPartitionsToStartOffsets.isEmpty()) {
    sourceContext.markAsTemporarilyIdle();
}
```

Next comes the data-fetching process. A KafkaFetcher is created here; it uses the KafkaConsumer API to fetch data from the Kafka brokers.

```java
this.kafkaFetcher = createFetcher(
        sourceContext,
        subscribedPartitionsToStartOffsets,
        watermarkStrategy,
        (StreamingRuntimeContext) getRuntimeContext(),
        offsetCommitMode,
        getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
        useMetrics);
```


```java
if (!running) {
    return;
}
```


```java
if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
    // partition discovery is disabled: run the fetch loop directly
    kafkaFetcher.runFetchLoop();
} else {
    // otherwise start both the periodic partition discovery task and the fetch loop
    runWithPartitionDiscovery();
}
```


```java
private void runWithPartitionDiscovery() throws Exception {
    final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();
    // start the periodic partition discovery task
    createAndStartDiscoveryLoop(discoveryLoopErrorRef);

    // start fetching data from the Kafka brokers
    kafkaFetcher.runFetchLoop();

    // make sure that the partition discoverer is waked up so that
    // the discoveryLoopThread exits
    // (this makes partitionDiscoverer.discoverPartitions() throw,
    // which lets the discoveryLoopThread return)
    partitionDiscoverer.wakeup();
    // wait for the discoveryLoopThread to finish
    joinDiscoveryLoopThread();

    // rethrow any fetcher errors
    final Exception discoveryLoopError = discoveryLoopErrorRef.get();
    if (discoveryLoopError != null) {
        throw new RuntimeException(discoveryLoopError);
    }
}
```


```java
private void createAndStartDiscoveryLoop(AtomicReference<Exception> discoveryLoopErrorRef) {
    // create the discoveryLoop thread
    discoveryLoopThread = new Thread(() -> {
        try {
            // --------------------- partition discovery loop ---------------------

            // throughout the loop, we always eagerly check if we are still running before
            // performing the next operation, so that we can escape the loop as soon as possible
            while (running) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Consumer subtask {} is trying to discover new partitions ...", getRuntimeContext().getIndexOfThisSubtask());
                }

                final List<KafkaTopicPartition> discoveredPartitions;
                try {
                    // try to discover new partitions; leave the loop if the discoverer
                    // was woken up or closed
                    discoveredPartitions = partitionDiscoverer.discoverPartitions();
                } catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) {
                    // the partition discoverer may have been closed or woken up before or during the discovery;
                    // this would only happen if the consumer was canceled; simply escape the loop
                    break;
                }

                // no need to add the discovered partitions if we were closed during the meantime
                // (or if no new partitions were found)
                if (running && !discoveredPartitions.isEmpty()) {
                    kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
                }

                // do not waste any time sleeping if we're not running anymore
                if (running && discoveryIntervalMillis != 0) {
                    try {
                        // sleep for discoveryIntervalMillis
                        Thread.sleep(discoveryIntervalMillis);
                    } catch (InterruptedException iex) {
                        // may be interrupted if the consumer was canceled midway; simply escape the loop
                        break;
                    }
                }
            }
        } catch (Exception e) {
            discoveryLoopErrorRef.set(e);
        } finally {
            // calling cancel will also let the fetcher loop escape
            // (if not running, cancel() was already called)
            if (running) {
                cancel();
            }
        }
    }, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());

    // start the partition discovery thread
    discoveryLoopThread.start();
}
```
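The pattern in the two methods above (a background loop, an eager `running` check, and an `AtomicReference` that carries a failure back to the owning thread) can be tried in isolation. The following is a minimal sketch under stated assumptions: `DiscoveryLoopSketch` is a hypothetical class, a `Supplier` stands in for `partitionDiscoverer.discoverPartitions()`, and the loop is stopped with a thread interrupt instead of the discoverer's wakeup mechanism.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Sketch of a background discovery loop; all names here are illustrative, not Flink's. */
class DiscoveryLoopSketch {
    private volatile boolean running = true;
    private final List<String> discovered = new CopyOnWriteArrayList<>();
    private final AtomicReference<Exception> error = new AtomicReference<>();
    private Thread loopThread;

    /** Starts the loop; discoverer stands in for the real partition discovery call. */
    void start(Supplier<List<String>> discoverer, long intervalMillis) {
        loopThread = new Thread(() -> {
            try {
                while (running) {
                    List<String> found = discoverer.get();
                    // skip the hand-over if we were cancelled in the meantime
                    if (running && !found.isEmpty()) {
                        discovered.addAll(found);
                    }
                    if (running && intervalMillis != 0) {
                        Thread.sleep(intervalMillis);
                    }
                }
            } catch (InterruptedException e) {
                // cancelled while sleeping: simply leave the loop
            } catch (Exception e) {
                // remember the failure so the owning thread can rethrow it
                error.set(e);
            } finally {
                running = false;
            }
        }, "discovery-loop-sketch");
        loopThread.start();
    }

    /** Stops the loop if it is still running and waits for the thread to finish. */
    void stopAndJoin() {
        running = false;
        loopThread.interrupt();
        join();
    }

    /** Waits for the loop thread to finish. */
    void join() {
        try {
            loopThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    List<String> discovered() { return new ArrayList<>(discovered); }

    Exception error() { return error.get(); }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        DiscoveryLoopSketch loop = new DiscoveryLoopSketch();
        // the first call finds one partition, the second call fails
        loop.start(() -> {
            if (calls.incrementAndGet() == 1) {
                return Arrays.asList("orders-3");
            }
            throw new IllegalStateException("metadata fetch failed");
        }, 0);
        loop.join();
        System.out.println("discovered=" + loop.discovered() + ", error=" + loop.error());
    }
}
```

Storing the exception instead of throwing it inside the thread is what allows the owning thread to decide when to surface the failure, as `runWithPartitionDiscovery` does after the fetch loop returns.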


```java
public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
    // make sure the discoverer is neither closed nor woken up
    if (!closed && !wakeup) {
        try {
            List<KafkaTopicPartition> newDiscoveredPartitions;

            // (1) get all possible partitions, based on whether we are subscribed to fixed topics or a topic pattern
            // if fixed topics are configured, fetch the partitions of those topics
            if (topicsDescriptor.isFixedTopics()) {
                newDiscoveredPartitions = getAllPartitionsForTopics(topicsDescriptor.getFixedTopics());
            } else {
                // no fixed topics are configured
                // 1. fetch all topics
                List<String> matchedTopics = getAllTopics();

                // retain topics that match the pattern
                // 2. drop every topic whose name does not match topicPattern
                Iterator<String> iter = matchedTopics.iterator();
                while (iter.hasNext()) {
                    if (!topicsDescriptor.isMatchingTopic(iter.next())) {
                        iter.remove();
                    }
                }

                if (matchedTopics.size() != 0) {
                    // get partitions only for matched topics
                    // 3. if any topic matched, fetch its partitions
                    newDiscoveredPartitions = getAllPartitionsForTopics(matchedTopics);
                } else {
                    // otherwise set newDiscoveredPartitions to null
                    newDiscoveredPartitions = null;
                }
            }

            // (2) eliminate partition that are old partitions or should not be subscribed by this subtask
            if (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {
                throw new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicsDescriptor);
            } else {
                Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
                KafkaTopicPartition nextPartition;
                while (iter.hasNext()) {
                    nextPartition = iter.next();
                    // the partition is recorded in the discoveredPartitions set;
                    // the return value tells whether this task should consume it
                    if (!setAndCheckDiscoveredPartition(nextPartition)) {
                        iter.remove();
                    }
                }
            }

            return newDiscoveredPartitions;
        } catch (WakeupException e) {
            // the actual topic / partition metadata fetching methods
            // may be woken up midway; reset the wakeup flag and rethrow
            wakeup = false;
            throw e;
        }
    } else if (!closed && wakeup) {
        // may have been woken up before the method call
        wakeup = false;
        throw new WakeupException();
    } else {
        throw new ClosedException();
    }
}
```
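Two ideas in `discoverPartitions` can be shown with the standard library alone: keeping only topics that match a regular expression, and reporting a partition only the first time it is seen. The sketch below is a simplified illustration; `TopicFilterSketch` is a hypothetical class, partitions are plain strings, and the per-subtask ownership check that `setAndCheckDiscoveredPartition` also performs is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

/** Illustrative topic filtering and new-partition tracking; not Flink's code. */
class TopicFilterSketch {

    /** Partitions reported by earlier calls. */
    private final Set<String> seenPartitions = new HashSet<>();

    /** Keeps only the topics whose full name matches the pattern. */
    static List<String> matchingTopics(List<String> allTopics, Pattern topicPattern) {
        List<String> matched = new ArrayList<>(allTopics);
        matched.removeIf(topic -> !topicPattern.matcher(topic).matches());
        return matched;
    }

    /** Returns only the partitions that no earlier call has reported, and remembers them. */
    List<String> newPartitions(List<String> current) {
        List<String> fresh = new ArrayList<>();
        for (String partition : current) {
            // Set.add returns false for partitions that were already recorded
            if (seenPartitions.add(partition)) {
                fresh.add(partition);
            }
        }
        return fresh;
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("orders-eu", "orders-us", "payments");
        System.out.println(matchingTopics(topics, Pattern.compile("orders-.*")));

        TopicFilterSketch sketch = new TopicFilterSketch();
        System.out.println(sketch.newPartitions(Arrays.asList("orders-eu-0", "orders-eu-1")));
        // a later round only reports the partition that was added since
        System.out.println(sketch.newPartitions(Arrays.asList("orders-eu-0", "orders-eu-1", "orders-eu-2")));
    }
}
```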


The runFetchLoop method is the main entry point through which FlinkKafkaConsumer fetches data. It runs a loop that keeps pulling data from the Kafka brokers.

```java
public void runFetchLoop() throws Exception {
    try {
        // kick off the actual Kafka consumer
        // (starts the Kafka consumer thread, which periodically polls records
        // from the KafkaConsumer and passes them on to the handover object)
        consumerThread.start();

        while (running) {
            // this blocks until we get the next records
            // it automatically re-throws exceptions encountered in the consumer thread
            // (takes the records from the handover; blocks if the consumerThread
            // has not handed any over yet)
            final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

            // get the records for each topic partition
            for (KafkaTopicPartitionState<T, TopicPartition> partition : subscribedPartitionStates()) {
                // the records that belong to this partition
                List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                        records.records(partition.getKafkaPartitionHandle());

                partitionConsumerRecordsHandler(partitionRecords, partition);
            }
        }
    }
    finally {
        // this signals the consumer thread that no more work is to be done
        consumerThread.shutdown();
    }

    // on a clean exit, wait for the runner thread
    try {
        consumerThread.join();
    }
    catch (InterruptedException e) {
        // may be the result of a wake-up interruption after an exception.
        // we ignore this here and only restore the interruption state
        Thread.currentThread().interrupt();
    }
}
```
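The handover object used by `runFetchLoop` behaves like a one-slot exchange point between the consumer thread and the fetch loop. The sketch below shows that idea with plain `wait`/`notifyAll`. It is an assumption-laden simplification: `HandoverSketch` is a hypothetical class, and closing, wake-ups, and the exact exception types of the real `Handover` are not modelled.

```java
/** Single-slot hand-over between a producing and a consuming thread; an illustrative sketch only. */
class HandoverSketch<T> {
    private final Object lock = new Object();
    private T next;
    private Throwable error;

    /** Called by the producing thread; blocks while the previous batch is still unclaimed. */
    void produce(T batch) throws InterruptedException {
        synchronized (lock) {
            while (next != null) {
                lock.wait();
            }
            next = batch;
            lock.notifyAll();
        }
    }

    /** Called by the fetch loop; blocks until a batch or an error is available. */
    T pollNext() throws Exception {
        synchronized (lock) {
            while (next == null && error == null) {
                lock.wait();
            }
            if (next != null) {
                T batch = next;
                next = null;
                // wake up a producer that is waiting for the slot to become free
                lock.notifyAll();
                return batch;
            }
            // surface the producer's failure in the polling thread
            throw new Exception("producer thread failed", error);
        }
    }

    /** Called by the producing thread when it fails; unblocks a waiting poller. */
    void reportError(Throwable t) {
        synchronized (lock) {
            if (error == null) {
                error = t;
            }
            lock.notifyAll();
        }
    }

    public static void main(String[] args) throws Exception {
        HandoverSketch<String> handover = new HandoverSketch<>();
        Thread producer = new Thread(() -> {
            try {
                handover.produce("batch-1");
                handover.produce("batch-2"); // blocks until batch-1 has been taken
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        System.out.println(handover.pollNext());
        System.out.println(handover.pollNext());
        producer.join();
    }
}
```

A single slot keeps at most one polled batch buffered, so the producing thread cannot run arbitrarily far ahead of the thread that emits records.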

Within runFetchLoop, the logic that collects the Kafka data lives in partitionConsumerRecordsHandler. Its code is:

```java
protected void partitionConsumerRecordsHandler(
        List<ConsumerRecord<byte[], byte[]>> partitionRecords,
        KafkaTopicPartitionState<T, TopicPartition> partition) throws Exception {

    // deserialize each Kafka record into an object;
    // the deserializer has to implement the KafkaDeserializationSchema interface
    for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
        deserializer.deserialize(record, kafkaCollector);

        // emit the actual records. this also updates offset state atomically and emits
        // watermarks
        // (emits the data, updates the offset, generates timestamps and watermarks)
        emitRecordsWithTimestamps(
                kafkaCollector.getRecords(),
                partition,
                record.offset(),
                record.timestamp());

        // stop the fetch loop once the end of the stream has been reached
        if (kafkaCollector.isEndOfStreamSignalled()) {
            // end of stream signaled
            running = false;
            break;
        }
    }
}
```


```java
protected void emitRecordsWithTimestamps(
        Queue<T> records,
        KafkaTopicPartitionState<T, KPH> partitionState,
        long offset,
        long kafkaEventTimestamp) {
    // emit the records, using the checkpoint lock to guarantee
    // atomicity of record emission and offset state update
    synchronized (checkpointLock) {
        T record;
        while ((record = records.poll()) != null) {
            // this calls the sourceContext of the SourceFunction;
            // it is where the source collects its elements
            long timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp);
            sourceContext.collectWithTimestamp(record, timestamp);

            // this might emit a watermark, so do it after emitting the record
            partitionState.onEvent(record, timestamp);
        }
        partitionState.setOffset(offset);
    }
}
```
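The reason for the checkpoint lock is that a snapshot must never observe emitted records without the matching offset, or the offset without the records. The sketch below reduces the method above to that invariant. `EmitUnderLockSketch` is a hypothetical class, and a list stands in for the downstream `sourceContext`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

/** Illustrative atomic "emit records, then advance the offset" step; not Flink's code. */
class EmitUnderLockSketch {
    private final Object checkpointLock = new Object();
    private final List<String> emitted = new ArrayList<>(); // stands in for the downstream
    private long offset = -1;

    /** Emits all queued records and records the Kafka offset they came from, as one unit. */
    void emit(Queue<String> records, long recordOffset) {
        synchronized (checkpointLock) {
            String record;
            while ((record = records.poll()) != null) {
                emitted.add(record);
            }
            // the offset advances even if the Kafka record produced no output records
            offset = recordOffset;
        }
    }

    /** Taken under the same lock, so the count and the offset always belong together. */
    long[] snapshot() {
        synchronized (checkpointLock) {
            return new long[] {emitted.size(), offset};
        }
    }

    public static void main(String[] args) {
        EmitUnderLockSketch sketch = new EmitUnderLockSketch();
        sketch.emit(new ArrayDeque<>(Arrays.asList("a", "b")), 7L);
        System.out.println(Arrays.toString(sketch.snapshot()));
        // an input record that yields nothing still moves the offset forward
        sketch.emit(new ArrayDeque<>(), 8L);
        System.out.println(Arrays.toString(sketch.snapshot()));
    }
}
```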



```java
    // main fetch loop
    while (running) {

        // check if there is something to commit
        // (only when no commit is already in progress)
        if (!commitInProgress) {
            // get and reset the work-to-be committed, so we don't repeatedly commit the same
            // (fetch the offsets to commit together with the commit callback,
            // then reset the reference to null so they are not committed twice)
            final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback =
                    nextOffsetsToCommit.getAndSet(null);

            if (commitOffsetsAndCallback != null) {
                log.debug("Sending async offset commit request to Kafka broker");

                // also record that a commit is already in progress
                // the order here matters! first set the flag, then send the commit command.
                commitInProgress = true;
                // commit the offsets asynchronously
                consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
            }
        }

        // assign new partitions to the consumer;
        // because of partition discovery the consumer has to add newly discovered
        // partitions, otherwise polling would fail
        try {
            if (hasAssignedPartitions) {
                newPartitions = unassignedPartitionsQueue.pollBatch();
            }
            else {
                // if no assigned partitions block until we get at least one
                // instead of hot spinning this loop. We rely on a fact that
                // unassignedPartitionsQueue will be closed on a shutdown, so
                // we don't block indefinitely
                newPartitions = unassignedPartitionsQueue.getBatchBlocking();
            }
            if (newPartitions != null) {
                reassignPartitions(newPartitions);
            }
        } catch (AbortedReassignmentException e) {
            continue;
        }

        if (!hasAssignedPartitions) {
            // Without assigned partitions KafkaConsumer.poll will throw an exception
            continue;
        }

        // get the next batch of records, unless we did not manage to hand the old batch over
        if (records == null) {
            try {
                // poll records from the consumer;
                // pollTimeout can be changed through the flink.poll-timeout property
                // and defaults to 100 ms
                records = consumer.poll(pollTimeout);
            }
            catch (WakeupException we) {
                continue;
            }
        }

        try {
            // pass the records on to the handover
            handover.produce(records);
            records = null;
        }
        catch (Handover.WakeupException e) {
            // fall through the loop
        }
    }
    // end main fetch loop
}
catch (Throwable t) {
    // let the main thread know and exit
    // it may be that this exception comes because the main thread closed the handover, in
    // which case the below reporting is irrelevant, but does not hurt either
    handover.reportError(t);
}
finally {
    // make sure the handover is closed if it is not already closed or has an error
    handover.close();

    // make sure the KafkaConsumer is closed
    try {
        consumer.close();
    }
    catch (Throwable t) {
        log.warn("Error while closing Kafka consumer", t);
    }
}
```
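The commit hand-off at the top of this loop is a small mailbox: another thread deposits the offsets to commit, and the consumer thread takes them out at most once with `getAndSet(null)`, skipping the step while a commit is in flight. The sketch below isolates that mechanism. `CommitMailboxSketch` is a hypothetical class, and letting a newer request silently replace an unsent older one is an assumption of this sketch.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/** Illustrative one-element mailbox for offset commit requests; not Flink's code. */
class CommitMailboxSketch {
    private final AtomicReference<Map<String, Long>> nextOffsetsToCommit = new AtomicReference<>();
    private volatile boolean commitInProgress;

    /** Requesting side: deposits offsets and returns the unsent request it replaced, if any. */
    Map<String, Long> request(Map<String, Long> offsets) {
        return nextOffsetsToCommit.getAndSet(offsets);
    }

    /** Consumer-thread side: takes the pending request at most once, unless a commit is in flight. */
    Map<String, Long> takeIfIdle() {
        if (commitInProgress) {
            return null;
        }
        Map<String, Long> pending = nextOffsetsToCommit.getAndSet(null);
        if (pending != null) {
            // set the flag before the commit would be sent
            commitInProgress = true;
        }
        return pending;
    }

    /** Would be called from the commit callback once the broker has answered. */
    void onCommitFinished() {
        commitInProgress = false;
    }

    public static void main(String[] args) {
        CommitMailboxSketch mailbox = new CommitMailboxSketch();
        mailbox.request(Collections.singletonMap("orders-0", 10L));
        System.out.println("taken: " + mailbox.takeIfIdle());
        // nothing pending and a commit in flight: nothing to take
        System.out.println("taken again: " + mailbox.takeIfIdle());
        mailbox.onCommitFinished();
    }
}
```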





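  1. If the KafkaFetcher has not finished initializing, the subscribed topic partitions are saved together with their initial offsets.
  2. If the KafkaFetcher has finished initializing, the fetcher's snapshotCurrentState method is called.
  3. If offsetCommitMode is ON_CHECKPOINTS, the partitions and offsets are also written to the pendingOffsetsToCommit collection, which is used to commit offsets to the Kafka brokers once the checkpoint succeeds. (When offsetCommitMode is neither ON_CHECKPOINTS nor DISABLED, offsets are committed automatically.)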


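Once every operator has taken its snapshot successfully, acknowledgements are sent to the CheckpointCoordinator in the JobManager, and the coordinator then notifies each operator that the checkpoint is complete. (For details, see the article "Flink 源码之快照".) To make sure that data is neither missed nor consumed twice, a FlinkKafkaConsumer running in ON_CHECKPOINTS mode may commit offsets only at this point. When notifyCheckpointComplete is called, it tells the Kafka consumer to commit to the Kafka brokers the per-partition offsets that were saved when the checkpoint was taken, which keeps the data consistent.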
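The interplay between taking a snapshot and the later completion notice can be sketched as a map from checkpoint id to the offsets captured for it. The following is a minimal illustration under stated assumptions: `PendingCommitsSketch` is a hypothetical class, partitions are plain strings, and dropping all older pending entries when a checkpoint completes is an assumption of this sketch.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative bookkeeping of offsets awaiting checkpoint completion; not Flink's code. */
class PendingCommitsSketch {

    /** Checkpoint id to offsets, in the order the checkpoints were taken. */
    private final LinkedHashMap<Long, Map<String, Long>> pendingOffsetsToCommit = new LinkedHashMap<>();

    /** Remembers the offsets that belong to this checkpoint; nothing is committed yet. */
    void snapshotState(long checkpointId, Map<String, Long> currentOffsets) {
        pendingOffsetsToCommit.put(checkpointId, new HashMap<>(currentOffsets));
    }

    /**
     * Returns the offsets to commit for the completed checkpoint and drops that
     * entry plus all older ones. Returns null for an unknown checkpoint id.
     */
    Map<String, Long> notifyCheckpointComplete(long checkpointId) {
        Map<String, Long> toCommit = pendingOffsetsToCommit.get(checkpointId);
        if (toCommit == null) {
            return null;
        }
        Iterator<Long> ids = pendingOffsetsToCommit.keySet().iterator();
        while (ids.hasNext()) {
            long id = ids.next();
            ids.remove();
            if (id == checkpointId) {
                break;
            }
        }
        return toCommit;
    }

    int pendingCount() {
        return pendingOffsetsToCommit.size();
    }

    public static void main(String[] args) {
        PendingCommitsSketch sketch = new PendingCommitsSketch();
        sketch.snapshotState(1L, Collections.singletonMap("orders-0", 100L));
        sketch.snapshotState(2L, Collections.singletonMap("orders-0", 250L));
        // only now would the offsets of checkpoint 2 be committed to Kafka
        System.out.println("commit: " + sketch.notifyCheckpointComplete(2L));
        System.out.println("still pending: " + sketch.pendingCount());
    }
}
```

Committing only the offsets captured for a completed checkpoint means the offsets visible in Kafka never run ahead of state that Flink could restore.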