exactly once

exactly once无非就是两种解决办法:

  • 消费时写入支持事务的数据库(可以回滚),把写入数据+更新offset 绑定成一个事务,如果更新offset失败则把写入的数据回滚。(类似mysql 只适合写入少量数据)
  • 消费时写入支持幂等性的数据库(可以覆盖),如果更新offset失败则再 重写写入一遍 刚才的数据,因为幂等性的原因,原有数据会被覆盖。(类似hbase es 适合写入大量数据)

精准一次性生产

  • ACK=-1 + 分区副本数>=2 + ISR个数>=2 + 幂等性 = exactly once

producer向partition发送数据后,partition需要回复ack(acknowledgement)。如果producer收到ack就会进行下一轮的发送,否则重新发送数据

ACK

1、ack=0:生产者不等待leader的ack,一直不停的写入。 优:延迟最低、缺:可能丢失数据(生产中使用少)
2、ack=1:leader完成同步后 就向生产者返回ack。如果在leader复制数据到follower的时候leader故障,则可能丢失数据(传输普通日志,允许丢失的场景)
3、ack=all(-1):leader、follower全部完成同步后才返回ack。如果在follower完成同步 leader未返回ack的时候,leader故障 则可能数据重复(钱相关的场景)

  • ack=all 要结合(min.insync.replicas 最少副本数) 使用。

    ISR: 默认min.insync.replicas = 1(只有leader,没有follower)

  • leader维护了一个动态的ISR(in-sync replica set,与leader保持同步的follower集合)。

当ISR中的follower完成同步之后,follower会向leader发送ack,如果follower长时间未完成,则该follower被剔除ISR。

  • ISR的作用还有: leader故障之后,从ISR中选举

    幂等性

    ```java Properties props = new Properties(); props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, “true”);//这里设置为true props.put(“acks”, “all”); // 当 enable.idempotence 为 true,这里默认为 all props.put(“bootstrap.servers”, “localhost:9092”); props.put(“key.serializer”, “org.apache.kafka.common.serialization.StringSerializer”); props.put(“value.serializer”, “org.apache.kafka.common.serialization.StringSerializer”);

KafkaProducer producer = new KafkaProducer(props);

producer.send(new ProducerRecord(topic, “test”); ```

  • 配置方法:
    • enable.idompotence = true; 0.11版本引入【幂等性】特性。
    • 要求max.in.flight.requests.per.connection<=5、retry>0、acks=’all’
  • 原理:发送一批消息会附带 主键:,broker会检查 主键重复则写入失败。
    • producerID: 重启会变,不能跨会话
    • PartitionID:不同分区ID不同,不能跨分区
    • SeqNumber:消息批次ID,递增
    • 综上所述!只能保证单个Producer对于同一个的Exactly Once语义。不能保证跨分区、跨会话幂等。

跨分区举栗子:如果生产者的分组策略是黏性分区,假设一条数据已经写入A分区成功 然而leader挂机没有返回ack。下一次尝试时会发到不同的分区B,这样AB分区都存有同样的一条数据。

事务性

image.png

  • 在开启幂等性的前提下,才可以开启事务
  • Kafka中的事务可以使应用程序将消费消息、生产消息、提交消费位移当作原子操作来处理。即使跨分区处理 都可以在一个原子单元内完成。操作要么全部完成、要么全部失败
  • isolation.level=read_committed 表示消费端不可以看到尚未提交的事务数据
  • TransactionnalId(也叫txnid):即使故障恢复后也不会改变。 生产者从Transaction Coordinator这里拿到txnid

    精准一次性消费

    数据量小

    可以利用mysql的事务性,把数据消费和修改偏移量 写成一个事务

    数据量大

    写入hbase、es:先消费再手动提交offset(最少一次性消费,可能重复) + 下游的幂等性 = exactly once