快速分类

  • connect-standalone 和 connect-distributed
    • 这两个脚本是 Kafka Connect 组件的启动脚本
  • kafka-acls 脚本
    • 用于设置 Kafka 权限的,比如设置哪些用户可以访问 Kafka 的哪些主题之类的权限
  • kafka-broker-api-versions 脚本
    • 这个脚本的主要目的是验证不同 Kafka 版本之间服务器和客户端的适配性
    • 在 0.10.2.0 之前,Kafka 是单向兼容的,即高版本的 Broker 能够处理低版本 Client 发送的请求,反过来则不行。
    • 自 0.10.2.0 版本开始,Kafka 正式支持双向兼容,也就是说,低版本的 Broker 也能处理高版本 Client 的请求了
  • kafka-configs 脚本
    • 参数配置和动态 Broker 参数


  • kafka-console-consumer 和 kafka-console-producer


  • kafka-producer-perf-test 和 kafka-consumer-perf-test
    • 它们分别是生产者和消费者的性能测试工具,非常实用


  • kafka-consumer-groups 命令
    • 重设消费者组位移时


  • kafka-delegation-tokens
    • 它是管理 Delegation Token 的
    • 基于 Delegation Token 的认证是一种轻量级的认证机制,补充了现有的 SASL 认证机制


  • kafka-delete-records 脚本
    • 用于删除 Kafka 的分区消息
    • 鉴于 Kafka 本身有自己的自动消息删除策略


  • kafka-dump-log 脚本可谓是非常实用的脚本。它能查看 Kafka 消息文件的内容,包括消息的各种元数据信息,甚至是消息体本身

  • kafka-log-dirs 脚本是比较新的脚本

    • 可以帮助查询各个 Broker 上的各个日志路径的磁盘占用情况
  • kafka-mirror-maker

    • 实现 Kafka 集群间的消息同步的
  • kafka-preferred-replica-election

    • 执行 Preferred Leader 选举的
    • 它可以为指定的主题执行“换 Leader”的操作
  • kafka-reassign-partitions

    • 用于执行分区副本迁移以及副本文件路径迁移
  • kafka-topics

    • 所有的主题管理操作
  • kafka-run-class

    • 执行任何带 main 方法的 Kafka 类
    • 早期使用
    • 在实际工作中,你几乎遇不上要直接使用这个脚本的场景了
  • kafka-server-start 和 kafka-server-stop 脚本

    • 用于启动和停止 Kafka Broker 进程的
  • kafka-streams-application-reset

    • 用来给 Kafka Streams 应用程序重设位移,以便重新消费数据
  • kafka-verifiable-producer 和 kafka-verifiable-consumer

    • 用来测试生产者和消费者功能的。 Console Producer 和 Console Consumer 完全可以替代它们
  • zookeeper 开头的脚本是用来管理和运维 ZooKeeper 的


重点操作

生产消息

  • kafka-console-producer

    1. $ bin/kafka-console-producer.sh --broker-list kafka-host:port \
    2. --topic test-topic \
    3. --request-required-acks -1 \
    4. --producer-property compression.type=lz4
    • 指定生产者参数 acks 为 -1,同时启用了 LZ4 的压缩算法
    • 这个脚本可以很方便地让我们使用控制台来向 Kafka 的指定主题发送消息

消费消息

  • kafka-console-consumer 脚本

    1. $ bin/kafka-console-consumer.sh --bootstrap-server kafka-host:port \
    2. --topic test-topic \
    3. --group test-group \
    4. --from-beginning \
    5. --consumer-property enable.auto.commit=false
    • 注意,在这段命令中,我们指定了 group 信息
      • 如果没有指定的话,每次运行 Console Consumer,它都会自动生成一个新的消费者组来消费
        • 会导致集群中有大量的以 console-consumer 开头的消费者组
        • 通常情况下,最好还是加上 group
    • from-beginning 等同于将 Consumer 端参数 auto.offset.reset 设置成 earliest
      • 如果不指定的话,它会默认从最新位移读取消息
    • 命令中禁掉了自动提交位移

