思路

  1. 在上一篇文章中我们利用zk实现了分布式锁(两种实现方式),zk还有个作用就是可以利用它实现配置中心,就是你的配置文件都可以通过zk的配置节点来实现,比如你有一个application.properties文件,其中里面的一个属性为spring.application.name=lcloud-ceres-rnp,那么你就可以创建一个节点为node1用来表示此文件,再到node1节点下创建另外一个子节点node2,用来表示spring.application.name,其中的数据就为lcloud-ceres-rnp。<br /> 有了其中的思路那我们接着往下捋,那么我们创建一个文件怎么做呢?很简单,只要把你要新增的文件里面的属性以Properties来表示就行。删除的话就直接调用客户端的api来实现一个递归删除(服务端不支持递归),查询就直接返回一个Properties类即可。修改呢?则可以利用你传来的Properties类与你原来的node节点下的数据进行比较,如果存在并且数据一致则认为你这条属性没修改过。传来的key数据不存在于我们的node节点下则表明是新增的,然后剩下的就是需要删除的呗。直接获取到该节点直接删除即可。

代码实现

增删改查接口

  1. /**
  2. * @author heian
  3. * @create 2020-03-22-10:02 下午
  4. * @description 配置文件配置在zk固定的目录下,所以此处接口入参只需要传文件名而不是整个路径
  5. */
  6. public interface IConfigCenterWrite {
  7. //创建一个新的配置文件
  8. String insertCfgFile(String fileName, Properties properties);
  9. //删除一个配置文件
  10. boolean deleteCfgFile(String fileName);
  11. //修改
  12. void updateCfgFile(String fileName,Properties properties);
  13. //查
  14. Properties selectCfgFile(String fileName);
  15. }

watch动态监听接口

/**
 * @author heian
 * @create 2020-03-22-10:02 下午
 * @description 配置文件读取
 */
public interface IConfigCenterRead {

    //监听配置文件发生变化
    void watchCfgFile(String fileName, ChangeHandler changeHandler);

    interface ChangeHandler{
        //配置文件发生变化后,给一个完整的属性对象
        void itemChange(Properties properties);
    }

}

配置中心类

实现 增删改查类与监听

/**
 * @author heian
 * @create 2020-03-22-10:08 下午
 * @description
 */
public class ConfigCenterImpl implements IConfigCenterWrite,IConfigCenterRead {

    private String configFilePath;//配置文件地址 /distributeConfigure/cfgFile
    private String lockPath;//锁的文件地址
    private static final String default_configRootPath = "/distributeConfigure";

    private ZkClient client;

    public ConfigCenterImpl(){
        this(default_configRootPath);
    }

    public ConfigCenterImpl(String path){
        if (StringUtils.isBlank(path))
            throw new IllegalArgumentException("节点地址参数不能为空");
        configFilePath = path + "/cfgFile";
        lockPath = path + "/writeLock";
        String ip4 = "";
        try {
            ip4 = Inet4Address.getLocalHost().getHostAddress();
        }catch (Exception e){
            System.out.println("ip获取异常:" + ip4);
        }
        client = new ZkClient(ip4 + ":2183");
        client.setZkSerializer(new MyZkSerialiZer());
        if (!this.client.exists(configFilePath)) {
            try {
                this.client.createPersistent(configFilePath, true);
            } catch (ZkNodeExistsException e) {

            }
        }
    }

    @Override
    public String insertCfgFile(String fileName, Properties properties) {
        checkElement(fileName);
        //创建配置文件节点:一个文件为一个父节点;一个属性key为一个子节点,子节点的数据为value
        String parentNodePath = configFilePath + "/" + fileName;
        if (client.exists(parentNodePath))
            throw new IllegalArgumentException("文件"+parentNodePath+"已存在!");
        client.createPersistent(parentNodePath,true);
        if (properties == null) return parentNodePath;
        ZKDistributeLock2 lock = new ZKDistributeLock2(lockPath + "/" + fileName);
        lock.lock();
        for (Map.Entry<Object, Object> entry:properties.entrySet()){
            System.out.println("新增的属性为:" + entry.getKey() + "=" + entry.getValue());
            String sonNodePath = parentNodePath + "/" + entry.getKey();
            client.createPersistent(sonNodePath,entry.getValue());
        }
        lock.unlock();
        return parentNodePath;
    }

    @Override
    public boolean deleteCfgFile(String fileName) {
        checkElement(fileName);
        String parentNodePath = configFilePath + "/" + fileName;
        ZKDistributeLock2 lock = new ZKDistributeLock2(lockPath + "/" + fileName);
        lock.lock();
        boolean bool =  client.deleteRecursive(parentNodePath);
        lock.unlock();
        return bool;
    }

