主题管理

主题增删改查

kafka-topics脚本

  1. -- 创建主题
  2. bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name --partitions 1 --replication-factor 1
  3. -- 查看主题列表
  4. bin/kafka-topics.sh --bootstrap-server broker_host:port --list
  5. -- 查看单个主题详情(describe 命令不指定具体的主题名称,Kafka 默认会返回所有“可见”主题)
  6. bin/kafka-topics.sh --bootstrap-server broker_host:port --describe --topic <topic_name>
  7. -- 修改主题分区(Kafka只允许增加, 减少会抛InvalidPartitionsException)
  8. bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic <topic_name> --partitions <新分区数>
  9. -- 删除主题(操作为异步)
  10. bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic <topic_name>

kafka-configs脚本

  1. -- 修改主题级别参数, 如下修改max.message.bytes
  2. bin/kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760

内部主题管理

kafka-console-consumer脚本

  1. -- 查看消费者组提交的位移数据
  2. bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
  3. -- 读取主题消息,查看消费者组的状态信息
  4. bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning

常见错误

主题删除失败

最常见的原因有两个:副本所在的 Broker 宕机了;待删除主题的部分分区依然在执行迁移过程。
如果是因为前者,通常你重启对应的 Broker 之后,删除操作就能自动恢复;如果是因为后者,那就麻烦了,很可能两个操作会相互干扰。

不管什么原因,一旦你碰到主题无法删除的问题,可以采用这样的方法:
第 1 步,手动删除 ZooKeeper 节点 /admin/delete_topics 下以待删除主题为名的 znode。
第 2 步,手动删除该主题在磁盘上的分区目录。
第 3 步,在 ZooKeeper 中执行 rmr /controller,触发 Controller 重选举,刷新 Controller 缓存。
在执行最后一步时,你一定要谨慎,因为它可能造成大面积的分区 Leader 重选举。事实上,仅仅执行前两步也是可以的,只是 Controller 缓存中没有清空待删除主题罢了,也不影响使用。

__consumer_offsets 占用太多的磁盘

一旦你发现这个主题消耗了过多的磁盘空间,那么,你一定要显式地用 jstack 命令查看一下 kafka-log-cleaner-thread 前缀的线程状态。通常情况下,这都是因为该线程挂掉了,无法及时清理此内部主题。倘若真是这个原因导致的,那我们就只能重启相应的 Broker 了。另外,请你注意保留出错日志,因为这通常都是 Bug 导致的,最好提交到社区看一下。

Kafka动态配置

动态Broker配置

自1.1.0版本中引入了动态Broker参数, 修改后无需重启Broker即可立即生效, 而Kafka安装目录的Config/server.properties文件中的配置则称为静态参数.

官网: https://kafka.apache.org/documentation/#brokerconfigs
Broker Configs表中增加了 Dynamic Update Mode 列。该列有 3 类值,分别是 read-only、per-broker 和 cluster-wide。

  • read-only。被标记为 read-only 的参数和原来的参数行为一样,只有重启 Broker,才能令修改生效。
  • per-broker。被标记为 per-broker 的参数属于动态参数,修改它之后,只会在对应的 Broker 上生效。
  • cluster-wide。被标记为 cluster-wide 的参数也属于动态参数,修改它之后,会在整个集群范围内生效,也就是说,对所有 Broker 都生效。你也可以为具体的 Broker 修改 cluster-wide 参数。

参数优先级: per-broker 参数 > cluster-wide 参数 > static 参数 > Kafka 默认值

常用场景

  • 动态调整 Broker 端各种线程池大小,实时应对突发流量。
  • 动态调整 Broker 端连接信息或安全配置信息。
  • 动态更新 SSL Keystore 有效期。
  • 动态调整 Broker 端 Compact 操作性能。
  • 实时变更 JMX 指标收集器 (JMX Metrics Reporter)。

内部保存机制

Kafka将动态Broker配置保存在Zookeeper的持久化znode节点中, zk重启也依然生效
image.png
如上图:

  • changes 是用来实时监测动态参数变更的,不会保存参数值;
  • topics 是用来保存 Kafka 主题级别参数的。
  • users 和 clients 则是用于动态调整客户端配额(Quota)的 znode 节点。所谓配额,是指 Kafka 运维人员限制连入集群的客户端的吞吐量或者是限定它们使用的 CPU 资源。
  • /config/brokers znode 才是真正保存动态 Broker 参数的地方。
    • < default >,保存的是前面说过的 cluster-wide 范围的动态参数
    • broker.id 为名,保存的是特定 Broker 的 per-broker 范围参数, 可能存在多个

