Kafka中Topic的操作

创建Topic(副本数<=节点数)

指定2个分区,2个副本,注意:副本数不能大于集群中Broker的数量

因为每个partition的副本必须保存在不同的broker,否则没有意义,如果partition的副本都保存在同一个broker,那么这个broker挂了,则partition数据依然会丢失

  1. #如果是是3个节点的kafka集群,副本数可以设置为2,最大可以设置为3
  2. bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 2 --replication-factor 2 --topic hello
  3. #如果是单机kafka的话,这里的副本数就只能设置为1
  4. bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 2 --replication-factor 1 --topic hello

image.png

查看Topic列表

  1. bin/kafka-topics.sh --list --zookeeper localhost:2181

image.png

查看topic的详细信息

  1. bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic hello

image.png

第一个行显示指定topic所有partitions的一个总结
PartitionCount:表示这个Topic一共有多少个partition
ReplicationFactor:表示这个topic中partition的副本因子是几
Config:这个表示创建Topic时动态指定的配置信息,在这我们没有额外指定配置信息

下面每一行给出的是一个partition的信息,如果只有一个partition,则只显示一行。
Topic:显示当前的topic名称
Partition:显示当前topic的partition编号
Leader:Leader partition所在的节点编号,这个编号其实就是broker.id的值

来看这个图:
image.png

这个图里面的hello这个topic有两个partition,其中partition1的leader所在的节点是broker1,partition2的leader所在的节点是broker2 Replicas:当前partition所有副本所在的节点编号【包含Leader所在的节点】,如果设置多个副本的话,这里会显示多个,不管该节点是否是Leader以及是否存活。
Isr:当前partition处于同步状态的所有节点,这里显示的所有节点都是存活状态的,并且跟Leader同步的(包含Leader所在的节点) 所以说Replicas和Isr的区别就是:
如果某个partition的副本所在的节点宕机了,在Replicas中还是会显示那个节点,但是在Isr中就不会显示了,Isr中显示的都是处于正常状态的节点。

也可以通过Big Data ToolsKafkalytic插件查看topic信息
image.png

修改Topic

修改Topic的partition数量,只能增加

为什么partition只能增加? 因为数据是存储在partition中的,如果可以减少partition的话,那么partition中的数据就丢了

  1. bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 5 --topic hello

image.png

删除Topic

  1. bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic hello

删除操作是不可逆的,删除Topic会删除它里面的所有数据

注意:Kafka从1.0.0开始默认开启了删除操作,之前的版本只会把Topic标记为删除状态,需要设置delete.topic.enable为true才可以真正删除 如果不想开启删除功能,可以设置delete.topic.enable为false,这样删除topic的时候只会把它标记为删除状态,此时这个topic依然可以正常使用。
delete.topic.enable可以配置在server.properties文件中

Kafka中的生产者和消费者

kafka默认提供了基于控制台的生产者和消费者,方便测试使用
生产者:bin/kafka-console-producer.sh
消费者:bin/kafka-console-consumer.sh
先创建一个topic【5个分区,1个副本】:

  1. bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 5 --replication-factor 1 --topic hello

创建生产者

  1. bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hello

broker-list:kafka的服务地址[多个用逗号隔开]
image.png

创建消费者

  1. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello

bootstrap-server:kafka的服务地址
发现消费不到刚才生产的数据,为什么呢?
因为kafka的消费者默认是消费最新生产的数据,如果想消费之前生产的数据需要添加一个参数–from-beginning,表示从头消费的意思

  1. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello --from-beginning

image.png