转载地址

https://www.cnblogs.com/hyunbar/p/13064254.html

zookeeper存储了:
brokerId的相关信息.
消费者相关信息
分区信息
副本信息

唯独没有存储生产者信息.

zookeeper 是应用广泛的分布式服务协调组件,它对于大数据领域的其他组件,如HFDS、Yarn、Hbase、Kafka等,都扮演着基础角色

在kafka.utils.ZKUtils对象的开头,预先定义了很多ZK路径,如:

  1. object ZkUtils extends scala.AnyRef {
  2. val AdminPath : java.lang.String = { /* compiled code */ }
  3. val BrokersPath : java.lang.String = { /* compiled code */ }
  4. val ClusterPath : java.lang.String = { /* compiled code */ }
  5. val ConfigPath : java.lang.String = { /* compiled code */ }
  6. val ControllerPath : java.lang.String = { /* compiled code */ }
  7. val ControllerEpochPath : java.lang.String = { /* compiled code */ }
  8. val IsrChangeNotificationPath : java.lang.String = { /* compiled code */ }
  9. val LogDirEventNotificationPath : java.lang.String = { /* compiled code */ }
  10. val KafkaAclPath : java.lang.String = { /* compiled code */ }
  11. val KafkaAclChangesPath : java.lang.String = { /* compiled code */ }
  12. val ConsumersPath : java.lang.String = { /* compiled code */ }
  13. val ClusterIdPath : scala.Predef.String = { /* compiled code */ }
  14. val BrokerIdsPath : scala.Predef.String = { /* compiled code */ }
  15. val BrokerTopicsPath : scala.Predef.String = { /* compiled code */ }
  16. val ReassignPartitionsPath : scala.Predef.String = { /* compiled code */ }
  17. val DeleteTopicsPath : scala.Predef.String = { /* compiled code */ }
  18. val PreferredReplicaLeaderElectionPath : scala.Predef.String = { /* compiled code */ }
  19. val BrokerSequenceIdPath : scala.Predef.String = { /* compiled code */ }
  20. val ConfigChangesPath : scala.Predef.String = { /* compiled code */ }
  21. val ConfigUsersPath : scala.Predef.String = { /* compiled code */ }
  22. val ConfigBrokersPath : scala.Predef.String = { /* compiled code */ }
  23. val ProducerIdBlockPath : java.lang.String = { /* compiled code */ }
  24. val SecureZkRootPaths : scala.collection.Seq[java.lang.String] = { /* compiled code */ }
  25. val SensitiveZkRootPaths : scala.collection.Seq[scala.Predef.String] = { /* compiled code */ }
  26. ........
  27. }

可以通过ZK命令行或可视化工具来观察这些路径下面的存储情况

1、Broker注册信息

路径为/brokers/ids/[broker_id],其中存储的数据示例如下。

  1. {
  2. "listener_security_protocol_map": {
  3. "PLAINTEXT": "PLAINTEXT"
  4. },
  5. "endpoints": ["PLAINTEXT://hadoop100:9092"],
  6. "jmx_port": 9393,
  7. "host": "hadoop100",
  8. "timestamp": "1554349917296",
  9. "port": 9092,
  10. "version": 4
  11. }
  • jmx_port:JMX端口号。
  • host:所在主机名或IP地址。
  • timestamp:启动时的时间戳。
  • port:开放的TCP端口号。
  • version:版本号。以下所有version值均是代表版本号,不再赘述。

当Kafka集群中有节点上下线时,这个路径下的数据就会更新。

2、Topic信息

路径为/brokers/topics/[topic_name], 其中存储的数据示例如下。

  1. {
  2. "version": 1,
  3. "partitions": {
  4. "1": [106],
  5. "0": [105],
  6. "2": [107]
  7. }
  8. }
  • partitions:topic中各个partition的ID,以及对应的ISR中各个broker的ID的列表

当有topic被创建或删除,以及partition发生变更时,

