注:源码版本基于 2.8.0

概述

分区重平衡是指在一个消费组中,分区的所属权从组内其中一个消费者转移到组内另一个消费者的过程,它为消费组具备高可用伸缩性提供保障。
消费者组和分区关系如下图所示:Kafka 消费者组分区分配示意图.png

触发时机

总的来说,有以下三点是触发分区重平衡发生的条件:

  1. 组成员数量发生变更。
  2. 消费者组订阅的主题数量发生变更。
  3. 消费者组订阅主题的分区数量发生变更。

    新旧版本设计对比

    Kafka 分区重平衡有两个不同的版本,称为旧版本和新版本。
版本 元数据存储位置 服务端如何感知消费者 实现方式 优点 缺点
旧版本 Zookeeper 通过 Zookeeper ① 所有消费者和 ZK 建立连接,并注册一个临时节点。② 服务端通过 watch 机制监听临时节点,也就能感知消费者新增/移除事件。③ 分区元数据信息保留在 ZK 上,服务端也监听分区元数据变更事件。④ 当服务端监听到事件发生,从而使用分区分配策略为消费者组分配分区。 实现简单,不需要过多并发编程 ZK 脑裂,分区分配策略更换麻烦。服务端管理所有分区,单节点导致延迟很高且易崩溃
新版本 内部主题 与消费者的心跳连接 ① 集群有多个消费者组协调器,每个组协调器管理部分消费者组。即分而治之。② 消费者不再直连 ZK,而是和组协调器相连,通过心跳保活机制让组协调器知道消费者实时状态。③ 组协调器协调各个消费者连接、请求等服务,但不参与分区分配,而是起协调作用,在消费者组中选出一个 Leader,并将元数据告诉它,由它完成分配方案,然后再由组协调器传达给各个消费者。 分而治之,降低延迟和提高吞吐量。各消费者组可以根据需要个性化更新分配方案。 实现复杂

源码分析

下面,通过源码+图表方式分析 Kafka 消费者是如何完成分区重平衡操作。

