Kafka exactly once

开启幂等:
props.put("enable.idempotence", ture);
代码逻辑:
Producer<String, String> producer = new KafkaProducer<String, String>(props);// 初始化事务,包括结束该Transaction ID对应的未完成的事务(如果有)// 保证新的事务在一个正确的状态下启动producer.initTransactions();// 开始事务producer.beginTransaction();// 消费数据ConsumerRecords<String, String> records = consumer.poll(100);try{// 发送数据producer.send(new ProducerRecord<String, String>("Topic", "Key", "Value"));// 发送消费数据的Offset,将上述数据消费与数据发送纳入同一个Transaction内producer.sendOffsetsToTransaction(offsets, "group1");// 数据发送及Offset发送均成功的情况下,提交事务producer.commitTransaction();} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {// 数据发送或者Offset发送出现异常时,终止事务producer.abortTransaction();} finally {// 关闭Producer和Consumerproducer.close();consumer.close();}
参考
【Kafka-0.11.0.0是如何实现Exactly-once语义的】https://www.jianshu.com/p/5d889a67dcd3
【Flink实战: 结合Kafka构建端到端的Exactly-Once处理程序】https://my.oschina.net/u/992559/blog/1821399
【Flink实现Kafka到Mysql的Exactly-Once】https://www.jianshu.com/p/5bdd9a0d7d02
【kafka exactly once批处理】https://www.jianshu.com/p/19801ed5578e
【Kafka事务机制与Exactly Once语义实现原理】https://www.zybuluo.com/tinadu/note/949867
【RocketMQ使用Exactly-Once投递语义收发消息】https://help.aliyun.com/document_detail/102777.html
