前言:Zookeeper官方也提供了客户端,但是我们在日常工作中不怎么用,目前用的多的是Zookeeper客户端框架是Curator,本节我们主要讲解如何集成Curator,Zookeeper集群搭建方法也会总结。 ZooKeeper有很多可视化数据查看工具,比如基于CS架构的 ZooInspector,还有基于BS结构的如:ZKUI,感兴趣的可自行学习,这里不做介绍。

1、Apache Curator开源客户端

基本介绍

Curator 是一套由netflix 公司开源的,Java 语言编程的 ZooKeeper 客户端框架,Curator项目 是现在ZooKeeper 客户端中使用最多,对ZooKeeper 版本支持最好的第三方客户端,并推荐使用。
Curator 把我们平时常用的很多 ZooKeeper 服务开发功能做了封装,例如 Leader 选举、 分布式计数器、分布式锁。减少了技术人员在使用 ZooKeeper 时的大部分底层细节开发工 作。

如何集成

引入jar包:

  1. <dependency>
  2. <groupId>org.apache.curator</groupId>
  3. <artifactId>curator-framework</artifactId>
  4. <version>2.12.0</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.curator</groupId>
  8. <artifactId>curator-recipes</artifactId>
  9. <version>2.12.0</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.apache.curator</groupId>
  13. <artifactId>curator-client</artifactId>
  14. <version>2.12.0</version>
  15. </dependency>

参数准备:

  1. zookeeper:
  2. enabled: true
  3. server: 127.0.0.1:2181
  4. namespace: lwq
  5. digest: lwq:123456
  6. sessionTimeoutMs: 3000
  7. connectionTimeoutMs: 60000
  8. maxRetries: 2
  9. baseSleepTimeMs: 1000
  1. @Data
  2. @Component
  3. @ConfigurationProperties(prefix = "zookeeper")
  4. public class ZookeeperParam {
  5. private boolean enabled;
  6. private String server;
  7. private String namespace;
  8. private String digest;
  9. private int sessionTimeoutMs;
  10. private int connectionTimeoutMs;
  11. private int maxRetries;
  12. private int baseSleepTimeMs;
  13. }

配置类:

  1. @Configuration
  2. public class ZookeeperConfig {
  3. private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperConfig.class);
  4. @Resource
  5. private ZookeeperParam zookeeperParam;
  6. private static CuratorFramework client = null;
  7. /**
  8. * 初始化
  9. */
  10. @PostConstruct
  11. public void init() {
  12. // 重试策略,初试时间1秒,重试10次
  13. RetryPolicy policy =
  14. new ExponentialBackoffRetry(zookeeperParam.getBaseSleepTimeMs(), zookeeperParam.getMaxRetries());
  15. // 通过工厂创建Curator
  16. client = CuratorFrameworkFactory.builder().connectString(zookeeperParam.getServer())
  17. .authorization("digest", zookeeperParam.getDigest().getBytes())
  18. .connectionTimeoutMs(zookeeperParam.getConnectionTimeoutMs())
  19. .sessionTimeoutMs(zookeeperParam.getSessionTimeoutMs()).retryPolicy(policy).build();
  20. // 开启连接
  21. client.start();
  22. LOGGER.info("zookeeper 初始化完成...");
  23. }
  24. public static CuratorFramework getClient() {
  25. return client;
  26. }
  27. public static void closeClient() {
  28. if (client != null) {
  29. client.close();
  30. }
  31. }
  32. }