示例:
image.png

使用示例

  1. -- cluster-wide演示
  2. -- 注意: 设置 cluster-wide 范围的动态参数,需要显式指定 entity-default
  3. -- 更改unclean.leader.election.enabletrue
  4. $ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --alter --add-config unclean.leader.election.enable=true
  5. Completed updating default config for brokers in the cluster,
  6. -- 查看是否设置成功
  7. $ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --describe
  8. Default config for brokers in the cluster are:
  9. unclean.leader.election.enable=true sensitive=false synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:unclean.leader.election.enable=true}
  10. # 删除cluster-wide范围参数
  11. $ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --alter --delete-config unclean.leader.election.enable
  12. Completed updating default config for brokers in the cluster,
  1. -- per-broker演示
  2. -- 更改ID1brokerunclean.leader.election.enablefalse
  3. $ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --alter --add-config unclean.leader.election.enable=false
  4. Completed updating config for broker: 1.
  5. -- 查看是否设置成功
  6. $ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --describe
  7. Configs for broker 1 are:
  8. unclean.leader.election.enable=false sensitive=false synonyms={DYNAMIC_BROKER_CONFIG:unclean.leader.election.enable=false, DYNAMIC_DEFAULT_BROKER_CONFIG:unclean.leader.election.enable=true, DEFAULT_CONFIG:unclean.leader.election.enable=false}
  9. # 删除per-broker范围参数
  10. $ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --alter --delete-config unclean.leader.election.enable
  11. Completed updating config for broker: 1.

常用动态配置

  • log.retention.ms :日志留存时间
  • num.io.threads 和 num.network.threads: Broker线程池线程数量
  • SSL相关
    • ssl.keystore.type
    • ssl.keystore.location
    • ssl.keystore.password
    • ssl.key.password
  • num.replica.fetchers: follwer副本向Broker拉取消息的线程数, 增加以解决同步慢的问题

如何重设消费者组位移?

Kafka基于日志结构, 消费消息并不会将消息从磁盘删除, 因此可通过修改消费者位移, 实现重复消费历史数据的功能

重设位移策略

image.png

重设方式

消费者 API

  1. void seek(TopicPartition partition, long offset);
  2. void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
  3. void seekToBeginning(Collection<TopicPartition> partitions);
  4. void seekToEnd(Collection<TopicPartition> partitions);
  5. 补充:
  6. OffsetAndMetadata 类是一个封装了 Long 型的位移和自定义元数据的复合类,
  7. 只是一般情况下,自定义元数据为空,因此你基本上可以认为这个类表征的主要是消息的位移值

Earliest 策略的实现示例, 注意:

  1. 你要创建的消费者程序,要禁止自动提交位移。
  2. 组 ID 要设置成你要重设的消费者组的组 ID。
  3. 调用 seekToBeginning 方法时,需要一次性构造主题的所有分区对象。
  4. 最重要的是,一定要调用带长整型的 poll 方法,而不要调用 consumer.poll(Duration.ofSecond(0)), poll(0)是为了拿元数据 ```java

Properties consumerProperties = new Properties(); consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID); consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, “earliest”); consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

String topic = “test”; // 要重设位移的Kafka主题 try (final KafkaConsumer consumer = new KafkaConsumer<>(consumerProperties)) { consumer.subscribe(Collections.singleton(topic)); consumer.poll(0); consumer.seekToBeginning( consumer.partitionsFor(topic).stream().map(partitionInfo ->
new TopicPartition(topic, partitionInfo.partition())) .collect(Collectors.toList())); }

  1. Latest 策略示例
  2. ```java
  3. consumer.seekToEnd(
  4. consumer.partitionsFor(topic).stream().map(partitionInfo ->
  5. new TopicPartition(topic, partitionInfo.partition()))
  6. .collect(Collectors.toList()));

Current 策略示例,需借助 KafkaConsumer 的 committed 方法来获取当前提交的最新位移

  1. consumer.partitionsFor(topic).stream().map(info ->
  2. new TopicPartition(topic, info.partition()))
  3. .forEach(tp -> {
  4. long committedOffset = consumer.committed(tp).offset();
  5. consumer.seek(tp, committedOffset);
  6. });

Specified-Offset 策略示例

  1. long targetOffset = 1234L;
  2. for (PartitionInfo info : consumer.partitionsFor(topic)) {
  3. TopicPartition tp = new TopicPartition(topic, info.partition());
  4. consumer.seek(tp, targetOffset);
  5. }

