协议设计
Kafka 自定义了一组基于 TCP 的二进制协议,只要遵守这组协议的格式,就可以向 Kafka 发送消息,也可以从 Kafka 中拉取消息,或者做一些其他的事情,比如提交消费位移等。
produceRequest


消息累加器 RecordAccumulator 中的消息以 <分区, Deque<ProducerBatch>> 的形式进行缓存,之后由 Sender 线程转变成 <Node,List<ProducerBatch>> 的形式。针对每个 Node,Sender 线程在发送消息前会将对应的 List<ProducerBatch> 形式的内容转变成 ProduceRequest 的具体结果。List<ProducerBatch> 中的内容首先会按照主题名称进行分类(ProduceRequest#topic ),然后分区编号进行分类(ProduceRequest#partition),分类之后的 ProducerBatch 集合就对应 ProduceRequest 中的 record_set。
V6 版本的 ProduceResponse 的组织:

消息追加是针对单个分区而言,则响应也是针对分区粒度来进行划分的。
拉取消息的协议类型
即 FetchRequest/FetchResponse,对应 api_key = 1,表示 FETCH。
V8


无论是 follower 副本之间交互还是消费者客户端和 follower 通信,如果要拉取某个分区中的消息,就需要指定详细的拉取信息,即 partitions。
FetchRequest 请求是一个非常频繁的请求,如果拉取的分区数有很多,比如有 1000 个分区,那么在网络上会频繁交互固定长度为 24KB 的内容。因此,我们可以把一次请求能确定的数据能不能缓存起来?
Kafka 1.1.0 版本针对 FetchRequet 引入 session_id、epoch 和 forgotten_topics_data 等描述:
session_id和epoch确定一条拉取链路的fetch session。当 session 建立或变更时会发送全量式的 FetchRequest 请求:包含所有需要拉取的分区信息。- 当 session 稳定时则会发送增量式的 FetchRequest 请求:topics 为空,因为内容已经被缓存在 session 两侧。
- 如果需要从当前 fetch session 中取消对某些分区的拉取订阅,则可以使用
forgotten_topics_data字段实现。
这种增量模式在大规模(有大量的分区副本需要实时同步)的 Kafka 集群中非常有用,它可以提升集群间的网络带宽的有效使用率。不过对客户端而言效果并不明显,一般情况下单个客户端并不会订阅太多的分区。
responses数组类型,表示响应的具体内容。具体细化为每个分区的响应。partition_responses 分区元信息和具体消息内容。
时间轮
Kafka 中存在大量的延时操作,比如延时生产、延时拉取和延时删除等。
Kafka 所实现时间轮可以将插入和删除操作的时间复杂度都降为 。Kafka 的时间轮(TimingWheel) 是一个存储定时任务的环形队列,底层采用数组实现。
- 数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。
- TimerTaskList 是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry)。
- TimerTaskEntry 包含真正的定时任务(TimerTask)。
- 时间轮由多个时间格组成的,每个时间格代表当前时间轮的基本时间跨度(tickMs)。
- 时间轮的时间格个数固定(wheelSize),整个时间轮的总体时间跨度
计算得到。
- 时间轮中有一个表盘指针(currentTime),表示时间轮当前所处的时间,它是 tickMs 的整数倍。currentTime 可以将整个时间轮划分为到期部分和未到期部分,所指向的时间格属于到期部分,表示刚好到期,需要处理此时间格包含的所有任务。

