(一)对Kafka Broker操作

1.启动Kafka


$KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

  1. [root@zjj101 kafka_2.11-2.1.0]# $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
  2. [root@zjj101 kafka_2.11-2.1.0]# jps
  3. 25888 Kafka
  4. 24014 QuorumPeerMain
  5. 26078 Jps
  6. [root@zjj101 kafka_2.11-2.1.0]#


(二)主题操作

kafka-topics.sh 主题相关的操作

1.创建主题

主题的信息属于Kafka的元数据,是存在Zookeeper里面的brokers/topics里面,元数据是以永久节点的方式来保存的.

方式1

kafka-topics.sh —zookeeper zjj101:2181 —create —topic hello —partitions 2 —replication-factor 2

说明
-create —topic 固定的,
—partitions 指定分区有几个,必填
—replication-factor 指定副本数量,必填, 每个broker最多存储一份副本,所以在创建主题时,副本数不能超过当前可用的broker的数量!

这种方式 kafka集群根据负载均衡的策略,自动将分区分配到对应的broker实例中.

方式2


明确告诉kafka,创建多少个分区,以及每个分区的副本到底选择哪个broker

kafka-topics.sh —zookeeper zjj101:2181 —create —topic hello2 —replica-assignment 101:102,102:103

多个分区逗号间隔.


在Zookeeper查看创建的主题

zookeeper的hello这个永久节点显示:

{“version”:1,”partitions”:{“1”:[102,103],”0”:[101,102]}}


这个意思是 一号区有两个副本,分别在102和103副本. 零号区也有两个副本,分别在101和102上面


2.查询所有的主题


$KAFKA_HOME/bin/kafka-topics.sh —zookeeper zjj101:2181 —list



3.查看某个主题详细信息


kafka-topics.sh —zookeeper zjj101:2181 —describe —topic hello

  1. [root@zjj101 kafka-logs]# kafka-topics.sh --zookeeper zjj101:2181 --describe --topic hello
  2. Topic:hello PartitionCount:2 ReplicationFactor:2 Configs:
  3. Topic: hello Partition: 0 Leader: 101 Replicas: 101,102 Isr: 101,102
  4. Topic: hello Partition: 1 Leader: 102 Replicas: 102,103 Isr: 102,103


Topic: hello Partition: 0 Leader: 101 Replicas: 101,102 Isr: 101,102

意思是0号副本的 Leader是101分区 副本是101和102



4.修改主题


只能修改分区数(只允许增加)和副本的放置策略

bin/kafka-topics.sh —zookeeper zjj101:2181 —alter —topic hello2 —partitions 3


5.删除主题


这个删除只是给Zookeeper里面的元数据删除,但是实际数据并没有删除,它是标记删除,会给实际数据改个名字,然后会有个删除线程定时扫描删除,默认是一天删除.

$KAFKA_HOME/bin/kafka-topics.sh —zookeeper zjj101:2181 —delete —topic test01

(三)生产者消费者

1.启动生产者


kafka提供了用于测试的producer,位置在kafka_2.11-0.11.0.2/bin目录下面,有个kafka-console-producer.sh

命令:

$KAFKA_HOME/bin/kafka-console-producer.sh —broker-list zjj101:9092 —topic hello


2.启动消费者

kafka_2.11-0.11.0.2/bin目录下面,有个kafka-console-consumer.sh 将生产者的数据打印在控制台上面

命令 :

$KAFKA_HOME/bin/kafka-console-consumer.sh —bootstrap-server zjj101:9092 —topic hello


输入完了之后就卡着了.. 等待生产者生产数据.

https://blog.csdn.net/qq_41489540/article/details/112771361