Kafka中Topic的操作
创建Topic(副本数<=节点数)
指定2个分区,2个副本,注意:副本数不能大于集群中Broker的数量
因为每个partition的副本必须保存在不同的broker,否则没有意义,如果partition的副本都保存在同一个broker,那么这个broker挂了,则partition数据依然会丢失
#如果是是3个节点的kafka集群,副本数可以设置为2,最大可以设置为3bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 2 --replication-factor 2 --topic hello#如果是单机kafka的话,这里的副本数就只能设置为1bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 2 --replication-factor 1 --topic hello
查看Topic列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
查看topic的详细信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic hello

第一个行显示指定topic所有partitions的一个总结
PartitionCount:表示这个Topic一共有多少个partition
ReplicationFactor:表示这个topic中partition的副本因子是几
Config:这个表示创建Topic时动态指定的配置信息,在这我们没有额外指定配置信息下面每一行给出的是一个partition的信息,如果只有一个partition,则只显示一行。
Topic:显示当前的topic名称
Partition:显示当前topic的partition编号
Leader:Leader partition所在的节点编号,这个编号其实就是broker.id的值
来看这个图:
这个图里面的hello这个topic有两个partition,其中partition1的leader所在的节点是broker1,partition2的leader所在的节点是broker2 Replicas:当前partition所有副本所在的节点编号【包含Leader所在的节点】,如果设置多个副本的话,这里会显示多个,不管该节点是否是Leader以及是否存活。
Isr:当前partition处于同步状态的所有节点,这里显示的所有节点都是存活状态的,并且跟Leader同步的(包含Leader所在的节点) 所以说Replicas和Isr的区别就是:
如果某个partition的副本所在的节点宕机了,在Replicas中还是会显示那个节点,但是在Isr中就不会显示了,Isr中显示的都是处于正常状态的节点。
也可以通过Big Data Tools或Kafkalytic插件查看topic信息
修改Topic
修改Topic的partition数量,只能增加
为什么partition只能增加? 因为数据是存储在partition中的,如果可以减少partition的话,那么partition中的数据就丢了
bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 5 --topic hello
删除Topic
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic hello
删除操作是不可逆的,删除Topic会删除它里面的所有数据
注意:Kafka从1.0.0开始默认开启了删除操作,之前的版本只会把Topic标记为删除状态,需要设置delete.topic.enable为true才可以真正删除 如果不想开启删除功能,可以设置delete.topic.enable为false,这样删除topic的时候只会把它标记为删除状态,此时这个topic依然可以正常使用。
delete.topic.enable可以配置在server.properties文件中
Kafka中的生产者和消费者
kafka默认提供了基于控制台的生产者和消费者,方便测试使用
生产者:bin/kafka-console-producer.sh
消费者:bin/kafka-console-consumer.sh
先创建一个topic【5个分区,1个副本】:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 5 --replication-factor 1 --topic hello
创建生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hello
broker-list:kafka的服务地址[多个用逗号隔开]
创建消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello
bootstrap-server:kafka的服务地址
发现消费不到刚才生产的数据,为什么呢?
因为kafka的消费者默认是消费最新生产的数据,如果想消费之前生产的数据需要添加一个参数–from-beginning,表示从头消费的意思
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello --from-beginning