- 定时任务时间跨度大,使用单个时间轮会存在空推情况。Kafka 为此引入了层级时间轮的概念:当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中。
- 随着时间流逝,任务会降级。原本定时为 450ms 的任务还剩下 50ms 的时间,它会被重新提交到层级时间轮中。类似钟表。
- 实现
- TimingWheel 中的每个双向环形链表 TimerTaskList 都会有一个哨兵节点(sentinel),引入哨兵节点可以简化边界条件,也称为哑元节点(dummy node),它是一个附加的链表节点。
- 除第一层时间轮,其余高层时间轮的起始时间(startMs)都设置为创建此层时间轮时前面第一轮的 createTime。currentTime 会随着时间推移而推进,但不会改变 tickMs 的整数倍的既定事实。若某一时刻的时间为 timeMs,那么时间时间轮的
currentTime = timeMs - (timeMs % tickMs),时间每推进一次,每个层级的时间轮的 currentTime 都会依据此公式执行推进。 - Kafka 中的定时器只需要维持第一层时间轮的引用。
- Kafka 中的定时器借了 JDK 中的 DelayQueue 来协助推进时间轮。每个使用到的 TimerTaskList 都加入 DelayQueue,DelayQueue 会根据 TimerTaskList 对应的走进时间 expiration 来排序,最短的 expiration 的 TimerTaskList 会被排在 DelayQueue 队头。Kafka 会有一个叫做 ExpiredOperationReaper 线程获取 DelayQueue 中超时的任务列表 TimerTaskList 之后,既可以根据它的 expiration 来推进时间轮的时间,也可以就获取的 TimerTaskList 执行相应的操作。
分析到这里可以发现, Kafka 中的 TimingWheel 专门用来执行插入和删除 TimerTaskEntry 的操作,而 DelayQueue 专门负责时间推进的任务。 试想一下, DelayQueue 中的第一个超时任 务列表的 expiration 为 200ms , 第二个超时任务为 840ms , 这里获取 DelayQueue 的队头只需要 。(1 )的时间复杂度(获取之后 DelayQueue 内部才会再次切换出新的队头)。如果采用每秒定时 推进,那么获取第一个超时的任务列表时执行的 200 次推进中有 199 次属于“空推进”,而获 取第二个超时任务时又需要执行 639 次“空推进” , 这样会无故空耗机器的性能资源,这里采 用 DelayQueue 来辅助以少量空间换时间,从而做到了 “精准推进” 。 Kafka 中的定时器真可谓 “知人善用” , 用 TimjngWheel 做最擅长的任务添加和删除操作,而用 DelayQueue 做最擅长的 时间推进工作,两者相辅相成。
延时操作
ack = -1 表示需要等待所有副本接收消息并响应后才算成功写入。这里会有一个超时时间限制。在将消息写入 Leader 副本的本地日志文件后,Kafka 会创建一个延时的生产操作(DelayedProducer)用来处理消息正常写入所有副本或超时的情况,以返回相应的响应结果给客户端。
延时操作和定时操作是不一样,定时操作是自触发,当到了一预设时间后就会被触发。而延迟操作能够支持外部事件的触发,当有结果返回,延迟操作结束,或超时也能结束延迟操作。
随着 follower 副本不断地与 leader 副本进行消息同步,进而促使 HW 进一步增长,HW 每增长一步都会检测是否能够完成此次延时生产操作,如果可以就执行以此返回响应结果给客户端,如果超时则强制执行。
延迟操作创建后会被加入延时操作管理器(DelayedOperationPurgatory)来做专门的处理。
Kafka 在处理拉取请求时,会先读取一次日志文件,
如果收集不到足够多(fetchMInBytes,由参数 fetch.min.bytes=1 设置)的消息 , 那么就会创建一个延时拉取操作(DelayedFetch)以等待拉取到足够数量的消息。当延时拉取操作执行时,会再读取一次日志文件,然后将拉取结果返回给 follower 副本。
延时拉取操作同样是由超时触发或外部事件触发而被执行的。
- 如果是 follower 副本的延时拉取,它的外部事件就是消息追加到了 leader 副本的本地日志文件中
- 如果是消费者客户端的延时拉取,它的外部事件可以简单地理解为 HW 的增长
对于消费者或 follower 副本而言,其默认的事务
隔离级别为 read_uncommitted。
控制器
在 Kafka 集群中会有一个或多个 broker,其中有一个 broker 会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。当某个分区的 leader 副本出现故 障时,由控制器负责为该分区选举新的 leader 副本。当检测到某个分区的 ISR 集合发生变化时, 由控制器负责通知所有 broker 更新其元数据信息。当使用 kafka-topics.sh 脚本为某个 topic 增加分区数量时,同样还是由控制器负责分区的重新分配。
控制器的选举及异常恢复
- 控制器的选举工作依赖 ZK,成功竞选为控制器的 broker 会在 ZK 中创建 /controller 这个临时节点
- 在任意时刻,集群中有且仅有一个控制器。每个 broker 启动时都会尝试读取 /controller 节点的 brokerid 的值,如果该值不为 -1,则表示已经有其他 broker 节点成功竞选,所以当前 broker 就会放弃竞选。
- 如果不存在 /controller 节点,或者这个节点中的数据异常,则会尝试去创建 /controller 节点。这一步会和其他 broker 竞争,谁先创建成功谁就可以成功 Controller。
- 每个 broker 都会在内存中保存当前控制器的 brokerid 值。
/controller_epoch用于记录控制器发生变更的次数,记录当前的控制器是第几代控制器,称为纪元。每个和控制器交互的请求都会携带controller_epoch字段,如果请求小于内存值,则认为这个请求是向已过期的控制器所发送的请求,那么这个请求会被认为是无效。Kafka 通过controller_epoch来保证控制器的唯一性,进而保证相关操作的一致性。
具体控制器身份的 broker 需要比其他普通的 broker 多一份职责,细节如下:
- 监听分区相关变量。ZK 相关节点信息
/admin/reassign_partition。PartitionReassignmentHandler用来处理分区重分区的动作,为 ZK 中的/isr_change_notification节点注册IsrChangeNotificationHandler,用来处理 ISR 集合变更动作。为 ZK/admin/preferred-replica-election节点添加PreferredReplicaElectionHandler,用来处理优化副本的选举动作。 - 监听主题相关的变化。
/brokers/topics节点添加 TopicChangeHandler,处理主题增减变化。/admin/delete_topics节点添加 TopicDeletionHandler,用来处理删除主题动作。 - 监听 broker 相关的变化。
/brokers/ids节点添加 BrokerChangeHandler,用来处理 broker 增减的变化。 - 从 ZK 中读取获取当前所有和主题、分区及 broker 有关的信息并进行相应的管理。对所有主题所对应的 ZK 的
/brokers/topics/<topic>节点添加 PartitionModificationHandler,用来监听主题中的分区分配变化。 - 启动并管理分区状态机和副本状态机
- 更新集群的元数据信息
- 如果
auto.leader.rebalance.enable=true,则会开启auto-leader-rebalance-task的定时任务负责维护分区的优先副本的均衡。
控制器在选举成功之后会读取 ZooKeeper 中各个节点的数据来初始化上下文信息
( ControllerContext),并且需要管理这些上下文信息。
Kafka 的控制器使用单线程基于事件队列的模型, 将每个事件都做一层封装, 然后 按照事件发生 的先后顺序暂存到 LinkedB!ockingQueue 中 ,最后使用 一个专用的线程 ( ControllerEventThread)按照 FIFO ( First Input First Output,先入先出)的原则Jll页序处理各个
事件,这样不需要锁机制就可以在多线程间维护线程安全。
大量依赖 ZK 的 Watcher 会导致很多不必要的监听器唤醒动作,而且严重依赖 ZK 的设计会有脑裂、羊群效应,以及造成 ZK 过载的隐患。在新版的设计中,只有 Controller 在 ZK 上注册相关的监听器,其他 broker 极少需要再监听 ZK 中的数据变化。不过每个 broker 还是会对 /controller 节点添加监听器,以此来监听此节点的变化。
当 /controller 节点的数据发生变化时, 每个 broker 都会更新自身内存中保存的 activeControllerld。
如果 broker 在数据变更前是控制器,在数据变更后自身的 brokerid 值与新的 activeControllerld 值不一致,那么就需要“退位” , 关闭相应的资源,比如关闭状态机、 注销相应的监听器等。
有可能控制器由于异常而下线,造成 /controller 这个临时节点被自动删除,也有可能是其他原因将此节点删除了 。
优雅关闭
kafka-server-stop.sh 脚本底层使用 ps 命令,这个集合的输出长度 len 被硬编码为小于等于 PAGE_SIZE 的大小,而和 kafka 相关的信息则大于 PAGE_SIZE。
#1 修改脚本,把 \.kafka 去掉,这在正常情况下使用,但还有些极端情况#2 获取 kafka 的服务进程 PIDS#3 kill -s TERM $PIDS 或 kill -15 $PIDS 命令关闭
因为 kafka-config.sh进程捕获终止信号的时候会执行关闭钩子函数:
- 关闭一些必要的资源
- 执行一个控制关闭(ControlledShutdown)的动作,有两个优点:

