创建topic
kafka-topics.sh
--create
--partitions 1 #分区数量
--replication-factor 2 #副本数量
--topic test
--zookeeper companynode02:2181,companynode03:2181,companynode04:2181
查询所有topic
kafka-topics.sh --list --zookeeper companynode02:2181,companynode03:2181,companynode04:2181
查看topic描述信息
kafka-topics.sh --describe --topic test --zookeeper companynode02:2181,companynode03:2181,companynode04:2181
删除kafka的topic
kafka-topics.sh --delete --topic test --zookeeper companynode02:2181,companynode03:2181,companynode04:2181
step1
把正在运行的 produce和consume停止,设置auto.create.topics.enable = false(可选)。server.properties设置delete.topic.enable=true,如果delete.topic.enable没有设置为true那么删除topic的时候只是被标记为删除而不会真正的删除。
step2
运行删除命令:
./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topicName
step3
删除kafka存储目录server.properties文件的log.dir配置,默认为/data/kafka-logs中相关topic的数据目录
注意:如果kafka有多个broker,且每个broker配置了多个数据盘,例如/data/kafka-logs,/data1/kafka-logs,且topic有多个分区partition和多个副本replication,则需要把所有的broker的所有数据盘进行扫描,删除该topic的所有数据
经过以上基本就能删除topic以及topic的数据了,如果还无法正常删除topic,那么请继续
step4
找一台部署了zk的服务器,登录zookeeper:
./bin/zkCli.sh -server localhost:2181
登陆到zk shell之后,找到topic所在的目录:
ls /brokers/topics
找到要删除的topic,然后执行:
rmr /brokers/topics/topicName
如果topic是被标记为marked for deletion状态,则是因为你没有把delete.topic.enable没有设置为true,执行:
ls /admin/delete_topic
找到要删除的topic之后执行:
rmr /admin/delete_topic/topicName
备注
网络上很多其它文章还说明,需要删除topic在zk上面的消费节点记录、配置节点记录,比如:
rmr /consumers/【consumer-group】
rmr /config/topics/【topic name】
其实正常情况是不需要进行这两个操作的,如果需要,那都是由于操作不当导致的。比如step1停止生产和消费程序没有做或者没有正确配置。也就是说,正常情况下严格按照step1 – step4的步骤,是一定能够正常删除topic的。
到这里你的topic已经被删除的干干净净了,如果还不行评论区告诉我,看看还有什么是我没踩过的坑
模拟生产者写入数据到topic中
kafka-console-producer.sh --broker-list companynode02:9092,companynode03:9092,companynode04:9092 --topic test
模拟消费者拉取topic中的数据
kafka-console-consumer.sh --zookeeper companynode02:2181,companynode03:2181,companynode04:2181 --topic test --from-beginning
或者
kafka-console-consumer.sh --bootstrap-server companynode02:9092,companynode03:9092,companynode04:9092 --topic test --from-beginning
查看某一消费组下的offset信息
查看消费者组列表
kafka-consumer-groups.sh --new-consumer --bootstrap-server companynode01:9092,companynode02:9092,companynode03:9092 --list
查看某一消费者组的描述信息
kafka-consumer-groups.sh --bootstrap-server companynode01:9092,companynode02:9092,companynode03:9092 --describe --group crawlgroup
偏移量管理
1、查看哪些 group id 正在进行消费
kafka-consumer-groups.sh --bootstrap-server localhost:9092 -- list
2、查看指定group.id的消费者消费情况
kafka-consumer-groups.sh --bootstrap-server localhost:9092 -- describe --group test001
3、将偏移量设置为最早的
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group test001 --to-earliest--topic tp_demo_ 02
4、将偏移量设置为最新的
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group test001 --to-latest--topic tp_demo_ 02
5、分别将指定主题的指定分区的偏移量向前移动10个消息
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group test001 --topic tp_demo_ 02 --shift-by -10