主题管理
主题增删改查
kafka-topics脚本
-- 创建主题bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name --partitions 1 --replication-factor 1-- 查看主题列表bin/kafka-topics.sh --bootstrap-server broker_host:port --list-- 查看单个主题详情(describe 命令不指定具体的主题名称,Kafka 默认会返回所有“可见”主题)bin/kafka-topics.sh --bootstrap-server broker_host:port --describe --topic <topic_name>-- 修改主题分区(Kafka只允许增加, 减少会抛InvalidPartitionsException)bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic <topic_name> --partitions <新分区数>-- 删除主题(操作为异步)bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic <topic_name>
kafka-configs脚本
-- 修改主题级别参数, 如下修改max.message.bytesbin/kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760
内部主题管理
kafka-console-consumer脚本
-- 查看消费者组提交的位移数据bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning-- 读取主题消息,查看消费者组的状态信息bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning
常见错误
主题删除失败
最常见的原因有两个:副本所在的 Broker 宕机了;待删除主题的部分分区依然在执行迁移过程。
如果是因为前者,通常你重启对应的 Broker 之后,删除操作就能自动恢复;如果是因为后者,那就麻烦了,很可能两个操作会相互干扰。
不管什么原因,一旦你碰到主题无法删除的问题,可以采用这样的方法:
第 1 步,手动删除 ZooKeeper 节点 /admin/delete_topics 下以待删除主题为名的 znode。
第 2 步,手动删除该主题在磁盘上的分区目录。
第 3 步,在 ZooKeeper 中执行 rmr /controller,触发 Controller 重选举,刷新 Controller 缓存。
在执行最后一步时,你一定要谨慎,因为它可能造成大面积的分区 Leader 重选举。事实上,仅仅执行前两步也是可以的,只是 Controller 缓存中没有清空待删除主题罢了,也不影响使用。
__consumer_offsets 占用太多的磁盘
一旦你发现这个主题消耗了过多的磁盘空间,那么,你一定要显式地用 jstack 命令查看一下 kafka-log-cleaner-thread 前缀的线程状态。通常情况下,这都是因为该线程挂掉了,无法及时清理此内部主题。倘若真是这个原因导致的,那我们就只能重启相应的 Broker 了。另外,请你注意保留出错日志,因为这通常都是 Bug 导致的,最好提交到社区看一下。
Kafka动态配置
动态Broker配置
自1.1.0版本中引入了动态Broker参数, 修改后无需重启Broker即可立即生效, 而Kafka安装目录的Config/server.properties文件中的配置则称为静态参数.
官网: https://kafka.apache.org/documentation/#brokerconfigs
Broker Configs表中增加了 Dynamic Update Mode 列。该列有 3 类值,分别是 read-only、per-broker 和 cluster-wide。
- read-only。被标记为 read-only 的参数和原来的参数行为一样,只有重启 Broker,才能令修改生效。
 - per-broker。被标记为 per-broker 的参数属于动态参数,修改它之后,只会在对应的 Broker 上生效。
 - cluster-wide。被标记为 cluster-wide 的参数也属于动态参数,修改它之后,会在整个集群范围内生效,也就是说,对所有 Broker 都生效。你也可以为具体的 Broker 修改 cluster-wide 参数。
 
参数优先级: per-broker 参数 > cluster-wide 参数 > static 参数 > Kafka 默认值
常用场景
- 动态调整 Broker 端各种线程池大小,实时应对突发流量。
 - 动态调整 Broker 端连接信息或安全配置信息。
 - 动态更新 SSL Keystore 有效期。
 - 动态调整 Broker 端 Compact 操作性能。
 - 实时变更 JMX 指标收集器 (JMX Metrics Reporter)。
 
内部保存机制
Kafka将动态Broker配置保存在Zookeeper的持久化znode节点中, zk重启也依然生效
如上图:
- changes 是用来实时监测动态参数变更的,不会保存参数值;
 - topics 是用来保存 Kafka 主题级别参数的。
 - users 和 clients 则是用于动态调整客户端配额(Quota)的 znode 节点。所谓配额,是指 Kafka 运维人员限制连入集群的客户端的吞吐量或者是限定它们使用的 CPU 资源。
 - /config/brokers znode 才是真正保存动态 Broker 参数的地方。
