image.png

1. topic相关

1.1 查看kafka集群的所有topic
  1. ./bin/kafka-topics.sh --list --zookeeper localhost:2181

1.2 创建名称为topic_test的topic,partitions为1个,副本为1个
  1. ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic_test

1.3 修改topic_test的partition为10(只能改大不能改小)
  1. ./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic_test --partitions 10

1.4 删除名称为topic_test的topic(只删除zookeeper内的元数据,消息文件须手动删除)
  1. ./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topic_test

1.5 查看topic为topic_test的详细信息
  1. ./bin/kafka-topics.sh -zookeeper localhost:2181 -describe -topic topic_test

2. consumer相关

2.1 查看所有consumer的group
  1. ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

2.2 查看group为group_test消费的机器以及partition
  1. ./bin/kafka-consumer-groups.sh --describe --group group_test --bootstrap-server localhost:9092

2.3 查询__consumer_offsets topic所有内容

需要注意的是,__consumer_offset是内部使用的topic,外部应用程序是没法读取的,需要在config/consumer.properties关闭exclude.internal.topics这个参数。 exclude.internal.topics=false

  1. # 0.11.0.0之前版本
  2. ./bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties
  3. # 0.11.0.0之后版本(含)
  4. ./bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties

__consumer_offsets topic的每一日志项的格式都是:[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]

2.4 计算指定consumer group在consumer_offsets topic中分区信息,Kafka会使用下面公式计算该group位移保存在consumer_offsets的哪个分区上:
  1. Math.abs(groupID.hashCode()) % numPartitions

默认情况下__consumer_offsets有50个分区,如果你的系统中consumer group也很多的话,那么这个命令的输出结果会很多。

2.5 查询__consumer_offsets topic所有某个partition的所有内容
  1. # 0.11.0.0之前版本
  2. bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --partition 20 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties
  3. # 0.11.0.0之后版本(含)
  4. ./bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server localhost:9092 --partition 20 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties

3. 控制台相关

3.1 控制台向topic_test发送数据
  1. ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_test

3.2 控制台接收topic_test数据
  1. ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_test

3.3 控制台接收topic_test最开始的数据

  1. ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_test --from-beginning

4. 服务相关

4.1 启动zookeeper服务
  1. ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties

4.2 启动kafka服务
  1. ./bin/kafka-server-start.sh -daemon ./config/server.properties

4.3 查看服务启动
  1. ps -ef | grep kafka

5. 遇到的问题

5.1 kafka生产消息在broker上分配不均匀

consumer_offser大量提交写入导致broker负载不均匀,问题参考
[Kafka 如何读取offset topic内容 (
consumer_offsets)](https://www.cnblogs.com/huxi2b/p/6061110.html)


参考

https://kafka.apache.org
__consumer_offser大量提交写入导致broker负载不均匀
Kafka 如何读取offset topic内容 (__consumer_offsets)