    @Override
    public void updateCfgFile(String fileName, Properties properties) {
        checkElement(fileName);
        if(properties == null) {throw new NullPointerException("要修改的配置项不能为空");}
        String parentNodePath = configFilePath + "/" + fileName;
        ZKDistributeLock2 lock = new ZKDistributeLock2(lockPath + "/" + fileName);
        lock.lock();
        try {
            List<String> old = client.getChildren(parentNodePath);
            //遍历传来的内容 name 看这个节点是否存在于我们原来的zk文件中
            for (Map.Entry<Object, Object> entry :properties.entrySet()){
                System.out.println("修改的属性为:" + entry.getKey() + "=" + entry.getValue());
                String newSonKey = entry.getKey().toString();
                String newSonValue = entry.getValue().toString();

                String sonNodePath = parentNodePath + "/" + newSonKey;
                if (old.contains(newSonKey)){
                    //可能修改了,也可能没动
                    String oldSonVlue = client.readData(sonNodePath);
                    if (!oldSonVlue.equals(newSonValue)){
                        //动了则修改,类似于 set /path 123
                        client.writeData(sonNodePath,newSonValue);
                    }
                    //减少复杂度,剩下的就是要删除的(新增的做了,修改和没修改的也做了)
                    old.remove(newSonKey);
                }else {
                    //zk 原来不存在的,需要新增
                    client.createPersistent(sonNodePath,newSonValue);
                }
            }
            if (!old.isEmpty()){
                for (String key : old) {
                    client.delete(parentNodePath + "/" + key);
                }
            }
        }catch (Exception e){
            System.out.println("修改异常:" + e.getMessage());
        }finally {
            lock.unlock();
        }
    }

    @Override
    public Properties selectCfgFile(String fileName) {
        String parentNodePath = configFilePath + "/" + fileName;
        checkElement(parentNodePath);
        List<String> children = client.getChildren(parentNodePath);
        if (children == null || children.isEmpty())
            return new Properties();
        Properties p = new Properties();
        for (String child : children) {
            String sonNodePath = parentNodePath + "/" + child;
            String value = client.readData(sonNodePath,true);
            p.put(child,value);
        }
        return p;
    }

    //监听机制
    @Override
    public void watchCfgFile(String fileName, ChangeHandler changeHandler) {
        if (!fileName.startsWith("/")){
            fileName = configFilePath + "/" + fileName;
        }
        final String fileNodePath = fileName;
        Properties properties = selectCfgFile(fileName);
        if (properties != null){
            int waitTime = 5;
            //合并5秒配置项变化,5秒内只触发一次处理事件
            ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(1);
            scheduled.setRemoveOnCancelPolicy(true);
            final List<ScheduledFuture<?>> futureList =  new ArrayList<>();
            for (Map.Entry<Object, Object> entry :properties.entrySet()){
                System.out.println("监控:"+fileNodePath+"/"+entry.getKey().toString());
                client.subscribeDataChanges(fileNodePath+"/"+entry.getKey().toString(), new IZkDataListener() {
                    @Override
                    public void handleDataDeleted(String dataPath) throws Exception {
                        System.out.println("触发删除:"+dataPath);
                        triggerHandler(futureList, scheduled, waitTime, fileNodePath, changeHandler);
                    }

                    @Override
                    public void handleDataChange(String dataPath, Object data) throws Exception {
                        System.out.println("触发修改:"+dataPath);
                        triggerHandler(futureList, scheduled, waitTime, fileNodePath, changeHandler);
                    }
                });

            }
            client.subscribeChildChanges(fileNodePath, new IZkChildListener() {
                @Override
                public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                    System.out.println("触发子节点:"+parentPath);
                    triggerHandler(futureList, scheduled, waitTime, fileNodePath, changeHandler);
                }
            });
        }

    }

    /**
     * 合并修改变化事件,5秒钟内发生变化的合并到一个事件进行
     * @param futureList 装有定时触发任务的列表
     * @param scheduled 定时任务执行器
     * @param waitTime 延迟时间,单位秒
     * @param fileName zk配置文件的节点
     * @param changeHandler 事件处理器
     */
    private void triggerHandler(List<ScheduledFuture<?>> futureList, ScheduledThreadPoolExecutor scheduled, int waitTime, String fileName, ChangeHandler changeHandler) {
        if(futureList != null && !futureList.isEmpty()) {
            for(int i = 0 ; i < futureList.size(); i++) {
                ScheduledFuture<?> future = futureList.get(i);
                if(future != null && !future.isCancelled() && !future.isDone()) {
                    System.out.println("----取消------");
                    future.cancel(true);
                    futureList.remove(future);
                    i--;
                }
            }
        }
        ScheduledFuture<?> future = scheduled.schedule(()->{
            Properties p = selectCfgFile(fileName);
            changeHandler.itemChange(p);
        }, waitTime, TimeUnit.SECONDS);
        futureList.add(future);
    }


    private void checkElement(String v) {
        if (v == null) throw new NullPointerException();
        if("".equals(v.trim())) {
            throw new IllegalArgumentException("不能使用空格");
        }
        if(v.startsWith(" ") || v.endsWith(" ")) {
            throw new IllegalArgumentException("前后不能包含空格");
        }
    }

}

测试

  当然上面除了增删改查还有些方法没讲到,不着急。先来演示一波!我们先新增一个文件然后插入到zk节点中。