Shift-By-N 策略

  1. for (PartitionInfo info : consumer.partitionsFor(topic)) {
  2. TopicPartition tp = new TopicPartition(topic, info.partition());
  3. // 假设向前跳123条消息
  4. long targetOffset = consumer.committed(tp).offset() + 123L;
  5. consumer.seek(tp, targetOffset);
  6. }

DateTime 策略示例

  1. -- 假设重设位移到 2019 6 20 日晚上 8
  2. long ts = LocalDateTime.of(
  3. 2019, 6, 20, 20, 0).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
  4. Map<TopicPartition, Long> timeToSearch =
  5. consumer.partitionsFor(topic).stream().map(info ->
  6. new TopicPartition(topic, info.partition()))
  7. .collect(Collectors.toMap(Function.identity(), tp -> ts));
  8. for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :
  9. consumer.offsetsForTimes(timeToSearch).entrySet()) {
  10. consumer.seek(entry.getKey(), entry.getValue().offset());
  11. }

Duration 策略

  1. -- 假设将位移调回 30 分钟前
  2. Map<TopicPartition, Long> timeToSearch = consumer.partitionsFor(topic).stream()
  3. .map(info -> new TopicPartition(topic, info.partition()))
  4. .collect(Collectors.toMap(Function.identity(), tp -> System.currentTimeMillis() - 30 * 1000 * 60));
  5. for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :
  6. consumer.offsetsForTimes(timeToSearch).entrySet()) {
  7. consumer.seek(entry.getKey(), entry.getValue().offset());
  8. }

kafka-consumer-groups 脚本

0.11版本引入, 0.11之前只能用API

  1. -- Earliest 策略直接指定 --to-earliest
  2. bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest execute
  3. -- Latest 策略直接指定 --to-latest
  4. bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute
  5. -- Current 策略直接指定 --to-current
  6. bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute
  7. -- Specified-Offset 策略直接指定 --to-offset
  8. bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset <offset> --execute
  9. -- Shift-By-N 策略直接指定 --shift-by N
  10. bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --shift-by <offset_N> --execute
  11. -- DateTime 策略直接指定 --to-datetime
  12. bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute
  13. -- 实现 Duration 策略,直接指定 --by-duration
  14. bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --by-duration PT0H30M0S --execute

常见工具脚本

2.2版本提供了30多个Shell脚本
(1):connect-standalone:支持kafka Connect组件支持单节点Standalone模式
(2):connect-distributed:支持多节点的Distributed模式。

(3):kafka-acls:用于设置Kafka权限,如设置哪些用户可以访问哪些主题之类的权限。
(4):kafka-broker-api-versions:主要目的是验证不同Kafka版本之间服务器和客户端的适配性。
(5):kafka-configs:用于配置管理

(6):kafka-console-consumer:
(7):kafka-console-producer:

(8):kafka-producer-perf-test和kafka-consumer-perf-test :用于生产者和消费者的性能测试

(9):kafka-consumer-groups:消费者位移时多有涉及
(10):kafka-delegation-tokens:管理Delegation Token的,基于Delegation Token的认证是一种轻量级的认证机制,补充了现有的SASL认证机制。

(11):kafka-delete-records:用于删除Kafka的分区消息。
(12):kafka-dump-log:能够查看kafka消息文件的内容,包括消息的各种元数据信息
(13):kafka-log-dirs:可以帮助查询各个Broker上的各个日志路径的磁盘占用情况
(14):kafka-mirror-maker:可以帮助实现kafka集群间消息同步
(15):kafka-preferred-replica-election:执行Preferred Leader选举。他可以为指定的主题执行“换Leader”的操作。
(16):kafka-reassign-partitions:用于执行分区副本迁移以及副本文件路径迁移
(17):kafka-topics:所有主题管理操作,都是有该脚本来实现。
(18):kafka-run-class:可以用这个脚本执行任何带main方法的Kafka类。

(19):kafka-server-start和kafka-server-stop:启动和停止Kafka Broker进程
(20):kafka-streams-application-reset:用来给kafka-Streams应用程序重试位移,以便重新消费数据。

(21):kafka-verifiabel-producer和kafka-verifiable-consumer是用来测试生产者和消费者功能的。

(22):zookeeper开头的脚本是用来管理和运维Zookeeper的。

重点脚本操作

