Zookeeper 基础

1. Zookeeper 概念

Zookeeper 是一个分布式协调服务,可用于服务发现,分布式锁,分布式领导选举,配置管理等。Zookeeper 提供了一个类似于 Linux 文件系统的树形结构(可认为是轻量级的内存文件系统,但只适合存少量信息,完全不适合存储大量文件或者大文件),同时提供了对于每个节点的监控与通知机制。

2. Zookeeper 角色

Zookeeper 集群是一个基于主从复制的高可用集群,每个服务器承担如下三种角色中的一种

2.1. Leader

  1. 一个 Zookeeper 集群同一时间只会有一个实际工作的 Leader,它会发起并维护与各 Follwer 及 Observer 间的心跳。
  2. 所有的写操作必须要通过 Leader 完成再由 Leader 将写操作广播给其它服务器。只要有超过半数节点(不包括 observeer 节点)写入成功,该写请求就会被提交(类 2PC 协议)。

    2.2. Follower

  3. 一个 Zookeeper 集群可能同时存在多个 Follower,它会响应 Leader 的心跳

  4. Follower 可直接处理并返回客户端的读请求,同时会将写请求转发给 Leader 处理,并且负责在 Leader 处理写请求时对请求进行投票。

    2.3. Observer

    角色与 Follower 类似,但是无投票权。Zookeeper 需保证高可用和强一致性,为了支持更多的客户端,需要增加更多 Server;Server 增多,投票阶段延迟增大,影响性能;引入 Observer,Observer 不参与投票; Observers 接受客户端的连接,并将写请求转发给 leader 节点; 加入更多 Observer 节点,提高伸缩性,同时不影响吞吐率。

    3. Zookeeper 工作原理(原子广播)

  5. Zookeeper 的核心是原子广播,这个机制保证了各个 server 之间的同步。实现这个机制的协议叫做 Zab 协议。Zab 协议有两种模式,它们分别是恢复模式和广播模式。

  6. 当服务启动或者在领导者崩溃后,Zab 就进入了恢复模式,当领导者被选举出来,且大多数 server 的完成了和 leader 的状态同步以后,恢复模式就结束了。
  7. 状态同步保证了 leader 和 server 具有相同的系统状态
  8. 一旦 leader 已经和多数的 follower 进行了状态同步后,他就可以开始广播消息了,即进入广播状态。这时候当一个 server 加入 zookeeper 服务中,它会在恢复模式下启动,发现 leader,并和 leader 进行状态同步。待到同步结束,它也参与消息广播。Zookeeper 服务一直维持在 Broadcast 状态,直到 leader 崩溃了或者 leader 失去了大部分的 followers 支持。
  9. 广播模式需要保证 proposal 被按顺序处理,因此 zk 采用了递增的事务 id 号(zxid)来保证。所有的提议(proposal)都在被提出的时候加上了 zxid。
  10. 实现中 zxid 是一个 64 为的数字,它高 32 位是 epoch 用来标识 leader 关系是否改变,每次一个 leader 被选出来,它都会有一个新的 epoch。低 32 位是个递增计数。
  11. 当 leader 崩溃或者 leader 失去大多数的 follower,这时候 zk 进入恢复模式,恢复模式需要重新选举出一个新的 leader,让所有的 server 都恢复到一个正确的状态。

    3.1. ZAB 协议(了解)

    3.1.1. 事务编号 Zxid (事务请求 计数器 + epoch)

    在 ZAB ( ZooKeeper Atomic Broadcast , ZooKeeper 原子消息广播协议) 协议的事务编号 Zxid 设计中,Zxid 是一个 64 位的数字,其中低 32 位是一个简单的单调递增的计数器,针对客户端每一个事务请求,计数器加 1;而高 32 位则代表 Leader 周期 epoch 的编号,每个当选产生一个新的 Leader 服务器,就会从这个 Leader 服务器上取出其本地日志中最大事务的 ZXID,并从中读取 epoch 值,然后加 1,以此作为新的 epoch,并将低 32 位从 0 开始计数。
    Zxid(Transaction id)类似于 RDBMS 中的事务 ID,用于标识一次更新操作的 Proposal(提议)ID。为了保证顺序性,该 zkid 必须单调递增。

    3.1.2. epoch

    epoch:可以理解为当前集群所处的年代或者周期,每个 leader 就像皇帝,都有自己的年号,所以每次改朝换代,leader 变更之后,都会在前一个年代的基础上加 1。这样就算旧的 leader 崩溃恢复之后,也没有人听他的了,因为 follower 只听从当前年代的 leader 的命令

    3.1.3. Zab 协议有两种模式 - 恢复模式(选主)、广播模式(同步)

    Zab协议有两种模式,它们分别是恢复模式(选主)和广播模式(同步)。当服务启动或者在领导者崩溃后,Zab 就进入了恢复模式,当领导者被选举出来,且大多数 Server 完成了和 leader 的状态同步以后,恢复模式就结束了。状态同步保证了 leader 和 Server 具有相同的系统状态。
    ZAB 提交事务并不像 2PC 一样需要全部 follower 都 ACK,只需要得到超过半数的节点的 ACK 就可以了

    3.1.4. ZAB 协议 4 阶段

    3.1.4.1. Leader election (选举阶段 - 选出准 Leader)

    Leader election(选举阶段):节点在一开始都处于选举阶段,只要有一个节点得到超半数节点的票数,它就可以当选准 leader。只有到达 广播阶段(broadcast) 准 leader 才会成为真正的 leader。这一阶段的目的是就是为了选出一个准 leader,然后进入下一个阶段。

    3.1.4.2. Discovery (发现阶段 - 接受提议、生成 epoch 、接受 epoch)

    Discovery(发现阶段):在这个阶段,followers 跟准 leader 进行通信,同步 followers 最近接收的事务提议。这个一阶段的主要目的是发现当前大多数节点接收的最新提议,并且准 leader 生成新的 epoch,让 followers 接受,更新它们的 accepted Epoch
    一个 follower 只会连接一个 leader,如果有一个节点 f 认为另一个 follower p 是 leader,f 在尝试连接 p 时会被拒绝,f 被拒绝之后,就会进入重新选举阶段。

    3.1.4.3. Synchronization (同步阶段 - 同步 follower 副本)

    Synchronization(同步阶段):同步阶段主要是利用 leader 前一阶段获得的最新提议历史,同步集群中所有的副本。只有当 大多数节点都同步完成,准 leader 才会成为真正的 leader。follower 只会接收 zxid 比自己的 lastZxid 大的提议。

    3.1.4.4. Broadcast (广播阶段 -leader 消息广播)

    Broadcast(广播阶段):到了这个阶段,Zookeeper 集群才能正式对外提供事务服务,并且 leader 可以进行消息广播。同时如果有新的节点加入,还需要对新节点进行同步。

    3.1.5. ZAB 协议 JAVA 实现( FLE-发现阶段和同步合并为 Recovery Phase(恢复阶段))

    协议的 Java 版本实现跟上面的定义有些不同,选举阶段使用的是 Fast Leader Election(FLE),它包含了 选举的发现职责。因为 FLE 会选举拥有最新提议历史的节点作为 leader,这样就省去了发现最新提议的步骤。实际的实现将 发现阶段 和 同步合并为 Recovery Phase(恢复阶段)。所以,ZAB 的实现只有三个阶段:Fast Leader Election;Recovery Phase;Broadcast Phase。

    3.2. 投票机制(了解)

    每个 sever 首先给自己投票,然后用自己的选票和其他 sever 选票对比,权重大的胜出,使用权重较大的更新自身选票箱。具体选举过程如下:

  12. 每个 Server 启动以后都询问其它的 Server 它要投票给谁。对于其他 server 的询问,server 每次根据自己的状态都回复自己推荐的 leader 的 id 和上一次处理事务的 zxid(系统启动时每个 server 都会推荐自己)

  13. 收到所有 Server 回复以后,就计算出 zxid 最大的哪个 Server,并将这个 Server 相关信息设置成下一次要投票的 Server。
  14. 计算这过程中获得票数最多的的 sever 为获胜者,如果获胜者的票数超过半数,则改 server 被选为 leader。否则,继续这个过程,直到 leader 被选举出来
  15. leader 就会开始等待 server 连接
  16. Follower 连接 leader,将最大的 zxid 发送给 leader
  17. Leader 根据 follower 的 zxid 确定同步点,至此选举阶段完成。
  18. 选举阶段完成 Leader 同步后通知 follower 已经成为 uptodate 状态
  19. Follower 收到 uptodate 消息后,又可以重新接受 client 的请求进行服务了

举例:目前有 5 台服务器,每台服务器均没有数据,它们的编号分别是 1,2,3,4,5,按编号依次启动,它们的选择举过程如下:

  1. 服务器 1 启动,给自己投票,然后发投票信息,由于其它机器还没有启动所以它收不到反馈信息,服务器 1 的状态一直属于 Looking。
  2. 服务器 2 启动,给自己投票,同时与之前启动的服务器 1 交换结果,由于服务器 2 的编号大所以服务器 2 胜出,但此时投票数没有大于半数,所以两个服务器的状态依然是 LOOKING。
  3. 服务器 3 启动,给自己投票,同时与之前启动的服务器 1,2 交换信息,由于服务器 3 的编号最大所以服务器 3 胜出,此时投票数正好大于半数,所以服务器 3 成为领导者,服务器 1,2 成为小弟。
  4. 服务器 4 启动,给自己投票,同时与之前启动的服务器 1,2,3 交换信息,尽管服务器 4 的编号大,但之前服务器 3 已经胜出,所以服务器 4 只能成为小弟。
  5. 服务器 5 启动,后面的逻辑同服务器 4 成为小弟。

