重点脚本操作

生产消息

生产消息使用 kafka-console-producer 脚本即可,一个典型的命令如下所示:

  1. $ bin/kafka-console-producer.sh --broker-list kafka-host:port --topic test-topic --request-required-acks -1 --producer-property compression.type=lz4

在这段命令中,我们指定生产者参数 acks 为 -1,同时启用了 LZ4 的压缩算法。这个脚本可以很方便地让我们使用控制台来向 Kafka 的指定主题发送消息。

消费消息

下面再来说说数据消费。如果要快速地消费主题中的数据来验证消息是否存在,运行 kafka-console-consumer 脚本应该算是最便捷的方法了。常用的命令用法如下:

  1. $ bin/kafka-console-consumer.sh --bootstrap-server kafka-host:port --topic test-topic --group test-group --from-beginning --consumer-property enable.auto.commit=false

注意,在这段命令中,我们指定了 group 信息。如果没有指定的话,每次运行 Console Consumer,它都会自动生成一个新的消费者组来消费。久而久之,你会发现你的集群中有大量的以 console-consumer 开头的消费者组。通常情况下,你最好还是加上 group。
另外,from-beginning 等同于将 Consumer 端参数 auto.offset.reset 设置成 earliest,表明我想从头开始消费主题。如果不指定的话,它会默认从最新位移读取消息。如果此时没有任何新消息,那么该命令的输出为空,你什么都看不到。
最后,我在命令中禁掉了自动提交位移。通常情况下,让 Console Consumer 提交位移是没有意义的,毕竟我们只是用它做一些简单的测试。

测试生产者性能

如果你想要对 Kafka 做一些简单的性能测试。那么不妨试试下面这一组工具。它们分别用于测试生产者和消费者的性能。
我们先说测试生产者的脚本:kafka-producer-perf-test。它的参数有不少,但典型的命令调用方式是这样的。

  1. $ bin/kafka-producer-perf-test.sh --topic test-topic --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=kafka-host:port acks=-1 linger.ms=2000 compression.type=lz4
  2. 2175479 records sent, 435095.8 records/sec (424.90 MB/sec), 131.1 ms avg latency, 681.0 ms max latency.
  3. 4190124 records sent, 838024.8 records/sec (818.38 MB/sec), 4.4 ms avg latency, 73.0 ms max latency.
  4. 10000000 records sent, 737463.126844 records/sec (720.18 MB/sec), 31.81 ms avg latency, 681.00 ms max latency, 4 ms 50th, 126 ms 95th, 604 ms 99th, 672 ms 99.9th.

上述命令向指定主题发送了 1 千万条消息,每条消息大小是 1KB。该命令允许你在 producer-props 后面指定要设置的生产者参数,比如本例中的压缩算法、延时时间等。
该命令的输出值得好好说一下。它会打印出测试生产者的吞吐量 (MB/s)、消息发送延时以及各种分位数下的延时。一般情况下,消息延时不是一个简单的数字,而是一组分布。或者说,我们应该关心延时的概率分布情况,仅仅知道一个平均值是没有意义的。这就是这里计算分位数的原因。通常我们关注到99th 分位就可以了。比如在上面的输出中,99th 值是 604ms,这表明测试生产者生产的消息中,有 99% 消息的延时都在 604ms 以内。你完全可以把这个数据当作这个生产者对外承诺的 SLA。

测试消费者性能

测试消费者也是类似的原理,只不过我们使用的是kafka-consumer-perf-test脚本,命令如下:

  1. $ bin/kafka-consumer-perf-test.sh --broker-list kafka-host:port --messages 10000000 --topic test-topic
  2. start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
  3. 2019-06-26 15:24:18:138, 2019-06-26 15:24:23:805, 9765.6202, 1723.2434, 10000000, 1764602.0822, 16, 5651, 1728.1225, 1769598.3012

虽然输出格式有所差别,但该脚本也会打印出消费者的吞吐量数据。比如本例中的 1723MB/s。有点令人遗憾的是,它没有计算不同分位数下的分布情况。因此,在实际使用过程中,这个脚本的使用率要比生产者性能测试脚本的使用率低。

