数据存储
TODO:待优化
事务日志:zoo.cfg 文件中的 datadir
快照日志:zoo.cfg 文件中的 datadir
运行时日志:bin/zookeeper.out
原生 API 操作 Zookeeper
建立连接
public static void main(String[] args) {ZooKeeper zooKeeper = null;try {zooKeeper = new ZooKeeper("192.168.202.18:2181,192.168.202.49:2181,192.168.202.50:2181",4000, null);System.out.println(zooKeeper.getState());Thread.sleep(1000);System.out.println(zooKeeper.getState());} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();} finally {if (zooKeeper != null) {try {zooKeeper.close();} catch (InterruptedException e) {e.printStackTrace();}}}}
CONNECTINGCONNECTED
Zookeeper 客户端和服务端建立会话连接的时候会存在几种状态转变:
NOT-CONNECTED -> CONNECTING -> CONNECTED -> CLOSE
我们在睡眠一秒钟之后,连接状态会发生变化,进入 CONNECTED 状态,只有在进入该状态之后,才能进行各种 API 操作,如果在 CONNECTING 状态进行 API 操作会报错。
所以我们怎么保证当前建立的会话已经进入 CONNECTED 状态呢?
这里就需要引入 Watcher 机制,通过 Watcher 机制确保会话已经进入 CONNECTED 状态。
public static void main(String[] args) {ZooKeeper zooKeeper = null;try {CountDownLatch countDownLatch = new CountDownLatch(1);zooKeeper = new ZooKeeper("192.168.202.18:2181,192.168.202.49:2181,192.168.202.50:2181",4000, new Watcher() {@Overridepublic void process(WatchedEvent event) {// 收到服务端响应时间,连接成功if (Event.KeeperState.SyncConnected == event.getState()) {countDownLatch.countDown();}}});countDownLatch.await();System.out.println(zooKeeper.getState());} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();} finally {if (zooKeeper != null) {try {zooKeeper.close();} catch (InterruptedException e) {e.printStackTrace();}}}}
数据增删改查操作
// 创建节点zooKeeper.create("/zk-persis", "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);Thread.sleep(1000);// 获取节点Stat stat = new Stat();byte[] bytes = zooKeeper.getData("/zk-persis", false, stat);System.out.println(new String(bytes));// 更新节点zooKeeper.setData("/zk-persis", "1".getBytes(), stat.getVersion());// 获取节点byte[] bytes1 = zooKeeper.getData("/zk-persis", false, stat);System.out.println(new String(bytes1));// 删除节点zooKeeper.delete("/zk-persis", stat.getVersion());
Stat 是节点的信息,包含了如下数据内容:

事件机制
Watcher 监听机制是 Zookeeper 中非常重要的特性,我们基于 zookeeper 上创建的节点,可以对这些节点绑定监听
事件,比如可以监听节点数据变更、节点删除、子节点状态变更等事件,通过这个事件机制,可以基于 zookeeper 实现分布式锁、集群管理等功能。
Watcher 特性:当数据发生变化的时候,Zookeeper 会产生一个 Watcher 事件,并且会发送到客户端,但是客户端只会收到一次通知,如果后续这个节点再次发生变化,那么之前设置 Watcher 的客户端不会再次收到消息。Watcher 是一次性的操作,可以通过循环监听去达到永久监听效果。
如何注册事件机制
通过这三个操作来绑定事件:getData、exists、getChildren
如何触发事件
凡是事务类型的操作,都会触发监听事件,比如 create、setData、delete。
zooKeeper.create("/zk-persis", "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);// 注册事件机制Stat stat = zooKeeper.exists("/zk-persis", new Watcher() {@Overridepublic void process(WatchedEvent event) {System.out.println(event.getType() + " -> " + event.getPath());}});// 触发事件stat = zooKeeper.setData("/zk-persis","1".getBytes(),stat.getVersion());Thread.sleep(1000);zooKeeper.delete("/zk-persis", stat.getVersion());
NodeDataChanged -> /zk-persis
没有打印删除事件,上面讲过 Watcher 事件的特性,Watcher 是一次性操作,所以如果要监听 delete 操作,需要修改成如下代码结构,以达到循环监听的目的。
// 注册事件机制final ZooKeeper finalZookeeper = zooKeeper;Stat stat = zooKeeper.exists("/zk-persis", new Watcher() {@Overridepublic void process(WatchedEvent event) {System.out.println(event.getType() + " -> " + event.getPath());try {finalZookeeper.exists("/zk-persis", new Watcher() {@Overridepublic void process(WatchedEvent event) {System.out.println(event.getType() + " -> " + event.getPath());}});} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}});// 触发事件stat = zooKeeper.setData("/zk-persis", "1".getBytes(), stat.getVersion());Thread.sleep(1000);zooKeeper.delete("/zk-persis", stat.getVersion());
NodeDataChanged -> /zk-persisNodeDeleted -> /zk-persis
事件类型
- None:连接状态发生变化,触发 None 事件
 - NodeCreated:创建节点,触发 NodeCreated 事件
 - NodeDeleted:删除节点, 触发 NodeDeleted 事件
 - NodeDataChanged:更新节点,触发 NodeDataChanged 事件
 - NodeChildrenChange:子节点创建、删除,触发事件
 
操作和事件之间的关系
| getData | exists | getChildren | |
|---|---|---|---|
| create(“/zk-persis”) | NodeCreated | NodeCreated | |
| setData(“/zk-persis”) | NodeDataChanged | NodeDataChanged | |
| delete(“/zk-persis”) | NodeDeleted | NodeDeleted | |
| create(“/zk-persis/children1”) | NodeChildrenChange | ||
| setData(“/zk-persis/children1”) | |||
| delete(“/zk-persis/children1”) | NodeChildrenChange | 
Curator API 操作 Zookeeper
使用 Curator 简化 Zookeeper 的客户端操作
数据增删改查操作
public static void main(String[] args) {// 建立连接CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString("192.168.202.18:2181,192.168.202.49:2181,192.168.202.50:2181").sessionTimeoutMs(4000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).namespace("curator").build();curatorFramework.start();try {// 创建节点curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/node/children1", "0".getBytes());// 获取节点Stat stat = new Stat();byte[] bytes = curatorFramework.getData().storingStatIn(stat).forPath("/node/children1");System.out.println(new String(bytes));// 更新节点stat = curatorFramework.setData().withVersion(stat.getVersion()).forPath("/node/children1", "1".getBytes());// 获取节点byte[] bytes1 = curatorFramework.getData().storingStatIn(stat).forPath("/node/children1");System.out.println(new String(bytes1));// 删除节点curatorFramework.delete().withVersion(stat.getVersion()).forPath("/node/children1");} catch (Exception e) {e.printStackTrace();}curatorFramework.close();}
事件机制
参考
Git 地址:https://gitee.com/yin_jw/demo/tree/master/zookeeper-demo
