本篇文章主要描述ZooKeeper的Java系客户端,本篇主要分为两部分:
- ZooKeeper会话相关
- ZooKeeper节点相关
- ZooKeeper节点Watcher机制
ZooKeeper会话
创建
Zookeeper(String connectionString, int sessionTimeout, watcher watcher)
connectionString
-zookeeper
主机sessionTimeout
- 会话超时watcher
- 实现”监听器” 对象。zookeeper
集合通过监视器对象返回连接状态实践
try { // 因为Zookeeper的连接是异步的,一般加上计数器让主线程等待Zookeeper去连接 final CountDownLatch countDownLatch = new CountDownLatch(1); ZooKeeper zookeeper = new ZooKeeper(HOST + ":" + PORT, 5000, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { if(watchedEvent.getState() == Event.KeeperState.SyncConnected){ System.out.println("连接成功"); countDownLatch.countDown(); } } }); countDownLatch.await(); System.out.println(zookeeper.getSessionId()); zookeeper.close(); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); }
ZooKeeper节点
新增节点
目前提供了两个接口:
// path,节点路径 data,节点数据 acl,节点权限 createMode,创建模式,持久|临时|有序
public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
官方默认给了以下几种权限组合常量:
OPEN_ACL_UNSAFE
,world:anyone:rwcda
CREATOR_ALL_ACL
,auth::rwcda
;(这个权限一定要先addAuth
才能再授权,addAuth
的对象可以是任意一个)READ_ACL_UNSAFE
,world:anyone:r
节点创建方式一
// 直接能创建成功
String javaPathNode = zooKeeper.create("/javaApi", "Good Night".getBytes(),
ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.in.read();
zooKeeper.close();
节点创建方式二
这部分要求先addauth,才能再授权auth模式
// 登录验证(这个用户其实不存在,主要是为了满足下面得CREATOR_ALL_ACL)
zooKeeper.addAuthInfo("digest", "111111:111111".getBytes());
// CREATOR_ALL_ACL要求一定得先验证
zooKeeper.create("/javaApi2", "Good javaApi".getBytes(),
ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL);
System.in.read();
zooKeeper.close();
节点创建方式三
自定义权限创建:
List<ACL> acls = new ArrayList<>();
Id id = new Id("world", "anyone");
// 如果要同时给定多个权限,可以使用 | 进行 或 组合
acls.add(new ACL(ZooDefs.Perms.READ | ZooDefs.Perms.WRITE, id));
zooKeeper.create("/javaApi3", "java3".getBytes(), acls, CreateMode.EPHEMERAL);
System.in.read();
zooKeeper.close();
节点创建方式四
针对IP模式进行权限创建:
List<ACL> acls = new ArrayList<>();
Id id = new Id("ip", "192.168.1.6");
acls.add(new ACL(ZooDefs.Perms.READ | ZooDefs.Perms.WRITE, id));
zooKeeper.create("/javaApi4", "java4".getBytes(), acls, CreateMode.EPHEMERAL);
System.in.read();
zooKeeper.close();
节点创建方式五
针对auth模式下的自定义权限创建:
zooKeeper.addAuthInfo("digest", "1:1".getBytes());
List<ACL> acls = new ArrayList<>();
Id id = new Id("auth", ""); //不需要设定ID,因为AUTH模式下,setACL使用的是addauth的用户
acls.add(new ACL(ZooDefs.Perms.READ | ZooDefs.Perms.WRITE | ZooDefs.Perms.CREATE, id));
zooKeeper.create("/javaApi5", "java5".getBytes(), acls, CreateMode.EPHEMERAL);
System.in.read();
zooKeeper.close();
节点创建方式六
针对digest模式下的自定义权限创建:
List<ACL> acls = new ArrayList<>();
// 账号:密码=2:2
Id id = new Id("digest", "2:eiegrg4ZTLQrNWEMges2RKxiJww=");
acls.add(new ACL(ZooDefs.Perms.READ | ZooDefs.Perms.WRITE | ZooDefs.Perms.CREATE, id));
zooKeeper.create("/javaApi6", "java6".getBytes(), acls, CreateMode.EPHEMERAL);
System.in.read();
zooKeeper.close();
节点创建方式七
异步模式创建对象:
zooKeeper.create("/javaApi7", "javaApi7".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL,
CreateMode.EPHEMERAL, new AsyncCallback.Create2Callback() {
@Override
public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
// 创建结果:创建正确是0;创建失败是其他的
System.out.println(rc);
// 节点路径
System.out.println(path);
// 我们传入的上下文
System.out.println(ctx);
// 正常创建的节点名称 = 节点路径
System.out.println(name);
// 这个节点的状态
System.out.println(stat);
}
}, "我是上下文");
更新节点
节点更新方式一
同步方式更新节点:
// 第一个参数是 path
// 第二个参数是 数据
// 第三个参数是 限制的版本
Stat stat = zooKeeper.setData("/javaApi7", "Hello".getBytes(), -1);
版本如果是 -1
表示不限制版本 进行更新
节点更新方式二
使用异步方式更新节点:
zooKeeper.setData("/javaApi7", "Hi".getBytes(), -1, new AsyncCallback.StatCallback(){
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
// 更新结果
System.out.println(rc);
// 更新节点路径
System.out.println(path);
// 传入的上下文
System.out.println(ctx);
// 节点状态
System.out.println(stat);
}
}, "我是传入的上下文");
删除节点
节点删除方式一
同步删除节点:
zooKeeper.delete("/javaApi7", -1);
节点删除方式二
异步删除节点:
zooKeeper.delete("/javaApi7", -1, new AsyncCallback.VoidCallback() {
@Override
public void processResult(int rc, String path, Object ctx) {
// 删除结果
System.out.println(rc);
// 删除的节点路径
System.out.println(path);
// 外部传入的上下文
System.out.println(ctx);
}
}, "我是上下文");
查看节点
数据查看方式一
同步查看数据:
// 空的状态对象,在getData()方法里会把对应节点的状态设置给它
Stat stat = new Stat();
// 返回的结果是纯字节数据
// false表示不watch
byte[] data = zookeeper.getData("/javaApi7", false, stat);
// 打印该节点的版本
System.out.println(stat.getVersion());
// 打印该节点的czxid
System.out.println(stat.getCzxid());
数据查看方式二
异步查看数据:
zookeeper.getData("/javaApi7", false, new AsyncCallback.DataCallback() {
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
// 该方法的执行结果,0为成功
System.out.println(rc);
// 该方法执行的节点对象
System.out.println(path);
// 外部传入的上下文数据
System.out.println(ctx);
// 查询出来的数据结果
System.out.println(new String(data));
// 该节点的状态
System.out.println(stat);
}
}, "我是要传入的上下文");
PS:这里暂时不讲 watch
功能,后面会有单独一大块分析 watch
功能
查看子节点
子节点查看方式一
同步查看子节点:
// 状态对象,传入后,getChildren会把/javaApi7的状态设置给stat
Stat stat = new Stat();
// false表示不启用watch功能
List<String> children = zookeeper.getChildren("/javaApi7", false, stat);
// 打印stat对象的值
System.out.println(stat.getVersion());
System.out.println(stat.getCzxid());
// 输出某个节点的子节点
for (String child : children) {
System.out.println(child);
}
子节点查看方式二
异步查看子节点:
zookeeper.getChildren("/javaApi7", false, new AsyncCallback.Children2Callback() {
@Override
public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
// 该方法的执行结果,0为成功
System.out.println(rc);
// 该方法执行的节点对象
System.out.println(path);
// 外部传入的上下文数据
System.out.println(ctx);
// 输出该节点下的所有子节点
for (String child : children) {
System.out.println(child);
}
// 该节点的状态
System.out.println(stat);
}
}, "我是上下文");
判断节点是否存在
判断某个节点是否存在,因为使用方式差不多,这里就给出接口:
// arg1:该节点的路径;arg2:是否启用watch功能
public Stat exists(String path, boolean watch)
// arg1:该节点的路径;arg2:是否启用watch功能;arg3:回调;ctx:上下文
public void exists(final String path, boolean watch, StatCallback cb, Object ctx)
ZooKeeper监听机制Watcher
Wacther由三个部分共同配合完成:
- Zookeeper的客户端
- Zookeeper的服务端
- 客户端的ZKWatchManager对象
客户端首先将Watcher注册到服务端,同时将Watcher对象保存到客户端的Watcher管理器对象。当Zookeeper服务端监听到数据变化时,服务端会主动通知客户端,接着客户端的Watch管理器就会触发相关Watcher来回调相应处理逻辑,从而完成整体的数据 发布/订阅 流程。
点击查看【processon】
特性 | 说明 |
---|---|
一次性 | watcher 是一次性的,一旦被触发就会移除,再次使用时需要重新注册 |
客户端顺序回调 | watcher 回调是顺序串行执行的,只有回调后客户端才能看到最新的数据状态。一个watcher 回调逻辑不应该太多,以免影响别的watcher 执行 |
轻量级 | WatchEvent 是最小的通信单位,结构上只包含通知状态、事件类型和节点路径,并不会告诉数据节点变化前后的具体内容 |
时效性 | watcher 只有在当前session 彻底失效时才会无效,若在session 有效期内快速重连成功,则watcher 依然存在,仍可接收到通知; |
Q:为什么watcher不设计为返回数据?
A:因为如果有多个客户端watch了,那么服务端要给每个客户端发送数据(而且有的客户端并不需要知道变更的数据),所以为了减少服务器压力,就让客户端自己去获取数据
回调事件类型
Watcher是一个接口,任何实现了Watcher接口的类就是一个新的Watcher。Watcher内部包含了两个枚举类,一个是KeeperState、另一个是EventType。
Watcher通知状态(KeeperState)
KeeperState是客户端与服务器端连接状态发生变化时的通知类型,其枚举属性有以下几种:
枚举属性 | 说明 |
---|---|
SyncConnected |
客户端与服务器正常连接时 |
Disconnected |
无法连接服务器的时候就会Disconnected |
Expired |
会话session 失效时 |
AuthFailed |
身份认证失败时 |
Closed |
客户端主动close了连接 |
Watcher事件类型(EventType)
EventType是ZNode发生变化时的对应的通知类型。这里有一个规律:
- EventType变化时,KeeperState永远是
SyncConnected
的状态(其他状态下,不可能接收到EventType变化的消息) - KeeperState变化时,EventType永远为
None
(即使是Disconnected变化到了SyncConnected,这个瞬时过程,接收到的EventType也永远是None) | 枚举属性 | 说明 | | —- | —- | |None
| 无 | |NodeCreated
|Watcher
监听的数据节点被创建时 | |NodeDeleted
|Watcher
监听的数据节点被删除时 | |NodeDataChanged
|Watcher
监听的数据节点内容发生更改时(无论数据是否真的变化) | |NodeChildrenChanged
|Watcher
监听的数据节点的子节点列表发生变更时 |
注意:客户端接收到的相关事件通知中只包含状态以及类型等信息,不包含节点变化前后的具体内容,变化前的数据需业务自身存储,变化后的数据需要调用get
等方法重新获取。
监听事件捕获
连接事件捕获
在创建ZooKeeper对象的时候,可以设置Watcher,用来捕获连接的监听事件:
new Zookeeper()
该构造方法能监听以下几种数据:
SyncConnected
,连接成功Disconnected
,断开连接,客户端连接不上服务端(连接上后,断开虚拟网卡,就会断开连接)Closed
,客户端主动关闭 与客户端的连接AuthFailed
,认证失败。当客户端addauth之后,访问一个需要认证的节点时,如果认证用户(注意授权对象得是用户才行)不对,就会报该异常。Expired
,会话超时,断开连接后,等待一段时间,然后网络连接可用了,但是超时了(连接上后,断开虚拟网卡,然后等待客户端连接超时,然后再启用虚拟网卡),服务端已经把该连接移除了,所以网络恢复了也没用。- 解决方案是,在捕获到了会话超时后,再重新创建一个
ZooKeeper()
对象
- 解决方案是,在捕获到了会话超时后,再重新创建一个
节点事件捕获
在zookeeper客户端中使用以下三种类型的方法来捕获一个节点的监听事件:
zk.exists()
zk.getData()
zk.getChildren()
这些方法分别能监听以下几种事件:
方法 | Created | children changed | data changed | deleted |
---|---|---|---|---|
zk.exists() | ✔ | ✔ | ✔ | |
zk.getData() | ✔ | ✔ | ||
zk.getChildren() | ✔ | ✔ |
zk.exists()
public class ZookeeperExistsWatcherApp {
ZooKeeper zooKeeper;
@Before
public void before() throws InterruptedException, IOException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
zooKeeper = new ZooKeeper("192.168.136.128:2181", 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if(event.getType() == Event.EventType.NodeCreated){
System.out.println("创建节点");
}else if(event.getType() == Event.EventType.NodeDeleted){
System.out.println("删除节点");
}else if(event.getType() == Event.EventType.NodeDataChanged){
System.out.println("更新节点");
}else if(event.getType() == Event.EventType.NodeChildrenChanged){
System.out.println("子节点创建");
}else if(event.getType() == Event.EventType.None){
System.out.println("EventType = None!检查连接状态");
}
if(event.getState() == Event.KeeperState.SyncConnected){
System.out.println("连接成功");
}else if(event.getState() == Event.KeeperState.Disconnected){
System.out.println("无法连接服务器");
}else if(event.getState() == Event.KeeperState.Closed){
System.out.println("关闭客户端");
}else if(event.getState() == Event.KeeperState.Expired){
System.out.println("会话超时");
}else if(event.getState() == Event.KeeperState.AuthFailed){
System.out.println("认证失败");
}
countDownLatch.countDown();
}
});
countDownLatch.await();
}
@After
public void after() throws InterruptedException {
zooKeeper.close();
}
@Test
public void testExistsMethod1() throws KeeperException, InterruptedException, IOException {
// 如果watch给的是true,默认使用new ZooKeeper() 中使用的Watcher来监听
// watch给的true,永久使用,不是一次性!
zooKeeper.exists("/testExistNode", true);
// 这里分别测试三种可以监听到的事件作为触发方法:
// 1. 在 /testExistNode 节点下创建子节点
// 2. 修改 /testExistNode 节点的数据
// 3. 删除 /testExistNode 节点
// 4. /testExistNode 的子节点数据发生改变
Thread.sleep(100000);
}
@Test
public void testExistsMethod2() throws KeeperException, InterruptedException, IOException {
zooKeeper.exists("/testExistNode", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("发生changed路径为:" + event.getPath());
System.out.println("事件类型是:" + event.getType().name());
System.out.println("连接状态类型是:" + event.getState().name());
System.out.println();
}
});
// 这里分别测试三种可以监听到的事件作为触发方法:
// 1. 在 /testExistNode 节点下创建子节点
// 2. 修改 /testExistNode 节点的数据
// 3. 删除 /testExistNode 节点
// 4. /testExistNode 的子节点数据发生改变
Thread.sleep(100000);
}
@Test
public void testExistMethod3() throws InterruptedException, KeeperException {
// 测试 Watch的 一次性功能
// 执行两次修改,会发现只会触发一次watch回调
zooKeeper.exists("/testExistNode", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("发生changed路径为:" + event.getPath());
System.out.println("事件类型是:" + event.getType().name());
System.out.println("连接状态类型是:" + event.getState().name());
System.out.println();
/* 使得Watcher能够反复使用
try {
zooKeeper.exists("/testExistNode", this);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}*/
}
});
}
@Test
public void testExistMethod4() throws InterruptedException, KeeperException, IOException {
// 测试 一个节点可以有多个Watcher
zooKeeper.exists("/testExistNode", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("11111111111111");
System.out.println("发生changed路径为:" + event.getPath());
System.out.println("事件类型是:" + event.getType().name());
System.out.println("连接状态类型是:" + event.getState().name());
System.out.println();
}
});
zooKeeper.exists("/testExistNode", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("22222222222222");
System.out.println("发生changed路径为:" + event.getPath());
System.out.println("事件类型是:" + event.getType().name());
System.out.println("连接状态类型是:" + event.getState().name());
System.out.println();
}
});
System.in.read();
}
}
zk.getData()
大部分原理和 zk.exists()
一样,就是监听的事件种类相比少了一个:
@Test
public void testGetDataMethod1() throws KeeperException, InterruptedException, IOException {
// 如果watch给的是true,默认使用new ZooKeeper() 中使用的Watcher来监听
Stat stat = new Stat();
// 要预先创建/testGetNode节点
zooKeeper.getData("/testGetNode", true, stat);
// 这里分别测试两种可以监听到的事件作为触发方法:
// 1. 在 /testGetNode 节点下创建子节点 ×
// 2. 修改 /testGetNode 节点的数据 ✔
// 3. 删除 /testGetNode 节点 ✔
// 4. /testGetNode 的子节点数据发生改变 ×
System.in.read();
}
zk.getChildren()
使用方式和 zk.getExists()
一样,就是监听的事件种类发生了变化:
@Test
public void testGetDataMethod1() throws KeeperException, InterruptedException, IOException {
// 如果watch给的是true,默认使用new ZooKeeper() 中使用的Watcher来监听
Stat stat = new Stat();
// 要预先创建/testGetNode节点
zooKeeper.getData("/testGetChildrenNode", true, stat);
// 这里分别测试两种可以监听到的事件作为触发方法:
// 1. 在 /testGetChildrenNode 节点下创建子节点 ×
// 2. 修改 /testGetChildrenNode 节点的数据 ×
// 3. 删除 /testGetChildrenNode 节点 ✔
// 4. /testGetChildrenNode 的子节点数据发生改变 ✔
System.in.read();
}
注意,watch如果是自己给定回调函数,那么其只能使用一次;如果给的是true,用的连接Watch回调,那么就是永久的