概述
Zookeeper提供了简单易用的API,我们利用这些API实现添加、删除、修改、查看ZooKeeper的节点,以及实现对这些节点的监听功能。
API介绍
建议大家养成查阅 官方文档 的习惯,因为官方文档是最权威的,而且英文也不难,基本上大家都能看得懂。
本文以v3.8.0版本的zookeeper演示,org.apache.zookeeper.Zookeeper是ZooKeeper客户端的主类,除非另有说明,该类的方法是线程安全的。
ZooKeeper构造方法
可以通过构造方法实例化ZooKeeper对象,同时会和到服务器建立连接,服务器就会为客户端分配一个会话ID。客户端将定期向服务器发送心跳,以保持会话的有效性。
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
- connectString: Zookeeper服务器的地址,多个地址用逗号分隔
- sessionTimeout:超时时间
- watcher:设置默认监听器
ZooKeeper常用方法
ZooKeeper的API提供了同步和异步两种方式。同步方法会阻塞,直到服务器响应。异步方法只是对请求进行排队,以便立即发送和返回。它们接受一个回调对象,该对象将在请求成功执行时执行,或在出现错误时执行,并带有指示错误的适当返回码。
| 方法 | 描述 |
|---|---|
| create(String path, byte[] data, List acl, CreateMode createMode) | 同步方式创建节点 |
| create(String path, byte[] data, List acl, CreateMode createMode) | 异步方式创建节点 |
| delete(String path, int version) | 同步方式删除节点 |
| delete(String path, int version, AsyncCallback.VoidCallback cb, Object ctx) | 异步方式删除节点 |
| exists(String path, boolean watch) | 返回指定路径的节点状态信息,如果不存在返回null |
| getChildren(String path, boolean watch) | 返回指定路径的所有子节点状态信息 |
| getData(String path, boolean watch, Stat stat) | 返回指定路径的节点数据和状态信息 |
| setData(String path, byte[] data, int version) | 给指定路径和版本的节点设置新值,如版本为-1,即给所有版本设置值 |
环境准备
引入对应版本的依赖,本文用3.8.0最新版本演示
<!--zookeeper 依赖包--><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version><scope>compile</scope></dependency><!--junit测试依赖--><dependency><groupId>org.junit.jupiter</groupId><artifactId>junit-jupiter-api</artifactId><version>5.5.2</version><scope>test</scope></dependency></dependencies>
我们采用junit的方式演示,junit的常用注解作用如下:
@BeforeClass– 表示在类中的任意public static void方法执行之前执行@AfterClass– 表示在类中的任意public static void方法执行之后执行@Before– 表示在任意使用@Test注解标注的public void方法执行之前执行@After– 表示在任意使用@Test注解标注的public void方法执行之后执行@Test– 使用该注解标注的public void方法会表示为一个测试方法测试案例
创建会话和关闭会话
可以通过Zookeeper类的构造函数创建会话,它有10个重载的构造方法。
| 参数 | 说明 |
|---|---|
| connectString | 指定ZooKeeper服务器列表,有英文逗号分隔的host:port字符串组成,如”127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002”。可以指定客户端连上connectString中服务器后的根目录,如 “127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a” ,对ZooKeeper的操作都会基于/app/a这个根目录,即创建路径为”/foo/bar”的节点,实际该节点的路径为”/app/a/foo/bar” |
| sessionTimeout | 会话的超时时间,单位毫秒。在一个会话周期内,ZooKeeper客户端和服务器之间会通过心跳检测机制来维持会话的有效性,一旦在sessionTimeout时间内没有进行有效的心跳检测,会话就会失效。 |
| watcher | ZooKeeper允许客户端在构造方法中传入一个接口Watcher(org.apache.zookeeper.Watcher)的实现类对象来作为默认的Watch事件通知器。该参数也可以设置为null,表明不需要设置默认的Watch处理器。 |
- 客户端和服务端建立会话是异步的。构造方法会在处理完客户端初始化工作后立即返回,在通常情况下,此时并没有真正建立好一个可用的会话,此时在会话的生命周期中处于“CONNECTING”的状态。当该会话真正创建完毕后,ZooKeeper服务端会向会话对应的客户端发送一个事件通知,以告知客户端,客户端只有在获取这个通知后,才算真正建立了会话。
- 实例化的ZooKeeper客户端对象将从connectString列举的服务器中随机选择一个服务器,并尝试连接到该服务器。如果建立连接失败,将尝试连接另一个服务器(顺序是不确定的,因为列举的服务器是随机洗牌的),直到建立连接。即客户端连接一个服务器失败,将继续尝试,直到会话显式关闭。
代码:
private static final String ZK_ADDR = "10.100.1.14:2181";private static final Integer ZK_SESSION_TIMEOUT = 30000;private ZooKeeper zooKeeper = null;@Beforepublic void init() throws IOException, InterruptedException {log.info("********************** start zk ..................");CountDownLatch countDownLatch = new CountDownLatch(1);zooKeeper = new ZooKeeper(ZK_ADDR, ZK_SESSION_TIMEOUT, event -> {log.info("触发了事件:[{}]", event);countDownLatch.countDown();});countDownLatch.await();}@Afterpublic void close() throws InterruptedException {zooKeeper.close();log.info("************************ close zk ..................");}
- init方法和close方法是用来创建和关闭zk会话,加了
@Before和@After注解,它会在每个测试用例前后执行。 由于客户端和服务端建立会话是异步的,因此做一个阻塞操作,防止还没开启就执行后面的操作,在真正打开了客户端之后,发送一个消息,并解掉阻塞。
创建节点
创建节点有同步和异步两种方式。
create( final String path, byte[] data, List<ACL> acl, CreateMode createMode)
说明:该方法是一个同步创建节点的方法
参数说明:
| 参数 | 说明 |
|---|---|
| path | znode路径。例如,/path, /app/node |
| data | 存储到znode路径的数据,byte数组,最大1M |
| acl | 要创建的节点的访问控制列表。zookeeper API提供了一个静态接口 ZooDefs.Ids 来获取一些基本的acl列表。例如,ZooDefs.Ids.OPEN_ACL_UNSAFE 返回打开znode的acl列表。 - ZooDefs.Ids.OPEN_ACL_UNSAFE:表示开放权限,所有用户拥有所有权限 - ZooDefs.Ids.CREATOR_ALL_ACL:表示使用 auth 权限模式,并且对于满足条件的用户开放所有权限 - ZooDefs.Ids.READ_ACL_UNSAFE:表示对于所有用户,只开放Read权限 - ZooDefs.Ids.ANYONE_ID_UNSAFE:是一个常用的Id对象,表示所有用户 - ZooDefs.Ids.AUTH_IDS:是一个Auth模式的Id对象。 - 我们也可以自己定义权限模式 |
| createMode | 节点的类型,是一个枚举。 - PERSISTENT:持久节点(也有叫永久节点的),不会随着会话的结束而自动删除。 - PERSISTENT_SEQUENTIAL:带单调递增序号的持久节点,不会随着会话的结束而自动删除。 - EPHEMERAL:临时节点,会随着会话的结束而自动删除。 - EPHEMERAL_SEQUENTIAL:带单调递增序号的临时节点,会随着会话的结束而自动删除。 - CONTAINER:容器节点,用于Leader、Lock等特殊用途,当容器节点不存在任何子节点时,容器将成为服务器在将来某个时候删除的候选节点。 - PERSISTENT_WITH_TTL:带TTL(time-to-live,存活时间)的持久节点,节点在TTL时间之内没有得到更新并且没有子节点,就会被自动删除。 - PERSISTENT_SEQUENTIAL_WITH_TTL:带TTL(time-to-live,存活时间)和单调递增序号的持久节点,节点在TTL时间之内没有得到更新并且没有子节点,就会被自动删除。 |
create(String path, byte[] data, List<ACL> acl, CreateMode createMode, AsyncCallback.StringCallback callBack,Object ctx)
说明:
- 这是一个异步创建节点的方法
参数说明:
其他参数和上面同步创建节点一样.
| 参数 | 说明 |
|---|---|
| callBack | 异步回调接口 |
| ctx | 传递上下文参数 |
代码:
@Testpublic void testCreate() throws InterruptedException, KeeperException {// 创建一个持久节点,对所有用户开放zooKeeper.create("/node1", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);// 创建一个临时的有序节点,权限模式为对指定ip开放Id ip = new Id("ip", "10.100.1.100");zooKeeper.create("/user", "u00001".getBytes(), Collections.singletonList(new ACL(ZooDefs.Perms.ALL, ip)), CreateMode.EPHEMERAL_SEQUENTIAL);}// 异步创建节点@Testpublic void testCreateAsync() throws InterruptedException {zooKeeper.create("/path2", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new AsyncCallback.StringCallback(){@Overridepublic void processResult(int rc, String path, Object ctx, String name) {log.info("rc: [{}]", rc); // 0代表成功了log.info(path); // 传进来的,添加的节点log.info(name); // 真正查到的节点的名字log.info(ctx.toString()); // 上下文参数,ctx传进来的东西log.info("create node success!");}}, "ctx" );Thread.sleep(1000);}
查看节点
// 同步方式查看节点数据,使用自定义的监听器byte[] getData(final String path, Watcher watcher, Stat stat)// 同步方式查看节点数据,使用连接时的监听器byte[] getData(String path, boolean watch, Stat stat)// 异步方式查看节点,使用自定义的监听器void getData(final String path, Watcher watcher, DataCallback cb, Object ctx)// 异步方式查看节点,使用注册的连接器void getData(String path, boolean watch, DataCallback cb, Object ctx)
参数说明:
| 参数 | 说明 |
|---|---|
| path | znode路径 |
| watcher | 注册一个监听器 |
| watch | 是否使用连接对象中注册的监视器 |
| stat | 返回znode的元数据 |
| callBack | 异步回调接口 |
| ctx | 传递上下文参数 |
代码
@Testpublic void testGet() throws InterruptedException, KeeperException {Stat stat = new Stat();byte[] data = zooKeeper.getData("/node1", false, stat);log.info("获取到的数据是:" + new String(data));log.info("当前节点的版本:" + stat.getVersion());}@Testpublic void testGetAsync() throws InterruptedException, KeeperException {zooKeeper.getData("/node1", null, new AsyncCallback.DataCallback() {@Overridepublic void processResult(int rc, String path, Object ctx, byte[] bytes, Stat stat) {log.info("rc: " + rc);log.info(path);log.info(new String(bytes));log.info("version: " + stat.getVersion());}}, null);Thread.sleep(1000);}
更新节点
// 同步方式更新节点Stat setData(final String path, byte[] data, int version)// 异步方式更新节点void setData(final String path, byte[] data, int version, StatCallback cb, Object ctx)
参数说明:
| 参数 | 说明 |
|---|---|
| path | znode路径 |
| data | 更新的数据 |
| version | znode的当前版本。值为-1时,表示不需要考虑版本。如果指定版本之后,就可以做成一个乐观锁。 |
| callBack | 异步回调接口 |
| ctx | 传递上下文参数 |
代码:
@Testpublic void testSetData() throws InterruptedException, KeeperException {Stat stat = zooKeeper.setData("/node1", "alvin".getBytes(), -1); // 返回状态信息log.info(stat.toString()); // 将状态信息打印log.info("当前版本号" + stat.getVersion());log.info("节点创建时间" + stat.getCtime());log.info("节点修改时间" + stat.getMtime());}@Testpublic void testSetDataAsync() throws InterruptedException, KeeperException {zooKeeper.setData("/node1", "alvin2".getBytes(), 1, new AsyncCallback.StatCallback() {@Overridepublic void processResult(int rc, String path, Object ctx, Stat stat) {log.info("rc" + rc); // 0 代表修改成功log.info(path); // 输入的节点路径log.info("version " + stat.getVersion()); // 当前版本}}, null); // 返回状态信息Thread.sleep(1000);}
删除节点
// 同步方式删除节点void delete(final String path, int version)// 异步方式删除节点void delete(final String path, int version, VoidCallback cb, Object ctx)
参数说明:
| 参数 | 说明 |
|---|---|
| path | znode路径 |
| version | znode的当前版本。值为-1时,表示不需要考虑版本。如果指定版本之后,就可以做成一个乐观锁。 |
| callBack | 异步回调接口 |
| ctx | 传递上下文参数 |
代码:
@Testpublic void testDelete() throws InterruptedException, KeeperException {zooKeeper.delete("/node1", -1); // 如果节点不存在,会删除失败zooKeeper.delete("/node5/child1", -1); // 如果节点不存在,会删除失败}@Testpublic void testDeleteAsync() {zooKeeper.delete("/node1", -1, new AsyncCallback.VoidCallback() {@Overridepublic void processResult(int rc, String path, Object ctx) {log.info("rc:" + rc);log.info(path);}}, "ctx");}
查看子节点
//同步方式查看子节点,传入监听器List<String> getChildren(final String path, Watcher watcher)//同步方式查看子节点,是否使用默认的监听器List<String> getChildren(String path, boolean watch)//异步方式查看子节点,传入监听器void getChildren(final String path, Watcher watcher, ChildrenCallback cb, Object ctx)//异步方式查看子节点,是否使用默认的监听器void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx)
代码:
private void syncCreateNode(String path, String data) throws InterruptedException, KeeperException {zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}@Testpublic void testGetChild() throws InterruptedException, KeeperException {syncCreateNode("/a", "hello");syncCreateNode("/a/b", "hello");syncCreateNode("/a/c", "hello");List<String> children = zooKeeper.getChildren("/a", false);log.info("********* children: [{}]", children);}
检查节点是否存在
// 同步方式检查节点是否存在, 传入监听器Stat exists(final String path, Watcher watcher)// 同步方式检查节点是否存在, 是否用默认监听器Stat exists(String path, boolean watch)// 异步方式检查节点是否存在, 传入监听器void exists(final String path, Watcher watcher, StatCallback cb, Object ctx)// 异步方式检查节点是否存在, 是否用默认监听器void exists(String path, boolean watch, StatCallback cb, Object ctx)
代码:
@Testpublic void testExist() throws InterruptedException, KeeperException {syncCreateNode("/alvin", "hello");Stat stat = zooKeeper.exists("/alvin", false);log.info("stat: [{}]", stat);log.info("delete node /alvin ......");zooKeeper.delete("/alvin", -1);stat = zooKeeper.exists("/alvin", false);log.info("stat: [{}]", stat);}
监听器代码验证
getData、exist是、getChildren三个方法都可以监听对应节点变化。
验证watch的一次性
创建监听执行
private void syncCreateNode(String path, String data) throws InterruptedException, KeeperException {zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);}@Testpublic void testGetWatch() throws InterruptedException, KeeperException {// 创建临时节点syncCreateNode("/watch", "aaa");byte[] data = zooKeeper.getData("/watch", true, new Stat());log.info("getData: [{}]", new String(data));Thread.sleep(100000L);}
多次执行更新节点的操作
// 修改数据@Testpublic void testUpdateData() throws InterruptedException, KeeperException {// 创建临时节点zooKeeper.setData("/watch", "bbb".getBytes(), -1);Thread.sleep(10000L);}
查看结果, 日志只打印了一次
通过自定义监听器多次监听
通过exists创建自定义监听
@Testpublic void testExists() throws KeeperException, InterruptedException {// 创建临时节点syncCreateNode("/watch", "aaa");// 重复使用,用完再注册一个新的Stat stat = zooKeeper.exists("/watch", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {switch (watchedEvent.getType()) {case NodeCreated:log.info("{}节点创建了", watchedEvent.getPath());break;case NodeDataChanged:log.info("{}节点数据被修改了", watchedEvent.getPath());break;case NodeDeleted:log.info("{}节点被删除了", watchedEvent.getPath());break;}try {// 重复监听的关键zooKeeper.exists("/watch", this);} catch (KeeperException | InterruptedException e) {e.printStackTrace();}}});if (stat != null) {log.info("version: " + stat.getVersion());}Thread.sleep(100000);}
多次执行更新节点、删除节点、创建节点的操作
- 查看结果,多次响应监听
通过addWatcher方法实现多次监听
通过addWatch添加监听器,addWatch方法支持重复监听
@Testpublic void testAddWatch() throws InterruptedException, KeeperException {// 创建临时节点syncCreateNode("/watch", "aaa");zooKeeper.addWatch("/watch", new Watcher() {@Overridepublic void process(WatchedEvent event) {switch (event.getType()) {case NodeCreated:log.info("{}节点创建了", event.getPath());break;case NodeDataChanged:log.info("{}节点数据被修改了", event.getPath());break;case NodeDeleted:log.info("{}节点被删除了", event.getPath());break;}}}, AddWatchMode.PERSISTENT);Thread.sleep(100000);}
多次执行更新节点、删除节点、创建节点的操作
- 查看结果

说明:
- zk api提供了addWatch、printwatches、removewatches方法,分别用来添加监听,答应监听器和移除监听器列表。
- addWatch的参数中可以传入监听的两种模式,PERSISTENT和PERSISTENT_RECURSIVE,PERSISTENT模式只监听指定的节点事件,而PERSISTENT_RECURSIVE模式会监听指定节点与它所有子节点的事件。
参考
https://zookeeper.apache.org/doc/r3.8.0/apidocs/zookeeper-server/index.html
https://juejin.cn/post/6844903601517625357#heading-6
https://juejin.cn/post/6844904029546348551#heading-1
https://juejin.cn/post/6964703960250712101#heading-13
