概述

Kafka 生产者幂等是在 Kafka 0.11.0.0 版本实现,Kafka 引入 EOS(Exactly once semantics,精确一次处理语义)的特性,这个特性主要靠 Kafka 幂等性和 Kafka 事务来保证的。
Kakfa 的幂等性能保证 Broker 端的单个分区内只会收到有且只有一条消息,不会出现重复消息,但使用 PRODUCER 幂等是需要一些前提条件:

  1. 单分区幂等。PRODUCER 幂等只能保证某个分区内部不会出现重复消息,想要实现多分区幂等,需要依赖 Kafka 事务来保证。
  2. 单会话幂等。PRODUCER 无法实现跨会话幂等。即同一个 PRODUCER 宕机重启,那么它无法保证重启前和重启后之间的消息也是唯一的。

    消息语义

    目前 Kafka 能提供以下 3 种消息语义:
消息语义 Kafka 实现 优点 缺点
最多一次 at most once 禁用生产者重试,修改 Client 端配置参数 retries=0 集群吞吐量非常大 消息丢失
至少一次 at least once 不需要改动什么配置,默认是这种语义 吞吐量较高,消息不会丢失 消息重复
精确一次 exactly once 修改 Client 端 enable.idempotence=true 单个分区内不会出现重复消息 吞吐量较低

重复消息产生的原因和场景

生产者生产重复消息的根本原因是 Broker 已经对消息持久化,但 Producer 并未收到对消息的 ACK 响应,导致 Producer 不断重试发送消息,而 Borker 收到消息后并未对消息进行去重操作,最终导致消息重复。
Producer 未能收到 ACK 响应的场景有很多,比如瞬时集群网络环境出现波动,导致 Producer 超时重发(超时时间由 request.timeout.ms 配置,默认值:30S) ,或者由于 Broker 下线 -> 分区 Leader 变更,消息已经持久化至本地日志文件,但响应未能及时发送给 Producer。

Kafka 的幂等解决方案

Kafka 实现幂等非常简单,就是让消息具有”全局”唯一性。注意,这里”全局”并非指整个 Kafka 集群,它仅是针对单个分区而言(针对整个 Kafka 集群消息唯一那就是 Kafka 事务了)。幂等性是保证单个分区的消息唯一,而单个分区可以对应多个生产者,因此,消息的唯一性是由 PID+自增ID 保证。
为什么幂等只针对单个分区呢? 因为如果保证全局唯一需要花费很大的力气,势必会造成整个集群吞吐量下降,利用分区进行隔离,可以较好兼顾吞吐量的同时保证分区内消息不重复。

Producer 获取 PID、生成自增ID(Sequence numbers)

我们先看一张幂等生产者获取 PID 并发送消息的流程示意图:
生产者获取PID并发送消息过程.png

获取PID

每个幂等 Producer 在初始化时都会向事务协调器申请一个 PID,而这个 PID 是由 Zookeeper 保证在集群的唯一性。对用户来说,PID 是透明无感知的。

  1. // org.apache.kafka.clients.producer.internals.TransactionManager#bumpIdempotentEpochAndResetIdIfNeeded
  2. synchronized void bumpIdempotentEpochAndResetIdIfNeeded() {
  3. if (!isTransactional()) {
  4. if (epochBumpRequired) {
  5. bumpIdempotentProducerEpoch();
  6. }
  7. // 状态不为INITIALIZING且没有Producer ID,那么就需要发送「InitProducerId」请求以获取PID
  8. if (currentState != State.INITIALIZING && !hasProducerId()) {
  9. transitionTo(State.INITIALIZING);
  10. InitProducerIdRequestData requestData = new InitProducerIdRequestData()
  11. .setTransactionalId(null)
  12. .setTransactionTimeoutMs(Integer.MAX_VALUE);
  13. InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData), false);
  14. enqueueRequest(handler);
  15. }
  16. }
  17. }

