数据存储

TODO:待优化

事务日志:zoo.cfg 文件中的 datadir
快照日志:zoo.cfg 文件中的 datadir
运行时日志:bin/zookeeper.out

原生 API 操作 Zookeeper

建立连接

  1. public static void main(String[] args) {
  2. ZooKeeper zooKeeper = null;
  3. try {
  4. zooKeeper = new ZooKeeper("192.168.202.18:2181,192.168.202.49:2181,192.168.202.50:2181",
  5. 4000, null);
  6. System.out.println(zooKeeper.getState());
  7. Thread.sleep(1000);
  8. System.out.println(zooKeeper.getState());
  9. } catch (IOException e) {
  10. e.printStackTrace();
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. } finally {
  14. if (zooKeeper != null) {
  15. try {
  16. zooKeeper.close();
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. }
  22. }
  1. CONNECTING
  2. CONNECTED

Zookeeper 客户端和服务端建立会话连接的时候会存在几种状态转变:

NOT-CONNECTED -> CONNECTING -> CONNECTED -> CLOSE

我们在睡眠一秒钟之后,连接状态会发生变化,进入 CONNECTED 状态,只有在进入该状态之后,才能进行各种 API 操作,如果在 CONNECTING 状态进行 API 操作会报错。

所以我们怎么保证当前建立的会话已经进入 CONNECTED 状态呢?

这里就需要引入 Watcher 机制,通过 Watcher 机制确保会话已经进入 CONNECTED 状态。

  1. public static void main(String[] args) {
  2. ZooKeeper zooKeeper = null;
  3. try {
  4. CountDownLatch countDownLatch = new CountDownLatch(1);
  5. zooKeeper = new ZooKeeper("192.168.202.18:2181,192.168.202.49:2181,192.168.202.50:2181",
  6. 4000, new Watcher() {
  7. @Override
  8. public void process(WatchedEvent event) {
  9. // 收到服务端响应时间,连接成功
  10. if (Event.KeeperState.SyncConnected == event.getState()) {
  11. countDownLatch.countDown();
  12. }
  13. }
  14. });
  15. countDownLatch.await();
  16. System.out.println(zooKeeper.getState());
  17. } catch (IOException e) {
  18. e.printStackTrace();
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. } finally {
  22. if (zooKeeper != null) {
  23. try {
  24. zooKeeper.close();
  25. } catch (InterruptedException e) {
  26. e.printStackTrace();
  27. }
  28. }
  29. }
  30. }

数据增删改查操作

  1. // 创建节点
  2. zooKeeper.create("/zk-persis", "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  3. Thread.sleep(1000);
  4. // 获取节点
  5. Stat stat = new Stat();
  6. byte[] bytes = zooKeeper.getData("/zk-persis", false, stat);
  7. System.out.println(new String(bytes));
  8. // 更新节点
  9. zooKeeper.setData("/zk-persis", "1".getBytes(), stat.getVersion());
  10. // 获取节点
  11. byte[] bytes1 = zooKeeper.getData("/zk-persis", false, stat);
  12. System.out.println(new String(bytes1));
  13. // 删除节点
  14. zooKeeper.delete("/zk-persis", stat.getVersion());

Stat 是节点的信息,包含了如下数据内容:

Zookeeper 实践指南 - 图1

事件机制

Watcher 监听机制是 Zookeeper 中非常重要的特性,我们基于 zookeeper 上创建的节点,可以对这些节点绑定监听
事件,比如可以监听节点数据变更、节点删除、子节点状态变更等事件,通过这个事件机制,可以基于 zookeeper 实现分布式锁、集群管理等功能。

Watcher 特性:当数据发生变化的时候,Zookeeper 会产生一个 Watcher 事件,并且会发送到客户端,但是客户端只会收到一次通知,如果后续这个节点再次发生变化,那么之前设置 Watcher 的客户端不会再次收到消息。Watcher 是一次性的操作,可以通过循环监听去达到永久监听效果。

如何注册事件机制

通过这三个操作来绑定事件:getData、exists、getChildren

如何触发事件

凡是事务类型的操作,都会触发监听事件,比如 create、setData、delete。

  1. zooKeeper.create("/zk-persis", "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  2. // 注册事件机制
  3. Stat stat = zooKeeper.exists("/zk-persis", new Watcher() {
  4. @Override
  5. public void process(WatchedEvent event) {
  6. System.out.println(event.getType() + " -> " + event.getPath());
  7. }
  8. });
  9. // 触发事件
  10. stat = zooKeeper.setData("/zk-persis","1".getBytes(),stat.getVersion());
  11. Thread.sleep(1000);
  12. zooKeeper.delete("/zk-persis", stat.getVersion());
  1. NodeDataChanged -> /zk-persis

没有打印删除事件,上面讲过 Watcher 事件的特性,Watcher 是一次性操作,所以如果要监听 delete 操作,需要修改成如下代码结构,以达到循环监听的目的。

  1. // 注册事件机制
  2. final ZooKeeper finalZookeeper = zooKeeper;
  3. Stat stat = zooKeeper.exists("/zk-persis", new Watcher() {
  4. @Override
  5. public void process(WatchedEvent event) {
  6. System.out.println(event.getType() + " -> " + event.getPath());
  7. try {
  8. finalZookeeper.exists("/zk-persis", new Watcher() {
  9. @Override
  10. public void process(WatchedEvent event) {
  11. System.out.println(event.getType() + " -> " + event.getPath());
  12. }
  13. });
  14. } catch (KeeperException e) {
  15. e.printStackTrace();
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. });
  21. // 触发事件
  22. stat = zooKeeper.setData("/zk-persis", "1".getBytes(), stat.getVersion());
  23. Thread.sleep(1000);
  24. zooKeeper.delete("/zk-persis", stat.getVersion());
  1. NodeDataChanged -> /zk-persis
  2. NodeDeleted -> /zk-persis

事件类型

  • None:连接状态发生变化,触发 None 事件
  • NodeCreated:创建节点,触发 NodeCreated 事件
  • NodeDeleted:删除节点, 触发 NodeDeleted 事件
  • NodeDataChanged:更新节点,触发 NodeDataChanged 事件
  • NodeChildrenChange:子节点创建、删除,触发事件

操作和事件之间的关系

getData exists getChildren
create(“/zk-persis”) NodeCreated NodeCreated
setData(“/zk-persis”) NodeDataChanged NodeDataChanged
delete(“/zk-persis”) NodeDeleted NodeDeleted
create(“/zk-persis/children1”) NodeChildrenChange
setData(“/zk-persis/children1”)
delete(“/zk-persis/children1”) NodeChildrenChange

Curator API 操作 Zookeeper

使用 Curator 简化 Zookeeper 的客户端操作

数据增删改查操作

  1. public static void main(String[] args) {
  2. // 建立连接
  3. CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
  4. .connectString("192.168.202.18:2181,192.168.202.49:2181,192.168.202.50:2181")
  5. .sessionTimeoutMs(4000)
  6. .retryPolicy(new ExponentialBackoffRetry(1000, 3))
  7. .namespace("curator")
  8. .build();
  9. curatorFramework.start();
  10. try {
  11. // 创建节点
  12. curatorFramework.create()
  13. .creatingParentsIfNeeded()
  14. .withMode(CreateMode.PERSISTENT)
  15. .forPath("/node/children1", "0".getBytes());
  16. // 获取节点
  17. Stat stat = new Stat();
  18. byte[] bytes = curatorFramework.getData()
  19. .storingStatIn(stat)
  20. .forPath("/node/children1");
  21. System.out.println(new String(bytes));
  22. // 更新节点
  23. stat = curatorFramework.setData()
  24. .withVersion(stat.getVersion())
  25. .forPath("/node/children1", "1".getBytes());
  26. // 获取节点
  27. byte[] bytes1 = curatorFramework.getData()
  28. .storingStatIn(stat)
  29. .forPath("/node/children1");
  30. System.out.println(new String(bytes1));
  31. // 删除节点
  32. curatorFramework.delete()
  33. .withVersion(stat.getVersion())
  34. .forPath("/node/children1");
  35. } catch (Exception e) {
  36. e.printStackTrace();
  37. }
  38. curatorFramework.close();
  39. }

事件机制

参考

Git 地址:https://gitee.com/yin_jw/demo/tree/master/zookeeper-demo