消费者组

Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制

  • 一个Consumer Group可以由一个或多个Consumer Instance组成, 共享一个Group ID, 组内所有实例一起来消费订阅主题的所有分区, 每个分区只能由同一个消费者组内的一个消费者实例来消费
  • 若所有消费者同属一个组, 即实现了消费队列模型; 若分属不同的组, 则实现了发布订阅模型; kafka的consumer group避免了传统消息队列模型多个消费者之间的竞争, 也避免了传统发布订阅模型每个消费者订阅所有主题的繁重, 具备良好的可伸缩性
  • 设置组内consumer实例数=该组订阅主题的分区总数, 很好理解, 这样配置一个实例消费一个主题的一个分区, 假设设置8个实例, 而分区总数是6, 则有两个实例不会被分配任何分区

Standalone Consumer

  • 独立consumer, 运行机制与上者完全不同
  • 位移机制与Consumer Group相同, 也有Group Id

位移

  • 对于消费者组, 位移是一对KV的数据结构, java来表示即Map, key主要包含3部分内容, value为consumer消费该分区的最新位移
  • 老版本Consumer将位移保存在zk中, 好处是减少Broker端保存状态的开销, 使节点无状态便于扩缩容, 坏处是zk不适合频繁写
  • 新版本Consumer将位移保存在_consumer_offsets的内部位移主题中

位移主题

  • 消息格式有三种:
    • 位移消息, key为, value为消息位移以及时间戳, 其他元数据, 用户自定义的数据等
    • consumer group注册的消息
    • 删除group过期位移或删除group的消息, 称为tombstone(墓碑)消息, 也称为delete mark, 消息体为null

      一旦某个 Consumer Group 下的所有 Consumer 实例都停止了,而且它们的位移数据都已被删除时,Kafka 会向位移主题的对应分区写入 tombstone 消息,表明要彻底删除这个 Group 的信息。

  • 当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建50个分区, 3个副本的位移主题, 创建完成后在kafka的日志路径下可见__consumer_offsets-xxx目录

    1. broker参数:
    2. offsets.topic.num.partitions默认为50
    3. offsets.topic.replication.factor默认为3
  • Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据。这个后台线程叫 Log Cleaner.

    Kafka 使用 Compact 策略来删除位移主题中的过期消息,避免该主题无限期膨胀。那么应该如何定义 Compact 策略中的过期呢?对于同一个 Key 的两条消息 M1 和 M2,如果 M1 的发送时间早于 M2,那么 M1 就是过期消息。Compact 的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起

补充: 很多实际生产环境中都出现过位移主题无限膨胀占用过多磁盘空间的问题,如果你的环境中也有这个问题,我建议你去检查一下 Log Cleaner 线程的状态,通常都是这个线程挂掉了导致的。

image.png

之前遇到过的一个问题跟大家分享一下,原因描述不正确的地方还请大佬指正:
log cleaner线程挂掉还有可能导致消费端出现:Marking Coordinator Dead!

原因大概如下:
log cleaner线程挂掉之后会导致磁盘上位移主题的文件越来越多(当然,大部分是过期数据,只是依旧存在),broker内存中会维护offsetMap,从名字上看这个map就是维护消费进度的,而这个map和位移主题的文件有关联,文件越来越多会导致offsetMap越来越大,甚至导致offsetMap构建失败(为什么会失败没有搞明白),offsetMap构建失败之后broker不会承认自己是coordinator。
消费者组找coordinator的逻辑很简单:abs(consumer_groupName.hashCode) % __consumer_offset.partition.num对应的partition所在的broker就是这个group的coordinate,一旦这个broker的offsetMap构建失败,那么这个broker就不承认自己是这个group的coordinate,这个group的消费就无法继续进行,会出现Marking Coordinator Dead错误。
此时需要删除过期的位移主题的文件(根据文件名很容易确定哪个是最新的),重启broker。重启过程中需要关注log cleaner是否会再次挂掉。

PS:上述问题在broker重启和正常运行中都有可能遇到。

位移提交

  • Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)

