消费者(Consumer)负责订阅 Kafka 中的主题,并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在 Kafka 的消费理念中还有一层消费者组(Consumer Group)的概念,每个消费者都有一个对应的消费者组。当消息发布到主题后,只会被投递给订阅它的每个消费者组中的一个消费者。

基本概念

消费者组是 Kafka 提供的可扩展且具有容错性的消费者机制,组内的所有消费者协调在一起消费订阅主题的所有分区。每个分区只能由同一个消费者组内的一个消费者实例来消费。主要特性有以下几点:

  • Consumer Group 下可以有一个或多个 Consumer 实例
  • Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group
  • Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内某个 Consumer 实例消费

1. 单消费者群组

假设主题 T1 有 4 个分区,我们创建了消费者 C1,它是消费者组 G1 里唯一的消费者,我们用它订阅主题 T1。消费者 C1 将收到主题 T1 全部 4 个分区的消息。
image.png
如果在消费者组 G1 里新增了一个消费者 C2,那么每个消费者将分别从两个分区中接收消息。我们假设消费者 C1 接收分区 0 和分区 1 的消息,消费者 C2 接收分区 2 和分区 3 的消息。
image.png
如果消费者组 G1 有 4 个消费者,那么每个消费者可以分配到一个分区。
image.png
但如果继续向消费者组 G1 里添加更多的消费者实例,超过了主题的分区数量,那么有一部分消费者就会被闲置,不会接收到任何消息。
image.png
往群组里增加消费者是横向伸缩消费能力的主要方式。我们有必要为主题创建大量的分区,在负载增长时可以加入更多的消费者。注意,不要让消费者的数量超过主题分区的数量,多余的消费者只会被闲置。

理想情况下,消费者实例的数量应该等于该 Group 订阅主题的分区总数。假设一个 Consumer Group 订阅了 3 个主题,且它们的分区数分别是 1、2、3,那么通常为该 Group 设置 6 个 Consumer 实例是比较理想的情形,因为它能最大限度地实现高伸缩性。但如果你设置了 8 个实例,那么有 2 个实例将不会被分配任何分区,它们永远处于空闲状态。

2. 多消费者群组

如果多个消费者程序读取了同一个主题,但每个消费者程序都想要获取到主题中所有分区的消息,而不只是其中的部分分区。实际上,只要保证每个消费者都有自己的消费者组,那就都能获取到主题所有消息了。

当 Consumer Group 订阅了多个主题后,组内每个实例不要求一定要订阅主题的所有分区,它只会消费部分分区中的消息。同时 Consumer Group 之间彼此独立,它们能够订阅相同的一组主题而互不干涉。因此 Kafka 仅仅使用 Consumer Group 这一种机制就同时实现了传统消息引擎系统的两大模型:

  • 如果所有实例都属于同一个 Group,那么它实现的就是消息队列模型。
  • 如果所有实例分别属于不同的 Group,那么它实现的就是发布/订阅模型。

假设新增了一个消费者组 G2,那么这个消费者组将从主题 T1 上接收所有的消息,与群组 G1 之间互不影响。群组 G2 可以增加更多的消费者,每个消费者可以消费若干个分区,就像群组 G1 那样。
image.png
消费者组是一个逻辑上的概念,它将旗下的消费者归为一类,每一个消费者只隶属于一个消费者组。每个消费者组都会有一个固定的名称,消费者在进行消费前需要指定其所属消费组的名称,这个名称通过消费者客户端参数 group.id 来设置。

消费者并非逻辑上的概念,它是实际的应用实例,它可以是一个线程,也可以是一个进程。同一个消费组内的消费者既可以部署在同一台机器上,也可以部署在不同的机器上。

消费组管理

在 Kafka 中,我们可以通过 kafka-consumer-groups.sh 脚本查看或变更消费组的信息。其提供的 list 参数可罗列出当前集群中所有的消费组,该功能对应 KatkaAdminClient 中的 listConsumerGroups() 方法:
image.png
kafka-consumer-groups.sh 脚本还可以配合 describe 参数来展示某一个消费组的详细信息,不过需要配合 group 参数来指定消费组名称。该功能对应 KafkaAdminClient 中的 describeConsumerGroups() 方法:
image.png
我们可以通过指定 delete 参数来删除一个指定的消费组,不过如果消费组中有消费者成员正在运行,则删除操作会失败。该功能对应 KafkaAdminClient 中的 deleteConsumerGroups() 方法:
image.png

