作为一个广泛应用的实时计算引擎,为了支持不同的业务场景,Flink提供了许多常见外部数据框架的Connector。Kafka也是平时大家用的最多的数据流载体,为了实现在生产消费过程的恰好一次语义,做到数据不重不漏,在社区版本迭代过程中,这一点日趋完善。今天我们就谈谈在最新的Flink1.12版本中恰好一次语义是如何实现的。

Kafka Consumer

对于将Kafka作为source算子的程序,由于Kafka自身使用offset来管理已消费数据在队列中的位置,所以保障消费的恰好一次主要是通过记录已经消费的offset,当任务fail后从之前保存的已消费到的offset重新消费即可做到数据不漏读。这个对于offset的保存操作是在做Checkpoint或者Savepoint时进行的,具体是保存在ListState中,由于这里是source算子,还没涉及到keyby操作,所以这个ListState是属于operator state。
但是仔细想想,这种机制能做到数据不漏读,但不能做到不重读,为什么呢。因为Checkpoint是有周期的,是读取了一批数据之后才通过Checkpoint去提交最新消费到的offset,并不是每读取一条数据就立即提交offset。我们假设一个场景,当我们完成上次Checkpoint后,继续消费了一批数据,但是这时候任务fail,这批在上次Checkpoint之后消费的数据并没有把最后一条数据的offset提交上去,我们任务restore之后,读取的state里面保存的是上次Checkpoint提交的offset,肯定是比当前已消费的数据其offset早的,但是我们之前消费的这一小批数据在任务fail前也处理完毕并输出到下游组件了。这时候再从上次Checkpoint保存的offset恢复消费,那么这一小批数据将会被重复读取消费,产生重复的计算结果,对于下游而言,也就是会再次收到同一份数据。
如此看来,好像Flink Kafka Connector只能做到至少一次,也就是顶多保证每个数据不漏读,但是可能会被重复读。事实上,如果单纯考虑Kafka作为source算子,确实只能做到至少一次语义,但是纵观整个数据链路,如果我们能做到sink出去的数据可以撤回或者做成幂等,好像这样即使source消费重复了也没关系。真相确实如此,Flink所定义的恰好一次是一种端到端的恰好一次,从source到中间的计算性算子再到最后的source,只要能做到从外部整体看到恰好一次就可以了。这样就要求下游的数据接收方能做到写入可撤回或者写入幂等操作。如果我们的下游依旧是Kafka,那么它是否能实现这一构想呢?

Kafka Producer

如果在Flink中将Kafka作为Sink,想要做到写入幂等或者“可撤回”,那么可以开启Kafka的事务特性,这一特性已经在0.11之后的版本中支持。Flink中提供了TwoPhaseCommitSinkFunction类,但凡需要实现事务写入的Sink算子均需要继承该类,其原理是分布式事务的二阶段提交算法,JM在这里作为协调者,TM在这里作为参与者。当Checkpoint Barrier到达Sink后,在snapshotState方法里通过KafkaProducer.flush()将数据刷写出去,但是在事务的生命周期里这部分数据并未真正提交。本次事务尚未结束,下一次事务马上开启,即本次事务的提交部分将异步进行。当所有节点做完state的异步阶段并发送ack后,协调者JM确认Checkpoint成功发送答复ack,通知各节点调用notifyCheckpointComplete方法,各节点才通过调用KafkaProducer.commitTransaction()去提交本次事务,此时数据才真正被Kafka所接受。通过这样一些保障机制,结合Flink Source算子的重放机制,才能真正实现端到端的恰好一次语义。