4. Znode 有四种形式的目录节点

  1. PERSISTENT:持久的节点。
  2. EPHEMERAL:暂时的节点。(与Session有关,如果Session销毁,则节点数据销毁)
  3. PERSISTENT_SEQUENTIAL:持久化顺序编号目录节点。
  4. EPHEMERAL_SEQUENTIAL:暂时化顺序编号目录节点。

    5. Zookeeper 安装

    5.1. 安装包方式

    5.1.1. 安装

  • zookeeper底层依赖于jdk,zookeeper用户登录后,根目录下先进行jdk的安装,jdk使用jdk-8u131-linux-x64.tar.gz版本,上传并解压jdk ```

    解压jdk

    tar -xzvf jdk-8u131-linux-x64.tar.gz
  1. - 配置jdk环境变量

vim打开 .bash_profile文件

vi .bash_profile

文件中加入如下内容

JAVA_HOME=/home/zookeeper/jdk1.8.0_131 export JAVA_HOME

PATH=$JAVA_HOME/bin:$PATH export PATH

使环境变量生效

. .bash_profile

检测jdk安装

java -version

  1. - zookeeper使用zookeeper-x.x.x.tar.gz,上传并解压

tar -xzvf zookeeper-3.4.10.tar.gz

  1. - zookeeper准备配置文件

进入conf目录

cd /home/zookeeper/zookeeper-x.x.x/conf

复制配置文件

cp zoo_sample.cfg zoo.cfg

zookeeper根目录下新建data目录

mkdir data

vi 修改配置文件中的dataDir

此路径用于存储zookeeper中数据的内存快照、及事物日志文件

dataDir=/home/zookeeper/zookeeper-x.x.x/data

  1. ### 5.2. docker 方式(待整理)
  2. 整理中...
  3. ### 5.3. Zookeeper 配置
  4. 修改 conf 目录的 zoo.cfg 文件,可以设置 zk 相关配置。如下是本人配置参考:

The number of milliseconds of each tick

tickTime=2000

The number of ticks that the initial

synchronization phase can take

initLimit=10

The number of ticks that can pass between

sending a request and getting an acknowledgement

syncLimit=5

the directory where the snapshot is stored.

do not use /tmp for storage, /tmp here is just

example sakes.

dataDir=E:/deployment-environment/apache-zookeeper-3.6.3-bin/data

the port at which the clients will connect

clientPort=2181

the maximum number of client connections.

increase this if you need to handle more clients

maxClientCnxns=60

#

Be sure to read the maintenance section of the

administrator guide before turning on autopurge.

#

http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance

#

The number of snapshots to retain in dataDir

autopurge.snapRetainCount=3

Purge task interval in hours

Set to “0” to disable auto purge feature

autopurge.purgeInterval=1

Zookeeper AdminServer port

admin.serverPort=9999

Metrics Providers

#

https://prometheus.io Metrics Exporter

metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider

metricsProvider.httpPort=7000

metricsProvider.exportJvmInfo=true

日志文件

dataLogDir=E:/deployment-environment/apache-zookeeper-3.6.3-bin/log

  1. > 注:Zookeeper3.5的新特性:会启动 Zookeeper AdminServer,默认使用 8080 端口。可以通过配置文件的 `admin.serverPort=8888` 修改 AdminServer 的端口
  2. ## 6. Zookeeper 服务端常用命令
  3. 修改了相应的配置之后,可以直接通过 zkServer.sh 这个脚本进行服务的相关操作

进入zookeeper的bin目录

cd /home/zookeeper/zookeeper-3.x.x/bin

启动 ZK 服务

sh bin/zkServer.sh start

查看 ZK 服务状态

sh bin/zkServer.sh status

停止 ZK 服务

sh bin/zkServer.sh stop

重启 ZK 服务

sh bin/zkServer.sh restart

  1. ## 7. Zookeeper 常用操作命令
  2. ### 7.1. 新增节点
  3. 命令格式:

其中-s 为有序节点,-e 临时节点

create [-s] [-e] path data

  1. - 创建持久化节点并写入数据。**不带任何参数,默认是新增持久节点**

create /hadoop “123456”

  1. - 创建持久化有序节点,此时创建的节点名为`指定节点名 + 自增序号`

[zk: localhost:2181(CONNECTED) 2] create -s /a “aaa” Created /a0000000000 [zk: localhost:2181(CONNECTED) 3] create -s /b “bbb” Created /b0000000001 [zk: localhost:2181(CONNECTED) 4] create -s /c “ccc” Created /c0000000002

  1. - 创建临时节点,临时节点会在会话过期后被删除

[zk: localhost:2181(CONNECTED) 5] create -e /tmp “tmp” Created /tmp

  1. - 创建临时有序节点,临时节点会在会话过期后被删除

[zk: localhost:2181(CONNECTED) 6] create -s -e /aa ‘aaa’ Created /aa0000000004 [zk: localhost:2181(CONNECTED) 7] create -s -e /bb ‘bbb’ Created /bb0000000005 [zk: localhost:2181(CONNECTED) 8] create -s -e /cc ‘ccc’ Created /cc0000000006

  1. > 注意:临时节点不能创建子节点

[zk: localhost:2181(CONNECTED) 19] create /test/test1 Ephemerals cannot have children: /test/test1

  1. ### 7.2. 更新节点
  2. 更新节点的命令是`set`,语法如下:

set [-s] [-v version] path data

  1. 可以直接进行修改

[zk: localhost:2181(CONNECTED) 3] set /hadoop “345” cZxid = 0x4 ctime = Thu Dec 12 14:55:53 CST 2019 mZxid = 0x5 mtime = Thu Dec 12 15:01:59 CST 2019 pZxid = 0x4 cversion = 0 dataVersion = 1 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 3 numChildren = 0

  1. 也可以基于版本号进行更改,此时类似于乐观锁机制,当传入的数据版本号 (dataVersion) 和当前节点的数据版本号不符合时,zookeeper 会拒绝本次修改:

[zk: localhost:2181(CONNECTED) 12] set -v 1 /hadoop “asss” version No is not valid : /hadoop

  1. ### 7.3. 删除节点
  2. 删除节点的语法:

删除单个节点,该节点下不能有子节点,否则拒绝删除

delete [-v version] path

级联删除,包含删除该节点下所有子节点

deleteall path [-b batch size]

  1. 和更新节点数据一样,也可以传入版本号,当传入的数据版本号 (dataVersion) 和当前节点的数据版本号不符合时,zookeeper 不会执行删除操作。

[zk: localhost:2181(CONNECTED) 21] delete -v 1 /hadoop version No is not valid : /hadoop

  1. 如果某个节点下有子节点,使用`delete`命令无法删除该节点,需要使用`deleteall`命令

[zk: localhost:2181(CONNECTED) 30] delete /moon Node not empty: /moon [zk: localhost:2181(CONNECTED) 31] deleteall /moon

  1. ### 7.4. 查看节点
  2. 查看节点语法:

get [-s] [-w] path

  1. - 参数 `-s` 列举出节点详情
  2. - 参数 `-w` 添加一个 watch(监视器)
  3. 查询某个节点详细信息

[zk: localhost:2181(CONNECTED) 4] get -s /hadoop 345 cZxid = 0x6a ctime = Thu Jun 17 22:37:55 CST 2021 mZxid = 0x6c mtime = Thu Jun 17 22:38:12 CST 2021 pZxid = 0x6a cversion = 0 dataVersion = 2 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 3 numChildren = 0

  1. 节点各个属性如下表。其中一个重要的概念是 Zxid(ZooKeeper Transaction Id),ZooKeeper 节点的每一次更改都具有唯一的 Zxid,如果 Zxid1 小于 Zxid2,则 Zxid1 的更改发生在 Zxid2 更改之前。
  2. |
  3. 状态属性
  4. | 说明
  5. |
  6. | --- | --- |
  7. |
  8. cZxid
  9. | 数据节点创建时的事务 ID
  10. |
  11. |
  12. ctime
  13. | 数据节点创建时的时间
  14. |
  15. |
  16. **mZxid**
  17. | 数据节点最后一次更新时的事务 ID
  18. |
  19. |
  20. **mtime**
  21. | 数据节点最后一次更新时的时间
  22. |
  23. |
  24. pZxid
  25. | 数据节点的子节点最后一次被修改时的事务 ID
  26. |
  27. |
  28. cversion
  29. | 子节点的更改次数
  30. |
  31. |
  32. **dataVersion**
  33. | 节点数据的更改次数
  34. |
  35. |
  36. aclVersion
  37. | 节点的 ACL 的更改次数
  38. |
  39. |
  40. ephemeralOwner
  41. | 如果节点是临时节点,则表示创建该节点的会话的 SessionID;如果节点是持久节点,则该属性值为 0
  42. |
  43. |
  44. dataLength
  45. | 数据内容的长度
  46. |
  47. |
  48. numChildren
  49. | 数据节点当前的子节点个数
  50. |
  51. > 注:重点关注红色加粗的属性值
  52. ### 7.5. 查看节点状态
  53. 查看节点状态语法:

stat [-w] path

  1. - 参数 `-w` 添加一个 watch(监视器)
  2. 注:`stat` 命令查看节点状态,它的返回值和 `get` 命令类似,但不会返回节点数据

[zk: localhost:2181(CONNECTED) 5] stat /hadoop cZxid = 0x6a ctime = Thu Jun 17 22:37:55 CST 2021 mZxid = 0x6c mtime = Thu Jun 17 22:38:12 CST 2021 pZxid = 0x6a cversion = 0 dataVersion = 2 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 3 numChildren = 0

  1. ### 7.6. 查看节点列表
  2. 查看节点列表语法:

ls [-s] [-w] [-R] path

  1. - 参数 `-s` 列举出节点详情
  2. - 参数 `-w` 添加一个 watch(监视器)
  3. - 参数 `-R` 列举出节点的级联节点(需要注意:参数必须大写)

[zk: localhost:2181(CONNECTED) 2] ls / [dubbo, hadoop, moon0000000004, zero0000000005, zookeeper]

  1. ### 7.7. 监听器
  2. 注册的监听器能够在节点内容发生改变的时候,向客户端发出通知。**需要注意的是 zookeeper 的触发器是一次性的 (One-time trigger),即触发一次后就会立即失效**。<br />可以注册监听器的操作分别有:查询节点(`get`)、查询节点状态(`stat`)、查询节点列表(`ls`
  3. #### 7.7.1. 查看节点时注册监听器
  4. 使用`get -w path`命令注册的监听器能够在节点内容发生改变的时候,会向客户端发出一次通知

[zk: localhost:2181(CONNECTED) 4] get -w /hadoop [zk: localhost:2181(CONNECTED) 5] set /hadoop 888 WATCHER:: WatchedEvent state:SyncConnected type:NodeDataChanged path:/hadoop

  1. #### 7.7.2. 查看节点状态时注册监听器
  2. 使用`stat -w path`命令注册的监听器能够在节点状态发生改变的时候,会向客户端发出一次通知

[zk: localhost:2181(CONNECTED) 6] stat -w /hadoop [zk: localhost:2181(CONNECTED) 7] set /hadoop 2020 WATCHER:: WatchedEvent state:SyncConnected type:NodeDataChanged path:/hadoop

  1. #### 7.7.3. 查看节点列表时注册监听器
  2. 使用`ls -w path`命令注册的监听器能够**监听该节点下所有子节点的增加和删除等操作**,会向客户端发出一次通知

[zk: localhost:2181(CONNECTED) 11] ls -R /hadoop /hadoop /hadoop/moon /hadoop/moon/zero [zk: localhost:2181(CONNECTED) 12] ls -w /hadoop [moon] [zk: localhost:2181(CONNECTED) 13] deleteall /hadoop/moon WATCHER:: WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/hadoop

  1. ## 8. Zookeeper的ACL权限控制
  2. ### 8.1. ACL 概述
  3. zookeeper 类似文件系统,client 可以创建节点、更新节点、删除节点。ZookeeperACLaccess control list 访问控制列表)就是实现对节点的权限的控制<br />ACL 权限控制的基础语法:

scheme:id:permission

  1. 主要涵盖3个维度:
  2. - 权限模式(`scheme`):授权的策略
  3. - 授权对象(`id`):授权的对象
  4. - 权限(`permission`):授予的权限
  5. 其特性:
  6. - zooKeeper的权限控制是基于每个znode节点的,需要对每个节点设置权限
  7. - 每个znode支持设置多种权限控制方案和多个权限
  8. - 子节点不会继承父节点的权限,客户端无权访问某节点,但可能可以访问它的子节点
  9. 示例:

将节点权限设置为IP:192.168.60.130的客户端可以对节点进行增、删、改、查、管理权限

setAcl /test2 ip:192.168.60.130:crwda

  1. ### 8.2. scheme 权限模式
  2. 定义采用何种方式授权
  3. |
  4. 方案值
  5. | 描述
  6. |
  7. | --- | --- |
  8. |
  9. world
  10. | 只有一个用户:anyone,代表登录zokeeper所有人(默认)
  11. |
  12. |
  13. ip
  14. | 对客户端使用IP地址认证
  15. |
  16. |
  17. auth
  18. | 使用已添加认证的用户认证
  19. |
  20. |
  21. digest
  22. | 使用“用户名:密码”方式认证
  23. |
  24. ### 8.3. id 授权的对象
  25. 授权对象ID是指,权限赋予的实体,即给谁授予权限。例如:IP 地址或用户。
  26. ### 8.4. permission 权限
  27. permission 用于指定授予什么类型的权限<br />createdeletereadwriteradmin也就是 增、删、改、查、管理权限,这5种权限简写为`cdrwa`。**注意:这5种权限中,delete是指对子节点的删除权限,其它4种权限指对自身节点的操作权限**
  28. |
  29. 权限
  30. | ACL简写
  31. | 描述
  32. |
  33. | --- | --- | --- |
  34. |
  35. create
  36. | c
  37. | 可以创建子节点
  38. |
  39. |
  40. delete
  41. | d
  42. | 可以删除子节点(仅下一级节点)
  43. |
  44. |
  45. read
  46. | r
  47. | 可以读取节点数据及显示子节点列表
  48. |
  49. |
  50. write
  51. | w
  52. | 可以设置节点数据
  53. |
  54. |
  55. admin
  56. | a
  57. | 可以设置节点访问控制列表权限
  58. |
  59. ### 8.5. ACL 授权命令
  60. |
  61. 命令
  62. | 描述
  63. |
  64. | --- | --- |
  65. |
  66. getAcl
  67. | 读取ACL权限
  68. |
  69. |
  70. setAcl
  71. | 设置ACL权限
  72. |
  73. |
  74. addauth
  75. | 添加认证用户
  76. |
  77. ### 8.6. ACL 权限控制示例
  78. #### 8.6.1. world 授权模式
  79. 命令

setAcl world:anyone:

  1. 示例

[zk: localhost:2181(CONNECTED) 1] create /node1 “node1” Created /node1 [zk: localhost:2181(CONNECTED) 2] getAcl /node1 ‘world,’anyone # world方式对所有用户进行授权 : cdrwa # 增、删、改、查、管理 [zk: localhost:2181(CONNECTED) 3] setAcl /node1 world:anyone:cdrwa cZxid = 0x2 ctime = Fri Dec 13 22:25:24 CST 2021 mZxid = 0x2 mtime = Fri Dec 13 22:25:24 CST 2021 pZxid = 0x2 cversion = 0 dataVersion = 0 aclVersion = 1 ephemeralOwner = 0x0 dataLength = 5 numChildren = 0

  1. #### 8.6.2. IP授权模式
  2. 命令

setAcl ip::

  1. 示例

[zk: localhost:2181(CONNECTED) 18] create /node2 “node2” Created /node2

[zk: localhost:2181(CONNECTED) 23] setAcl /node2 ip:192.168.60.129:cdrwa cZxid = 0xe ctime = Fri Dec 13 22:30:29 CST 2021 mZxid = 0x10 mtime = Fri Dec 13 22:33:36 CST 2021 pZxid = 0xe cversion = 0 dataVersion = 2 aclVersion = 1 ephemeralOwner = 0x0 dataLength = 20 numChildren = 0

[zk: localhost:2181(CONNECTED) 25] getAcl /node2 ‘ip,’192.168.60.129 : cdrwa

使用IP非 192.168.60.129 的机器

[zk: localhost:2181(CONNECTED) 0] get /node2 Authentication is not valid : /node2 # 提示没有权限

  1. > 注意:远程登录zookeeper命令是:`./zkCli.sh -server ip`
  2. #### 8.6.3. Auth授权模式
  3. 命令

addauth digest : # 添加认证用户 setAcl auth::

  1. 示例

zk: localhost:2181(CONNECTED) 2] create /node3 “node3” Created /node3

添加认证用户

[zk: localhost:2181(CONNECTED) 4] addauth digest MooN:123456 [zk: localhost:2181(CONNECTED) 1] setAcl /node3 auth:MooN:cdrwa cZxid = 0x15 ctime = Fri Dec 13 22:41:04 CST 2021 mZxid = 0x15 mtime = Fri Dec 13 22:41:04 CST 2021 pZxid = 0x15 cversion = 0 dataVersion = 0 aclVersion = 1 ephemeralOwner = 0x0 dataLength = 5 numChildren = 0

[zk: localhost:2181(CONNECTED) 0] getAcl /node3 ‘digest,’MooN:673OfZhUE8JEFMcu0l64qI8e5ek= : cdrwa

添加认证用户后可以访问

[zk: localhost:2181(CONNECTED) 3] get /node3 node3 cZxid = 0x15 ctime = Fri Dec 13 22:41:04 CST 2021 mZxid = 0x15 mtime = Fri Dec 13 22:41:04 CST 2021 pZxid = 0x15 cversion = 0 dataVersion = 0 aclVersion = 1 ephemeralOwner = 0x0 dataLength = 5 numChildren = 0

  1. #### 8.6.4. Digest 授权模式
  2. 命令

setAcl digest:::

  1. 这里密码是经过SHA1BASE64处理的密文,在SHELL中可以通过以下命令计算:

echo -n : | openssl dgst -binary -sha1 | openssl base64

  1. 示例

[zk: localhost:2181(CONNECTED) 4] create /node4 “node4” Created /node4

使用是上面算好的密文密码添加权限:

[zk: localhost:2181(CONNECTED) 5] setAcl /node4 digest:MooN:qlzQzCLKhBROghkooLvb+Mlwv4A=:cdrwa cZxid = 0x1c ctime = Fri Dec 13 22:52:21 CST 2021 mZxid = 0x1c mtime = Fri Dec 13 22:52:21 CST 2021 pZxid = 0x1c cversion = 0 dataVersion = 0 aclVersion = 1 ephemeralOwner = 0x0 dataLength = 5 numChildren = 0

[zk: localhost:2181(CONNECTED) 6] getAcl /node4 ‘digest,’MooN:qlzQzCLKhBROghkooLvb+Mlwv4A= : cdrwa

[zk: localhost:2181(CONNECTED) 3] get /node4 Authentication is not valid : /node4 # 没有权限 [zk: localhost:2181(CONNECTED) 4] addauth digest MooN:123456 # 添加认证用户 [zk: localhost:2181(CONNECTED) 5] get /node4 1 # 成功读取数据 cZxid = 0x1c ctime = Fri Dec 13 22:52:21 CST 2019 mZxid = 0x1c mtime = Fri Dec 13 22:52:21 CST 2019 pZxid = 0x1c cversion = 0 dataVersion = 0 aclVersion = 1 ephemeralOwner = 0x0 dataLength = 5 numChildren = 0

  1. #### 8.6.5. 多种模式授权
  2. 同一个节点可以同时使用多种模式授权

[zk: localhost:2181(CONNECTED) 0] create /node5 “node5” Created /node5 [zk: localhost:2181(CONNECTED) 1] addauth digest MooN:123456 # 添加认证用户 [zk: localhost:2181(CONNECTED) 2] setAcl /node5 ip:192.168.60.129:cdra,auth:MooN:cdrwa,digest:MooN:qlzQzCLKhBROghkooLvb+Mlwv4A=:cdrwa

  1. ### 8.7. ACL 超级管理员
  2. zookeeper的权限管理模式有一种叫做super,该模式提供一个超管可以方便的访问任何权限的节点。通过以下步骤
  3. 1. 假设这个超管是:super:admin,需要先为超管生成密码的密文

echo -n super:admin | openssl dgst -binary -sha1 | openssl base64

  1. 2. 打开zookeeper目录下的`/bin/zkServer.sh`服务器脚本文件,找到如下一行:

nohup $JAVA “-Dzookeeper.log.dir=${ZOO_LOG_DIR}” “-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}”

  1. 该脚本中启动zookeeper的命令,默认只有以上两个配置项,需要加一个超管的配置项

“-Dzookeeper.DigestAuthenticationProvider.superDigest=super:xQJmxLMiHGwaqBvst5y6rkB6HQs=”

  1. 3. 修改以后这条完整命令变成

nohup $JAVA “-Dzookeeper.log.dir=${ZOO_LOG_DIR}” “-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}” “-Dzookeeper.DigestAuthenticationProvider.superDigest=super:xQJmxLMiHGwaqBvst5y6rkB6HQs=”\ -cp “$CLASSPATH” $JVMFLAGS $ZOOMAIN “$ZOOCFG” > “$_ZOO_DAEMON_OUT” 2>&1 < /dev/null &

  1. 4. 启动zookeeper服务,输入如下命令添加认证用户权限

addauth digest super:admin

  1. # Zookeeper 客户端编程
  2. 以下章节只展示小部分示例代码,更多API使用示例详见:[https://github.com/MooNkirA/dubbo-note/tree/master/zookeeper-sample](https://github.com/MooNkirA/dubbo-note/tree/master/zookeeper-sample)
  3. ## 1. Java客户端 Zookeeper 的使用(官方)
  4. znodezooKeeper集合的核心组件,zookeeper API提供了一小组方法使用zookeeper集合来操纵znode的所有细节。<br />客户端应该遵循以步骤,与zookeeper服务器进行清晰和干净的交互。
  5. - 连接到zookeeper服务器。zookeeper服务器为客户端分配会话ID
  6. - 定期向服务器发送心跳。否则,zookeeper服务器将过期会话ID,客户端需要重新连接
  7. - 只要会话ID处于活动状态,就可以获取/设置znode
  8. - 所有任务完成后,断开与zookeeper服务器的连接。如果客户端长时间不活动,则zookeeper服务器将自动断开客户端
  9. ### 1.1. maven依赖
org.apache.zookeeper zookeeper 3.6.3
  1. ### 1.2. 连接 Zookeeper 服务端
  2. `ZooKeeper`类构造函数实现连接到zookeeper服务端

ZooKeeper(String connectionString, int sessionTimeout, Watcher watcher)

  1. - `connectionString`zookeeper主机
  2. - `sessionTimeout`:会话超时(以毫秒为单位)
  3. - `watcher`:实现“监视器”对象。zookeeper集合通过监视器对象返回连接状态。
  4. 示例:

public static final String CONNECTION_STR = “127.0.0.1:2181”; @Test public void testZooKeeperConnection() { try { // 创建计数器对象 CountDownLatch countDownLatch = new CountDownLatch(1); / 创建 ZooKeeper 实例即可连接到zookeeper服务端。建立连接本身是一个异步过程 / ZooKeeper zooKeeper = new ZooKeeper(CONNECTION_STR, 5000, new Watcher() { @Override public void process(WatchedEvent event) { System.out.println(“当前状态:” + event.getState()); // 判断当前状态是否连接 if (event.getState() == Event.KeeperState.SyncConnected) { System.out.println(“连接创建成功!”); // 连接成功后,释放所有等待的线程 countDownLatch.countDown(); } } });

  1. // 主线程阻塞等待连接对象的创建成功
  2. countDownLatch.await();
  3. // 会话编号
  4. System.out.println("会话编号: " + zooKeeper.getSessionId());
  5. // 关闭连接
  6. zooKeeper.close();
  7. } catch (Exception e) {
  8. e.printStackTrace();
  9. }

}

  1. ### 1.3. 新增节点
  2. 使用`ZooKeeper`对象的`create`方法新增节点

// 同步方式 public String create(final String path, byte data[], List acl, CreateMode createMode) // 异步方式 public void create(final String path, byte data[], List acl, CreateMode createMode, StringCallback cb, Object ctx)

  1. - `path`znode路径。例如,/node1 /node1/node11
  2. - `data[]`:要存储在指定znode路径中的数据
  3. - `acl`:要创建的节点的访问控制列表。zookeeper API提供了一个静态接口 `ZooDefs.Ids` 来获取一些基本的acl列表。例如,`ZooDefs.Ids.OPEN_ACL_UNSAFE`返回打开znodeacl列表。
  4. - `createMode`:节点的类型,这是一个**枚举**。
  5. - `cb`:异步回调接口
  6. - `ctx`:传递上下文参数
  7. 示例:

@Test public void testZooKeeperCreate() { try { // 创建计数器对象 CountDownLatch countDownLatch = new CountDownLatch(1); / 创建 ZooKeeper 实例即可连接到zookeeper服务端。建立连接本身是一个异步过程 / ZooKeeper zooKeeper = new ZooKeeper(CONNECTION_STR, 5000, new Watcher() { @Override public void process(WatchedEvent event) { System.out.println(“当前状态:” + event.getState()); // 判断当前状态是否连接 if (event.getState() == Event.KeeperState.SyncConnected) { System.out.println(“连接创建成功!”); // 连接成功后,释放所有等待的线程 countDownLatch.countDown(); } } });

  1. // 主线程阻塞等待连接对象的创建成功
  2. countDownLatch.await();
  3. /*
  4. * 创建节点
  5. * String create(final String path, byte data[], List<ACL> acl, CreateMode createMode)
  6. * path: 节点的路径
  7. * data[]: 节点的数据
  8. * acl: 权限列表。 示例取值:world:anyone:cdrwa
  9. * createMode: 节点类型。 示例取值:持久化节点
  10. */
  11. zooKeeper.create("/create", "MooN".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  12. // 关闭连接
  13. zooKeeper.close();
  14. } catch (Exception e) {
  15. e.printStackTrace();
  16. }

}

  1. ### 1.4. 更新节点
  2. 使用`ZooKeeper`对象的`setData`方法更新修改节点

// 同步方式 public Stat setData(final String path, byte data[], int version) // 异步方式 public void setData(final String path, byte data[], int version, StatCallback cb, Object ctx)

  1. - `path`znode路径。
  2. - `data[]`:要存储在指定znode路径中的数据
  3. - `version`znode的当前版本。每当数据更改时,ZooKeeper会更新znode的版本号。
  4. - `cb`:异步回调接口
  5. - `ctx`:传递上下文参数
  6. 示例:

@Test public void testZooKeeperUpdate() { try { // 创建计数器对象 CountDownLatch countDownLatch = new CountDownLatch(1); / 创建 ZooKeeper 实例即可连接到zookeeper服务端。建立连接本身是一个异步过程 / ZooKeeper zooKeeper = new ZooKeeper(CONNECTION_STR, 5000, new Watcher() { @Override public void process(WatchedEvent event) { System.out.println(“当前状态:” + event.getState()); // 判断当前状态是否连接 if (event.getState() == Event.KeeperState.SyncConnected) { System.out.println(“连接创建成功!”); // 连接成功后,释放所有等待的线程 countDownLatch.countDown(); } } });

  1. // 主线程阻塞等待连接对象的创建成功
  2. countDownLatch.await();
  3. /*
  4. * 同步方式,更新节点
  5. * Stat setData(final String path, byte data[], int version)
  6. * 参数 path: 节点的路径
  7. * 参数 data[]: 节点修改的数据
  8. * 参数 version: 版本号 -1代表版本号不作为修改条件
  9. * 如果设置版本号不正确,会报错 “KeeperErrorCode = BadVersion for xxx”
  10. */
  11. Stat stat = zooKeeper.setData("/set/node1", "node13".getBytes(), 2);
  12. // 节点的版本号
  13. System.out.println(stat.getVersion());
  14. // 节点的创建时间
  15. System.out.println(stat.getCtime());
  16. // 关闭连接
  17. zooKeeper.close();
  18. } catch (Exception e) {
  19. e.printStackTrace();
  20. }

}

  1. ### 1.5. 删除节点
  2. 使用`ZooKeeper`对象的`delete`方法删除节点

// 同步方式 public void delete(final String path, int version) // 异步方式 public void delete(final String path, int version, VoidCallback cb, Object ctx)

  1. - `path`znode路径
  2. - `version`znode的当前版本号。`-1`代表删除节点时不考虑版本信息
  3. - `cb`:异步回调接口
  4. - `ctx`:传递上下文参数
  5. 示例:

@Test public void testZooKeeperDelete() { try { // 创建计数器对象 CountDownLatch countDownLatch = new CountDownLatch(1); / 创建 ZooKeeper 实例即可连接到zookeeper服务端。建立连接本身是一个异步过程 / ZooKeeper zooKeeper = new ZooKeeper(CONNECTION_STR, 5000, new Watcher() { @Override public void process(WatchedEvent event) { System.out.println(“当前状态:” + event.getState()); // 判断当前状态是否连接 if (event.getState() == Event.KeeperState.SyncConnected) { System.out.println(“连接创建成功!”); // 连接成功后,释放所有等待的线程 countDownLatch.countDown(); } } });

  1. // 主线程阻塞等待连接对象的创建成功
  2. countDownLatch.await();
  3. /*
  4. * 同步方式,更新节点
  5. * Stat setData(final String path, byte data[], int version)
  6. * 参数 path: 节点的路径
  7. * 参数 data[]: 节点修改的数据
  8. * 参数 version: 版本号 -1代表版本号不作为修改条件
  9. * 如果设置版本号不正确,会报错 “KeeperErrorCode = BadVersion for xxx”
  10. */
  11. Stat stat = zooKeeper.setData("/set/node1", "node13".getBytes(), 2);
  12. // 节点的版本号
  13. System.out.println(stat.getVersion());
  14. // 节点的创建时间
  15. System.out.println(stat.getCtime());
  16. // 关闭连接
  17. zooKeeper.close();
  18. } catch (Exception e) {
  19. e.printStackTrace();
  20. }

}

  1. ### 1.6. 查看节点
  2. 使用`ZooKeeper`对象的`getData`方法查询节点信息

// 同步方式 public byte[] getData(String path, boolean watch, Stat stat) // 异步方式 public void getData(String path, boolean watch, DataCallback cb, Object ctx)

// 同步方式(可指定监听器) public byte[] getData(final String path, Watcher watcher, Stat stat) // 异步方式(可指定监听器) public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx)

  1. - `path`znode路径
  2. - `watch`:是否使用连接对象中注册的监视器
  3. - `stat`:返回znode的元数据
  4. - `cb`:异步回调接口
  5. - `ctx`:传递上下文参数
  6. - `watcher`:监听器实现
  7. 示例:

@Test public void testZooKeeperGetData() { try { // 创建计数器对象 CountDownLatch countDownLatch = new CountDownLatch(1); / 创建 ZooKeeper 实例即可连接到zookeeper服务端。建立连接本身是一个异步过程 / ZooKeeper zooKeeper = new ZooKeeper(CONNECTION_STR, 5000, new Watcher() { @Override public void process(WatchedEvent event) { System.out.println(“当前状态:” + event.getState()); // 判断当前状态是否连接 if (event.getState() == Event.KeeperState.SyncConnected) { System.out.println(“连接创建成功!”); // 连接成功后,释放所有等待的线程 countDownLatch.countDown(); } } });

  1. // 主线程阻塞等待连接对象的创建成功
  2. countDownLatch.await();
  3. /*
  4. * 同步方式,查看节点
  5. * byte[] getData(String path, boolean watch, Stat stat)
  6. * 参数 path: 节点的路径
  7. * 参数 watch: 是否使用连接对象中注册的监视器
  8. * 参数 stat: 读取节点属性的对象
  9. */
  10. Stat stat = new Stat();
  11. byte[] bys = zooKeeper.getData("/getData/node1", false, stat);
  12. // 打印数据
  13. System.out.println(new String(bys));
  14. // 版本信息
  15. System.out.println(stat.getVersion());
  16. // 关闭连接
  17. zooKeeper.close();
  18. } catch (Exception e) {
  19. e.printStackTrace();
  20. }

}

  1. ### 1.7. 查看子节点
  2. 使用`ZooKeeper`对象的`getChildren`方法查询子节点信息

// 同步方式 public List getChildren(String path, boolean watch, Stat stat) // 异步方式 public void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx)

// 同步方式(可指定监听器) public List getChildren(final String path, Watcher watcher, Stat stat) // 异步方式(可指定监听器) public void getChildren(final String path, Watcher watcher, ChildrenCallback cb, Object ctx)

  1. - `path`znode路径
  2. - `watch`:是否使用连接对象中注册的监视器
  3. - `stat`:返回znode的元数据
  4. - `cb`:异步回调接口
  5. - `ctx`:传递上下文参数
  6. - `watcher`:监听器实现
  7. 示例:

@Test public void testZooKeeperGetChildren() { try { // 创建计数器对象 CountDownLatch countDownLatch = new CountDownLatch(1); / 创建 ZooKeeper 实例即可连接到zookeeper服务端。建立连接本身是一个异步过程 / ZooKeeper zooKeeper = new ZooKeeper(CONNECTION_STR, 5000, new Watcher() { @Override public void process(WatchedEvent event) { System.out.println(“当前状态:” + event.getState()); // 判断当前状态是否连接 if (event.getState() == Event.KeeperState.SyncConnected) { System.out.println(“连接创建成功!”); // 连接成功后,释放所有等待的线程 countDownLatch.countDown(); } } });

  1. // 主线程阻塞等待连接对象的创建成功
  2. countDownLatch.await();
  3. /*
  4. * 同步方式,查看子节点
  5. * List<String> getChildren(String path, boolean watch)
  6. * 参数 path: 节点的路径
  7. * 参数 watch: 是否使用连接对象中注册的监视器
  8. */
  9. List<String> list = zooKeeper.getChildren("/getData", false);
  10. // 打印数据
  11. for (String str : list) {
  12. System.out.println(str);
  13. }
  14. // 关闭连接
  15. zooKeeper.close();
  16. } catch (Exception e) {
  17. e.printStackTrace();
  18. }

}

  1. ### 1.8. 检查节点是否存在
  2. 使用`ZooKeeper`对象的`exists`方法检查节点是否存在

// 同步方式 public Stat exists(String path, boolean watch) // 异步方式 public void exists(String path, boolean watch, StatCallback cb, Object ctx)

  1. - `path`znode路径
  2. - `watch`:是否使用连接对象中注册的监视器
  3. - `cb`:异步回调接口
  4. - `ctx`:传递上下文参数
  5. 示例:

@Test public void testZooKeeperExists() { try { // 创建计数器对象 CountDownLatch countDownLatch = new CountDownLatch(1); / 创建 ZooKeeper 实例即可连接到zookeeper服务端。建立连接本身是一个异步过程 / ZooKeeper zooKeeper = new ZooKeeper(CONNECTION_STR, 5000, new Watcher() { @Override public void process(WatchedEvent event) { System.out.println(“当前状态:” + event.getState()); // 判断当前状态是否连接 if (event.getState() == Event.KeeperState.SyncConnected) { System.out.println(“连接创建成功!”); // 连接成功后,释放所有等待的线程 countDownLatch.countDown(); } } });

  1. // 主线程阻塞等待连接对象的创建成功
  2. countDownLatch.await();
  3. /*
  4. * 同步方式,检查节点
  5. * Stat exists(String path, boolean watch)
  6. * 参数 path: 节点的路径
  7. * 参数 watch: 是否使用连接对象中注册的监视器
  8. */
  9. Stat stat = zooKeeper.exists("/exists1", false);
  10. // stat 为null,代表不存在
  11. System.out.println(stat.getVersion());
  12. // 关闭连接
  13. zooKeeper.close();
  14. } catch (Exception e) {
  15. e.printStackTrace();
  16. }

}

  1. ## 2. Java客户端 Zkclient 的使用
  2. ### 2.1. maven依赖
com.101tec zkclient 0.11
  1. ### 2.2. 初始化ZKClient对象
  2. 建立与zookeeper服务端链接,是同步方法。需要注意的是,原生的zookeeper客户端连接初始化时是一个异步操作(`Zookeeper zk = new Zookeeper()`)。<br />`ZkClient`提供了7中创建会话的方法:

public ZkClient(String serverstring)

public ZkClient(String zkServers, int connectionTimeout)

public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout)

public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout, ZkSerializer zkSerializer)

public ZkClient(final String zkServers, final int sessionTimeout, final int connectionTimeout, final ZkSerializer zkSerializer, final long operationRetryTimeout)

public ZkClient(IZkConnection connection)

public ZkClient(IZkConnection connection, int connectionTimeout)

public ZkClient(IZkConnection zkConnection, int connectionTimeout, ZkSerializer zkSerializer)

public ZkClient(final IZkConnection zkConnection, final int connectionTimeout, final ZkSerializer zkSerializer, final long operationRetryTimeout)

  1. 示例:

@Test public void testZkclientConnection() throws Exception { // 创建 ZkClient 实例即可连接到zookeeper服务端。 // 需要注意,与原生ZooKeeper创建异步的过程不一样,ZkClient建立连接是同步的 ZkClient zkClient = new ZkClient(CONNECTION_STR, 5000);

  1. // 创建文件输出流
  2. OutputStream ops = new FileOutputStream("E:\\00-Downloads\\zkFolders.txt");
  3. // 读取zookeeper服务端的文件夹
  4. zkClient.showFolders(ops);
  5. // 关闭连接
  6. zkClient.close();

}

  1. > 值得注意的量,与原生`ZooKeeper`创建连接的异步过程不一样,`ZkClient`建立连接是同步的
  2. ### 2.3. 新增节点
  3. ZkClient提供了15个创建节点的方法,以下只列出几个常用的方法:

public void createPersistent(String path)

public void createPersistent(String path, boolean createParents)

public void createPersistent(String path, boolean createParents, List acl)

public void createPersistent(String path, Object data)

public void createPersistent(String path, Object data, List acl)

public String createPersistentSequential(String path, Object data)

public String createPersistentSequential(String path, Object data, List acl)

public void createEphemeral(final String path)

public void createEphemeral(final String path, final List acl)

public String create(final String path, Object data, final CreateMode mode)

public String create(final String path, Object data, final List acl, final CreateMode mode)

public void createEphemeral(final String path, final Object data)

public void createEphemeral(final String path, final Object data, final List acl)

public String createEphemeralSequential(final String path, final Object data)

public String createEphemeralSequential(final String path, final Object data, final List acl)

  1. 示例:

@Test public void testZkclientCreate() throws Exception { // 创建 ZkClient 实例连接到zookeeper服务端 ZkClient zkClient = new ZkClient(CONNECTION_STR, 5000); // 创建持久节点 zkClient.createPersistent(“/zkclient”, “zkclient”); // 创建临时节点 zkClient.createEphemeral(“/zkclientTemp”, “zkclientTemp”); // 手动指定权限与类型 zkClient.create(“/zkclient/create”, “MooN”, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // 关闭连接 zkClient.close(); }

  1. ### 2.4. 更新节点
  2. 更新操作可以通过以下接口来实现:

public void writeData(String path, Object object)

public void writeData(final String path, Object datat, final int expectedVersion)

public Stat writeDataReturnStat(final String path, Object datat, final int expectedVersion)

  1. 示例:

@Test public void testZkclientWriteData() throws Exception { // 创建 ZkClient 实例连接到zookeeper服务端 ZkClient zkClient = new ZkClient(CONNECTION_STR, 5000); /*

  1. * void writeData(final String path, Object datat, final int expectedVersion)
  2. * 参数 path:节点的路径
  3. * 参数 datat:节点修改的数据
  4. * 参数 expectedVersion:版本号 -1代表版本号不作为修改条件
  5. * 如果修改的版本号与当前路径版本不一致,会报错:“KeeperErrorCode = BadVersion
  6. */
  7. zkClient.writeData("/set/node2", "MooN...", -1);
  8. // 关闭连接
  9. zkClient.close();

}

  1. ### 2.5. 删除节点
  2. 删除节点提供了以下方法:

public boolean delete(final String path)

public boolean delete(final String path, final int version)

public boolean deleteRecursive(String path)

  1. 示例:

@Test public void testZkclientDelete() throws Exception { // 创建 ZkClient 实例连接到zookeeper服务端 ZkClient zkClient = new ZkClient(CONNECTION_STR, 5000); /*

  1. * boolean delete(final String path, final int version)
  2. * 参数 path:节点的路径
  3. * 参数 version:版本号 -1代表版本号不作为修改条件
  4. * 如果删除的版本号与当前路径版本不一致,会报错:“KeeperErrorCode = BadVersion
  5. */
  6. zkClient.delete("/delete/node2", -1);
  7. // 关闭连接
  8. zkClient.close();

}

  1. ### 2.6. 查看节点
  2. 获取节点内容有以下接口方法:

public T readData(String path)

public T readData(String path, boolean returnNullIfPathNotExists)

public T readData(String path, Stat stat)

  1. 通过方法返回参数的定义,就可以得知,返回的结果(节点的内容)已经被反序列化成对象了。<br />对本接口实现监听的接口为`IZkDataListener`,分别提供了处理数据变化和删除操作的监听:

public void handleDataChange(String dataPath, Object data) throws Exception;

public void handleDataDeleted(String dataPath) throws Exception;

  1. 示例:

@Test public void testZkclientReadData() throws Exception { // 创建 ZkClient 实例连接到zookeeper服务端,另外 ZkSerializer 用于定义节点存储的数据序列化 ZkClient zkClient = new ZkClient(CONNECTION_STR, 500000, 5000, new ZkSerializer() { @Override public byte[] serialize(Object data) throws ZkMarshallingError { return new byte[0]; }

  1. @Override
  2. public Object deserialize(byte[] bytes) throws ZkMarshallingError {
  3. return new String(bytes);
  4. }
  5. });
  6. /*
  7. * <T extends Object> T readData(String path)
  8. * 参数 path:节点的路径
  9. * 这里默认会调用另一个重载的方法,returnNullIfPathNotExists 参数值为 false
  10. * 即如果节点不存在,则会直接抛出异常
  11. */
  12. Object data = zkClient.readData("/getData/node1");
  13. System.out.println(data.toString());
  14. // 关闭连接
  15. zkClient.close();

}

  1. > 注意读取节点数据的方法,如果创建zkclient连接时没有传入`ZkSerializer`接口的实现,定义节点存储数据的序列化,在读取数据时会抛出“java.io.StreamCorruptedException: invalid stream header”的异常
  2. ### 2.7. 查看节点列表

public List getChildren(String path)

  1. 此接口返回子节点的相对路径列表。比如节点路径为/test/a1和/test/a2,那么当path为/test时,返回的结果为[a1,a2]。<br />其中在原始API中,对节点注册Watcher,当节点被删除或其下面的子节点新增或删除时,会通知客户端。在ZkClient中,通过Listener监听来实现,后续会将到具体的使用方法。<br />可以注册的Listener,接口`IZkChildListener`下面的方法来实现:

public void handleChildChange(String parentPath, List currentChilds)

  1. 示例:

@Test public void testZkclientConnection() throws Exception { // 创建 ZkClient 实例连接到zookeeper服务端 ZkClient zkClient = new ZkClient(CONNECTION_STR, 5000); /*

  1. * List<String> getChildren(String path)
  2. * 参数 path:节点的路径
  3. */
  4. List<String> children = zkClient.getChildren("/getData/node4");
  5. for (String child : children) {
  6. System.out.println(child);
  7. }
  8. // 关闭连接
  9. zkClient.close();

}

  1. ### 2.8. 监测节点是否存在
  2. API比较简单,调用以下方法即可:

public boolean exists(final String path)

protected boolean exists(final String path, final boolean watch)

  1. 示例:

@Test public void testZkclientExists() throws Exception { // 创建 ZkClient 实例连接到zookeeper服务端 ZkClient zkClient = new ZkClient(CONNECTION_STR, 5000); /*

  1. * boolean exists(final String path)
  2. * 参数 path:节点的路径
  3. */
  4. System.out.println(zkClient.exists("/notExists"));
  5. System.out.println(zkClient.exists("/exists1"));
  6. // 关闭连接
  7. zkClient.close();

}

  1. ## 3. Java客户端 Curator 的使用
  2. curatorNetflix公司开源的一个zookeeper客户端,后捐献给apachecurator框架在zookeeper原生API接口上进行了包装,解决了很多zooKeeper客户端非常底层的细节开发。提供zooKeeper各种应用场景(比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等)的抽象封装,实现了Fluent风格的API接口,是最好用,最流行的zookeeper的客户端。<br />原生zookeeperAPI的不足:
  3. - 连接对象异步创建,需要开发人员自行编码等待
  4. - 连接没有自动重连超时机制
  5. - watcher一次注册生效一次
  6. - 不支持递归创建树形节点
  7. curator特点:
  8. - 解决session会话超时重连
  9. - watcher反复注册
  10. - 简化开发api
  11. - 遵循Fluent风格的API
  12. - 提供了分布式锁服务、共享计数器、缓存机制等机制
  13. ### 3.1. maven依赖
org.apache.curator curator-framework 2.12.0 org.apache.curator curator-recipes 2.12.0 org.apache.curator curator-test 2.12.0
  1. ### 3.2. 连接 Zookeeper 服务端
  2. 使用`CuratorFrameworkFactory`工厂的建造者方式创建zookeeper服务端连接<br />示例:

@Test public void testCuratorConnection() { // 创建连接对象,可使用建造者链接编程方式 // 与原生ZooKeeper连接对象异步创建不一样,该连接对象的创建是同步 CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(CONNECTION_STR) // 服务端IP地址与端口号 .sessionTimeoutMs(5000) // 会话超时时间 // 设置重连机制。 // .retryPolicy(new RetryOneTime(3000)) // session重连策略:3秒后重连一次,只重连1次 // .retryPolicy(new RetryNTimes(3, 3000)) // session重连策略:每3秒重连一次,重连3次 // .retryPolicy(new RetryUntilElapsed(10000, 3000)) // session重连策略:每3秒重连一次,总等待时间超过10秒后停止重连 .retryPolicy(new ExponentialBackoffRetry(1000, 3)) // session重连策略:baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1))) .namespace(“MooN”) // 命名空间 .build(); // 构建连接对象 // 打开连接 client.start(); System.out.println(client.getState()); // 客户端状态 System.out.println(client.isStarted()); // 客户端是否连接 // 关闭连接 client.close(); }

  1. ### 3.3. 新增节点
  2. 示例:

public class CuratorCreateDemo {

  1. private CuratorFramework client;
  2. @Before
  3. public void initConnection() throws Exception {
  4. ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
  5. client = CuratorFrameworkFactory.builder()
  6. .connectString(CONNECTION_STR) // 服务端IP地址与端口号
  7. .sessionTimeoutMs(5000) // 会话超时时间
  8. .retryPolicy(retryPolicy) // 设置重连机制
  9. .namespace("MooN") // 命名空间
  10. .build(); // 构建连接对象
  11. client.start();
  12. }
  13. @After
  14. public void closeConnection() throws Exception {
  15. client.close();
  16. }
  17. /* 新增节点 */
  18. @Test
  19. public void create1() throws Exception {
  20. String result = client.create()
  21. // 节点的类型
  22. .withMode(CreateMode.PERSISTENT)
  23. // 节点的权限列表 world:anyone:cdrwa
  24. .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
  25. /*
  26. * String forPath(final String givenPath, byte[] data)
  27. * 指定新增节点,具体实例是 CreateBuilderImpl
  28. * 参数 path:节点的路径
  29. * 参数 data:节点的数据
  30. */
  31. .forPath("/node1", "MooN".getBytes());
  32. System.out.println("result is " + result);
  33. }
  34. /* 新增节点,设置自定义权限 */
  35. @Test
  36. public void create2() throws Exception {
  37. // 权限列表
  38. List<ACL> list = new ArrayList<ACL>();
  39. // 授权模式和授权对象
  40. Id id = new Id("ip", "127.0.0.1");
  41. list.add(new ACL(ZooDefs.Perms.ALL, id));
  42. String result = client.create()
  43. .withMode(CreateMode.PERSISTENT)
  44. // 节点的自定义权限列表
  45. .withACL(list)
  46. .forPath("/node2", "Zero".getBytes());
  47. System.out.println("result is " + result);
  48. }
  49. /* 递归创建节点树 */
  50. @Test
  51. public void create3() throws Exception {
  52. String result = client.create()
  53. // 递归节点的创建,如果父节点不存在,将自动创建
  54. .creatingParentsIfNeeded()
  55. .withMode(CreateMode.PERSISTENT)
  56. .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
  57. .forPath("/node4/node41/node412/node413", "kira".getBytes());
  58. System.out.println("result is " + result);
  59. }
  60. /* 异步方式创建节点 */
  61. @Test
  62. public void create4() throws Exception {
  63. String result = client.create()
  64. .creatingParentsIfNeeded()
  65. .withMode(CreateMode.PERSISTENT)
  66. .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
  67. // 异步回调接口
  68. .inBackground(new BackgroundCallback() {
  69. public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
  70. // 节点的路径
  71. System.out.println(curatorEvent.getPath());
  72. // 事件的类型
  73. System.out.println(curatorEvent.getType());
  74. }
  75. })
  76. .forPath("/node5", "haha".getBytes());
  77. Thread.sleep(5000);
  78. System.out.println("result is " + result);
  79. }

}

  1. ### 3.4. 更新节点
  2. 示例:

public class CuratorSetDataDemo {

  1. private CuratorFramework client;
  2. @Before
  3. public void initConnection() throws Exception {
  4. ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
  5. client = CuratorFrameworkFactory.builder()
  6. .connectString(CONNECTION_STR) // 服务端IP地址与端口号
  7. .sessionTimeoutMs(5000) // 会话超时时间
  8. .retryPolicy(retryPolicy) // 设置重连机制
  9. .namespace("MooN") // 命名空间
  10. .build(); // 构建连接对象
  11. client.start();
  12. }
  13. @After
  14. public void closeConnection() throws Exception {
  15. client.close();
  16. }
  17. /* 更新节点 */
  18. @Test
  19. public void setData1() throws Exception {
  20. Stat stat = client.setData()
  21. /*
  22. * Stat forPath(String path, byte[] data)
  23. * 指定更新的节点,具体实现是 SetDataBuilderImpl
  24. * 参数 path:节点的路径
  25. * 参数 data:节点的数据
  26. */
  27. .forPath("/node1", "abc".getBytes());
  28. System.out.println("节点数据的更改次数: " + stat.getVersion());
  29. System.out.println("数据节点最后一次更新时的事务 ID : " + stat.getMzxid());
  30. }
  31. /* 指定版本号更新节点 */
  32. @Test
  33. public void setData2() throws Exception {
  34. Stat stat = client.setData()
  35. // 指定版本号,如果版本与修改的节点版本号不一致,会报错“KeeperErrorCode = BadVersion”
  36. // -1代表版本号不作为修改条件
  37. .withVersion(-1)
  38. .forPath("/node1", "L&N".getBytes());
  39. System.out.println("节点数据的更改次数: " + stat.getVersion());
  40. System.out.println("数据节点最后一次更新时的事务 ID : " + stat.getMzxid());
  41. }
  42. /* 异步方式修改节点数据 */
  43. @Test
  44. public void setData3() throws Exception {
  45. client.setData()
  46. .withVersion(-1)
  47. // 异步回调接口
  48. .inBackground(new BackgroundCallback() {
  49. public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
  50. // 节点的路径
  51. System.out.println(curatorEvent.getPath());
  52. // 事件的类型
  53. System.out.println(curatorEvent.getType());
  54. }
  55. })
  56. .forPath("/node1", "L?".getBytes());
  57. Thread.sleep(5000);
  58. System.out.println("更新节点结束");
  59. }

}

  1. ### 3.5. 删除节点
  2. 示例:

public class CuratorDeleteDemo {

  1. private CuratorFramework client;
  2. @Before
  3. public void initConnection() throws Exception {
  4. ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
  5. client = CuratorFrameworkFactory.builder()
  6. .connectString(CONNECTION_STR) // 服务端IP地址与端口号
  7. .sessionTimeoutMs(5000) // 会话超时时间
  8. .retryPolicy(retryPolicy) // 设置重连机制
  9. .namespace("MooN") // 命名空间
  10. .build(); // 构建连接对象
  11. client.start();
  12. }
  13. @After
  14. public void closeConnection() throws Exception {
  15. client.close();
  16. }
  17. /* 删除节点 */
  18. @Test
  19. public void delete1() throws Exception {
  20. client.delete()
  21. /*
  22. * Void forPath(String path)
  23. * 指定删除节点,具体实现是 DeleteBuilderImpl
  24. * 参数 path:节点的路径
  25. */
  26. .forPath("/node1");
  27. System.out.println("删除操作结束");
  28. }
  29. /* 指定版本号删除节点 */
  30. @Test
  31. public void delete2() throws Exception {
  32. client.delete()
  33. // 指定版本号,如果版本与修改的节点版本号不一致,会报错“KeeperErrorCode = BadVersion”
  34. // -1代表版本号不作为修改条件
  35. .withVersion(-1)
  36. .forPath("/node2");
  37. System.out.println("删除操作结束");
  38. }
  39. /* 删除节点,包含其子节点 */
  40. @Test
  41. public void delete3() throws Exception {
  42. client.delete()
  43. // 设置删除其子节点
  44. .deletingChildrenIfNeeded()
  45. .withVersion(-1)
  46. .forPath("/node3");
  47. System.out.println("删除操作结束");
  48. }
  49. /* 异步方式删除节点 */
  50. @Test
  51. public void delete4() throws Exception {
  52. client.delete()
  53. .deletingChildrenIfNeeded()
  54. .withVersion(-1)
  55. // 异步回调接口
  56. .inBackground(new BackgroundCallback() {
  57. public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
  58. // 节点的路径
  59. System.out.println(curatorEvent.getPath());
  60. // 事件的类型
  61. System.out.println(curatorEvent.getType());
  62. }
  63. })
  64. .forPath("/node4");
  65. Thread.sleep(5000);
  66. System.out.println("删除操作结束");
  67. }

}

  1. ### 3.6. 查看节点
  2. 示例:

public class CuratorGetDataDemo {

  1. private CuratorFramework client;
  2. @Before
  3. public void initConnection() throws Exception {
  4. ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
  5. client = CuratorFrameworkFactory.builder()
  6. .connectString(CONNECTION_STR) // 服务端IP地址与端口号
  7. .sessionTimeoutMs(5000) // 会话超时时间
  8. .retryPolicy(retryPolicy) // 设置重连机制
  9. .namespace("MooN") // 命名空间
  10. .build(); // 构建连接对象
  11. client.start();
  12. }
  13. @After
  14. public void closeConnection() throws Exception {
  15. client.close();
  16. }
  17. /* 读取节点数据 */
  18. @Test
  19. public void getData1() throws Exception {
  20. byte[] bys = client.getData()
  21. /*
  22. * byte[] forPath(String path)
  23. * 查看指定的节点,具体实现是 GetDataBuilderImpl
  24. * 参数 path:节点的路径
  25. */
  26. .forPath("/node1");
  27. System.out.println(new String(bys));
  28. }
  29. /* 读取数据时,读取节点的属性 */
  30. @Test
  31. public void getData2() throws Exception {
  32. // 创建节点属性对象
  33. Stat stat = new Stat();
  34. byte[] bys = client.getData()
  35. // 读取属性
  36. .storingStatIn(stat)
  37. .forPath("/node1");
  38. System.out.println("节点的数据" + new String(bys));
  39. System.out.println("节点数据的更改次数: " + stat.getVersion());
  40. System.out.println("数据节点最后一次更新时的事务 ID : " + stat.getMzxid());
  41. }
  42. /* 异步方式读取节点的数据 */
  43. @Test
  44. public void getData3() throws Exception {
  45. client.getData()
  46. // 异步回调接口
  47. .inBackground(new BackgroundCallback() {
  48. public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
  49. // 节点的路径
  50. System.out.println(curatorEvent.getPath());
  51. // 事件的类型
  52. System.out.println(curatorEvent.getType());
  53. // 节点的数据
  54. System.out.println(new String(curatorEvent.getData()));
  55. }
  56. })
  57. .forPath("/node1");
  58. Thread.sleep(5000);
  59. System.out.println("读取节点的数据结束");
  60. }

}

  1. ### 3.7. 查看子节点
  2. 示例:

public class CuratorGetChildrenDemo {

  1. private CuratorFramework client;
  2. @Before
  3. public void initConnection() throws Exception {
  4. ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
  5. client = CuratorFrameworkFactory.builder()
  6. .connectString(CONNECTION_STR) // 服务端IP地址与端口号
  7. .sessionTimeoutMs(5000) // 会话超时时间
  8. .retryPolicy(retryPolicy) // 设置重连机制
  9. .namespace("MooN") // 命名空间
  10. .build(); // 构建连接对象
  11. client.start();
  12. }
  13. @After
  14. public void closeConnection() throws Exception {
  15. client.close();
  16. }
  17. /* 读取子节点数据 */
  18. @Test
  19. public void getChildren1() throws Exception {
  20. List<String> list = client.getChildren()
  21. /*
  22. * List<String> forPath(String path)
  23. * 查看指定节点的子节点列表,具体实现是 GetChildrenBuilderImpl
  24. * 参数 path:节点的路径
  25. * 参数 data:节点的数据
  26. */
  27. .forPath("/node5");
  28. for (String str : list) {
  29. System.out.println(str);
  30. }
  31. }
  32. /* 异步方式读取子节点数据 */
  33. @Test
  34. public void getChildren2() throws Exception {
  35. client.getChildren()
  36. // 异步回调接口
  37. .inBackground(new BackgroundCallback() {
  38. public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
  39. // 节点路径
  40. System.out.println(curatorEvent.getPath());
  41. // 事件类型
  42. System.out.println(curatorEvent.getType());
  43. // 读取子节点数据
  44. List<String> list = curatorEvent.getChildren();
  45. for (String str : list) {
  46. System.out.println(str);
  47. }
  48. }
  49. })
  50. .forPath("/node5");
  51. Thread.sleep(5000);
  52. System.out.println("异步读取子节点列表结束");
  53. }

}

  1. ### 3.8. 检查节点是否存在
  2. 示例:

public class CuratorCheckExistsDemo {

  1. private CuratorFramework client;
  2. @Before
  3. public void initConnection() throws Exception {
  4. ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
  5. client = CuratorFrameworkFactory.builder()
  6. .connectString(CONNECTION_STR) // 服务端IP地址与端口号
  7. .sessionTimeoutMs(5000) // 会话超时时间
  8. .retryPolicy(retryPolicy) // 设置重连机制
  9. .namespace("MooN") // 命名空间
  10. .build(); // 构建连接对象
  11. client.start();
  12. }
  13. @After
  14. public void closeConnection() throws Exception {
  15. client.close();
  16. }
  17. /* 判断节点是否存在 */
  18. @Test
  19. public void checkExists1() throws Exception {
  20. Stat stat = client.checkExists()
  21. /*
  22. * Stat forPath(String path)
  23. * 指定更新的节点,具体实现是 ExistsBuilderImpl
  24. * 参数 path:节点的路径
  25. */
  26. .forPath("/node211");
  27. if (stat == null) {
  28. System.out.println("节点不存在");
  29. } else {
  30. System.out.println("节点数据的更改次数: " + stat.getVersion());
  31. System.out.println("数据节点最后一次更新时的事务 ID : " + stat.getMzxid());
  32. }
  33. }
  34. /* 异步方式判断节点是否存在 */
  35. @Test
  36. public void checkExists2() throws Exception {
  37. client.checkExists()
  38. // 异步回调接口
  39. .inBackground(new BackgroundCallback() {
  40. public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
  41. // 节点路径
  42. System.out.println(curatorEvent.getPath());
  43. // 事件类型
  44. System.out.println(curatorEvent.getType());
  45. // 结果码resultCode,存在返回值为0,不存在返回值为-101
  46. System.out.println("resultCode: " + curatorEvent.getResultCode());
  47. // 获取节点的属性
  48. Stat stat = curatorEvent.getStat();
  49. if (stat == null) {
  50. System.out.println("节点不存在");
  51. } else {
  52. System.out.println("节点数据的更改次数: " + stat.getVersion());
  53. System.out.println("数据节点最后一次更新时的事务 ID : " + stat.getMzxid());
  54. }
  55. }
  56. })
  57. .forPath("/node211");
  58. Thread.sleep(5000);
  59. System.out.println("异步方式判断节点结束");
  60. }

}

  1. ### 3.9. Watcher API
  2. Curator 客户端提供了两种 `Watcher`(Cache) 来监听结点的变化
  3. - `NodeCache`:只是监听某一个特定的节点,监听节点的新增和修改
  4. - `PathChildrenCache`:监控一个ZNode的子节点。当一个子节点增加、更新、删除时,`PathCache`会改变它的状态,会包含最新的子节点,子节点的数据和状态
  5. 值得注意,原生`ZooKeeper`客户端只能注册一次,数据变更后就不能再次监听。而`Curator``Watcher`的注册(反复注册)后可以一直监听节点的数据变更<br />示例:

public class CuratorWatcherDemo {

  1. private CuratorFramework client;
  2. @Before
  3. public void initConnection() throws Exception {
  4. ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
  5. client = CuratorFrameworkFactory.builder()
  6. .connectString(CONNECTION_STR) // 服务端IP地址与端口号
  7. .sessionTimeoutMs(5000) // 会话超时时间
  8. .retryPolicy(retryPolicy) // 设置重连机制
  9. .namespace("MooN") // 命名空间
  10. .build(); // 构建连接对象
  11. client.start();
  12. }
  13. @After
  14. public void closeConnection() throws Exception {
  15. client.close();
  16. }
  17. @Test
  18. public void testNodeCache() throws Exception {
  19. /*
  20. * 创建NodeCache对象,监视某个节点的数据变化,构造函数如下:
  21. * NodeCache(CuratorFramework client, String path)
  22. * 参数 client: 连接对象
  23. * 参数 path: 连接对象
  24. * 需要注意的是:如果CuratorFramework设置了namespace,则监听的节点是"namespace+path"
  25. */
  26. final NodeCache nodeCache = new NodeCache(client, "/node1");
  27. // 启动监视器对象
  28. nodeCache.start();
  29. // 注册监听器
  30. nodeCache.getListenable()
  31. .addListener(new NodeCacheListener() {
  32. // 节点每次变化时都回调此方法
  33. public void nodeChanged() throws Exception {
  34. // 从 NodeCache 对象可获取到节点修改后信息
  35. System.out.println(nodeCache.getCurrentData().getPath());
  36. System.out.println(new String(nodeCache.getCurrentData().getData()));
  37. }
  38. });
  39. // 对被监听的节点进行多次修改
  40. client.setData()
  41. .forPath("/node1", "我改一下中文看看".getBytes(StandardCharsets.UTF_8));
  42. client.setData()
  43. .forPath("/node1", "我改第二次".getBytes(StandardCharsets.UTF_8));
  44. Thread.sleep(10000); // 休眠主线程,等待监视器的结果输出
  45. System.out.println("示例结束");
  46. // 关闭监视器对象
  47. nodeCache.close();
  48. }
  49. @Test
  50. public void testPathChildrenCache() throws Exception {
  51. /*
  52. * 创建 PathChildrenCache 对象,监视某个节点的所有子节点的数据变化,构造函数如下:
  53. * PathChildrenCache(CuratorFramework client, String path, boolean cacheData)
  54. * 参数 client: 连接对象
  55. * 参数 path: 监视的节点路径
  56. * 参数 cacheData: 事件中是否可以获取节点的数据
  57. */
  58. PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/node5", true);
  59. // 启动监听
  60. pathChildrenCache.start();
  61. // 注册监听器
  62. pathChildrenCache.getListenable()
  63. .addListener(new PathChildrenCacheListener() {
  64. // 当某个子节点数据变化时,回调此方法(注意是所有子节点都会回调)
  65. public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
  66. // 节点的事件类型
  67. System.out.println(pathChildrenCacheEvent.getType());
  68. // 节点的路径
  69. System.out.println(pathChildrenCacheEvent.getData().getPath());
  70. // 节点的数据
  71. System.out.println(new String(pathChildrenCacheEvent.getData().getData()));
  72. }
  73. });
  74. // 对被监听的节点进行多次修改
  75. client.setData()
  76. .forPath("/node5/node51", "MooNkirA".getBytes());
  77. Thread.sleep(10000); // 休眠主线程,等待监视器的结果输出
  78. System.out.println("示例结束");
  79. // 关闭监听
  80. pathChildrenCache.close();
  81. }

}

  1. ### 3.10. 事务
  2. 示例:

public class CuratorTransactionDemo {

  1. private CuratorFramework client;
  2. @Before
  3. public void initConnection() throws Exception {
  4. ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
  5. client = CuratorFrameworkFactory.builder()
  6. .connectString(CONNECTION_STR) // 服务端IP地址与端口号
  7. .sessionTimeoutMs(5000) // 会话超时时间
  8. .retryPolicy(retryPolicy) // 设置重连机制
  9. .namespace("MooN") // 命名空间
  10. .build(); // 构建连接对象
  11. client.start();
  12. }
  13. @After
  14. public void closeConnection() throws Exception {
  15. client.close();
  16. }
  17. /* Curator 控制事务,示例会创建两个节点,并在第二个节点中故意制造语法错误,观察是否支持事务 */
  18. @Test
  19. public void testTransaction() throws Exception {
  20. // 通过 inTransaction() 方法创建 CuratorTransaction 事务操作对象,开启事务
  21. client.inTransaction()
  22. .create()
  23. .forPath("/node6", "MooN".getBytes())
  24. .and()
  25. .create()
  26. .forPath("node7", "Zero".getBytes())
  27. .and()
  28. // 提交事务
  29. .commit();
  30. }

}

  1. ### 3.11. 分布式锁(待整理)
  2. Curator 提供了分布式锁服务的抽象封装与实现
  3. - `InterProcessMutex`:分布式可重入排它锁
  4. - `InterProcessReadWriteLock`:分布式读写锁
  5. 示例:
  1. ## 4. Zookeeper 事件监听机制(Watcher)
  2. ### 4.1. watcher概念
  3. zookeeper提供了数据的发布/订阅功能,多个订阅者可同时监听某一特定主题对象,当该主题对象的自身状态发生变化时(例如节点内容改变、节点下的子节点列表改变等),会实时、主动通知所有订阅者<br />zookeeper采用了Watcher机制实现数据的发布/订阅功能。该机制在被订阅对象发生变化时会异步通知客户端,因此客户端不必在Watcher注册后轮询阻塞,从而减轻了客户端压力。<br />watcher机制实际上与观察者模式类似,也可看作是一种观察者模式在分布式场景下的实现方式。
  4. ### 4.2. watcher架构
  5. Watcher实现由三个部分组成:
  6. - Zookeeper服务端
  7. - Zookeeper客户端
  8. - 客户端的ZKWatchManager对象
  9. 客户端首先将`Watcher`注册到服务端,同时将`Watcher`对象保存到客户端的Watch管理器中。当ZooKeeper服务端监听的数据状态发生变化时,服务端会主动通知客户端,接着客户端的Watch管理器会触发相关`Watcher`来回调相应处理逻辑,从而完成整体的数据发布/订阅流程。<br />![](https://gitee.com/moonzero/images/raw/master/code-note/20210622185757976_8975.png)
  10. ### 4.3. watcher特性
  11. |
  12. 特性
  13. | 说明
  14. |
  15. | --- | --- |
  16. |
  17. 一次性
  18. | watcher是一次性的,一旦被触发就会移除,再次使用时需要重新注册
  19. |
  20. |
  21. 客户端顺序回调
  22. | watcher回调是顺序串行化执行的,只有回调后客户端才能看到最新的数据状态。一个watcher回调逻辑不应该太多,以免影响别的watcher执行
  23. |
  24. |
  25. 轻量级
  26. | WatchEvent是最小的通信单元,结构上只包含通知状态、事件类型和节点路径,并不会告诉数据节点变化前后的具体内容
  27. |
  28. |
  29. 时效性
  30. | watcher只有在当前session彻底失效时才会无效,若在session有效期内快速重连成功,则watcher依然存在,仍可接收到通知
  31. |
  32. ### 4.4. Watcher接口设计
  33. `Watcher`是一个接口,任何实现了`Watcher`接口的类就是一个新的`Watcher``Watcher`内部包含了两个枚举类:`KeeperState``EventType`<br />![](https://gitee.com/moonzero/images/raw/master/code-note/20210622190050999_27895.png)
  34. - `Watcher`通知状态(`KeeperState`)
  35. `KeeperState`是客户端与服务端连接状态发生变化时对应的通知类型。路径为`org.apache.zookeeper.Watcher.Event.KeeperState`,是一个枚举类,其枚举属性如下:
  36. |
  37. 枚举属性
  38. | 说明
  39. |
  40. | --- | --- |
  41. |
  42. SyncConnected
  43. | 客户端与服务器正常连接时
  44. |
  45. |
  46. Disconnected
  47. | 客户端与服务器断开连接时
  48. |
  49. |
  50. Expired
  51. | 会话session失效时
  52. |
  53. |
  54. AuthFailed
  55. | 身份认证失败时
  56. |
  57. - `Watcher`事件类型(`EventType`)
  58. `EventType`是数据节点(znode)发生变化时对应的通知类型。其路径为`org.apache.zookeeper.Watcher.Event.EventType`,是一个枚举类,`EventType`变化时`KeeperState`永远处于`SyncConnected`通知状态下;当`KeeperState`发生变化时,`EventType`永远为`None`。<br />枚举属性如下:
  59. |
  60. 枚举属性
  61. | 说明
  62. |
  63. | --- | --- |
  64. |
  65. None
  66. |
  67. |
  68. |
  69. NodeCreated
  70. | Watcher监听的数据节点被创建时
  71. |
  72. |
  73. NodeDeleted
  74. | Watcher监听的数据节点被删除时
  75. |
  76. |
  77. NodeDataChanged
  78. | Watcher监听的数据节点内容发生变更时(无论内容数据是否变化)
  79. |
  80. |
  81. NodeChildrenChanged
  82. | Watcher监听的数据节点的子节点列表发生变更时
  83. |
  84. 注:客户端接收到的相关事件通知中只包含状态及类型等信息,不包括节点变化前后的具体内容,变化前的数据需业务自身存储,变化后的数据需调用get等方法重新获取
  85. ### 4.5. 捕获相应的事件
  86. |
  87. 注册方式
  88. | Created
  89. | ChildrenChanged
  90. | Changed
  91. | Deleted
  92. |
  93. | --- | --- | --- | --- | --- |
  94. |
  95. `zk.exists(“/node-x”,watcher)`
  96. | 可监控
  97. |
  98. | 可监控
  99. | 可监控
  100. |
  101. |
  102. `zk.getData(“/node-x”,watcher)`
  103. |
  104. |
  105. | 可监控
  106. | 可监控
  107. |
  108. |
  109. `zk.getChildren(“/node-x”,watcher)`
  110. |
  111. | 可监控
  112. |
  113. | 可监控
  114. |
  115. ### 4.6. 注册Watcher使用示例
  116. #### 4.6.1. 客服端与服务器的连接状态监听
  117. KeeperState 通知状态:
  118. - `SyncConnected`:客户端与服务器正常连接时
  119. - `Disconnected`:客户端与服务器断开连接时
  120. - `Expired`:会话session失效时
  121. - `AuthFailed`:身份认证失败时<br />
  122. <br />
  123. 事件类型为:None
  124. 示例:

public class ZKConnectionWatcher implements Watcher {

  1. // 计数器对象
  2. private final CountDownLatch countDownLatch = new CountDownLatch(1);
  3. // 连接对象
  4. private ZooKeeper zooKeeper;
  5. @Override
  6. public void process(WatchedEvent event) {
  7. try {
  8. // 事件类型
  9. if (event.getType() == Event.EventType.None) {
  10. if (event.getState() == Event.KeeperState.SyncConnected) {
  11. System.out.println("连接创建成功!");
  12. countDownLatch.countDown();
  13. } else if (event.getState() == Event.KeeperState.Disconnected) {
  14. System.out.println("断开连接!");
  15. } else if (event.getState() == Event.KeeperState.Expired) {
  16. System.out.println("会话超时!");
  17. zooKeeper = new ZooKeeper(CONNECTION_STR, 5000, new ZKConnectionWatcher());
  18. } else if (event.getState() == Event.KeeperState.AuthFailed) {
  19. System.out.println("认证失败!");
  20. }
  21. }
  22. } catch (Exception ex) {
  23. ex.printStackTrace();
  24. }
  25. }
  26. @Test
  27. public void testWatcher() {
  28. try {
  29. zooKeeper = new ZooKeeper(CONNECTION_STR, 5000, new ZKConnectionWatcher());
  30. // 阻塞线程等待连接的创建
  31. countDownLatch.await();
  32. // 会话id
  33. System.out.println(zooKeeper.getSessionId());
  34. // 添加授权用户
  35. // zooKeeper.addAuthInfo("digest1", "MooN:123456".getBytes());
  36. byte[] bs = zooKeeper.getData("/MooN/node1", false, null);
  37. System.out.println(new String(bs));
  38. Thread.sleep(50000);
  39. zooKeeper.close();
  40. System.out.println("结束");
  41. } catch (Exception ex) {
  42. ex.printStackTrace();
  43. }
  44. }

}

  1. #### 4.6.2. 检查节点是否存在
  2. 使用`ZooKeeper`对象的`exists`方法可以指定监听器`Watcher`。可以监听以下状态
  3. - NodeCreated:节点创建
  4. - NodeDeleted:节点删除
  5. - NodeDataChanged:节点内容发生变化

// 使用连接对象的监视器 Stat exists(String path, boolean watch) // 自定义监视器 Stat exists(final String path, Watcher watcher)

  1. - `path`znode路径。
  2. - `watch`:是否使用连接对象中注册的监视器
  3. - `watcher`:监视器对象
  4. 示例:

public class ZKExistsWatcher {

  1. private ZooKeeper zooKeeper = null;
  2. @Before
  3. public void before() throws IOException, InterruptedException {
  4. CountDownLatch countDownLatch = new CountDownLatch(1);
  5. // 连接zookeeper客户端
  6. zooKeeper = new ZooKeeper(CONNECTION_STR, 6000, new Watcher() {
  7. @Override
  8. public void process(WatchedEvent event) {
  9. System.out.println("连接对象的参数!");
  10. // 连接成功
  11. if (event.getState() == Event.KeeperState.SyncConnected) {
  12. countDownLatch.countDown();
  13. }
  14. System.out.println("path=" + event.getPath());
  15. System.out.println("eventType=" + event.getType());
  16. }
  17. });
  18. countDownLatch.await();
  19. }
  20. @After
  21. public void after() throws InterruptedException {
  22. zooKeeper.close();
  23. }
  24. @Test
  25. public void watcherExists1() throws KeeperException, InterruptedException {
  26. /*
  27. * 使用连接对象的监视器
  28. * Stat exists(String path, boolean watch)
  29. * 参数 path:节点的路径
  30. * 参数 watch:使用连接对象中的watcher
  31. */
  32. zooKeeper.exists("/watcher1", true);
  33. Thread.sleep(50000);
  34. System.out.println("结束");
  35. }
  36. @Test
  37. public void watcherExists2() throws KeeperException, InterruptedException {
  38. /*
  39. * 使用自定义监视器
  40. * Stat exists(final String path, Watcher watcher)
  41. * 参数 path:节点的路径
  42. * 参数 watcher:自定义watcher对象
  43. */
  44. zooKeeper.exists("/watcher1", new Watcher() {
  45. @Override
  46. public void process(WatchedEvent event) {
  47. System.out.println("自定义watcher");
  48. System.out.println("path=" + event.getPath());
  49. System.out.println("eventType=" + event.getType());
  50. }
  51. });
  52. Thread.sleep(50000);
  53. System.out.println("结束");
  54. }
  55. @Test
  56. public void watcherExists3() throws KeeperException, InterruptedException {
  57. // watcher一次性
  58. Watcher watcher = new Watcher() {
  59. @Override
  60. public void process(WatchedEvent event) {
  61. try {
  62. System.out.println("自定义watcher");
  63. System.out.println("path=" + event.getPath());
  64. System.out.println("eventType=" + event.getType());
  65. zooKeeper.exists("/watcher1", this);
  66. } catch (Exception ex) {
  67. ex.printStackTrace();
  68. }
  69. }
  70. };
  71. zooKeeper.exists("/watcher1", watcher);
  72. Thread.sleep(80000);
  73. System.out.println("结束");
  74. }
  75. @Test
  76. public void watcherExists4() throws KeeperException, InterruptedException {
  77. // 注册多个监听器对象
  78. zooKeeper.exists("/watcher1", new Watcher() {
  79. @Override
  80. public void process(WatchedEvent event) {
  81. System.out.println("1");
  82. System.out.println("path=" + event.getPath());
  83. System.out.println("eventType=" + event.getType());
  84. }
  85. });
  86. zooKeeper.exists("/watcher1", new Watcher() {
  87. @Override
  88. public void process(WatchedEvent event) {
  89. System.out.println("2");
  90. System.out.println("path=" + event.getPath());
  91. System.out.println("eventType=" + event.getType());
  92. }
  93. });
  94. Thread.sleep(80000);
  95. System.out.println("结束");
  96. }

}

  1. #### 4.6.3. 查看节点
  2. 使用`ZooKeeper`对象的`getData`方法可以指定监听器`Watcher`。可以监听以下状态
  3. - NodeDeleted:节点删除
  4. - NodeDataChanged:节点内容发生变化

// 使用连接对象的监视器 byte[] getData(String path, boolean watch, Stat stat) // 自定义监视器 byte[] getData(final String path, Watcher watcher, Stat stat)

  1. - `path`znode路径。
  2. - `watch`:是否使用连接对象中注册的监视器
  3. - `watcher`:监视器对象
  4. - `Stat`:返回znode的元数据
  5. 示例:

public class ZKGetDataWatcher {

  1. private ZooKeeper zooKeeper = null;
  2. @Before
  3. public void before() throws IOException, InterruptedException {
  4. CountDownLatch countDownLatch = new CountDownLatch(1);
  5. // 连接zookeeper客户端
  6. zooKeeper = new ZooKeeper(CONNECTION_STR, 6000, new Watcher() {
  7. @Override
  8. public void process(WatchedEvent event) {
  9. System.out.println("连接对象的参数!");
  10. // 连接成功
  11. if (event.getState() == Event.KeeperState.SyncConnected) {
  12. countDownLatch.countDown();
  13. }
  14. System.out.println("path=" + event.getPath());
  15. System.out.println("eventType=" + event.getType());
  16. }
  17. });
  18. countDownLatch.await();
  19. }
  20. @After
  21. public void after() throws InterruptedException {
  22. zooKeeper.close();
  23. }
  24. @Test
  25. public void watcherGetData1() throws KeeperException, InterruptedException {
  26. /*
  27. * 使用连接对象的监视器
  28. * byte[] getData(String path, boolean watch, Stat stat)
  29. * 参数 path:节点的路径
  30. * 参数 watch:使用连接对象中的watcher
  31. */
  32. zooKeeper.getData("/watcher2", true, null);
  33. Thread.sleep(50000);
  34. System.out.println("结束");
  35. }
  36. @Test
  37. public void watcherGetData2() throws KeeperException, InterruptedException {
  38. /*
  39. * 自定义监视器
  40. * byte[] getData(final String path, Watcher watcher, Stat stat)
  41. * 参数 path:节点的路径
  42. * 参数 watcher:自定义watcher对象
  43. */
  44. zooKeeper.getData("/watcher2", new Watcher() {
  45. @Override
  46. public void process(WatchedEvent event) {
  47. System.out.println("自定义watcher");
  48. System.out.println("path=" + event.getPath());
  49. System.out.println("eventType=" + event.getType());
  50. }
  51. }, null);
  52. Thread.sleep(50000);
  53. System.out.println("结束");
  54. }
  55. @Test
  56. public void watcherGetData3() throws KeeperException, InterruptedException {
  57. // 一次性
  58. Watcher watcher = new Watcher() {
  59. @Override
  60. public void process(WatchedEvent event) {
  61. try {
  62. System.out.println("自定义watcher");
  63. System.out.println("path=" + event.getPath());
  64. System.out.println("eventType=" + event.getType());
  65. if (event.getType() == Event.EventType.NodeDataChanged) {
  66. zooKeeper.getData("/watcher2", this, null);
  67. }
  68. } catch (Exception ex) {
  69. ex.printStackTrace();
  70. }
  71. }
  72. };
  73. zooKeeper.getData("/watcher2", watcher, null);
  74. Thread.sleep(50000);
  75. System.out.println("结束");
  76. }
  77. @Test
  78. public void watcherGetData4() throws KeeperException, InterruptedException {
  79. // 注册多个监听器对象
  80. zooKeeper.getData("/watcher2", new Watcher() {
  81. @Override
  82. public void process(WatchedEvent event) {
  83. try {
  84. System.out.println("1");
  85. System.out.println("path=" + event.getPath());
  86. System.out.println("eventType=" + event.getType());
  87. if (event.getType() == Event.EventType.NodeDataChanged) {
  88. zooKeeper.getData("/watcher2", this, null);
  89. }
  90. } catch (Exception ex) {
  91. ex.printStackTrace();
  92. }
  93. }
  94. }, null);
  95. zooKeeper.getData("/watcher2", new Watcher() {
  96. @Override
  97. public void process(WatchedEvent event) {
  98. try {
  99. System.out.println("2");
  100. System.out.println("path=" + event.getPath());
  101. System.out.println("eventType=" + event.getType());
  102. if (event.getType() == Event.EventType.NodeDataChanged) {
  103. zooKeeper.getData("/watcher2", this, null);
  104. }
  105. } catch (Exception ex) {
  106. ex.printStackTrace();
  107. }
  108. }
  109. }, null);
  110. Thread.sleep(50000);
  111. System.out.println("结束");
  112. }

}

  1. #### 4.6.4. 查看子节点
  2. 使用`ZooKeeper`对象的`getChildren`方法可以指定监听器`Watcher`。可以监听以下状态
  3. - NodeDeleted:节点删除
  4. - NodeChildrenChanged:子节点发生变化

// 使用连接对象的监视器 List getChildren(String path, boolean watch) // 自定义监视器 List getChildren(final String path, Watcher watcher)

  1. - `path`znode路径。
  2. - `watch`:是否使用连接对象中注册的监视器
  3. - `watcher`:监视器对象
  4. - `Stat`:返回znode的元数据
  5. 示例:

public class ZKGetChildWatcher {

  1. private ZooKeeper zooKeeper = null;
  2. @Before
  3. public void before() throws IOException, InterruptedException {
  4. CountDownLatch connectedSemaphore = new CountDownLatch(1);
  5. // 连接zookeeper客户端
  6. zooKeeper = new ZooKeeper(CONNECTION_STR, 6000, new Watcher() {
  7. @Override
  8. public void process(WatchedEvent event) {
  9. System.out.println("连接对象的参数!");
  10. // 连接成功
  11. if (event.getState() == Event.KeeperState.SyncConnected) {
  12. connectedSemaphore.countDown();
  13. }
  14. System.out.println("path=" + event.getPath());
  15. System.out.println("eventType=" + event.getType());
  16. }
  17. });
  18. connectedSemaphore.await();
  19. }
  20. @After
  21. public void after() throws InterruptedException {
  22. zooKeeper.close();
  23. }
  24. @Test
  25. public void watcherGetChild1() throws KeeperException, InterruptedException {
  26. /*
  27. * 使用连接对象的监视器
  28. * List<String> getChildren(String path, boolean watch)
  29. * 参数 path:节点的路径
  30. * 参数 watch:使用连接对象中的watcher
  31. */
  32. zooKeeper.getChildren("/watcher3", true);
  33. Thread.sleep(50000);
  34. System.out.println("结束");
  35. }
  36. @Test
  37. public void watcherGetChild2() throws KeeperException, InterruptedException {
  38. /*
  39. * 自定义监视器
  40. * List<String> getChildren(final String path, Watcher watcher)
  41. * 参数 path:节点的路径
  42. * 参数 watcher:自定义watcher对象
  43. */
  44. zooKeeper.getChildren("/watcher3", new Watcher() {
  45. @Override
  46. public void process(WatchedEvent event) {
  47. System.out.println("自定义watcher");
  48. System.out.println("path=" + event.getPath());
  49. System.out.println("eventType=" + event.getType());
  50. }
  51. });
  52. Thread.sleep(50000);
  53. System.out.println("结束");
  54. }
  55. @Test
  56. public void watcherGetChild3() throws KeeperException, InterruptedException {
  57. // 一次性
  58. Watcher watcher = new Watcher() {
  59. @Override
  60. public void process(WatchedEvent event) {
  61. try {
  62. System.out.println("自定义watcher");
  63. System.out.println("path=" + event.getPath());
  64. System.out.println("eventType=" + event.getType());
  65. if (event.getType() == Event.EventType.NodeChildrenChanged) {
  66. zooKeeper.getChildren("/watcher3", this);
  67. }
  68. } catch (Exception ex) {
  69. ex.printStackTrace();
  70. }
  71. }
  72. };
  73. zooKeeper.getChildren("/watcher3", watcher);
  74. Thread.sleep(50000);
  75. System.out.println("结束");
  76. }
  77. @Test
  78. public void watcherGetChild4() throws KeeperException, InterruptedException {
  79. // 多个监视器对象
  80. zooKeeper.getChildren("/watcher3", new Watcher() {
  81. @Override
  82. public void process(WatchedEvent event) {
  83. try {
  84. System.out.println("1");
  85. System.out.println("path=" + event.getPath());
  86. System.out.println("eventType=" + event.getType());
  87. if (event.getType() == Event.EventType.NodeChildrenChanged) {
  88. zooKeeper.getChildren("/watcher3", this);
  89. }
  90. } catch (Exception ex) {
  91. ex.printStackTrace();
  92. }
  93. }
  94. });
  95. zooKeeper.getChildren("/watcher3", new Watcher() {
  96. @Override
  97. public void process(WatchedEvent event) {
  98. try {
  99. System.out.println("2");
  100. System.out.println("path=" + event.getPath());
  101. System.out.println("eventType=" + event.getType());
  102. if (event.getType() == Event.EventType.NodeChildrenChanged) {
  103. zooKeeper.getChildren("/watcher3", this);
  104. }
  105. } catch (Exception ex) {
  106. ex.printStackTrace();
  107. }
  108. }
  109. });
  110. Thread.sleep(50000);
  111. System.out.println("结束");
  112. }

}

```