Kafka exactly once
开启幂等:
props.put("enable.idempotence", ture);
代码逻辑:
Producer<String, String> producer = new KafkaProducer<String, String>(props);
// 初始化事务,包括结束该Transaction ID对应的未完成的事务(如果有)
// 保证新的事务在一个正确的状态下启动
producer.initTransactions();
// 开始事务
producer.beginTransaction();
// 消费数据
ConsumerRecords<String, String> records = consumer.poll(100);
try{
// 发送数据
producer.send(new ProducerRecord<String, String>("Topic", "Key", "Value"));
// 发送消费数据的Offset,将上述数据消费与数据发送纳入同一个Transaction内
producer.sendOffsetsToTransaction(offsets, "group1");
// 数据发送及Offset发送均成功的情况下,提交事务
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// 数据发送或者Offset发送出现异常时,终止事务
producer.abortTransaction();
} finally {
// 关闭Producer和Consumer
producer.close();
consumer.close();
}
参考
【Kafka-0.11.0.0是如何实现Exactly-once语义的】https://www.jianshu.com/p/5d889a67dcd3
【Flink实战: 结合Kafka构建端到端的Exactly-Once处理程序】https://my.oschina.net/u/992559/blog/1821399
【Flink实现Kafka到Mysql的Exactly-Once】https://www.jianshu.com/p/5bdd9a0d7d02
【kafka exactly once批处理】https://www.jianshu.com/p/19801ed5578e
【Kafka事务机制与Exactly Once语义实现原理】https://www.zybuluo.com/tinadu/note/949867
【RocketMQ使用Exactly-Once投递语义收发消息】https://help.aliyun.com/document_detail/102777.html