本篇文章主要描述ZooKeeper的Java系客户端,本篇主要分为两部分:

  1. ZooKeeper会话相关
  2. ZooKeeper节点相关
  3. ZooKeeper节点Watcher机制

    ZooKeeper会话

    创建

    1. Zookeeper(String connectionString, int sessionTimeout, watcher watcher)
  • connectionString - zookeeper主机
  • sessionTimeout- 会话超时
  • watcher - 实现”监听器” 对象。zookeeper集合通过监视器对象返回连接状态

    实践

          try {
              // 因为Zookeeper的连接是异步的,一般加上计数器让主线程等待Zookeeper去连接
              final CountDownLatch countDownLatch = new CountDownLatch(1);
              ZooKeeper zookeeper = new ZooKeeper(HOST + ":" + PORT, 5000, new Watcher() {
                  @Override
                  public void process(WatchedEvent watchedEvent) {
                      if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
                          System.out.println("连接成功");
                          countDownLatch.countDown();
                      }
                  }
              });
    
              countDownLatch.await();
              System.out.println(zookeeper.getSessionId());
              zookeeper.close();
          } catch (IOException e) {
              e.printStackTrace();
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
    

ZooKeeper节点

新增节点

目前提供了两个接口:

// path,节点路径    data,节点数据    acl,节点权限     createMode,创建模式,持久|临时|有序
public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode)

官方默认给了以下几种权限组合常量:

  • OPEN_ACL_UNSAFEworld:anyone:rwcda
  • CREATOR_ALL_ACLauth::rwcda ;(这个权限一定要先 addAuth 才能再授权, addAuth 的对象可以是任意一个)
  • READ_ACL_UNSAFEworld:anyone:r

节点创建方式一

// 直接能创建成功
String javaPathNode = zooKeeper.create("/javaApi", "Good Night".getBytes(), 
                                       ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.in.read();
zooKeeper.close();

节点创建方式二

这部分要求先addauth,才能再授权auth模式

// 登录验证(这个用户其实不存在,主要是为了满足下面得CREATOR_ALL_ACL)
zooKeeper.addAuthInfo("digest", "111111:111111".getBytes());
// CREATOR_ALL_ACL要求一定得先验证
zooKeeper.create("/javaApi2", "Good javaApi".getBytes(), 
                 ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL);
System.in.read();
zooKeeper.close();

节点创建方式三

自定义权限创建:

List<ACL> acls = new ArrayList<>();
Id id = new Id("world", "anyone");
// 如果要同时给定多个权限,可以使用 | 进行 或 组合
acls.add(new ACL(ZooDefs.Perms.READ | ZooDefs.Perms.WRITE, id));
zooKeeper.create("/javaApi3", "java3".getBytes(), acls, CreateMode.EPHEMERAL);
System.in.read();
zooKeeper.close();

节点创建方式四

针对IP模式进行权限创建:

List<ACL> acls = new ArrayList<>();
Id id = new Id("ip", "192.168.1.6");
acls.add(new ACL(ZooDefs.Perms.READ | ZooDefs.Perms.WRITE, id));
zooKeeper.create("/javaApi4", "java4".getBytes(), acls, CreateMode.EPHEMERAL);
System.in.read();
zooKeeper.close();

节点创建方式五

针对auth模式下的自定义权限创建:

zooKeeper.addAuthInfo("digest", "1:1".getBytes());
List<ACL> acls = new ArrayList<>();
Id id = new Id("auth", "");  //不需要设定ID,因为AUTH模式下,setACL使用的是addauth的用户
acls.add(new ACL(ZooDefs.Perms.READ | ZooDefs.Perms.WRITE | ZooDefs.Perms.CREATE, id));
zooKeeper.create("/javaApi5", "java5".getBytes(), acls, CreateMode.EPHEMERAL);
System.in.read();
zooKeeper.close();

节点创建方式六

针对digest模式下的自定义权限创建:

List<ACL> acls = new ArrayList<>();
// 账号:密码=2:2
Id id = new Id("digest", "2:eiegrg4ZTLQrNWEMges2RKxiJww=");
acls.add(new ACL(ZooDefs.Perms.READ | ZooDefs.Perms.WRITE | ZooDefs.Perms.CREATE, id));
zooKeeper.create("/javaApi6", "java6".getBytes(), acls, CreateMode.EPHEMERAL);
System.in.read();
zooKeeper.close();

节点创建方式七

异步模式创建对象:

zooKeeper.create("/javaApi7", "javaApi7".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, 
                 CreateMode.EPHEMERAL, new AsyncCallback.Create2Callback() {
    @Override
    public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
        // 创建结果:创建正确是0;创建失败是其他的
        System.out.println(rc);
        // 节点路径
        System.out.println(path);
        // 我们传入的上下文
        System.out.println(ctx);
        // 正常创建的节点名称 = 节点路径
        System.out.println(name);
        // 这个节点的状态
        System.out.println(stat);
    }
}, "我是上下文");

