概述

Zookeeper提供了简单易用的API,我们利用这些API实现添加、删除、修改、查看ZooKeeper的节点,以及实现对这些节点的监听功能。

API介绍

建议大家养成查阅 官方文档 的习惯,因为官方文档是最权威的,而且英文也不难,基本上大家都能看得懂。

本文以v3.8.0版本的zookeeper演示,org.apache.zookeeper.Zookeeper是ZooKeeper客户端的主类,除非另有说明,该类的方法是线程安全的。

ZooKeeper构造方法

可以通过构造方法实例化ZooKeeper对象,同时会和到服务器建立连接,服务器就会为客户端分配一个会话ID。客户端将定期向服务器发送心跳,以保持会话的有效性。
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)

  • connectString: Zookeeper服务器的地址,多个地址用逗号分隔
  • sessionTimeout:超时时间
  • watcher:设置默认监听器

    ZooKeeper常用方法

    ZooKeeper的API提供了同步和异步两种方式。同步方法会阻塞,直到服务器响应。异步方法只是对请求进行排队,以便立即发送和返回。它们接受一个回调对象,该对象将在请求成功执行时执行,或在出现错误时执行,并带有指示错误的适当返回码。
方法 描述
create(String path, byte[] data, List acl, CreateMode createMode) 同步方式创建节点
create(String path, byte[] data, List acl, CreateMode createMode) 异步方式创建节点
delete(String path, int version) 同步方式删除节点
delete(String path, int version, AsyncCallback.VoidCallback cb, Object ctx) 异步方式删除节点
exists(String path, boolean watch) 返回指定路径的节点状态信息,如果不存在返回null
getChildren(String path, boolean watch) 返回指定路径的所有子节点状态信息
getData(String path, boolean watch, Stat stat) 返回指定路径的节点数据和状态信息
setData(String path, byte[] data, int version) 给指定路径和版本的节点设置新值,如版本为-1,即给所有版本设置值

环境准备

  1. 引入对应版本的依赖,本文用3.8.0最新版本演示

    1. <!--zookeeper 依赖包-->
    2. <dependency>
    3. <groupId>org.apache.zookeeper</groupId>
    4. <artifactId>zookeeper</artifactId>
    5. <version>3.8.0</version>
    6. </dependency>
    7. <dependency>
    8. <groupId>org.projectlombok</groupId>
    9. <artifactId>lombok</artifactId>
    10. <version>1.18.24</version>
    11. <scope>compile</scope>
    12. </dependency>
    13. <!--junit测试依赖-->
    14. <dependency>
    15. <groupId>org.junit.jupiter</groupId>
    16. <artifactId>junit-jupiter-api</artifactId>
    17. <version>5.5.2</version>
    18. <scope>test</scope>
    19. </dependency>
    20. </dependencies>
  2. 我们采用junit的方式演示,junit的常用注解作用如下:

  • @BeforeClass – 表示在类中的任意public static void方法执行之前执行
  • @AfterClass – 表示在类中的任意public static void方法执行之后执行
  • @Before – 表示在任意使用@Test注解标注的public void方法执行之前执行
  • @After– 表示在任意使用@Test注解标注的public void方法执行之后执行
  • @Test – 使用该注解标注的public void方法会表示为一个测试方法

    测试案例

    创建会话和关闭会话

    可以通过Zookeeper类的构造函数创建会话,它有10个重载的构造方法。
    image.png