测试生产者性能

  • kafka-producer-perf-test ``` $ bin/kafka-producer-perf-test.sh —topic test-topic —num-records 10000000 \ —throughput -1 —record-size 1024 \ —producer-props bootstrap.servers=kafka-host:port acks=-1 linger.ms=2000 \ compression.type=lz4

2175479 records sent, 435095.8 records/sec (424.90 MB/sec), 131.1 ms avg latency, 681.0 ms max latency. 4190124 records sent, 838024.8 records/sec (818.38 MB/sec), 4.4 ms avg latency, 73.0 ms max latency. 10000000 records sent, 737463.126844 records/sec (720.18 MB/sec), 31.81 ms avg latency, 681.00 ms max latency, 4 ms 50th, 126 ms 95th, 604 ms 99th, 672 ms 99.9th.

  1. - 上述命令向指定主题发送了 1 千万条消息,每条消息大小是 1KB
  2. - 该命令允许在 `producer-props` 后面指定要设置的生产者参数,比如本例中的压缩算法、延时时间等
  3. - 该命令的输出值
  4. - 会打印出测试生产者的吞吐量 (MB/s)、消息发送延时以及各种分位数下的延时
  5. - 一般情况下,消息延时不是一个简单的数字,而是一组概率分布
  6. - 通常我们关注到**99th 分位**就可以了。比如在上面的输出中,99th 值是 604ms,这表明测试生产者生产的消息中,有 99% 消息的延时都在 604ms 以内
  7. - 你完全可以把这个数据当作这个生产者对外承诺的 SLA
  8. <a name="3pV4l"></a>
  9. ### 测试消费者性能
  10. - **kafka-consumer-perf-test**

$ bin/kafka-consumer-perf-test.sh —broker-list kafka-host:port \ —messages 10000000 \ —topic test-topic

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec 2019-06-26 15:24:18:138, 2019-06-26 15:24:23:805, 9765.6202, 1723.2434, 10000000, 1764602.0822, 16, 5651, 1728.1225, 1769598.3012

  1. - 会打印出消费者的吞吐量数据。比如本例中的 1723MB/s
  2. - 有点令人遗憾的是,它没有计算不同分位数下的分布情况。因此,在实际使用过程中,这个脚本的使用率要比生产者性能测试脚本的使用率低
  3. <a name="jLTvT"></a>
  4. ### 查看主题消息总数
  5. - Kafka 自带的命令没有提供这样的功能
  6. - Kafka 提供的工具类 **GetOffsetShell **来计算给定主题特定分区当前的最早位移和最新位移,将两者的差值累加起来,就能得到该主题当前总的消息数

$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell —broker-list kafka-host:port \ —time -2 —topic test-topic

test-topic:0:0 test-topic:1:0 $ bin/kafka-run-class.sh kafka.tools.GetOffsetShell —broker-list kafka-host:port \ —time -1 —topic test-topic

test-topic:0:5500000 test-topic:1:5500000

  1. - 对于本例来说,test-topic 总的消息数为 5500000 + 5500000,等于 1100 万条
  2. <a name="UwKF6"></a>
  3. ### 查看消息文件数据
  4. - **kafka-dump-log**

$ bin/kafka-dump-log.sh —files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log \ —deep-iteration \ —print-data-log

  1. - 如果只是指定 --files,那么该命令显示的是消息批次(RecordBatch)或消息集合(MessageSet)的元数据信息,比如创建时间、使用的压缩算法、CRC 校验值等。
  2. - 显式指定** --deep-iteration 参数**,查看一下每条具体的消息
  3. - 显式指定 **--print-data-log 参数,**看消息里面的实际数据
  4. <a name="C3m5E"></a>
  5. ### 查询消费者组位移
  6. - **kafka-consumer-groups**
  7. ```shell
  8. kafka-consumer-groups.sh --bootstrap-server localhost:9892 \
  9. --describe \
  10. --group test-group
  • client id 主要用于区分JMX指标和日志中的不同consumer
  • consumer id 指的是member id,目前是Kafka自动生成的,对用户意义不大
  • group id是表示consumer group组id的,非常重要,必须要指定。