重平衡

重平衡(rebalance)是指分区的所属权从一个消费者转移到另一个消费者的行为,它为消费组具备高可用性和伸缩性提供了保障,使我们可以既方便又安全地删除消费组内的消费者或往消费组内添加消费者。不过在重平衡发生期间的这一小段时间内,消费组内的消费者是无法读取消息的,因此消费组会变得不可用。

当一个分区被重新分配给另一个消费者时,消费者当前的状态也会丢失。比如消费者消费完某个分区中的一部分消息后还没来得及提交消费位移就发生了重平衡操作,之后这个分区又被分配给了消费组内的另一个消费者,那原来被消费完的那部分消息又会被重新消费一遍。

1. 触发条件

1)有新的消费者加入消费组,或者有消费者主动退出消费组(发送 LeaveGroupRequest 请求)。比如客户端调用 unsubscrible() 方法取消对某些主题的订阅。

2)有消费者宕机下线。消费者并不一定需要真正下线,例如遇到长时间的 GC、网络延迟导致消费者长时间未向 GroupCoordinator 发送心跳等情况时,GroupCoordinator 会认为消费者已经下线。

3)消费组所对应的 GroupCoorinator 节点发生了变更。

4)消费组内所订阅的任一主题或者主题的分区数量发生变化。消费者可以使用正则表达式的方式订阅主题,如果新创建了一个满足该正则条件的主题,那么该消费组就会触发重平衡。

2. 消费者协调器和组协调器

在重平衡过程中,消费者组下的所有消费者实例会协调在一起共同参与,在协调者组件的帮助下,完成订阅主题分区的分配。协调者由消费者协调器(ConsumerCoordinator)和组协调器(GroupCoordinator)组成,它们之间使用一套组协调协议进行交互,负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等。

下面以一个简单的例子来讲解一下重平衡操作的具体内容。当有消费者加入消费组时,消费者、消费组及组协调器之间会经历以下几个阶段。

2.1 FIND_COORDINATOR

消费者需要确定它所属的消费组对应的 GroupCoordinator 所在的 broker,并创建与该 broker 相互通信的网络连接。如果消费者已经保存了与消费组对应的 GroupCoordinator 节点的信息,并且与它之间的网络连接是正常的,那么就可以进入第二阶段。否则,就需要向集群中的某个节点发送 FindCoordinatorRequest 请求来查找对应的 GroupCoordinator,这里的“某个节点”并非是集群中的任意节点,而是负载最小的节点。

Kafka 在收到 FindCoordinatorRequest 请求后,会根据请求携带的 groupld 查找对应的 GroupCoordinator 节点,如果找到对应的 GroupCoordinator 则会返回其相对应的 node_id、host 和 port 信息。具体查找过程是先根据消费组 groupId 的哈希值计算 __consumer_offsets 中的分区编号,具体算法如下所示:

  1. partitionId = Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)

其中 offsetsTopicPartitionCount 为主题 __consumer_offsets 的分区个数,这个值可以通过 broker 端参数 offsets.topic.num.partitions 来配置,默认为 50。找到对应分区后,再寻找此分区 Leader 副本所在的 broker 节点,该 broker 节点即为这个 groupld 所对应的 GroupCoordinator 节点。

2.2 JOIN_GROUP

在成功找到消费组所对应的 GroupCoordinator 之后就进入加入消费组的阶段,在此阶段的消费者会向组协调器发送 JoinGroupRequest 请求,在该请求中,每个成员都要将自己订阅的主题上报,这样协调者就能收集到所有成员的订阅信息。之后会阻塞等待 Kafka 服务端的响应。服务端收到请求后会交由 GroupCoordinator 处理。

1)选举消费组的 Leader
GroupCoordinator 需要为消费组内的消费者选举出一个消费组的 Leader,这个选举算法也很简单,分两种情况分析:如果消费组内还没有 Leader,那么第一个加入消费组的消费者即为消费组的 Leader。如果 Leader 消费者由于某些原因退出了消费组,则会近似随机的重新选举一个新的 Leader。Leader 消费者的任务是收集所有成员的订阅信息,根据这些信息制定出具体的分区消费分配方案。