参数 说明
connectString 指定ZooKeeper服务器列表,有英文逗号分隔的host:port字符串组成,如”127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002”。可以指定客户端连上connectString中服务器后的根目录,如 “127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a” ,对ZooKeeper的操作都会基于/app/a这个根目录,即创建路径为”/foo/bar”的节点,实际该节点的路径为”/app/a/foo/bar”
sessionTimeout 会话的超时时间,单位毫秒。在一个会话周期内,ZooKeeper客户端和服务器之间会通过心跳检测机制来维持会话的有效性,一旦在sessionTimeout时间内没有进行有效的心跳检测,会话就会失效。
watcher ZooKeeper允许客户端在构造方法中传入一个接口Watcher(org.apache.zookeeper.Watcher)的实现类对象来作为默认的Watch事件通知器。该参数也可以设置为null,表明不需要设置默认的Watch处理器。
  • 客户端和服务端建立会话是异步的。构造方法会在处理完客户端初始化工作后立即返回,在通常情况下,此时并没有真正建立好一个可用的会话,此时在会话的生命周期中处于“CONNECTING”的状态。当该会话真正创建完毕后,ZooKeeper服务端会向会话对应的客户端发送一个事件通知,以告知客户端,客户端只有在获取这个通知后,才算真正建立了会话。
  • 实例化的ZooKeeper客户端对象将从connectString列举的服务器中随机选择一个服务器,并尝试连接到该服务器。如果建立连接失败,将尝试连接另一个服务器(顺序是不确定的,因为列举的服务器是随机洗牌的),直到建立连接。即客户端连接一个服务器失败,将继续尝试,直到会话显式关闭。

代码:

  1. private static final String ZK_ADDR = "10.100.1.14:2181";
  2. private static final Integer ZK_SESSION_TIMEOUT = 30000;
  3. private ZooKeeper zooKeeper = null;
  4. @Before
  5. public void init() throws IOException, InterruptedException {
  6. log.info("********************** start zk ..................");
  7. CountDownLatch countDownLatch = new CountDownLatch(1);
  8. zooKeeper = new ZooKeeper(ZK_ADDR, ZK_SESSION_TIMEOUT, event -> {
  9. log.info("触发了事件:[{}]", event);
  10. countDownLatch.countDown();
  11. });
  12. countDownLatch.await();
  13. }
  14. @After
  15. public void close() throws InterruptedException {
  16. zooKeeper.close();
  17. log.info("************************ close zk ..................");
  18. }
  • init方法和close方法是用来创建和关闭zk会话,加了@Before@After注解,它会在每个测试用例前后执行。
  • 由于客户端和服务端建立会话是异步的,因此做一个阻塞操作,防止还没开启就执行后面的操作,在真正打开了客户端之后,发送一个消息,并解掉阻塞。

    创建节点

    创建节点有同步和异步两种方式。
    create( final String path, byte[] data, List<ACL> acl, CreateMode createMode)
    说明:

  • 该方法是一个同步创建节点的方法

参数说明:

参数 说明
path znode路径。例如,/path, /app/node
data 存储到znode路径的数据,byte数组,最大1M
acl 要创建的节点的访问控制列表。zookeeper API提供了一个静态接口 ZooDefs.Ids 来获取一些基本的acl列表。例如,ZooDefs.Ids.OPEN_ACL_UNSAFE 返回打开znode的acl列表。
- ZooDefs.Ids.OPEN_ACL_UNSAFE:表示开放权限,所有用户拥有所有权限
- ZooDefs.Ids.CREATOR_ALL_ACL:表示使用 auth 权限模式,并且对于满足条件的用户开放所有权限
- ZooDefs.Ids.READ_ACL_UNSAFE:表示对于所有用户,只开放Read权限
- ZooDefs.Ids.ANYONE_ID_UNSAFE:是一个常用的Id对象,表示所有用户
- ZooDefs.Ids.AUTH_IDS:是一个Auth模式的Id对象。
- 我们也可以自己定义权限模式

