数据存储
TODO:待优化
事务日志:zoo.cfg 文件中的 datadir
快照日志:zoo.cfg 文件中的 datadir
运行时日志:bin/zookeeper.out
原生 API 操作 Zookeeper
建立连接
public static void main(String[] args) {
ZooKeeper zooKeeper = null;
try {
zooKeeper = new ZooKeeper("192.168.202.18:2181,192.168.202.49:2181,192.168.202.50:2181",
4000, null);
System.out.println(zooKeeper.getState());
Thread.sleep(1000);
System.out.println(zooKeeper.getState());
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (zooKeeper != null) {
try {
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
CONNECTING
CONNECTED
Zookeeper 客户端和服务端建立会话连接的时候会存在几种状态转变:
NOT-CONNECTED -> CONNECTING -> CONNECTED -> CLOSE
我们在睡眠一秒钟之后,连接状态会发生变化,进入 CONNECTED 状态,只有在进入该状态之后,才能进行各种 API 操作,如果在 CONNECTING 状态进行 API 操作会报错。
所以我们怎么保证当前建立的会话已经进入 CONNECTED 状态呢?
这里就需要引入 Watcher 机制,通过 Watcher 机制确保会话已经进入 CONNECTED 状态。
public static void main(String[] args) {
ZooKeeper zooKeeper = null;
try {
CountDownLatch countDownLatch = new CountDownLatch(1);
zooKeeper = new ZooKeeper("192.168.202.18:2181,192.168.202.49:2181,192.168.202.50:2181",
4000, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 收到服务端响应时间,连接成功
if (Event.KeeperState.SyncConnected == event.getState()) {
countDownLatch.countDown();
}
}
});
countDownLatch.await();
System.out.println(zooKeeper.getState());
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (zooKeeper != null) {
try {
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
数据增删改查操作
// 创建节点
zooKeeper.create("/zk-persis", "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Thread.sleep(1000);
// 获取节点
Stat stat = new Stat();
byte[] bytes = zooKeeper.getData("/zk-persis", false, stat);
System.out.println(new String(bytes));
// 更新节点
zooKeeper.setData("/zk-persis", "1".getBytes(), stat.getVersion());
// 获取节点
byte[] bytes1 = zooKeeper.getData("/zk-persis", false, stat);
System.out.println(new String(bytes1));
// 删除节点
zooKeeper.delete("/zk-persis", stat.getVersion());
Stat 是节点的信息,包含了如下数据内容:
事件机制
Watcher 监听机制是 Zookeeper 中非常重要的特性,我们基于 zookeeper 上创建的节点,可以对这些节点绑定监听
事件,比如可以监听节点数据变更、节点删除、子节点状态变更等事件,通过这个事件机制,可以基于 zookeeper 实现分布式锁、集群管理等功能。
Watcher 特性:当数据发生变化的时候,Zookeeper 会产生一个 Watcher 事件,并且会发送到客户端,但是客户端只会收到一次通知,如果后续这个节点再次发生变化,那么之前设置 Watcher 的客户端不会再次收到消息。Watcher 是一次性的操作,可以通过循环监听去达到永久监听效果。
如何注册事件机制
通过这三个操作来绑定事件:getData、exists、getChildren
如何触发事件
凡是事务类型的操作,都会触发监听事件,比如 create、setData、delete。
zooKeeper.create("/zk-persis", "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// 注册事件机制
Stat stat = zooKeeper.exists("/zk-persis", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println(event.getType() + " -> " + event.getPath());
}
});
// 触发事件
stat = zooKeeper.setData("/zk-persis","1".getBytes(),stat.getVersion());
Thread.sleep(1000);
zooKeeper.delete("/zk-persis", stat.getVersion());
NodeDataChanged -> /zk-persis
没有打印删除事件,上面讲过 Watcher 事件的特性,Watcher 是一次性操作,所以如果要监听 delete 操作,需要修改成如下代码结构,以达到循环监听的目的。
// 注册事件机制
final ZooKeeper finalZookeeper = zooKeeper;
Stat stat = zooKeeper.exists("/zk-persis", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println(event.getType() + " -> " + event.getPath());
try {
finalZookeeper.exists("/zk-persis", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println(event.getType() + " -> " + event.getPath());
}
});
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// 触发事件
stat = zooKeeper.setData("/zk-persis", "1".getBytes(), stat.getVersion());
Thread.sleep(1000);
zooKeeper.delete("/zk-persis", stat.getVersion());
NodeDataChanged -> /zk-persis
NodeDeleted -> /zk-persis
事件类型
- None:连接状态发生变化,触发 None 事件
- NodeCreated:创建节点,触发 NodeCreated 事件
- NodeDeleted:删除节点, 触发 NodeDeleted 事件
- NodeDataChanged:更新节点,触发 NodeDataChanged 事件
- NodeChildrenChange:子节点创建、删除,触发事件
操作和事件之间的关系
getData | exists | getChildren | |
---|---|---|---|
create(“/zk-persis”) | NodeCreated | NodeCreated | |
setData(“/zk-persis”) | NodeDataChanged | NodeDataChanged | |
delete(“/zk-persis”) | NodeDeleted | NodeDeleted | |
create(“/zk-persis/children1”) | NodeChildrenChange | ||
setData(“/zk-persis/children1”) | |||
delete(“/zk-persis/children1”) | NodeChildrenChange |
Curator API 操作 Zookeeper
使用 Curator 简化 Zookeeper 的客户端操作
数据增删改查操作
public static void main(String[] args) {
// 建立连接
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString("192.168.202.18:2181,192.168.202.49:2181,192.168.202.50:2181")
.sessionTimeoutMs(4000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("curator")
.build();
curatorFramework.start();
try {
// 创建节点
curatorFramework.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/node/children1", "0".getBytes());
// 获取节点
Stat stat = new Stat();
byte[] bytes = curatorFramework.getData()
.storingStatIn(stat)
.forPath("/node/children1");
System.out.println(new String(bytes));
// 更新节点
stat = curatorFramework.setData()
.withVersion(stat.getVersion())
.forPath("/node/children1", "1".getBytes());
// 获取节点
byte[] bytes1 = curatorFramework.getData()
.storingStatIn(stat)
.forPath("/node/children1");
System.out.println(new String(bytes1));
// 删除节点
curatorFramework.delete()
.withVersion(stat.getVersion())
.forPath("/node/children1");
} catch (Exception e) {
e.printStackTrace();
}
curatorFramework.close();
}
事件机制
参考
Git 地址:https://gitee.com/yin_jw/demo/tree/master/zookeeper-demo