阶段一:确定组协调器的位置

  1. 消费者向负载最小的节点(Node)发送 FindCoordinatorRequest 请求,消费者线程会在 while 不断轮询直到得到响应或超时退出。

    1. // org.apache.kafka.clients.consumer.internals.AbstractCoordinator#ensureCoordinatorReady
    2. /**
    3. * 确定组协调器并建立连接。
    4. * ① 向负载最小的节点发送「获取本消费组协调器位置」请求。
    5. * ② 正常情况下,收到响应后由 {@link FindCoordinatorResponseHandler} 处理器处理,
    6. * 通过反序列化二进制数据后得到组协调器的IP+PORT信息,然后将信息放入 {@link #coordinator},
    7. * 这样,组协调器所在的节点就被找到了。
    8. * ③ 当遇到异常时,会判断是否可重试,然后在重试退避时间外再次发送请求,直到超时或成功得到组协调器位置。
    9. *
    10. * 这个方法会在while循环中不断尝试,直到成功获取组协调器位置或超时。
    11. *
    12. * @param timer 工具类,绑定阻塞超时时间
    13. * @return true:有且仅当broker端的消费组协调器成功并初始化TCP
    14. */
    15. protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
    16. // #1 判断组协调器是否已就绪(知道IP+PORT并且成功建立连接)
    17. if (!coordinatorUnknown())
    18. return true;
    19. do {
    20. // #2 如果之前在查找协调器过程中出现致命异常,现在向上抛出
    21. if (fatalFindCoordinatorException != null) {
    22. final RuntimeException fatalException = fatalFindCoordinatorException;
    23. fatalFindCoordinatorException = null;
    24. throw fatalException;
    25. }
    26. // #3 向负载最小的节点发送「获取本消费组的协调器位置」的请求,
    27. // 请求中包含消费组的唯一ID名称,返回一个结果凭证,这是一个异步方法
    28. final RequestFuture<Void> future = lookupCoordinator();
    29. // #4 触发I/O操作,获取响应结果
    30. client.poll(future, timer);
    31. // #5 判断是否有结果。第一次肯定不会有结果,数据都还可能未发送,
    32. // 需要不断轮询「onsumerNetworkClient.poll」从底层Socket获取数据
    33. if (!future.isDone()) {
    34. // 成功找到组协调器
    35. break;
    36. }
    37. // #6 处理异常
    38. RuntimeException fatalException = null;
    39. if (future.failed()) {
    40. if (future.isRetriable()) {
    41. // #6-1 可能由于节点出问题了(如节点故障、选举等导致响应超时),那么尝试更新一下元数据看看是否能解决问题
    42. System.out.println("Coordinator discovery failed, refreshing metadata: " + future.exception());
    43. client.awaitMetadataUpdate(timer);
    44. } else {
    45. // #6-2 遇到不可重试致命异常,日志输出
    46. fatalException = future.exception();
    47. log.info("FindCoordinator request hit fatal exception", fatalException);
    48. }
    49. } else if (coordinator != null && client.isUnavailable(coordinator)) {
    50. // #7 之前所确定的节点不可用,无法发送请求。那就重新寻找新的组协调器
    51. markCoordinatorUnknown("coordinator unavailable");
    52. timer.sleep(rebalanceConfig.retryBackoffMs);
    53. }
    54. clearFindCoordinatorFuture();
    55. if (fatalException != null)
    56. throw fatalException;
    57. } while (coordinatorUnknown() && timer.notExpired()); // 直到找到或超时才退出
    58. return !coordinatorUnknown();
    59. }

    响应处理器源码如下:

    1. // org.apache.kafka.clients.consumer.internals.AbstractCoordinator.FindCoordinatorResponseHandler
    2. /**
    3. * 「FindCoordinator」响应器
    4. */
    5. private class FindCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {
    6. @Override
    7. public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
    8. log.debug("Received FindCoordinator response {}", resp);
    9. FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) resp.responseBody();
    10. Errors error = findCoordinatorResponse.error();
    11. if (error == Errors.NONE) {
    12. synchronized (AbstractCoordinator.this) {
    13. // use MAX_VALUE - node.id as the coordinator id to allow separate connections
    14. // for the coordinator in the underlying network client layer
    15. int coordinatorConnectionId = Integer.MAX_VALUE - findCoordinatorResponse.data().nodeId();
    16. // 有了IP+端口号,可以向该节点发起连接
    17. AbstractCoordinator.this.coordinator = new Node(
    18. coordinatorConnectionId,
    19. findCoordinatorResponse.data().host(),
    20. findCoordinatorResponse.data().port());
    21. // 异步连接组协调器所在的节点
    22. client.tryConnect(coordinator);
    23. // 重置会话超时时间
    24. heartbeat.resetSessionTimeout();
    25. }
    26. future.complete(null);
    27. } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
    28. future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
    29. } else {
    30. log.debug("Group coordinator lookup failed: {}", findCoordinatorResponse.data().errorMessage());
    31. future.raise(error);
    32. }
    33. }
    34. @Override
    35. public void onFailure(RuntimeException e, RequestFuture<Void> future) {
    36. log.debug("FindCoordinator request failed due to {}", e.toString());
    37. if (!(e instanceof RetriableException)) {
    38. // Remember the exception if fatal so we can ensure it gets thrown by the main thread
    39. fatalFindCoordinatorException = e;
    40. }
    41. super.onFailure(e, future);
    42. }
    43. }

    处理器处理响应十分简单,主要有两步:

  2. 将 IP + PORT 构成 Node 对象存入 AbstractCoordinator.coordinator 。

  3. 然后向该节点发起 TCP 连接,这个方法是异步的。后面我们每次发送请求之前也会判断该节点状态。

    阶段二:加入消费组

    在成功找到组协调器所在的节点位置后,并与之建立 Socket 连接。接下来,消费者主要做以下两件事情:

  4. 向组协调器发送 JoinGroup 请求,表示希望加入消费组,成员状态为 PREPARING_REBALANCE

  5. 消费者收到 JoinGroup 响应后,更改成员状态为 COMPLETING_REBALANCE,然后再向组协调器发送 SyncGroup 请求。SyncGroup 请求的响应包含该消费者的分区分配结果。

新消费者加入消费组示意图如下:
新消费者加入消费组示意图.png
这里提以下几点注意事项:

  1. 已存在消费组中的消费者是通过心跳响应来感知组协调器此刻正在发生分区重平衡。组协调器以错误码 Errors.REBALANCE_IN_PROGRESS 的形式返回给消费者。
  2. 组协调器通过 JoinGroup 请求收集所有消费者,它的实现比较复杂,会新开一个章节讲解。现在只需要知道它延迟一段时间(超时时间由 rebanlance.timeout.ms 决定,消费者超时时间由 max.poll.interval.ms 最大值决定)后再向已收集到的消费者返回 JoinGroup 响应。
  3. 组协调器会在消费组中选择一个消费者 Leader,选择策略非常简单:先到先得。Leader 和其它消费者收到的 JoinGroup 不一样,它会包含组内各个成员订阅主题的详情,并由它根据分配策略执行分区分配操作。
  4. 所有消费者收到 JoinGroup 请求后,还会再发送 SyncGroup 请求以获得分配结果。这对组协调器来说,这也是一个延迟操作:因为需要等待 Leader 返回才能响应。

下面带大家简单过一遍与分区分配过程相关的消费者端的核心源码。

PART1 心跳线程感知消费组正在发生分区重平衡

心跳线程除了周期性向组协调器发生心跳请求外,还承担其它重要的功能。其中之一就是感知消费组正在发生分区重平衡:

心跳线程收到响应,会判断 errorCode 的值,翻译后如果为 Errors.REBALANCE_IN_PROGRESS 错误,说明消费组正在发生分区重平衡,至于什么事件触发分区重平衡,前面已经介绍过,这里不在重复说明。

相关源码如下:

  1. // org.apache.kafka.clients.consumer.internals.AbstractCoordinator.HeartbeatResponseHandler#handle
  2. else if (error == Errors.REBALANCE_IN_PROGRESS) {
  3. // 协调器正在处于分区重平衡期间,暂停接收任何消费者的心跳、位移等请求。
  4. // 出现这个原因是因为其它消费加入或分区数量变化触发分区重平衡,因此,当前消费者需要重新发送「JoinGroup」请求重新申请加入消费组
  5. if (state == MemberState.STABLE) {
  6. // 如果消费者状态为「STABLE」,说明已经在消费组内,所以需要重新发送「JoinGroup」
  7. requestRejoin("group is already rebalancing");
  8. future.raise(error);
  9. } else {
  10. // 如果为其他状态,忽略即可
  11. log.debug("Ignoring heartbeat response with error {} during {} state", error, state);
  12. future.complete(null);
  13. }
  14. }

只有当消费者成员状态为 MemberState.STABLE 时,才需要重新发送 JoinGroup 请求。而其它状态是不需要做任何事情。

PART2 发送 JoinGroup 请求和处理响应

  1. // org.apache.kafka.clients.consumer.internals.AbstractCoordinator#sendJoinGroupRequest
  2. RequestFuture<ByteBuffer> sendJoinGroupRequest() {
  3. if (coordinatorUnknown())
  4. return RequestFuture.coordinatorNotAvailable();
  5. JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(new JoinGroupRequestData()
  6. // 消费组ID
  7. .setGroupId(rebalanceConfig.groupId)
  8. // 会话超时时间,默认值:30000
  9. .setSessionTimeoutMs(this.rebalanceConfig.sessionTimeoutMs)
  10. // 成员ID,首次为"",后面由协调器分配唯一ID。
  11. .setMemberId(this.generation.memberId)
  12. // 组静态ID,默认为null
  13. .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
  14. // 协议类型。默认值:consumer
  15. .setProtocolType(protocolType())
  16. // 消费者支持分区策略。默认策略:range
  17. .setProtocols(metadata())
  18. // 重平衡超时时间,默认值:300000
  19. .setRebalanceTimeoutMs(this.rebalanceConfig.rebalanceTimeoutMs)
  20. );
  21. // Note that we override the request timeout using the rebalance timeout since that is the
  22. // maximum time that it may block on the coordinator. We add an extra 5 seconds for small delays.
  23. int joinGroupTimeoutMs = Math.max(client.defaultRequestTimeoutMs(), rebalanceConfig.rebalanceTimeoutMs + JOIN_GROUP_TIMEOUT_LAPSE);
  24. return client.send(coordinator, requestBuilder, joinGroupTimeoutMs).compose(new JoinGroupResponseHandler(generation));
  25. }

