• Consumer Lag: 滞后程度
  • Kafka 监控 Lag 的层级是在分区上的。
    • 如果要计算主题级别的,你需要手动汇总所有主题分区的 Lag,将它们累加起来,合并成最终的 Lag 值

危害

  • 消费者的速度无法匹及生产者的速度,极有可能导致它消费的数据已经不在操作系统的页缓存中了,那么这些数据就会失去享有 Zero Copy 技术的资格
    • 这样的话,消费者就不得不从磁盘上读取它们,这就进一步拉大了与生产者的差距,进而出现马太效应

方式

使用 Kafka 自带的命令行工具 kafka-consumer-groups 脚本

  • 独立消费者也可以使用
    • 独立消费者就是没有使用消费者组机制的消费者程序
    • 也要配置 group.id 参数值
    • 但和消费者组调用 KafkaConsumer.subscribe() 不同的是,独立消费者调用KafkaConsumer.assign() 方法直接消费指定分区。

$ bin/kafka-consumer-groups.sh —bootstrap-server --describe --group

  • 独立消费者的group.id 可以随意指定,不过要额外加上 --partition 参数

使用 Kafka Java Consumer API 编程

  • 只适用于 Kafka 2.0.0 及以上的版本
    1. public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
    2. Properties props = new Properties();
    3. props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    4. try (AdminClient client = AdminClient.create(props)) {
    5. // <1> 调用 AdminClient.listConsumerGroupOffsets 方法
    6. // 获取给定消费者组的最新消费消息的位移
    7. ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
    8. try {
    9. Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
    10. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自动提交位移
    11. props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
    12. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    13. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    14. try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    15. //<2> 处则是获取订阅分区的最新消息位移
    16. Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
    17. return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
    18. // <3>执行相应的减法操作,获取 Lag 值并封装进一个 Map 对象
    19. entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
    20. }
    21. } catch (InterruptedException e) {
    22. Thread.currentThread().interrupt();
    23. // 处理中断异常
    24. // ...
    25. return Collections.emptyMap();
    26. } catch (ExecutionException e) {
    27. // 处理 ExecutionException
    28. // ...
    29. return Collections.emptyMap();
    30. } catch (TimeoutException e) {
    31. throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
    32. }
    33. }
    34. }

使用 Kafka 自带的 JMX 监控指标 (推荐)

  • Kafka 消费者提供了一个名为 kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"的 JMX 指标,里面有很多属性
    • records-lag-max 此消费者在测试窗口时间内曾经达到的最大的 Lag 值
    • records-lead-min,此消费者在测试窗口时间内曾经达到的最小的 Lead 值
  • 分区级别提供了额外的 JMX 指标,用于单独监控分区级别的 Lag 和 Lead 值。JMX 名称为:kafka.consumer:type=consumer-fetch-manager-metrics,partition="{partition}",topic="{topic}",client-id="{client-id}"

**

Lead

  • Lead 值是指消费者最新消费消息的位移与分区当前第一条消息位移的差值
    • Lag 越大的话,Lead 就越小(消费得少嘛)
  • Lead 越来越小,甚至是快接近于 0 了,这可能预示着消费者端要丢消息了
    • 由于 Kafka 的消息是有留存时间设置的,默认是 1 周
    • 如果消息被删除了,导致消费者程序重新调整位移值的情形,产生两个后果
      • 一个是消费者从头消费一遍数据
      • 另一个是消费者从最新的消息位移处开始消费,之前没来得及消费的消息全部被跳过了,从而造成丢消息的假象
      • 取决于**auto.offset.reset**的值