Zookeeper系统模型
数据模型ZNode
在zookeeper中,数据信息被保存在数据结点Znode中。ZNode是Zookeeper中最小数据单位,在ZNode下面又可以挂ZNode,一层层下来就形成了层次化命名空间ZNode树。ZNode Tree采用类似文件系统的层级树状结构进行管理。
ZNode的节点路径表示方式和Unix文件系统路径相似,都是由一系列使用斜杠(/)进行分割的路径表示。针对节点,可以写入数据,也可以在该节点下创建子节点。
ZNode类型
ZNode类型分为三类:持久性节点(Persistent)、临时性节点(Ephemeral)、顺序性节点(Sequential)
在开发中创建的节点是通过组合产生的四种节点类型:持久节点、持久顺序节点、临时节点、临时顺序节点。
持久节点
最常见的节点类型。是指节点被创建后会一直存在服务器,直到删除操作主动清除。
持久顺序节点
节点特性和持久节点一样。额外特性表现在顺序上,在创建顺序节点的时候,在节点名后面加上一个数字后缀,来表示节点的顺序。
临时节点
节点的生命周期和客户端会话绑定在一起,客户端会话结束时,节点就会被删除。同时,与持久性节点不同的是,临时节点不能创建子节点。
临时顺序节点
节点特性和临时节点一样。额外特性表现在顺序上,在创建顺序节点的时候,在节点名后面加上一个数字后缀,来表示节点的顺序。
事务ID
狭义的事务通常指的是数据库事务,一般包含了一系列数据库有序的读写操作,并具备ACID特性。
Zookeeper中,事务是指能够改变Zookeeper服务器状态的操作。也可以死称之为事务操作或更新操作。该操作包括数据节点的创建和删除,数据节点内容更新等。
对于每一个事务请求,Zookeeper都会为其分配一个全局唯一的事务ID,用ZXID表示,通常为一个64位数字。每一个ZXID对应一次更新,从ZXID可以间接识别Zookeeper处理这些更新操作请求的全局顺序。
ZNode状态信息
整个ZNode节点内容包括两部分:节点数据内容和节点状态信息。[quota]代表数据内容,其余属于状态信息。
cZxid 就是 Create ZXID,表示节点被创建时的事务ID。
ctime 就是 Create Time,表示节点创建时间。
mZxid 就是 Modified ZXID,表示节点最后⼀次被修改时的事务ID。
mtime 就是 Modified Time,表示节点最后⼀次被修改的时间。
pZxid 表示该节点的⼦节点列表最后⼀次被修改时的事务 ID。只有⼦节点列表变更才会更新 pZxid,
⼦节点内容变更不会更新。
cversion 表示⼦节点的版本号。
dataVersion 表示内容版本号。
aclVersion 标识acl版本
ephemeralOwner 表示创建该临时节点时的会话 sessionID,如果是持久性节点那么值为 0
dataLength 表示数据⻓度。
numChildren 表示直系⼦节点数。
Watch—数据变更通知
Zookeeper使用Watch机制实现分布式数据的发布/订阅功能。
发布订阅模型:一对多订阅关系。能够让多个订阅者同时监听某一个主题对象,当这个主题对象自身状态发生改变时,就会通知所有订阅者,让他们能够做出相应的处理。
Zookeeper引入Watch机制实现分布式通知功能。Zookeeper允许客户端向服务端注册一个Watcher监听,当服务端的一些指定事件触发了Watcher,就会向指定客户端发送一个事件通知来实现分布式的通知功能。
Watcher注册与通知过程
Watcher机制包括: 客户端线程、客户端WatchManager、Zookeeper服务器
工作流程:客户端向Zookeeper服务器注册的同时,会将Watcher对象存储在客户端的WatchManager当中;
当Zookeeper服务器触发了Watcher事件后,会向客户端发送通知;客户端线程从WatcherManager中取出对应的Watcher对象来执行回调逻辑。
ACL—保证数据的安全
Zookeeper作为一个分布式协调框架,内部存储了分布式系统运行时状态的元数据。元数据会直接影响基于Zookeeper进行构造的分布式系统的运行状态。所以,需要保证系统数据的安全,避免因为误操作变更了数据导致数据的异常,Zookeeper提供了ACL权限机制来保证数据的安全。
ACL机制包括三个方面: 权限模式(Scheme)、授权对象(ID)、权限(Permission),
通常使用 scheme:id:permission
来标识一个有效的ACL信息。
权限模式:Scheme
权限模式用来确定验证过程中使用的检验策略,有四种模式
- IP
通过IP地址粒度进行权限控制,如”ip:192.168.0.20”表示权限控制针对该IP地址,”ip:192.168.0.1/24”表示针对192.168.0.*这个网段进行权限控制。
- Digest
最常用的权限控制模式。用”username:password”形式的权限表示来进行权限配置,便于区分不同应用来继续拿给你权限控制。通过”username:password”形式配置权限标识后,Zookeeper会先用对其进行SHA-1加密和BASE64编码。
- World
开放的权限控制模式,数据结点的访问权限对所有用户开放,即所有用户都可以在不进行任何权限校验的情况下操作Zookeeper上的数据。 可以看作是一种特殊的Digest模式,只有一个权限标识,即”world:anyone”。
- Super
超级用户模式。特殊的Digest模式,在Super模式下超级用户可以对任意Zookeeper上的数据结点进行任何操作。
授权对象:ID
授权对象是指权限赋予的用户或一个指定实体,例如IP地址或机器。在不同权限下,授权对象是不同的。
权限模式 | 授权对象 |
---|---|
IP | 通常是一个IP地址或IP网段 例如:192.168.0.20 或 192.168.0.1/24 |
Digest | 自定义,通常是 username:BASE64(SHA-1(usename:password)) |
World | 只有一个ID:anyone |
Super | 超级用户 |
权限
全年是指那些通过权限检查后可以被允许执行的操作。在Zookeeper中,所有对数据的操作权限分为五类
- CREATE
数据节点的创建权限。允许授权对象在数据节点下创建子节点。
- DELETE
子节点的删除权限,允许收钱对象删除该数据节点的子节点。
- READ
数据节点的读取权限,允许授权对象访问该数据节点并读取其数据内容或子节点列表。
- WRITE
数据节点的更新权限,允许授权对象对该数据节点进行更新操作。
- ADMIN
数据节点的管理权限,允许授权对象对该数据节点进行ACL相关的设置操作。
Zookeeper命令行操作
首先需要通过zkClient进入Zookeeper客户端命令行
./zkcli.sh 连接本地的zookeeper服务器
./zkCli.sh -server ip:port 连接指定的服务器
命令查看Zookeeper命令
help
创建节点
create [-s][-e] path data acl
path为节点路径,data为节点数据内容。
-s 代表顺序节点,-e代表临时节点。若不指定,则创建持久节点;acl用来进行权限通知。
创建顺序节点
create -s /zk-test 123
创建持久顺序节点zk-test。节点创建成功后,节点后缀会有一串数字区分顺序。
创建临时节点
create -e /zk-temp 123
临时节点在客户端会话结束后,就会自动删除。
通过命令quit退出客户端后,再红心登录客户端后,通过ls命令查看会发现临时节点已被删除
创建永久节点
create /zk-permanent 123
读取节点
- ls命令
列出指定节点下的所有直系子节点列表。
ls path
path: 指定数据节点的节点路径
- ls2命令
列出指定结点下的所有直系子节点列表,同时展示当前节点的状态信息。
ls2 path
- get命令
获取指定节点的数据内容和状态信息。
get path
更新节点
- set命令
更新指定节点的数据内容
set path data [version]
data: 更新的内容
version:数据的版本
删除节点
- delete命令
删除指定节点。如果删除的节点存在子节点,那么无法删除该节点,必须先删除子节点,再删除父节点。
delete path [version]
ZookeeperAPI使用
ZookeeperAPI共包含五个包。
(1)org.apache.zookeeper
(2)org.apache.zookeeper.data
(3)org.apache.zookeeper.server
(4)org.apache.zookeeper.quorum
(5)org.apache.zookeeper.upgrade
导入依赖
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
</dependency>
建立会话
Zookeeper客户端和服务端会话的建立是一个异步过程,也就是在程序中,构造方法会在处理完客户端的初始化工作后立即返回,在大多数情况下,此时并没有真正建立好一个可用的会话,在会话的生命周期中仍然处于”CONNECTING”的状态。当该会话真正创建完毕后,Zookeeper会向会话对应的客户端发送一个事件通知,以告知客户端,客户端只有在获取这个通知之后,才算真正建立了会话。
API
/*
客户端可以通过创建⼀个zk实例来连接zk服务器
connectString: 连接地址:IP:端⼝
sesssionTimeOut:会话超时时间:单位毫秒
Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
*/
new Zookeeper(connectString, sesssionTimeOut, Wather)
代码示例
public class CreateSession implements Watcher {
//countDownLatch这个类使⼀个线程等待,主要不让main⽅法结束
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws InterruptedException,IOException {
ZooKeeper zooKeeper = new ZooKeeper("10.211.55.4:2181", 5000, new CreateSession());
System.out.println(zooKeeper.getState());
countDownLatch.await();
//表示会话真正建⽴
System.out.println("=========Client Connected tozookeeper==========");
}
// 当前类实现了Watcher接⼝,重写了process⽅法,该⽅法负责处理来⾃Zookeeper服务端的
// watcher通知,在收到服务端发送过来的SyncConnected事件之后,解除主程序在CountDownLatch上
// 的等待阻塞,⾄此,会话创建完毕
public void process(WatchedEvent watchedEvent) {
//当连接创建了,服务端发送给客户端SyncConnected事件
if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
countDownLatch.countDown();
}
}
}
创建节点
API
/**
* path :节点创建的路径
* data[] :节点创建要保存的数据,是个byte类型的
* acl :节点创建的权限信息(4种类型)
** ANYONE_ID_UNSAFE : 表示任何⼈
** AUTH_IDS :此ID仅可⽤于设置ACL。它将被客户机验证的ID替换。
** OPEN_ACL_UNSAFE :这是⼀个完全开放的ACL(常⽤)-->world:anyone
** CREATOR_ALL_ACL :此ACL授予创建者身份验证ID的所有权限
* createMode :创建节点的类型(4种类型)
** PERSISTENT:持久节点
** PERSISTENT_SEQUENTIAL:持久顺序节点
** EPHEMERAL:临时节点
** EPHEMERAL_SEQUENTIAL:临时顺序节点
*/
String node = zookeeper.create(path, data, acl, createMode);
代码示例
public class CreateNote implements Watcher {
//countDownLatch这个类使⼀个线程等待,主要不让main⽅法结束
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static ZooKeeper zooKeeper;
public static void main(String[] args) throws Exception {
zooKeeper = new ZooKeeper("10.211.55.4:2181", 5000, new CreateNote());
countDownLatch.await();
}
public void process(WatchedEvent watchedEvent) {
//当连接创建了,服务端发送给客户端SyncConnected事件
if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
countDownLatch.countDown();
}
//调⽤创建节点⽅法
try {
createNodeSync();
} catch (Exception e) {
e.printStackTrace();
}
}
private void createNodeSync() throws Exception {
String node_PERSISTENT =
zooKeeper.create("/lg_persistent", "持久节点内容".getBytes("utf-8"),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
String node_PERSISTENT_SEQUENTIAL =
zooKeeper.create("/lg_persistent_sequential", "持久节点内容".getBytes("utf-8"),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
String node_EPERSISTENT =
zooKeeper.create("/lg_ephemeral", "临时节点内容".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("创建的持久节点是:"+node_PERSISTENT);
System.out.println("创建的持久顺序节点是:"+node_PERSISTENT_SEQUENTIAL);
System.out.println("创建的临时节点是:"+node_EPERSISTENT);
}
}
获取节点数据
API
/**
获取节点数据内容
* path : 获取数据的路径
* watch : 是否开启监听
* stat : 节点状态信息
* null: 表示获取最新版本的数据
*/
byte[] data = zk.getData(path, watch, stat);
/*
获取节点的直系节点列表
path:路径
watch:是否要启动监听,当⼦节点列表发⽣变化,会触发监听
*/
List<String> children = zooKeeper.getChildren(path, watch);
代码示例
public class GetNoteData implements Watcher {
//countDownLatch这个类使⼀个线程等待,主要不让main⽅法结束
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static ZooKeeper zooKeeper;
public static void main(String[] args) throws Exception {
zooKeeper = new ZooKeeper("10.211.55.4:2181", 10000, new GetNoteDate());
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent watchedEvent) {
//⼦节点列表发⽣变化时,服务器会发出NodeChildrenChanged通知,但不会把变化情况告诉给客户端
// 需要客户端⾃⾏获取,且通知是⼀次性的,需反复注册监听
if(watchedEvent.getType() ==Event.EventType.NodeChildrenChanged) {
//再次获取节点数据
try {
List<String> children = zooKeeper.getChildren(watchedEvent.getPath(), true);
System.out.println(children);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//当连接创建了,服务端发送给客户端SyncConnected事件
if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
try {
//调⽤获取单个节点数据⽅法
getNoteDate();
getChildrens();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private static void getNoteData() throws Exception {
byte[] data = zooKeeper.getData("/lg_persistent/lg-children", true, null);
System.out.println(new String(data,"utf-8"));
}
private static void getChildrens() throws KeeperException, InterruptedException {
List<String> children = zooKeeper.getChildren("/lg_persistent", true);
System.out.println(children);
}
}
修改节点
API
/*
path:路径
data:要修改的内容 byte[]
version:为-1,表示对最新版本的数据进⾏修改
stat 状态信息对象, -1:最新版本
*/
Stat stat = zooKeeper.setData(path, data,version);
代码示例
public class updateNote implements Watcher {
private static ZooKeeper zooKeeper;
public static void main(String[] args) throws Exception {
zooKeeper = new ZooKeeper("10.211.55.4:2181", 5000, new updateNote());
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent watchedEvent) {
//当连接创建了,服务端发送给客户端SyncConnected事件
try {
updateNodeSync();
} catch (Exception e) {
e.printStackTrace();
}
}
private void updateNodeSync() throws Exception {
byte[] data = zooKeeper.getData("/lg_persistent", false, null);
System.out.println("修改前的值:"+new String(data));
//修改 stat:状态信息对象 -1:最新版本
Stat stat = zooKeeper.setData("/lg_persistent", "客户端修改内容".getBytes(), -1);
byte[] data2 = zooKeeper.getData("/lg_persistent", false, null);
System.out.println("修改后的值:"+new String(data2));
}
}
删除节点
API
/*
zooKeeper.exists(path,watch) :判断节点是否存在
zookeeper.delete(path,version) : 删除节点
*/
代码示例
public class DeleteNote implements Watcher {
private static ZooKeeper zooKeeper;
public static void main(String[] args) throws Exception {
zooKeeper = new ZooKeeper("10.211.55.4:2181", 5000, new DeleteNote());
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent watchedEvent) {
//当连接创建了,服务端发送给客户端SyncConnected事件
try {
deleteNodeSync();
} catch (Exception e) {
e.printStackTrace();
}
}
private void deleteNodeSync() throws KeeperException, InterruptedException {
Stat exists = zooKeeper.exists("/lg_persistent/lg-children", false);
System.out.println(exists == null ? "该节点不存在":"该节点存在");
zooKeeper.delete("/lg_persistent/lg-children",-1);
Stat exists2 = zooKeeper.exists("/lg_persistent/lg-children", false);
System.out.println(exists2 == null ? "该节点不存在":"该节点存在");
}
}
Zookeeper开源客户端 — ZkClient
引入依赖
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.2</version>
</dependency
创建会话
ZkClient zkClient = new ZkClient("127.0.0.1:2181");
创建节点
// createParents值为true,可以递归创建节点:先完成父节点创建,再创建子节点
zkClient.createPersistent(path, createParents);
删除节点
zkClient.deleteRecursive(path);
获取子节点列表
API
List<String> children = zkClient.getChildren(path);
代码示例
public class Get_Children_Sample {
public static void main(String[] args) throws Exception {
ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
List<String> children = zkClient.getChildren("/zkClient");
System.out.println(children);
// 注册监听事件
// 一旦客户端对一个节点注册了子节点列表变更监听后,那么当该节点的子节点列表发生变更,
// 服务端就会通知客户端,并将最新的子节点列表发送给客户端
zkClient.subscribeChildChanges(path, new IZkChildListener() {
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println(parentPath + " 's child changed,currentChilds:" + currentChilds);
}});
zkClient.createPersistent("/zkClient");
Thread.sleep(1000);
zkClient.createPersistent("/zkClient/c1");
Thread.sleep(1000);
zkClient.delete("/lg-zkClient/c1");
Thread.sleep(1000);
zkClient.delete(path);
Thread.sleep(Integer.MAX_VALUE);
}
}
运行结果
/zkClient 's child changed, currentChilds:[]
/zkClient 's child changed, currentChilds:[c1]
/zkClient 's child changed, currentChilds:[]
/zkClient 's child changed, currentChilds:null
获取节点数据
Object o = zkClient.readData(path);
节点数据更新
zkClient.writeData(path, data);
节点是否存在
zkClient.delete(path);
代码示例
public class Get_Data_Sample {
public static void main(String[] args) throws InterruptedException {
String path = "/zkClient-Ep";
ZkClient zkClient = new ZkClient("127.0.0.1:2181");
//判断节点是否存在
boolean exists = zkClient.exists(path);
if (!exists){
zkClient.createEphemeral(path, "123");
}
//注册监听
zkClient.subscribeDataChanges(path, new IZkDataListener() {
public void handleDataChange(String path, Object data) throws Exception {
System.out.println(path+"该节点内容被更新,更新后的内容"+data);
}
public void handleDataDeleted(String s) throws Exception {
System.out.println(s+" 该节点被删除");
}
});
//获取节点内容
Object o = zkClient.readData(path);
System.out.println(o);
//更新
zkClient.writeData(path,"4567");
Thread.sleep(1000);
//删除
zkClient.delete(path);
Thread.sleep(1000);
}
}
运行结果
123
/zkClient-Ep该节点内容被更新,更新后的内容4567
/zkClient-Ep 该节点被删除
Zookeeper开源客户端 — Curator
提供Fluent编码风格的支持
Fluent编码风格:
引入依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
创建会话工厂类
创建步骤
1、Curator创建客户端是通过 CuratorFrameworkFactory
工厂类实现的。
public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs,
int connectionTimeoutMs, RetryPolicy retryPolicy)
- RetryPolicy: 重试策略接口,可自定义。默认实现有:
ExponentialBackoffRetry(基于backoff重连策略)
RetryNTimes(重连N次策略)
RetryForever(永远重连策略)
2、通过调用 CuratorFramework
的 start()
方法启动会话
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",retryPolicy);
client.start();
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 5000, 1000, retryPolicy);
client.start();
// Fluent编码风格
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
private static CuratorFramework Client = CuratorFrameworkFactory.builder()
.connectString("server1:2181,server2:2181,server3:2181") // server地址
.sessionTimeoutMs(50000) // 会话超时时间
.connectionTimeoutMs(30000) // 连接超时时间
.retryPolicy(retryPolicy) // 重试策略
.namespace("base") // 独立命名空间
.build();
client.start();
参数解析
connectString:zk的server地址,多个server之间使⽤英⽂逗号分隔开
connectionTimeoutMs:连接超时时间,如上是30s,默认是15s
sessionTimeoutMs:会话超时时间,如上是50s,默认是60s
retryPolicy:失败重试策略
ExponentialBackoffRetry:构造器含有三个参数
ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
baseSleepTimeMs:初始的sleep时间,⽤于计算之后的每次重试的sleep时间,
计算公式:当前sleep时间=baseSleepTimeMs*Math.max(1,random.nextInt(1<<(retryCount+1)))
maxRetries:最⼤重试次数
maxSleepMs:最⼤sleep时间,如果上述的当前sleep计算出来⽐这个⼤,
那么sleep⽤这个时间,默认的最⼤时间是Integer.MAX_VALUE毫秒。
其他,查看org.apache.curator.RetryPolicy接⼝的实现类
start():完成会话的创建
创建节点
创建一个初始内容为空的节点
client.create().forPath(path);
创建一个包含内容的节点
client.create().forPath(path, "我是内容".getBytes());
递归创建父节点,并选择节点类型
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
- creatingParentsIfNeeded() : 能够自动递归创建所需要的不存在的父节点。
删除节点
删除一个子节点
client.delete().forPath(path);
删除节点并递归删除其子节点
client.delete().deletingChildrenIfNeeded().forPath(path);
指定版本进行删除
client.delete().withVersion(1).forPath(path);
- withVersion(): -1 代表最新版本
- 如果版本不存在,则删除异常,BadVersionException
强制保证删除一个节点
client.delete().guaranteed().forPath(path);
- guaranteed(): 只要客户端会话有效,那么会在后台持续进行删除操作,知道节点删除成功。
适用于在网络异常的情况
获取节点数据
普通查询
byte[] data = client.getData().forPath(path);
包含状态查询
Stat stat = new Stat();
byte[] data = client.getData().storingStatIn(stat).forPath(path);
-stat 默认初始为0,执行了storingStatIn(stat),将会把本次获取数据的版本更新至stat
更新节点数据
普通更新
Stat stat = client.setData().forPath(path,"新内容".getBytes());
指定版本更新
Stat stat = client.setData().withVersion(1).forPath(path);
- 更新如果出现版本不一致情况,会抛出BadVersionException异常