新增文件测试

public static void main(String[] args) {
        IConfigCenterWrite write = new ConfigCenterImpl();
        String fileName = "application.properties";
        write.deleteCfgFile(fileName);
        //创建配置文件
        Properties properties = new Properties();
        properties.put("my.girl.name", "chenLing");
        properties.put("my.girl.age", "26");
        String s = write.insertCfgFile(fileName, properties);
        System.out.println("new file: "+s);
}

屏幕快照 2020-03-23 下午8.18.40.png
控制台输出信息

main尝试获取分布式锁null
main获得分布式锁
main释放分布式锁
main尝试获取分布式锁null
main获得分布式锁
新增的属性为:my.girl.age=26
新增的属性为:my.girl.name=chenLing
main释放分布式锁
new file: /distributeConfigure/cfgFile/application.properties

修改点文件测试

public static void main(String[] args) {
        IConfigCenterWrite write = new ConfigCenterImpl();
        String fileName = "application.properties";
        write.deleteCfgFile(fileName);
        //创建配置文件
        Properties properties = new Properties();
        properties.put("my.girl.name", "chenLing");
        properties.put("my.girl.age", "26");
        String s = write.insertCfgFile(fileName, properties);
        System.out.println("new file: "+s);
          LockSupport.parkNanos(1000000000*3L);
        //修改  新增  删除
        Properties properties2 = new Properties();
        properties2.put("my.girl.name", "chenLing2");
        properties2.put("my.girl.age", "18");
        properties2.put("my.girl.location", "深圳");
        write.updateCfgFile(fileName,properties2);
}

屏幕快照 2020-03-23 下午8.24.39.png
控制台输出信息

main尝试获取分布式锁null
main获得分布式锁
main释放分布式锁
main尝试获取分布式锁null
main获得分布式锁
新增的属性为:my.girl.age=26
新增的属性为:my.girl.name=chenLing
main释放分布式锁
new file: /distributeConfigure/cfgFile/application.properties
main尝试获取分布式锁null
main获得分布式锁
修改的属性为:my.girl.location=深圳
修改的属性为:my.girl.age=18
修改的属性为:my.girl.name=chenLing2
main释放分布式锁

监听机制

 public static void main(String[] args) throws InterruptedException {
        IConfigCenterWrite write = new ConfigCenterImpl();
        String fileName = "application.properties";
        write.deleteCfgFile(fileName);
        //创建配置文件
        Properties properties = new Properties();
        properties.put("my.girl.name", "chenLing");
        properties.put("my.girl.age", "26");
        String s = write.insertCfgFile(fileName, properties);
        System.out.println("new file: "+s);

        new Thread(() -> {
            IConfigCenterRead read = new ConfigCenterImpl();
            read.watchCfgFile(fileName, new IConfigCenterRead.ChangeHandler() {
                @Override
                public void itemChange(Properties properties) {
                    System.out.println(Thread.currentThread().getName() + " 监听到数据发生变化");
                }
            });
        }).start();
        LockSupport.parkNanos(1000000000*3L);
        //修改  新增  删除
        Properties properties2 = new Properties();
        properties2.put("my.girl.name", "chenLing2");
        properties2.put("my.girl.age", "18");
        properties2.put("my.girl.location", "深圳");
        write.updateCfgFile(fileName,properties2);
        //因为监听属于守护线程,结束程序会导致 守护线程退出无法处于监听状态
        Thread.currentThread().join();
    }

main尝试获取分布式锁null
main获得分布式锁
main释放分布式锁
main尝试获取分布式锁null
main获得分布式锁
新增的属性为:my.girl.age=26
新增的属性为:my.girl.name=chenLing
main释放分布式锁
new file: /distributeConfigure/cfgFile/application.properties
监控:/distributeConfigure/cfgFile/application.properties/my.girl.age
监控:/distributeConfigure/cfgFile/application.properties/my.girl.name
main尝试获取分布式锁null
main获得分布式锁
修改的属性为:my.girl.location=深圳
修改的属性为:my.girl.age=18
触发子节点:/distributeConfigure/cfgFile/application.properties
修改的属性为:my.girl.name=chenLing2
main释放分布式锁
触发修改:/distributeConfigure/cfgFile/application.properties/my.girl.age
----取消------
触发修改:/distributeConfigure/cfgFile/application.properties/my.girl.name
----取消------
pool-1-thread-1 监听到数据发生变化
  从上面的日志可以看出,我做了三个操作分别是增加 删除 修改  但是此时他们他们只会有一个进入此方法,因为我那个triggerHandler方法做了一个五秒钟内数据发生变化我这里只会记录最后一次的变化。这是这里最为难理解的一部分,目的就是为了防止高并发提交触发多次记录提示。
read.watchCfgFile(fileName, new IConfigCenterRead.ChangeHandler() {
      @Override
      public void itemChange(Properties properties) {
           System.out.println(Thread.currentThread().getName() + " 监听到数据发生变化");
      }  
});