概述
Kafka 生产者幂等是在 Kafka 0.11.0.0 版本实现,Kafka 引入 EOS(Exactly once semantics,精确一次处理语义)的特性,这个特性主要靠 Kafka 幂等性和 Kafka 事务来保证的。
Kakfa 的幂等性能保证 Broker 端的单个分区内只会收到有且只有一条消息,不会出现重复消息,但使用 PRODUCER 幂等是需要一些前提条件:
- 单分区幂等。PRODUCER 幂等只能保证某个分区内部不会出现重复消息,想要实现多分区幂等,需要依赖 Kafka 事务来保证。
- 单会话幂等。PRODUCER 无法实现跨会话幂等。即同一个 PRODUCER 宕机重启,那么它无法保证重启前和重启后之间的消息也是唯一的。
消息语义
目前 Kafka 能提供以下 3 种消息语义:
| 消息语义 | Kafka 实现 | 优点 | 缺点 |
|---|---|---|---|
| 最多一次 at most once | 禁用生产者重试,修改 Client 端配置参数 retries=0 | 集群吞吐量非常大 | 消息丢失 |
| 至少一次 at least once | 不需要改动什么配置,默认是这种语义 | 吞吐量较高,消息不会丢失 | 消息重复 |
| 精确一次 exactly once | 修改 Client 端 enable.idempotence=true | 单个分区内不会出现重复消息 | 吞吐量较低 |
重复消息产生的原因和场景
生产者生产重复消息的根本原因是 Broker 已经对消息持久化,但 Producer 并未收到对消息的 ACK 响应,导致 Producer 不断重试发送消息,而 Borker 收到消息后并未对消息进行去重操作,最终导致消息重复。
Producer 未能收到 ACK 响应的场景有很多,比如瞬时集群网络环境出现波动,导致 Producer 超时重发(超时时间由 request.timeout.ms 配置,默认值:30S) ,或者由于 Broker 下线 -> 分区 Leader 变更,消息已经持久化至本地日志文件,但响应未能及时发送给 Producer。
Kafka 的幂等解决方案
Kafka 实现幂等非常简单,就是让消息具有”全局”唯一性。注意,这里”全局”并非指整个 Kafka 集群,它仅是针对单个分区而言(针对整个 Kafka 集群消息唯一那就是 Kafka 事务了)。幂等性是保证单个分区的消息唯一,而单个分区可以对应多个生产者,因此,消息的唯一性是由 PID+自增ID 保证。
为什么幂等只针对单个分区呢? 因为如果保证全局唯一需要花费很大的力气,势必会造成整个集群吞吐量下降,利用分区进行隔离,可以较好兼顾吞吐量的同时保证分区内消息不重复。
Producer 获取 PID、生成自增ID(Sequence numbers)
我们先看一张幂等生产者获取 PID 并发送消息的流程示意图:
获取PID
每个幂等 Producer 在初始化时都会向事务协调器申请一个 PID,而这个 PID 是由 Zookeeper 保证在集群的唯一性。对用户来说,PID 是透明无感知的。
// org.apache.kafka.clients.producer.internals.TransactionManager#bumpIdempotentEpochAndResetIdIfNeededsynchronized void bumpIdempotentEpochAndResetIdIfNeeded() {if (!isTransactional()) {if (epochBumpRequired) {bumpIdempotentProducerEpoch();}// 状态不为INITIALIZING且没有Producer ID,那么就需要发送「InitProducerId」请求以获取PIDif (currentState != State.INITIALIZING && !hasProducerId()) {transitionTo(State.INITIALIZING);InitProducerIdRequestData requestData = new InitProducerIdRequestData().setTransactionalId(null).setTransactionTimeoutMs(Integer.MAX_VALUE);InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData), false);enqueueRequest(handler);}}}
Broker 端处理 InitProducerId
// kafka.coordinator.transaction.TransactionCoordinator#handleInitProducerId/*** 生成生产者的PID** @param transactionalId 事务ID* @param transactionTimeoutMs 事务超时时间* @param expectedProducerIdAndEpoch* @param responseCallback 回调方法*/def handleInitProducerId(transactionalId: String,transactionTimeoutMs: Int,expectedProducerIdAndEpoch: Option[ProducerIdAndEpoch],responseCallback: InitProducerIdCallback): Unit = {if (transactionalId == null) {// 如果只是幂等生产者,则transactionalId==null,直接分配PID并返回val producerId = producerIdGenerator.generateProducerId()// 返回响应responseCallback(InitProducerIdResult(producerId, producerEpoch = 0, Errors.NONE))//...}
Broker 端的事务管理器会向 Zookeeper 申请号段,源码如下:
// kafka.coordinator.transaction.ProducerIdManager#getNewProducerIdBlock/*** 尝试向Zookeeper节点写入数据,直到写入成功后返回*/private def getNewProducerIdBlock(): Unit = {var zkWriteComplete = falsewhile (!zkWriteComplete) {// 获取 /latest_producer_id_block 节点数据// {"version":1,"broker":5,"block_start":"15000","block_end":"15999"}val (dataOpt, zkVersion) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)// generate the new producerId blockcurrentProducerIdBlock = dataOpt match {case Some(data) =>// 解析数据val currProducerIdBlock = ProducerIdManager.parseProducerIdBlockData(data)debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion")// 如果超出最大值,则抛出异常if (currProducerIdBlock.blockEndId > Long.MaxValue - ProducerIdManager.PidBlockSize) {// we have exhausted all producerIds (wow!), treat it as a fatal errorfatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.blockEndId})")throw new KafkaException("Have exhausted all producerIds.")}// 生成下一批次的请求体/latest_producer_id_block节点中ProducerIdBlock(brokerId, currProducerIdBlock.blockEndId + 1L, currProducerIdBlock.blockEndId + ProducerIdManager.PidBlockSize)case None =>debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block")ProducerIdBlock(brokerId, 0L, ProducerIdManager.PidBlockSize - 1)}// 生成新的JSON,这些数据将会存储到val newProducerIdBlockData = ProducerIdManager.generateProducerIdBlockJson(currentProducerIdBlock)// 尝试将数据写入节点,有可能写入失败,如果写入失败,那么重试,直到写入成功为止val (succeeded, version) = zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path,newProducerIdBlockData,zkVersion,Some(checkProducerIdBlockZkData))zkWriteComplete = succeededif (zkWriteComplete)info(s"Acquired new producerId block $currentProducerIdBlock by writing to Zk with path version $version")}}
Producer 管理自增 ID
当 Producer 准备发送数据,第一步从事务管理器获取 PID,然后再从本地事务管理器中获取 Sequence Number,具体获取操作是调用 RecordAccumulator#drainBatchesForOneNode() 方法完成的,具体源码如下:
// org.apache.kafka.clients.producer.internals.RecordAccumulator#drainBatchesForOneNodeif (producerIdAndEpoch != null && !batch.hasSequence()) {// 如果分区的PID/epoch和最新的生产者不匹配,我们更新并重置sequence numbers。// 并将PID和EPOCH更新为最新值transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);// If the batch already has an assigned sequence, then we should not change the producer id and// sequence number, since this may introduce duplicates. In particular, the previous attempt// may actually have been accepted, and if we change the producer id and sequence here, this// attempt will also be accepted, causing a duplicate.//// Additionally, we update the next sequence number bound for the partition, and also have// the transaction manager track the batch so as to ensure that sequence ordering is maintained// even if we receive out of order responses.// 设置PID+Sequence Numberbatch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " + "{} being sent to partition {}", producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, batch.baseSequence(), tp);transactionManager.addInFlightBatch(batch);}
Broker 处理幂等Producer的PRODUCER请求
下面我们看看 Broker 是如何处理幂等的 PRODUCER 请求的。入口方法是 KafakApis#handle:
override def handle(request: RequestChannel.Request): Unit = {try {if (!apiVersionManager.isApiEnabled(request.header.apiKey)) {throw new IllegalStateException(s"API ${request.header.apiKey} is not enabled")}// #2 根据请求头的apiKey字段判断属于哪类请求,然后match路由到合适的方法并处理request.header.apiKey match {case ApiKeys.PRODUCE => handleProduceRequest(request)//...}}}
幂等消费者如何保证分区顺序性
在幂等 Producer 未出来之前,单个分区的顺序性是通过将 Client 端参数MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 设置为 1 而实现。这意味着客户端发送请求后必须等待响应才能发送下一个请求,消息发送属于串行化发送,虽然保证了单分区的消息顺序性,但势必会让 Kafka 的吞吐量下降。
而幂等消费者的出现可类比 TCP 早期的滑动窗口,它也与 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 这个参数紧密相关,此时不再要求等于 1,而是 <=5。这意味着客户端可以一次性最多发送 5 个请求给 Broker 端,Broker 会缓存该 Producer 最近 5 个请求的元数据用来做重复校验,这也是 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 设置为 <=5 的原因。如果出现序号相同,说明是重复消息,舍弃,这实现了单分区幂等。
那分区顺序性是如何保证的呢?通过判断 PID+Sequence Number + 重试机制。如果 Broker 收到的消息的序号和缓存中的不连续,那么返回 OUT_OF_ORDER_SEQUENCE_NUMBER错误,Client 收到该异常后会重发,直到超过最大重试次数或超时。
Client 收到某个批次返回的 OUT_OF_ORDER_SEQUENCE_NUMBER 响应后,会做以下事情:
- 将批数据重新入队。入队也是有要要求的,是按 Sequence Number 排序。
- 开启新一轮从
RecordAccumulator抽取数据(调用RecordAccumulator#drain()方法抽取数据)。如果批次有 Sequence Number,那么说明该批次是幂等重试批次(因为新的批次此时是没有PID 和 Sequence Number 数据的),这时候会判断位于in-flight首个批次的 Sequence Number 和 - 队列中是否还有未收到 ACK 的请求,如果没有,才会允许这个批次