createMode 节点的类型,是一个枚举。
- PERSISTENT:持久节点(也有叫永久节点的),不会随着会话的结束而自动删除。
- PERSISTENT_SEQUENTIAL:带单调递增序号的持久节点,不会随着会话的结束而自动删除。
- EPHEMERAL:临时节点,会随着会话的结束而自动删除。
- EPHEMERAL_SEQUENTIAL:带单调递增序号的临时节点,会随着会话的结束而自动删除。
- CONTAINER:容器节点,用于Leader、Lock等特殊用途,当容器节点不存在任何子节点时,容器将成为服务器在将来某个时候删除的候选节点。
- PERSISTENT_WITH_TTL:带TTL(time-to-live,存活时间)的持久节点,节点在TTL时间之内没有得到更新并且没有子节点,就会被自动删除。
- PERSISTENT_SEQUENTIAL_WITH_TTL:带TTL(time-to-live,存活时间)和单调递增序号的持久节点,节点在TTL时间之内没有得到更新并且没有子节点,就会被自动删除。

create(String path, byte[] data, List<ACL> acl, CreateMode createMode, AsyncCallback.StringCallback callBack,Object ctx)
说明:

  • 这是一个异步创建节点的方法

参数说明:
其他参数和上面同步创建节点一样.

参数 说明
callBack 异步回调接口
ctx 传递上下文参数

代码:

  1. @Test
  2. public void testCreate() throws InterruptedException, KeeperException {
  3. // 创建一个持久节点,对所有用户开放
  4. zooKeeper.create("/node1", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  5. // 创建一个临时的有序节点,权限模式为对指定ip开放
  6. Id ip = new Id("ip", "10.100.1.100");
  7. zooKeeper.create("/user", "u00001".getBytes(), Collections.singletonList(new ACL(ZooDefs.Perms.ALL, ip)), CreateMode.EPHEMERAL_SEQUENTIAL);
  8. }
  9. // 异步创建节点
  10. @Test
  11. public void testCreateAsync() throws InterruptedException {
  12. zooKeeper.create("/path2", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new AsyncCallback.StringCallback(){
  13. @Override
  14. public void processResult(int rc, String path, Object ctx, String name) {
  15. log.info("rc: [{}]", rc); // 0代表成功了
  16. log.info(path); // 传进来的,添加的节点
  17. log.info(name); // 真正查到的节点的名字
  18. log.info(ctx.toString()); // 上下文参数,ctx传进来的东西
  19. log.info("create node success!");
  20. }
  21. }, "ctx" );
  22. Thread.sleep(1000);
  23. }

创建成功的日志
image.png

查看节点

  1. // 同步方式查看节点数据,使用自定义的监听器
  2. byte[] getData(final String path, Watcher watcher, Stat stat)
  3. // 同步方式查看节点数据,使用连接时的监听器
  4. byte[] getData(String path, boolean watch, Stat stat)
  5. // 异步方式查看节点,使用自定义的监听器
  6. void getData(final String path, Watcher watcher, DataCallback cb, Object ctx)
  7. // 异步方式查看节点,使用注册的连接器
  8. void getData(String path, boolean watch, DataCallback cb, Object ctx)

参数说明:

参数 说明
path znode路径
watcher 注册一个监听器
watch 是否使用连接对象中注册的监视器
stat 返回znode的元数据
callBack 异步回调接口
ctx 传递上下文参数

代码

  1. @Test
  2. public void testGet() throws InterruptedException, KeeperException {
  3. Stat stat = new Stat();
  4. byte[] data = zooKeeper.getData("/node1", false, stat);
  5. log.info("获取到的数据是:" + new String(data));
  6. log.info("当前节点的版本:" + stat.getVersion());
  7. }
  8. @Test
  9. public void testGetAsync() throws InterruptedException, KeeperException {
  10. zooKeeper.getData("/node1", null, new AsyncCallback.DataCallback() {
  11. @Override
  12. public void processResult(int rc, String path, Object ctx, byte[] bytes, Stat stat) {
  13. log.info("rc: " + rc);
  14. log.info(path);
  15. log.info(new String(bytes));
  16. log.info("version: " + stat.getVersion());
  17. }
  18. }, null);
  19. Thread.sleep(1000);
  20. }

更新节点

  1. // 同步方式更新节点
  2. Stat setData(final String path, byte[] data, int version)
  3. // 异步方式更新节点
  4. void setData(final String path, byte[] data, int version, StatCallback cb, Object ctx)

参数说明:

参数 说明
path znode路径
data 更新的数据
version znode的当前版本。值为-1时,表示不需要考虑版本。如果指定版本之后,就可以做成一个乐观锁。
callBack 异步回调接口
ctx 传递上下文参数

代码:

  1. @Test
  2. public void testSetData() throws InterruptedException, KeeperException {
  3. Stat stat = zooKeeper.setData("/node1", "alvin".getBytes(), -1); // 返回状态信息
  4. log.info(stat.toString()); // 将状态信息打印
  5. log.info("当前版本号" + stat.getVersion());
  6. log.info("节点创建时间" + stat.getCtime());
  7. log.info("节点修改时间" + stat.getMtime());
  8. }
  9. @Test
  10. public void testSetDataAsync() throws InterruptedException, KeeperException {
  11. zooKeeper.setData("/node1", "alvin2".getBytes(), 1, new AsyncCallback.StatCallback() {
  12. @Override
  13. public void processResult(int rc, String path, Object ctx, Stat stat) {
  14. log.info("rc" + rc); // 0 代表修改成功
  15. log.info(path); // 输入的节点路径
  16. log.info("version " + stat.getVersion()); // 当前版本
  17. }
  18. }, null); // 返回状态信息
  19. Thread.sleep(1000);
  20. }

删除节点

  1. // 同步方式删除节点
  2. void delete(final String path, int version)
  3. // 异步方式删除节点
  4. void delete(final String path, int version, VoidCallback cb, Object ctx)

参数说明:

参数 说明
path znode路径
version znode的当前版本。值为-1时,表示不需要考虑版本。如果指定版本之后,就可以做成一个乐观锁。
callBack 异步回调接口
ctx 传递上下文参数

代码:

  1. @Test
  2. public void testDelete() throws InterruptedException, KeeperException {
  3. zooKeeper.delete("/node1", -1); // 如果节点不存在,会删除失败
  4. zooKeeper.delete("/node5/child1", -1); // 如果节点不存在,会删除失败
  5. }
  6. @Test
  7. public void testDeleteAsync() {
  8. zooKeeper.delete("/node1", -1, new AsyncCallback.VoidCallback() {
  9. @Override
  10. public void processResult(int rc, String path, Object ctx) {
  11. log.info("rc:" + rc);
  12. log.info(path);
  13. }
  14. }, "ctx");
  15. }

查看子节点

  1. //同步方式查看子节点,传入监听器
  2. List<String> getChildren(final String path, Watcher watcher)
  3. //同步方式查看子节点,是否使用默认的监听器
  4. List<String> getChildren(String path, boolean watch)
  5. //异步方式查看子节点,传入监听器
  6. void getChildren(final String path, Watcher watcher, ChildrenCallback cb, Object ctx)
  7. //异步方式查看子节点,是否使用默认的监听器
  8. void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx)

