JMX 监控指标
Kafka 自身提供的监控指标(包括 broker 和主题的指标,而集群层面的指标可以通过各个 broker 的指标值累加 来获得)都可以通过 JMX(Java Managent Extension,Java 管理扩展)来获取,在使用 JMX 之前需要确保 Kafka 开启了 JMX 的功能,该功能默认是关闭的。Kafka 在启动时需要通过配置 JMX_PORT 来设置 JMX 的端口号并以此来开启 JMX 的功能,示例如下:
JMX_PORT=9999 nohup bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &
有时候,在某些服务器中可能无法正确绑定 IP 地址,这时候我们就需要显示指定绑定的 host。通过在 kafka-run-class.sh 脚本中添加 -Djava.rmi.server.hostname=x.x.x.x 来显式绑定:
开启 JMX 功能后,我们就可以使用 Java 自带的工具 JConsole 来查看 JMX 指标了:
Kafka 完整监控指标含义:https://kafka.apache.org/documentation/#monitoring
消费进度监控
对于 Kafka 消费者来说,最重要的事情就是监控它们的消费进度了,或者说是监控它们消费的滞后程度。滞后程度是指消费者当前落后于生产者的程度,比如生产者向某主题成功生产了 100 万条消息,而你的消费者当前只消费了 80 万条消息,那么我们就说你的消费者滞后了 20 万条消息。
这个滞后程度有个专门的名称:消费者 Lag,对每一个分区而言,它的 Lag 等于 HW - ConsumerOffset 的值,其中 ConsumerOffset 表示当前的消费位移。通常 Lag 的单位是消息数,并且 Kafka 监控 Lag 的层级是在分区上的。如果要计算主题级别,需要手动汇总所有主题分区的 Lag 将它们累加起来,合并成最终的 Lag 值。
消费者 Lag 直接反映了一个消费者的运行情况。一个正常工作的消费者,它的 Lag 值应该很小,甚至是接近于 0 的,这表示该消费者能够及时地消费生产者生产出来的消息,滞后程度很小。反之,如果一个消费者 Lag 值很大,通常就表明它无法跟上生产者的速度,最终 Lag 会越来越大,从而拖慢下游消息的处理速度。
更可怕的是,由于消费者的速度无法匹及生产者的速度,极有可能导致它消费的数据已经不在操作系统的页缓存中了。这样消费者就不得不从磁盘上读取它们,这就进一步拉大了与生产者的差距,使得那些 Lag 原本就很大的消费者会越来越慢,Lag 也会越来越大。
因此我们在实际业务场景中必须要时刻关注消费者的消费进度。常用的监控方式有如下 3 种:
1. kafka-consumer-groups.sh
kafka-consumer-groups 脚本是 Kafka 为我们提供的最直接的监控消费者消费进度的工具。,且它也能够监控独立消费者的 Lag。独立消费者就是没有使用消费者组机制的消费者程序。和消费者组相同的是,它们也要配置 group.id,但和消费者组调用 subscribe() 不同的是,独立消费者调用 assign() 方法直接消费指定分区。下面我们讨论的所有内容都适用于独立消费者。
kafka-consumer-groups.sh 脚本的使用示例如下:
$ bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker连接信息> --describe --group <group名称>
Kafka 连接信息就是 <主机名:端口> 对,而 group 名称就是你的消费者程序中设置的 group.id 值。如果是独立消费者,group.id 可以随意指定,但要额外加上 —partition 参数,下图展示了脚本的具体用法:
kafka-consumer-groups 脚本的输出信息很丰富。首先,它会按照消费者组订阅主题的分区进行展示,每个分区一行数据。其次,它会汇报每个分区当前最新生产的消息的位移值(LOG-END-OFFSET)、该消费者组当前最新消费消息的位移值(CURRENT-OFFSET)、LAG 值(前两者的差值)、消费者实例 ID、消费者连接的 Broker 主机名以及消费者的 CLIENT-ID 信息。
2. Kafka Consumer API
很多时候,你可能对运行命令行工具查询 Lag 这种方式并不满意,而是希望用程序的方式自动化监控。幸运的是,社区的确为我们提供了这样的方法。这就是我们今天要讲的第二种方法。
简单来说,社区提供的 Java Consumer API 分别提供了查询当前分区最新消息位移和消费者组最新消费消息位移两组方法,我们使用它们就能计算出对应的 Lag。下面这段代码展示了如何利用 Consumer 端 API 监控给定消费者组的 Lag 值:
public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
try (AdminClient client = AdminClient.create(props)) {
// 获取给定消费者组的最新消费消息的位移
ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
try {
Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自动提交位移
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
// 获取订阅分区的最新消息位移
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
// 执行相应的减法操作,获取Lag值并封装进一个Map对象
return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
}
} catch (Exception e) {
......
} catch (TimeoutException e) {
throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
}
}
}
请注意,AdminClient.listConsumerGroupOffsets 方法只在 Kafka 2.0.0 及以上版本有。
3. Kafka JMX 监控指标
当前,Kafka 消费者提供了一个名为 kafka.consumer:type=consumer-fetch-manager-metrics,client-id=”{client-id}” 的 JMX 指标,其中 records-lag-max 和 records-lead-min 分别表示此消费者在测试窗口时间内曾达到的最大 Lag 值和最小 Lead 值。
这里的 Lead 值是指消费者最新消费消息的位移与分区当前第一条消息位移的差值。很显然,Lag 和 Lead 是一体的两个方面:Lag 越大 Lead 就越小,因为 Lead 越小意味着 consumer 消费的消息越来越接近被删除的边缘。Kafka 默认会删除 1 周前的数据,此时分区第一条消息的 offset 会变大,倘若消费者程序足够慢,则 current-offset 的值增长的非常慢,慢到这个值快要接近分区第一条消息的 offset 值了,即它要消费的数据快被 Kafka 删除了,这时你必须立即处理,否则会出现消息被删除,导致消费者程序重新调整位移值的情形。
这可能产生两个后果:一个是消费者从头消费一遍数据,另一个是消费者从最新的消息位移处开始消费,之前没来得及消费的消息全部被跳过了,从而造成丢消息的假象。具体取决于 auto.offset.reset 的值,这两种情形都是不可忍受的,因此必须有一个 JMX 指标来清晰地表征这种情形,这就是引入 Lead 指标的原因。
Kafka 消费者还提供了额外的 JMX 指标来单独监控分区级别的 Lag 和 Lead 值。JMX 名称为:kafka.consumer:type=consumer-fetch-manager-metrics,partition=”{partition}”,topic=”{topic}”,client-id=”{client-id}”。分区级别的 JMX 指标中多了 records-lag-avg 和 records-lead-avg 两个属性,可以计算平均的 Lag 值和 Lead 值。在实际场景中,我们会更多地使用这两个 JMX 指标。
同步失效分区监控
处于同步失效或功能失效(比如处于非活跃状态)的副本统称为失效副本,而包含失效副本的分区也就称为同步失效分区。通常情况下,在一个运行状况良好的 Kafka 集群中,失效分区的个数应该为 0。Kafka 本身提供了一个相关的指标来表征失效分区的个数,即 UnderReplicatedPartitions,可通过 JMX 访问来获取其值:
kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
取值范围是大于等于 0 的整数,如果获取的 UnderReplicatedPartitions 值大于 0,那么就需要对其进行告警并进一步诊断其背后的真正原因。注意:如果 Kafka 集群正在做分区重分配,这个值也会大于 0。如果集群中存在 broker 的 UnderReplicatedPartitions 频繁变动,或者处于一个稳定的大于 0 的值(这里特指没有 broker 下线的情况)时,一般暗示集群出现了性能问题。
集群层面的问题一般也就是两个方面:资源瓶颈和负载不均衡。资源瓶颈指的是 broker 在某硬件资源的使用上遇到了瓶颈,比如网络、CPU、I/O 等层面。就以 I/O 而论,Kafka 中的消息都是存盘的,生产者线程将消息写入 leader 副本的性能和 I/O 有直接的关联,follower 副本的同步线程及消费者的消费线程又要通过 I/O 从磁盘中拉取消息,如果 I/O 层面出现了瓶颈,那么势必影响全局的走向,与此同时消息的流入、流出又都需要和网络打交道。因此,建议硬件层面的指标可以关注 CPU 的使用率、网络流入/流出速度、磁盘的读/写速度、iowait、ioutil 等,也可以适当地关注下文件句柄数、Socket 句柄数及内存等方面。
常用监控指标说明
指标名称 | MBean 名称 |
---|---|
网络流入速率(bytesIn) | kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec |
网络流出速率(bytesOut) | kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec |
leader 副本总数 | kafka.server:type=ReplicaManager,name=LeaderCount |
分区的总数 | kafka.server:type=ReplicaManager,name=PartitionCount |
ISR 集合扩张速度 | kafka.server:type=ReplicaManager,name=IsrShrinksPerSec |
ISR 集合收缩速度 | kafka.server:type=ReplicaManager,name=IsrExpandsPerSec |
controller 存活数,应始终为 1 | kafka.controller:type=KafkaController,name=ActiveControllerCount |
broker I/O 工作处理线程空闲率 | kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent |
broker 网络处理线程空闲率 | kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent |