数据查看

sh /home/kafka/software/kafka/bin/kafka-dump-log.sh —files /data15/kafka/log/ty_httpflow-40/00000000000018099743.log|tail

查看消费情况

./kafka-consumer-groups.sh —bootstrap-server localhost:9092 —group alarm_sensor_webshell —describe

查看消费者

./kafka-consumer-groups.sh —bootstrap-server localhost:9092 —list

查看topic

./kafka-topics.sh —zookeeper 127.0.0.1:2181 —list

查看topic内容

/kafka-console-consumer.sh —bootstrap-server localhost:9092 —topic topicName —from-beginning

查询集群描述bin/kafka-topics.sh —describe —zookeeper 127.0.0.1:2181## 查询集群描述(新)bin/kafka-topics.sh —bootstrap-server localhost:9092 —topic foo —describe

offset操作

$ 最大offset
kafka-run-class.sh kafka.tools.GetOffsetShell —broker-list localhost:9092 —topic test_topic —time -1

$ 最小offset
kafka-run-class.sh kafka.tools.GetOffsetShell —broker-list localhost:9092 —topic test_topic —time -2

$ offset
kafka-run-class.sh kafka.tools.GetOffsetShell —broker-list localhost:9092 —topic test_topic

topic操作

$ 列出当前kafka所有的topic
kafka-topics.sh —zookeeper localhost:2181 —list

$ 创建topic
kafka-topics.sh —create —zookeeper localhost:2181 —topic test_topic —replication-factor 1 —partitions 1

kafka-topics.sh —create —zookeeper localhost:2181 —topic test_topic —replication-factor 3 —partitions 10 —config cleanup.policy=compact

kafka-topics.sh —create —zookeeper localhost:2181 —topic test_topic —partitions 1 —replication-factor 1 —config max.message.bytes=64000 —config flush.messages=1

$ 查看某topic具体情况
kafka-topics.sh —zookeeper localhost:2181 —describe —topic test_topic

查看topic最大偏移量
./kafka-run-class.sh kafka.tools.GetOffsetShell —broker-list localhost:9092 —topic file_sip —time -1

$ 修改topic
kafka-topics.sh —zookeeper localhost:2181 —alter —partitions 3 —config cleanup.policy=compact —topic test_topic

$ 修改topic方法2
kafka-configs.sh —alter —zookeeper localhost:2181 —entity-name test_topic —entity-type topics —add-config cleanup.policy=compact

kafka-configs.sh —alter —zookeeper localhost:2181 —entity-name test_topic —entity-type topics —delete-config cleanup.policy

消费group操作

$ 查看某消费组
kafka-consumer-groups.sh —bootstrap-server localhost:9092 —group test_group —describe

$ 列出当前所有的消费组
kafka-consumer-groups.sh —bootstrap-server localhost:9092 —list

消费操作

$从最后消费
kafka-console-consumer.sh —bootstrap-server localhost:9092 —topic test_topic —group test_group

$从头开始消费
kafka-console-consumer.sh —bootstrap-server localhost:9092 —topic test_topic —group test_group —from-beginning

$ 消费数据(最多消费多少条就自动退出消费)
kafka-console-consumer.sh —bootstrap-server localhost:9092 —topic test_topic —group test_group —max-messages 1

$ 消费数据同时把key打印
kafka-console-consumer.sh —bootstrap-server localhost:9092 —topic test_topic —group test_group —property print.key=true

生产操作
$ 生产数据
kafka-console-producer.sh —broker-list localhost:9092 —topic test_topic

$ 生产数据(写入带有key的message)
kafka-console-producer.sh —broker-list localhost:9092 —topic test_topic —property “parse.key=true” —property “key.separator=:”

kafka重置偏移量
更新到当前group最初的offset位置
—to-current和—to-latest与—to-offset和—to-earliest 相同
bin/kafka-consumer-groups.sh —bootstrap-server localhost:9092 —group test-group —reset-offsets —all-topics —to-earliest —execute

bin/kafka-consumer-groups.sh —bootstrap-server localhost:9092 —group test-group —reset-offsets —all-topics —to-latest —execute
复制
更新到指定的offset位置
bin/kafka-consumer-groups.sh —bootstrap-server localhost:9092 —group test-group —reset-offsets —all-topics —to-offset 500000 —execute
复制
更新到当前offset位置(解决offset的异常)
bin/kafka-consumer-groups.sh —bootstrap-server localhost:9092 —group test-group —reset-offsets —all-topics —to-current —execute
复制
offset位置按设置的值进行位移
bin/kafka-consumer-groups.sh —bootstrap-server localhost:9092 —group test-group —reset-offsets —all-topics —shift-by -100000 —execute
复制
offset设置到指定时刻开始
bin/kafka-consumer-groups.sh —bootstrap-server localhost:9092 —group test-group —reset

参考:
https://blog.csdn.net/weixin_42854904/article/details/118424361