2)选举分区分配策略
每个消费者都可以设置自己的分区分配策略,对消费组而言需要从各个消费者呈报上来的各个分配策略中选举一个彼此都信服的策略来进行整体上的分区分配。这个分区分配的选举并非由 Leader 消费者决定,而是根据消费组内的各个消费者投票来决定的。最终选举的分配策略基本上可以看作被各个消费者支持的最多的策略,具体的选举过程如下:

  • 收集各个消费者支持的所有分配策略,组成候选集 candidates。
  • 每个消费者从候选集 candidates 中找出第一个自身支持的策略,为这个策略投上一票。
  • 计算候选集中各个策略的选票数,选票数最多的策略即为当前消费组的分配策略。
  • 如果有消费者并不支持选出的分配策略,则会抛出 IllegalArgumentException 异常。

此后,Kafka 服务端就要发送 JoinGroupResponse 响应给各个消费者,Leader 消费者和其他普通消费者收到的响应内容并不相同,发送给 Leader 消费者的 JoinGroupResponse 中会附带最终选举出来的分区分配策略,Leader 消费者会为各个消费者制定对应的分区分配方案。
image.png

2.3 SYNC_GROUP

Leader 消费者根据在第二阶段中选举出来的分区分配策略来实施具体的分区分配,在此之后需要将分配的方案同步给各个消费者,具体则是通过 GroupCoordinator 来负责转发同步分配方案的。这一步的主要目的,就是让协调者把领导者制定的分配方案下发给各个组内成员。

在同步阶段,各个消费者会向 GroupCoordinator 发送 SyncGroupRequest 请求来同步分配方案,但只有 Leader 消费者发送的请求中才会包含各个消费组对应的分区分配方案。其他成员的请求体中并没有实际的内容,这样做的目的是让协调者接收分配方案后统一以 SyncGroupResponse 的方式分发给所有成员,这样组内所有成员就都知道自己该消费哪些分区了。
image.png
GroupCoordinator 会将从 Leader 消费者发送过来的分配方案提取出来,连同整个消费组的元数据信息一起存入 Kafka 的 __consumer_offsets 主题中,最后发送响应给各个消费者告知各自所属的分区分配方案。当消费者收到所属的分配方案后会调用 PartitionAssignor 中的 onAssignment() 方法进行分区分配。随后再调用 ConsumerRebalanceListener 中的 OnPartitionAssigned() 方法。之后开启心跳任务定期向 GroupCoordinator 发送 HeartbeatRequest 来确定彼此在线。

2.4 HEARTBEAT

进入这个阶段后,消费组中的所有消费者就会处于正常工作状态。在正式消费前,消费者还需要确定拉取消息的起始位置。假设之前已经将最后的消费位移提交到了 GroupCoordinator,并且 GroupCoordinator 将其保存到了 Kafka 内部的 __consumer_offsets 主题中,此时消费者可以通过 OffsetFetchRequest 请求获取上次提交的消 费位移并从此处继续消费。

消费者通过向 GroupCoordinator 发送心跳来维持它们与消费组的从属关系,以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的。心跳线程是一个独立的线程,可以在轮询消息的空档发送心跳。如果消费者停止发送心跳的时间足够长,超过 session.timeout.ms 配置的时间,默认 10 秒,则 GroupCoordinator 会判定这个消费者已经死亡,就会触发一次重平衡。消费者的心跳间隔由 heartbeat.interval.ms参数指定,默认 3 秒,这个参数必须比 session.timeout.ms 参数设定的值要小,一般 heartbeat.interval.ms 的配置值不能超过 session.timeout.ms 配置值的 1/3,这样可以保证 Consumer 实例在被判定为死亡之前,能够发送至少 3 轮的心跳请求。

此外,session.timeout.ms 这个参数的配置值必须在 broker 端参数 group.min.session.timeout.ms(默认值为 6 秒)和 group.max.session.timeout.ms(默认值为 5 分钟)允许的范围内。

还有一个参数 max.poll.interval.ms,它限定了 Consumer 端两次调用 poll() 方法的最大时间间隔,默认 5 分钟。也就是消费者在获取更多消息之前可以空闲的时间量的上限。如果此超时时间期满之前 poll() 没有调用,则消费者被视为失败,并且分组将重新平衡,以便将分区重新分配给别的成员。如果要避免非预期的 Rebalance,最好将该参数值设置得大一点,比下游最大处理时间稍长一点。这样 Consumer 就不会因为处理这些消息的时间太长而引发 Rebalance 了。

