节点的创建与获取

添加依赖

  1. <dependency>
  2. <groupId>org.apache.zookeeper</groupId>
  3. <artifactId>zookeeper</artifactId>
  4. <version>3.7.0</version>
  5. </dependency>

创建ZooKeeper客户端

    private ZooKeeper zkClient = null;
    /**
     * 初始化
     * @param connectString zookeeper连接,如果有多个则使用逗号连接, 如:10.0.1.1:2181,10.0.1.2:2181
     * @throws IOException
     */
    private void init(String connectString) throws IOException {
        int sessionTimeout = 2000;
        zkClient = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> {
            // 收到事件通知后的回调函数(用户的业务逻辑)
            System.out.println(watchedEvent.getType() + "--" + watchedEvent.getPath());
            // 再次启动监听
            try {
                List<String> children = zkClient.getChildren("/", true);
                for (String child : children) {
                    System.out.println(child);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

创建子节点

    /**
     * 创建节点
     * @param nodePath 节点路径
     * @param data 节点数据
     * @param acl 节点权限
     * @param createMode 节点的类型
     */
    private String createNode(String nodePath, byte[] data, List<ACL> acl, CreateMode createMode) throws InterruptedException, KeeperException {
        return zkClient.create(nodePath, data, acl, createMode);
    }
// 创建节点
String node1 = createNode("/node1", "chentiefeng".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

获取子节点并监听节点变化

    /**
     * 获取节点子节点,并监听
     * @param nodePath 节点路径
     * @return 子节点集合
     */
    private List<String> getChildren(String nodePath) throws InterruptedException, KeeperException {
        return zkClient.getChildren(nodePath, true);
    }
// 获取子节点
List<String> nodeChild = getChildren("/");

判断 Znode 是否存在

    /**
     * 判断节点是否存在
     * @param nodePath 节点路径
     * @return 节点情况
     * @throws InterruptedException
     * @throws KeeperException
     */
    private Stat exist(String nodePath) throws InterruptedException, KeeperException {
        return zkClient.exists(nodePath, false);
    }
String testNode = "/test/node1";
// 判断节点是否存在
Stat stat = zkClientService.existNode(testNode);
if (Objects.isNull(stat)) {
    System.out.println(String.format("节点[%s]不存在", testNode));
} else {
    System.out.println(String.format("节点[%s]的修改时间是:%s", testNode, stat.getMtime()));
}

获取节点数据

    /**
     * 获取节点数据
     * @param nodePath 节点路径
     * @param stat 节点状态
     * @return 节点数据
     * @throws InterruptedException
     * @throws KeeperException
     */
    private byte[] getNodeData(String nodePath, Stat stat) throws InterruptedException, KeeperException {
        return zkClient.getData(nodePath, true, stat);
    }
String testNode = "/test/node1"; 
// 判断节点是否存在
Stat stat = zkClientService.existNode(testNode);
// 获取节点数据
byte[] nodeData = zkClientService.getNodeData(testNode, stat);

完整的zkclientService

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.List;

/**
 * Zk客户端服务
 *
 * @author chentiefeng
 * @created 2021/8/18 22:51
 */
public class ZkClientService {
    private ZooKeeper zkClient = null;

    public ZkClientService(String connectString) throws IOException {
        init(connectString);
    }
    /**
     * 初始化
     * @param connectString zookeeper连接,如果有多个则使用逗号连接, 如:10.0.1.1:2181,10.0.1.2:2181
     * @throws IOException
     */
    private void init(String connectString) throws IOException {
        int sessionTimeout = 2000;
        zkClient = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> {
            // 收到事件通知后的回调函数(用户的业务逻辑)
            System.out.println(watchedEvent.getType() + "--" + watchedEvent.getPath());
            // 再次启动监听
            try {
                List<String> children = zkClient.getChildren("/", true);
                for (String child : children) {
                    System.out.println(child);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

    /**
     * 创建节点
     * @param nodePath 节点路径
     * @param data 节点数据
     * @param acl 节点权限
     * @param createMode 节点的类型
     */
    private String createNode(String nodePath, byte[] data, List<ACL> acl, CreateMode createMode) throws InterruptedException, KeeperException {
        return zkClient.create(nodePath, data, acl, createMode);
    }

    /**
     * 获取子节点,并监听
     * @param nodePath 节点路径
     * @return 子节点集合
     */
    private List<String> getChildren(String nodePath) throws InterruptedException, KeeperException {
        return zkClient.getChildren(nodePath, true);
    }

    /**
     * 判断节点是否存在
     * @param nodePath 节点路径
     * @return 节点状态
     * @throws InterruptedException
     * @throws KeeperException
     */
    private Stat existNode(String nodePath) throws InterruptedException, KeeperException {
        return zkClient.exists(nodePath, false);
    }

    /**
     * 获取节点数据
     * @param nodePath 节点路径
     * @param stat 节点状态
     * @return 节点数据
     * @throws InterruptedException
     * @throws KeeperException
     */
    private byte[] getNodeData(String nodePath, Stat stat) throws InterruptedException, KeeperException {
        return zkClient.getData(nodePath, true, stat);
    }
}

服务器动态上下线监听

需求:某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。
需求分析:

  1. 服务端启动时,去Zookeeper注册信息(创建临时节点)。
  2. 客户端启动时,主动获取当前在线服务器列表,并注册监听
  3. 服务端节点下线
  4. 客户端收到服务端节点下线通知。

image.png

具体实现

服务端

public class DistributeServer {
    private static ZooKeeper zkClient = null;
    private String parentNode = "/appService";
    private int sessionTimeout = 2000;

    /**
     * 创建连接
     * @param connectString 连接地址,如:10.0.1.1:2181,10.0.1.2:2181
     * @throws IOException
     */
    public void createConnect(String connectString) throws IOException {
        //连接到zookeeper
        zkClient = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> {
            // 监听处理事件
        });
    }

    /**
     * 注册服务
     * @param serviceHost 主机地址,如:"10.0.1.100:28888"
     */
    public void registService(String serviceHost) throws InterruptedException, KeeperException {
        String serviceNode = zkClient.create(parentNode + "node", serviceHost.getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        DistributeServer distributeServer = new DistributeServer();
        // 创建连接
        String connectString = "10.0.1.1:2181,10.0.1.2:2181";
        distributeServer.createConnect(connectString);
        // 2 利用 zk 连接注册服务器信息
        String serviceHost = "10.0.1.100:28888";
        distributeServer.registService(serviceHost);
        // 启动业务逻辑
    }
}

客户端

public class DistributeClient {
    private static ZooKeeper zkClient = null;
    private String parentNode = "/appService";
    private int sessionTimeout = 2000;

    /**
     * 创建连接
     * @param connectString 连接地址,如:10.0.1.1:2181,10.0.1.2:2181
     * @throws IOException
     */
    public void createConnect(String connectString) throws IOException {
        //连接到zookeeper
        zkClient = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> {
            // 再次启动监听
            try {
                getServerList();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

    /**
     * 获取服务注册信息
     */
    public List<String> getServerList() throws InterruptedException, KeeperException {
        // 1 获取服务器子节点信息,并且对父节点进行监听
        List<String> children = zkClient.getChildren(parentNode, true);
        // 2 存储服务器信息列表
        List<String> servers = new ArrayList<>();
        // 3 遍历所有节点,获取节点中的主机名称信息
        for (String child : children) {
            byte[] data = zkClient.getData(parentNode + "/" + child,
                    false, null);
            servers.add(new String(data));
        }
        return servers;
    }

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        DistributeClient distributeClient = new DistributeClient();
        // 创建连接
        String connectString = "10.0.1.1:2181,10.0.1.2:2181";
        distributeClient.createConnect(connectString);
        // 2 获取服务注册信息
        List<String>  serviceList = distributeClient.getServerList();
        // 3、开始业务逻辑
    }
}

ZooKeeper分布式锁案例

什么是分布式锁

"进程 1"在使用该资源的时候,会先去获得锁,"进程 1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程 1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。<br />    我们把这个分布式环境下的这个锁叫作分布式锁。

案例分析

image.png

原生 Zookeeper 实现分布式锁

步骤如下:

  1. 创建连接
    1. 获取连接
    2. 等待连接成功
    3. 判断锁根节点是否存在,如果不存在,创建节点
  2. 加锁
    1. 在根节点下创建带序号的临时节点
    2. 判断该节点是不是最新的节点
    3. 如果是,获取到锁;如果不是,监听前一个节点
  3. 解锁
    1. 删除节点

示例:

public class DistributeLock {
    private static ZooKeeper zkClient = null;
    // zookeeper server 列表
    private String connectString = "10.0.1.1:2181,10.0.1.2:2181";
    //  超时时间
    private int sessionTimeout = 2000;
    private String rootNode = "locks";
    private String subNode = "seq-";
    // 当前 client 等待的子节点
    private String waitPath;
    //ZooKeeper 连接
    private CountDownLatch connectLatch = new CountDownLatch(1);
    //ZooKeeper 节点等待
    private CountDownLatch waitLatch = new CountDownLatch(1);
    // 当前 client 创建的子节点
    private String currentNode;
    // 和 zk 服务建立连接,并创建根节点
    public DistributeLock() throws IOException,InterruptedException, KeeperException {
        zkClient = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> {
                        // 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程
                        if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
                            connectLatch.countDown();
                        }
                        // 发生了 waitPath 的删除事件
                        if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
                            waitLatch.countDown();
                        }
                });
        // 等待连接建立
        connectLatch.await();
        //获取根节点状态
        Stat stat = zkClient.exists("/" + rootNode, false);
        //如果根节点不存在,则创建根节点,根节点类型为永久节点
        if (stat == null) {
            System.out.println("根节点不存在");
            zkClient.create("/" + rootNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    // 加锁方法
    public void zkLock() {
        try {
            //在根节点下创建临时顺序节点,返回值为创建的节点路径
            currentNode = zkClient.create("/" + rootNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            // wait 一小会, 让结果更清晰一些
            Thread.sleep(10);
            // 注意, 没有必要监听"/locks"的子节点的变化情况
            List<String> childrenNodes = zkClient.getChildren("/" + rootNode, false);
            // 列表中只有一个子节点, 那肯定就是 currentNode , 说明  client 获得锁
            if (childrenNodes.size() == 1) {
                return;
            } else {
                //对根节点下的所有临时顺序节点进行从小到大排序
                Collections.sort(childrenNodes);
                //当前节点名称
                String thisNode = currentNode.substring(("/" + rootNode + "/").length());
                //获取当前节点的位置
                int index = childrenNodes.indexOf(thisNode);
                if (index == -1) {
                    System.out.println("数据异常");
                } else if (index == 0) {
                    // index == 0, 说明 thisNode 在列表中最小, 当前 client 获得锁
                    return;
                } else {
                    // 获得排名比 currentNode 前 1 位的节点
                    this.waitPath = "/" + rootNode + "/" + childrenNodes.get(index - 1);
                    // 在 waitPath 上注册监听器, 当 waitPath 被删除时,  zookeeper 会回调监听器的 process 方法
                    zkClient.getData(waitPath, true, new Stat());
                    //进入等待锁状态
                    waitLatch.await();
                    return;
                }
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    // 解锁方法
    public void zkUnlock() {
        try {
            zkClient.delete(this.currentNode, -1);
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }
}

Curator 框架实现分布式锁案例

  1. 原生的 Java API 开发存在的问题
    • 会话连接是异步的,需要自己去处理。比如使用 CountDownLatch
    • Watch 需要重复注册,不然就不能生效
    • 开发的复杂性还是比较高的
    • 不支持多节点删除和创建。需要自己去递归
  2. Curator 是一个专门解决分布式锁的框架,解决了原生 JavaAPI 开发分布式遇到的问题。
    详情请查看官方文档:https://curator.apache.org/index.html

    添加依赖

    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>4.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-recipes</artifactId>
      <version>4.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-client</artifactId>
      <version>4.3.0</version>
    </dependency>
    

    分布式锁使用

    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.locks.InterProcessLock;
    import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    public class CuratorLockTest {
     private String rootNode = "/locks";
     // zookeeper server 列表
     private String connectString = "10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181";
     // connection 超时时间
     private int connectionTimeout = 2000;
     // session 超时时间
     private int sessionTimeout = 2000;
     public static void main(String[] args) {
         new CuratorLockTest().test();
     }
     // 测试
     private void test() {
         // 创建分布式锁 1
         final InterProcessLock lock1 = new InterProcessMutex(getCuratorFramework(), rootNode);
         // 创建分布式锁 2
         final InterProcessLock lock2 = new InterProcessMutex(getCuratorFramework(), rootNode);
         new Thread(new Runnable() {
             @Override
             public void run() {
                 // 获取锁对象
                 try {
                     lock1.acquire();
                     System.out.println("线程 1 获取锁");
                     // 测试锁重入
                     lock1.acquire();
                     System.out.println("线程 1 再次获取锁");
                     Thread.sleep(5 * 1000);
                     lock1.release();
                     System.out.println("线程 1 释放锁");
                     lock1.release();
                     System.out.println("线程 1 再次释放锁");
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
             }
         }).start();
         new Thread(new Runnable() {
             @Override
             public void run() {
                 // 获取锁对象
                 try {
                     lock2.acquire();
                     System.out.println("线程 2 获取锁");
                     // 测试锁重入
                     lock2.acquire();
                     System.out.println("线程 2 再次获取锁");
                     Thread.sleep(5 * 1000);
                     lock2.release();
                     System.out.println("线程 2 释放锁");
                     lock2.release();
                     System.out.println("线程 2 再次释放锁");
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
             }
         }).start();
     }
     // 分布式锁初始化
     public CuratorFramework getCuratorFramework (){
         //重试策略,初试时间 3 秒,重试 3 次
         RetryPolicy policy = new ExponentialBackoffRetry(3000, 3);
         //通过工厂创建 Curator
         CuratorFramework client =
                 CuratorFrameworkFactory.builder()
                         .connectString(connectString)
                         .connectionTimeoutMs(connectionTimeout)
                         .sessionTimeoutMs(sessionTimeout)
                         .retryPolicy(policy).build();
         //开启连接
         client.start();
         System.out.println("zookeeper 初始化完成...");
         return client;
     } 
    }