代码:

  1. private void syncCreateNode(String path, String data) throws InterruptedException, KeeperException {
  2. zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  3. }
  4. @Test
  5. public void testGetChild() throws InterruptedException, KeeperException {
  6. syncCreateNode("/a", "hello");
  7. syncCreateNode("/a/b", "hello");
  8. syncCreateNode("/a/c", "hello");
  9. List<String> children = zooKeeper.getChildren("/a", false);
  10. log.info("********* children: [{}]", children);
  11. }

结果:
image.png

检查节点是否存在

  1. // 同步方式检查节点是否存在, 传入监听器
  2. Stat exists(final String path, Watcher watcher)
  3. // 同步方式检查节点是否存在, 是否用默认监听器
  4. Stat exists(String path, boolean watch)
  5. // 异步方式检查节点是否存在, 传入监听器
  6. void exists(final String path, Watcher watcher, StatCallback cb, Object ctx)
  7. // 异步方式检查节点是否存在, 是否用默认监听器
  8. void exists(String path, boolean watch, StatCallback cb, Object ctx)

代码:

  1. @Test
  2. public void testExist() throws InterruptedException, KeeperException {
  3. syncCreateNode("/alvin", "hello");
  4. Stat stat = zooKeeper.exists("/alvin", false);
  5. log.info("stat: [{}]", stat);
  6. log.info("delete node /alvin ......");
  7. zooKeeper.delete("/alvin", -1);
  8. stat = zooKeeper.exists("/alvin", false);
  9. log.info("stat: [{}]", stat);
  10. }

