常用命令

创建

  1. bin/kafka-topics.sh --bootstrap-server <broker_host:port> \
  2. --create --topic <my_topic_name> \
  3. --partitions 1 \
  4. --replication-factor 1
  • --create 表明要创建主题
  • --partitions 和 分别设置了主题的分区数
  • --replication factor 每个分区下的副本总数
  • 2.2 版本开始,社区推荐用 --bootstrap-server 参数替换 --zookeeper 参数

查看

  • 查询列表

    1. kafka-topics.sh --bootstrap-server <broker_host:port> --list
  • 查询指定 topic

    1. kafka-topics.sh --bootstrap-server <broker_host:port> \
    2. --describe --topic <topic_name>
    • describe命令不指定具体的主题名称,那么 Kafka 默认会返回所有 “可见” 主题的详细数据给你

修改

修改主题分区

  • 其实就是增加分区,目前 Kafka 不允许减少某个主题的分区数 ```shell kafka-topics.sh —bootstrap-server \ —alter —topic —partitions < 新分区数 >
  1. - 指定的分区数一定要比原有分区数大,否则 Kafka 会抛出 InvalidPartitionsException 异常
  2. <a name="EIBfH"></a>
  3. #### 常规参数设置
  4. - 常规参数,还是用 `zookeeper`
  5. ```shell
  6. kafka-configs.sh --zookeeper <zookeeper_host:port> \
  7. --entity-type topics --entity-name <topic_name> \
  8. --alter --add-config max.message.bytes=10485760

变更副本数

  • 使用自带的 kafka-reassign-partitions 脚本,帮助我们增加主题的副本数

修改主题限速

  • 主要是指设置 Leader 副本和 Follower 副本使用的带宽
  1. 设置 Broker 端参数 leader.replication.throttled.ratefollower.replication.throttled.rate

    • 在多个副本所在 broker 进行设置
      1. kafka-configs.sh --zookeeper zookeeper_host:port \
      2. --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' \
      3. --entity-type brokers --entity-name <broker.id>
  2. 设置要限制的副本

    1. 这里用通配符 * 限制所有副本
      1. kafka-configs.sh --zookeeper <zookeeper_host:port> \
      2. --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' \
      3. --entity-type topics --entity-name <broker.id>

主题分区迁移

  • 同样是使用 kafka-reassign-partitions 脚本,对主题各个分区的副本进行“手术”般的调整,比如把某些分区批量迁移到其他 Broker 上

删除

  • 删除操作是异步的,执行完这条命令不代表主题立即就被删除了。它仅仅是被标记成“已删除”状态而已 ```shell kafka-topics.sh —bootstrap-server \ —delete —topic
  1. ---
  2. <a name="2MueB"></a>
  3. ## __consumer_offset
  4. <a name="VkELX"></a>
  5. ### 查看消费者组提交的位移数据
  6. ```shell
  7. kafka-console-consumer.sh --bootstrap-server <kafka_host:port> \
  8. --topic __consumer_offsets \
  9. --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
  10. --from-beginning

直接读取该主题消息,查看消费者组的状态信息

  1. kafka-console-consumer.sh --bootstrap-server kafka_host:port \
  2. --topic __consumer_offsets \
  3. --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" \
  4. --from-beginning

错误处理

主题删除失败

  • 当运行完上面的删除命令后,很多人发现已删除主题的分区数据硬盘上没有清除
    • 实际上,造成主题删除失败的原因有很多,最常见的原因有两个:副本所在的 Broker 宕机了;待删除主题的部分分区依然在执行迁移过程。
  1. 手动删除 ZooKeeper 节点 /admin/delete_topics 下以待删除主题为名的 znode
  2. 手动删除该主题在磁盘上的分区目录
  3. 在 ZooKeeper 中执行 rmr /controller,触发 Controller 重选举,刷新 Controller 缓存
    1. 可能造成大面积的分区 Leader 重选举
    2. 事实上,仅仅执行前两步也是可以的

**

__consumer_offsets 占用太多的磁盘

  • jstack 命令查看一下 kafka-log-cleaner-thread 前缀的线程状态
    • 通常情况下,这都是因为该线程挂掉了,无法及时清理此内部主题
    • 只能重启相应的 Broker 了