个人理解

  1. 首先, consumer本地会记录消费的位移数据, 无论位移提交与否, 只要consumer不重启, 都会从本地记录的最新的位移开始poll
  2. 其次, 位移提交主要是为了consumer重启后要从最新的位移点开始消费, 避免从头消费所有消息, 因此才需要consumer自动(或手动)跟broker同步位移

自动提交

  1. Properties props = new Properties();
  2. props.put("bootstrap.servers", "localhost:9092");
  3. props.put("group.id", "test");
  4. props.put("enable.auto.commit", "true");
  5. props.put("auto.commit.interval.ms", "2000");
  6. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  7. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  8. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  9. consumer.subscribe(Arrays.asList("foo", "bar"));
  10. while (true) {
  11. ConsumerRecords<String, String> records = consumer.poll(100);
  12. for (ConsumerRecord<String, String> record : records)
  13. System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  14. }
  • 相关配置有两个
    • enable.auto.commit=true
    • auto.commit.interval.ms默认值为5, 表示每5s提交一次, 这里的提交频率指的是最小提交间隔, 也就是说若两次poll之间的间隔是6s, 那实际到6s才会提交
  • 在每次调用poll方法之前会有自动提交逻辑, 当两次poll间隔>最小提交间隔, 才进行offset提交, 示例, 假设poll的频率是2s, 则前两次poll不会提交, 第三次poll即6s的时候才会提交
  • 可能出现重复消费

    在默认情况下,Consumer 每 5 秒自动提交一次位移。现在,我们假设提交位移之后的 3 秒发生了 Rebalance 操作。在 Rebalance 之后,所有 Consumer 从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据了,故在 Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。虽然你能够通过减少 auto.commit.interval.ms 的值来提高提交频率,但这么做只能缩小重复消费的时间窗口,不可能完全消除它。这是自动提交机制的一个缺陷。

