主题和分区是 Kafka 的两个核心概念,生产者和消费者的设计理念所针对的都是主题和分区层面的操作。主题作为消息的归类,可以再细分为一个或多个分区,分区也可以看作对消息的二次归类。分区的划分不仅为 Kafka 提供了可伸缩性、水平扩展的功能,还通过多副本机制来为 Kafka 提供数据冗余以提高数据可靠性。

从 Kafka 的底层实现来说,主题和分区都是逻辑上的概念,分区可以有一至多个副本,每个副本对应一个日志文件,每个日志文件对应一至多个日志分段(LogSegment),每个日志分段还可以细分为索引文件、日志存储文件和快照文件等。

主题管理

主题的管理包括创建主题、查看主题信息、修改主题和删除主题等操作,可以通过 kafka-topic.sh 脚本来执行这些操作,此外还可以通过 KafkaAdminClient 的方式实现,下面一一进行介绍。

1. 创建主题

如果 broker 端配置参数 auto.create.topics.enable 设置为 true(默认就是 true),那么当生产者向一个尚未创建的主题发送消息时,会自动创建一个分区数为 num.partitions(默认为 1)、副本因子为 default.replication.factor(默认为 1)的主题。此外,当一个消费者开始从未知主题中读取消息时,或者当任意一个客户端向未知主题发送元数据请求时,都会按照该配置参数来创建一个相应的主题。一般不建议将这个参数设置为 true,因为这个参数会增加主题的管理与维护的难度,推荐使用 kafka-topics.sh 脚本来创建主题,如下图所示:
image.png

从 Kafka 2.2 版本开始,社区推荐用 —bootstrap-server 参数替换 —zookeeper 参数,并且显式地将后者标记为已过期。因此,如果使用的是 2.2 以上的版本,那么请用 —bootstrap-server 参数。

上图创建了一个分区数为 6、副本因子为 3 的主题,创建环境是一个包含三个 broker 节点的集群。因为创建主题时指定的副本数不能小于可用的 broker 数量,所以在该环境中副本数大于 3 时则会报错:
image.png
在执行完创建主题的脚本后,Kafka 会在 log.dir 或 log.dirs 参数所配置的目录下创建相应的主题分区,默认这个目录为 /tmp/kafka-logs/,我们来看下通过上面的操作创建的主题分区信息:
image.png
可以看到该 broker 节点创建了六个文件夹 demo-topic-0 到 demo-topic-5,分别对应 demo-topic 主题的六个分区。由于这里的三个 broker 节点都部署在一台机器上,且副本数为 3,所以该机器上一共有 18 个分区(6 3)的文件夹。其余两个目录如下所示:
image.pngimage.png
实际这类文件夹对应的不是分区,分区只是一个逻辑概念而没有物理上的存在。主题、分区、副本和 Log 的关系如下图所示,主题和分区都是提供给上层用户的抽象,而在副本层面或更加确切地说是 Log 层面才有实际物理上的存在。同一个分区中的多个副本必须分布在不同的 broker 中,这样才能提供有效的数据冗余。
image.png
创建主题时对于主题名称的命名方式也很有讲究,首先是不能与已经存在的主题同名,如果创建了同名的主题则会报错。kafka-topic.sh 脚本中提供了一个 *if-not-exists
参数,如果在创建主题时带上了这个参数,那么在发生命名冲突时将不做任何处理,即不覆盖原有主题,也不报错。其次在创建主题时还会检测是否包含 . 或 字符,因为 Kafka 内部做埋点时会根据主题名称来命名 metrics 的名称,并且会将点号(.)改成下划线()的格式,如果允许则不同主题的 metrics 名称就可能相同了。

2. 查看主题

kafka-topics.sh 脚本有 5 种指令类型:create、list、describe、alter 和 delete。其中 list 和 describe 指令可 以用来方便地查看主题信息,其中 list 指令可以查看当前所有可用的主题:

  1. bin/kafka-topics.sh --bootstrap-server localhost:9092 -list

通过 describe 指令来查看主题分区副本的分配信息,如果不使用 topic 参数指定某个主题,则会展示所有主题的详细信息:
image.png
其中,Leader 表示分区的 leader 副本所对应的 brokerId,Isr 表示分区的 ISR 集合,Replicas 表示分区所有的副本分配情况,即 AR 集合,其中的数字都表示的是 brokerId。从图中可以看到分区是均匀分配的。