JoinGroup 响应处理是由 JoinGroupResponseHandler 处理器异步完成的,源码如下:

  1. // org.apache.kafka.clients.consumer.internals.AbstractCoordinator.JoinGroupResponseHandler#handle
  2. @Override
  3. public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
  4. Errors error = joinResponse.error();
  5. if (error == Errors.NONE) {
  6. if (isProtocolTypeInconsistent(joinResponse.data().protocolType())) {
  7. log.error("JoinGroup failed: Inconsistent Protocol Type, received {} but expected {} ", joinResponse.data().protocolType(), protocolType());
  8. future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
  9. } else {
  10. // 获取锁
  11. synchronized (AbstractCoordinator.this) {
  12. if (state != MemberState.PREPARING_REBALANCE) {
  13. // 如果消费者在重新平衡完成之前被唤醒,我们可能已经离开了这个消费组。
  14. // 在这种情况下,我们不希望继续进行下面的「SyncGroup」步骤。
  15. future.raise(new UnjoinedGroupException());
  16. } else {
  17. // 状态正常,更新状态为COMPLETING_REBALANCE,随后会发送「SyncGroup」请求接收分区分配结果
  18. state = MemberState.COMPLETING_REBALANCE;
  19. // 当状态为 MemberState.COMPLETING_REBALANCE 时,就可以启动心跳线程了。
  20. // 因为最终会从这个状态转移到STABLE
  21. if (heartbeatThread != null)
  22. // 开启心跳线程
  23. heartbeatThread.enable();
  24. // 记录发生分区重平衡次数(generationId)和成员唯一ID
  25. AbstractCoordinator.this.generation = new Generation(joinResponse.data().generationId(),joinResponse.data().memberId(), joinResponse.data().protocolName());
  26. // #2 Leader收到「JoinGroup」响应后,
  27. // 需要完成分区分配任务,然后把结果发送给协调器
  28. if (joinResponse.isLeader()) {
  29. onJoinLeader(joinResponse).chain(future);
  30. } else {
  31. // 其他消费者则只需要发送「SyncGroup」请求,等待协调器将结果返回
  32. onJoinFollower().chain(future);
  33. }
  34. }
  35. }
  36. }
  37. } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
  38. future.raise(error);
  39. } else if (error == Errors.UNKNOWN_MEMBER_ID) {
  40. log.info("JoinGroup failed: {} Need to re-join the group. Sent generation was {}", error.message(), sentGeneration);
  41. if (generationUnchanged())
  42. resetGenerationOnResponseError(ApiKeys.JOIN_GROUP, error);
  43. future.raise(error);
  44. } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
  45. markCoordinatorUnknown(error);
  46. log.info("JoinGroup failed: {} Marking coordinator unknown. Sent generation was {}", error.message(), sentGeneration);
  47. future.raise(error);
  48. } else if (error == Errors.FENCED_INSTANCE_ID) {
  49. log.error("JoinGroup failed: The group instance id {} has been fenced by another instance. Sent generation was {}", rebalanceConfig.groupInstanceId, sentGeneration);
  50. future.raise(error);
  51. } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
  52. || error == Errors.INVALID_SESSION_TIMEOUT
  53. || error == Errors.INVALID_GROUP_ID
  54. || error == Errors.GROUP_AUTHORIZATION_FAILED
  55. || error == Errors.GROUP_MAX_SIZE_REACHED) {
  56. log.error("JoinGroup failed due to fatal error: {}", error.message());
  57. if (error == Errors.GROUP_MAX_SIZE_REACHED) {
  58. future.raise(new GroupMaxSizeReachedException("Consumer group " + rebalanceConfig.groupId + "already has the configured maximum number of members."));
  59. } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
  60. future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
  61. } else {
  62. future.raise(error);
  63. }
  64. } else if (error == Errors.UNSUPPORTED_VERSION) {
  65. log.error("JoinGroup failed due to unsupported version error. Please unset field group.instance.id and retry to see if the problem resolves");
  66. future.raise(error);
  67. } else if (error == Errors.MEMBER_ID_REQUIRED) {
  68. log.debug("JoinGroup failed due to non-fatal error: {} Will set the member id as {} and then rejoin. Sent generation was {}", error, memberId, sentGeneration);
  69. synchronized (AbstractCoordinator.this) {
  70. AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID, memberId, null);
  71. }
  72. // #1
  73. requestRejoin("need to re-join with the given member-id");
  74. future.raise(error);
  75. } else if (error == Errors.REBALANCE_IN_PROGRESS) {
  76. log.info("JoinGroup failed due to non-fatal error: REBALANCE_IN_PROGRESS, which could indicate a replication timeout on the broker. Will retry.");
  77. future.raise(error);
  78. } else {
  79. // unexpected error, throw the exception
  80. log.error("JoinGroup failed due to unexpected error: {}", error.message());
  81. future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
  82. }
  83. }