手动提交

  • 分同步和异步
    • 同步: commitSync(), 优点是有自动重试机制 ```java

while (true) { ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); process(records); // 处理消息 try { consumer.commitSync(); } catch (CommitFailedException e) { handle(e); // 处理提交失败异常 } }

  1. - 异步: commitAsync()
  2. ```java
  3. while (true) {
  4. ConsumerRecords<String, String> records =
  5. consumer.poll(Duration.ofSeconds(1));
  6. process(records); // 处理消息
  7. consumer.commitAsync((offsets, exception) -> {
  8. if (exception != null)
  9. handle(exception);
  10. });
  11. }
  • 最佳实践

    1. try {
    2. while(true) {
    3. ConsumerRecords<String, String> records =
    4. consumer.poll(Duration.ofSeconds(1));
    5. process(records); // 处理消息
    6. commitAysnc(); // 使用异步提交规避阻塞
    7. }
    8. } catch(Exception e) {
    9. handle(e); // 处理异常
    10. } finally {
    11. try {
    12. consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
    13. } finally {
    14. consumer.close();
    15. }
    16. }
  • 自定义每100条消息提交一次示例, 避免大位移的提交, 减少错误恢复的时间 ```java

private Map offsets = new HashMap<>(); int count = 0; …… while (true) { ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord record: records) { process(record); // 处理消息 offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1); if(count % 100 == 0) consumer.commitAsync(offsets, null); // 回调处理逻辑是null count++; } }

  1. <a name="B1w2K"></a>
  2. #### CommitFailedException
  3. 消费者位移提交异常, **原因为消费者组已开启Rebalance, 并将需要提交位移的分区分配给了另一个消费者实例. 出现这种情况的原因为, 消费者实例连续两次调用poll方法的时间间隔超过期望值max.poll.interval.ms.**
  4. 从源代码方面来说,CommitFailedException 异常通常发生在手动提交位移时,即用户显式调用 KafkaConsumer.commitSync() 方法时.<br />有以下两种使用场景会造成该异常,
  5. 1. 消息处理总时间超过期望值max.poll.interval.ms, 解决方案:
  6. 1. 缩短单条消息处理时间
  7. 1. 加大max.poll.interval.ms, 默认为5min
  8. 1. 减少下游系统一次性消费的总消息数, max.poll.records, 默认为500
  9. 1. 下游使用多线程加速消费
  10. > 示例: 假设消费端需将数据写入mongodb, 如果访问db的平均延时不超过2s
  11. > - 按max.poll.records=500, 总消费时长约1000s, 可以增大max.poll.interval.ms
  12. > - 或者缩小max.poll.records=150, 则总消费时间约150*2=300s=5min
  13. 2. 如果你的应用中同时出现了设置相同 group.id 值的消费者组程序和独立消费者程序,那么当独立消费者程序手动提交位移时,Kafka 就会立即抛出 CommitFailedException 异常,因为 Kafka 无法识别这个具有相同 group.id 的消费者实例,于是就向它返回一个错误,表明它不是消费者组内合法的成员。
  14. <br />
  15. <a name="Ve84d"></a>
  16. ## 重平衡
  17. <a name="UXZlq"></a>
  18. ### Rebalance概念
  19. - Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区
  20. - 在 Rebalance 过程中,所有 Consumer 实例共同参与,在协调者组件的帮助下,完成订阅主题分区的分配
  21. <a name="ICtim"></a>
  22. ### Coordinator协调者
  23. - 协调者,在 Kafka 中对应的术语是 Coordinator,它专门为 Consumer Group 服务,负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等
  24. > 具体来讲,Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。同样地,当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。
  25. - 所有 Broker 在启动时,都会创建和开启相应的 Coordinator 组件。也就是说,所有 Broker 都有各自的 Coordinator 组件
  26. - 目前,Kafka 为某个 Consumer Group 确定 Coordinator 所在的 Broker 的算法有 2 个步骤。
  27. - 1. 确定由位移主题的哪个分区来保存该 Group 数据:partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。
  28. - 2. 找出该分区 Leader 副本所在的 Broker,该 Broker 即为对应的 Coordinator。
  29. > 知晓计算规则, 有助于定位消费者组对应的协调者对应的Broker, 方便排查日志, 例如, 某个消费者组的id为"test-group", abs(627841412 % 50) = 12, 因此对应的broker为位移主题的12分区
  30. <a name="DtgxI"></a>
  31. ### 何时进行 Rebalance ?
  32. 1. 组成员数发生变更。比如有新的 Consumer 实例加入组或者离开组,抑或是有 Consumer 实例崩溃被“踢出”组。
  33. 1. 订阅主题数发生变更。Consumer Group 可以使用正则表达式的方式订阅主题,比如 consumer.subscribe(Pattern.compile("t.*c")) 就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。
  34. 1. 订阅主题的分区数发生变更。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。
  35. <a name="u3b0S"></a>
  36. ### Rebalance的问题
  37. - Rebalance 过程对 Consumer Group 消费过程有极大的影响, 在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成
  38. - 目前 Rebalance 的设计是所有 Consumer 实例共同参与,全部重新分配所有分区。其实更高效的做法是尽量减少分配方案的变动
  39. - Rebalance非常慢
  40. <a name="XZ7K1"></a>
  41. ### 如何避免Rebalance?
  42. 从Rebalance的时机入手, 后两者是运维操作, 我们能做的就是避免组成员数发生变更
  43. - consumer会定时想coordinator发送心跳, 若一段时间未发送, 则会被认定死亡, 被提出, 发生rebalance, 因此更改相关设置如下
  44. ```java
  45. 设置 session.timeout.ms = 6s。
  46. 设置 heartbeat.interval.ms = 2s。
  47. 要保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求,
  48. 即 session.timeout.ms >= 3 * heartbeat.interval.ms。
  • consumer两次poll消息时间间隔最大为默认5min, 若超时则会被踢出, 导致rebalance, 因此可以适当调大该值, 给消费程序预留更多时间

    1. max.poll.interval.ms, 默认5min
  • 若还发生rebalance, 建议排查是否由于GC 设置不合理导致程序频发 Full GC 而引发的非预期 Rebalance

多线程消费方案

