2.3 主题

2.3.1 管理

使用kafka-topics.sh脚本:

选项 说明
—config 为创建的或修改的主题指定配置信息。支持下述
配置条目:
cleanup.policy
compression.type
delete.retention.ms
file.delete.delay.ms
flush.messages
flush.ms
follower.replication.throttled.replicas
index.interval.bytes
leader.replication.throttled.replicas
max.message.bytes
message.format.version
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas
preallocate
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
segment.jitter.ms
segment.ms
unclean.leader.election.enable
—create 创建一个新主题
—delete 删除一个主题
—delete-config 删除现有主题的一个主题配置条目。这些条目就 是在—config中给出的配置条目。
—alter 更改主题的分区数量,副本分配和/或配置条目。
—describe 列出给定主题的细节。
—disable-rack-aware 禁用副本分配的机架感知。
—force 抑制控制台提示信息
—force 抑制控制台提示信息
—help 打印帮助信息
—if-exists 如果指定了该选项,则在修改或删除主题的时候,只有主题存在才可以执行。
—if-not-exists 在创建主题的时候,如果指定了该选项,则只有主题不存在的时候才可以执行命令。
—list 列出所有可用的主题。
—partitions 要创建或修改主题的分区数。
—replica-assignment
:broker_id_for_part1_replica2
,broker_id_for_part2_replica1
:broker_id_for_part2_replica2 , …>
当创建或修改主题的时候手动指定partition-to-broker的分配关系。
—replication-factor
要创建的主题分区副本数。1表示只有一个副本,
也就是Leader副本。
—topic 要创建、修改或描述的主题名称。除了创建,修改和描述在这里还可以使用正则表达式。
—topics-with-overrides if set when describing topics, only show topics that have overridden configs
—unavailable-partitions if set when describing topics, only show partitions whose leader is not available
—under-replicated-partitions if set when describing topics, only show under replicated partitions
—zookeeper 必需的参数:连接zookeeper的字符串,逗号分隔的多个host:port列表。多个URL可以故障转移。

主题中可以使用的参数定义:

属性 默认值 服务器默认属性 说明
cleanup.policy delete log.cleanup.policy 要么是”delete“要么是”
compact“; 这个字符串指明了针对旧日志部分的利用方式;默认方式(”delete”)将会丢弃旧的部分当他们的回收时间或者尺寸限制到达时。”compact“将会进行日志压缩.
compression.type none producer用于压缩数据的压缩类 型。默认是无压缩。正确的选项值 是none、gzip、snappy。压缩最 好用于批量处理,批量处理消息越 多,压缩性能越好。
delete.retention.ms 86400000
(24 hours)
log.cleaner.delete.retention.ms 对于压缩日志保留的最长时间,也 是客户端消费消息的最长时间,通 log.retention.minutes的区别在 于一个控制未压缩数据,一个控制 压缩后的数据。此项配置可以在 topic创建时的置顶参数覆盖
flush.ms None log.flush.interval.ms 此项配置用来置顶强制进行fsync 日志到磁盘的时间间隔;例如,如 果设置为1000,那么每1000ms就 需要进行一次fsync。一般不建议 使用这个选项
flush.messages None log.flush.interval.messages 此项配置指定时间间隔:强制进行 fsync日志。例如,如果这个选项 设置为1,那么每条消息之后都需 要进行fsync,如果设置为5,则每 5条消息就需要进行一次fsync。一 般来说,建议你不要设置这个值。 此参数的设置,需要在”数据可靠 性”与”性能”之间做必要的权衡.如 果此值过大,将会导致每 次”fsync”的时间较长(IO阻塞),如 果此值过小,将会导致”fsync”的次 数较多,这也意味着整体的client请 求有一定的延迟.物理server故障, 将会导致没有fsync的消息丢失.
index.interval.bytes 4096 log.index.interval.bytes 默认设置保证了我们每4096个字 节就对消息添加一个索引,更多的 索引使得阅读的消息更加靠近,但 是索引规模却会由此增大;一般不 需要改变这个选项
max.message.bytes 1000000 max.message.bytes kafka追加消息的最大尺寸。注意 如果你增大这个尺寸,你也必须增 大你consumer的fetch 尺寸,这 样consumer才能fetch到这些最 大尺寸的消息。
min.cleanable.dirty.ratio 0.5 min.cleanable.dirty.ratio 此项配置控制log压缩器试图进行 清除日志的频率。默认情况下,将 避免清除压缩率超过50%的日志。 这个比率避免了最大的空间浪费
min.insync.replicas 1 min.insync.replicas 当producer设置
request.required.acks为-1时,
min.insync.replicas指定replicas
的最小数目(必须确认每一个
repica的写数据都是成功的),如
果这个数目没有达到,producer
会产生异常。
retention.bytes None log.retention.bytes 如果使用“delete”的retention 策略,这项配置就是指在删除日志之前,日志所能达到的最大尺寸。默认情况下,没有尺寸限制而只有时间限制
retention.ms 7 days log.retention.minutes 如果使用“delete”的retention策 略,这项配置就是指删除日志前日 志保存的时间。
segment.bytes 1GB log.segment.bytes kafka中log日志是分成一块块存储 的,此配置是指log日志划分成块 的大小
segment.index.bytes 10MB log.index.size.max.bytes 此配置是有关offsets和文件位置 之间映射的索引文件的大小;一般 不需要修改这个配置
segment.jitter.ms 0 log.roll.jitter.{ms,hours} The maximum jitter to subtract from logRollTimeMillis.
segment.ms 7 days log.roll.hours 即使log的分块文件没有达到需要 删除、压缩的大小,一旦log 的时 间达到这个上限,就会强制新建一 个log分块文件
unclean.leader.election.enable true 指明了是否能够使不在ISR中 replicas设置用来作为leader