3. 修改主题

当一个主题被创建之后,依然允许我们对其做一定的修改,比如修改分区个数、修改配置等,这个修改功能就是由 kafka-topics.sh 脚本中的 —alter 指令提供的。下图展示了如何增加主题的分区数,还是以前面的 demo-topic 为例,之前分区数为 6,现在增加到 7:
image.png
目前 Kafka 只支持增加分区数而不支持减少分区数,因为实现此功能需要考虑的因素很多,比如删除掉的分区中的消息该作何处理?如果随着分区一起消失则消息的可靠性得不到保障;如果需要保留则又需要考虑如何保留。直接存储到现有分区的尾部,消息的时间戳就不会递增,如此对于 Spark、Flink 这类需要消息时间戳的组件将会受到影响;如果分散插入到现有的分区中,那么在消息量很大时,内部的数据复制会占用很大的资源,而且在复制期间,此主题的可用性又如何得到保障?与此同时,顺序性问题、事务性问题、以及分区和副本的状态机切换问题都是不得不面对的。反观这个功能的收益点却是很低,如果真的需要实现此类的功能,完全可以重新创建一个分区数较小的主题,然后将现有主题中的消息按照既定的逻辑复制过去即可。

4. 删除主题

如果确定不再使用一个主题,那么最好的方式是将其删除,这样可以释放一些资源,比如磁盘、文件句柄等。kafka-topics.sh 脚本中的 delete 指令就可以用来删除主题,如下图所示:
image.png
如果要删除的主题是 Kafka 的内部主题,那么删除时就会报错。截至 Kafka 2.0.0 版本,Kafka 的内部一共包含了两个内部主题,分别为 consumer_offsets 和 transaction_state。

如果要删除的是一个不存在的主题则会报错,这里可以通过 if-exists 参数来忽略异常。这里的删除操作本质上只是在 ZooKeeper 中的 /admin/delete_topics 路径下创建一个与待删除主题同名的节点,以此标记该主题为待删除的状态。真正删除主题的动作是由 Kafka 控制器负责完成的。

异常情况
如果你碰到主题无法删除的情况,可以采用这样的方法:

  • 手动删除 ZooKeeper 节点 /admin/delete_topics 下以待删除主题为名的 znode
  • 手动删除该主题在磁盘上的分区目录
  • 在 ZooKeeper 中执行 rmr /controller,触发 Controller 重选举,刷新 Controller 缓存

在执行最后一步时,你一定要谨慎,因为它可能造成大面积的分区 leader 副本的重选举。事实上,仅仅执行前两步也是可以的,只是 Controller 缓存中没有清空待删除主题罢了,也不影响使用。

配置管理

kafka-configs.sh 脚本是专门用来对配置进行操作的,这里的操作是指在运行状态下修改原有的配置,如此可以达到动态变更的目的。该脚本包含变更配置 alter 和查看配置 describe 这两种指令类型,此外,该脚本不仅支持操作主题相关的配置,还可以支持操作 broker、用户和客户端这三个类型的操作。

1. 配置查看

kafka-configs.sh 脚本使用 entity-type 参数来指定操作配置的类型,并使用 entity-name 参数来指定操作配置的名称,比如查看主题 demo-topic 的配置可以按如下方式执行:
image.png
entity-type 只可以配置四个值,具体信息如下:

  • topics:主题类型的配置,需指定主题的名称
  • brokers:broker 类型的配置,需指定 brokerId 的值
  • clients:客户端类型的配置,需指定 clientId 的值,即 KafkaProducer 或 KafkaConsumer 的 client.id 参数配置的值
  • users:用户类型的配置,需指定用户名

如果使用 kafka-configs.sh 脚本查看配置信息时没有指定 entity-name 参数的值,则会查看 entity-type 所对应的所有配置信息。

2. 配置变更

