快速分类
- connect-standalone 和 connect-distributed
- 这两个脚本是 Kafka Connect 组件的启动脚本
- kafka-acls 脚本
- 用于设置 Kafka 权限的,比如设置哪些用户可以访问 Kafka 的哪些主题之类的权限
- kafka-broker-api-versions 脚本
- 这个脚本的主要目的是验证不同 Kafka 版本之间服务器和客户端的适配性
- 在 0.10.2.0 之前,Kafka 是单向兼容的,即高版本的 Broker 能够处理低版本 Client 发送的请求,反过来则不行。
- 自 0.10.2.0 版本开始,Kafka 正式支持双向兼容,也就是说,低版本的 Broker 也能处理高版本 Client 的请求了
- kafka-configs 脚本
- 参数配置和动态 Broker 参数
- kafka-console-consumer 和 kafka-console-producer
- kafka-producer-perf-test 和 kafka-consumer-perf-test
- 它们分别是生产者和消费者的性能测试工具,非常实用
- kafka-consumer-groups 命令
- 重设消费者组位移时
- kafka-delegation-tokens
- 它是管理 Delegation Token 的
- 基于 Delegation Token 的认证是一种轻量级的认证机制,补充了现有的 SASL 认证机制
- kafka-delete-records 脚本
- 用于删除 Kafka 的分区消息
- 鉴于 Kafka 本身有自己的自动消息删除策略
kafka-dump-log 脚本可谓是非常实用的脚本。它能查看 Kafka 消息文件的内容,包括消息的各种元数据信息,甚至是消息体本身
kafka-log-dirs 脚本是比较新的脚本
- 可以帮助查询各个 Broker 上的各个日志路径的磁盘占用情况
kafka-mirror-maker
- 实现 Kafka 集群间的消息同步的
kafka-preferred-replica-election
- 执行 Preferred Leader 选举的
- 它可以为指定的主题执行“换 Leader”的操作
kafka-reassign-partitions
- 用于执行分区副本迁移以及副本文件路径迁移
kafka-topics
- 所有的主题管理操作
kafka-run-class
- 执行任何带 main 方法的 Kafka 类
- 早期使用
- 在实际工作中,你几乎遇不上要直接使用这个脚本的场景了
kafka-server-start 和 kafka-server-stop 脚本
- 用于启动和停止 Kafka Broker 进程的
kafka-streams-application-reset
- 用来给 Kafka Streams 应用程序重设位移,以便重新消费数据
kafka-verifiable-producer 和 kafka-verifiable-consumer
- 用来测试生产者和消费者功能的。 Console Producer 和 Console Consumer 完全可以替代它们
zookeeper 开头的脚本是用来管理和运维 ZooKeeper 的
重点操作
生产消息
kafka-console-producer
$ bin/kafka-console-producer.sh --broker-list kafka-host:port \
--topic test-topic \
--request-required-acks -1 \
--producer-property compression.type=lz4
- 指定生产者参数 acks 为 -1,同时启用了 LZ4 的压缩算法
- 这个脚本可以很方便地让我们使用控制台来向 Kafka 的指定主题发送消息
消费消息
kafka-console-consumer 脚本
$ bin/kafka-console-consumer.sh --bootstrap-server kafka-host:port \
--topic test-topic \
--group test-group \
--from-beginning \
--consumer-property enable.auto.commit=false
- 注意,在这段命令中,我们指定了 group 信息
- 如果没有指定的话,每次运行 Console Consumer,它都会自动生成一个新的消费者组来消费
- 会导致集群中有大量的以 console-consumer 开头的消费者组
- 通常情况下,最好还是加上 group
- 如果没有指定的话,每次运行 Console Consumer,它都会自动生成一个新的消费者组来消费
- from-beginning 等同于将 Consumer 端参数 auto.offset.reset 设置成 earliest
- 如果不指定的话,它会默认从最新位移读取消息
- 命令中禁掉了自动提交位移
测试生产者性能
- kafka-producer-perf-test ``` $ bin/kafka-producer-perf-test.sh —topic test-topic —num-records 10000000 \ —throughput -1 —record-size 1024 \ —producer-props bootstrap.servers=kafka-host:port acks=-1 linger.ms=2000 \ compression.type=lz4
2175479 records sent, 435095.8 records/sec (424.90 MB/sec), 131.1 ms avg latency, 681.0 ms max latency. 4190124 records sent, 838024.8 records/sec (818.38 MB/sec), 4.4 ms avg latency, 73.0 ms max latency. 10000000 records sent, 737463.126844 records/sec (720.18 MB/sec), 31.81 ms avg latency, 681.00 ms max latency, 4 ms 50th, 126 ms 95th, 604 ms 99th, 672 ms 99.9th.
- 上述命令向指定主题发送了 1 千万条消息,每条消息大小是 1KB
- 该命令允许在 `producer-props` 后面指定要设置的生产者参数,比如本例中的压缩算法、延时时间等
- 该命令的输出值
- 会打印出测试生产者的吞吐量 (MB/s)、消息发送延时以及各种分位数下的延时
- 一般情况下,消息延时不是一个简单的数字,而是一组概率分布
- 通常我们关注到**99th 分位**就可以了。比如在上面的输出中,99th 值是 604ms,这表明测试生产者生产的消息中,有 99% 消息的延时都在 604ms 以内
- 你完全可以把这个数据当作这个生产者对外承诺的 SLA
<a name="3pV4l"></a>
### 测试消费者性能
- **kafka-consumer-perf-test**
$ bin/kafka-consumer-perf-test.sh —broker-list kafka-host:port \ —messages 10000000 \ —topic test-topic
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec 2019-06-26 15:24:18:138, 2019-06-26 15:24:23:805, 9765.6202, 1723.2434, 10000000, 1764602.0822, 16, 5651, 1728.1225, 1769598.3012
- 会打印出消费者的吞吐量数据。比如本例中的 1723MB/s
- 有点令人遗憾的是,它没有计算不同分位数下的分布情况。因此,在实际使用过程中,这个脚本的使用率要比生产者性能测试脚本的使用率低
<a name="jLTvT"></a>
### 查看主题消息总数
- Kafka 自带的命令没有提供这样的功能
- 用 Kafka 提供的工具类 **GetOffsetShell **来计算给定主题特定分区当前的最早位移和最新位移,将两者的差值累加起来,就能得到该主题当前总的消息数
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell —broker-list kafka-host:port \ —time -2 —topic test-topic
test-topic:0:0 test-topic:1:0 $ bin/kafka-run-class.sh kafka.tools.GetOffsetShell —broker-list kafka-host:port \ —time -1 —topic test-topic
test-topic:0:5500000 test-topic:1:5500000
- 对于本例来说,test-topic 总的消息数为 5500000 + 5500000,等于 1100 万条
<a name="UwKF6"></a>
### 查看消息文件数据
- **kafka-dump-log**
$ bin/kafka-dump-log.sh —files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log \ —deep-iteration \ —print-data-log
- 如果只是指定 --files,那么该命令显示的是消息批次(RecordBatch)或消息集合(MessageSet)的元数据信息,比如创建时间、使用的压缩算法、CRC 校验值等。
- 显式指定** --deep-iteration 参数**,查看一下每条具体的消息
- 显式指定 **--print-data-log 参数,**看消息里面的实际数据
<a name="C3m5E"></a>
### 查询消费者组位移
- **kafka-consumer-groups**
```shell
kafka-consumer-groups.sh --bootstrap-server localhost:9892 \
--describe \
--group test-group
- client id 主要用于区分JMX指标和日志中的不同consumer
- consumer id 指的是member id,目前是Kafka自动生成的,对用户意义不大
- group id是表示consumer group组id的,非常重要,必须要指定。