主题管理
主题增删改查
kafka-topics脚本
-- 创建主题
bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name --partitions 1 --replication-factor 1
-- 查看主题列表
bin/kafka-topics.sh --bootstrap-server broker_host:port --list
-- 查看单个主题详情(describe 命令不指定具体的主题名称,Kafka 默认会返回所有“可见”主题)
bin/kafka-topics.sh --bootstrap-server broker_host:port --describe --topic <topic_name>
-- 修改主题分区(Kafka只允许增加, 减少会抛InvalidPartitionsException)
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic <topic_name> --partitions <新分区数>
-- 删除主题(操作为异步)
bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic <topic_name>
kafka-configs脚本
-- 修改主题级别参数, 如下修改max.message.bytes
bin/kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760
内部主题管理
kafka-console-consumer脚本
-- 查看消费者组提交的位移数据
bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
-- 读取主题消息,查看消费者组的状态信息
bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning
常见错误
主题删除失败
最常见的原因有两个:副本所在的 Broker 宕机了;待删除主题的部分分区依然在执行迁移过程。
如果是因为前者,通常你重启对应的 Broker 之后,删除操作就能自动恢复;如果是因为后者,那就麻烦了,很可能两个操作会相互干扰。
不管什么原因,一旦你碰到主题无法删除的问题,可以采用这样的方法:
第 1 步,手动删除 ZooKeeper 节点 /admin/delete_topics 下以待删除主题为名的 znode。
第 2 步,手动删除该主题在磁盘上的分区目录。
第 3 步,在 ZooKeeper 中执行 rmr /controller,触发 Controller 重选举,刷新 Controller 缓存。
在执行最后一步时,你一定要谨慎,因为它可能造成大面积的分区 Leader 重选举。事实上,仅仅执行前两步也是可以的,只是 Controller 缓存中没有清空待删除主题罢了,也不影响使用。
__consumer_offsets 占用太多的磁盘
一旦你发现这个主题消耗了过多的磁盘空间,那么,你一定要显式地用 jstack 命令查看一下 kafka-log-cleaner-thread 前缀的线程状态。通常情况下,这都是因为该线程挂掉了,无法及时清理此内部主题。倘若真是这个原因导致的,那我们就只能重启相应的 Broker 了。另外,请你注意保留出错日志,因为这通常都是 Bug 导致的,最好提交到社区看一下。
Kafka动态配置
动态Broker配置
自1.1.0版本中引入了动态Broker参数, 修改后无需重启Broker即可立即生效, 而Kafka安装目录的Config/server.properties文件中的配置则称为静态参数.
官网: https://kafka.apache.org/documentation/#brokerconfigs
Broker Configs表中增加了 Dynamic Update Mode 列。该列有 3 类值,分别是 read-only、per-broker 和 cluster-wide。
- read-only。被标记为 read-only 的参数和原来的参数行为一样,只有重启 Broker,才能令修改生效。
- per-broker。被标记为 per-broker 的参数属于动态参数,修改它之后,只会在对应的 Broker 上生效。
- cluster-wide。被标记为 cluster-wide 的参数也属于动态参数,修改它之后,会在整个集群范围内生效,也就是说,对所有 Broker 都生效。你也可以为具体的 Broker 修改 cluster-wide 参数。
参数优先级: per-broker 参数 > cluster-wide 参数 > static 参数 > Kafka 默认值
常用场景
- 动态调整 Broker 端各种线程池大小,实时应对突发流量。
- 动态调整 Broker 端连接信息或安全配置信息。
- 动态更新 SSL Keystore 有效期。
- 动态调整 Broker 端 Compact 操作性能。
- 实时变更 JMX 指标收集器 (JMX Metrics Reporter)。
内部保存机制
Kafka将动态Broker配置保存在Zookeeper的持久化znode节点中, zk重启也依然生效
如上图:
- changes 是用来实时监测动态参数变更的,不会保存参数值;
- topics 是用来保存 Kafka 主题级别参数的。
- users 和 clients 则是用于动态调整客户端配额(Quota)的 znode 节点。所谓配额,是指 Kafka 运维人员限制连入集群的客户端的吞吐量或者是限定它们使用的 CPU 资源。
- /config/brokers znode 才是真正保存动态 Broker 参数的地方。
- < default >,保存的是前面说过的 cluster-wide 范围的动态参数
- broker.id 为名,保存的是特定 Broker 的 per-broker 范围参数, 可能存在多个
示例:
使用示例
-- cluster-wide演示
-- 注意: 设置 cluster-wide 范围的动态参数,需要显式指定 entity-default
-- 更改unclean.leader.election.enable为true
$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --alter --add-config unclean.leader.election.enable=true
Completed updating default config for brokers in the cluster,
-- 查看是否设置成功
$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --describe
Default config for brokers in the cluster are:
unclean.leader.election.enable=true sensitive=false synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:unclean.leader.election.enable=true}
# 删除cluster-wide范围参数
$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --alter --delete-config unclean.leader.election.enable
Completed updating default config for brokers in the cluster,
-- per-broker演示
-- 更改ID为1的broker的unclean.leader.election.enable为false
$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --alter --add-config unclean.leader.election.enable=false
Completed updating config for broker: 1.
-- 查看是否设置成功
$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --describe
Configs for broker 1 are:
unclean.leader.election.enable=false sensitive=false synonyms={DYNAMIC_BROKER_CONFIG:unclean.leader.election.enable=false, DYNAMIC_DEFAULT_BROKER_CONFIG:unclean.leader.election.enable=true, DEFAULT_CONFIG:unclean.leader.election.enable=false}
# 删除per-broker范围参数
$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --alter --delete-config unclean.leader.election.enable
Completed updating config for broker: 1.
常用动态配置
- log.retention.ms :日志留存时间
- num.io.threads 和 num.network.threads: Broker线程池线程数量
- SSL相关
- ssl.keystore.type
- ssl.keystore.location
- ssl.keystore.password
- ssl.key.password
- num.replica.fetchers: follwer副本向Broker拉取消息的线程数, 增加以解决同步慢的问题
如何重设消费者组位移?
Kafka基于日志结构, 消费消息并不会将消息从磁盘删除, 因此可通过修改消费者位移, 实现重复消费历史数据的功能
重设位移策略
重设方式
消费者 API
void seek(TopicPartition partition, long offset);
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
void seekToBeginning(Collection<TopicPartition> partitions);
void seekToEnd(Collection<TopicPartition> partitions);
补充:
OffsetAndMetadata 类是一个封装了 Long 型的位移和自定义元数据的复合类,
只是一般情况下,自定义元数据为空,因此你基本上可以认为这个类表征的主要是消息的位移值
Earliest 策略的实现示例, 注意:
- 你要创建的消费者程序,要禁止自动提交位移。
- 组 ID 要设置成你要重设的消费者组的组 ID。
- 调用 seekToBeginning 方法时,需要一次性构造主题的所有分区对象。
- 最重要的是,一定要调用带长整型的 poll 方法,而不要调用 consumer.poll(Duration.ofSecond(0)), poll(0)是为了拿元数据 ```java
Properties consumerProperties = new Properties(); consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID); consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, “earliest”); consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
String topic = “test”; // 要重设位移的Kafka主题
try (final KafkaConsumer
new TopicPartition(topic, partitionInfo.partition()))
.collect(Collectors.toList()));
}
Latest 策略示例
```java
consumer.seekToEnd(
consumer.partitionsFor(topic).stream().map(partitionInfo ->
new TopicPartition(topic, partitionInfo.partition()))
.collect(Collectors.toList()));
Current 策略示例,需借助 KafkaConsumer 的 committed 方法来获取当前提交的最新位移
consumer.partitionsFor(topic).stream().map(info ->
new TopicPartition(topic, info.partition()))
.forEach(tp -> {
long committedOffset = consumer.committed(tp).offset();
consumer.seek(tp, committedOffset);
});
Specified-Offset 策略示例
long targetOffset = 1234L;
for (PartitionInfo info : consumer.partitionsFor(topic)) {
TopicPartition tp = new TopicPartition(topic, info.partition());
consumer.seek(tp, targetOffset);
}
Shift-By-N 策略
for (PartitionInfo info : consumer.partitionsFor(topic)) {
TopicPartition tp = new TopicPartition(topic, info.partition());
// 假设向前跳123条消息
long targetOffset = consumer.committed(tp).offset() + 123L;
consumer.seek(tp, targetOffset);
}
DateTime 策略示例
-- 假设重设位移到 2019 年 6 月 20 日晚上 8 点
long ts = LocalDateTime.of(
2019, 6, 20, 20, 0).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
Map<TopicPartition, Long> timeToSearch =
consumer.partitionsFor(topic).stream().map(info ->
new TopicPartition(topic, info.partition()))
.collect(Collectors.toMap(Function.identity(), tp -> ts));
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :
consumer.offsetsForTimes(timeToSearch).entrySet()) {
consumer.seek(entry.getKey(), entry.getValue().offset());
}
Duration 策略
-- 假设将位移调回 30 分钟前
Map<TopicPartition, Long> timeToSearch = consumer.partitionsFor(topic).stream()
.map(info -> new TopicPartition(topic, info.partition()))
.collect(Collectors.toMap(Function.identity(), tp -> System.currentTimeMillis() - 30 * 1000 * 60));
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :
consumer.offsetsForTimes(timeToSearch).entrySet()) {
consumer.seek(entry.getKey(), entry.getValue().offset());
}
kafka-consumer-groups 脚本
0.11版本引入, 0.11之前只能用API
-- Earliest 策略直接指定 --to-earliest。
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute
-- Latest 策略直接指定 --to-latest。
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute
-- Current 策略直接指定 --to-current。
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute
-- Specified-Offset 策略直接指定 --to-offset。
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset <offset> --execute
-- Shift-By-N 策略直接指定 --shift-by N。
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --shift-by <offset_N> --execute
-- DateTime 策略直接指定 --to-datetime。
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute
-- 实现 Duration 策略,直接指定 --by-duration。
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --by-duration PT0H30M0S --execute
常见工具脚本
2.2版本提供了30多个Shell脚本
(1):connect-standalone:支持kafka Connect组件支持单节点Standalone模式
(2):connect-distributed:支持多节点的Distributed模式。
(3):kafka-acls:用于设置Kafka权限,如设置哪些用户可以访问哪些主题之类的权限。
(4):kafka-broker-api-versions:主要目的是验证不同Kafka版本之间服务器和客户端的适配性。
(5):kafka-configs:用于配置管理
(6):kafka-console-consumer:
(7):kafka-console-producer:
(8):kafka-producer-perf-test和kafka-consumer-perf-test :用于生产者和消费者的性能测试
(9):kafka-consumer-groups:消费者位移时多有涉及
(10):kafka-delegation-tokens:管理Delegation Token的,基于Delegation Token的认证是一种轻量级的认证机制,补充了现有的SASL认证机制。
(11):kafka-delete-records:用于删除Kafka的分区消息。
(12):kafka-dump-log:能够查看kafka消息文件的内容,包括消息的各种元数据信息
(13):kafka-log-dirs:可以帮助查询各个Broker上的各个日志路径的磁盘占用情况
(14):kafka-mirror-maker:可以帮助实现kafka集群间消息同步
(15):kafka-preferred-replica-election:执行Preferred Leader选举。他可以为指定的主题执行“换Leader”的操作。
(16):kafka-reassign-partitions:用于执行分区副本迁移以及副本文件路径迁移
(17):kafka-topics:所有主题管理操作,都是有该脚本来实现。
(18):kafka-run-class:可以用这个脚本执行任何带main方法的Kafka类。
(19):kafka-server-start和kafka-server-stop:启动和停止Kafka Broker进程
(20):kafka-streams-application-reset:用来给kafka-Streams应用程序重试位移,以便重新消费数据。
(21):kafka-verifiabel-producer和kafka-verifiable-consumer是用来测试生产者和消费者功能的。
(22):zookeeper开头的脚本是用来管理和运维Zookeeper的。
重点脚本操作
生产/消费消息
# 生产消息, 示例生产者参数 acks 为 -1,同时启用了 LZ4 的压缩算法
$ bin/kafka-console-producer.sh --broker-list kafka-host:port --topic test-topic --request-required-acks -1 --producer-property compression.type=lz4
>
# 消费消息
1. 最好指定group, 不然每次都自动生成console-cosumer开头的消费者
2. from-beginning表示从头开始消费主题, 不指定则默认从最新位移读取消息
3. 最好禁掉自动提交唯一, 因为测试情况下提交无意义
$ bin/kafka-console-consumer.sh --bootstrap-server kafka-host:port --topic test-topic --group test-group --from-beginning --consumer-property enable.auto.commit=false
性能测试
# 测试生产者性能, 示例命令向指定主题发送了 1 千万条消息,每条消息大小是 1KB
# 命令结果会打印测试生产者的吞吐量 (MB/s)、消息发送延时以及各种分位数下的延时, 一般关注99th分位的延时即可
$ bin/kafka-producer-perf-test.sh --topic test-topic --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=kafka-host:port acks=-1 linger.ms=2000 compression.type=lz4
2175479 records sent, 435095.8 records/sec (424.90 MB/sec), 131.1 ms avg latency, 681.0 ms max latency.
4190124 records sent, 838024.8 records/sec (818.38 MB/sec), 4.4 ms avg latency, 73.0 ms max latency.
10000000 records sent, 737463.126844 records/sec (720.18 MB/sec), 31.81 ms avg latency, 681.00 ms max latency, 4 ms 50th, 126 ms 95th, 604 ms 99th, 672 ms 99.9th.
# 测试消费者性能
$ bin/kafka-consumer-perf-test.sh --broker-list kafka-host:port --messages 10000000 --topic test-topic
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2019-06-26 15:24:18:138, 2019-06-26 15:24:23:805, 9765.6202, 1723.2434, 10000000, 1764602.0822, 16, 5651, 1728.1225, 1769598.3012
查看消息
# 查看主题消息数
# 查分区当前最早位移与最新位移, 然后将两者差值累加, 得到该主题当前总的消息数
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-host:port --time -2 --topic test-topic
test-topic:0:0
test-topic:1:0
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-host:port --time -1 --topic test-topic
test-topic:0:5500000
test-topic:1:5500000
# 查看消息文件数据
$ bin/kafka-dump-log.sh --files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log
Dumping ../data_dir/kafka_1/test-topic-1/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 14 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1561597044933 size: 1237 magic: 2 compresscodec: LZ4 crc: 646766737 isvalid: true
baseOffset: 15 lastOffset: 29 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 1237 CreateTime: 1561597044934 size: 1237 magic: 2 compresscodec: LZ4 crc: 3751986433 isvalid: true
......
# 查看每条具体的消息, 需显示指定--deep-iteration参数
$ bin/kafka-dump-log.sh --files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log --deep-iteration
Dumping ../data_dir/kafka_1/test-topic-1/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 14 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1561597044933 size: 1237 magic: 2 compresscodec: LZ4 crc: 646766737 isvalid: true
| offset: 0 CreateTime: 1561597044911 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 1 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 2 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 3 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 4 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 5 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 6 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 7 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 8 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 9 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 10 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 11 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 12 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 13 CreateTime: 1561597044933 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 14 CreateTime: 1561597044933 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
baseOffset: 15 lastOffset: 29 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 1237 CreateTime: 1561597044934 size: 1237 magic: 2 compresscodec: LZ4 crc: 3751986433 isvalid: true
......
# 若想查看消息里面实际数据, 还需指定--print-data-log 参数
$ bin/kafka-dump-log.sh --files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log --deep-iteration --print-data-log
查看消费者位移
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --describe --group test-group