通过对topic以及节点变更注册监听,就能实现producer的负载均衡

在/admin/delete_topics下还保存有已经标记为删除的topic名称(只有名称,没有其他数据)

/config/topics/[topic_name]下保存有各个topic的自定义配置

partition状态信息路径/brokers/topics/[topic_name]/partitions/[partition_id]/state,其中存储的数据如下:

  1. {
  2. "controller_epoch": 17,
  3. "leader": 105,
  4. "version": 1,
  5. "leader_epoch": 2,
  6. "isr": [105]
  7. }
  • controller_epoch:controller的纪元(代数),即集群重新选举controller的次数
  • leader:当前partition的leader的broker ID
  • leader_epoch:partition leader的纪元(代数),即当前partition重新选举leader的次数
  • isr:该partition对应的ISR中各个broker ID的列表

3、Controller注册信息

当前controller信息的路径就是/controller,其中存储的数据示例如下。

  1. {
  2. "version": 1,
  3. "brokerid": 104,
  4. "timestamp": "1554349916898"
  5. }
  • brokerid:现在集群中Controller的节点ID
  • timestamp:最近一次Controller变化的时间戳

如果Controller信息节点被删除,就会触发集群重新选举Controller。zk对选主操作有天然的支持

在在/controller_epoch路径下还保存有controller的纪元值,与partition状态信息中的值相同。没重选举一次,该值就会加1

4、consumer订阅信息

consumer本身的信息路径为/consumers/[group_id]/ids/[consumer_id],其中存储的数据示例如下。

  1. {
  2. "version": 1,
  3. "subscription": {
  4. "bl_mall_orders": 1
  5. },
  6. "pattern": "white_list",
  7. "timestamp": "1558617131642"
  8. }
  • subscription:订阅topic名称,及该topic对应消息流个数的映射
  • parttern:订阅方式,可取值静态(static)、白名单(white_list)、黑名单(black_list)
  • timestamp:consumer创建时的时间戳

通过zk维护的consumer及consumer group信息,可以实现消费者负载均衡

/consumers/[group_id]/offsets/[topic_name]/[partition_id]下存储有consumer group对应各个topic及paritition的消费偏移量

/consumers/[group_id]/owners/[topic_name]/[partition_id]下存储有consumer group对应各个topic及partition的消费者线程。

5、最优replica选举信息

当由于节点宕机等原因使得partition leader变得不再均匀分布时,可以使用kafka提供的kafka-preferred-replica-election工具重新将partition创建时的最优replica(前提是在ISR内)选举为leader

也可以开启leader自动平衡的功能(auto.leader.rebalance.enable

当正在选举最优replica时,zk中会创建/admin/preferred_replica_election节点,其中存储着需要调整最优replica的partition信息,示例数据如下。

  1. {
  2. "version": 1,
  3. "partitions": [
  4. {
  5. "topic": "bl_mall_orders",
  6. "partition": 1
  7. },
  8. {
  9. "topic": "bl_mall_products",
  10. "partition": 0
  11. }
  12. ]
  13. }

6、paritition重分配信息

与上面的kafka-preferred-replica-election工具类似,Kafka还提供了kafka-reassign-partitions工具,但它的功能更为强大。

它重新分配partition的所有leader和follower的位置,甚至更改replica数量。

当集群扩容或follower分布也不均匀时,就可以利用它。

该工具会生成JSON格式的重分配计划,并存入zk中/admin/reassign_partitions节点,示例数据如下。

  1. {
  2. "version": 1,
  3. "partitions": [
  4. {
  5. "topic": "bl_mall_wish",
  6. "partition": 1,
  7. "replicas": [0, 1, 3]
  8. }
  9. ]
  10. }

7、ISR变更通知信息

各个paritition的ISR集合并不是一成不变的。

当ISR发生变化(如replica超时),Controller会将发生变得哪个partition存入/isr_change_notification/[isr_change_x]