使用 alter 指令变更配置时,需要配合 add-configdelete-config 这两个参数一起使用。其中,前者用来实现配置的增、改,即覆盖原有的配置;后者用来实现配置的删,即删除被覆盖配置恢复默认值。
image.png
注意 sensitive=false 的字眼,它表明我们要调整的参数不是敏感数据。如果我们调整的是类似于密码这样的参数时,该字段就会为 true,表示这属于敏感数据。实际上,所有与主题相关的配置参数在 broker 层面都有对应参数,比如主题端参数 cleanup.policy 对应 broker 层面的 log.cleanup.policy。如果没有修改过主题的任何配置参数,那么就会使用 broker 端的对应参数作为其默认值。

当然 broker 中并非所有的参数都支持动态变更,它仅仅是把一部分参数变成了可动态调整。在 Kafka 1.1 版本之后针对 Broker Configs 表中增加了 Dynamic Update Mode 列。该列有 3 类值,具体含义如下:

  • read-only:和原来的参数行为一样,只有重启 Broker 才能令修改生效
  • per-broker:动态参数,修改它之后只会在对应的 Broker 上生效
  • cluster-wide:动态参数,修改它之后会在整个集群范围内生效。

如果要在集群层面设置全局值,即设置 cluster-wide 范围值。需要显式指定 entity-default。下面这条命令就是用来设置 broker 的全局配置的。
image.png

KafkaAdminClient

上面介绍的 Kafka 自带的各种命令行脚本虽然使用方便,却有一些弊端。因为命令行的脚本都只能运行在控制台上面。如果想要在应用程序、运维框架或是监控平台中集成它们会非常得困难。并且运行这些脚本需要使用到服务器端的代码,但实际上社区希望用户只使用 Kafka 客户端代码,这样所有运维操作都能纳入到统一的处理机制下,方便后面的功能演进。

因此,在 Kafka 0.11.0.0 版本之后,社区推出了 KafkaAdminClient 工具类,它不仅可以用来管理 broker、配置和 ACL(Access Control List),还可以用来管理主题。KafkaAdminClient 支持的常用操作如下所示:

  • 主题管理:包括主题的创建、删除和查询
  • 权限管理:包括具体权限的配置与删除
  • 配置参数管理:包括 Kafka broker、主题、客户端等各种资源的参数设置、详情查询
  • 副本日志管理:包括副本底层日志路径的变更和详情查询
  • 分区管理:即创建额外的主题分区
  • 消息删除:即删除指定位移之前的分区消息
  • 消费者组管理:包括消费者组的查询、位移查询和删除
  • Preferred 领导者选举:推选指定主题分区的 Preferred Broker 为领导者。

使用 KafkaAdminClient 创建一个主题

  1. public static void main(String[] args) {
  2. Properties properties = new Properties();
  3. properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  4. properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
  5. AdminClient adminClient = AdminClient.create(properties);
  6. NewTopic newTopic = new NewTopic("demo-topic", 3, (short) 1);
  7. CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(newTopic));
  8. try {
  9. result.all().get();
  10. } catch (Exception e) {
  11. e.printStackTrace();
  12. }
  13. adminClient.close();
  14. }

NewTopic 用来设定所要创建主题的具体信息,包含创建主题时需要的主题名称、分区数和副本因子等。该类中的成员变量具体如下所示:

  1. public class NewTopic {
  2. private final String name;
  3. private final int numPartitions;
  4. private final short replicationFactor;
  5. // 分配方案
  6. private final Map<Integer, List<Integer>> replicasAssignments;
  7. // 主题配置
  8. private Map<String, String> configs = null;
  9. }

目前,AdminClient 各个方法的返回类型都是名为 *Result 的对象。这类对象会将结果以 Java Future 的形式封装起来。如果要获取运行结果,需要调用相应的方法来获取对应的 Future 对象,然后再调用相应的 get 方法来取得执行结果。当然对于创建主题而言,一旦主题被成功创建,只要没有抛出异常任务就完成了,它返回的结果也不重要了。

使用 KafkaAdminClient 查看主题配置

public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
    AdminClient adminClient = AdminClient.create(properties);

    ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "demo-topic");
    DescribeConfigsResult result = adminClient.describeConfigs(Collections.singletonList(configResource));
    try {
        Config config = result.all().get().get(configResource);
        System.out.println(config);
    } catch (Exception e) {
        e.printStackTrace();
    }
    adminClient.close();
}

最终的输出结果不会只列出被覆盖的配置信息,而是会列出主题中所有的配置信息。

分区管理

1. 优先副本的选举