如果这些分区的副本数大于 1 且 leader 副本位于待关闭 broker 上,那么需要实施 leader 副 本的迁移及新的 ISR 的变更。具体的选举分配的方案由专用的选举器 ControlledShutdownLeaderSelector 提供。
如果副本数大于 1 ,则由 Kafka 控制器协调并处理关闭,如果副本数为 1,则关闭动作会在整个 ControlledShutdown 动作执行之后由副本管理器来具体实施。
如果在 Kafka 管制器处理之后 leader 副本还没有成功迁移,那么会将这些没有成功迁移的 leader 副本的分区记录下来,并且写入 ControlledShutdownResponse 响应(整个ControlledShutdown 动作是一个同步阻塞的过程),
待关闭的 broker 收到 ControlledShutdownResponse 响应之后,需要判断整个 ControlledShutdown 动作是否执行成功,以此来进行可能的重试或继续执行接下来的关闭资源动作。执行成功的 标准是 ControlledShutdownResponse 中 error_code 字段值为 0,并且 partitions remaining 数组字段为空。
我们可以扩展 KafkaAdminClient 做一些扩展来达到目的。
分区 Leader 的选举
分区 leader 副本的选举由控制器负责具体实施。当创建分区(创建主题或增加分区都有创 建分区的动作〉或分区上线(比如分区中原先的 leader 副本下线,此时分区需要选举一个新的 leader 上线来对外提供服务)的时候都需要执行 leader 的选举动作,对应的选举策略为 OfflinePartitionLeaderElectionStrategy。 这种策略是按照 AR 集合中副本的顺序查找第一个存活的副本,并且这个副本在 ISR 集合中。
还有 一种情况会发生 leader 的选举,当某节点被优雅地关 闭 ( 也 就是 执 行 ControlledShutdown)时,位于这个节点上的 lead巳r 副本都会下线,所以与此对应的分区需要执 行 leader 的选举。与此对应的选举策略(ControlledShutdownPartitionLeaderElectionStrategy)为 : 从 AR 列表中找到第一个存活的副本,且这个副本在目前的 ISR 列表中,与此同时还要确保这 个副本不处于正在被关闭的节点上。
参数解密
如果 broker 端没有显式配置 listeners (或 advertised.listeners)使用 IP 地址, 那么最好将 bootstrap.server 配置成主机名而不要使用 IP 地址,因为 Kafka 内部使用的是 全称域名(Fully Qualified Domain Name ) 。 如果不统一 , 则会出现无法获取元数据的异常。
broker.id
broker 在启动时会在 ZK 中的 /brokers/ids 路径下创建一个以当前 brokerdId 为名称的虚节点,broker 的健康状态检查就依赖于此虚节点。当 broker 下线时,该虚节点会自动删除,其他 broker 节点或客户端通过判断 /brokers/ids 路径下是否有此 broker 的 brokerId 节点来确定该 broker 的健康状态。
broker 在成功启动之后在每个日志根目录下都会有一个 meta.properties 文件。
bootstrap.servers
这个参数配置的是用来发现 Kafka 集群元数据信息的服务地址。
- 客户端 P2 与
bootstrap.servers参数所指定的 Server 建立连接,并发送 MetadataRequest 请求来获取集群的元数据信息。 - Server 在收到 metadataRequest 后,返回 集群的元信息
- 客户端 P2 收到后解析出集群的元信息,然后与集群中的各个节点建立连接,之后就可以发送消息了。
在绝大多数情况下,Kafka 本身就扮演着第一步和第二步中的 Server 角色。我们可以在这个 Server 角色大做文章,比如添加一些中路由功能、负载均衡功能。
客户端
分区分配策略
partition.assignment.strategy 设置消费者与订阅主题之间的分区分配策略。默认值:org.apache.kafka.clients.consumer.RangeAssignor,即采用 RangeAssignor 分配策略。除此之外,还提供另外两种:
- RoundRobinAssignor
- StickyAssignor
RangeAssignor
按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。
RoundRobinAssignor
将消费者及消费者订阅的所有主题的分区按照字典序排序,然后通过轮询方式逐个将分区依次分配给每个消费者。
如果同一个消费组内的消费者订阅的信息是相同的,分配均匀,如果不相同,则会导致不均匀分配。
StickyAssignor
主要有两个目的:
- 分区的分配要尽可能均匀
- 分区的分配尽可能与上次分配的保持相同
自定义分区分配策略
必须实现org.apache.kafka.clients.consumer.internals.PartitionAssignor接口。现为org.apache.kafka.clients.consumer.ConsumerPartitionAssignor,该接口包含两个内部类:
- Subscription 用来表示消费者的订阅信息
-
消费者协调器和组协调器
如果有多个消费者,彼此所配置的分配策略并不完全 相同,那么以哪个为准?
这一切都是交由消费者协调器( ConsumerCoordinator )和组协调器 ( GroupCoordinator)来完成的,它们之间使用一套组协调协议进行交互。再均衡原理
新版的消费者客户端对此进行了重新设计, 将全部消费组分成多个子集, 每个消费组的子 集在服务端对应一个 GroupCoordinator 对其进行管理, GroupCoordinator 是 Kafka 服务端中用于 管理消费组的组件。而消费者客户端中的 ConsumerCoordinator 组件负责与 GroupCoordinator 进行交互。
ConsumerCoordinator 与 GroupCoordinator 之间最重要的职责就是负责执行消费者再均衡的操作。有以下情况会触发再均衡: 有新的消费者加入消费组
- 有消费者宕机下线。注意,这里不一定需要真正下线,也有可能是遇到:
- 长时间 GC
- 网络延迟导致未能及时发送心跳
- 有消费者主动退出消费组。比如客户端调用了
unsubscrible()方法取消对某些主题的订阅。 - 消费组所对应的 GroupCoorinator 节点发生了变更
- 消费组内订阅的任一主题或主题的分区数量发生变化
示例:消费者加入消费组,消费者、消费组及组协调器所经历的几个阶段。
阶段一 FIND_COORDINATOR
- 消费者确定它所属的消费组所对应的
GroupCoordinator所在的broker。 - 创建与该 broker 的网络连接。
如果以上两个步骤成功,则会进入阶段二。否则就需要向集群中的某个节点发送
FindCoordinatorRequest请求来查找对应的GroupCoordinator,这里的某个节点并非是集群中的任意节点,而是负载最小的节点。阶段二 JOIN_GROUP
在成功找到消费组所对应的 GroupCoordinator 之后就进入加入消费组阶段,消费者向 GroupCoordinator 发送 JoinGroupRequest 请求并处理响应。
如果是原有的消费者重新加入,需要执行准备工作:
- 若
enable.auto.commit=true,则需要向 GroupCoordinator 提交消费位移。过程是阻塞的。 - 若添加了自定义再均衡监听器 ConsumerRebalanceListener,此时会调用
onPartitionsRevoked()方法在重新加入消费组之前执行自定义规则逻辑,比如清除一些状态或提交消费位移。 - 之前与 GroupCoordinator 节点之间的心跳检测就不需要。所以在成功重新加入之前需要禁止心跳检测。
- 若
消费者在发送 JoinGroupRequest 请求之后会阻塞等待 Kafka 服务端的响应。
- 选举消费组的 leader。
- 如果消费组没有 leader,则第一个加入即为消费组 leader。消费组的 leader 选举是十分随意的。
选举分区分配策略。分区分配的选举并非由 leader 消费者决定,而是根据消费组内的各个消费者投票决定的。
leader 消费者根据阶段二选举出的分区分配策略来实施具体的操作。
- 将本身分区分配方案通过 GroupCoordinator 同步给各个消费者(发送 SyncGroupRequest)。只有 leader 消费者发送的 SyncGroupRequest 语法中才包含具体的分配方案(保存在 group_assignment 字段中)
- GroupCoordinator 会将整个消费组的元数据信息存入
————consumer_offsets主题中,最后将分配方案发送响应给各个消费者。阶段四 HEARTBEAT
进入这个阶段之后,消费组中的所有消费者就会处于正常工作状态。
在正式消费之前,消 费者还需要确定拉取消息的起始位置,可以通过 OffsetFetchRequest 请求获取上次提交的消费位移并从此处继续消费。
消费者通过向 GroupCoordinator 发送心跳来维持它们与消费组的从属关系,以及它们对分区的所有权关系。
心跳线程是一个独立的线程,可以在轮询消息的空档发送心跳。
如果消费者停 止发送心跳的时间足够长,则整个会话就被判定为过期, GroupCoordinator 也会认为这个消费者 己经死亡,就会触发一次再均衡行为。消费者的心跳间隔时间由参数heartbeat.interval.ms指定,默认值为 3000,即 3 秒, 这个参数必须比session.timeout.ms参数设定的值要小,__consumer_offset
offsets.retention.minutes=7天,超时这个时间后消费位移的信息就会被删除(墓碑消息和日志压缩)。由于存在位移信息自动删除这一操作,所以会引发异常。一般增大这个默认值即可。事务
Kafka 从 0.11.0.0 版本开始引入幂等和事务这两个特性,以此来实现 EOS(Exactly once semantics,精确一次处理语义)。幂等
对接口的多次调用所产生的结果和调用一次是一致的。
开启幂等enable.idempotence=true,不过,要确保幂等功能正常,还需要确保生产者客户端 retries、acks、max.in.flight.request.per.connection 这几个参数不被错误配置。实际在使用时,用户完全可以不用配置这几个参数(使用默认配置)。
实现:
- 引入 producer id(PID) 和序列号(sequence number) 。每个新的生产者实例在初始化时都会被分配一个 PID,消息发送到每一个分区都有对应的序列号,这些序列号从 0 开始单调递增。生产者每发送一条消息就将
(PID, 分区)对应的序列号的值加 1。 - broker 端会在内存为每一对
(PID, 分区)维护一个序列号。对于收到的每一条消息,只有当序列号的值(SN_new) 比 broker 端维护的对应的序列号的值(SN_old)大 1 时,broker 才会接收它。- SN_new < SN_old + 1,说明消息重复写入,消息可直接丢弃。
- SN_new > SN_old + 1,说明中间有数据尚未被写入,出现乱序。暗示有可能消息丢失可能,对应的生产者会抛出 OutOfOrderSequenceException,这个是非常严重的异常。
Kafka 的幂等只能保证单个生产者会话(session)中的单分区的幂等。
事务
幂等性并不能跨分区动作,而事务可以弥补这个缺陷。事务可以保证对多个分区写入操作的原子性。
操作的原子性是指多个操作要么全部成功,要么全部失败。不存在部分成功或部分失败的可能。
为了实现事务,应用程序必须提供唯一的 transactionId
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionId");
为了保证新的生产者启动后具有相同 transactionalld 的旧生产者能够立即失效,每个生产者通过 transactionalld 获取 PID 的同时,还会获取一个单调递增的 producer epoch
从消费者的角度分析,事务能保证的语义相对偏弱。
生产者典型代码
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);producer.initTransactions();producer.beginTransaction();try {// 创建 ProducerRecordproducer.send(record);producer.commitTransaction();} cache (ProducerFencedException e) {producer.abortTransaction();} finally {producer.close();}
消费端 isolation.level 与事务有莫大的关联,默认是 rad_uncommitted,表示消费者可读到未提交的事务。
日志文件中除了普通的消息,还有一种消息专门用来标志一个事务的结束,它就是控制消息( Contro!Batch),它有两种类型:
- COMMIT 表征事务已经成功提交
- ABORT 事务已经成功中止
KafkaConsumer 通过这个控制消息来判断对应的事务是被提交了还是中止了,然后结合参数 isolation.level 配置的隔离级别来决定是否将相应的消息返回给消费端应用。
Kafka 还引入了事务协调器 (TransactionCoordinator)来负责处理事 务,这一点可以类比一下组协调器(GroupCoordinator ) 。
TransactionCoordinator 会将事务状态持久化到内部主题 位ansaction state 中。





