启动kafka
/opt/kafka_2.13-2.8.0/bin/kafka-server-start.sh /opt/kafka_2.13-2.8.0/config/server.properties
#后台运行
/opt/kafka_2.13-2.8.0/bin/kafka-server-start.sh -daemon /opt/kafka_2.13-2.8.0/config/server.properties
创建topic
./kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic userlog
选项说明:
—topic 定义 topic 名
—replication-factor 定义副本数
—partitions 定义分区数
查看topic详情
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic userlog
查看所有topic列表
./kafka-topics.sh --zookeeper localhost:2181 --list
控制台向topic生产数据
./kafka-console-producer.sh --broker-list localhost:9092 --topic userlog
控制台消费topic的数据
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic userlog --from-beginning
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic userlog --from-beginning
查看topic某分区偏移量最大(小)值
./kafka-run-class.sh kafka.tools.GetOffsetShell --topic userlog --time -1 --broker-list localhost:9092 --partitions 0
注: time为-1时表示最大值,time为-2时表示最小值
增加topic分区数
./kafka-topics.sh --zookeeper localhost:2181 --alter --topic userlog --partitions 10
删除topic,慎用,只会删除zookeeper中的元数据,消息文件须手动删除
./kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --topic userlog
./kafka-topics.sh --delete --zookeeper localhost:2181 --topic tests
需要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除。
step1:
如果需要被删除topic 此时正在被程序 produce和consume,则这些生产和消费程序需要停止。
因为如果有程序正在生产或者消费该topic,则该topic的offset信息一致会在broker更新。调用kafka delete命令则无法删除该topic。
同时,需要设置 auto.create.topics.enable = false,默认设置为true。如果设置为true,则produce或者fetch 不存在的topic也会自动创建这个topic。这样会给删除topic带来很多意向不到的问题。
所以,这一步很重要,必须设置auto.create.topics.enable = false,并认真把生产和消费程序彻底全部停止。
step2:
server.properties 设置 delete.topic.enable=true
如果没有设置 delete.topic.enable=true,则调用kafka 的delete命令无法真正将topic删除,而是显示(marked for deletion)
step3:
调用命令删除topic:
./bin/kafka-topics —delete —zookeeper 【zookeeper server:port】 —topic 【topic name】
step4:
删除kafka存储目录(server.properties文件log.dirs配置,默认为”/data/kafka-logs”)相关topic的数据目录。
注意:如果kafka 有多个 broker,且每个broker 配置了多个数据盘(比如 /data/kafka-logs,/data1/kafka-logs …),且topic也有多个分区和replica,则需要对所有broker的所有数据盘进行扫描,删除该topic的所有分区数据。
一般而言,经过上面4步就可以正常删除掉topic和topic的数据。但是,如果经过上面四步,还是无法正常删除topic,则需要对kafka在zookeeer的存储信息进行删除。具体操作如下:
(注意:以下步骤里面,kafka在zk里面的节点信息是采用默认值,如果你的系统修改过kafka在zk里面的节点信息,则需要根据系统的实际情况找到准确位置进行操作)
step5:
找一台部署了zk的服务器,使用命令:
bin/zkCli.sh -server 【zookeeper server:port】
登录到zk shell,然后找到topic所在的目录:ls /brokers/topics,找到要删除的topic,然后执行命令:
rmr /brokers/topics/【topic name】
即可,此时topic被彻底删除。
如果topic 是被标记为 marked for deletion,则通过命令 ls /admin/delete_topics,找到要删除的topic,然后执行命令:
rmr /admin/delete_topics/【topic name】
备注:
网络上很多其它文章还说明,需要删除topic在zk上面的消费节点记录、配置节点记录,比如:
rmr /consumers/【consumer-group】
rmr /config/topics/【topic name】
其实正常情况是不需要进行这两个操作的,如果需要,那都是由于操作不当导致的。比如step1停止生产和消费程序没有做,step2没有正确配置。也就是说,正常情况下严格按照step1 — step5 的步骤,是一定能够正常删除topic的。
step6:
完成之后,调用命令:
./bin/kafka-topics.sh —list —zookeeper 【zookeeper server:port】
查看现在kafka的topic信息。正常情况下删除的topic就不会再显示。
但是,如果还能够查询到删除的topic,则重启zk和kafka即可。
查看topic消费进度
kafka优化