1. 集群管理
前台启动broker,Ctrl + C 关闭
bin/kafka-server-start.sh
后台启动broker
bin/kafka-server-start.sh -daemon
关闭broker
bin/kafka-server-stop.sh
2. Topic 管理
选项说明:
—replication-factor 3 定义副本数,副本数不能超过当前broker数,我这里有三个broker,所以最大可以填写3
—partitions 2 定义分区数,分区数无限制数量大小
—topic test 定义主题名称
创建topic
bin/kafka-topics.sh —create —zookeeper localhost:2181 —partitions 3 —replication-factor 3 —topic topicname
删除topic
bin/kafka-topics.sh —delete —zookeeper localhost:2181 —topic topicname
删除topic需要注意,我们要确保server.properties中的delete.topic.enable=true为true才可以,不然只是把这个topic标记为删除,但是却还是存在的,如果创建同名的topoic,就会提示该topic已存在。
查询topic列表
bin/kafka-topics.sh —zookeeper localhost:2181 —list
查询topic详情
bin/kafka-topics.sh —zookeeper localhost:2181 —describe —topic topicname
修改topic
bin/kafka-topics.sh —alter —zookeeper localhost:2181 —partitions 6 —topic topicname
新消费者列表查询(支持0.9版本+)
bin/kafka-consumer-groups.sh —new-consumer —bootstrap-server localhost:9092 —list
新消费者列表查询(支持0.10版本+)
bin/kafka-consumer-groups.sh —bootstrap-server localhost:9092 —list
3. Producer 和 Consumer 生产订阅消息
生产者生生产消息
bin/kafka-console-producer.sh —broker-list master:9092 —topic topicname
—broker-list kafka集群 跟的是kafka集群
消费者消费消息
bin/kafka-console-consumer.sh —bootstrap-server master:9092 —from-beginning —topic test5
新版本都是使用—bootstrap-server 建议新版本 老版本使用的是—zookeeper都可以用 —from-beginnign 表示从topic的的头开始读取消息
注意:
在老版本中,消费者的offset会存储在zookeeper中,但是在新版本中已经修改为存储在kafka中,下图就是存储在kafka的offset的记录,默认50个分区。
生产者
bin/kafka-console-producer.sh —broker-list localhost:9092 —topic test
消费者
bin/kafka-console-consumer.sh —zookeeper localhost:2181 —topic test
新生产者(支持0.9版本+)
bin/kafka-console-producer.sh —broker-list localhost:9092 —topic test —producer.config config/producer.properties
新消费者(支持0.9版本+)
bin/kafka-console-consumer.sh —bootstrap-server localhost:9092 —topic test —new-consumer —from-beginning —consumer.config config/consumer.properties
高级点的用法
bin/kafka-simple-consumer-shell.sh —brist localhost:9092 —topic test —partition 0 —offset 1234 —max-messages 10
Consumer-Groups
查询消费者组
bin/kafka-consumer-groups.sh —bootstrap-server localhost:9092 —list
查询消费者组详情
bin/kafka-consumer-groups.sh —bootstrap-server localhost:9092 —describe —group groupname
重设消费者组位移
最早处
bin/kafka-consumer-groups.sh —bootstrap-server localhost:9092 —group groupname —reset-offsets —all-topics —to-earliest —execute
最新处
bin/kafka-consumer-groups.sh —bootstrap-server localhost:9092 —group groupname —reset-offsets —all-topics —to-latest —execute
某个位置
bin/kafka-consumer-groups.sh —bootstrap-server localhost:9092 —group groupname —reset-offsets —all-topics —to-offset 2000 —execute
调整到某个时间之后得最早位移
bin/kafka-consumer-groups.sh —bootstrap-server localhost:9092 —group groupname —reset-offsets —all-topics —to-datetime 2019-09-15T00:00:00.000
查看有那些消费者group
./kafka-consumer-groups.sh —bootstrap-server localhost:9092 —list
删除消费者组
bin/kafka-consumer-groups.sh —zookeeper localhost:2181 —delete —group groupname
平衡leader
bin/kafka-preferred-replica-election.sh —zookeeper zk_host:port/chroot
kafka自带压测命令
bin/kafka-producer-perf-test.sh —topic test —num-records 100 —record-size 1 —throughput 100 —producer-props bootstrap.servers=localhost:9092
分区扩容
bin/kafka-topics.sh —zookeeper localhost:2181 —alter —topic topic1 —partitions 2
脚本工具
producer脚本
bin/kafka-console-producer.sh —broker-list localhost:9092 —topic topicname
参数含义:
—compression-codec lz4
压缩类型
—request-required-acks all acks的值
—timeout 3000 linger.ms的值
—message-send-max-retries 10
retries的值
—max-partition-memory-bytes batch.size值
consumer脚本
bin/kafka-console-consumer.sh —bootstrap-server localhost:9092 —topic topicname —from-beginning
指定groupid
bin/kafka-console-consumer.sh —bootstrap-server localhost:9092 —topic topicname —from-beginning —consumer-property group.id=old-consumer-group
指定分区 bin/kafka-console-consumer.sh —bootstrap-server localhost:9092 —topic topicname —from-beginning —partition 0
kafka-run-class脚本
kafka-run-class.sh kafka.tools.ConsoleConsumer
就是 kafka-console-consumer.sh kafka-run-class.sh kafka.tools.ConsoleProducer
就是 kafka-console-producer.sh
获取topic当前消息数
kafka-run-class.sh kafka.tools.GetOffsetShell —broker-list localhost:9092 —topic topicname —time -1
—time -1表示最大位移
—time -2表示最早位移
查询_consumer_offsets
**_bin/kafka-simple-consumer-shell.sh —topic _consumer_offsets —partition 12 —broker-list localhost:9092 —formatter “kafka.coorfinator.GroupMetadataManager\$OffsetsMessageFormatter”