简介

客户端的 AdminClient是Kafka运维的工具类

AdminClient 提供的功能:

  • 主题管理:包括主题的创建、删除和查询。
  • 权限管理:包括具体权限的配置与删除。
  • 配置参数管理:包括 Kafka 各种资源的参数设置、详情查询。所谓的 Kafka 资源,主要有 Broker、主题、用户、Client-id 等。
  • 副本日志管理:包括副本底层日志路径的变更和详情查询。
  • 分区管理:即创建额外的主题分区。
  • 消息删除:即删除指定位移之前的分区消息。
  • Delegation Token 管理:包括 Delegation Token 的创建、更新、过期和详情查询。
  • 消费者组管理:包括消费者组的查询、位移查询和删除。
  • Preferred 领导者选举:推选指定主题分区的 Preferred Broker 为领导者。

设计

image.png

使用

AdminClient的完整类路径是 org.apache.kafka.clients.admin.AdminClient,而不是 kafka.admin.AdminClient。后者就是我们刚才说的服务器端的 AdminClient,它已经不被推荐使用了。

引入依赖

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. <version>2.3.0</version>
  5. </dependency>

构建和销毁 AdminClient 实例

  1. Properties props = new Properties();
  2. props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-host:port");
  3. props.put("request.timeout.ms", 600000);
  4. try (AdminClient client = AdminClient.create(props)) {
  5. // 执行你要做的操作……
  6. }

常用操作

创建主题

  1. String newTopicName = "test-topic";
  2. try (AdminClient client = AdminClient.create(props)) {
  3. NewTopic newTopic = new NewTopic(newTopicName, 10, (short) 3);
  4. CreateTopicsResult result = client.createTopics(Arrays.asList(newTopic));
  5. result.all().get(10, TimeUnit.SECONDS);
  6. }

查询消费者位移

  1. String groupID = "test-group";
  2. try (AdminClient client = AdminClient.create(props)) {
  3. ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
  4. Map<TopicPartition, OffsetAndMetadata> offsets =
  5. result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
  6. System.out.println(offsets);
  7. }

获取Broker磁盘占用

  1. try (AdminClient client = AdminClient.create(props)) {
  2. DescribeLogDirsResult ret = client.describeLogDirs(Collections.singletonList(targetBrokerId)); // 指定Broker id
  3. long size = 0L;
  4. for (Map<String, DescribeLogDirsResponse.LogDirInfo> logDirInfoMap : ret.all().get().values()) {
  5. size += logDirInfoMap.values().stream().map(logDirInfo -> logDirInfo.replicaInfos).flatMap(
  6. topicPartitionReplicaInfoMap ->
  7. topicPartitionReplicaInfoMap.values().stream().map(replicaInfo -> replicaInfo.size))
  8. .mapToLong(Long::longValue).sum();
  9. }
  10. System.out.println(size);
  11. }

补充: Kafka可视化管理工具