2.3.1.1 创建主题

  1. kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_x
  2. --partitions 1 --replication-factor 1
  3. kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test_02
  4. --partitions 3 --replication-factor 1 --config max.message.bytes=1048576
  5. --config segment.bytes=10485760

2.3.1.2 查看主题

kafka-topics.sh --zookeeper localhost:2181/myKafka --list
kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_x
kafka-topics.sh --zookeeper localhost:2181/myKafka --topics-with-overrides -- describe

2.3.1.3 修改主题

kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test_01 
 --partitions 2 --replication-factor 1
kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_test_01
 --config max.message.bytes=1048576
kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_test_01
kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_test_01
 --config segment.bytes=10485760 
kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --delete-config
 max.message.bytes --topic topic_test_01

2.3.1.4 删除主题

kafka-topics.sh --zookeeper localhost:2181/myKafka --delete --topic topic_x

image.png
给主题添加删除的标记:
image.png
要过一段时间删除。

2.3.2 增加分区

通过命令行工具操作,主题的分区只能增加,不能减少。否则报错:

ERROR org.apache.kafka.common.errors.InvalidPartitionsException:
The number of partitions for a topic can only be increased.
Topic myTop1 currently has 2 partitions, 1 would not be an increase.

通过—alter修改主题的分区数,增加分区。

kafka-topics.sh --zookeeper localhost/myKafka --alter --topic myTop1 --partitions 2

2.3.3 分区副本的分配-了解

副本分配的三个目标:

  1. 均衡地将副本分散于各个broker上
  2. 对于某个broker上分配的分区,它的其他副本在其他broker上
  3. 如果所有的broker都有机架信息,尽量将分区的各个副本分配到不同机架上的broker。

在不考虑机架信息的情况下:

  1. 第一个副本分区通过轮询的方式挑选一个broker,进行分配。该轮询从broker列表的随机位置

进行轮询。

  1. 其余副本通过增加偏移进行分配。

分配案例:
broker-0 broker-1 broker-2 broker-3 broker-4
p0 p1 p2 p3 p4 (1st replica)
p5 p6 p7 p8 p9 (1st replica)
p4 p0 p1 p2 p3 (2nd replica)
p8 p9 p5 p6 p7 (2nd replica)
p3 p4 p0 p1 p2 (3nd replica)
p7 p8 p9 p5 p6 (3nd replica)
image.png
考虑到机架信息,首先为每个机架创建一个broker列表。如:
三个机架(rack1,rack2,rack3),六个broker(0,1,2,3,4,5)
brokerID -> rack

  • 0 -> “rack1”, 1 -> “rack3”, 2 -> “rack3”, 3 -> “rack2”, 4 -> “rack2”, 5 -> “rack1”

rack1:0,5
rack2:3,4
rack3:1,2
这broker列表为rack1的0,rack2的3,rack3的1,rack1的5,rack2的4,rack3的2
即:0, 3, 1, 5, 4, 2
通过简单的轮询将分区分配给不同机架上的broker:
image.png
每个分区副本在分配的时候在上一个分区第一个副本开始分配的位置右移一位。

六个broker,六个分区,正好最后一个分区的第一个副本分配的位置是该broker列表的最后一个。
如果有更多的分区需要分配,则算法开始对follower副本进行移位分配。
这主要是为了避免每次都得到相同的分配序列。
此时,如果有一个分区等待分配(分区6),这按照如下方式分配:
6 -> 0,4,2 (而不是像分区0那样重复0,3,1)
跟机架相关的副本分配中,永远在机架相关的broker列表中轮询地分配第一个副本。其余的副本,倾向于机架上没有副本的broker进行副本分配,除非每个机架有一个副本。然后其他的副本又通过轮询的方式分配给broker。
结果是,如果副本的个数大于等于机架数,保证每个机架最少有一个副本。否则每个机架最多保有一个副本。
如果副本的个数和机架的个数相同,并且每个机架包含相同个数的broker,可以保证副本在机架和broker之间均匀分布。
image.png
上图,tp_eagle_01主题的分区0分配信息:leader分区在broker1上,同步副本分区是1和2,也就是在broker1和broker2上的两个副本分区是同步副本分区,其中一个是leader分区。

2.3.4 必要参数配置

kafka-topics.sh —config xx=xx —config yy=yy
配置给主题的参数。

属性 默认值 服务器默认属性 说明
cleanup.policy delete log.cleanup.policy 要么是”delete“要么是”compact“; 这个字符串指明了针 对旧日志部分的利用方式;
默认方式(”delete”)将会丢弃旧的部分当他们的回收时间或者尺寸限制到达时。”compact“将会进行日志压缩
compression.type none producer用于压缩数据的压缩类 型。默认是无压缩。
正确的选项值是none、gzip、snappy、 lz4。
压缩最好用于批量处理,批量处理消息越多,压缩性能越好。
max.message.bytes 1000000 max.message.bytes kafka追加消息的最大字节数。注 意如果你增大这个字节数,也必须增大consumer的fetch字节数,这样consumer才能fetch到 这些最大字节数的消息。
min.cleanable.dirty.ratio 0.5 min.cleanable.dirty.ratio 此项配置控制log压缩器试图进行清除日志的频率。
默认情况下,将避免清除压缩率超过50%的日志。这个比率避免了最大的空间浪费
min.insync.replicas 1 min.insync.replicas 当producer设置 request.required.acks为-1时, min.insync.replicas指定replicas 的最小数目(必须确认每一个 repica,的写数据都是成功的),如果这个数目没有达到,producer会产生异常。
retention.bytes None log.retention.bytes 如果使用“delete”的retention 策略,这项配置就是指在删除日志之前,日志所能达到的最大尺寸。
默认情况下,没有尺寸限制而只有时间限制
retention.ms 7 days log.retention.minutes 如果使用“delete”的retention策略,这项配置就是指删除日志前日志保存的时间。
segment.bytes 1GB log.segment.bytes kafka中log日志是分成一块块存储的,此配置是指log日志划分成块的大小
segment.index.bytes 10MB log.index.size.max.bytes 此配置是有关offsets和文件位置之间映射的索引文件的大小;一般不需要修改这个配置
segment.jitter.ms 0 log.roll.jitter.{ms,hours} The maximum jitter to subtract from logRollTimeMillis.
segment.ms 7 days log.roll.hours 即使log的分块文件没有达到需要删除、压缩的大小,一旦log 的时间达到这个上限,就会强制新建一个log分块文件
unclean.leader.election.enable true 指明了是否能够使不在ISR中 replicas 设置用来作为leader