更新节点

节点更新方式一

同步方式更新节点:

// 第一个参数是 path
// 第二个参数是 数据
// 第三个参数是 限制的版本
Stat stat = zooKeeper.setData("/javaApi7", "Hello".getBytes(), -1);

版本如果是 -1 表示不限制版本 进行更新

节点更新方式二

使用异步方式更新节点:

zooKeeper.setData("/javaApi7", "Hi".getBytes(), -1, new AsyncCallback.StatCallback(){
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        // 更新结果
        System.out.println(rc);
        // 更新节点路径
        System.out.println(path);
        // 传入的上下文
        System.out.println(ctx);
        // 节点状态
        System.out.println(stat);
    }
}, "我是传入的上下文");

删除节点

节点删除方式一

同步删除节点:

zooKeeper.delete("/javaApi7", -1);

节点删除方式二

异步删除节点:

zooKeeper.delete("/javaApi7", -1, new AsyncCallback.VoidCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx) {
                // 删除结果
                System.out.println(rc);
                // 删除的节点路径
                System.out.println(path);
                // 外部传入的上下文
                System.out.println(ctx);
            }
        }, "我是上下文");

查看节点

数据查看方式一

同步查看数据:

// 空的状态对象,在getData()方法里会把对应节点的状态设置给它
Stat stat = new Stat();
// 返回的结果是纯字节数据
// false表示不watch
byte[] data = zookeeper.getData("/javaApi7", false, stat);
// 打印该节点的版本
System.out.println(stat.getVersion());
// 打印该节点的czxid
System.out.println(stat.getCzxid());

数据查看方式二

异步查看数据:

zookeeper.getData("/javaApi7", false, new AsyncCallback.DataCallback() {
    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
        // 该方法的执行结果,0为成功
        System.out.println(rc);
        // 该方法执行的节点对象
        System.out.println(path);
        // 外部传入的上下文数据
        System.out.println(ctx);
        // 查询出来的数据结果
        System.out.println(new String(data));
        // 该节点的状态
        System.out.println(stat);
    }
}, "我是要传入的上下文");

PS:这里暂时不讲 watch 功能,后面会有单独一大块分析 watch 功能

查看子节点

子节点查看方式一

同步查看子节点:

// 状态对象,传入后,getChildren会把/javaApi7的状态设置给stat
Stat stat = new Stat();
// false表示不启用watch功能
List<String> children = zookeeper.getChildren("/javaApi7", false, stat);
// 打印stat对象的值
System.out.println(stat.getVersion());
System.out.println(stat.getCzxid());
// 输出某个节点的子节点
for (String child : children) {
    System.out.println(child);
}

子节点查看方式二

异步查看子节点:

zookeeper.getChildren("/javaApi7", false, new AsyncCallback.Children2Callback() {
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        // 该方法的执行结果,0为成功
        System.out.println(rc);
        // 该方法执行的节点对象
        System.out.println(path);
        // 外部传入的上下文数据
        System.out.println(ctx);
        // 输出该节点下的所有子节点
        for (String child : children) {
            System.out.println(child);
        }
        // 该节点的状态
        System.out.println(stat);
    }
}, "我是上下文");

判断节点是否存在

判断某个节点是否存在,因为使用方式差不多,这里就给出接口:

// arg1:该节点的路径;arg2:是否启用watch功能
public Stat exists(String path, boolean watch)
// arg1:该节点的路径;arg2:是否启用watch功能;arg3:回调;ctx:上下文
public void exists(final String path, boolean watch, StatCallback cb, Object ctx)

ZooKeeper监听机制Watcher

Wacther由三个部分共同配合完成:

  • Zookeeper的客户端
  • Zookeeper的服务端
  • 客户端的ZKWatchManager对象

客户端首先将Watcher注册到服务端,同时将Watcher对象保存到客户端的Watcher管理器对象。当Zookeeper服务端监听到数据变化时,服务端会主动通知客户端,接着客户端的Watch管理器就会触发相关Watcher来回调相应处理逻辑,从而完成整体的数据 发布/订阅 流程。
点击查看【processon】

