8、消息传递保障

    • at most once(消费者最多收到一次消息,0—1次):acks = 0 可以实现。
    • at least once(消费者至少收到一次消息,1—多次):ack = all 可以实现。
    • exactly once(消费者刚好收到一次消息):at least once 加上消费者幂等性可以实现,还可以用kafka生产者的幂等性来实现。

    kafka生产者的幂等性:因为发送端重试导致的消息重复发送问题,kafka的幂等性可以保证重复发送的消息只接收一次,只需在生产者加上参数 props.put(“enable.idempotence”, true) 即可,默认是false不开启。
    具体实现原理是,kafka每次发送消息会生成PID和Sequence Number,并将这两个属性一起发送给broker,broker会将PID和Sequence Number跟消息绑定一起存起来,下次如果生产者重发相同消息,broker会检查PID和Sequence Number,如果相同不会再接收。

    1. PID:每个新的 Producer 在初始化的时候会被分配一个唯一的 PID,这个PID 对用户完全是透明的。生产者如果重启则会生成新的PID
    2. Sequence Number:对于每个 PID,该 Producer 发送到每个 Partition 的数据都有对应的序列号,这些序列号是从0开始单调递增的。

    9、kafka的事务
    Kafka的事务不同于Rocketmq,Rocketmq是保障本地事务(比如数据库)与mq消息发送的事务一致性,Kafka的事务主要是保障一次发送多条消息的事务一致性(要么同时成功要么同时失败),一般在kafka的流式计算场景用得多一点,比如,kafka需要对一个topic里的消息做不同的流式计算处理,处理完分别发到不同的topic里,这些topic分别被不同的下游系统消费(比如hbase,redis,es等),这种我们肯定希望系统发送到多个topic的数据保持事务一致性。Kafka要实现类似Rocketmq的分布式事务需要额外开发功能。
    kafka的事务处理可以参考官方文档

    1. Properties props = new Properties();
    2. props.put("bootstrap.servers", "localhost:9092");
    3. props.put("transactional.id", "my-transactional-id");
    4. Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
    5. //初始化事务
    6. producer.initTransactions();
    7. try {
    8. //开启事务
    9. producer.beginTransaction();
    10. for (int i = 0; i < 100; i++){
    11. //发到不同的主题的不同分区
    12. producer.send(new ProducerRecord<>("hdfs-topic", Integer.toString(i), Integer.toString(i)));
    13. producer.send(new ProducerRecord<>("es-topic", Integer.toString(i), Integer.toString(i)));
    14. producer.send(new ProducerRecord<>("redis-topic", Integer.toString(i), Integer.toString(i)));
    15. }
    16. //提交事务
    17. producer.commitTransaction();
    18. } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    19. // We can't recover from these exceptions, so our only option is to close the producer and exit.
    20. producer.close();
    21. } catch (KafkaException e) {
    22. // For all other exceptions, just abort the transaction and try again.
    23. //回滚事务
    24. producer.abortTransaction();
    25. }
    26. producer.close();

    10、kafka高性能的原因

    • 磁盘顺序读写:kafka消息不能修改以及不会从文件中间删除保证了磁盘顺序读,kafka的消息写入文件都是追加在文件末尾,不会写入文件中的某个位置(随机写)保证了磁盘顺序写。
    • 数据传输的零拷贝
    • 读写数据的批量batch处理以及压缩传输

    数据传输零拷贝原理:
    image.png