数据查看
sh /home/kafka/software/kafka/bin/kafka-dump-log.sh —files /data15/kafka/log/ty_httpflow-40/00000000000018099743.log|tail
查看消费情况
./kafka-consumer-groups.sh —bootstrap-server localhost:9092 —group alarm_sensor_webshell —describe
查看消费者
./kafka-consumer-groups.sh —bootstrap-server localhost:9092 —list
查看topic
./kafka-topics.sh —zookeeper 127.0.0.1:2181 —list
查看topic内容
/kafka-console-consumer.sh —bootstrap-server localhost:9092 —topic topicName —from-beginning
查询集群描述bin/kafka-topics.sh —describe —zookeeper 127.0.0.1:2181## 查询集群描述(新)bin/kafka-topics.sh —bootstrap-server localhost:9092 —topic foo —describe
offset操作
$ 最大offset
kafka-run-class.sh kafka.tools.GetOffsetShell —broker-list localhost:9092 —topic test_topic —time -1
$ 最小offset
kafka-run-class.sh kafka.tools.GetOffsetShell —broker-list localhost:9092 —topic test_topic —time -2
$ offset
kafka-run-class.sh kafka.tools.GetOffsetShell —broker-list localhost:9092 —topic test_topic
topic操作
$ 列出当前kafka所有的topic
kafka-topics.sh —zookeeper localhost:2181 —list
$ 创建topic
kafka-topics.sh —create —zookeeper localhost:2181 —topic test_topic —replication-factor 1 —partitions 1
kafka-topics.sh —create —zookeeper localhost:2181 —topic test_topic —replication-factor 3 —partitions 10 —config cleanup.policy=compact
kafka-topics.sh —create —zookeeper localhost:2181 —topic test_topic —partitions 1 —replication-factor 1 —config max.message.bytes=64000 —config flush.messages=1
$ 查看某topic具体情况
kafka-topics.sh —zookeeper localhost:2181 —describe —topic test_topic
查看topic最大偏移量
./kafka-run-class.sh kafka.tools.GetOffsetShell —broker-list localhost:9092 —topic file_sip —time -1
$ 修改topic
kafka-topics.sh —zookeeper localhost:2181 —alter —partitions 3 —config cleanup.policy=compact —topic test_topic
$ 修改topic方法2
kafka-configs.sh —alter —zookeeper localhost:2181 —entity-name test_topic —entity-type topics —add-config cleanup.policy=compact
kafka-configs.sh —alter —zookeeper localhost:2181 —entity-name test_topic —entity-type topics —delete-config cleanup.policy
消费group操作
$ 查看某消费组
kafka-consumer-groups.sh —bootstrap-server localhost:9092 —group test_group —describe
$ 列出当前所有的消费组
kafka-consumer-groups.sh —bootstrap-server localhost:9092 —list
消费操作
$从最后消费
kafka-console-consumer.sh —bootstrap-server localhost:9092 —topic test_topic —group test_group
$从头开始消费
kafka-console-consumer.sh —bootstrap-server localhost:9092 —topic test_topic —group test_group —from-beginning
$ 消费数据(最多消费多少条就自动退出消费)
kafka-console-consumer.sh —bootstrap-server localhost:9092 —topic test_topic —group test_group —max-messages 1
$ 消费数据同时把key打印
kafka-console-consumer.sh —bootstrap-server localhost:9092 —topic test_topic —group test_group —property print.key=true
生产操作
$ 生产数据
kafka-console-producer.sh —broker-list localhost:9092 —topic test_topic
$ 生产数据(写入带有key的message)
kafka-console-producer.sh —broker-list localhost:9092 —topic test_topic —property “parse.key=true” —property “key.separator=:”
kafka重置偏移量
更新到当前group最初的offset位置
—to-current和—to-latest与—to-offset和—to-earliest 相同
bin/kafka-consumer-groups.sh —bootstrap-server localhost:9092 —group test-group —reset-offsets —all-topics —to-earliest —execute
bin/kafka-consumer-groups.sh —bootstrap-server localhost:9092 —group test-group —reset-offsets —all-topics —to-latest —execute
复制
更新到指定的offset位置
bin/kafka-consumer-groups.sh —bootstrap-server localhost:9092 —group test-group —reset-offsets —all-topics —to-offset 500000 —execute
复制
更新到当前offset位置(解决offset的异常)
bin/kafka-consumer-groups.sh —bootstrap-server localhost:9092 —group test-group —reset-offsets —all-topics —to-current —execute
复制
offset位置按设置的值进行位移
bin/kafka-consumer-groups.sh —bootstrap-server localhost:9092 —group test-group —reset-offsets —all-topics —shift-by -100000 —execute
复制
offset设置到指定时刻开始
bin/kafka-consumer-groups.sh —bootstrap-server localhost:9092 —group test-group —reset
参考:
https://blog.csdn.net/weixin_42854904/article/details/118424361