使用多个消费线程执行消息处理逻辑
image.png

  1. private final KafkaConsumer<String, String> consumer;
  2. private ExecutorService executors;
  3. ...
  4. private int workerNum = ...;
  5. executors = new ThreadPoolExecutor(
  6. workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
  7. new ArrayBlockingQueue<>(1000),
  8. new ThreadPoolExecutor.CallerRunsPolicy());
  9. ...
  10. while (true) {
  11. ConsumerRecords<String, String> records =
  12. consumer.poll(Duration.ofSeconds(1));
  13. for (final ConsumerRecord record : records) {
  14. executors.submit(new Worker(record));
  15. }
  16. }
  17. ..

目前作者没有很好的范例, 下面这个是作者的另一篇文章, 依然有消息丢失的问题
https://www.cnblogs.com/huxi2b/p/7089854.html

消费者组消费进度监控

  • Consumer Lag表征消费者落后于生产者的滞后程度

    假设生产者写了10w消息, 消费者消费了8w, 那么Lag为2w

  • Kafka监控Lag的层级是在分区上的, 计算主题级别需要手动汇总所有的分区

  • 如果一个消费者Lag很大, 通常表明它无法跟上生产者的速度, 极有可能导致它消费的数据不在操作系统的页缓存中, 需要从磁盘上读, 进一步拉大与生产者的差距, 最终导致lag越来越大

监控方式:

  1. 使用 Kafka 自带的命令行工具 bin/kafka-consumer-groups.sh(bat)。

该命令行也能用于监控独立消费者

  1. $ bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker连接信息> --describe --group <group名称>

image.png
当运行 kafka-consumer-groups 脚本时没有启动消费者实例时, 会出现下图这种情况, 但lag值依然是有效的
image.png

  1. Kafka Java Consumer API监控

此段代码适用于2.0.0以上版本, 简言之, 分区最新消费位移-消费者组最新消费位移=Lag

  1. public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
  2. Properties props = new Properties();
  3. props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  4. try (AdminClient client = AdminClient.create(props)) {
  5. ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
  6. try {
  7. Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
  8. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自动提交位移
  9. props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
  10. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  11. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  12. try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
  13. Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
  14. return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
  15. entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
  16. }
  17. } catch (InterruptedException e) {
  18. Thread.currentThread().interrupt();
  19. // 处理中断异常
  20. // ...
  21. return Collections.emptyMap();
  22. } catch (ExecutionException e) {
  23. // 处理ExecutionException
  24. // ...
  25. return Collections.emptyMap();
  26. } catch (TimeoutException e) {
  27. throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
  28. }
  29. }
  30. }
  1. Kafka JMX 监控指标
    1. kafka.consumer:type=consumer-fetch-manager-metrics,client-id=“{client-id}”
  • records-lag-max, 此消费者在测试窗口时间内曾经达到的最大的 Lag 值
  • records-lead-min, 此消费者在测试窗口时间内曾经达到的最小的 Lead 值
    • Lead 值是指消费者最新消费消息的位移与分区当前第一条消息位移的差值
    • Lag 越大的话,Lead 就越小,反之也是同理
    • 当Lag值越来越大时, 说明消费跟不上生产
    • 当消费停滞时, 由于Kafka默认留存消息1周, 超过1周的数据会被逐渐删除, 因此Lead值会越来越小, 当Lead值为0时, 消费者程序需要重新调整位移, 要么从头消费, 要么从最新的位移处开始消费(丢失中间未消费的消息)

JConsole 工具监控 JMX 指标示例

需要JConsole连接consumer的JMX端口

image.png

  1. Kafka 消费者还在分区级别提供了额外的 JMX 指标,用于单独监控分区级别的 Lag Lead 值。JMX 名称为:kafka.consumer:type=consumer-fetch-manager-metrics,partition=“{partition}”,topic=“{topic}”,client-id=“{client-id}”

image.png

在公司的kafka消费者监控上,经常可以看到lag 为一个负数,比如-3,-109等,想咨询一下,为什么会出现负数呢? 在CMAK这样解释过这个问题,producer的offset是通过JMX轮询获取的,consumer的offset是从topic中直接读取的,很明显,轮询的话,因为存在时间差,所以获取的offset就可能比实际的小,计算之后,就变成负数了 KM 上的瞬时lag <0 是正常的。尤其是消费得特别快的时候