- kafka命令大全 - OrcHome
主题管理
1. 创建Kafka主题
Kafka提供了自带的Kafka-topic脚本用于帮助用户创建主题。bin/kafka-topic.sh —bootstarp-server broker_host:port —create –topic my_topic —partitions 1 —replication-factor 1
create 表明我们要创建主题,而partitions和replication factor分布设置了主题的分区数以及每个分区下的副本数。
2. 查询主题
查询所有主题的列表:
/bin/kafka-topic.sh —bootstrap-server broker_host:port —list
查询单个主题的详细数据:
/bin/kafka-topic.sh —bootstrap-server broker_host:port —describe —topic
不写topic就展示所有
- topics-with-overrides
- under-replicated-partitions:包含失效副本的分区
- unavailable-partitions:没有leader副本的分区
3. 修改主题
A :修改分区:
/bin/kafka-topic.sh —bootstrap-server broker_host : port —alter —topic
—partitions <新分区数>
B :修改主题级别参数:使用kafka-configs脚本修改对应的参数。
修改主题级别的max.message.bytes :
/bin/kafka-configs.sh —zookeeper zookeeper_host:port —entity-type topic —entity-name
—alter —add-config max.message.bytes=10485760
这个命令里使用的 –zookeeper,也可以使用 —bootstrap-server,只是他是用来设置动态参数的。
C :变更副本数
使用kafka-reassign-partitions 脚本,增加副本数
增加分区
手动修改分区分配
D :修改主题限速
这是指设置Leader副本和follower 副本使用的带宽。有时候,需要让某个主题的副本在执行副本同步机制时,不要消耗过多的带宽。
要做到这个需要先设置leader.replication.throttled.rate和follower.replication.throttled.rate
bin/kafka-configs.sh —zookeeper zookeeper_host:port —alter —add-config ‘leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600’ —entity-type brokers —entity-name 0
E :主题分区迁移
同样是使用kafka-ressign-partitions脚本。
F :删除主题
/bin/kafka-topic.sh –bootstrap-server broker_host:port —delete —topic
删除主题的操作是异步的,执行完这条命令不代表主题立即就被删除了,它仅仅是被标记成“已删除”状态而已。Kafka会在后台默默地开启主题删除操作。
4. 常见主题错误处理
1:主题删除失败
造成主题删除失败的原因有很多,最常见的原因有两个:副本所在Broker宕机了;待删除主题的部分分区依然在执行迁移过程。
解决:
第一步:手动删除Zookeeper节点/admin/delete_topics 下待删除主题为名的znode。
第二步:手动删除该主题的磁盘上的分区目录。
第三步:在Zookeeper中执行rmr/controller,触发Controller重选举,刷新Controller缓存。
在执行最后一步时,要慎重,因为他可能造成大面积的分区Leader重选举。事实上,仅仅执行前两步也是可以的,只是Controller缓存中没有清空删除主题,不影响使用。
2:_consumer_offset占用太多的磁盘
如果发现这个主题占用了过多的磁盘空间,就要显示的使用jstack 命令查看kafka-log-cleaner-thread前缀线程状态。
动态参数
1.1版本支持动态参数
- server.properties中的参数是静态参数
$ bin/kafka-configs.sh —bootstrap-server kafka-host:port —entity-type brokers —entity-default —alter —add-config unclean.leader.election.enable=true Completed updating default config for brokers in the cluster,
cluster-wide 范围的动态参数,需要显式指定 entity-default
删除 cluster-wide 范围参数
$ bin/kafka-configs.sh —bootstrap-server kafka-host:port —entity-type brokers —entity-default —alter —delete-config unclean.leader.election.enable Completed updating default config for brokers in the cluster,
删除 per-broker 范围参数
$ bin/kafka-configs.sh —bootstrap-server kafka-host:port —entity-type brokers —entity-name 1 —alter —delete-config unclean.leader.election.enable Completed updating config for broker: 1.