分区使用多副本机制来提升可靠性,但只有 leader 副本对外提供读写服务,而 follower 副本只负责在内部进行消息的同步。如果一个分区的 leader 副本不可用,那就意味着整个分区变得不可用,此时就需要 Kafka 从剩余的 follower 副本中挑选一个新的 leader 副本来继续对外提供服务。当原来的 leader 节点恢复后重新加入集群时,它只能成为一个新的 follower 节点而不再对外提供服务。

在创建主题时,该主题的分区及副本会尽可能均匀地分布到集群的各个 broker 节点上,对应的 leader 副本的分配也比较均匀。针对同一个分区而言,同一个 broker 节点中不可能出现它的多个副本,即 Kafka 集群的一个 broker 中最多只能有它的一个副本,我们可以将 leader 副本所在的 broker 节点叫做分区的 leader 节点,而 follower 副本所在的 broker 节点叫做分区的 follower 节点。

1)初始状态:
image.png
2)broker 0 宕机后的状态:
image.png
3)broker 0 重新上线后的状态:
image.png
可以看到原本分区 2 的 leader 节点为 0,现在变成了 1,如此一来原本均衡的负载变成了失衡。

1.1 什么是优先副本

为了能够有效地治理负载失衡的情况,Kafka 引入了 优先副本(preferred replica)的概念。所谓的优先副本是指在 AR 集合列表中的第一个副本,比如上图中分区 0 的 AR 集合列表(Replicas)为 [2,0,1],那么分区 0 的优先副本即为 1。

理想情况下,优先副本就是该分区的 leader 副本,Kafka 要确保所有主题的优先副本在 Kafka 集群中均匀分布,这样就保证了所有分区的 leader 均衡分布。如果 leader 分布过于集中就会造成集群负载不均衡。而优先副本的选举是指通过一定方式促使优先副本选举为 leader 副本,以此来促进集群的负载均衡。需要注意的是,分区平衡并不意味着 Kafka 集群的负载均衡,因为还要考虑集群中的分区分配是否均衡,更进一步,还要考虑每个分区的 leader 副本的负载也是各不相同的。

1.2 分区自动平衡

在 Kafka 中提供了分区自动平衡的功能,对应的 broker 端参数是 auto.leader.rebalance.enable,默认为 true,即默认情况下是开启分区自动平衡的。如果开启分区自动平衡的功能,则 Kafka 的控制器会启动一个定时任务,这个定时任务会轮询所有的 broker 节点,计算每个 broker 节点的分区不平衡率(broker 中的不平衡率 = 非优先副本的 leader 个数 / 分区总数)是否超过 leader.imbalance.per.broker.percentage 参数值,默认为 10%,若超过则自动执行优先副本的选举动作以求分区平衡。任务执行周期由 leader.imbalance.check.interval.seconds 参数控制,默认值为 5 分钟。

对应到上面那个例子,在 broker 0 重新上线后,broker 1 的不平衡率为非优先副本的 leader 个数(1 个,即分区 2)/ 分区总数(2 个,即分区 1 和分区 2)等于 50 % 大于了 10% 的默认值,因此会执行分区自动平衡的操作,等过一段时间后,我们再来查看,经过分区自动平衡后的状态如下图所示:
image.png
不过在生产环境中不建议将 auto.leader.rebalance.enable 设置为默认的 true,因为这可能引起负面的性能问题,也有可能引起客户端一定时间的阻塞。因为执行的时间无法自主掌控,如果在关键时期执行关键任务的关卡上执行优先副本的自动选举操作,势必会有业务阻塞、频繁超时之类的风险。

1.3 分区手动平衡

在 Kafka 中提供了 kafka-perferred-replica-election.sh 脚本可以对分区 leader 副本进行重新平衡。优先副本的选举过程是一个安全的过程,Kafka 客户端可以自动感知分区 leader 副本的变更。注意:在 Kafka 的新版本中已经废弃了这个脚本,建议用 kafka-leader-election.sh 来代替。
image.png
这里的分区手动平衡只是手动触发优先副本的选举,并不能将分区的 leader 副本分配在指定的某个 broker 节点上,它会自动选择优先副本做为重平衡后的 broker 节点。

2. 分区重分配

