定义

notifyCheckpointComplete方法在CheckpointListener接口中定义

  1. /**
  2. * This interface must be implemented by functions/operations that want to receive
  3. * a commit notification once a checkpoint has been completely acknowledged by all
  4. * participants.
  5. */
  6. @PublicEvolving
  7. public interface CheckpointListener {
  8. /**
  9. * This method is called as a notification once a distributed checkpoint has been completed.
  10. *
  11. * Note that any exception during this method will not cause the checkpoint to
  12. * fail any more.
  13. *
  14. * @param checkpointId The ID of the checkpoint that has been completed.
  15. * @throws Exception
  16. */
  17. void notifyCheckpointComplete(long checkpointId) throws Exception;
  18. }

简单说这个方法的含义就是在checkpoint做完之后,JobMaster会通知task执行这个方法,例如在FlinkKafkaProducernotifyCheckpointComplete中做了事务的提交。

样例

下面的程序会被分为两个task,task1是Source: Example Source和task2是Map -> Sink: Example Sink

  1. DataStream<KafkaEvent> input = env.addSource(
  2. new FlinkKafkaConsumer<>("foo", new KafkaEventSchema(), properties)
  3. .assignTimestampsAndWatermarks(new CustomWatermarkExtractor())).name("Example Source")
  4. .keyBy("word")
  5. .map(new MapFunction<KafkaEvent, KafkaEvent>() {
  6. @Override
  7. public KafkaEvent map(KafkaEvent value) throws Exception {
  8. value.setFrequency(value.getFrequency() + 1);
  9. return value;
  10. }
  11. });
  12. input.addSink(
  13. new FlinkKafkaProducer<>(
  14. "bar",
  15. new KafkaSerializationSchemaImpl(),
  16. properties,
  17. FlinkKafkaProducer.Semantic.EXACTLY_ONCE)).name("Example Sink");

operator调用notifyCheckpointComplete

根据上面的例子,task1中只有一个source的operator,但是task2中有两个operator,分别是map和sink。

StreamTask中,调用task的notifyCheckpointComplete方法

  1. @Override
  2. public void notifyCheckpointComplete(long checkpointId) throws Exception {
  3. boolean success = false;
  4. synchronized (lock) {
  5. if (isRunning) {
  6. LOG.debug("Notification of complete checkpoint for task {}", getName());
  7. for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
  8. if (operator != null) {
  9. operator.notifyCheckpointComplete(checkpointId);
  10. }
  11. }
  12. success = true;
  13. }
  14. else {
  15. LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());
  16. }
  17. }
  18. if (success) {
  19. syncSavepointLatch.acknowledgeCheckpointAndTrigger(checkpointId, this::finishTask);
  20. }
  21. }

其中关键的部分就是

  1. for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
  2. if (operator != null) {
  3. operator.notifyCheckpointComplete(checkpointId);
  4. }
  5. }

operator的调用顺序取决于allOperators变量,可以看到源码中的注释,operator是以逆序存放的

  1. /**
  2. * Stores all operators on this chain in reverse order.
  3. */
  4. private final StreamOperator<?>[] allOperators;

也就是说上面客户端的代码,虽然先调用了map后调用的sink,但是实际执行的时候,确实先调用sink的notifyCheckpointComplete方法,后调用map的。

对Exactly-Once语义的影响

注:以下讨论的都是基于kafka source和sink
上面的例子,是先执行source的notifyCheckpointComplete方法,再执行sink的notifyCheckpointComplete方法。但是如果把.keyBy("word")去掉,那么只会有一个task,所有operator逆序执行,也就是先调用sink的notifyCheckpointComplete方法再调用source的。
为了方便理解整个流程,下文只考察并发度为1的情况,不考虑部分subtask成功部分不成功的情况。具体分析之前要先搞清楚FlinkKafkaProducerFlinkKafkaConsumernotifyCheckpointComplete方法都做了什么事情,请先阅读Flink kafka source源码解析
Flink kafka sink源码解析

先sink后source

sink成功之后source执行之前 sink成功之前
checkpoint恢复 exactly-once 丢数据
__consumer_offsets恢复 重复消费 exactly-once

sink成功之后source执行之前,表示sink的notifyCheckpointComplete方法执行成功了,但是在执行source的notifyCheckpointComplete方法之前任务失败。
sink成功之前,表示sink的notifyCheckpointComplete方法执行失败,提交事务失败。

测试用例

测试代码主体架构如下:

  1. DataStream<KafkaEvent> input = env.addSource(
  2. new FlinkKafkaConsumer<>("foo", new KafkaEventSchema(), properties)
  3. .assignTimestampsAndWatermarks(new CustomWatermarkExtractor())).name("Example Source")
  4. .map(new MapFunction<KafkaEvent, KafkaEvent>() {
  5. @Override
  6. public KafkaEvent map(KafkaEvent value) throws Exception {
  7. value.setFrequency(value.getFrequency() + 1);
  8. return value;
  9. }
  10. });
  11. input.addSink(
  12. new FlinkKafkaProducer<>(
  13. "bar",
  14. new KafkaSerializationSchemaImpl(),
  15. properties,
  16. FlinkKafkaProducer.Semantic.EXACTLY_ONCE)).name("Example Sink");

测试环境采用的是flink 1.9.0 Standalone Cluster模式,一个JobManager,一个TaskManager,默认只保存一个checkpoint。
模拟异常的方法,通过kill -9杀掉JobManager和TaskManager进程。

  1. FlinkKafkaProducer#commit方法第一行设置断点,当程序走到这个断点的时候kill -9杀掉JobManager和TaskManager进程,模拟sink的notifyCheckpointComplete方法执行失败的场景;
  2. 监控1,通过bin/kafka-console-consumer.sh --topic bar --bootstrap-server 10.1.236.66:9092监控producer是否flush数据;监控2,通过bin/kafka-console-consumer.sh --topic bar --bootstrap-server 10.1.236.66:9092 --isolation-level read_committed监控producer的事务是否成功提交;监控3,通过bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server 10.1.236.66:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config /tmp/Consumer.properties监控consumer的offset是否提交到kafka
  3. 发送数据一条数据a,5,1572845161023,当走到断点的时候,说明consumer的checkpoint已经生成,但是还没有将offset提交到kafka,也就是checkpoint认为offset已经成功发送,但是kafka认为并没有发送,监控1有数据,监控2监控3都没有数据。kill -9杀掉JobManager和TaskManager进程
  4. 重新启动,并提交作业,不指定checkpoint路径。监控1,2,3,都有数据,所以这种情况,监控2,只收到了一次数据,也就是exactly-once。这时候监控3收到的数据为:partition0的offset=37,partition1的offset=43,partition2的offset=39
  5. 同样1-3步骤,发送数据一条数据b,6,1572845161023,第4步,启动作业的时候通过-s指定要恢复的checkpoint路径,启动后监控1,2都没有数据,但是监控3的数据为:partition0的offset=37,partition1的offset=43,partition2的offset=40,再查看task的日志FlinkKafkaConsumerBase - Consumer subtask 0 restored state: {KafkaTopicPartition{topic='foo', partition=0}=36, KafkaTopicPartition{topic='foo', partition=1}=42, KafkaTopicPartition{topic='foo', partition=2}=39}.,说明checkpoint认为上一次partition2的offset=39已经成功消费,所以恢复之后向kafka发送的offset为40。这样就导致了partition2的offset=39这条数据丢失

同样的方法可以测试sink成功之后source执行之前的场景,只是这时候需要将断点设置在TwoPhaseCommitSinkFunction#notifyCheckpointComplete方法的最后一行,这样就会发现故障之前,监控1,2都是有数据的,监控3没有数据。不指定checkpoint路径恢复,监控1,2都会收到数据,这样就导致了重复消费。如果指定checkpoint路径消费,那么监控1,2就不会收到数据,保证了exactly-once

原因分析

产生上面情况的原因主要就是因为checkpoint存储的offset和kafka中的offset不一致导致的。

先source后sink

需要说明的一点这个场景的两个task实际是并行的,并没有绝对的先后关系,只是会有这种前后关系的可能。

source成功之后sink执行之前 source成功之前
checkpoint恢复 丢数据 丢数据
__consumer_offsets恢复 丢数据 exactly-once

测试用例

模拟source成功之后sink执行之前

  1. 需要在上面的用例中加入keyby算子,确保生成两个task,监控3收到数据的时候说明consumer的notifyCheckpointComplete方法已经执行完。在FlinkKafkaProducer#commit方法第一行设置断点,当程序走到这个断点并且监控3收到数据的时候,kill -9杀掉JobManager和TaskManager进程,模拟sink执行notifyCheckpointComplete方法失败的场景;
  2. 这时候重启作业,checkpoint和kafka中offset已经是一致的了,无论是从checkpoint还是kafka,都是一样的。所以source认为已经成功消费了,不会再读上次的offset,都会导致数据丢失。

source成功之前
对于在source之前程序就挂掉,相当于所有的operator都没有执行notifyCheckpointComplete方法,但是source的checkpoint已经做过了,只是没有将offset发送到kafka,这样只有从__consumer_offsets恢复才能保证不丢数据。

总结

本文通过一种极端的测试场景希望让读者可以更深入的理解flink中的Exactly-Once语义。在程序挂了以后需要排查是什么原因和什么阶段导致的,才能通过合适的方式恢复作业。在实际的生产环境中,会有重试或者更多的方式保证高可用,也建议保留多个checkpoint,以便业务上可以恢复正确的数据。
注:本文基于flink 1.9.0和kafka 2.3