特性 说明
一次性 watcher一次性的,一旦被触发就会移除,再次使用时需要重新注册
客户端顺序回调 watcher回调是顺序串行执行的,只有回调后客户端才能看到最新的数据状态。一个watcher回调逻辑不应该太多,以免影响别的watcher执行
轻量级 WatchEvent是最小的通信单位,结构上只包含通知状态、事件类型和节点路径,并不会告诉数据节点变化前后的具体内容
时效性 watcher只有在当前session彻底失效时才会无效,若在session有效期内快速重连成功,则watcher依然存在,仍可接收到通知;

Q:为什么watcher不设计为返回数据?
A:因为如果有多个客户端watch了,那么服务端要给每个客户端发送数据(而且有的客户端并不需要知道变更的数据),所以为了减少服务器压力,就让客户端自己去获取数据

回调事件类型

Watcher是一个接口,任何实现了Watcher接口的类就是一个新的Watcher。Watcher内部包含了两个枚举类,一个是KeeperState、另一个是EventType。

Watcher通知状态(KeeperState)

KeeperState是客户端与服务器端连接状态发生变化时的通知类型,其枚举属性有以下几种:

枚举属性 说明
SyncConnected 客户端与服务器正常连接时
Disconnected 无法连接服务器的时候就会Disconnected
Expired 会话session失效时
AuthFailed 身份认证失败时
Closed 客户端主动close了连接

Watcher事件类型(EventType)

EventType是ZNode发生变化时的对应的通知类型。这里有一个规律:

  1. EventType变化时,KeeperState永远是 SyncConnected 的状态(其他状态下,不可能接收到EventType变化的消息)
  2. KeeperState变化时,EventType永远为 None (即使是Disconnected变化到了SyncConnected,这个瞬时过程,接收到的EventType也永远是None) | 枚举属性 | 说明 | | —- | —- | | None | 无 | | NodeCreated | Watcher监听的数据节点被创建时 | | NodeDeleted | Watcher监听的数据节点被删除时 | | NodeDataChanged | Watcher监听的数据节点内容发生更改时(无论数据是否真的变化) | | NodeChildrenChanged | Watcher监听的数据节点的子节点列表发生变更时 |

注意:客户端接收到的相关事件通知中只包含状态以及类型等信息,不包含节点变化前后的具体内容,变化前的数据需业务自身存储,变化后的数据需要调用get等方法重新获取。

监听事件捕获

连接事件捕获

在创建ZooKeeper对象的时候,可以设置Watcher,用来捕获连接的监听事件:

  • new Zookeeper()

该构造方法能监听以下几种数据:

  • SyncConnected ,连接成功
  • Disconnected ,断开连接,客户端连接不上服务端(连接上后,断开虚拟网卡,就会断开连接)
  • Closed ,客户端主动关闭 与客户端的连接
  • AuthFailed ,认证失败。当客户端addauth之后,访问一个需要认证的节点时,如果认证用户(注意授权对象得是用户才行)不对,就会报该异常。
  • Expired ,会话超时,断开连接后,等待一段时间,然后网络连接可用了,但是超时了(连接上后,断开虚拟网卡,然后等待客户端连接超时,然后再启用虚拟网卡),服务端已经把该连接移除了,所以网络恢复了也没用。
    • 解决方案是,在捕获到了会话超时后,再重新创建一个 ZooKeeper() 对象

点击查看【processon】

节点事件捕获

在zookeeper客户端中使用以下三种类型的方法来捕获一个节点的监听事件:

  1. zk.exists()
  2. zk.getData()
  3. zk.getChildren()

这些方法分别能监听以下几种事件:

方法 Created children changed data changed deleted
zk.exists()
zk.getData()
zk.getChildren()

zk.exists()

