文件系统+通知机制
工作机制
- 存储和管理数据
- 接受Watcher的注册
- 数据变,通知注册者
特点
- 1 Leader n Follower
- 需 台数>3 且半数存活 (选举机制)
- 全局数据一致性
- 数据更新原子性
- 实时性
提供服务
Stat结构体

zxid 就是事务的id
c —>create
m —-> 最近的
pzxid 最后更新的子节点
cversion
ephemeralOwner (临时节点的话,这个为session id)
监听器原理

- main中创建zk客户端
- 创建两个线程,Connect、Listener
- Connect 发送监听事件给zk
- zk添加到监听器列表
- 当数据变化,zk发送消息给Listen
- Listen内部再调用process方法
选举机制
一台一台启动来进行说明
- n>3开始选举 myid大的赢
- Looking状态不会更改投票
Paxos 分布式一致性协议 (晚点写)

写请求

- Client向zk写数据 Server得把消息传给Leader、再有Leader讲请求分发下去(每个Server都有写请求队列)
- Leader收到半数认为写成功,即向小弟说写成功了,告诉client吧
ZkCli
| 命令基本语法 | 功能描述 |
|---|---|
| help | 显示所有操作命令 |
| ls path | 使用 ls 命令来查看当前znode的子节点 -w 监听子节点变化 -s 附加次级信息 |
| create | 普通创建 -s 含有序列 -e 临时(重启或者超时消失) |
| get path | 获得节点的值 -w 监听节点内容变化 -s 附加次级信息 |
| set | 设置节点的具体值 |
| stat | 查看节点状态 |
| delete | 删除节点 |
| deleteall | 递归删除节点 |
ZookeeperAPI
package com.Zookeeper;import org.apache.zookeeper.*;import org.apache.zookeeper.data.Stat;import org.junit.After;import org.junit.Before;import org.junit.Test;import java.io.IOException;import java.nio.charset.StandardCharsets;import java.util.List;public class zookeeper {private String connectString;private int sessionTime; //这里sessionTimeout 时间一定要够private ZooKeeper zkClinet;@Beforepublic void init() throws IOException {connectString = "node01:2181,node02:2181,node03:2181";sessionTime = 100000;zkClinet = new ZooKeeper(connectString, sessionTime, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {}});}/*** 获取子节点、不监听** @throws KeeperException* @throws InterruptedException*/@Testpublic void ls() throws KeeperException, InterruptedException {List<String> children = zkClinet.getChildren("/", false);System.out.println(children);}/**** 获取子节点、并监听、监听要阻塞一下* @throws KeeperException* @throws InterruptedException*/@Testpublic void lsAndWatch() throws KeeperException, InterruptedException {List<String> children = zkClinet.getChildren("/zk", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println(watchedEvent);}});System.out.println(children);Thread.sleep(Long.MAX_VALUE);}@Testpublic void create() throws KeeperException, InterruptedException {//创建永久节点String path = zkClinet.create("/zk_node01", "node01_data".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);//创建临时节点// String path = zkClinet.create("/zk_node01","node01_data".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);System.out.println(path);//创建临时节点需阻塞,不然结束即没// Thread.sleep(1000);}/**** 节点是否存在* @throws KeeperException* @throws InterruptedException*/@Testpublic void exist() throws KeeperException, InterruptedException {Stat stat = zkClinet.exists("/zk_node01", false);System.out.println(stat == null ? "not exist" : "exist");}/**** 获取节点存储信息* @throws KeeperException* @throws InterruptedException*/@Testpublic void get() throws KeeperException, InterruptedException {Stat stat = zkClinet.exists("/zk_node01", false);if (stat == null) {System.out.println("not exist");return;}byte[] data = zkClinet.getData("/zk_node01", false, stat);System.out.println(new String(data));}/**** 设置节点的值* @throws KeeperException* @throws InterruptedException*/@Testpublic void set() throws KeeperException, InterruptedException {Stat stat = zkClinet.exists("/zk_node01", false);if (stat == null) {System.out.println("not exist");return;}zkClinet.setData("/zk_node01", "modified_data".getBytes(StandardCharsets.UTF_8), stat.getVersion());}/**** 递归删除节点* @param path* @param zk* @throws KeeperException* @throws InterruptedException*/public void deleteAll(String path, ZooKeeper zk) throws KeeperException, InterruptedException {Stat stat = zkClinet.exists(path, false);if (stat == null) {System.out.println("not exist");return;}List<String> children = zk.getChildren(path, false);if (children.isEmpty()) {zk.delete(path, stat.getVersion());} else {for (String child : children) {deleteAll(path + "/" + child, zk);}// 删除自己zk.delete(path, stat.getVersion());}}@Testpublic void test_delete_all() throws KeeperException, InterruptedException {deleteAll("/zk", zkClinet);}@Afterpublic void close() throws InterruptedException {zkClinet.close();}}
