概述

Curator是Netflix公司开源的一套Zookeeper客户端框架。了解过Zookeeper原生API都会清楚其复杂度。Curator帮助我们在其基础上进行封装、实现一些开发细节,包括接连重连、反复注册Watcher和NodeExistsException等。目前已经作为Apache的顶级项目出现,是最流行的Zookeeper客户端之一。从编码风格上来讲,它提供了基于Fluent的编程风格支持。
除此之外,Curator还提供了Zookeeper的各种应用场景:分布式栅栏、共享锁服务、Master选举机制和分布式计数器等。
官网地址:https://curator.apache.org/index.html

项目组件

名称 描述
Recipes Zookeeper典型应用场景的实现,这些实现是基于Curator Framework。
Framework Zookeeper API的高层封装,大大简化Zookeeper客户端编程,添加了例如Zookeeper连接管理、重试机制等。
Utilities 为Zookeeper提供的各种实用程序。
Client Zookeeper client的封装,用于取代原生的Zookeeper客户端(ZooKeeper类),提供一些非常有用的客户端特性。
Errors Curator如何处理错误,连接问题,可恢复的例外等。

Maven依赖

  1. <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
  2. <dependency>
  3. <groupId>org.apache.curator</groupId>
  4. <artifactId>curator-recipes</artifactId>
  5. <version>5.2.1</version>
  6. </dependency>
  • 我们只要引入curator-recipes,就会自动将其他的依赖引入。
  • 我们文章使用的是5.2.1的版本,它对应的zookeeper api版本是3.6.3。

image.png

使用案例

创建会话

Curator的创建会话方式与原生的API和ZkClient的创建方式区别很大。Curator创建客户端是通过CuratorFrameworkFactory工厂类的newClient方法返回CuratorFramework,它有多个重载方法, 最后调用start方法创建连接。

  • CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
  • CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
  • CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy, ZKClientConfig zkClientConfig)

�查看3个方法的源码,发现最终他们都是使用建造者模式,统一调用了builder的.build方法。

  1. public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy, ZKClientConfig zkClientConfig)
  2. {
  3. return builder().
  4. connectString(connectString).
  5. sessionTimeoutMs(sessionTimeoutMs).
  6. connectionTimeoutMs(connectionTimeoutMs).
  7. retryPolicy(retryPolicy).
  8. zkClientConfig(zkClientConfig).
  9. build();
  10. }

参数说明:

参数 说明
connectString zk server的地址,多个server用逗号分隔
sessionTimeoutMs 会话超时时间,默认是60s
connectionTimeoutMs 连接超时时间,默认是15s
retryPolicy 失败重试策略
zkClientConfig zk客户端的配置,比如认证相关的配置

代码例子:

  1. @Before
  2. public void init() {
  3. // 重试机制, 每个1秒钟重试1次,最多重试5次
  4. RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5, 5000);
  5. // 定义客户端
  6. CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("localhost:2181", 30000, 60000, retryPolicy);
  7. // 建立连接
  8. curatorFramework.start();
  9. }
  • 重试机制RetryPolicy有很多内置的实现,可以看他的javadoc一目了然。
  • 最后需要调用start方法建立连接。

    创建节点

    Curator创建节点的方法也是基于Fluent风格编码,原生API中的参数很多都转化为一层层的方法调用来进行设置,直接上例子。

    1. @Test
    2. public void testCreateNode() throws Exception {
    3. // 创建临时的,任何人都有权限的节点
    4. String result = curatorFramework.create().withMode(CreateMode.EPHEMERAL)
    5. .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
    6. .forPath("/emNode", "hello".getBytes(StandardCharsets.UTF_8));
    7. log.warn("创建节点: [{}]", result);
    8. }
  • creatingParentContainersIfNeeded方法用来递归创建父节点。

  • withMode方法可以设置节点的属性,是持久节点,临时节点,持久有序节点、临时有序节点等。
  • withACL方法可以设置节点的访问权限。
  • forPath方法设置节点的路径和对应值。
  • 更多方法使用可以自行参考java doc文档。

    更新节点

    1. @Test
    2. public void testUpdateNode() throws Exception {
    3. // 创建节点
    4. curatorFramework.create().creatingParentContainersIfNeeded()
    5. .withMode(CreateMode.EPHEMERAL)
    6. .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
    7. .forPath("/emNode", "hello".getBytes(StandardCharsets.UTF_8));
    8. // 不带版本修改节点
    9. Stat stat = curatorFramework.setData().forPath("/emNode", "hello1".getBytes(StandardCharsets.UTF_8));
    10. log.info("修改后节点版本: [{}]", stat.getVersion());
    11. // 带版本修改节点,版本不正确,报错
    12. stat = curatorFramework.setData().withVersion(10).forPath("/emNode", "hello1".getBytes(StandardCharsets.UTF_8));
    13. log.info("修改后节点版本: [{}]", stat.getVersion());
    14. }

    image.png

  • withVersion方法用来指定版本,如果未传入version参数,那么更新当前最新版本,如果传入version则更新指定version,如果version已经变更,则抛出异常,可以用来实现乐观锁。

    查看节点

    1. @Test
    2. public void testGetNode() throws Exception {
    3. // 创建节点
    4. curatorFramework.create().creatingParentContainersIfNeeded()
    5. .withMode(CreateMode.EPHEMERAL)
    6. .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
    7. .forPath("/emNode", "hello".getBytes(StandardCharsets.UTF_8));
    8. // 直接查询
    9. byte[] data = curatorFramework.getData().forPath("/emNode");
    10. log.info("节点数据: [{}]", new String(data));
    11. // 同时获取节点属性,放到stat中
    12. Stat stat = new Stat();
    13. data = curatorFramework.getData().storingStatIn(stat).forPath("/emNode");
    14. log.info("节点数据: [{}]", new String(data));
    15. log.info("节点版本: [{}]", stat.getVersion());
    16. // 查询的时候注册监听
    17. data = curatorFramework.getData().usingWatcher(new CuratorWatcher() {
    18. @Override
    19. public void process(WatchedEvent event) throws Exception {
    20. log.info("事件监听: [{}]", event);
    21. }
    22. }).forPath("/emNode");
    23. log.info("节点数据: [{}]", new String(data));
    24. Thread.sleep(100000);
    25. }
  • getData()方法用来获取GetDataBuilder查询器。

  • storingStatIn方法用来存放查询到的节点属性信息。
  • usingWatcher方法用来注册监听器,这个方法只会一次监听。

    删除节点

    1. @Test
    2. public void testDeleteNode() throws Exception {
    3. // 创建节点
    4. curatorFramework.create().creatingParentContainersIfNeeded()
    5. .withMode(CreateMode.EPHEMERAL)
    6. .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
    7. .forPath("/emNode", "hello".getBytes(StandardCharsets.UTF_8));
    8. // 方法
    9. curatorFramework.delete().deletingChildrenIfNeeded().forPath("/emNode");
    10. }
  • delete()用来创建删除构造器DeleteBuilder

  • deletingChildrenIfNeeded()删除节点并递归删除其子节点。
  • withVersion()指定版本删除,如果版本不对抛出异常
  • guaranteed()方法强制保证删除一个节点,只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到节点删除成功。比如遇到一些网络异常的情况,此guaranteed的强制删除就会很有效果。

    监听器使用

    Curator 事件有两种模式,一种是标准的观察模式,一种是缓存监听模式。标准的监听模式是使用Watcher 监听器,上面已经做了演示。第二种缓存监听模式引入了一种本地缓存视图的Cache机制,来实现对Zookeeper服务端事件监听。
    Cache事件监听可以理解为一个本地缓存视图与远程Zookeeper视图的对比过程。Cache提供了反复注册的功能。Cache是一种缓存机制,可以借助Cache实现监听。简单来说,Cache在客户端缓存了znode的各种状态,当感知到zk集群的znode状态变化,会触发event事件,注册的监听器会处理这些事件。以前的Cache事件监听的种类有3种Path Cache, Cache,Tree Cache, 在新版本中合并为一种CuratorCache, 我们以CuratorCache新版演示为例子。
    CuratorCache试图将来自节点的数据保存在本地缓存中。还可以选择缓存该节点下的整个子树。将响应更新/创建/删除事件,下拉数据等。您可以注册侦听器,以便在发生更改时得到通知。

    1. @Test
    2. public void testWatchCache() throws Exception {
    3. try {
    4. String workPath = "/aa/bb/cc";
    5. // 创建节点
    6. curatorFramework.create().creatingParentContainersIfNeeded()
    7. .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
    8. .forPath(workPath, "hello".getBytes(StandardCharsets.UTF_8));
    9. // 创建缓存
    10. CuratorCache curatorCache = CuratorCache.builder(curatorFramework, workPath).build();
    11. CuratorCacheListener curatorCacheListener = new CuratorCacheListener() {
    12. @Override
    13. public void event(Type type, ChildData oldData, ChildData data) {
    14. log.info("%%%%%%%%%%%%%%%watch start……………………………………………………");
    15. log.info("change type: [{}]", type);
    16. log.info("ZNode节点状态改变, path={}", data.getPath());
    17. log.info("ZNode节点状态改变,before: [{}], after: [{}]", oldData != null ? new String(oldData.getData()): null, new String(data.getData()));
    18. }
    19. };
    20. curatorCache.listenable().addListener(curatorCacheListener);
    21. curatorCache.start();
    22. // 第1次变更节点数据
    23. curatorFramework.setData().forPath("/aa/bb", "第1次更改内容".getBytes());
    24. Thread.sleep(1000);
    25. // 第2次变更节点数据
    26. curatorFramework.setData().forPath("/aa/bb/cc", "第2次更改内容".getBytes());
    27. Thread.sleep(1000);
    28. // 第3次创建新节点
    29. curatorFramework.create().creatingParentContainersIfNeeded()
    30. .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
    31. .forPath("/aa/bb/dd", "hello".getBytes(StandardCharsets.UTF_8));
    32. Thread.sleep(1000);
    33. Thread.sleep(5000);
    34. } catch (Exception e) {
    35. log.error("创建Cache监听失败", e);
    36. } finally {
    37. // 删除节点
    38. curatorFramework.delete().deletingChildrenIfNeeded().forPath("/aa");
    39. }
    40. }
  • 通过CuratorCache.builder方法创建CuratorCache,默认监听当前节点以及子节点。

小结

上面简单演示了下curator框架操作zookeeper,相对于原生的api,简洁方便很多,更多的使用可以自己参考 doc。
代码地址:https://github.com/alvinlkk/awesome-java-full-demo/tree/master/zookeeper-demo

参考

https://www.cnblogs.com/qingyunzong/p/8666288.html
https://www.cnblogs.com/crazymakercircle/p/10228385.html