• kafka命令大全 - OrcHome

    主题管理

    1. 创建Kafka主题

    Kafka提供了自带的Kafka-topic脚本用于帮助用户创建主题。

    bin/kafka-topic.sh —bootstarp-server broker_host:port —create –topic my_topic —partitions 1 —replication-factor 1

create 表明我们要创建主题,而partitions和replication factor分布设置了主题的分区数以及每个分区下的副本数。

2. 查询主题

查询所有主题的列表

/bin/kafka-topic.sh —bootstrap-server broker_host:port —list

查询单个主题的详细数据

/bin/kafka-topic.sh —bootstrap-server broker_host:port —describe —topic 不写topic就展示所有

  • topics-with-overrides
  • under-replicated-partitions:包含失效副本的分区
  • unavailable-partitions:没有leader副本的分区

  • 3. 修改主题

    A :修改分区:

    /bin/kafka-topic.sh —bootstrap-server broker_host : port —alter —topic —partitions <新分区数>

分区数一定要比原有分区数大。

B :修改主题级别参数:使用kafka-configs脚本修改对应的参数。

修改主题级别的max.message.bytes :

/bin/kafka-configs.sh —zookeeper zookeeper_host:port —entity-type topic —entity-name —alter —add-config max.message.bytes=10485760

这个命令里使用的 –zookeeper,也可以使用 —bootstrap-server,只是他是用来设置动态参数的。

C :变更副本数

使用kafka-reassign-partitions 脚本,增加副本数

增加分区

image.png
image.png

手动修改分区分配

image.png

D :修改主题限速

这是指设置Leader副本和follower 副本使用的带宽。有时候,需要让某个主题的副本在执行副本同步机制时,不要消耗过多的带宽。
要做到这个需要先设置leader.replication.throttled.rate和follower.replication.throttled.rate

bin/kafka-configs.sh —zookeeper zookeeper_host:port —alter —add-config ‘leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600’ —entity-type brokers —entity-name 0

E :主题分区迁移

同样是使用kafka-ressign-partitions脚本。

F :删除主题

/bin/kafka-topic.sh –bootstrap-server broker_host:port —delete —topic

删除主题的操作是异步的,执行完这条命令不代表主题立即就被删除了,它仅仅是被标记成“已删除”状态而已。Kafka会在后台默默地开启主题删除操作。

4. 常见主题错误处理

1:主题删除失败

造成主题删除失败的原因有很多,最常见的原因有两个:副本所在Broker宕机了;待删除主题的部分分区依然在执行迁移过程。
解决:
第一步:手动删除Zookeeper节点/admin/delete_topics 下待删除主题为名的znode。
第二步:手动删除该主题的磁盘上的分区目录。
第三步:在Zookeeper中执行rmr/controller,触发Controller重选举,刷新Controller缓存。
在执行最后一步时,要慎重,因为他可能造成大面积的分区Leader重选举。事实上,仅仅执行前两步也是可以的,只是Controller缓存中没有清空删除主题,不影响使用。

2:_consumer_offset占用太多的磁盘

如果发现这个主题占用了过多的磁盘空间,就要显示的使用jstack 命令查看kafka-log-cleaner-thread前缀线程状态。

动态参数

1.1版本支持动态参数

  • server.properties中的参数是静态参数

$ bin/kafka-configs.sh —bootstrap-server kafka-host:port —entity-type brokers —entity-default —alter —add-config unclean.leader.election.enable=true Completed updating default config for brokers in the cluster,

cluster-wide 范围的动态参数,需要显式指定 entity-default

删除 cluster-wide 范围参数

$ bin/kafka-configs.sh —bootstrap-server kafka-host:port —entity-type brokers —entity-default —alter —delete-config unclean.leader.election.enable Completed updating default config for brokers in the cluster,

删除 per-broker 范围参数

$ bin/kafka-configs.sh —bootstrap-server kafka-host:port —entity-type brokers —entity-name 1 —alter —delete-config unclean.leader.election.enable Completed updating config for broker: 1.