文件系统+通知机制

工作机制

  • 存储和管理数据
  • 接受Watcher的注册
  • 数据变,通知注册者

特点

  • 1 Leader n Follower
  • 需 台数>3 且半数存活 (选举机制)
  • 全局数据一致性
  • 数据更新原子性
  • 实时性

提供服务

  • 统一命名服务 (记录域名)
  • 统一配置管理 (配置文件同步)
  • 统一集群管理 (节点状态)
  • 服务器节点动态上下线 (HA切换)
  • 软负载均衡 (访问数少的优先范围跟)

    节点类型

  • 持久

  • 短暂 (断开后,短暂节点消失)

Stat结构体

image.png

zxid 就是事务的id

c —>create
m —-> 最近的
pzxid 最后更新的子节点
cversion
ephemeralOwner (临时节点的话,这个为session id)

监听器原理

image.png

  1. main中创建zk客户端
  2. 创建两个线程,Connect、Listener
  3. Connect 发送监听事件给zk
  4. zk添加到监听器列表
  5. 当数据变化,zk发送消息给Listen
  6. Listen内部再调用process方法

选举机制

一台一台启动来进行说明

  • n>3开始选举 myid大的赢
  • Looking状态不会更改投票

Paxos 分布式一致性协议 (晚点写)

image.png

写请求

image.png

  • Client向zk写数据 Server得把消息传给Leader、再有Leader讲请求分发下去(每个Server都有写请求队列)
  • Leader收到半数认为写成功,即向小弟说写成功了,告诉client吧

ZkCli

命令基本语法 功能描述
help 显示所有操作命令
ls path 使用 ls 命令来查看当前znode的子节点
-w 监听子节点变化
-s 附加次级信息
create 普通创建
-s 含有序列
-e 临时(重启或者超时消失)
get path 获得节点的值
-w 监听节点内容变化
-s 附加次级信息
set 设置节点的具体值
stat 查看节点状态
delete 删除节点
deleteall 递归删除节点

ZookeeperAPI

  1. package com.Zookeeper;
  2. import org.apache.zookeeper.*;
  3. import org.apache.zookeeper.data.Stat;
  4. import org.junit.After;
  5. import org.junit.Before;
  6. import org.junit.Test;
  7. import java.io.IOException;
  8. import java.nio.charset.StandardCharsets;
  9. import java.util.List;
  10. public class zookeeper {
  11. private String connectString;
  12. private int sessionTime; //这里sessionTimeout 时间一定要够
  13. private ZooKeeper zkClinet;
  14. @Before
  15. public void init() throws IOException {
  16. connectString = "node01:2181,node02:2181,node03:2181";
  17. sessionTime = 100000;
  18. zkClinet = new ZooKeeper(connectString, sessionTime, new Watcher() {
  19. @Override
  20. public void process(WatchedEvent watchedEvent) {
  21. }
  22. });
  23. }
  24. /**
  25. * 获取子节点、不监听
  26. *
  27. * @throws KeeperException
  28. * @throws InterruptedException
  29. */
  30. @Test
  31. public void ls() throws KeeperException, InterruptedException {
  32. List<String> children = zkClinet.getChildren("/", false);
  33. System.out.println(children);
  34. }
  35. /***
  36. * 获取子节点、并监听、监听要阻塞一下
  37. * @throws KeeperException
  38. * @throws InterruptedException
  39. */
  40. @Test
  41. public void lsAndWatch() throws KeeperException, InterruptedException {
  42. List<String> children = zkClinet.getChildren("/zk", new Watcher() {
  43. @Override
  44. public void process(WatchedEvent watchedEvent) {
  45. System.out.println(watchedEvent);
  46. }
  47. });
  48. System.out.println(children);
  49. Thread.sleep(Long.MAX_VALUE);
  50. }
  51. @Test
  52. public void create() throws KeeperException, InterruptedException {
  53. //创建永久节点
  54. String path = zkClinet.create("/zk_node01", "node01_data".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  55. //创建临时节点
  56. // String path = zkClinet.create("/zk_node01","node01_data".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
  57. System.out.println(path);
  58. //创建临时节点需阻塞,不然结束即没
  59. // Thread.sleep(1000);
  60. }
  61. /***
  62. * 节点是否存在
  63. * @throws KeeperException
  64. * @throws InterruptedException
  65. */
  66. @Test
  67. public void exist() throws KeeperException, InterruptedException {
  68. Stat stat = zkClinet.exists("/zk_node01", false);
  69. System.out.println(stat == null ? "not exist" : "exist");
  70. }
  71. /***
  72. * 获取节点存储信息
  73. * @throws KeeperException
  74. * @throws InterruptedException
  75. */
  76. @Test
  77. public void get() throws KeeperException, InterruptedException {
  78. Stat stat = zkClinet.exists("/zk_node01", false);
  79. if (stat == null) {
  80. System.out.println("not exist");
  81. return;
  82. }
  83. byte[] data = zkClinet.getData("/zk_node01", false, stat);
  84. System.out.println(new String(data));
  85. }
  86. /***
  87. * 设置节点的值
  88. * @throws KeeperException
  89. * @throws InterruptedException
  90. */
  91. @Test
  92. public void set() throws KeeperException, InterruptedException {
  93. Stat stat = zkClinet.exists("/zk_node01", false);
  94. if (stat == null) {
  95. System.out.println("not exist");
  96. return;
  97. }
  98. zkClinet.setData("/zk_node01", "modified_data".getBytes(StandardCharsets.UTF_8), stat.getVersion());
  99. }
  100. /***
  101. * 递归删除节点
  102. * @param path
  103. * @param zk
  104. * @throws KeeperException
  105. * @throws InterruptedException
  106. */
  107. public void deleteAll(String path, ZooKeeper zk) throws KeeperException, InterruptedException {
  108. Stat stat = zkClinet.exists(path, false);
  109. if (stat == null) {
  110. System.out.println("not exist");
  111. return;
  112. }
  113. List<String> children = zk.getChildren(path, false);
  114. if (children.isEmpty()) {
  115. zk.delete(path, stat.getVersion());
  116. } else {
  117. for (String child : children) {
  118. deleteAll(path + "/" + child, zk);
  119. }
  120. // 删除自己
  121. zk.delete(path, stat.getVersion());
  122. }
  123. }
  124. @Test
  125. public void test_delete_all() throws KeeperException, InterruptedException {
  126. deleteAll("/zk", zkClinet);
  127. }
  128. @After
  129. public void close() throws InterruptedException {
  130. zkClinet.close();
  131. }
  132. }