Topic

查看topic列表

  1. kafka-topics.sh --zookeeper ServerIP:2181 --list

查看topic 详情

  1. kafka-topics.sh --zookeeper ServerIP:2181 --describe --topic TopicName

创建topic

  1. kafka-topics.sh --zookeeper ServerIP:2181 --create --topic robin --partitions 1 --replication-factor 1
  2. kafka-topics.sh --zookeeper ServerIP:2181/kafka --create --topic robin --partitions 1 --replication-factor 1

在配置文件server.properties中可以指定kafka的根目录, 比如zookeeper.connect=172.16.8.74:2181/kafka 而不是直接使用/,这样多套kafka也可共享同一个zookeeper集群,如果指定了kafka在zookeeper上的目录,则参数也要指定,否则会报错

修改topic分区数

  1. kafka-topics.sh --zookeeper ServerIP:2181/kafka -alter --partitions Num --topic TopicName

删除topic

  1. kafka-topics.sh --zookeeper ServerIP:2181 --topic TopicName --delete
  2. kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper ServerIP:2181 --topic TopicName

Consumer

查看消费者组列表

  1. kafka-consumer-groups.sh --bootstrap-server ServerIP:9092 --list

查看指定消费者组详情

  1. kafka-consumer-groups.sh --bootstrap-server ServerIP:9092 --group=ConsumerGroupName --describe

消费者组重置offset

  1. kafka-consumer-groups.sh --bootstrap-server ServerIP:9092 --group ConsumerGroupName --reset-offsets --all-topics --to-earliest --execute

Topic作用域

  1. --all-topics
  2. --topic t1 --topic t2
  3. --topic t1:0,1 指定的topic分区

重置策略

  1. --to-earliest
  2. --to-latest
  3. --to-current
  4. --to-offset <offset> 指定位移处
  5. --shift-by N 调整到当前位移 + N处,N可以为负数,表示向前移动
  6. --to-datetime 调整到大于给定时间的最早位移,格式为yyyy-MM-ddTHH:mm:ss.xxx
  7. --by-duration 调整到距离当前时间指定间隔的位移处,格式为PnDTnHnMnS
  8. --from-file CSV文件中读取调整策略

执行方案

  1. --execute
  2. --export 把位移调整方案按照CSV格式打印

TIPS:

  1. consumer group状态必须是inactive的,即不能是处于正在工作中的状态
  2. 不加执行方案,默认是只做打印操作

Producter

控制台生产消息

  1. kafka-console-producer.sh --broker-list ServerIP:9092 --topic TopicName

Config

动态修改topic保存时间为5天

  1. kafka-configs.sh --zookeeper zkip:2181/kafka --entity-type topics --entity-name topicname --alter --add-config retention.ms=432000000

https://yq.aliyun.com/articles/637579