监听器代码验证

getData、exist是、getChildren三个方法都可以监听对应节点变化。
ZooKeeper Java API - 图4

验证watch的一次性

  1. 创建监听执行

    1. private void syncCreateNode(String path, String data) throws InterruptedException, KeeperException {
    2. zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    3. }
    4. @Test
    5. public void testGetWatch() throws InterruptedException, KeeperException {
    6. // 创建临时节点
    7. syncCreateNode("/watch", "aaa");
    8. byte[] data = zooKeeper.getData("/watch", true, new Stat());
    9. log.info("getData: [{}]", new String(data));
    10. Thread.sleep(100000L);
    11. }
  2. 多次执行更新节点的操作

    1. // 修改数据
    2. @Test
    3. public void testUpdateData() throws InterruptedException, KeeperException {
    4. // 创建临时节点
    5. zooKeeper.setData("/watch", "bbb".getBytes(), -1);
    6. Thread.sleep(10000L);
    7. }
  3. 查看结果, 日志只打印了一次

image.png

通过自定义监听器多次监听

  1. 通过exists创建自定义监听

    1. @Test
    2. public void testExists() throws KeeperException, InterruptedException {
    3. // 创建临时节点
    4. syncCreateNode("/watch", "aaa");
    5. // 重复使用,用完再注册一个新的
    6. Stat stat = zooKeeper.exists("/watch", new Watcher() {
    7. @Override
    8. public void process(WatchedEvent watchedEvent) {
    9. switch (watchedEvent.getType()) {
    10. case NodeCreated:
    11. log.info("{}节点创建了", watchedEvent.getPath());
    12. break;
    13. case NodeDataChanged:
    14. log.info("{}节点数据被修改了", watchedEvent.getPath());
    15. break;
    16. case NodeDeleted:
    17. log.info("{}节点被删除了", watchedEvent.getPath());
    18. break;
    19. }
    20. try {
    21. // 重复监听的关键
    22. zooKeeper.exists("/watch", this);
    23. } catch (KeeperException | InterruptedException e) {
    24. e.printStackTrace();
    25. }
    26. }
    27. });
    28. if (stat != null) {
    29. log.info("version: " + stat.getVersion());
    30. }
    31. Thread.sleep(100000);
    32. }
  2. 多次执行更新节点、删除节点、创建节点的操作

  3. 查看结果,多次响应监听

image.png

通过addWatcher方法实现多次监听

  1. 通过addWatch添加监听器,addWatch方法支持重复监听

    1. @Test
    2. public void testAddWatch() throws InterruptedException, KeeperException {
    3. // 创建临时节点
    4. syncCreateNode("/watch", "aaa");
    5. zooKeeper.addWatch("/watch", new Watcher() {
    6. @Override
    7. public void process(WatchedEvent event) {
    8. switch (event.getType()) {
    9. case NodeCreated:
    10. log.info("{}节点创建了", event.getPath());
    11. break;
    12. case NodeDataChanged:
    13. log.info("{}节点数据被修改了", event.getPath());
    14. break;
    15. case NodeDeleted:
    16. log.info("{}节点被删除了", event.getPath());
    17. break;
    18. }
    19. }
    20. }, AddWatchMode.PERSISTENT);
    21. Thread.sleep(100000);
    22. }
  2. 多次执行更新节点、删除节点、创建节点的操作

  3. 查看结果

ZooKeeper Java API - 图7
说明: