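Preface: ZooKeeper ships with an official client, but it is rarely used in day-to-day work; the most widely used ZooKeeper client framework today is Curator. This section focuses on how to integrate Curator and also summarizes how to set up a ZooKeeper cluster. ZooKeeper also has many visual data-browsing tools, such as ZooInspector (client/server architecture) and ZKUI (browser/server architecture). Interested readers can explore them on their own; they are not covered here.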
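1. Apache Curator open-source client
Overview
Curator is a ZooKeeper client framework written in Java. It was originally open-sourced by Netflix and is now an Apache project. Among third-party ZooKeeper clients it is the most widely used and has the best support for ZooKeeper versions, so it is the recommended choice.
Curator wraps many of the features commonly needed when building on ZooKeeper, such as leader election, distributed counters, and distributed locks, which spares developers most of the low-level details of working with ZooKeeper directly.
How to integrate
Add the dependencies: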
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>2.12.0</version>
</dependency>
Configuration parameters:
zookeeper:
  enabled: true
  server: 127.0.0.1:2181
  namespace: lwq
  digest: lwq:123456
  sessionTimeoutMs: 3000
  connectionTimeoutMs: 60000
  maxRetries: 2
  baseSleepTimeMs: 1000
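import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

// Binds the "zookeeper" section of the configuration to typed fields
@Data
@Component
@ConfigurationProperties(prefix = "zookeeper")
public class ZookeeperParam {
    private boolean enabled;
    private String server;
    private String namespace;
    private String digest;
    private int sessionTimeoutMs;
    private int connectionTimeoutMs;
    private int maxRetries;
    private int baseSleepTimeMs;
}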
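One detail worth checking in the parameters above is `sessionTimeoutMs: 3000`. A ZooKeeper server does not accept an arbitrary session timeout: by default it clamps the requested value to the range from 2 x tickTime to 20 x tickTime. The sketch below illustrates that clamping in plain Java. The class and method names are hypothetical, and the `tickTime` of 2000 ms is an assumption taken from ZooKeeper's sample configuration, so check your own `zoo.cfg`.

```java
final class SessionTimeoutSketch {

    /**
     * Clamp the requested session timeout into [2 * tickTime, 20 * tickTime],
     * which are the server's default minimum and maximum session timeouts.
     */
    static int negotiate(int requestedMs, int tickTimeMs) {
        int min = 2 * tickTimeMs;
        int max = 20 * tickTimeMs;
        return Math.max(min, Math.min(max, requestedMs));
    }

    public static void main(String[] args) {
        int tickTimeMs = 2000; // assumption: tickTime from the sample zoo.cfg
        // 3000 is the sessionTimeoutMs value from the configuration above
        System.out.println(negotiate(3000, tickTimeMs));  // prints 4000
        System.out.println(negotiate(60000, tickTimeMs)); // prints 40000
    }
}
```

Under these assumptions a requested 3000 ms session would be raised to 4000 ms by the server, so the effective timeout may differ from the configured one.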
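Configuration class:
import java.nio.charset.StandardCharsets;
// javax.* matches Spring Boot 2.x; on Spring Boot 3+ these live under jakarta.annotation
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ZookeeperConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperConfig.class);

    @Resource
    private ZookeeperParam zookeeperParam;

    private static CuratorFramework client = null;

    /**
     * Initialize the Curator client
     */
    @PostConstruct
    public void init() {
        // Retry policy: exponential backoff starting at baseSleepTimeMs, up to maxRetries retries
        // (1 second and 2 retries with the parameters above)
        RetryPolicy policy =
            new ExponentialBackoffRetry(zookeeperParam.getBaseSleepTimeMs(), zookeeperParam.getMaxRetries());
        // Build the Curator client through the factory
        client = CuratorFrameworkFactory.builder()
            .connectString(zookeeperParam.getServer())
            // Prefix every path with the configured namespace
            .namespace(zookeeperParam.getNamespace())
            .authorization("digest", zookeeperParam.getDigest().getBytes(StandardCharsets.UTF_8))
            .connectionTimeoutMs(zookeeperParam.getConnectionTimeoutMs())
            .sessionTimeoutMs(zookeeperParam.getSessionTimeoutMs())
            .retryPolicy(policy)
            .build();
        // Open the connection
        client.start();
        LOGGER.info("zookeeper initialization complete...");
    }

    public static CuratorFramework getClient() {
        return client;
    }

    public static void closeClient() {
        if (client != null) {
            client.close();
        }
    }
}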
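To get a feel for what `baseSleepTimeMs` and `maxRetries` mean for `ExponentialBackoffRetry`, the following plain-Java sketch mimics how the sleep time grows between retries. It is an approximation based on our understanding of Curator's behavior (the base sleep time multiplied by a random factor between 1 and 2^(retryCount+1) - 1), not Curator's actual code; `BackoffSketch` and its methods are hypothetical names.

```java
import java.util.Random;

final class BackoffSketch {

    /** Sleep before retry number retryCount (0-based): base time multiplied by a random factor. */
    static long sleepMs(int baseSleepTimeMs, int retryCount, Random random) {
        // nextInt(bound) yields 0..bound-1; a factor of 0 is raised to 1
        int factor = Math.max(1, random.nextInt(1 << (retryCount + 1)));
        return (long) baseSleepTimeMs * factor;
    }

    /** Largest possible sleep before retry number retryCount. */
    static long maxSleepMs(int baseSleepTimeMs, int retryCount) {
        return (long) baseSleepTimeMs * ((1 << (retryCount + 1)) - 1);
    }

    public static void main(String[] args) {
        int baseSleepTimeMs = 1000; // baseSleepTimeMs from the configuration above
        int maxRetries = 2;         // maxRetries from the configuration above
        Random random = new Random();
        for (int retry = 0; retry < maxRetries; retry++) {
            System.out.printf("retry %d: sleep %d ms (upper bound %d ms)%n",
                retry, sleepMs(baseSleepTimeMs, retry, random), maxSleepMs(baseSleepTimeMs, retry));
        }
    }
}
```

With the parameters used in this section, the first retry waits 1000 ms and the second waits between 1000 and 3000 ms, so under this model a failing operation gives up after roughly 4 seconds of waiting at most.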
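Znode operations:
import java.util.List;
import org.apache.zookeeper.CreateMode;

public interface ZookeeperService {
    /**
     * Check whether a node exists
     */
    boolean isExistNode(final String path);

    /**
     * Create a node without data
     */
    void createNode(CreateMode mode, String path);

    /**
     * Set the data of an existing node
     */
    void setNodeData(String path, String nodeData);

    /**
     * Create a node with initial data
     */
    void createNodeAndData(CreateMode mode, String path, String nodeData);

    /**
     * Get the data of a node
     */
    String getNodeData(String path);

    /**
     * Get the names of a node's children
     */
    List<String> getNodeChild(String path);

    /**
     * Delete a node; when recursive is true, delete its children as well
     */
    void deleteNode(String path, Boolean recursive);
}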
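The `path` arguments of these methods are interpreted relative to the client's namespace when one is set on the builder through `namespace(...)`, for example the `lwq` value from the configuration. The sketch below shows the idea in plain Java: the namespace is prepended to every path before it reaches ZooKeeper. `NamespacePathSketch.resolve` is a hypothetical helper written for illustration and is not part of Curator.

```java
final class NamespacePathSketch {

    /** Map a client-side path to the absolute znode path under the given namespace. */
    static String resolve(String namespace, String path) {
        if (path == null || !path.startsWith("/")) {
            throw new IllegalArgumentException("path must start with '/': " + path);
        }
        if (namespace == null || namespace.isEmpty()) {
            return path; // no namespace: the path is used as-is
        }
        // The client-side root "/" maps to the namespace node itself
        return "/".equals(path) ? "/" + namespace : "/" + namespace + path;
    }

    public static void main(String[] args) {
        System.out.println(resolve("lwq", "/app/config")); // prints /lwq/app/config
        System.out.println(resolve("lwq", "/"));           // prints /lwq
    }
}
```

This is why a node created as `/app/config` by a namespaced client would appear under `/lwq/app/config` when browsed with a tool that connects without a namespace.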
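import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
// Assumption: StringUtils comes from commons-lang3
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class ZookeeperServiceImpl implements ZookeeperService {
    private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperServiceImpl.class);

    @Override
    public boolean isExistNode(String path) {
        CuratorFramework client = ZookeeperConfig.getClient();
        try {
            // checkExists() returns the node's Stat, or null if the node does not exist
            Stat stat = client.checkExists().forPath(path);
            return stat != null;
        } catch (Exception e) {
            LOGGER.error("isExistNode error...", e);
        }
        return false;
    }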

    @Override
    public void createNode(CreateMode mode, String path) {
        CuratorFramework client = ZookeeperConfig.getClient();
        try {
            // Create the node, creating any missing parent nodes along the way
            client.create().creatingParentsIfNeeded().withMode(mode).forPath(path);
        } catch (Exception e) {
            LOGGER.error("createNode error...", e);
        }
    }

    @Override
    public void setNodeData(String path, String nodeData) {
        CuratorFramework client = ZookeeperConfig.getClient();
        try {
            // Set the node data
            client.setData().forPath(path, nodeData.getBytes(StandardCharsets.UTF_8));
        } catch (Exception e) {
            LOGGER.error("setNodeData error...", e);
        }
    }

    @Override
    public void createNodeAndData(CreateMode mode, String path, String nodeData) {
        CuratorFramework client = ZookeeperConfig.getClient();
        try {
            // Create the node (and any missing parents) with its initial data
            client.create().creatingParentsIfNeeded().withMode(mode)
                .forPath(path, nodeData.getBytes(StandardCharsets.UTF_8));
        } catch (Exception e) {
            LOGGER.error("createNodeAndData error...", e);
        }
    }

    @Override
    public String getNodeData(String path) {
        CuratorFramework client = ZookeeperConfig.getClient();
        try {
            // Read the raw bytes and decode them as UTF-8
            byte[] dataByte = client.getData().forPath(path);
            String data = new String(dataByte, StandardCharsets.UTF_8);
            if (StringUtils.isNotEmpty(data)) {
                return data;
            }
        } catch (Exception e) {
            LOGGER.error("getNodeData error...", e);
        }
        return null;
    }

    @Override
    public List<String> getNodeChild(String path) {
        CuratorFramework client = ZookeeperConfig.getClient();
        List<String> nodeChildDataList = new ArrayList<>();
        try {
            // Names of the node's direct children (not their data)
            nodeChildDataList = client.getChildren().forPath(path);
        } catch (Exception e) {
            LOGGER.error("getNodeChild error...", e);
        }
        return nodeChildDataList;
    }

    @Override
    public void deleteNode(String path, Boolean recursive) {
        CuratorFramework client = ZookeeperConfig.getClient();
        try {
            // Boolean.TRUE.equals() treats a null flag as false instead of failing on unboxing
            if (Boolean.TRUE.equals(recursive)) {
                // Delete the node together with all of its children
                client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
            } else {
                // Delete a single node (fails if it still has children)
                client.delete().guaranteed().forPath(path);
            }
        } catch (Exception e) {
            LOGGER.error("deleteNode error...", e);
        }
    }
}
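The `recursive` flag of `deleteNode` matters because ZooKeeper refuses to delete a node that still has children. In the service above that failure is caught and logged, so the caller sees no error. The following toy model illustrates the two behaviors without a running server. It is a simplified in-memory stand-in with hypothetical names, and it uses plain Java exceptions where ZooKeeper would raise `KeeperException` subclasses.

```java
import java.util.Set;
import java.util.TreeSet;

final class ZnodeDeleteSketch {
    // Every existing znode is represented by its absolute path
    private final Set<String> paths = new TreeSet<>();

    void create(String path) {
        paths.add(path);
    }

    boolean exists(String path) {
        return paths.contains(path);
    }

    void delete(String path, boolean recursive) {
        if (!paths.contains(path)) {
            throw new IllegalArgumentException("no such node: " + path);
        }
        String childPrefix = path + "/";
        boolean hasChildren = paths.stream().anyMatch(p -> p.startsWith(childPrefix));
        if (hasChildren && !recursive) {
            // Models the "node not empty" failure of a plain delete
            throw new IllegalStateException("node has children: " + path);
        }
        // Remove the node itself and, in the recursive case, its whole subtree
        paths.removeIf(p -> p.equals(path) || p.startsWith(childPrefix));
    }

    public static void main(String[] args) {
        ZnodeDeleteSketch tree = new ZnodeDeleteSketch();
        tree.create("/app");
        tree.create("/app/config");
        try {
            tree.delete("/app", false);
        } catch (IllegalStateException e) {
            System.out.println("non-recursive delete rejected: " + e.getMessage());
        }
        tree.delete("/app", true);
        System.out.println("exists after recursive delete: " + tree.exists("/app"));
    }
}
```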
Calling from a controller:
import java.util.List;
// javax.* matches Spring Boot 2.x; on Spring Boot 3+ this lives under jakarta.annotation
import javax.annotation.Resource;
import org.apache.zookeeper.CreateMode;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ZookeeperApi {
    @Resource
    private ZookeeperService zookeeperService;

    @GetMapping("/getNodeData")
    public String getNodeData(String path) {
        return zookeeperService.getNodeData(path);
    }

    @GetMapping("/isExistNode")
    public boolean isExistNode(final String path) {
        return zookeeperService.isExistNode(path);
    }

    @GetMapping("/createNode")
    public String createNode(CreateMode mode, String path) {
        zookeeperService.createNode(mode, path);
        return "success";
    }

    @GetMapping("/setNodeData")
    public String setNodeData(String path, String nodeData) {
        zookeeperService.setNodeData(path, nodeData);
        return "success";
    }

    @GetMapping("/createNodeAndData")
    public String createNodeAndData(CreateMode mode, String path, String nodeData) {
        zookeeperService.createNodeAndData(mode, path, nodeData);
        return "success";
    }

    @GetMapping("/getNodeChild")
    public List<String> getNodeChild(String path) {
        return zookeeperService.getNodeChild(path);
    }

    @GetMapping("/deleteNode")
    public String deleteNode(String path, Boolean recursive) {
        zookeeperService.deleteNode(path, recursive);
        return "success";
    }
}
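2. ZooKeeper cluster setup
Cluster setup
A ZooKeeper cluster has three types of roles:
- Leader: handles all transaction (write) requests and can also serve read requests; a cluster has exactly one Leader at a time.
- Follower: serves read requests and forwards write requests to the Leader. It is also a candidate for Leader: if the Leader goes down, the Followers take part in the election and one of them may become the new Leader.
- Observer: serves read requests only and does not take part in elections.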
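The role list above has a practical consequence for sizing a cluster: only voting members (the Leader and the Followers) count toward the majority needed for elections and writes, while Observers do not. The sketch below works out that arithmetic, assuming ZooKeeper's default majority quorum; `QuorumSketch` and its methods are hypothetical names used only for illustration.

```java
final class QuorumSketch {

    /** Smallest number of voting members that forms a majority. */
    static int majority(int votingMembers) {
        return votingMembers / 2 + 1;
    }

    /** How many voting members can fail while a majority is still available. */
    static int tolerableFailures(int votingMembers) {
        return votingMembers - majority(votingMembers);
    }

    public static void main(String[] args) {
        // Observers are excluded: only the Leader and the Followers are counted here
        for (int voters = 3; voters <= 5; voters++) {
            System.out.printf("%d voters: majority %d, tolerates %d failure(s)%n",
                voters, majority(voters), tolerableFailures(voters));
        }
    }
}
```

Three and four voters both tolerate a single failure, which is why clusters are usually built with an odd number of voting members; adding Observers increases read capacity without changing these numbers.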
Cluster setup example:
Zookeeper 集群配置.docx