- < default >,保存的是前面说过的 cluster-wide 范围的动态参数
 - broker.id 为名,保存的是特定 Broker 的 per-broker 范围参数, 可能存在多个
 
 
示例:
使用示例
-- cluster-wide演示-- 注意: 设置 cluster-wide 范围的动态参数,需要显式指定 entity-default-- 更改unclean.leader.election.enable为true$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --alter --add-config unclean.leader.election.enable=trueCompleted updating default config for brokers in the cluster,-- 查看是否设置成功$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --describeDefault config for brokers in the cluster are:unclean.leader.election.enable=true sensitive=false synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:unclean.leader.election.enable=true}# 删除cluster-wide范围参数$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --alter --delete-config unclean.leader.election.enableCompleted updating default config for brokers in the cluster,
-- per-broker演示-- 更改ID为1的broker的unclean.leader.election.enable为false$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --alter --add-config unclean.leader.election.enable=falseCompleted updating config for broker: 1.-- 查看是否设置成功$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --describeConfigs for broker 1 are:unclean.leader.election.enable=false sensitive=false synonyms={DYNAMIC_BROKER_CONFIG:unclean.leader.election.enable=false, DYNAMIC_DEFAULT_BROKER_CONFIG:unclean.leader.election.enable=true, DEFAULT_CONFIG:unclean.leader.election.enable=false}# 删除per-broker范围参数$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --alter --delete-config unclean.leader.election.enableCompleted updating config for broker: 1.
常用动态配置
- log.retention.ms :日志留存时间
 - num.io.threads 和 num.network.threads: Broker线程池线程数量
 - SSL相关
- ssl.keystore.type
 - ssl.keystore.location
 - ssl.keystore.password
 - ssl.key.password
 
 - num.replica.fetchers: follwer副本向Broker拉取消息的线程数, 增加以解决同步慢的问题
 
如何重设消费者组位移?
Kafka基于日志结构, 消费消息并不会将消息从磁盘删除, 因此可通过修改消费者位移, 实现重复消费历史数据的功能
重设位移策略