public class ZookeeperExistsWatcherApp {
    ZooKeeper zooKeeper;
    @Before
    public void before() throws InterruptedException, IOException {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        zooKeeper = new ZooKeeper("192.168.136.128:2181", 5000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if(event.getType() == Event.EventType.NodeCreated){
                    System.out.println("创建节点");
                }else if(event.getType() == Event.EventType.NodeDeleted){
                    System.out.println("删除节点");
                }else if(event.getType() == Event.EventType.NodeDataChanged){
                    System.out.println("更新节点");
                }else if(event.getType() == Event.EventType.NodeChildrenChanged){
                    System.out.println("子节点创建");
                }else if(event.getType() == Event.EventType.None){
                    System.out.println("EventType = None!检查连接状态");
                }
                if(event.getState() == Event.KeeperState.SyncConnected){
                    System.out.println("连接成功");
                }else if(event.getState() == Event.KeeperState.Disconnected){
                    System.out.println("无法连接服务器");
                }else if(event.getState() == Event.KeeperState.Closed){
                    System.out.println("关闭客户端");
                }else if(event.getState() == Event.KeeperState.Expired){
                    System.out.println("会话超时");
                }else if(event.getState() == Event.KeeperState.AuthFailed){
                    System.out.println("认证失败");
                }
                countDownLatch.countDown();
            }
        });
        countDownLatch.await();
    }
    @After
    public void after() throws InterruptedException {
        zooKeeper.close();
    }
    @Test
    public void testExistsMethod1() throws KeeperException, InterruptedException, IOException {
        // 如果watch给的是true,默认使用new ZooKeeper() 中使用的Watcher来监听
        // watch给的true,永久使用,不是一次性!
        zooKeeper.exists("/testExistNode", true);
        // 这里分别测试三种可以监听到的事件作为触发方法:
        // 1. 在 /testExistNode 节点下创建子节点
        // 2. 修改 /testExistNode 节点的数据
        // 3. 删除 /testExistNode 节点
        // 4. /testExistNode 的子节点数据发生改变
        Thread.sleep(100000);
    }
    @Test
    public void testExistsMethod2() throws KeeperException, InterruptedException, IOException {
        zooKeeper.exists("/testExistNode", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("发生changed路径为:" + event.getPath());
                System.out.println("事件类型是:" + event.getType().name());
                System.out.println("连接状态类型是:" + event.getState().name());
                System.out.println();
            }
        });
        // 这里分别测试三种可以监听到的事件作为触发方法:
        // 1. 在 /testExistNode 节点下创建子节点
        // 2. 修改 /testExistNode 节点的数据
        // 3. 删除 /testExistNode 节点
        // 4. /testExistNode 的子节点数据发生改变
        Thread.sleep(100000);
    }
    @Test
    public void testExistMethod3() throws InterruptedException, KeeperException {
        // 测试 Watch的 一次性功能
        // 执行两次修改,会发现只会触发一次watch回调
        zooKeeper.exists("/testExistNode", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("发生changed路径为:" + event.getPath());
                System.out.println("事件类型是:" + event.getType().name());
                System.out.println("连接状态类型是:" + event.getState().name());
                System.out.println();
/*              使得Watcher能够反复使用
                try {
                    zooKeeper.exists("/testExistNode", this);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }*/ 
            }
        });
    }
    @Test
    public void testExistMethod4() throws InterruptedException, KeeperException, IOException {
        // 测试 一个节点可以有多个Watcher
        zooKeeper.exists("/testExistNode", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("11111111111111");
                System.out.println("发生changed路径为:" + event.getPath());
                System.out.println("事件类型是:" + event.getType().name());
                System.out.println("连接状态类型是:" + event.getState().name());
                System.out.println();
            }
        });
        zooKeeper.exists("/testExistNode", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("22222222222222");
                System.out.println("发生changed路径为:" + event.getPath());
                System.out.println("事件类型是:" + event.getType().name());
                System.out.println("连接状态类型是:" + event.getState().name());
                System.out.println();
            }
        });
        System.in.read();
    }
}

zk.getData()

大部分原理和 zk.exists() 一样,就是监听的事件种类相比少了一个:

@Test
public void testGetDataMethod1() throws KeeperException, InterruptedException, IOException {
    // 如果watch给的是true,默认使用new ZooKeeper() 中使用的Watcher来监听
    Stat stat = new Stat();
    // 要预先创建/testGetNode节点
    zooKeeper.getData("/testGetNode", true, stat);
    // 这里分别测试两种可以监听到的事件作为触发方法:
    // 1. 在 /testGetNode 节点下创建子节点           ×
    // 2. 修改 /testGetNode 节点的数据               ✔
    // 3. 删除 /testGetNode 节点                     ✔
    // 4. /testGetNode 的子节点数据发生改变          ×
    System.in.read();
}

zk.getChildren()

使用方式和 zk.getExists() 一样,就是监听的事件种类发生了变化:

@Test
public void testGetDataMethod1() throws KeeperException, InterruptedException, IOException {
    // 如果watch给的是true,默认使用new ZooKeeper() 中使用的Watcher来监听
    Stat stat = new Stat();
    // 要预先创建/testGetNode节点
    zooKeeper.getData("/testGetChildrenNode", true, stat);
    // 这里分别测试两种可以监听到的事件作为触发方法:
    // 1. 在 /testGetChildrenNode 节点下创建子节点             ×
    // 2. 修改 /testGetChildrenNode 节点的数据                 ×
    // 3. 删除 /testGetChildrenNode 节点                     ✔
    // 4. /testGetChildrenNode 的子节点数据发生改变             ✔
    System.in.read();
}

注意,watch如果是自己给定回调函数,那么其只能使用一次;如果给的是true,用的连接Watch回调,那么就是永久的