Main flow
A Kafka source is typically created in Flink with code like this:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// KafkaEventSchema is a user-defined class that parses the record fields
env.addSource(new FlinkKafkaConsumer<>("foo", new KafkaEventSchema(), properties));
Kafka's own KafkaConsumer API, on the other hand, consumes a topic through the poll method:
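KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// poll requires a prior subscription; "foo" is the topic from the Flink example above
consumer.subscribe(Collections.singletonList("foo"));
consumer.poll(Duration.ofMillis(100));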
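This article traces how Flink gets from the env.addSource call to the eventual consumer.poll call.
Source code analysis
Initialization
When env.addSource runs during initialization, it creates a StreamSource object: final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function); Here function is the FlinkKafkaConsumer object that was passed in. The StreamSource constructor hands this object to the userFunction field of its parent class AbstractUdfStreamOperator. The source is as follows: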
StreamSource.java
public StreamSource(SRC sourceFunction) {
    super(sourceFunction);
    this.chainingStrategy = ChainingStrategy.HEAD;
}
AbstractUdfStreamOperator.java
public AbstractUdfStreamOperator(F userFunction) {
    this.userFunction = requireNonNull(userFunction);
    checkUdfCheckpointingPreconditions();
}
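The constructor chain above can be pictured with a stripped-down model. The following is only a sketch in plain Java, not Flink's code: SourceFn, UdfOperator, and SourceOperator are hypothetical stand-ins for SourceFunction, AbstractUdfStreamOperator, and StreamSource, and a java.util.function.Consumer replaces the real source context.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical stand-ins, not Flink classes: they only mirror the constructor chain.
final class UdfHolderSketch {

    /** Plays the role of a source function such as FlinkKafkaConsumer. */
    interface SourceFn<T> {
        void run(Consumer<T> out);
    }

    /** Plays the role of AbstractUdfStreamOperator: it only stores the user function. */
    static class UdfOperator<F> {
        protected final F userFunction;

        UdfOperator(F userFunction) {
            this.userFunction = Objects.requireNonNull(userFunction);
        }
    }

    /** Plays the role of StreamSource: passes the function up, and runs it later. */
    static final class SourceOperator<T> extends UdfOperator<SourceFn<T>> {
        SourceOperator(SourceFn<T> sourceFunction) {
            super(sourceFunction);
        }

        void run(Consumer<T> out) {
            // The function stored at construction time is the one that gets executed.
            userFunction.run(out);
        }
    }

    /** Wraps a two-record source in an operator and collects what it emits. */
    static List<String> demo() {
        SourceFn<String> function = out -> {
            out.accept("a");
            out.accept("b");
        };
        List<String> collected = new ArrayList<>();
        new SourceOperator<>(function).run(collected::add);
        return collected;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the sketch is that the operator never needs to know which source it wraps: whatever was handed to the constructor is what runs once the task starts.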
Task execution
Once the task starts, the performDefaultAction() method in SourceStreamTask is called, and it starts a thread via sourceThread.start();. Part of the source:
private final LegacySourceFunctionThread sourceThread;

@Override
protected void performDefaultAction(ActionContext context) throws Exception {
    sourceThread.start();
}
In the run method of LegacySourceFunctionThread, the call to headOperator.run ends up invoking the run method of StreamSource. Part of the source:
public void run(final Object lockingObject,
        final StreamStatusMaintainer streamStatusMaintainer,
        final Output<StreamRecord<OUT>> collector,
        final OperatorChain<?, ?> operatorChain) throws Exception {

    // some code omitted

    this.ctx = StreamSourceContexts.getSourceContext(
        timeCharacteristic,
        getProcessingTimeService(),
        lockingObject,
        streamStatusMaintainer,
        collector,
        watermarkInterval,
        -1);

    try {
        userFunction.run(ctx);

        // some code omitted
    } finally {
        // make sure that the context is closed in any case
        ctx.close();
        if (latencyEmitter != null) {
            latencyEmitter.close();
        }
    }
}
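The key line here is userFunction.run(ctx);. This userFunction is the FlinkKafkaConsumer object passed in during initialization above, so the call actually invokes the run method of FlinkKafkaConsumer, whose implementation lives in its parent class FlinkKafkaConsumerBase. From this point on we are in the actual Kafka consumption phase.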
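The call chain described so far (a dedicated source thread, the head operator's run method, then userFunction.run(ctx) wrapped in try/finally) can be modeled in a few lines. This is a hedged sketch in plain Java: Ctx, SourceFn, and SourceOperator are hypothetical types invented for illustration, and the log list exists only to make the call order visible.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the call chain; none of these types are Flink classes.
final class SourceCallChainSketch {

    /** Stand-in for the source context handed to the user function. */
    interface Ctx {
        void collect(String record);
        void close();
    }

    /** Stand-in for a source function such as FlinkKafkaConsumer. */
    interface SourceFn {
        void run(Ctx ctx) throws Exception;
    }

    /** Stand-in for StreamSource: builds the context, delegates, always closes it. */
    static final class SourceOperator {
        private final SourceFn userFunction;
        private final List<String> log;

        SourceOperator(SourceFn userFunction, List<String> log) {
            this.userFunction = userFunction;
            this.log = log;
        }

        void run() throws Exception {
            Ctx ctx = new Ctx() {
                @Override
                public void collect(String record) {
                    log.add("collect:" + record);
                }

                @Override
                public void close() {
                    log.add("ctx.close");
                }
            };
            try {
                userFunction.run(ctx);
            } finally {
                // Mirrors the finally block above: the context is closed in any case.
                ctx.close();
            }
        }
    }

    /** Runs the operator on its own thread, as the source task does, and returns the log. */
    static List<String> demo() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        SourceOperator headOperator = new SourceOperator(ctx -> ctx.collect("r1"), log);
        Thread sourceThread = new Thread(() -> {
            try {
                headOperator.run();
            } catch (Exception e) {
                log.add("error:" + e);
            }
        }, "source-thread");
        sourceThread.start();
        try {
            sourceThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The record is collected before the context is closed, and the close still happens if the user function throws.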
Kafka consumption phase
FlinkKafkaConsumerBase#run creates a KafkaFetcher object and eventually calls kafkaFetcher.runFetchLoop(). A snippet of that method:
/** The thread that runs the actual KafkaConsumer and hand the record batches to this fetcher. */
private final KafkaConsumerThread consumerThread;

@Override
public void runFetchLoop() throws Exception {
    try {
        final Handover handover = this.handover;

        // kick off the actual Kafka consumer
        consumerThread.start();

        // some code omitted
    }
    // some code omitted
}
As you can see, this starts a KafkaConsumerThread. Moving on to KafkaConsumerThread#run: only part of the method is listed below; for the full code, see KafkaConsumerThread.java.
@Override
public void run() {
    // early exit check
    if (!running) {
        return;
    }

    // This method initializes the KafkaConsumer and guarantees it is torn down properly.
    // This is important, because the consumer has multi-threading issues,
    // including concurrent 'close()' calls.
    try {
        this.consumer = getConsumer(kafkaProperties);
    } catch (Throwable t) {
        handover.reportError(t);
        return;
    }

    try {
        // some code omitted

        // main fetch loop
        while (running) {
            // some code omitted

            if (records == null) {
                try {
                    records = consumer.poll(pollTimeout);
                } catch (WakeupException we) {
                    continue;
                }
            }

            // some code omitted
        }
        // end main fetch loop
    } catch (Throwable t) {
        handover.reportError(t);
    } finally {
        handover.close();
        try {
            consumer.close();
        } catch (Throwable t) {
            log.warn("Error while closing Kafka consumer", t);
        }
    }
}
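At this point we have finally reached the code that actually fetches data from Kafka: records = consumer.poll(pollTimeout);. Because KafkaConsumer is not thread-safe, each thread needs its own KafkaConsumer object, which is what this.consumer = getConsumer(kafkaProperties); creates: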
KafkaConsumer<byte[], byte[]> getConsumer(Properties kafkaProperties) {
    return new KafkaConsumer<>(kafkaProperties);
}
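The two-thread arrangement walked through in this section (a fetch loop that starts a dedicated consumer thread, which creates its own consumer and polls it) can be sketched without any Kafka dependency. Everything here is an assumption made for illustration: FakeConsumer is a hypothetical stand-in for KafkaConsumer, a java.util.concurrent.BlockingQueue is swapped in for Flink's Handover class, and an empty batch is treated as end-of-stream, which the real consumer does not do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Simplified model: a BlockingQueue replaces Flink's Handover, FakeConsumer replaces KafkaConsumer.
final class ConsumerThreadSketch {

    /** Hypothetical non-thread-safe client that serves two fixed batches. */
    static final class FakeConsumer {
        private final Iterator<List<String>> batches = Arrays.asList(
                Arrays.asList("r1", "r2"),
                Arrays.asList("r3")).iterator();

        /** Returns the next batch, or an empty list once the data is exhausted. */
        List<String> poll() {
            return batches.hasNext() ? batches.next() : Collections.emptyList();
        }
    }

    /** Polls on its own thread and passes each batch to the fetch loop. */
    static final class ConsumerThread extends Thread {
        private final BlockingQueue<List<String>> handover;

        ConsumerThread(BlockingQueue<List<String>> handover) {
            super("consumer-thread");
            this.handover = handover;
        }

        @Override
        public void run() {
            // Created inside run(), so only this thread ever touches the consumer.
            FakeConsumer consumer = new FakeConsumer();
            try {
                while (true) {
                    List<String> records = consumer.poll();
                    // put() blocks while the previous batch has not been taken yet.
                    handover.put(records);
                    if (records.isEmpty()) {
                        return; // simplification: empty batch doubles as end-of-stream
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Plays the role of runFetchLoop(): start the thread, then drain the handover. */
    static List<String> runFetchLoop() {
        BlockingQueue<List<String>> handover = new ArrayBlockingQueue<>(1);
        ConsumerThread consumerThread = new ConsumerThread(handover);
        consumerThread.start();

        List<String> emitted = new ArrayList<>();
        try {
            while (true) {
                List<String> batch = handover.take();
                if (batch.isEmpty()) {
                    break;
                }
                emitted.addAll(batch);
            }
            consumerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(runFetchLoop());
    }
}
```

Creating the consumer inside run() rather than in the constructor is the design choice that matters: the object is confined to the thread that polls it, so the missing thread safety of the client never becomes an issue.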
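Summary
This article only covers the key flow of how Flink consumes data from Kafka. A follow-up will describe in more detail how FlinkKafkaConsumer manages offsets in the AT_LEAST_ONCE and EXACTLY_ONCE scenarios.
Note: this article is based on Flink 1.9.0 and Kafka 2.3.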