重设方式
消费者 API
void seek(TopicPartition partition, long offset);void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);void seekToBeginning(Collection<TopicPartition> partitions);void seekToEnd(Collection<TopicPartition> partitions);补充:OffsetAndMetadata 类是一个封装了 Long 型的位移和自定义元数据的复合类,只是一般情况下,自定义元数据为空,因此你基本上可以认为这个类表征的主要是消息的位移值
Earliest 策略的实现示例, 注意:
- 你要创建的消费者程序,要禁止自动提交位移。
 - 组 ID 要设置成你要重设的消费者组的组 ID。
 - 调用 seekToBeginning 方法时,需要一次性构造主题的所有分区对象。
 - 最重要的是,一定要调用带长整型的 poll 方法,而不要调用 consumer.poll(Duration.ofSecond(0)), poll(0)是为了拿元数据 ```java
 
Properties consumerProperties = new Properties(); consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID); consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, “earliest”); consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
String topic = “test”;  // 要重设位移的Kafka主题 
try (final KafkaConsumer
  new TopicPartition(topic, partitionInfo.partition()))
  .collect(Collectors.toList()));
} 
Latest 策略示例```javaconsumer.seekToEnd(consumer.partitionsFor(topic).stream().map(partitionInfo ->new TopicPartition(topic, partitionInfo.partition())).collect(Collectors.toList()));
Current 策略示例,需借助 KafkaConsumer 的 committed 方法来获取当前提交的最新位移
consumer.partitionsFor(topic).stream().map(info ->new TopicPartition(topic, info.partition())).forEach(tp -> {long committedOffset = consumer.committed(tp).offset();consumer.seek(tp, committedOffset);});
Specified-Offset 策略示例
long targetOffset = 1234L;for (PartitionInfo info : consumer.partitionsFor(topic)) {TopicPartition tp = new TopicPartition(topic, info.partition());consumer.seek(tp, targetOffset);}
Shift-By-N 策略
for (PartitionInfo info : consumer.partitionsFor(topic)) {TopicPartition tp = new TopicPartition(topic, info.partition());// 假设向前跳123条消息long targetOffset = consumer.committed(tp).offset() + 123L;consumer.seek(tp, targetOffset);}
DateTime 策略示例
-- 假设重设位移到 2019 年 6 月 20 日晚上 8 点long ts = LocalDateTime.of(2019, 6, 20, 20, 0).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();Map<TopicPartition, Long> timeToSearch =consumer.partitionsFor(topic).stream().map(info ->new TopicPartition(topic, info.partition())).collect(Collectors.toMap(Function.identity(), tp -> ts));for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :consumer.offsetsForTimes(timeToSearch).entrySet()) {consumer.seek(entry.getKey(), entry.getValue().offset());}
Duration 策略
-- 假设将位移调回 30 分钟前Map<TopicPartition, Long> timeToSearch = consumer.partitionsFor(topic).stream().map(info -> new TopicPartition(topic, info.partition())).collect(Collectors.toMap(Function.identity(), tp -> System.currentTimeMillis() - 30 * 1000 * 60));for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :consumer.offsetsForTimes(timeToSearch).entrySet()) {consumer.seek(entry.getKey(), entry.getValue().offset());}
kafka-consumer-groups 脚本
0.11版本引入, 0.11之前只能用API
-- Earliest 策略直接指定 --to-earliest。bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute-- Latest 策略直接指定 --to-latest。bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute-- Current 策略直接指定 --to-current。bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute-- Specified-Offset 策略直接指定 --to-offset。bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset <offset> --execute-- Shift-By-N 策略直接指定 --shift-by N。bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --shift-by <offset_N> --execute-- DateTime 策略直接指定 --to-datetime。bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute-- 实现 Duration 策略,直接指定 --by-duration。bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --by-duration PT0H30M0S --execute
常见工具脚本
2.2版本提供了30多个Shell脚本
(1):connect-standalone:支持kafka Connect组件支持单节点Standalone模式
(2):connect-distributed:支持多节点的Distributed模式。
(3):kafka-acls:用于设置Kafka权限,如设置哪些用户可以访问哪些主题之类的权限。
(4):kafka-broker-api-versions:主要目的是验证不同Kafka版本之间服务器和客户端的适配性。
(5):kafka-configs:用于配置管理
(6):kafka-console-consumer:
(7):kafka-console-producer:
(8):kafka-producer-perf-test和kafka-consumer-perf-test :用于生产者和消费者的性能测试
(9):kafka-consumer-groups:消费者位移时多有涉及
(10):kafka-delegation-tokens:管理Delegation Token的,基于Delegation Token的认证是一种轻量级的认证机制,补充了现有的SASL认证机制。
(11):kafka-delete-records:用于删除Kafka的分区消息。
(12):kafka-dump-log:能够查看kafka消息文件的内容,包括消息的各种元数据信息
(13):kafka-log-dirs:可以帮助查询各个Broker上的各个日志路径的磁盘占用情况
(14):kafka-mirror-maker:可以帮助实现kafka集群间消息同步
(15):kafka-preferred-replica-election:执行Preferred Leader选举。他可以为指定的主题执行“换Leader”的操作。
(16):kafka-reassign-partitions:用于执行分区副本迁移以及副本文件路径迁移
(17):kafka-topics:所有主题管理操作,都是有该脚本来实现。
(18):kafka-run-class:可以用这个脚本执行任何带main方法的Kafka类。
(19):kafka-server-start和kafka-server-stop:启动和停止Kafka Broker进程
(20):kafka-streams-application-reset:用来给kafka-Streams应用程序重试位移,以便重新消费数据。
(21):kafka-verifiabel-producer和kafka-verifiable-consumer是用来测试生产者和消费者功能的。
(22):zookeeper开头的脚本是用来管理和运维Zookeeper的。
重点脚本操作
生产/消费消息
# 生产消息, 示例生产者参数 acks 为 -1,同时启用了 LZ4 的压缩算法$ bin/kafka-console-producer.sh --broker-list kafka-host:port --topic test-topic --request-required-acks -1 --producer-property compression.type=lz4># 消费消息1. 最好指定group, 不然每次都自动生成console-cosumer开头的消费者2. from-beginning表示从头开始消费主题, 不指定则默认从最新位移读取消息3. 最好禁掉自动提交唯一, 因为测试情况下提交无意义$ bin/kafka-console-consumer.sh --bootstrap-server kafka-host:port --topic test-topic --group test-group --from-beginning --consumer-property enable.auto.commit=false
性能测试
# 测试生产者性能, 示例命令向指定主题发送了 1 千万条消息,每条消息大小是 1KB# 命令结果会打印测试生产者的吞吐量 (MB/s)、消息发送延时以及各种分位数下的延时, 一般关注99th分位的延时即可$ bin/kafka-producer-perf-test.sh --topic test-topic --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=kafka-host:port acks=-1 linger.ms=2000 compression.type=lz42175479 records sent, 435095.8 records/sec (424.90 MB/sec), 131.1 ms avg latency, 681.0 ms max latency.4190124 records sent, 838024.8 records/sec (818.38 MB/sec), 4.4 ms avg latency, 73.0 ms max latency.10000000 records sent, 737463.126844 records/sec (720.18 MB/sec), 31.81 ms avg latency, 681.00 ms max latency, 4 ms 50th, 126 ms 95th, 604 ms 99th, 672 ms 99.9th.# 测试消费者性能$ bin/kafka-consumer-perf-test.sh --broker-list kafka-host:port --messages 10000000 --topic test-topicstart.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec2019-06-26 15:24:18:138, 2019-06-26 15:24:23:805, 9765.6202, 1723.2434, 10000000, 1764602.0822, 16, 5651, 1728.1225, 1769598.3012
查看消息
# 查看主题消息数# 查分区当前最早位移与最新位移, 然后将两者差值累加, 得到该主题当前总的消息数$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-host:port --time -2 --topic test-topictest-topic:0:0test-topic:1:0$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-host:port --time -1 --topic test-topictest-topic:0:5500000test-topic:1:5500000# 查看消息文件数据$ bin/kafka-dump-log.sh --files ../data_dir/kafka_1/test-topic-1/00000000000000000000.logDumping ../data_dir/kafka_1/test-topic-1/00000000000000000000.logStarting offset: 0baseOffset: 0 lastOffset: 14 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1561597044933 size: 1237 magic: 2 compresscodec: LZ4 crc: 646766737 isvalid: truebaseOffset: 15 lastOffset: 29 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 1237 CreateTime: 1561597044934 size: 1237 magic: 2 compresscodec: LZ4 crc: 3751986433 isvalid: true......# 查看每条具体的消息, 需显示指定--deep-iteration参数$ bin/kafka-dump-log.sh --files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log --deep-iterationDumping ../data_dir/kafka_1/test-topic-1/00000000000000000000.logStarting offset: 0baseOffset: 0 lastOffset: 14 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1561597044933 size: 1237 magic: 2 compresscodec: LZ4 crc: 646766737 isvalid: true| offset: 0 CreateTime: 1561597044911 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []| offset: 1 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []| offset: 2 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []| offset: 3 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []| offset: 4 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []| offset: 5 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []| offset: 6 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []| offset: 7 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []| offset: 8 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []| offset: 9 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []| offset: 10 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []| offset: 11 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []| offset: 12 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []| offset: 13 CreateTime: 1561597044933 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []| offset: 14 CreateTime: 1561597044933 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []baseOffset: 15 lastOffset: 29 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 1237 CreateTime: 1561597044934 size: 1237 magic: 2 compresscodec: LZ4 crc: 3751986433 isvalid: true......# 若想查看消息里面实际数据, 还需指定--print-data-log 参数$ bin/kafka-dump-log.sh --files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log --deep-iteration --print-data-log
查看消费者位移
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --describe --group test-group