Znode节点操作:

  1. public interface ZookeeperService {
  2. /**
  3. * 判断节点是否存在
  4. */
  5. boolean isExistNode(final String path);
  6. /**
  7. * 创建节点
  8. */
  9. void createNode(CreateMode mode, String path);
  10. /**
  11. * 设置节点数据
  12. */
  13. void setNodeData(String path, String nodeData);
  14. /**
  15. * 创建节点
  16. */
  17. void createNodeAndData(CreateMode mode, String path, String nodeData);
  18. /**
  19. * 获取节点数据
  20. */
  21. String getNodeData(String path);
  22. /**
  23. * 获取节点下数据
  24. */
  25. List<String> getNodeChild(String path);
  26. /**
  27. * 是否递归删除节点
  28. */
  29. void deleteNode(String path, Boolean recursive);
  30. }
  1. @Service
  2. public class ZookeeperServiceImpl implements ZookeeperService {
  3. private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperServiceImpl.class);
  4. @Override
  5. public boolean isExistNode(String path) {
  6. CuratorFramework client = ZookeeperConfig.getClient();
  7. client.sync();
  8. try {
  9. Stat stat = client.checkExists().forPath(path);
  10. return client.checkExists().forPath(path) != null;
  11. } catch (Exception e) {
  12. LOGGER.error("isExistNode error...", e);
  13. e.printStackTrace();
  14. }
  15. return false;
  16. }
  17. @Override
  18. public void createNode(CreateMode mode, String path) {
  19. CuratorFramework client = ZookeeperConfig.getClient();
  20. try {
  21. // 递归创建所需父节点
  22. client.create().creatingParentsIfNeeded().withMode(mode).forPath(path);
  23. } catch (Exception e) {
  24. LOGGER.error("createNode error...", e);
  25. e.printStackTrace();
  26. }
  27. }
  28. @Override
  29. public void setNodeData(String path, String nodeData) {
  30. CuratorFramework client = ZookeeperConfig.getClient();
  31. try {
  32. // 设置节点数据
  33. client.setData().forPath(path, nodeData.getBytes("UTF-8"));
  34. } catch (Exception e) {
  35. LOGGER.error("setNodeData error...", e);
  36. e.printStackTrace();
  37. }
  38. }
  39. @Override
  40. public void createNodeAndData(CreateMode mode, String path, String nodeData) {
  41. CuratorFramework client = ZookeeperConfig.getClient();
  42. try {
  43. // 创建节点,关联数据
  44. client.create().creatingParentsIfNeeded().withMode(mode).forPath(path, nodeData.getBytes("UTF-8"));
  45. } catch (Exception e) {
  46. LOGGER.error("createNode error...", e);
  47. e.printStackTrace();
  48. }
  49. }
  50. @Override
  51. public String getNodeData(String path) {
  52. CuratorFramework client = ZookeeperConfig.getClient();
  53. try {
  54. // 数据读取和转换
  55. byte[] dataByte = client.getData().forPath(path);
  56. String data = new String(dataByte, "UTF-8");
  57. if (StringUtils.isNotEmpty(data)) {
  58. return data;
  59. }
  60. } catch (Exception e) {
  61. LOGGER.error("getNodeData error...", e);
  62. e.printStackTrace();
  63. }
  64. return null;
  65. }
  66. @Override
  67. public List<String> getNodeChild(String path) {
  68. CuratorFramework client = ZookeeperConfig.getClient();
  69. List<String> nodeChildDataList = new ArrayList<>();
  70. try {
  71. // 节点下数据集
  72. nodeChildDataList = client.getChildren().forPath(path);
  73. } catch (Exception e) {
  74. LOGGER.error("getNodeChild error...", e);
  75. e.printStackTrace();
  76. }
  77. return nodeChildDataList;
  78. }
  79. @Override
  80. public void deleteNode(String path, Boolean recursive) {
  81. CuratorFramework client = ZookeeperConfig.getClient();
  82. try {
  83. if (recursive) {
  84. // 递归删除节点
  85. client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
  86. } else {
  87. // 删除单个节点
  88. client.delete().guaranteed().forPath(path);
  89. }
  90. } catch (Exception e) {
  91. LOGGER.error("deleteNode error...", e);
  92. e.printStackTrace();
  93. }
  94. }
  95. }

Controller调用:

  1. @RestController
  2. public class ZookeeperApi {
  3. @Resource
  4. private ZookeeperService zookeeperService;
  5. @GetMapping("/getNodeData")
  6. public String getNodeData(String path) {
  7. return zookeeperService.getNodeData(path);
  8. }
  9. @GetMapping("/isExistNode")
  10. public boolean isExistNode(final String path) {
  11. return zookeeperService.isExistNode(path);
  12. }
  13. @GetMapping("/createNode")
  14. public String createNode(CreateMode mode, String path) {
  15. zookeeperService.createNode(mode, path);
  16. return "success";
  17. }
  18. @GetMapping("/setNodeData")
  19. public String setNodeData(String path, String nodeData) {
  20. zookeeperService.setNodeData(path, nodeData);
  21. return "success";
  22. }
  23. @GetMapping("/createNodeAndData")
  24. public String createNodeAndData(CreateMode mode, String path, String nodeData) {
  25. zookeeperService.createNodeAndData(mode, path, nodeData);
  26. return "success";
  27. }
  28. @GetMapping("/getNodeChild")
  29. public List<String> getNodeChild(String path) {
  30. return zookeeperService.getNodeChild(path);
  31. }
  32. @GetMapping("/deleteNode")
  33. public String deleteNode(String path, Boolean recursive) {
  34. zookeeperService.deleteNode(path, recursive);
  35. return "success";
  36. }
  37. }

集成Curator,完成!

2、Zookeeper集群搭建

集群搭建

Zookeeper 集群模式一共有三种类型的角色

  • Leader: 处理所有的事务请求(写请求),可以处理读请求,集群中只能有一个Leader
  • Follower:只能处理读请求,同时作为 Leader的候选节点,即如果Leader宕机,Follower节点 要参与到新的Leader选举中,有可能成为新的Leader节点。
  • Observer:只能处理读请求。不能参与选举

搭建集群示例:
Zookeeper 集群配置.docx