当集群节点宕机下线时,位于这个节点上的分区副本都已经处于功能失效的状态,但 Kafka 并不会将这些失效的分区副本自动地迁移到集群中剩余可用的 broker 节点上,如果放任不管,则不仅会影响整个集群的均衡负载,还会影响整体服务的可用性和可靠性。

当要对集群中的一个节点进行有计划的下线操作时,为了保证分区及副本的合理分配,我们希望通过某种方式能够将该节点上的分区副本迁移到其他的可用节点上。同理,当集群中新增 broker 节点时,只有新创建的主题分区才有可能被分配到这个节点上,而之前的主题分区并不会自动分配到新加入的节点中,因为在它们被创建时还没有这个新节点,这样新节点的负载和原先节点的负载之间严重不均衡。

为了让分区副本再次进行合理的分配,Kafka 提供了 kafka-reassign-partitions.sh 脚本来执行分区重分配的工作,它可以在集群扩容、broker 节点失效的场景下对分区进行迁移。使用过程分为三个步骤:首先需要创建一个包含主题清单的 JSON 文件,其次根据主题清单和 broker 节点清单生成一份重分配方案,最后根据这份方案执行具体的重分配动作。

下图展示了一个包含三个 broker 节点的集群中分区的初始分配情况:
image.png
1)创建主题 JSON 文件
如果我们想下线 broker 1 节点,在此之前,我们要做的就是将其上的分区副本迁移出去。使用 kafka-reassign-partitions.sh 脚本的第一步就是要创建一个 JSON 文件,假设文件名为 reassign.json,文件内容为要进行分区重分配的主题清单,示例如下:

{
  "topics": [
    {
      "topic": "reassign-topic-demo"
    }
  ],
  "version": 1
}

2)生成重分配候选方案
image.png

  • generate 是 kafka-reassign-partitions.sh 脚本中指令类型的参数,用来生成一个重分配的候选方案。
  • topic-to-move-json 用来指定分区重分配对应的主题清单文件的路径。
  • broker-list 用来指定所要分配的 broker 节点列表

上图中打印出了两个 JSON 格式的内容,第一个 JSON 内容为当前的分区副本的分配情况,在执行重分配的时候最好将这个内容保存起来,以备后续的回滚操作。第二个 JSON 内容为重分配的候选方案,注意这里只是生成一份可执行的方案,并没有真正执行重分配的动作。

3)执行重分配
将第二个 JSON 内容保存在一个 JSON 文件中,假设文件名为 project.json,然后执行具体的重分配动作。
image.png
此时,我们再次查看主题 reassign-topic-demo 的具体信息,可以看到主题中的所有分区副本都只在 0 和 2 的 broker 节点上分布了。
image.png

3. 修改副本因子

创建主题之后我们不仅可以修改分区的个数,同样可以修改副本因子。修改副本因子的功能也是通过重分配所使用的 kafka-reassignpartitions.sh 脚本实现的。

我们仔细观察一下上面示例中用到的 project.json 文件:

{
  "version": 1,
  "partitions": [
    {
      "topic": "reassign-topic-demo",
      "partition": 0,
      "replicas": [
        2,
        0
      ],
      "log_dirs": [
        "any",
        "any"
      ]
    },
    {
      "topic": "reassign-topic-demo",
      "partition": 2,
      "replicas": [
        2,
        0
      ],
      "log_dirs": [
        "any",
        "any"
      ]
    },
    {
      "topic": "reassign-topic-demo",
      "partition": 1,
      "replicas": [
        0,
        2
      ],
      "log_dirs": [
        "any",
        "any"
      ]
    }
  ]
}

可以观察到 JSON 内容里的 replicas 都是 2 个副本,我们可以自行添加一个副本,比如对于分区 0 而言可以改成下面的内容:

{
  "topic": "reassign-topic-demo",
  "partition": 0,
  "replicas": [
    2,
    1,
    0
  ],
  "log_dirs": [
    "any",
    "any",
    "any"
  ]
}

我们可以将其他分区的 replicas 内容也改成 [0,1,2],这样每个分区的副本因子就都从 2 增加到了 3。注意增加副本因子时也要在 log_dirs 中添加一个 any,这个 log_dirs 代表 Kafka 中的日志目录,对应 broker 端的 log.dirlog.dirs 参数的配置值,如果不需要关注此方面的细节,那么可以简单设为 any。