创建topic

  1. kafka-topics.sh
  2. --create
  3. --partitions 1 #分区数量
  4. --replication-factor 2 #副本数量
  5. --topic test
  6. --zookeeper companynode02:2181,companynode03:2181,companynode04:2181

查询所有topic

  1. kafka-topics.sh --list --zookeeper companynode02:2181,companynode03:2181,companynode04:2181

查看topic描述信息

  1. kafka-topics.sh --describe --topic test --zookeeper companynode02:2181,companynode03:2181,companynode04:2181

删除kafka的topic

  1. kafka-topics.sh --delete --topic test --zookeeper companynode02:2181,companynode03:2181,companynode04:2181

step1

把正在运行的 produce和consume停止,设置auto.create.topics.enable = false(可选)。server.properties设置delete.topic.enable=true,如果delete.topic.enable没有设置为true那么删除topic的时候只是被标记为删除而不会真正的删除。

step2

运行删除命令:

  1. ./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topicName

step3

删除kafka存储目录server.properties文件的log.dir配置,默认为/data/kafka-logs相关topic的数据目录
注意:如果kafka有多个broker,且每个broker配置了多个数据盘,例如/data/kafka-logs,/data1/kafka-logs,且topic有多个分区partition和多个副本replication,则需要把所有的broker的所有数据盘进行扫描,删除该topic的所有数据
经过以上基本就能删除topic以及topic的数据了,如果还无法正常删除topic,那么请继续

step4

找一台部署了zk的服务器,登录zookeeper:

  1. ./bin/zkCli.sh -server localhost:2181

登陆到zk shell之后,找到topic所在的目录:

  1. ls /brokers/topics

找到要删除的topic,然后执行:

  1. rmr /brokers/topics/topicName

如果topic是被标记为marked for deletion状态,则是因为你没有把delete.topic.enable没有设置为true,执行:

  1. ls /admin/delete_topic

找到要删除的topic之后执行:

  1. rmr /admin/delete_topic/topicName

备注
网络上很多其它文章还说明,需要删除topic在zk上面的消费节点记录、配置节点记录,比如:

  1. rmr /consumers/【consumer-group
  2. rmr /config/topics/【topic name

其实正常情况是不需要进行这两个操作的,如果需要,那都是由于操作不当导致的。比如step1停止生产和消费程序没有做或者没有正确配置。也就是说,正常情况下严格按照step1 – step4的步骤,是一定能够正常删除topic的。
到这里你的topic已经被删除的干干净净了,如果还不行评论区告诉我,看看还有什么是我没踩过的坑

模拟生产者写入数据到topic中

  1. kafka-console-producer.sh --broker-list companynode02:9092,companynode03:9092,companynode04:9092 --topic test

模拟消费者拉取topic中的数据

  1. kafka-console-consumer.sh --zookeeper companynode02:2181,companynode03:2181,companynode04:2181 --topic test --from-beginning

或者

  1. kafka-console-consumer.sh --bootstrap-server companynode02:9092,companynode03:9092,companynode04:9092 --topic test --from-beginning

查看某一消费组下的offset信息

查看消费者组列表

  1. kafka-consumer-groups.sh --new-consumer --bootstrap-server companynode01:9092,companynode02:9092,companynode03:9092 --list

查看某一消费者组的描述信息

  1. kafka-consumer-groups.sh --bootstrap-server companynode01:9092,companynode02:9092,companynode03:9092 --describe --group crawlgroup

偏移量管理

1、查看哪些 group id 正在进行消费

  1. kafka-consumer-groups.sh --bootstrap-server localhost:9092 -- list

2、查看指定group.id的消费者消费情况

  1. kafka-consumer-groups.sh --bootstrap-server localhost:9092 -- describe --group test001

3、将偏移量设置为最早的

  1. kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group test001 --to-earliest--topic tp_demo_ 02

4、将偏移量设置为最新的

  1. kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group test001 --to-latest--topic tp_demo_ 02

5、分别将指定主题的指定分区的偏移量向前移动10个消息

  1. kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group test001 --topic tp_demo_ 02 --shift-by -10

kafka消费者Java客户端

https://www.orchome.com/451