3. 缺点

在重平衡期间,所有 Consumer 实例都会停止消费一直等待 Rebalance 完成,类似于 JVM 中的 STW,这严重影响了 Consumer 端 TPS。并且整个重平衡的效率并不高,因为每次重平衡,Group 下的所有成员都要参与进来,全部重新分配所有分区。因此,如果 Group 下成员很多,重平衡一次要耗费很久的时间。

比如一个 Group 下有 10 个成员,每个成员平均消费 5 个分区。假设现在有一个成员退出了,此时就需要开启新一轮的 Rebalance,把这个成员之前负责的 5 个分区转移给其他成员。比较好的做法是维持当前 9 个成员消费分区的方案不变,然后将 5 个分区随机分配给这 9 个成员。但 Kafka 在默认情况下,重平衡时不会保留之前的分配方案,Group 会打散这 50 个分区(10 个成员 * 5 个分区),并由当前存活的 9 个成员重新分配它们。

基于这个原因,社区于 0.11.0.0 版本推出了 StickyAssignor,即有粘性的分区分配策略。所谓的有粘性,是指每次 Rebalance 时,该策略会尽可能地保留之前的分配方案,尽量实现分区分配的最小变动。

位移管理

老版本的 Consumer Group 把位移保存在 ZooKeeper 中,这种设计使得 Broker 端不需要保存位移数据,减少了 Broker 端需要持有的状态空间,因而有利于实现高伸缩性。不过 ZooKeeper 这类框架并不适合进行频繁的写更新,而 Consumer Group 的位移更新却是一个非常频繁的操作,这极大地拖慢了 ZooKeeper 集群的性能,因此 Kafka 在新版本中重新设计了 Consumer Group 的位移管理方式,采用了将位移保存在 Kafka 内部主题的方法。这个内部主题就是 __consumer_offsets。

1. __consumer_offsets

新版本 Consumer 的位移管理机制就是将位移数据作为一条普通的 Kafka 消息提交到 consumer_offsets 中。一般情况下,当集群中第一次有消费者消费消息时会自动创建位移主题 consumer_offsets,不过它的副本因子还受 offsets.topic.replication.factor 参数限制,默认值为 3,分区数可以通过 offsets.topic.num.partitions 参数设置,默认为 50。

当然你也可以选择手动创建位移主题,可以在 Kafka 集群尚未启动任何 Consumer 前使用 Kafka API 创建。手动创建的好处在于,你可以创建满足你实际场景需要的位移主题。不过还是让 Kafka 自动创建比较好。因为目前 Kafka 源码中有一些地方硬编码了 50 分区数,因此如果你自行创建了一个不同于默认分区数的位移主题,可能会碰到各种各种奇怪的问题。

虽说位移主题是一个普通的 Kafka 主题,但它的消息格式却是 Kafka 自己定义的,用户不能修改,也就是说你不能随意地向这个主题写消息,因为一旦你写入的消息不满足 Kafka 规定的格式,那么 Kafka 内部无法成功解析,就会造成 Broker 的崩溃。因此千万不要随意向该主题发送消息。

2. CommitFailedException

所谓 CommitFailedException 就是 Consumer 客户端在提交位移时出现了错误或异常,而且还是那种不可恢复的严重异常。如果异常是可恢复的瞬时错误,提交位移的 API 自己就能规避它们了,因为很多提交位移的 API 方法是支持自动错误重试的,比如 commitSync 方法。注意,该异常针对的是手动提交位移的场景,因为自动提交位移失败会由 Kafka 内部消化处理。

代码中对 CommitFailedException 的注释是:本次提交位移失败了,原因是消费者组已经开启了 Rebalance 过程,并且将要提交位移的分区分配给了另一个消费者实例。出现这个情况的原因是,你的消费者实例连续两次调用 poll() 方法的时间间隔超过了期望的 max.poll.interval.ms 参数值。这通常表示,你的消费者实例花费了太长的时间进行消息处理,耽误了调用 poll() 方法。有如下两个相应的解决办法:

  • 增加期望的时间间隔 max.poll.interval.ms 参数值,默认 5 分钟。
  • 减少 poll() 方法一次性返回的消息数量,即减少 max.poll.records 参数值,默认 500 条。