简介
FlinkKafkaConsumer为Flink消费Kafka数据的连接器。在Flink中的角色为数据源。
FlinkKafkaConsumer的继承结构
如下图所示:
我们发现FlinkKafkaConsumer继承自FlinkKafkaConsumerBase。FlinkKafkaConsumerBase又实现了SourceFunction和RichFunction接口。接下来我们重点分析它的open和run方法。
FlinkKafkaConsumerBase的open方法
该方法包含的内容为FlinkKafkaConsumer的初始化逻辑。
首先设置提交offset的模式
// determine the offset commit mode
this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
getIsAutoCommitEnabled(),
enableCommitOnCheckpoints,
((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
OffsetCommitMode是一个枚举类型,具有如下三个值:
- DISABLED:完全禁用offset提交。
- ON_CHECKPOINTS:当checkpoint完成的时候再提交offset。
- KAFKA_PERIODIC:周期性提交offset。
判断OffsetCommitMode的逻辑封装在了OffsetCommitModes.fromConfiguration方法中。该方法的代码如下:
/**
* Determine the offset commit mode using several configuration values.
*
* @param enableAutoCommit whether or not auto committing is enabled in the provided Kafka properties.
* @param enableCommitOnCheckpoint whether or not committing on checkpoints is enabled.
* @param enableCheckpointing whether or not checkpoint is enabled for the consumer.
*
* @return the offset commit mode to use, based on the configuration values.
*/
public static OffsetCommitMode fromConfiguration(
boolean enableAutoCommit,
boolean enableCommitOnCheckpoint,
boolean enableCheckpointing) {
if (enableCheckpointing) {
// if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled
return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
} else {
// else, the mode depends only on whether auto committing is enabled in the provided Kafka properties
return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
这段代码逻辑可以总结为:
- 如果启用了checkpoint,并且启用了checkpoint完成时提交offset,返回ON_CHECKPOINTS。
- 如果未启用checkpoint,但是启用了自动提交,返回KAFKA_PERIODIC。
- 其他情况都返回DISABLED。 ```java // create the partition discoverer this.partitionDiscoverer = createPartitionDiscoverer( topicsDescriptor, getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); this.partitionDiscoverer.open();
subscribedPartitionsToStartOffsets = new HashMap<>();
final List
这段代码中topicsDescriptor为fixedTopics和topicPattern的封装。其中fixedTopics明确指定了topic的名称,称为固定topic。topicPattern为匹配topic名称的正则表达式,用于分区发现。
**createPartitionDiscoverer**方法创建了一个KafkaPartitionDiscoverer对象,主要负责Kafka分区发现。partitionDiscoverer.open()方法创建出一个KafkaConsumer。
**subscribedPartitionsToStartOffsets** 为已订阅的分区列表,这里将它初始化。<br />partitionDiscoverer.discoverPartitions()用户获取所有fixedTopics和匹配topicPattern的Topic包含的所有分区信息。该部分代码稍后分析。
接下来**open**方法的代码结构如下:
```java
if (restoredState != null) {
// 从快照恢复逻辑...
} else {
// 直接启动逻辑...
}
如果consumer是从快照恢复的,restoredState不为空。反之restoredState为空。
我们首先分析一下从快照恢复的逻辑。代码如下:
// 如果restoredState没有存储某一分区的状态
// 需要重头消费该分区
for (KafkaTopicPartition partition : allPartitions) {
if (!restoredState.containsKey(partition)) {
restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
}
}
for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) {
// seed the partition discoverer with the union state while filtering out
// restored partitions that should not be subscribed by this subtask
// 此处可过滤掉不归该task负责的kafka分区
if (KafkaTopicPartitionAssigner.assign(
restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
== getRuntimeContext().getIndexOfThisSubtask()){
subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
}
}
// 依照分区发现配置的topic正则表达式过滤分区
if (filterRestoredPartitionsWithCurrentTopicsDescriptor) {
// 过滤掉topic名称不符合topicsDescriptor的topicPattern的分区
subscribedPartitionsToStartOffsets.entrySet().removeIf(entry -> {
if (!topicsDescriptor.isMatchingTopic(entry.getKey().getTopic())) {
LOG.warn(
"{} is removed from subscribed partitions since it is no longer associated with topics descriptor of current execution.",
entry.getKey());
return true;
}
return false;
});
}
接下来我们分析下Consumer直接启动的逻辑(不从快照恢复)。
在此之前需要了解下StartupMode这个枚举类型。该枚举类型有5个值:
- GROUP_OFFSETS:从保存在zookeeper或者是Kafka broker的对应消费者组提交的offset开始消费,这个是默认的配置
- EARLIEST:尽可能从最早的offset开始消费
- LATEST:从最近的offset开始消费
- TIMESTAMP:从用户提供的timestamp处开始消费
- SPECIFIC_OFFSETS:从用户提供的offset处开始消费
然后,Comsumer使用分区发现工具来获取初始的分区。根据StartupMode来设置它们的起始消费offset。
我们先看SPECIFIC_OFFSETS这种情况。
case SPECIFIC_OFFSETS:
// 如果没有配置具体从哪个offset开始消费,程序抛出异常
if (specificStartupOffsets == null) {
throw new IllegalStateException(
"Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
", but no specific offsets were specified.");
}
for (KafkaTopicPartition seedPartition : allPartitions) {
// 获取每个分区指定的消费起始offset
Long specificOffset = specificStartupOffsets.get(seedPartition);
if (specificOffset != null) {
// since the specified offsets represent the next record to read, we subtract
// it by one so that the initial state of the consumer will be correct
// 如果分区配置了offset,设置从offset开始消费
subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
} else {
// default to group offset behaviour if the user-provided specific offsets
// do not contain a value for this partition
// 如果分区没有配置offset,设置从GROUP_OFFSET开始消费
subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
}
}
break;
如果采用TIMESTAMP模式,逻辑如下所示:
case TIMESTAMP:
// 如果没有配置timestamp,程序报错退出
if (startupOffsetsTimestamp == null) {
throw new IllegalStateException(
"Startup mode for the consumer set to " + StartupMode.TIMESTAMP +
", but no startup timestamp was specified.");
}
// 根据timestamp获取分区的offset
// 遍历这些分区
for (Map.Entry<KafkaTopicPartition, Long> partitionToOffset
: fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp).entrySet()) {
// 如果无offset,使用LATEST_OFFSET
// 如果获取到了offset,从这个offset开始消费
subscribedPartitionsToStartOffsets.put(
partitionToOffset.getKey(),
(partitionToOffset.getValue() == null)
// if an offset cannot be retrieved for a partition with the given timestamp,
// we default to using the latest offset for the partition
? KafkaTopicPartitionStateSentinel.LATEST_OFFSET
// since the specified offsets represent the next record to read, we subtract
// it by one so that the initial state of the consumer will be correct
: partitionToOffset.getValue() - 1);
}
break;
// 其他情况,使用KafkaTopicPartitionStateSentinel类对应的值作为offset
default:
for (KafkaTopicPartition seedPartition : allPartitions) {
subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
}
接下来的if语句段负责打印一些日志信息。这里就不再分析了。
FlinkKafkaConsumerBase的run方法
run方法包含了从KafkaConsumer消费数据,和向Flink下游发送数据的逻辑。
首先检查open方法中初始化的subscribedPartitionsToStartOffsets是否为null。
if (subscribedPartitionsToStartOffsets == null) {
throw new Exception("The partitions were not set for the consumer");
}
接下来配置成功commit和失败commit数量的监控。
// initialize commit metrics and default offset callback method
// 设置成功提交计数监控
this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
// 设置失败提交计数监控
this.failedCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);
// 获取子任务index
final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();
// 注册一个提交时的回调函数
this.offsetCommitCallback = new KafkaCommitCallback() {
@Override
public void onSuccess() {
// 提交成功,成功提交计数器加一
successfulCommits.inc();
}
@Override
public void onException(Throwable cause) {
LOG.warn(String.format("Consumer subtask %d failed async Kafka commit.", subtaskIndex), cause);
// 提交失败,失败提交计数器加一
failedCommits.inc();
}
};
接下来判断subscribedPartitionsToStartOffsets集合是否为空。如果为空,标记数据源的状态为暂时空闲。
if (subscribedPartitionsToStartOffsets.isEmpty()) {
sourceContext.markAsTemporarilyIdle();
}
下面是获取数据的过程。这里创建了一个KafkaFetcher,负责借助KafkaConsumer API从Kafka broker获取数据。
this.kafkaFetcher = createFetcher(
sourceContext,
subscribedPartitionsToStartOffsets,
watermarkStrategy,
(StreamingRuntimeContext) getRuntimeContext(),
offsetCommitMode,
getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
useMetrics);
接下来检测running变量的状态。如果没有running,直接返回。
if (!running) {
return;
}
最后是根据分区发现间隔时间的配置来确定是否启动分区的定时发现任务。
if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
// 直接启动获取数据任务
kafkaFetcher.runFetchLoop();
} else {
// 否则,启动定期分区发现任务和数据获取任务
runWithPartitionDiscovery();
}
最后我们分析下runWithPartitionDiscovery方法。代码如下:
private void runWithPartitionDiscovery() throws Exception {
final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();
// 启动分区发现定时任务
createAndStartDiscoveryLoop(discoveryLoopErrorRef);
// 启动kafka broker数据获取任务
kafkaFetcher.runFetchLoop();
// make sure that the partition discoverer is waked up so that
// the discoveryLoopThread exits
// 使partitionDiscoverer.discoverPartitions()抛异常
// 能够从discoveryLoopThread 返回
partitionDiscoverer.wakeup();
// 等待discoveryLoopThread 执行完毕
joinDiscoveryLoopThread();
// rethrow any fetcher errors
final Exception discoveryLoopError = discoveryLoopErrorRef.get();
if (discoveryLoopError != null) {
throw new RuntimeException(discoveryLoopError);
}
}
我们再跟踪下看看如何启动分区发现定时任务的。
private void createAndStartDiscoveryLoop(AtomicReference<Exception> discoveryLoopErrorRef) {
// 创建一个discoveryLoop线程
discoveryLoopThread = new Thread(() -> {
try {
// --------------------- partition discovery loop ---------------------
// throughout the loop, we always eagerly check if we are still running before
// performing the next operation, so that we can escape the loop as soon as possible
while (running) {
if (LOG.isDebugEnabled()) {
LOG.debug("Consumer subtask {} is trying to discover new partitions ...", getRuntimeContext().getIndexOfThisSubtask());
}
final List<KafkaTopicPartition> discoveredPartitions;
try {
// 尝试发现新分区,如果方法抛出异常,退出循环
discoveredPartitions = partitionDiscoverer.discoverPartitions();
} catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) {
// the partition discoverer may have been closed or woken up before or during the discovery;
// this would only happen if the consumer was canceled; simply escape the loop
break;
}
// no need to add the discovered partitions if we were closed during the meantime
// 如果没有发现新的分区,或者数据源已关闭之时,没必要再添加新分区
if (running && !discoveredPartitions.isEmpty()) {
kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
}
// do not waste any time sleeping if we're not running anymore
if (running && discoveryIntervalMillis != 0) {
try {
// 睡眠discoveryIntervalMillis时间
Thread.sleep(discoveryIntervalMillis);
} catch (InterruptedException iex) {
// may be interrupted if the consumer was canceled midway; simply escape the loop
break;
}
}
}
} catch (Exception e) {
discoveryLoopErrorRef.set(e);
} finally {
// calling cancel will also let the fetcher loop escape
// (if not running, cancel() was already called)
if (running) {
cancel();
}
}
}, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());
// 启动分区发现定时任务线程
discoveryLoopThread.start();
}
我们再详细研究下上述方法中partitionDiscoverer.discoverPartitions()的调用,即发现分区的执行过程。代码如下:
public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
// 确保没有关闭数据源,也没有wakeup
if (!closed && !wakeup) {
try {
List<KafkaTopicPartition> newDiscoveredPartitions;
// (1) get all possible partitions, based on whether we are subscribed to fixed topics or a topic pattern
// 如果配置了fixedTopic,获取这些topic的分区
if (topicsDescriptor.isFixedTopics()) {
newDiscoveredPartitions = getAllPartitionsForTopics(topicsDescriptor.getFixedTopics());
} else {
// 如果没有配置fixedTopic
// 1. 获取所有topic
List<String> matchedTopics = getAllTopics();
// retain topics that match the pattern
// 2. 逐个排除名字不是fixedTopic,或名字不匹配topicPattern的topic
Iterator<String> iter = matchedTopics.iterator();
while (iter.hasNext()) {
if (!topicsDescriptor.isMatchingTopic(iter.next())) {
iter.remove();
}
}
if (matchedTopics.size() != 0) {
// get partitions only for matched topics
// 3. 如果有匹配的topic,获取他们的分区
newDiscoveredPartitions = getAllPartitionsForTopics(matchedTopics);
} else {
// 否则newDiscoveredPartitions 设置为null
newDiscoveredPartitions = null;
}
}
// (2) eliminate partition that are old partitions or should not be subscribed by this subtask
if (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {
throw new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicsDescriptor);
} else {
Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
KafkaTopicPartition nextPartition;
while (iter.hasNext()) {
nextPartition = iter.next();
// 分区存入discoveredPartitions集合中
// 返回值为分区是否归当前task消费
if (!setAndCheckDiscoveredPartition(nextPartition)) {
iter.remove();
}
}
}
return newDiscoveredPartitions;
} catch (WakeupException e) {
// the actual topic / partition metadata fetching methods
// may be woken up midway; reset the wakeup flag and rethrow
wakeup = false;
throw e;
}
} else if (!closed && wakeup) {
// may have been woken up before the method call
wakeup = false;
throw new WakeupException();
} else {
throw new ClosedException();
}
}
kafkaFetcher的runFetchLoop方法
此方法为FlinkKafkaConsumer获取数据的主入口,通过一个循环来不断获取kafka broker的数据。
public void runFetchLoop() throws Exception {
try {
// kick off the actual Kafka consumer
// 启动kafka消费线程,定期从kafkaConsumer拉取数据并转交给handover对象
consumerThread.start();
while (running) {
// this blocks until we get the next records
// it automatically re-throws exceptions encountered in the consumer thread
// 获取handover中的数据
// 如果此时consumerThread尚未把数据交给handover,该方法会阻塞
final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
// get the records for each topic partition
for (KafkaTopicPartitionState<T, TopicPartition> partition : subscribedPartitionStates()) {
// 获取属于该分区的records
List<ConsumerRecord<byte[], byte[]>> partitionRecords =
records.records(partition.getKafkaPartitionHandle());
partitionConsumerRecordsHandler(partitionRecords, partition);
}
}
}
finally {
// this signals the consumer thread that no more work is to be done
consumerThread.shutdown();
}
// on a clean exit, wait for the runner thread
try {
consumerThread.join();
}
catch (InterruptedException e) {
// may be the result of a wake-up interruption after an exception.
// we ignore this here and only restore the interruption state
Thread.currentThread().interrupt();
}
}
此方法中的collect kafka数据的逻辑在partitionConsumerRecordsHandler中。我们查看下它的代码:
protected void partitionConsumerRecordsHandler(
List<ConsumerRecord<byte[], byte[]>> partitionRecords,
KafkaTopicPartitionState<T, TopicPartition> partition) throws Exception {
// 反序列化Kafka record为bean
// 此deserializer需要实现KafkaDeserializationSchema接口
for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
deserializer.deserialize(record, kafkaCollector);
// emit the actual records. this also updates offset state atomically and emits
// watermarks
// 发送数据,更新offset,生成timestamp和watermark
emitRecordsWithTimestamps(
kafkaCollector.getRecords(),
partition,
record.offset(),
record.timestamp());
// 如果数据源已到末尾,停止fetcher循环
if (kafkaCollector.isEndOfStreamSignalled()) {
// end of stream signaled
running = false;
break;
}
}
}
它调用了emitRecordWithTimestamp方法,继续查看。
protected void emitRecordsWithTimestamps(
Queue<T> records,
KafkaTopicPartitionState<T, KPH> partitionState,
long offset,
long kafkaEventTimestamp) {
// emit the records, using the checkpoint lock to guarantee
// atomicity of record emission and offset state update
synchronized (checkpointLock) {
T record;
while ((record = records.poll()) != null) {
// 此处调用SourceFunction中的sourceContext
// 数据源收集元素逻辑在此
long timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp);
sourceContext.collectWithTimestamp(record, timestamp);
// this might emit a watermark, so do it after emitting the record
// 可能定时发送水位线
partitionState.onEvent(record, timestamp);
}
partitionState.setOffset(offset);
}
}
KafkaConsumerThread
KafkaConsumerThread负责在单独的线程中从Kafka中拉取数据到handover。这里我们分析下它的run方法中获取数据的部分。
// main fetch loop
while (running) {
// check if there is something to commit
// 检查是否则commit过程中
if (!commitInProgress) {
// get and reset the work-to-be committed, so we don't repeatedly commit the same
// 获取需要提交的offset值,以及commit回调函数
// 获取完毕之后需要设置为null,防止反复提交
final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback =
nextOffsetsToCommit.getAndSet(null);
if (commitOffsetsAndCallback != null) {
log.debug("Sending async offset commit request to Kafka broker");
// also record that a commit is already in progress
// the order here matters! first set the flag, then send the commit command.
// 开始提交过程
commitInProgress = true;
// 异步提交offset
consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
}
}
// 为consumer指定新的分区
// 由于分区发现功能的存在,consumer需要添加新发现的分区,否则poll数据会报错
try {
if (hasAssignedPartitions) {
newPartitions = unassignedPartitionsQueue.pollBatch();
}
else {
// if no assigned partitions block until we get at least one
// instead of hot spinning this loop. We rely on a fact that
// unassignedPartitionsQueue will be closed on a shutdown, so
// we don't block indefinitely
newPartitions = unassignedPartitionsQueue.getBatchBlocking();
}
if (newPartitions != null) {
reassignPartitions(newPartitions);
}
} catch (AbortedReassignmentException e) {
continue;
}
if (!hasAssignedPartitions) {
// Without assigned partitions KafkaConsumer.poll will throw an exception
continue;
}
// get the next batch of records, unless we did not manage to hand the old batch over
if (records == null) {
try {
// 从consumer拉取数据
// 这里的pollTimeout可以通过配置flink.poll-timeout参数修改
// pollTimeout默认值为100ms
records = consumer.poll(pollTimeout);
}
catch (WakeupException we) {
continue;
}
}
try {
// 将数据交给handover
handover.produce(records);
records = null;
}
catch (Handover.WakeupException e) {
// fall through the loop
}
}
// end main fetch loop
}
catch (Throwable t) {
// let the main thread know and exit
// it may be that this exception comes because the main thread closed the handover, in
// which case the below reporting is irrelevant, but does not hurt either
handover.reportError(t);
}
finally {
// make sure the handover is closed if it is not already closed or has an error
handover.close();
// make sure the KafkaConsumer is closed
try {
consumer.close();
}
catch (Throwable t) {
log.warn("Error while closing Kafka consumer", t);
}
}
checkpoint流程
checkpoint流程大部分代码为状态的读写。这里为大家总结下主要的流程,不分析具体的代码。
snapshotState方法
FlinkKafkaConsumerBase的snapshotState方法包含snapshot的流程。包含如下:
- 如果KafkaFetcher尚未初始化完毕。需要保存已订阅的topic连同他们的初始offset。
- 如果KafkaFetcher已初始化完毕,调用fetcher的snapshotCurrentState方法。
- 如果offsetCommitMode为ON_CHECKPOINTS类型,还需要将topic和offset写入到pendingOffsetsToCommit集合中。该集合用于checkpoint成功的时候向Kafka broker提交offset。(offsetCommitMode不为ON_CHECKPOINTS和DISABLED的时候,使用的是自动提交offset的模式)
notifyCheckpointComplete方法
在所有的operator都快照成功的时候,会向JobManager的CheckpointCoordinator发送确认消息,然后coordinator会通知各个operator checkpoint已经完成。(详细请参见Flink 源码之快照
)为了保证保证数据不会被遗漏和重复消费,ON_CHECKPOINTS模式运行的FlinkKafkaConsumer只能在这个时候提交offset到kafka consumer。调用notifyCheckpointComplete的时候通知kafka consumer,将checkpoint之时保存的各个分区的offset提交给kafka broker。从而保证数据的一致性。