概述
Curator是Netflix公司开源的一套Zookeeper客户端框架。了解过Zookeeper原生API都会清楚其复杂度。Curator帮助我们在其基础上进行封装、实现一些开发细节,包括接连重连、反复注册Watcher和NodeExistsException等。目前已经作为Apache的顶级项目出现,是最流行的Zookeeper客户端之一。从编码风格上来讲,它提供了基于Fluent的编程风格支持。
除此之外,Curator还提供了Zookeeper的各种应用场景:分布式栅栏、共享锁服务、Master选举机制和分布式计数器等。
官网地址:https://curator.apache.org/index.html
项目组件
名称 | 描述 |
---|---|
Recipes | Zookeeper典型应用场景的实现,这些实现是基于Curator Framework。 |
Framework | Zookeeper API的高层封装,大大简化Zookeeper客户端编程,添加了例如Zookeeper连接管理、重试机制等。 |
Utilities | 为Zookeeper提供的各种实用程序。 |
Client | Zookeeper client的封装,用于取代原生的Zookeeper客户端(ZooKeeper类),提供一些非常有用的客户端特性。 |
Errors | Curator如何处理错误,连接问题,可恢复的例外等。 |
Maven依赖
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.1</version>
</dependency>
- 我们只要引入curator-recipes,就会自动将其他的依赖引入。
- 我们文章使用的是5.2.1的版本,它对应的zookeeper api版本是3.6.3。
使用案例
创建会话
Curator的创建会话方式与原生的API和ZkClient的创建方式区别很大。Curator创建客户端是通过CuratorFrameworkFactory
工厂类的newClient方法返回CuratorFramework,它有多个重载方法, 最后调用start方法创建连接。
- CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
- CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
- CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy, ZKClientConfig zkClientConfig)
�查看3个方法的源码,发现最终他们都是使用建造者模式,统一调用了builder的.build方法。
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy, ZKClientConfig zkClientConfig)
{
return builder().
connectString(connectString).
sessionTimeoutMs(sessionTimeoutMs).
connectionTimeoutMs(connectionTimeoutMs).
retryPolicy(retryPolicy).
zkClientConfig(zkClientConfig).
build();
}
参数说明:
参数 | 说明 |
---|---|
connectString | zk server的地址,多个server用逗号分隔 |
sessionTimeoutMs | 会话超时时间,默认是60s |
connectionTimeoutMs | 连接超时时间,默认是15s |
retryPolicy | 失败重试策略 |
zkClientConfig | zk客户端的配置,比如认证相关的配置 |
代码例子:
@Before
public void init() {
// 重试机制, 每个1秒钟重试1次,最多重试5次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5, 5000);
// 定义客户端
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("localhost:2181", 30000, 60000, retryPolicy);
// 建立连接
curatorFramework.start();
}
- 重试机制RetryPolicy有很多内置的实现,可以看他的javadoc一目了然。
-
创建节点
Curator创建节点的方法也是基于Fluent风格编码,原生API中的参数很多都转化为一层层的方法调用来进行设置,直接上例子。
@Test
public void testCreateNode() throws Exception {
// 创建临时的,任何人都有权限的节点
String result = curatorFramework.create().withMode(CreateMode.EPHEMERAL)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath("/emNode", "hello".getBytes(StandardCharsets.UTF_8));
log.warn("创建节点: [{}]", result);
}
creatingParentContainersIfNeeded
方法用来递归创建父节点。withMode
方法可以设置节点的属性,是持久节点,临时节点,持久有序节点、临时有序节点等。withACL
方法可以设置节点的访问权限。forPath
方法设置节点的路径和对应值。-
更新节点
@Test
public void testUpdateNode() throws Exception {
// 创建节点
curatorFramework.create().creatingParentContainersIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath("/emNode", "hello".getBytes(StandardCharsets.UTF_8));
// 不带版本修改节点
Stat stat = curatorFramework.setData().forPath("/emNode", "hello1".getBytes(StandardCharsets.UTF_8));
log.info("修改后节点版本: [{}]", stat.getVersion());
// 带版本修改节点,版本不正确,报错
stat = curatorFramework.setData().withVersion(10).forPath("/emNode", "hello1".getBytes(StandardCharsets.UTF_8));
log.info("修改后节点版本: [{}]", stat.getVersion());
}
withVersion
方法用来指定版本,如果未传入version参数,那么更新当前最新版本,如果传入version则更新指定version,如果version已经变更,则抛出异常,可以用来实现乐观锁。查看节点
@Test
public void testGetNode() throws Exception {
// 创建节点
curatorFramework.create().creatingParentContainersIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath("/emNode", "hello".getBytes(StandardCharsets.UTF_8));
// 直接查询
byte[] data = curatorFramework.getData().forPath("/emNode");
log.info("节点数据: [{}]", new String(data));
// 同时获取节点属性,放到stat中
Stat stat = new Stat();
data = curatorFramework.getData().storingStatIn(stat).forPath("/emNode");
log.info("节点数据: [{}]", new String(data));
log.info("节点版本: [{}]", stat.getVersion());
// 查询的时候注册监听
data = curatorFramework.getData().usingWatcher(new CuratorWatcher() {
@Override
public void process(WatchedEvent event) throws Exception {
log.info("事件监听: [{}]", event);
}
}).forPath("/emNode");
log.info("节点数据: [{}]", new String(data));
Thread.sleep(100000);
}
getData()
方法用来获取GetDataBuilder
查询器。storingStatIn
方法用来存放查询到的节点属性信息。usingWatcher
方法用来注册监听器,这个方法只会一次监听。删除节点
@Test
public void testDeleteNode() throws Exception {
// 创建节点
curatorFramework.create().creatingParentContainersIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath("/emNode", "hello".getBytes(StandardCharsets.UTF_8));
// 方法
curatorFramework.delete().deletingChildrenIfNeeded().forPath("/emNode");
}
delete()
用来创建删除构造器DeleteBuilder
。deletingChildrenIfNeeded()
删除节点并递归删除其子节点。withVersion()
指定版本删除,如果版本不对抛出异常guaranteed()
方法强制保证删除一个节点,只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到节点删除成功。比如遇到一些网络异常的情况,此guaranteed的强制删除就会很有效果。监听器使用
Curator 事件有两种模式,一种是标准的观察模式,一种是缓存监听模式。标准的监听模式是使用Watcher 监听器,上面已经做了演示。第二种缓存监听模式引入了一种本地缓存视图的Cache机制,来实现对Zookeeper服务端事件监听。
Cache事件监听可以理解为一个本地缓存视图与远程Zookeeper视图的对比过程。Cache提供了反复注册的功能。Cache是一种缓存机制,可以借助Cache实现监听。简单来说,Cache在客户端缓存了znode的各种状态,当感知到zk集群的znode状态变化,会触发event事件,注册的监听器会处理这些事件。以前的Cache事件监听的种类有3种Path Cache, Cache,Tree Cache, 在新版本中合并为一种CuratorCache, 我们以CuratorCache新版演示为例子。
CuratorCache试图将来自节点的数据保存在本地缓存中。还可以选择缓存该节点下的整个子树。将响应更新/创建/删除事件,下拉数据等。您可以注册侦听器,以便在发生更改时得到通知。@Test
public void testWatchCache() throws Exception {
try {
String workPath = "/aa/bb/cc";
// 创建节点
curatorFramework.create().creatingParentContainersIfNeeded()
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(workPath, "hello".getBytes(StandardCharsets.UTF_8));
// 创建缓存
CuratorCache curatorCache = CuratorCache.builder(curatorFramework, workPath).build();
CuratorCacheListener curatorCacheListener = new CuratorCacheListener() {
@Override
public void event(Type type, ChildData oldData, ChildData data) {
log.info("%%%%%%%%%%%%%%%watch start……………………………………………………");
log.info("change type: [{}]", type);
log.info("ZNode节点状态改变, path={}", data.getPath());
log.info("ZNode节点状态改变,before: [{}], after: [{}]", oldData != null ? new String(oldData.getData()): null, new String(data.getData()));
}
};
curatorCache.listenable().addListener(curatorCacheListener);
curatorCache.start();
// 第1次变更节点数据
curatorFramework.setData().forPath("/aa/bb", "第1次更改内容".getBytes());
Thread.sleep(1000);
// 第2次变更节点数据
curatorFramework.setData().forPath("/aa/bb/cc", "第2次更改内容".getBytes());
Thread.sleep(1000);
// 第3次创建新节点
curatorFramework.create().creatingParentContainersIfNeeded()
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath("/aa/bb/dd", "hello".getBytes(StandardCharsets.UTF_8));
Thread.sleep(1000);
Thread.sleep(5000);
} catch (Exception e) {
log.error("创建Cache监听失败", e);
} finally {
// 删除节点
curatorFramework.delete().deletingChildrenIfNeeded().forPath("/aa");
}
}
通过
CuratorCache.builder
方法创建CuratorCache,默认监听当前节点以及子节点。
小结
上面简单演示了下curator框架操作zookeeper,相对于原生的api,简洁方便很多,更多的使用可以自己参考 doc。
代码地址:https://github.com/alvinlkk/awesome-java-full-demo/tree/master/zookeeper-demo
参考
https://www.cnblogs.com/qingyunzong/p/8666288.html
https://www.cnblogs.com/crazymakercircle/p/10228385.html