这里值得提一下的是关于 MEMBER_ID_REQUIRED,新的消费者是没有 MEMBER_ID,在发送 JoinGroup 的参数中 member_id 默认值是 "",随后 JoinGroup 响应体中错误码为 Errors.MEMBER_ID_REQUIRED,且附带一个唯一的 MEMBER_ID,随后消费者使用这个 MEMBER_ID 再重新发送一个 JoinGroup 请求。
代码 #2 区分 Leader 和其他组员收到 JoinGroup 做不同的处理,Leader 会调用 AbstractCoordinator#performAssignment 按照配置的分配策略给组员分配分区,而其它组员仅需要发送 SyncGroup 请求即可。
看看 Leader 是如何处理 JoinGroup 响应的:

  1. // org.apache.kafka.clients.consumer.internals.AbstractCoordinator#onJoinLeader
  2. /**
  3. * Leader消费者收到「JoinGroup」响应后:
  4. * ① 调用 {@link #performAssignment(String, String, List)} 为组内的消费者分配分区。
  5. * ② 将结果放到 {@link SyncGroupRequest} 请求中发送给协调器
  6. *
  7. * @param joinResponse JoinGroup 响应
  8. * @return
  9. */
  10. private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
  11. try {
  12. // #1 按配置分区策略为所有组员分配分区,默认分区策略是:range
  13. Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(),
  14. joinResponse.data().members());
  15. List<SyncGroupRequestData.SyncGroupRequestAssignment> groupAssignmentList = new ArrayList<>();
  16. for (Map.Entry<String, ByteBuffer> assignment : groupAssignment.entrySet()) {
  17. groupAssignmentList.add(new SyncGroupRequestData.SyncGroupRequestAssignment()
  18. .setMemberId(assignment.getKey())
  19. .setAssignment(Utils.toArray(assignment.getValue()))
  20. );
  21. }
  22. // #2 准备「SyncGroup」请求
  23. SyncGroupRequest.Builder requestBuilder =
  24. new SyncGroupRequest.Builder(
  25. new SyncGroupRequestData()
  26. // 消费组组名
  27. .setGroupId(rebalanceConfig.groupId)
  28. // 成员唯一ID
  29. .setMemberId(generation.memberId)
  30. // 协议类型,默认值:consumer
  31. .setProtocolType(protocolType())
  32. // 分区策略,默认值:range
  33. .setProtocolName(generation.protocolName)
  34. .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
  35. // 简单理解为分区重平衡已发生次数
  36. .setGenerationId(generation.generationId)
  37. // 消费者分区分配结果
  38. .setAssignments(groupAssignmentList)
  39. );
  40. // #3 发送请求
  41. return sendSyncGroupRequest(requestBuilder);
  42. } catch (RuntimeException e) {
  43. return RequestFuture.failure(e);
  44. }
  45. }

SyncGroup 响应是由 SyncGroupResponseHandler 处理,源码的细节就不带大家看了。

https://www.cnblogs.com/huxi2b/p/6223228.html
问题:

  1. 已存在消费组的组员如何被通知有新的消费者加入分区?

已存在消费组中的消费者是通过心跳的方式感知到需要重新发送”加入组请求”。 协调器关于消费组的状态更改,因此,协调器无法正常响应消费者发来的心跳请求,协调器通过返回错误码的方式通知消费者:此刻你所处的消费组正在发生分区重平衡,你需要发送 JoinGroup 重新加入消费组。

  1. 消费者重平衡是否会触发元数据更新?
  2. 消费组重平衡流程是怎么样的?
  3. 心跳线程会被阻塞么?
  1. 消费者在发送 JoinGroup 请求之前做些清理工作:提交位移。
  2. 协调者通常选择第一个发送 JoinGroup 请求的消费者作为 Leader 消费者。后续将由它来完成分区分配任务。
  3. 消费者发送 JoinGroup 请求给协调器,协调器在收集完所有消费者和它们所订阅主题信息后再会返回响应给消费者,只有主消费者才会收到整个消费组的元数据信息。
  4. 消费者收到 JoinGroup 响应后,再发送 SyncGroup 请求给协调者,目的是获得分区分配结果。分区分配结果由 Leader 消费者完成分配,会在 SyncGroup 请求中放入分配结果。协调者只有收到 Leader 消费者结果才向其他消费者返回 JoinGroup 响应,这个响应就包含分配结果。

每次调用 KafkaConsumer#poll() 方法时,也同时会调用 ConsumerCoordinator#poll() 方法。理解 Kafka 源码,我们要通过非阻塞思维去理解,也可以通过事件驱动理解,每次循环,就会