Startup

  Start the Kafka broker as a background (daemon) process:
  nohup /data/br/base/kafka/bin/kafka-server-start.sh /data/br/base/kafka/config/server.properties >/dev/null 2>&1 &
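A variant of the startup step, sketched as a small shell function. It uses the `-daemon` option of kafka-server-start.sh in place of nohup and assumes the install path shown in this section. KAFKA_HOME, DRY_RUN and start_kafka are names introduced here for illustration only.

```shell
# Sketch only: KAFKA_HOME, DRY_RUN and start_kafka are illustrative names,
# not part of Kafka. The default path is the one used in this document.
KAFKA_HOME="${KAFKA_HOME:-/data/br/base/kafka}"

# Print the start command when DRY_RUN=1, otherwise execute it.
start_kafka() {
    set -- "$KAFKA_HOME/bin/kafka-server-start.sh" -daemon "$KAFKA_HOME/config/server.properties"
    if [ "${DRY_RUN:-0}" = "1" ]; then
        echo "$*"
    else
        "$@"
    fi
}
```

Calling `DRY_RUN=1; start_kafka` only prints the command, which is a cheap way to check the paths before starting the broker for real.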

Common commands

Create a topic

kafka-topics.sh --zookeeper bonree01:2181 --topic mytopic --replication-factor 1 --partitions 1 --create
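The topic commands in this section connect through `--zookeeper`, which matches older Kafka releases. Newer releases of kafka-topics.sh accept `--bootstrap-server` instead, and `--zookeeper` was removed in Kafka 3.0. The sketch below builds a create command for either style and adds `--if-not-exists` so that repeating it is harmless. The helper names and the "zk"/"broker" mode names are hypothetical.

```shell
# Sketch only: topic_conn_args and create_topic_cmd are illustrative helpers.
# Prints the connection flag pair for kafka-topics.sh.
#   $1 = "zk"     -> --zookeeper <host:port>        (older releases, as used here)
#   $1 = "broker" -> --bootstrap-server <host:port> (newer releases)
#   $2 = host:port to connect to
topic_conn_args() {
    case "$1" in
        zk)     echo "--zookeeper $2" ;;
        broker) echo "--bootstrap-server $2" ;;
        *)      echo "unknown mode: $1" >&2; return 1 ;;
    esac
}

# Build (but do not run) a create command that is safe to repeat.
#   $1 = mode, $2 = host:port, $3 = topic, $4 = partitions, $5 = replication factor
create_topic_cmd() {
    conn=$(topic_conn_args "$1" "$2") || return 1
    echo "kafka-topics.sh $conn --create --if-not-exists --topic $3 --partitions $4 --replication-factor $5"
}
```

For example, `create_topic_cmd zk bonree01:2181 mytopic 1 1` prints the command from this section with `--if-not-exists` added. The printed line can be reviewed and then pasted into a shell.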

List and describe topics

kafka-topics.sh --zookeeper bonree01:2181 --list
kafka-topics.sh --zookeeper localhost:2181 --describe

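Producer test

(--broker-list takes a broker address, not the ZooKeeper address. 9092 is Kafka's default broker port; adjust it to match the broker's listener.)

kafka-console-producer.sh --broker-list bonree01:9092 --topic mytopic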

Consumer test

kafka-console-consumer.sh --zookeeper bonree01:2181 --topic mytopic --from-beginning

kafka-consumer-groups.sh --new-consumer --bootstrap-server 192.168.150.93:39092 --command-config /data/br/base/kafka/config/consumer.properties --group GROUP_SDK_BEHAVIOR --describe

Dynamically increase the partition count of the ETL topic

kafka-topics.sh --alter --topic SDK_ETL_TOPIC --zookeeper 127.0.0.1:32181 --partitions 6
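Kafka can only increase a topic's partition count, never reduce it, and adding partitions changes which partition a given key maps to. A minimal sketch of a guard that refuses a request that would not increase the count before printing the alter command. The helper name is hypothetical, and the current count must be supplied by the caller (for example, read from the `--describe` output).

```shell
# Sketch only: alter_partitions_cmd is an illustrative helper.
#   $1 = topic, $2 = current partition count, $3 = desired partition count,
#   $4 = ZooKeeper address
# Prints the alter command, or fails if the request would not increase
# the partition count.
alter_partitions_cmd() {
    if [ "$3" -le "$2" ]; then
        echo "refusing: $3 is not greater than current count $2" >&2
        return 1
    fi
    echo "kafka-topics.sh --alter --topic $1 --zookeeper $4 --partitions $3"
}
```

With an assumed current count of 3, `alter_partitions_cmd SDK_ETL_TOPIC 3 6 127.0.0.1:32181` prints the same alter command used in this section.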

List Kafka consumer groups

kafka-consumer-groups.sh --bootstrap-server 172.0.0.1:9092 --list

Check consumer lag (backlog)

kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper 127.0.0.1:32181 --topic APM_AGENT_METRIC_TOPIC --group GROUP_APM_CONTROLLER

kafka-consumer-groups.sh --group GROUP_SDK_ETL_TOPIC --describe --new-consumer --bootstrap-server Dev-SDK6-03:9092
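The `--describe` output lists one row per partition with a LAG column, so the total backlog of a group is the sum of that column. The exact column layout differs between Kafka versions, so the sketch below finds the LAG column from the header row rather than assuming a fixed position. total_lag is a hypothetical helper, not a Kafka tool.

```shell
# Sketch only: total_lag is an illustrative helper, not a Kafka tool.
# Reads `kafka-consumer-groups.sh --describe` output on stdin, locates the
# LAG column from the header row, and sums the numeric values below it.
total_lag() {
    awk '
        col == 0 {                          # still looking for the header row
            for (i = 1; i <= NF; i++) if ($i == "LAG") col = i
            next
        }
        $col ~ /^[0-9]+$/ { sum += $col }   # skip "-" entries and blank lines
        END { print sum + 0 }
    '
}
```

Usage, assuming the group and broker from this section: `kafka-consumer-groups.sh --group GROUP_SDK_ETL_TOPIC --describe --bootstrap-server Dev-SDK6-03:9092 | total_lag`.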

Kafka performance stress test

Create the topic:

/data/br/base/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 5 --topic test-partition-5

Producer performance test:

kafka-producer-perf-test.sh --num-records 10000000 --record-size 500 --throughput 1000000 --producer-props bootstrap.servers=B-197:9092 batch.size=16384 compression.type=snappy linger.ms=5 --topic test-partition-5
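Before running the producer test it can help to estimate how much data the chosen parameters will write. A minimal sketch of that arithmetic follows; perf_volume_mib is a hypothetical helper, and it counts uncompressed payload only, ignoring protocol overhead and the effect of snappy compression.

```shell
# Sketch only: perf_volume_mib is an illustrative helper.
#   $1 = number of records (or records per second)
#   $2 = record size in bytes
# Prints the payload in whole MiB (integer division, rounds down).
perf_volume_mib() {
    echo $(( $1 * $2 / 1024 / 1024 ))
}

perf_volume_mib 10000000 500   # total payload of the run: prints 4768
perf_volume_mib 1000000 500    # throughput cap per second: prints 476
```

With the parameters above, the run writes roughly 4.7 GiB of payload, and the `--throughput` cap of 1,000,000 records per second corresponds to about 476 MiB/s, so in practice the broker or network is likely to be the limit rather than the cap.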

Consumer performance test:

kafka-consumer-perf-test.sh --messages 10000000 --threads 3 --zookeeper localhost:2181 --num-fetch-threads 3 --topic test-partition-20