2.3.5 KafkaAdminClient应用

说明
除了使用Kafka的bin目录下的脚本工具来管理Kafka,还可以使用管理Kafka的API将某些管理查看的功能集成到系统中。在Kafka0.11.0.0版本之前,可以通过kafka-core包(Kafka的服务端,采用Scala编写)中的AdminClient和AdminUtils来实现部分的集群管理操作。Kafka0.11.0.0之后,又多了一个AdminClient,在kafka-client包下,一个抽象类,具体的实现是org.apache.kafka.clients.admin.KafkaAdminClient。
功能与原理介绍
Kafka官网:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects。
KafkaAdminClient包含了一下几种功能(以Kafka1.0.2版本为准):

  1. 创建主题:
    • createTopics(final Collection newTopics, final CreateTopicsOptions options)
  2. 删除主题:
    • deleteTopics(final Collection topicNames, DeleteTopicsOptions options)
  3. 列出所有主题:
    • listTopics(final ListTopicsOptions options)
  4. 查询主题:
    • describeTopics(final Collection topicNames, DescribeTopicsOptions options)
  5. 查询集群信息:
    • describeCluster(DescribeClusterOptions options)
  6. 查询配置信息:
    • describeConfigs(Collection configResources, final DescribeConfigsOptions options)
  7. 修改配置信息:
    • alterConfigs(Map configs, final AlterConfigsOptions options)
  8. 修改副本的日志目录:
    • alterReplicaLogDirs(Map replicaAssignment, final AlterReplicaLogDirsOptions options)
  9. 查询节点的日志目录信息:
    • describeLogDirs(Collection brokers, DescribeLogDirsOptions options)
  10. 查询副本的日志目录信息:
    • describeReplicaLogDirs(Collection replicas, DescribeReplicaLogDirsOptions options)
  11. 增加分区:
    • createPartitions(Map newPartitions, final CreatePartitionsOptions options)

其内部原理是使用Kafka自定义的一套二进制协议来实现,详细可以参见Kafka协议。

用到的参数:

属性 说明 重要性
bootstrap.servers 向Kafka集群建立初始连接用到的host/port列表。
客户端会使用这里列出的所有服务器进行集群其他服务器的发现,而不管是否指定了哪个服务器用作引导。
这个列表仅影响用来发现集群所有服务器的初始主机。
字符串形式:host1:port1,host2:port2,…
由于这组服务器仅用于建立初始链接,然后发现集群中的所有服务器,因此没有必要将集群中的所有地址写在这里。
一般最好两台,以防其中一台宕掉。
high
client.id 生产者发送请求的时候传递给broker的id字符串。
用于在broker的请求日志中追踪什么应用发送了什么消息。
一般该id是跟业务有关的字符串。
medium
connections.max.idle.ms 当连接空闲时间达到这个值,就关闭连接。
long型数据,默认:300000
medium
receive.buffer.bytes TCP接收缓存(SO_RCVBUF),如果设置为-1,则使用操作系统默认的值。
int类型值,默认65536,可选值:[-1,…]
medium
request.timeout.ms 客户端等待服务端响应的最大时间。如果该时间超时,则客户端要么重新发起请求,要么如果重试耗尽,请求失败。
int类型值,默认:120000
medium
security.protocol 跟broker通信的协议:PLAINTEXT, SSL,SASL_PLAINTEXT, SASL_SSL.
string类型值,默认:PLAINTEXT
medium
send.buffer.bytes 用于TCP发送数据时使用的缓冲大小(SO_SNDBUF),-1表示使用OS默认的缓冲区大小。
int类型值,默认值:131072
medium
reconnect.backoff.max.ms 对于每个连续的连接失败,每台主机的退避将成倍增加,直至达到此最大值。在计算退避增量之后,添加20%的随机抖动以避免连接风暴。
long型值,默认1000,可选值:[0,…]
low
reconnect.backoff.ms 重新连接主机的等待时间。避免了重连的密集循环。该等待时间应用于该客户端到broker的所有连接。
long型值,默认:50
low
retries The maximum number of times to retry a call before failing it.重试的次数,达到此值,失败。
int类型值,默认5。
low
retry.backoff.ms 在发生失败的时候如果需要重试,则该配置表示客户端等待多长时间再发起重试。
该时间的存在避免了密集循环。
long型值,默认值:100。
low

