image.png


Sink端提交

幂等性

幂等性是保证最终一次性,但是中途可能不一致。举个栗子:
计算班里有多少人: 5 10 | 15 20 15 20 |
| 表示检查点,在进行到20的时候sink失败了 于是从上一个检查点重新写入,于是 15 20又重新显示了一次。

事务写入

image.png

  • 事务的四大特性:原子性(Atomicity)、 一致性(Correspondence)、隔离性(Isolation)和持久性(Durability),这就是著名的 ACID

    预写日志:WAL

  • 比较简单,类似检查点做一个缓存。有缺点:数据已写入sink 在返回ack确认时出现故障,flink则错误的认为没有写入sink。可能重复消费

  • 实现GenericWriteAheadSink

    两阶段提交:2PC(真正基于事务)

  • 实现TwoPhaseCommitSinkFunction

  • 通过与flink自身的checkpoint进行配合,实际增加很小的开销

1、flink的第一条数据进入 kafka会开启第一个事务
2、第一个barrier进入kafaka时,会开启第二个事务
3、等jobManager通知kafak,已完成第一个barrier前的检查点,则提交第一个事务
4、等jobManager通知kafak,已完成第一个barrier和第二个barrier之间的检查点,则提交第二个事务
输入端:
在source任务中,将读取的kafka-offset保存为算子状态,写入检查点。当发生故障时从检查点读取offset、通过FlinkKafakConsumer重新从offset消费恢复状态。
输出端:
FlinkKafkaProducer就实现了TwoPhaseCommitSinkFunction接口
需要的配置:
1、启用检查点
2、FlinkKafkaProducer的构造函数传入Semantic.EXACTLY_ONCE
3、sink端Kafka配置: 数据的隔离级别isolation.level = read_committed。只可以读取已提交的数据,未提交的数据读取不到。
4、flink和sink端超时时间的配置:flink的事务超时时间为transaction.timeout.ms 默认一小时、sink端kafka的事务超时时间为transaction.max.timeout.ms默认15分钟。 会出现kafka已经超时 而flink继续等待的情况,需要设置成前者<=后者