Main Flow

Creating a Kafka source in Flink typically looks like this:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // KafkaEventSchema is a custom deserialization schema for the record fields
    env.addSource(new FlinkKafkaConsumer<>("foo", new KafkaEventSchema(), properties));

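For context, a complete minimal job built around such a source might look as follows; the broker address, topic name, consumer group, and the use of SimpleStringSchema are illustrative assumptions, not part of the original snippet:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class KafkaSourceJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9092"); // assumed address
            properties.setProperty("group.id", "demo-group");              // assumed group

            env.addSource(new FlinkKafkaConsumer<>("foo", new SimpleStringSchema(), properties))
               .print();

            env.execute("kafka-source-demo");
        }
    }
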
In Kafka's own KafkaConsumer API, a topic is consumed via the poll method:

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("foo")); // must subscribe (or assign) before polling
    consumer.poll(Duration.ofMillis(100));

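Used standalone, KafkaConsumer is typically driven in a loop. A minimal sketch (broker address, group, topic, and deserializers are assumptions):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class PollLoop {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // assumed address
            props.setProperty("group.id", "demo-group");              // assumed group
            props.setProperty("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.setProperty("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("foo"));
                while (true) {
                    // each poll returns a batch of records fetched since the last call
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                    }
                }
            }
        }
    }
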
This article traces how Flink gets from env.addSource all the way down to consumer.poll.

Source Code Analysis

Initialization

When env.addSource is executed during initialization, a StreamSource object is created: final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);. The function here is the FlinkKafkaConsumer object that was passed in, and the StreamSource constructor hands it to the userFunction field of its parent class AbstractUdfStreamOperator, as the following source shows (a sketch after the two snippets illustrates the SourceFunction contract this userFunction must satisfy):

StreamSource.java

    public StreamSource(SRC sourceFunction) {
        super(sourceFunction);
        this.chainingStrategy = ChainingStrategy.HEAD;
    }

AbstractUdfStreamOperator.java

    public AbstractUdfStreamOperator(F userFunction) {
        this.userFunction = requireNonNull(userFunction);
        checkUdfCheckpointingPreconditions();
    }
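
For FlinkKafkaConsumer to be accepted here, it must ultimately be a SourceFunction. A minimal custom source illustrates the run()/cancel() contract that the runtime relies on later (an illustrative sketch, not Flink source code):

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    // Emits an increasing counter until canceled; FlinkKafkaConsumerBase
    // implements this same run()/cancel() contract for Kafka.
    public class CountSource implements SourceFunction<Long> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            long n = 0;
            while (running) {
                // emissions are synchronized on the checkpoint lock so they
                // do not race with checkpoint snapshots
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(n++);
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }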

Task Execution

After the task starts, execution reaches performDefaultAction() in SourceStreamTask, which starts a dedicated thread via sourceThread.start(); partial source below, followed by a sketch of the threading pattern:

    private final LegacySourceFunctionThread sourceThread;

    @Override
    protected void performDefaultAction(ActionContext context) throws Exception {
        sourceThread.start();
    }
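
Reduced to its essence, the pattern is a task thread that spawns a dedicated thread to drive the blocking source loop, keeping itself free for other work such as checkpointing and cancellation. A minimal sketch (illustrative, not Flink source code):

    public class SourceThreadSketch {
        public static void main(String[] args) throws Exception {
            Thread sourceThread = new Thread(() -> {
                // stands in for headOperator.run(...) -> userFunction.run(ctx),
                // which blocks until the source finishes or is canceled
                for (int i = 0; i < 3; i++) {
                    System.out.println("emit record " + i);
                }
            }, "Legacy Source Thread");

            sourceThread.start(); // what performDefaultAction() does
            sourceThread.join();  // the task later waits for the source to finish
        }
    }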

In LegacySourceFunctionThread's run method, headOperator.run is invoked, which ultimately calls StreamSource#run; partial source:

    public void run(final Object lockingObject,
            final StreamStatusMaintainer streamStatusMaintainer,
            final Output<StreamRecord<OUT>> collector,
            final OperatorChain<?, ?> operatorChain) throws Exception {

        // ... code omitted

        this.ctx = StreamSourceContexts.getSourceContext(
            timeCharacteristic,
            getProcessingTimeService(),
            lockingObject,
            streamStatusMaintainer,
            collector,
            watermarkInterval,
            -1);

        try {
            userFunction.run(ctx);
            // ... code omitted
        } finally {
            // make sure that the context is closed in any case
            ctx.close();
            if (latencyEmitter != null) {
                latencyEmitter.close();
            }
        }
    }

The key line is userFunction.run(ctx);. This userFunction is the FlinkKafkaConsumer object passed in during initialization, so the call actually invokes FlinkKafkaConsumer's run method, whose implementation lives in its parent class FlinkKafkaConsumerBase. At this point we have entered the actual Kafka consumption phase.

Kafka Consumption Phase

FlinkKafkaConsumerBase#run creates a KafkaFetcher object and finally calls kafkaFetcher.runFetchLoop(). A fragment of that method:

    /** The thread that runs the actual KafkaConsumer and hands the record batches to this fetcher. */
    private final KafkaConsumerThread consumerThread;

    @Override
    public void runFetchLoop() throws Exception {
        try {
            final Handover handover = this.handover;

            // kick off the actual Kafka consumer
            consumerThread.start();

            // ... remainder of the method omitted
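
The handover referenced above is essentially a blocking exchange point of capacity one between the KafkaConsumerThread and the fetcher. A minimal sketch of the idea (illustrative only; Flink's actual Handover class additionally reports errors and supports wakeup and close semantics):

    // A stripped-down stand-in for Flink's Handover: a capacity-one exchange
    // between a producing thread (the KafkaConsumerThread) and a consuming
    // thread (the fetcher). Not Flink source code.
    public final class SimpleHandover<T> {
        private final Object lock = new Object();
        private T element; // at most one batch "in flight"

        // called by the consumer thread after each poll()
        public void produce(T batch) throws InterruptedException {
            synchronized (lock) {
                while (element != null) {
                    lock.wait(); // wait until the fetcher took the previous batch
                }
                element = batch;
                lock.notifyAll();
            }
        }

        // called from the fetcher's fetch loop
        public T pollNext() throws InterruptedException {
            synchronized (lock) {
                while (element == null) {
                    lock.wait();
                }
                T batch = element;
                element = null;
                lock.notifyAll();
                return batch;
            }
        }
    }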

As you can see, it actually starts a KafkaConsumerThread. Stepping into KafkaConsumerThread#run, only part of the method is listed below; for the complete code see KafkaConsumerThread.java:

    @Override
    public void run() {
        // early exit check
        if (!running) {
            return;
        }

        // This method initializes the KafkaConsumer and guarantees it is torn down properly.
        // This is important, because the consumer has multi-threading issues,
        // including concurrent 'close()' calls.
        try {
            this.consumer = getConsumer(kafkaProperties);
        } catch (Throwable t) {
            handover.reportError(t);
            return;
        }

        try {
            // main fetch loop
            while (running) {
                // ... code omitted

                if (records == null) {
                    try {
                        records = consumer.poll(pollTimeout);
                    } catch (WakeupException we) {
                        continue;
                    }
                }

                // ... code omitted: hand the record batch over to the fetcher
            }
            // end main fetch loop
        } catch (Throwable t) {
            handover.reportError(t);
        } finally {
            handover.close();
            try {
                consumer.close();
            } catch (Throwable t) {
                log.warn("Error while closing Kafka consumer", t);
            }
        }
    }

Here, at last, is the code that really pulls data from Kafka: records = consumer.poll(pollTimeout);. Because KafkaConsumer is not thread-safe, every thread needs its own KafkaConsumer instance, which is what this.consumer = getConsumer(kafkaProperties); provides:

    KafkaConsumer<byte[], byte[]> getConsumer(Properties kafkaProperties) {
        return new KafkaConsumer<>(kafkaProperties);
    }
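
This thread-safety constraint also explains the WakeupException handling in the fetch loop above: wakeup() is the one KafkaConsumer method that is safe to call from another thread, and it makes a blocking poll() throw WakeupException, after which Flink's loop re-checks its running flag. In standalone Kafka code, the canonical shutdown pattern looks like this (a generic sketch, not Flink source; broker, group, and topic are assumptions):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.errors.WakeupException;

    public class WakeupDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // assumed address
            props.setProperty("group.id", "demo-group");              // assumed group
            props.setProperty("key.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.setProperty("value.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("foo"));

            // wakeup() may be called from another thread; it interrupts a
            // blocking poll() by making it throw WakeupException
            Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));

            try {
                while (true) {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
                    // process records ...
                }
            } catch (WakeupException we) {
                // expected on shutdown; fall through to close()
            } finally {
                consumer.close();
            }
        }
    }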

Summary

This article only covered the key path Flink follows to consume data from Kafka. A follow-up will look in more detail at how FlinkKafkaConsumer manages offsets under the AT_LEAST_ONCE and EXACTLY_ONCE guarantees.

Note: this article is based on Flink 1.9.0 and Kafka 2.3.