管理

  1. ## 创建topic(4个分区,2个副本)
  2. bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test
  3. ## kafka版本 >= 2.2
  4. bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
  5. ## 分区扩容
  6. # kafka版本 < 2.2
  7. bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2
  8. # kafka版本 >= 2.2
  9. bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic topic1 --partitions 2
  10. ## 删除topic
  11. bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

查询

  1. ## 查询集群描述
  2. bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181
  3. ## topic列表查询
  4. bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
  5. ## topic列表查询(支持0.9版本+)
  6. bin/kafka-topics.sh --list --bootstrap-server localhost:9092
  7. ## 新消费者列表查询(支持0.9版本+)
  8. bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
  9. ## 新消费者列表查询(支持0.10版本+)
  10. bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
  11. ## 显示某个消费组的消费详情(仅支持offset存储在zookeeper上的)
  12. bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test
  13. ## 显示某个消费组的消费详情(0.9版本 - 0.10.1.0 之前)
  14. bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group
  15. ## 显示某个消费组的消费详情(0.10.1.0版本+)
  16. bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

发送和消费

## 生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
## 消费者
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
## 新生产者(支持0.9版本+)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties
## 新消费者(支持0.9版本+)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties
## kafka-verifiable-consumer.sh(消费者事件,例如:offset提交等)
bin/kafka-verifiable-consumer.sh --broker-list localhost:9092 --topic test --group-id groupName
## 高级点的用法
bin/kafka-simple-consumer-shell.sh --brist localhost:9092 --topic test --partition 0 --offset 1234  --max-messages 10

切换leader

# kafka版本 <= 2.4
> bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
# kafka新版本
> bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port

kafka自带压测命令

bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100  --producer-props bootstrap.servers=localhost:9092

kafka持续发送消息

持续发送消息到指定的topic中,且每条发送的消息都会有响应信息:

kafka-verifiable-producer.sh --broker-list $(hostname -i):9092 --topic test --max-messages 100000