查看主题消息总数

很多时候,我们都想查看某个主题当前的消息总数。令人惊讶的是,Kafka 自带的命令竟然没有提供这样的功能,我们只能“绕道”获取了。所谓的绕道,是指我们必须要调用一个未被记录在官网上的命令。命令如下:

  1. $ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-host:port --time -2 --topic test-topic
  2. test-topic:0:0
  3. test-topic:1:0
  4. $ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-host:port --time -1 --topic test-topic
  5. test-topic:0:5500000
  6. test-topic:1:5500000

我们要使用 Kafka 提供的工具类GetOffsetShell来计算给定主题特定分区当前的最早位移和最新位移,将两者的差值累加起来,就能得到该主题当前总的消息数。对于本例来说,test-topic 总的消息数为 5500000 + 5500000,等于 1100 万条。

查看消息文件数据

作为 Kafka 使用者,你是不是对 Kafka 底层文件里面保存的内容很感兴趣? 如果是的话,你可以使用 kafka-dump-log 脚本来查看具体的内容。

  1. $ bin/kafka-dump-log.sh --files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log
  2. Dumping ../data_dir/kafka_1/test-topic-1/00000000000000000000.log
  3. Starting offset: 0
  4. baseOffset: 0 lastOffset: 14 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1561597044933 size: 1237 magic: 2 compresscodec: LZ4 crc: 646766737 isvalid: true
  5. baseOffset: 15 lastOffset: 29 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 1237 CreateTime: 1561597044934 size: 1237 magic: 2 compresscodec: LZ4 crc: 3751986433 isvalid: true
  6. ......

如果只是指定 —files,那么该命令显示的是消息批次(RecordBatch)或消息集合(MessageSet)的元数据信息,比如创建时间、使用的压缩算法、CRC 校验值等。
如果我们想深入看一下每条具体的消息,那么就需要显式指定 —deep-iteration 参数,如下所示:

  1. $ bin/kafka-dump-log.sh --files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log --deep-iteration
  2. Dumping ../data_dir/kafka_1/test-topic-1/00000000000000000000.log
  3. Starting offset: 0
  4. baseOffset: 0 lastOffset: 14 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1561597044933 size: 1237 magic: 2 compresscodec: LZ4 crc: 646766737 isvalid: true
  5. | offset: 0 CreateTime: 1561597044911 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
  6. | offset: 1 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
  7. | offset: 2 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
  8. | offset: 3 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
  9. | offset: 4 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
  10. | offset: 5 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
  11. | offset: 6 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
  12. | offset: 7 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
  13. | offset: 8 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
  14. | offset: 9 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
  15. | offset: 10 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
  16. | offset: 11 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
  17. | offset: 12 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
  18. | offset: 13 CreateTime: 1561597044933 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
  19. | offset: 14 CreateTime: 1561597044933 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
  20. baseOffset: 15 lastOffset: 29 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 1237 CreateTime: 1561597044934 size: 1237 magic: 2 compresscodec: LZ4 crc: 3751986433 isvalid: true
  21. ......

在上面的输出中,以竖线开头的就是消息批次下的消息信息。如果你还想看消息里面的实际数据,那么还需要指定 —print-data-log 参数,如下所示:

  1. $ bin/kafka-dump-log.sh --files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log --deep-iteration --print-data-log

查询消费者组位移

接下来,我们来看如何使用 kafka-consumer-groups 脚本查看消费者组位移。在上一讲讨论重设消费者组位移的时候,我们使用的也是这个命令。当时我们用的是 —reset-offsets 参数,今天我们使用的是 —describe 参数。假设我们要查询 Group ID 是 test-group 的消费者的位移,那么命令如图所示:
image.png
图中的 CURRENT-OFFSET 表示该消费者当前消费的最新位移,LOG-END-OFFSET 表示对应分区最新生产消息的位移,LAG 列是两者的差值。CONSUMER-ID 是 Kafka 消费者程序自动生成的一个 ID。截止到 2.2 版本,你都无法干预这个 ID 的生成过程。如果运行该命令时,这个消费者程序已经终止了,那么此列的值为空。