主要操作步骤:
客户端根据方法的调用创建相应的协议请求,比如创建Topic的createTopics方法,其内部就是发送CreateTopicRequest请求。
客户端发送请求至Kafka Broker。
Kafka Broker处理相应的请求并回执,比如与CreateTopicRequest对应的是CreateTopicResponse。

客户端接收相应的回执并进行解析处理。

和协议有关的请求和回执的类基本都在org.apache.kafka.common.requests包中,AbstractRequest和AbstractResponse是这些请求和响应类的两个父类。
综上,如果要自定义实现一个功能,只需要三个步骤:

  1. 自定义XXXOptions;
  2. 自定义XXXResult返回值;
  3. 自定义Call,然后挑选合适的XXXRequest和XXXResponse来实现Call类中的3个抽象方法。 ```java package com.lagou.kafka.demo;

import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.requests.DescribeLogDirsResponse; import org.junit.After; import org.junit.Before; import org.junit.Test;

import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; import java.util.function.Consumer;

public class MyAdminClient { private KafkaAdminClient client;

@Before
public void before() {
    Map<String, Object> conf = new HashMap<>();
    conf.put("bootstrap.servers", "node1:9092");
    conf.put("client.id", "adminclient-1");
    client = (KafkaAdminClient) KafkaAdminClient.create(conf);
}

@After
public void after() {
    client.close();
}

@Test
public void testListTopics1() throws ExecutionException, InterruptedException {
    ListTopicsResult listTopicsResult = client.listTopics();
    // KafkaFuture<Collection<TopicListing>> listings = listTopicsResult.listings();
    // Collection<TopicListing> topicListings = listings.get();
    //
    // topicListings.forEach(new Consumer<TopicListing>() {
    // @Override
    // public void accept(TopicListing topicListing) {
    // boolean internal = topicListing.isInternal();
    // String name = topicListing.name();
    // String s = topicListing.toString();
    // System.out.println(s + "\t" + name + "\t" + internal);
    // }
    // });
    // KafkaFuture<Set<String>> names = listTopicsResult.names();
    // Set<String> strings = names.get();
    //
    // strings.forEach(name -> {
    // System.out.println(name);
    // });
    // KafkaFuture<Map<String, TopicListing>> mapKafkaFuture = listTopicsResult.namesToListings();
    // Map<String, TopicListing> stringTopicListingMap = mapKafkaFuture.get();
    //
    // stringTopicListingMap.forEach((k, v) -> {
    // System.out.println(k + "\t" + v);
    // });
    ListTopicsOptions options = new ListTopicsOptions();
    options.listInternal(false);
    options.timeoutMs(500);
    ListTopicsResult listTopicsResult1 = client.listTopics(options);
    Map<String, TopicListing> stringTopicListingMap = listTopicsResult1.namesToListings().get();
    stringTopicListingMap.forEach((k, v) -> {
        System.out.println(k + "\t" + v);
    });
    // 关闭管理客户端 client.close();


}

@Test
public void testCreateTopic() throws ExecutionException, InterruptedException {
    Map<String, String> configs = new HashMap<>();
    configs.put("max.message.bytes", "1048576");
    configs.put("segment.bytes", "1048576000");
    NewTopic newTopic = new NewTopic("adm_tp_01", 2, (short) 1);
    newTopic.configs(configs);
    CreateTopicsResult topics = client.createTopics(Collections.singleton(newTopic));
    KafkaFuture<Void> all = topics.all();
    Void aVoid = all.get();
    System.out.println(aVoid);
}

@Test
public void testDeleteTopic() throws ExecutionException, InterruptedException {
    DeleteTopicsOptions options = new DeleteTopicsOptions();
    options.timeoutMs(500);
    DeleteTopicsResult deleteResult = client.deleteTopics(Collections.singleton("adm_tp_01"), options);
    deleteResult.all().get();
}

@Test
public void testAlterTopic() throws ExecutionException, InterruptedException {
    NewPartitions newPartitions = NewPartitions.increaseTo(5);
    Map<String, NewPartitions> newPartitionsMap = new HashMap<>();
    newPartitionsMap.put("adm_tp_01", newPartitions);
    CreatePartitionsOptions option = new CreatePartitionsOptions();
    // Set to true if the request should be validated without creating new partitions.
    // 如果只是验证,而不创建分区,则设置为true
    // option.validateOnly(true);
    CreatePartitionsResult partitionsResult = client.createPartitions(newPartitionsMap, option);
    Void aVoid = partitionsResult.all().get();
}

@Test
public void testDescribeTopics() throws ExecutionException, InterruptedException {
    DescribeTopicsOptions options = new DescribeTopicsOptions();
    options.timeoutMs(3000);
    DescribeTopicsResult topicsResult = client.describeTopics(Collections.singleton("adm_tp_01"), options);
    Map<String, TopicDescription> stringTopicDescriptionMap = topicsResult.all().get();
    stringTopicDescriptionMap.forEach((k, v) -> {
        System.out.println(k + "\t" + v);
        System.out.println("=======================================");
        System.out.println(k);
        boolean internal = v.isInternal();
        String name = v.name();
        List<TopicPartitionInfo> partitions = v.partitions();
        String partitionStr = Arrays.toString(partitions.toArray());
        System.out.println("内部的?" + internal);
        System.out.println("topic name = " + name);
        System.out.println("分区:" + partitionStr);
        partitions.forEach(partition -> {
            System.out.println(partition);
        });
    });
}

@Test
public void testDescribeCluster() throws ExecutionException, InterruptedException {
    DescribeClusterResult describeClusterResult = client.describeCluster();
    KafkaFuture<String> stringKafkaFuture = describeClusterResult.clusterId();
    String s = stringKafkaFuture.get();
    System.out.println("cluster name = " + s);
    KafkaFuture<Node> controller = describeClusterResult.controller();
    Node node = controller.get();
    System.out.println("集群控制器:" + node);
    Collection<Node> nodes = describeClusterResult.nodes().get();
    nodes.forEach(node1 -> {
        System.out.println(node1);
    });
}

@Test
public void testDescribeConfigs() throws ExecutionException, InterruptedException, TimeoutException {
    ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "0");
    DescribeConfigsResult describeConfigsResult = client.describeConfigs(Collections.singleton(configResource));
    Map<ConfigResource, Config> configMap = describeConfigsResult.all().get(15, TimeUnit.SECONDS);
    configMap.forEach(new BiConsumer<ConfigResource, Config>() {
        @Override
        public void accept(ConfigResource configResource, Config config) {
            ConfigResource.Type type = configResource.type();
            String name = configResource.name();
            System.out.println("资源名称:" + name);
            Collection<ConfigEntry> entries = config.entries();
            entries.forEach(new Consumer<ConfigEntry>() {
                @Override
                public void accept(ConfigEntry configEntry) {
                    boolean aDefault = configEntry.isDefault();
                    boolean readOnly = configEntry.isReadOnly();
                    boolean sensitive = configEntry.isSensitive();
                    String name1 = configEntry.name();
                    String value = configEntry.value();
                    System.out.println("是否默认:" + aDefault + "\t是否 只读?" + readOnly + "\t是否敏感?" + sensitive + "\t" + name1 + " --> " + value);
                }
            });
            ConfigEntry retries = config.get("retries");
            if (retries != null) {
                System.out.println(retries.name() + " -->" + retries.value());
            } else {
                System.out.println("没有这个属性");
            }
        }
    });
}

@Test
public void testAlterConfig() throws ExecutionException, InterruptedException {
    // 这里设置后,原来资源中不冲突的属性也会丢失,直接按照这里的配置设置
    Map<ConfigResource, Config> configMap = new HashMap<>();
    ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "adm_tp_01");
    Config config = new Config(Collections.singleton(new ConfigEntry("segment.bytes", "1048576000")));
    configMap.put(resource, config);
    AlterConfigsResult alterConfigsResult = client.alterConfigs(configMap);
    Void aVoid = alterConfigsResult.all().get();
}

@Test
public void testDescribeLogDirs() throws ExecutionException, InterruptedException {
    DescribeLogDirsOptions option = new DescribeLogDirsOptions();
    option.timeoutMs(1000);
    DescribeLogDirsResult describeLogDirsResult = client.describeLogDirs(Collections.singleton(0), option);
    Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> integerMapMap = describeLogDirsResult.all().get();
    integerMapMap.forEach(new BiConsumer<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>() {
        @Override
        public void accept(Integer integer, Map<String, DescribeLogDirsResponse.LogDirInfo> stringLogDirInfoMap) {
            System.out.println("broker.id = " + integer);
            stringLogDirInfoMap.forEach(new BiConsumer<String, DescribeLogDirsResponse.LogDirInfo>() {
                @Override
                public void accept(String s, DescribeLogDirsResponse.LogDirInfo logDirInfo) {
                    System.out.println("log.dirs:" + s);
                    // 查看该broker上的主题/分区/偏移量等信息 
                    // logDirInfo.replicaInfos.forEach(new BiConsumer<TopicPartition, DescribeLogDirsResponse.ReplicaInfo>() { 
                    // @Override 
                    // public void accept(TopicPartition topicPartition, DescribeLogDirsResponse.ReplicaInfo replicaInfo) { 
                    // int partition = topicPartition.partition(); 
                    // String topic = topicPartition.topic(); 
                    // boolean isFuture = replicaInfo.isFuture; 
                    // long offsetLag = replicaInfo.offsetLag; 
                    // long size = replicaInfo.size; 
                    // System.out.println("partition:" + partition + "\ttopic:" + topic 
                    // + "\tisFuture:" + isFuture 
                    // + "\toffsetLag:" + offsetLag 
                    // + "\tsize:" + size); 
                    // } 
                    // });
                }
            });
        }
    });
}

}


<a name="9lXTm"></a>
### 2.3.6 偏移量管理
Kafka 1.0.2,__consumer_offsets主题中保存各个消费组的偏移量。<br />早期由zookeeper管理消费组的偏移量。<br />**查询方法:**<br />通过原生 kafka 提供的工具脚本进行查询。<br />工具脚本的位置与名称为 bin/kafka-consumer-groups.sh<br />首先运行脚本,查看帮助:

| **参数 ** | **说明** |
| --- | --- |
| **--all-topics** | 将所有关联到指定消费组的主题都划归到 reset-offsets 操作范围。 |
| **--bootstrap-server**<br /> <String: server to connect to> | **必须**:(基于消费组的新的消费者): 要连接的服务器地址。 |
| **--by-duration** <String: duration> | 距离当前时间戳的一个时间段。格式:'PnDTnHnMnS' |
| **--command-config** <br /><String:command config property file> | 指定配置文件,该文件内容传递给Admin Client和消费者。 |
| **--delete** | 传值消费组名称,删除整个消费组与所有主题的各个分区偏移量和所有者关系。<br />如: --group g1 --group g2 。<br />传值消费组名称和单个主题,仅删除该消费组到指定主题的分区偏移量和所属关系。<br />如: --group g1 --group g2 --topic t1 。<br />传值一个主题名称,仅删除指定主题与所有消费组分区偏移量以及所属关系。<br />如: --topic t1<br />注意:消费组的删除仅对基于ZK保存偏移量的消费组有效,并且要小心使用,仅删除不活跃的消费组。 |
| **--describe ** | 描述给定消费组的偏移量差距(有多少消息还没有消费) |
| **--execute ** | 执行操作。支持的操作: reset-offsets 。  |
| **--export** | 导出操作的结果到CSV文件。支持的操作: reset-offsets 。  |
| **--from-file**<br /> <String: path to CSV file> | 重置偏移量到CSV文件中定义的值。 |
| **--group** <br /><String: consumer group> | 目标消费组。 |
| **--list**  | 列出所有消费组。 |
| **--new-consumer** | 使用新的消费者实现。这是默认值。随后的发行版中会删除这一操作。 |
| **--reset-offsets** | 重置消费组的偏移量。当前一次操作只支持一个消费组,并且该消费组应该是不活跃的。<br />有三个操作选项<br />1. (默认)plan:要重置哪个偏移量。<br />2. execute:执行 reset-offsets 操作。<br />3. process:配合 --export 将操作结果导出到CSV格式。<br />可以使用如下选项:<br />--to-datetime <br />--by-period <br />--to-earliest <br />--to-latest <br />--shift-by <br />--from-file <br />--to-current 。<br />必须选择一个选项使用。<br />要定义操作的范围,使用:<br />--all-topics <br />--topic 。<br />必须选择一个,除非使用 --from-file 选项。 |
| **--shift-by**<br /> <Long: number-of-offsets> | 重置偏移量n个。n可以是正值,也可以是负值。 |
| **--timeout** <br /><Long: timeout (ms)> | 对某些操作设置超时时间。<br />如:对于描述指定消费组信息,指定毫秒值的最大等待时间,以获取正常数据(如刚创建的消费组,或者消费组做了一些更改操作)。<br />默认时间: 5000 。 |
| **--to-current ** | 重置到当前的偏移量。 |
| **--to-datetime**<br /><String:datetime> | 重置偏移量到指定的时间戳。格式:'YYYY-MM<br />DDTHH:mm:SS.sss |
| **--to-earliest** | 重置为最早的偏移量 |
| **--to-latest ** | 重置为最新的偏移量 |
| **--to-offset** <br /><Long:offset> | 重置到指定的偏移量。 |
| **--topic **<String: topic> | 指定哪个主题的消费组需要删除,或者指定哪个主题的消费组需要包含到 reset-offsets 操作中。对于 reset-offsets 操作,还可以指定分区: topic1:0,1,2 。其中0,1,2表示要包含到操作中的分区号。重置偏移量的操作支持多个主题一起操作。 |
| **--zookeeper** <String: urls>  | **必须**,它的值,你懂的。 --zookeeper node1:2181/myKafka 。 |

这里我们先编写一个生产者,消费者的例子:<br />我们先启动消费者,再启动生产者, 再通过 bin/kafka-consumer-groups.sh 进行消费偏移量查询,

由于kafka 消费者记录group的消费偏移量有两种方式 : <br />1)kafka 自维护 (新)<br />2)zookpeer 维护 (旧) ,已经逐渐被废弃<br />所以 ,脚本只查看由broker维护的,由zookeeper维护的可以将** --bootstrap-server **换成 **-- zookeeper **即可。<br /> <br />1**. ****查看有那些**** group ID ****正在进行消费:**
```java
[root@node11 ~]# kafka-consumer-groups.sh --bootstrap-server node1:9092 --list
Note: This will not show information about old Zookeeper-based consumers. 

group

image.png

注意:

  1. 这里面是没有指定** topic,查看的是所有topic消费者的 group.id **的列表。
  2. 注意: 重名的** group.id **只会显示一次

2.**查看指定group.id 的消费者消费情况**

[root@node11 ~]# kafka-consumer-groups.sh --bootstrap-server node1:9092 --describe
 --group group

image.png