概述
Curator是Netflix公司开源的一套Zookeeper客户端框架。了解过Zookeeper原生API都会清楚其复杂度。Curator帮助我们在其基础上进行封装、实现一些开发细节,包括接连重连、反复注册Watcher和NodeExistsException等。目前已经作为Apache的顶级项目出现,是最流行的Zookeeper客户端之一。从编码风格上来讲,它提供了基于Fluent的编程风格支持。
除此之外,Curator还提供了Zookeeper的各种应用场景:分布式栅栏、共享锁服务、Master选举机制和分布式计数器等。
官网地址:https://curator.apache.org/index.html
项目组件
| 名称 | 描述 |
|---|---|
| Recipes | Zookeeper典型应用场景的实现,这些实现是基于Curator Framework。 |
| Framework | Zookeeper API的高层封装,大大简化Zookeeper客户端编程,添加了例如Zookeeper连接管理、重试机制等。 |
| Utilities | 为Zookeeper提供的各种实用程序。 |
| Client | Zookeeper client的封装,用于取代原生的Zookeeper客户端(ZooKeeper类),提供一些非常有用的客户端特性。 |
| Errors | Curator如何处理错误,连接问题,可恢复的例外等。 |
Maven依赖
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes --><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.2.1</version></dependency>
- 我们只要引入curator-recipes,就会自动将其他的依赖引入。
- 我们文章使用的是5.2.1的版本,它对应的zookeeper api版本是3.6.3。
使用案例
创建会话
Curator的创建会话方式与原生的API和ZkClient的创建方式区别很大。Curator创建客户端是通过CuratorFrameworkFactory工厂类的newClient方法返回CuratorFramework,它有多个重载方法, 最后调用start方法创建连接。
- CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
- CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
- CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy, ZKClientConfig zkClientConfig)
�查看3个方法的源码,发现最终他们都是使用建造者模式,统一调用了builder的.build方法。
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy, ZKClientConfig zkClientConfig){return builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).zkClientConfig(zkClientConfig).build();}
参数说明:
| 参数 | 说明 |
|---|---|
| connectString | zk server的地址,多个server用逗号分隔 |
| sessionTimeoutMs | 会话超时时间,默认是60s |
| connectionTimeoutMs | 连接超时时间,默认是15s |
| retryPolicy | 失败重试策略 |
| zkClientConfig | zk客户端的配置,比如认证相关的配置 |
代码例子:
@Beforepublic void init() {// 重试机制, 每个1秒钟重试1次,最多重试5次RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5, 5000);// 定义客户端CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("localhost:2181", 30000, 60000, retryPolicy);// 建立连接curatorFramework.start();}
- 重试机制RetryPolicy有很多内置的实现,可以看他的javadoc一目了然。
-
创建节点
Curator创建节点的方法也是基于Fluent风格编码,原生API中的参数很多都转化为一层层的方法调用来进行设置,直接上例子。
@Testpublic void testCreateNode() throws Exception {// 创建临时的,任何人都有权限的节点String result = curatorFramework.create().withMode(CreateMode.EPHEMERAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath("/emNode", "hello".getBytes(StandardCharsets.UTF_8));log.warn("创建节点: [{}]", result);}
creatingParentContainersIfNeeded方法用来递归创建父节点。withMode方法可以设置节点的属性,是持久节点,临时节点,持久有序节点、临时有序节点等。withACL方法可以设置节点的访问权限。forPath方法设置节点的路径和对应值。-
更新节点
@Testpublic void testUpdateNode() throws Exception {// 创建节点curatorFramework.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath("/emNode", "hello".getBytes(StandardCharsets.UTF_8));// 不带版本修改节点Stat stat = curatorFramework.setData().forPath("/emNode", "hello1".getBytes(StandardCharsets.UTF_8));log.info("修改后节点版本: [{}]", stat.getVersion());// 带版本修改节点,版本不正确,报错stat = curatorFramework.setData().withVersion(10).forPath("/emNode", "hello1".getBytes(StandardCharsets.UTF_8));log.info("修改后节点版本: [{}]", stat.getVersion());}

withVersion方法用来指定版本,如果未传入version参数,那么更新当前最新版本,如果传入version则更新指定version,如果version已经变更,则抛出异常,可以用来实现乐观锁。查看节点
@Testpublic void testGetNode() throws Exception {// 创建节点curatorFramework.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath("/emNode", "hello".getBytes(StandardCharsets.UTF_8));// 直接查询byte[] data = curatorFramework.getData().forPath("/emNode");log.info("节点数据: [{}]", new String(data));// 同时获取节点属性,放到stat中Stat stat = new Stat();data = curatorFramework.getData().storingStatIn(stat).forPath("/emNode");log.info("节点数据: [{}]", new String(data));log.info("节点版本: [{}]", stat.getVersion());// 查询的时候注册监听data = curatorFramework.getData().usingWatcher(new CuratorWatcher() {@Overridepublic void process(WatchedEvent event) throws Exception {log.info("事件监听: [{}]", event);}}).forPath("/emNode");log.info("节点数据: [{}]", new String(data));Thread.sleep(100000);}
getData()方法用来获取GetDataBuilder查询器。storingStatIn方法用来存放查询到的节点属性信息。usingWatcher方法用来注册监听器,这个方法只会一次监听。删除节点
@Testpublic void testDeleteNode() throws Exception {// 创建节点curatorFramework.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath("/emNode", "hello".getBytes(StandardCharsets.UTF_8));// 方法curatorFramework.delete().deletingChildrenIfNeeded().forPath("/emNode");}
delete()用来创建删除构造器DeleteBuilder。deletingChildrenIfNeeded()删除节点并递归删除其子节点。withVersion()指定版本删除,如果版本不对抛出异常guaranteed()方法强制保证删除一个节点,只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到节点删除成功。比如遇到一些网络异常的情况,此guaranteed的强制删除就会很有效果。监听器使用
Curator 事件有两种模式,一种是标准的观察模式,一种是缓存监听模式。标准的监听模式是使用Watcher 监听器,上面已经做了演示。第二种缓存监听模式引入了一种本地缓存视图的Cache机制,来实现对Zookeeper服务端事件监听。
Cache事件监听可以理解为一个本地缓存视图与远程Zookeeper视图的对比过程。Cache提供了反复注册的功能。Cache是一种缓存机制,可以借助Cache实现监听。简单来说,Cache在客户端缓存了znode的各种状态,当感知到zk集群的znode状态变化,会触发event事件,注册的监听器会处理这些事件。以前的Cache事件监听的种类有3种Path Cache, Cache,Tree Cache, 在新版本中合并为一种CuratorCache, 我们以CuratorCache新版演示为例子。
CuratorCache试图将来自节点的数据保存在本地缓存中。还可以选择缓存该节点下的整个子树。将响应更新/创建/删除事件,下拉数据等。您可以注册侦听器,以便在发生更改时得到通知。@Testpublic void testWatchCache() throws Exception {try {String workPath = "/aa/bb/cc";// 创建节点curatorFramework.create().creatingParentContainersIfNeeded().withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(workPath, "hello".getBytes(StandardCharsets.UTF_8));// 创建缓存CuratorCache curatorCache = CuratorCache.builder(curatorFramework, workPath).build();CuratorCacheListener curatorCacheListener = new CuratorCacheListener() {@Overridepublic void event(Type type, ChildData oldData, ChildData data) {log.info("%%%%%%%%%%%%%%%watch start……………………………………………………");log.info("change type: [{}]", type);log.info("ZNode节点状态改变, path={}", data.getPath());log.info("ZNode节点状态改变,before: [{}], after: [{}]", oldData != null ? new String(oldData.getData()): null, new String(data.getData()));}};curatorCache.listenable().addListener(curatorCacheListener);curatorCache.start();// 第1次变更节点数据curatorFramework.setData().forPath("/aa/bb", "第1次更改内容".getBytes());Thread.sleep(1000);// 第2次变更节点数据curatorFramework.setData().forPath("/aa/bb/cc", "第2次更改内容".getBytes());Thread.sleep(1000);// 第3次创建新节点curatorFramework.create().creatingParentContainersIfNeeded().withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath("/aa/bb/dd", "hello".getBytes(StandardCharsets.UTF_8));Thread.sleep(1000);Thread.sleep(5000);} catch (Exception e) {log.error("创建Cache监听失败", e);} finally {// 删除节点curatorFramework.delete().deletingChildrenIfNeeded().forPath("/aa");}}
通过
CuratorCache.builder方法创建CuratorCache,默认监听当前节点以及子节点。
小结
上面简单演示了下curator框架操作zookeeper,相对于原生的api,简洁方便很多,更多的使用可以自己参考 doc。
代码地址:https://github.com/alvinlkk/awesome-java-full-demo/tree/master/zookeeper-demo
参考
https://www.cnblogs.com/qingyunzong/p/8666288.html
https://www.cnblogs.com/crazymakercircle/p/10228385.html
