Kafka exactly once

image.png
开启幂等:

  1. props.put("enable.idempotence", ture);

代码逻辑:

  1. Producer<String, String> producer = new KafkaProducer<String, String>(props);
  2. // 初始化事务,包括结束该Transaction ID对应的未完成的事务(如果有)
  3. // 保证新的事务在一个正确的状态下启动
  4. producer.initTransactions();
  5. // 开始事务
  6. producer.beginTransaction();
  7. // 消费数据
  8. ConsumerRecords<String, String> records = consumer.poll(100);
  9. try{
  10. // 发送数据
  11. producer.send(new ProducerRecord<String, String>("Topic", "Key", "Value"));
  12. // 发送消费数据的Offset,将上述数据消费与数据发送纳入同一个Transaction内
  13. producer.sendOffsetsToTransaction(offsets, "group1");
  14. // 数据发送及Offset发送均成功的情况下,提交事务
  15. producer.commitTransaction();
  16. } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
  17. // 数据发送或者Offset发送出现异常时,终止事务
  18. producer.abortTransaction();
  19. } finally {
  20. // 关闭Producer和Consumer
  21. producer.close();
  22. consumer.close();
  23. }

参考

【Kafka-0.11.0.0是如何实现Exactly-once语义的】https://www.jianshu.com/p/5d889a67dcd3
【Flink实战: 结合Kafka构建端到端的Exactly-Once处理程序】https://my.oschina.net/u/992559/blog/1821399
【Flink实现Kafka到Mysql的Exactly-Once】https://www.jianshu.com/p/5bdd9a0d7d02
【kafka exactly once批处理】https://www.jianshu.com/p/19801ed5578e
【Kafka事务机制与Exactly Once语义实现原理】https://www.zybuluo.com/tinadu/note/949867
【RocketMQ使用Exactly-Once投递语义收发消息】https://help.aliyun.com/document_detail/102777.html