生产/消费消息

  1. # 生产消息, 示例生产者参数 acks 为 -1,同时启用了 LZ4 的压缩算法
  2. $ bin/kafka-console-producer.sh --broker-list kafka-host:port --topic test-topic --request-required-acks -1 --producer-property compression.type=lz4
  3. >
  4. # 消费消息
  5. 1. 最好指定group, 不然每次都自动生成console-cosumer开头的消费者
  6. 2. from-beginning表示从头开始消费主题, 不指定则默认从最新位移读取消息
  7. 3. 最好禁掉自动提交唯一, 因为测试情况下提交无意义
  8. $ bin/kafka-console-consumer.sh --bootstrap-server kafka-host:port --topic test-topic --group test-group --from-beginning --consumer-property enable.auto.commit=false

性能测试

  1. # 测试生产者性能, 示例命令向指定主题发送了 1 千万条消息,每条消息大小是 1KB
  2. # 命令结果会打印测试生产者的吞吐量 (MB/s)、消息发送延时以及各种分位数下的延时, 一般关注99th分位的延时即可
  3. $ bin/kafka-producer-perf-test.sh --topic test-topic --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=kafka-host:port acks=-1 linger.ms=2000 compression.type=lz4
  4. 2175479 records sent, 435095.8 records/sec (424.90 MB/sec), 131.1 ms avg latency, 681.0 ms max latency.
  5. 4190124 records sent, 838024.8 records/sec (818.38 MB/sec), 4.4 ms avg latency, 73.0 ms max latency.
  6. 10000000 records sent, 737463.126844 records/sec (720.18 MB/sec), 31.81 ms avg latency, 681.00 ms max latency, 4 ms 50th, 126 ms 95th, 604 ms 99th, 672 ms 99.9th.
  7. # 测试消费者性能
  8. $ bin/kafka-consumer-perf-test.sh --broker-list kafka-host:port --messages 10000000 --topic test-topic
  9. start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
  10. 2019-06-26 15:24:18:138, 2019-06-26 15:24:23:805, 9765.6202, 1723.2434, 10000000, 1764602.0822, 16, 5651, 1728.1225, 1769598.3012

查看消息

  1. # 查看主题消息数
  2. # 查分区当前最早位移与最新位移, 然后将两者差值累加, 得到该主题当前总的消息数
  3. $ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-host:port --time -2 --topic test-topic
  4. test-topic:0:0
  5. test-topic:1:0
  6. $ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-host:port --time -1 --topic test-topic
  7. test-topic:0:5500000
  8. test-topic:1:5500000
  9. # 查看消息文件数据
  10. $ bin/kafka-dump-log.sh --files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log
  11. Dumping ../data_dir/kafka_1/test-topic-1/00000000000000000000.log
  12. Starting offset: 0
  13. baseOffset: 0 lastOffset: 14 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1561597044933 size: 1237 magic: 2 compresscodec: LZ4 crc: 646766737 isvalid: true
  14. baseOffset: 15 lastOffset: 29 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 1237 CreateTime: 1561597044934 size: 1237 magic: 2 compresscodec: LZ4 crc: 3751986433 isvalid: true
  15. ......
  16. # 查看每条具体的消息, 需显示指定--deep-iteration参数
  17. $ bin/kafka-dump-log.sh --files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log --deep-iteration
  18. Dumping ../data_dir/kafka_1/test-topic-1/00000000000000000000.log
  19. Starting offset: 0
  20. baseOffset: 0 lastOffset: 14 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1561597044933 size: 1237 magic: 2 compresscodec: LZ4 crc: 646766737 isvalid: true
  21. | offset: 0 CreateTime: 1561597044911 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
  22. | offset: 1 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
  23. | offset: 2 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
  24. | offset: 3 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
  25. | offset: 4 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
  26. | offset: 5 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
  27. | offset: 6 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
  28. | offset: 7 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
  29. | offset: 8 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
  30. | offset: 9 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
  31. | offset: 10 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
  32. | offset: 11 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
  33. | offset: 12 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
  34. | offset: 13 CreateTime: 1561597044933 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
  35. | offset: 14 CreateTime: 1561597044933 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
  36. baseOffset: 15 lastOffset: 29 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 1237 CreateTime: 1561597044934 size: 1237 magic: 2 compresscodec: LZ4 crc: 3751986433 isvalid: true
  37. ......
  38. # 若想查看消息里面实际数据, 还需指定--print-data-log 参数
  39. $ bin/kafka-dump-log.sh --files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log --deep-iteration --print-data-log

查看消费者位移

  1. bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --describe --group test-group

image.png