Broker 端处理 InitProducerId

  1. // kafka.coordinator.transaction.TransactionCoordinator#handleInitProducerId
  2. /**
  3. * 生成生产者的PID
  4. *
  5. * @param transactionalId 事务ID
  6. * @param transactionTimeoutMs 事务超时时间
  7. * @param expectedProducerIdAndEpoch
  8. * @param responseCallback 回调方法
  9. */
  10. def handleInitProducerId(transactionalId: String,
  11. transactionTimeoutMs: Int,
  12. expectedProducerIdAndEpoch: Option[ProducerIdAndEpoch],
  13. responseCallback: InitProducerIdCallback): Unit = {
  14. if (transactionalId == null) {
  15. // 如果只是幂等生产者,则transactionalId==null,直接分配PID并返回
  16. val producerId = producerIdGenerator.generateProducerId()
  17. // 返回响应
  18. responseCallback(InitProducerIdResult(producerId, producerEpoch = 0, Errors.NONE))
  19. //...
  20. }

Broker 端的事务管理器会向 Zookeeper 申请号段,源码如下:

  1. // kafka.coordinator.transaction.ProducerIdManager#getNewProducerIdBlock
  2. /**
  3. * 尝试向Zookeeper节点写入数据,直到写入成功后返回
  4. */
  5. private def getNewProducerIdBlock(): Unit = {
  6. var zkWriteComplete = false
  7. while (!zkWriteComplete) {
  8. // 获取 /latest_producer_id_block 节点数据
  9. // {"version":1,"broker":5,"block_start":"15000","block_end":"15999"}
  10. val (dataOpt, zkVersion) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
  11. // generate the new producerId block
  12. currentProducerIdBlock = dataOpt match {
  13. case Some(data) =>
  14. // 解析数据
  15. val currProducerIdBlock = ProducerIdManager.parseProducerIdBlockData(data)
  16. debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion")
  17. // 如果超出最大值,则抛出异常
  18. if (currProducerIdBlock.blockEndId > Long.MaxValue - ProducerIdManager.PidBlockSize) {
  19. // we have exhausted all producerIds (wow!), treat it as a fatal error
  20. fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.blockEndId})")
  21. throw new KafkaException("Have exhausted all producerIds.")
  22. }
  23. // 生成下一批次的请求体/latest_producer_id_block节点中
  24. ProducerIdBlock(brokerId, currProducerIdBlock.blockEndId + 1L, currProducerIdBlock.blockEndId + ProducerIdManager.PidBlockSize)
  25. case None =>
  26. debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block")
  27. ProducerIdBlock(brokerId, 0L, ProducerIdManager.PidBlockSize - 1)
  28. }
  29. // 生成新的JSON,这些数据将会存储到
  30. val newProducerIdBlockData = ProducerIdManager.generateProducerIdBlockJson(currentProducerIdBlock)
  31. // 尝试将数据写入节点,有可能写入失败,如果写入失败,那么重试,直到写入成功为止
  32. val (succeeded, version) = zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path,
  33. newProducerIdBlockData,
  34. zkVersion,
  35. Some(checkProducerIdBlockZkData))
  36. zkWriteComplete = succeeded
  37. if (zkWriteComplete)
  38. info(s"Acquired new producerId block $currentProducerIdBlock by writing to Zk with path version $version")
  39. }
  40. }

Producer 管理自增 ID

当 Producer 准备发送数据,第一步从事务管理器获取 PID,然后再从本地事务管理器中获取 Sequence Number,具体获取操作是调用 RecordAccumulator#drainBatchesForOneNode() 方法完成的,具体源码如下:

  1. // org.apache.kafka.clients.producer.internals.RecordAccumulator#drainBatchesForOneNode
  2. if (producerIdAndEpoch != null && !batch.hasSequence()) {
  3. // 如果分区的PID/epoch和最新的生产者不匹配,我们更新并重置sequence numbers。
  4. // 并将PID和EPOCH更新为最新值
  5. transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
  6. // If the batch already has an assigned sequence, then we should not change the producer id and
  7. // sequence number, since this may introduce duplicates. In particular, the previous attempt
  8. // may actually have been accepted, and if we change the producer id and sequence here, this
  9. // attempt will also be accepted, causing a duplicate.
  10. //
  11. // Additionally, we update the next sequence number bound for the partition, and also have
  12. // the transaction manager track the batch so as to ensure that sequence ordering is maintained
  13. // even if we receive out of order responses.
  14. // 设置PID+Sequence Number
  15. batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
  16. transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
  17. log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " + "{} being sent to partition {}", producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, batch.baseSequence(), tp);
  18. transactionManager.addInFlightBatch(batch);
  19. }

Broker 处理幂等Producer的PRODUCER请求

下面我们看看 Broker 是如何处理幂等的 PRODUCER 请求的。入口方法是 KafakApis#handle

  1. override def handle(request: RequestChannel.Request): Unit = {
  2. try {
  3. if (!apiVersionManager.isApiEnabled(request.header.apiKey)) {
  4. throw new IllegalStateException(s"API ${request.header.apiKey} is not enabled")
  5. }
  6. // #2 根据请求头的apiKey字段判断属于哪类请求,然后match路由到合适的方法并处理
  7. request.header.apiKey match {
  8. case ApiKeys.PRODUCE => handleProduceRequest(request)
  9. //...
  10. }
  11. }
  12. }

幂等消费者如何保证分区顺序性

在幂等 Producer 未出来之前,单个分区的顺序性是通过将 Client 端参数MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 设置为 1 而实现。这意味着客户端发送请求后必须等待响应才能发送下一个请求,消息发送属于串行化发送,虽然保证了单分区的消息顺序性,但势必会让 Kafka 的吞吐量下降。
而幂等消费者的出现可类比 TCP 早期的滑动窗口,它也与 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 这个参数紧密相关,此时不再要求等于 1,而是 <=5。这意味着客户端可以一次性最多发送 5 个请求给 Broker 端,Broker 会缓存该 Producer 最近 5 个请求的元数据用来做重复校验,这也是 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 设置为 <=5 的原因。如果出现序号相同,说明是重复消息,舍弃,这实现了单分区幂等。
那分区顺序性是如何保证的呢?通过判断 PID+Sequence Number + 重试机制。如果 Broker 收到的消息的序号和缓存中的不连续,那么返回 OUT_OF_ORDER_SEQUENCE_NUMBER错误,Client 收到该异常后会重发,直到超过最大重试次数或超时。
Client 收到某个批次返回的 OUT_OF_ORDER_SEQUENCE_NUMBER 响应后,会做以下事情:

  1. 将批数据重新入队。入队也是有要要求的,是按 Sequence Number 排序。
  2. 开启新一轮从 RecordAccumulator 抽取数据(调用 RecordAccumulator#drain() 方法抽取数据)。如果批次有 Sequence Number,那么说明该批次是幂等重试批次(因为新的批次此时是没有PID 和 Sequence Number 数据的),这时候会判断位于 in-flight 首个批次的 Sequence Number 和
  3. 队列中是否还有未收到 ACK 的请求,如果没有,才会允许这个批次