节点的创建与获取
添加依赖
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.7.0</version></dependency>
创建ZooKeeper客户端
private ZooKeeper zkClient = null;
/**
* 初始化
* @param connectString zookeeper连接,如果有多个则使用逗号连接, 如:10.0.1.1:2181,10.0.1.2:2181
* @throws IOException
*/
private void init(String connectString) throws IOException {
int sessionTimeout = 2000;
zkClient = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> {
// 收到事件通知后的回调函数(用户的业务逻辑)
System.out.println(watchedEvent.getType() + "--" + watchedEvent.getPath());
// 再次启动监听
try {
List<String> children = zkClient.getChildren("/", true);
for (String child : children) {
System.out.println(child);
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
创建子节点
/**
* 创建节点
* @param nodePath 节点路径
* @param data 节点数据
* @param acl 节点权限
* @param createMode 节点的类型
*/
private String createNode(String nodePath, byte[] data, List<ACL> acl, CreateMode createMode) throws InterruptedException, KeeperException {
return zkClient.create(nodePath, data, acl, createMode);
}
// 创建节点
String node1 = createNode("/node1", "chentiefeng".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
获取子节点并监听节点变化
/**
* 获取节点子节点,并监听
* @param nodePath 节点路径
* @return 子节点集合
*/
private List<String> getChildren(String nodePath) throws InterruptedException, KeeperException {
return zkClient.getChildren(nodePath, true);
}
// 获取子节点
List<String> nodeChild = getChildren("/");
判断 Znode 是否存在
/**
* 判断节点是否存在
* @param nodePath 节点路径
* @return 节点情况
* @throws InterruptedException
* @throws KeeperException
*/
private Stat exist(String nodePath) throws InterruptedException, KeeperException {
return zkClient.exists(nodePath, false);
}
String testNode = "/test/node1";
// 判断节点是否存在
Stat stat = zkClientService.existNode(testNode);
if (Objects.isNull(stat)) {
System.out.println(String.format("节点[%s]不存在", testNode));
} else {
System.out.println(String.format("节点[%s]的修改时间是:%s", testNode, stat.getMtime()));
}
获取节点数据
/**
* 获取节点数据
* @param nodePath 节点路径
* @param stat 节点状态
* @return 节点数据
* @throws InterruptedException
* @throws KeeperException
*/
private byte[] getNodeData(String nodePath, Stat stat) throws InterruptedException, KeeperException {
return zkClient.getData(nodePath, true, stat);
}
String testNode = "/test/node1";
// 判断节点是否存在
Stat stat = zkClientService.existNode(testNode);
// 获取节点数据
byte[] nodeData = zkClientService.getNodeData(testNode, stat);
完整的zkclientService
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.List;
/**
* Zk客户端服务
*
* @author chentiefeng
* @created 2021/8/18 22:51
*/
public class ZkClientService {
private ZooKeeper zkClient = null;
public ZkClientService(String connectString) throws IOException {
init(connectString);
}
/**
* 初始化
* @param connectString zookeeper连接,如果有多个则使用逗号连接, 如:10.0.1.1:2181,10.0.1.2:2181
* @throws IOException
*/
private void init(String connectString) throws IOException {
int sessionTimeout = 2000;
zkClient = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> {
// 收到事件通知后的回调函数(用户的业务逻辑)
System.out.println(watchedEvent.getType() + "--" + watchedEvent.getPath());
// 再次启动监听
try {
List<String> children = zkClient.getChildren("/", true);
for (String child : children) {
System.out.println(child);
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
/**
* 创建节点
* @param nodePath 节点路径
* @param data 节点数据
* @param acl 节点权限
* @param createMode 节点的类型
*/
private String createNode(String nodePath, byte[] data, List<ACL> acl, CreateMode createMode) throws InterruptedException, KeeperException {
return zkClient.create(nodePath, data, acl, createMode);
}
/**
* 获取子节点,并监听
* @param nodePath 节点路径
* @return 子节点集合
*/
private List<String> getChildren(String nodePath) throws InterruptedException, KeeperException {
return zkClient.getChildren(nodePath, true);
}
/**
* 判断节点是否存在
* @param nodePath 节点路径
* @return 节点状态
* @throws InterruptedException
* @throws KeeperException
*/
private Stat existNode(String nodePath) throws InterruptedException, KeeperException {
return zkClient.exists(nodePath, false);
}
/**
* 获取节点数据
* @param nodePath 节点路径
* @param stat 节点状态
* @return 节点数据
* @throws InterruptedException
* @throws KeeperException
*/
private byte[] getNodeData(String nodePath, Stat stat) throws InterruptedException, KeeperException {
return zkClient.getData(nodePath, true, stat);
}
}
服务器动态上下线监听
需求:某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。
需求分析:
- 服务端启动时,去Zookeeper注册信息(创建临时节点)。
- 客户端启动时,主动获取当前在线服务器列表,并注册监听
- 服务端节点下线
- 客户端收到服务端节点下线通知。
具体实现
服务端
public class DistributeServer {
private static ZooKeeper zkClient = null;
private String parentNode = "/appService";
private int sessionTimeout = 2000;
/**
* 创建连接
* @param connectString 连接地址,如:10.0.1.1:2181,10.0.1.2:2181
* @throws IOException
*/
public void createConnect(String connectString) throws IOException {
//连接到zookeeper
zkClient = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> {
// 监听处理事件
});
}
/**
* 注册服务
* @param serviceHost 主机地址,如:"10.0.1.100:28888"
*/
public void registService(String serviceHost) throws InterruptedException, KeeperException {
String serviceNode = zkClient.create(parentNode + "node", serviceHost.getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
DistributeServer distributeServer = new DistributeServer();
// 创建连接
String connectString = "10.0.1.1:2181,10.0.1.2:2181";
distributeServer.createConnect(connectString);
// 2 利用 zk 连接注册服务器信息
String serviceHost = "10.0.1.100:28888";
distributeServer.registService(serviceHost);
// 启动业务逻辑
}
}
客户端
public class DistributeClient {
private static ZooKeeper zkClient = null;
private String parentNode = "/appService";
private int sessionTimeout = 2000;
/**
* 创建连接
* @param connectString 连接地址,如:10.0.1.1:2181,10.0.1.2:2181
* @throws IOException
*/
public void createConnect(String connectString) throws IOException {
//连接到zookeeper
zkClient = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> {
// 再次启动监听
try {
getServerList();
} catch (Exception e) {
e.printStackTrace();
}
});
}
/**
* 获取服务注册信息
*/
public List<String> getServerList() throws InterruptedException, KeeperException {
// 1 获取服务器子节点信息,并且对父节点进行监听
List<String> children = zkClient.getChildren(parentNode, true);
// 2 存储服务器信息列表
List<String> servers = new ArrayList<>();
// 3 遍历所有节点,获取节点中的主机名称信息
for (String child : children) {
byte[] data = zkClient.getData(parentNode + "/" + child,
false, null);
servers.add(new String(data));
}
return servers;
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
DistributeClient distributeClient = new DistributeClient();
// 创建连接
String connectString = "10.0.1.1:2181,10.0.1.2:2181";
distributeClient.createConnect(connectString);
// 2 获取服务注册信息
List<String> serviceList = distributeClient.getServerList();
// 3、开始业务逻辑
}
}
ZooKeeper分布式锁案例
什么是分布式锁
"进程 1"在使用该资源的时候,会先去获得锁,"进程 1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程 1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。<br /> 我们把这个分布式环境下的这个锁叫作分布式锁。
案例分析
原生 Zookeeper 实现分布式锁
步骤如下:
- 创建连接
- 获取连接
- 等待连接成功
- 判断锁根节点是否存在,如果不存在,创建节点
- 加锁
- 在根节点下创建带序号的临时节点
- 判断该节点是不是最新的节点
- 如果是,获取到锁;如果不是,监听前一个节点
- 解锁
- 删除节点
示例:
public class DistributeLock {
private static ZooKeeper zkClient = null;
// zookeeper server 列表
private String connectString = "10.0.1.1:2181,10.0.1.2:2181";
// 超时时间
private int sessionTimeout = 2000;
private String rootNode = "locks";
private String subNode = "seq-";
// 当前 client 等待的子节点
private String waitPath;
//ZooKeeper 连接
private CountDownLatch connectLatch = new CountDownLatch(1);
//ZooKeeper 节点等待
private CountDownLatch waitLatch = new CountDownLatch(1);
// 当前 client 创建的子节点
private String currentNode;
// 和 zk 服务建立连接,并创建根节点
public DistributeLock() throws IOException,InterruptedException, KeeperException {
zkClient = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> {
// 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程
if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
connectLatch.countDown();
}
// 发生了 waitPath 的删除事件
if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
waitLatch.countDown();
}
});
// 等待连接建立
connectLatch.await();
//获取根节点状态
Stat stat = zkClient.exists("/" + rootNode, false);
//如果根节点不存在,则创建根节点,根节点类型为永久节点
if (stat == null) {
System.out.println("根节点不存在");
zkClient.create("/" + rootNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
// 加锁方法
public void zkLock() {
try {
//在根节点下创建临时顺序节点,返回值为创建的节点路径
currentNode = zkClient.create("/" + rootNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// wait 一小会, 让结果更清晰一些
Thread.sleep(10);
// 注意, 没有必要监听"/locks"的子节点的变化情况
List<String> childrenNodes = zkClient.getChildren("/" + rootNode, false);
// 列表中只有一个子节点, 那肯定就是 currentNode , 说明 client 获得锁
if (childrenNodes.size() == 1) {
return;
} else {
//对根节点下的所有临时顺序节点进行从小到大排序
Collections.sort(childrenNodes);
//当前节点名称
String thisNode = currentNode.substring(("/" + rootNode + "/").length());
//获取当前节点的位置
int index = childrenNodes.indexOf(thisNode);
if (index == -1) {
System.out.println("数据异常");
} else if (index == 0) {
// index == 0, 说明 thisNode 在列表中最小, 当前 client 获得锁
return;
} else {
// 获得排名比 currentNode 前 1 位的节点
this.waitPath = "/" + rootNode + "/" + childrenNodes.get(index - 1);
// 在 waitPath 上注册监听器, 当 waitPath 被删除时, zookeeper 会回调监听器的 process 方法
zkClient.getData(waitPath, true, new Stat());
//进入等待锁状态
waitLatch.await();
return;
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 解锁方法
public void zkUnlock() {
try {
zkClient.delete(this.currentNode, -1);
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
}
Curator 框架实现分布式锁案例
- 原生的 Java API 开发存在的问题
- 会话连接是异步的,需要自己去处理。比如使用 CountDownLatch
- Watch 需要重复注册,不然就不能生效
- 开发的复杂性还是比较高的
- 不支持多节点删除和创建。需要自己去递归
- Curator 是一个专门解决分布式锁的框架,解决了原生 JavaAPI 开发分布式遇到的问题。
详情请查看官方文档:https://curator.apache.org/index.html添加依赖
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>4.3.0</version> </dependency>分布式锁使用
import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessLock; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; public class CuratorLockTest { private String rootNode = "/locks"; // zookeeper server 列表 private String connectString = "10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181"; // connection 超时时间 private int connectionTimeout = 2000; // session 超时时间 private int sessionTimeout = 2000; public static void main(String[] args) { new CuratorLockTest().test(); } // 测试 private void test() { // 创建分布式锁 1 final InterProcessLock lock1 = new InterProcessMutex(getCuratorFramework(), rootNode); // 创建分布式锁 2 final InterProcessLock lock2 = new InterProcessMutex(getCuratorFramework(), rootNode); new Thread(new Runnable() { @Override public void run() { // 获取锁对象 try { lock1.acquire(); System.out.println("线程 1 获取锁"); // 测试锁重入 lock1.acquire(); System.out.println("线程 1 再次获取锁"); Thread.sleep(5 * 1000); lock1.release(); System.out.println("线程 1 释放锁"); lock1.release(); System.out.println("线程 1 再次释放锁"); } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { // 获取锁对象 try { lock2.acquire(); System.out.println("线程 2 获取锁"); // 测试锁重入 lock2.acquire(); System.out.println("线程 2 再次获取锁"); Thread.sleep(5 * 1000); lock2.release(); System.out.println("线程 2 释放锁"); lock2.release(); System.out.println("线程 2 再次释放锁"); } catch (Exception e) { e.printStackTrace(); } } }).start(); } // 分布式锁初始化 public CuratorFramework getCuratorFramework (){ //重试策略,初试时间 3 秒,重试 3 次 RetryPolicy policy = new ExponentialBackoffRetry(3000, 3); //通过工厂创建 Curator CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(connectString) .connectionTimeoutMs(connectionTimeout) .sessionTimeoutMs(sessionTimeout) .retryPolicy(policy).build(); //开启连接 client.start(); System.out.println("zookeeper 初始化完成..."); return client; } }
