概述
Zookeeper提供了简单易用的API,我们利用这些API实现添加、删除、修改、查看ZooKeeper的节点,以及实现对这些节点的监听功能。
API介绍
建议大家养成查阅 官方文档 的习惯,因为官方文档是最权威的,而且英文也不难,基本上大家都能看得懂。
本文以v3.8.0版本的zookeeper演示,org.apache.zookeeper.Zookeeper
是ZooKeeper客户端的主类,除非另有说明,该类的方法是线程安全的。
ZooKeeper构造方法
可以通过构造方法实例化ZooKeeper对象,同时会和到服务器建立连接,服务器就会为客户端分配一个会话ID。客户端将定期向服务器发送心跳,以保持会话的有效性。
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
- connectString: Zookeeper服务器的地址,多个地址用逗号分隔
- sessionTimeout:超时时间
- watcher:设置默认监听器
ZooKeeper常用方法
ZooKeeper的API提供了同步和异步两种方式。同步方法会阻塞,直到服务器响应。异步方法只是对请求进行排队,以便立即发送和返回。它们接受一个回调对象,该对象将在请求成功执行时执行,或在出现错误时执行,并带有指示错误的适当返回码。
方法 | 描述 |
---|---|
create(String path, byte[] data, List acl, CreateMode createMode) | 同步方式创建节点 |
create(String path, byte[] data, List acl, CreateMode createMode) | 异步方式创建节点 |
delete(String path, int version) | 同步方式删除节点 |
delete(String path, int version, AsyncCallback.VoidCallback cb, Object ctx) | 异步方式删除节点 |
exists(String path, boolean watch) | 返回指定路径的节点状态信息,如果不存在返回null |
getChildren(String path, boolean watch) | 返回指定路径的所有子节点状态信息 |
getData(String path, boolean watch, Stat stat) | 返回指定路径的节点数据和状态信息 |
setData(String path, byte[] data, int version) | 给指定路径和版本的节点设置新值,如版本为-1,即给所有版本设置值 |
环境准备
引入对应版本的依赖,本文用3.8.0最新版本演示
<!--zookeeper 依赖包-->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.8.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
<scope>compile</scope>
</dependency>
<!--junit测试依赖-->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.5.2</version>
<scope>test</scope>
</dependency>
</dependencies>
我们采用junit的方式演示,junit的常用注解作用如下:
@BeforeClass
– 表示在类中的任意public static void方法执行之前执行@AfterClass
– 表示在类中的任意public static void方法执行之后执行@Before
– 表示在任意使用@Test注解标注的public void方法执行之前执行@After
– 表示在任意使用@Test注解标注的public void方法执行之后执行@Test
– 使用该注解标注的public void方法会表示为一个测试方法测试案例
创建会话和关闭会话
可以通过Zookeeper类的构造函数创建会话,它有10个重载的构造方法。
参数 | 说明 |
---|---|
connectString | 指定ZooKeeper服务器列表,有英文逗号分隔的host:port字符串组成,如”127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002”。可以指定客户端连上connectString中服务器后的根目录,如 “127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a” ,对ZooKeeper的操作都会基于/app/a这个根目录,即创建路径为”/foo/bar”的节点,实际该节点的路径为”/app/a/foo/bar” |
sessionTimeout | 会话的超时时间,单位毫秒。在一个会话周期内,ZooKeeper客户端和服务器之间会通过心跳检测机制来维持会话的有效性,一旦在sessionTimeout时间内没有进行有效的心跳检测,会话就会失效。 |
watcher | ZooKeeper允许客户端在构造方法中传入一个接口Watcher(org.apache.zookeeper.Watcher)的实现类对象来作为默认的Watch事件通知器。该参数也可以设置为null,表明不需要设置默认的Watch处理器。 |
- 客户端和服务端建立会话是异步的。构造方法会在处理完客户端初始化工作后立即返回,在通常情况下,此时并没有真正建立好一个可用的会话,此时在会话的生命周期中处于“CONNECTING”的状态。当该会话真正创建完毕后,ZooKeeper服务端会向会话对应的客户端发送一个事件通知,以告知客户端,客户端只有在获取这个通知后,才算真正建立了会话。
- 实例化的ZooKeeper客户端对象将从connectString列举的服务器中随机选择一个服务器,并尝试连接到该服务器。如果建立连接失败,将尝试连接另一个服务器(顺序是不确定的,因为列举的服务器是随机洗牌的),直到建立连接。即客户端连接一个服务器失败,将继续尝试,直到会话显式关闭。
代码:
private static final String ZK_ADDR = "10.100.1.14:2181";
private static final Integer ZK_SESSION_TIMEOUT = 30000;
private ZooKeeper zooKeeper = null;
@Before
public void init() throws IOException, InterruptedException {
log.info("********************** start zk ..................");
CountDownLatch countDownLatch = new CountDownLatch(1);
zooKeeper = new ZooKeeper(ZK_ADDR, ZK_SESSION_TIMEOUT, event -> {
log.info("触发了事件:[{}]", event);
countDownLatch.countDown();
});
countDownLatch.await();
}
@After
public void close() throws InterruptedException {
zooKeeper.close();
log.info("************************ close zk ..................");
}
- init方法和close方法是用来创建和关闭zk会话,加了
@Before
和@After
注解,它会在每个测试用例前后执行。 由于客户端和服务端建立会话是异步的,因此做一个阻塞操作,防止还没开启就执行后面的操作,在真正打开了客户端之后,发送一个消息,并解掉阻塞。
创建节点
创建节点有同步和异步两种方式。
create( final String path, byte[] data, List<ACL> acl, CreateMode createMode)
说明:该方法是一个同步创建节点的方法
参数说明:
参数 | 说明 |
---|---|
path | znode路径。例如,/path, /app/node |
data | 存储到znode路径的数据,byte数组,最大1M |
acl | 要创建的节点的访问控制列表。zookeeper API提供了一个静态接口 ZooDefs.Ids 来获取一些基本的acl列表。例如,ZooDefs.Ids.OPEN_ACL_UNSAFE 返回打开znode的acl列表。 - ZooDefs.Ids.OPEN_ACL_UNSAFE:表示开放权限,所有用户拥有所有权限 - ZooDefs.Ids.CREATOR_ALL_ACL:表示使用 auth 权限模式,并且对于满足条件的用户开放所有权限 - ZooDefs.Ids.READ_ACL_UNSAFE:表示对于所有用户,只开放Read权限 - ZooDefs.Ids.ANYONE_ID_UNSAFE:是一个常用的Id对象,表示所有用户 - ZooDefs.Ids.AUTH_IDS:是一个Auth模式的Id对象。 - 我们也可以自己定义权限模式 |
createMode | 节点的类型,是一个枚举。 - PERSISTENT:持久节点(也有叫永久节点的),不会随着会话的结束而自动删除。 - PERSISTENT_SEQUENTIAL:带单调递增序号的持久节点,不会随着会话的结束而自动删除。 - EPHEMERAL:临时节点,会随着会话的结束而自动删除。 - EPHEMERAL_SEQUENTIAL:带单调递增序号的临时节点,会随着会话的结束而自动删除。 - CONTAINER:容器节点,用于Leader、Lock等特殊用途,当容器节点不存在任何子节点时,容器将成为服务器在将来某个时候删除的候选节点。 - PERSISTENT_WITH_TTL:带TTL(time-to-live,存活时间)的持久节点,节点在TTL时间之内没有得到更新并且没有子节点,就会被自动删除。 - PERSISTENT_SEQUENTIAL_WITH_TTL:带TTL(time-to-live,存活时间)和单调递增序号的持久节点,节点在TTL时间之内没有得到更新并且没有子节点,就会被自动删除。 |
create(String path, byte[] data, List<ACL> acl, CreateMode createMode, AsyncCallback.StringCallback callBack,Object ctx)
说明:
- 这是一个异步创建节点的方法
参数说明:
其他参数和上面同步创建节点一样.
参数 | 说明 |
---|---|
callBack | 异步回调接口 |
ctx | 传递上下文参数 |
代码:
@Test
public void testCreate() throws InterruptedException, KeeperException {
// 创建一个持久节点,对所有用户开放
zooKeeper.create("/node1", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// 创建一个临时的有序节点,权限模式为对指定ip开放
Id ip = new Id("ip", "10.100.1.100");
zooKeeper.create("/user", "u00001".getBytes(), Collections.singletonList(new ACL(ZooDefs.Perms.ALL, ip)), CreateMode.EPHEMERAL_SEQUENTIAL);
}
// 异步创建节点
@Test
public void testCreateAsync() throws InterruptedException {
zooKeeper.create("/path2", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new AsyncCallback.StringCallback(){
@Override
public void processResult(int rc, String path, Object ctx, String name) {
log.info("rc: [{}]", rc); // 0代表成功了
log.info(path); // 传进来的,添加的节点
log.info(name); // 真正查到的节点的名字
log.info(ctx.toString()); // 上下文参数,ctx传进来的东西
log.info("create node success!");
}
}, "ctx" );
Thread.sleep(1000);
}
查看节点
// 同步方式查看节点数据,使用自定义的监听器
byte[] getData(final String path, Watcher watcher, Stat stat)
// 同步方式查看节点数据,使用连接时的监听器
byte[] getData(String path, boolean watch, Stat stat)
// 异步方式查看节点,使用自定义的监听器
void getData(final String path, Watcher watcher, DataCallback cb, Object ctx)
// 异步方式查看节点,使用注册的连接器
void getData(String path, boolean watch, DataCallback cb, Object ctx)
参数说明:
参数 | 说明 |
---|---|
path | znode路径 |
watcher | 注册一个监听器 |
watch | 是否使用连接对象中注册的监视器 |
stat | 返回znode的元数据 |
callBack | 异步回调接口 |
ctx | 传递上下文参数 |
代码
@Test
public void testGet() throws InterruptedException, KeeperException {
Stat stat = new Stat();
byte[] data = zooKeeper.getData("/node1", false, stat);
log.info("获取到的数据是:" + new String(data));
log.info("当前节点的版本:" + stat.getVersion());
}
@Test
public void testGetAsync() throws InterruptedException, KeeperException {
zooKeeper.getData("/node1", null, new AsyncCallback.DataCallback() {
@Override
public void processResult(int rc, String path, Object ctx, byte[] bytes, Stat stat) {
log.info("rc: " + rc);
log.info(path);
log.info(new String(bytes));
log.info("version: " + stat.getVersion());
}
}, null);
Thread.sleep(1000);
}
更新节点
// 同步方式更新节点
Stat setData(final String path, byte[] data, int version)
// 异步方式更新节点
void setData(final String path, byte[] data, int version, StatCallback cb, Object ctx)
参数说明:
参数 | 说明 |
---|---|
path | znode路径 |
data | 更新的数据 |
version | znode的当前版本。值为-1时,表示不需要考虑版本。如果指定版本之后,就可以做成一个乐观锁。 |
callBack | 异步回调接口 |
ctx | 传递上下文参数 |
代码:
@Test
public void testSetData() throws InterruptedException, KeeperException {
Stat stat = zooKeeper.setData("/node1", "alvin".getBytes(), -1); // 返回状态信息
log.info(stat.toString()); // 将状态信息打印
log.info("当前版本号" + stat.getVersion());
log.info("节点创建时间" + stat.getCtime());
log.info("节点修改时间" + stat.getMtime());
}
@Test
public void testSetDataAsync() throws InterruptedException, KeeperException {
zooKeeper.setData("/node1", "alvin2".getBytes(), 1, new AsyncCallback.StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
log.info("rc" + rc); // 0 代表修改成功
log.info(path); // 输入的节点路径
log.info("version " + stat.getVersion()); // 当前版本
}
}, null); // 返回状态信息
Thread.sleep(1000);
}
删除节点
// 同步方式删除节点
void delete(final String path, int version)
// 异步方式删除节点
void delete(final String path, int version, VoidCallback cb, Object ctx)
参数说明:
参数 | 说明 |
---|---|
path | znode路径 |
version | znode的当前版本。值为-1时,表示不需要考虑版本。如果指定版本之后,就可以做成一个乐观锁。 |
callBack | 异步回调接口 |
ctx | 传递上下文参数 |
代码:
@Test
public void testDelete() throws InterruptedException, KeeperException {
zooKeeper.delete("/node1", -1); // 如果节点不存在,会删除失败
zooKeeper.delete("/node5/child1", -1); // 如果节点不存在,会删除失败
}
@Test
public void testDeleteAsync() {
zooKeeper.delete("/node1", -1, new AsyncCallback.VoidCallback() {
@Override
public void processResult(int rc, String path, Object ctx) {
log.info("rc:" + rc);
log.info(path);
}
}, "ctx");
}
查看子节点
//同步方式查看子节点,传入监听器
List<String> getChildren(final String path, Watcher watcher)
//同步方式查看子节点,是否使用默认的监听器
List<String> getChildren(String path, boolean watch)
//异步方式查看子节点,传入监听器
void getChildren(final String path, Watcher watcher, ChildrenCallback cb, Object ctx)
//异步方式查看子节点,是否使用默认的监听器
void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx)
代码:
private void syncCreateNode(String path, String data) throws InterruptedException, KeeperException {
zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
@Test
public void testGetChild() throws InterruptedException, KeeperException {
syncCreateNode("/a", "hello");
syncCreateNode("/a/b", "hello");
syncCreateNode("/a/c", "hello");
List<String> children = zooKeeper.getChildren("/a", false);
log.info("********* children: [{}]", children);
}
检查节点是否存在
// 同步方式检查节点是否存在, 传入监听器
Stat exists(final String path, Watcher watcher)
// 同步方式检查节点是否存在, 是否用默认监听器
Stat exists(String path, boolean watch)
// 异步方式检查节点是否存在, 传入监听器
void exists(final String path, Watcher watcher, StatCallback cb, Object ctx)
// 异步方式检查节点是否存在, 是否用默认监听器
void exists(String path, boolean watch, StatCallback cb, Object ctx)
代码:
@Test
public void testExist() throws InterruptedException, KeeperException {
syncCreateNode("/alvin", "hello");
Stat stat = zooKeeper.exists("/alvin", false);
log.info("stat: [{}]", stat);
log.info("delete node /alvin ......");
zooKeeper.delete("/alvin", -1);
stat = zooKeeper.exists("/alvin", false);
log.info("stat: [{}]", stat);
}
监听器代码验证
getData、exist是、getChildren三个方法都可以监听对应节点变化。
验证watch的一次性
创建监听执行
private void syncCreateNode(String path, String data) throws InterruptedException, KeeperException {
zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
@Test
public void testGetWatch() throws InterruptedException, KeeperException {
// 创建临时节点
syncCreateNode("/watch", "aaa");
byte[] data = zooKeeper.getData("/watch", true, new Stat());
log.info("getData: [{}]", new String(data));
Thread.sleep(100000L);
}
多次执行更新节点的操作
// 修改数据
@Test
public void testUpdateData() throws InterruptedException, KeeperException {
// 创建临时节点
zooKeeper.setData("/watch", "bbb".getBytes(), -1);
Thread.sleep(10000L);
}
查看结果, 日志只打印了一次
通过自定义监听器多次监听
通过exists创建自定义监听
@Test
public void testExists() throws KeeperException, InterruptedException {
// 创建临时节点
syncCreateNode("/watch", "aaa");
// 重复使用,用完再注册一个新的
Stat stat = zooKeeper.exists("/watch", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
switch (watchedEvent.getType()) {
case NodeCreated:
log.info("{}节点创建了", watchedEvent.getPath());
break;
case NodeDataChanged:
log.info("{}节点数据被修改了", watchedEvent.getPath());
break;
case NodeDeleted:
log.info("{}节点被删除了", watchedEvent.getPath());
break;
}
try {
// 重复监听的关键
zooKeeper.exists("/watch", this);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
});
if (stat != null) {
log.info("version: " + stat.getVersion());
}
Thread.sleep(100000);
}
多次执行更新节点、删除节点、创建节点的操作
- 查看结果,多次响应监听
通过addWatcher方法实现多次监听
通过addWatch添加监听器,addWatch方法支持重复监听
@Test
public void testAddWatch() throws InterruptedException, KeeperException {
// 创建临时节点
syncCreateNode("/watch", "aaa");
zooKeeper.addWatch("/watch", new Watcher() {
@Override
public void process(WatchedEvent event) {
switch (event.getType()) {
case NodeCreated:
log.info("{}节点创建了", event.getPath());
break;
case NodeDataChanged:
log.info("{}节点数据被修改了", event.getPath());
break;
case NodeDeleted:
log.info("{}节点被删除了", event.getPath());
break;
}
}
}, AddWatchMode.PERSISTENT);
Thread.sleep(100000);
}
多次执行更新节点、删除节点、创建节点的操作
- 查看结果
说明:
- zk api提供了addWatch、printwatches、removewatches方法,分别用来添加监听,答应监听器和移除监听器列表。
- addWatch的参数中可以传入监听的两种模式,PERSISTENT和PERSISTENT_RECURSIVE,PERSISTENT模式只监听指定的节点事件,而PERSISTENT_RECURSIVE模式会监听指定节点与它所有子节点的事件。
参考
https://zookeeper.apache.org/doc/r3.8.0/apidocs/zookeeper-server/index.html
https://juejin.cn/post/6844903601517625357#heading-6
https://juejin.cn/post/6844904029546348551#heading-1
https://juejin.cn/post/6964703960250712101#heading-13