本篇文章要讲一下基于 ZooKeeper 的配置中心。简单来说就是发布者对 ZooKeeper 的一个/多个节点进行修改时,通知订阅者进行数据订阅,从而实现动态获取新数据的目的,实现配置的集中式管理和动态更新。
基于 ZooKeeper 的配置中心实现思路:
当应用启动时,从 ZooKeeper 的某节点上拉取配置数据,并对该节点注册一个 Watch 监听;但凡 ZooKeeper 上的保存配置数据的节点发生了变化,应用就能收到通知并重新拉取新的配置文件。
点击查看【processon】
这部分代码比较简单,如下所示:
public class ZookeeperConfigApp {public static final String HOST = "192.168.136.128";public static final String PORT = "2181";public static MyConfig myConfig = new MyConfig();private static ZooKeeper zooKeeper;public static void main(String[] args) {try {// 因为Zookeeper的连接是异步的,需要加上同步工具final CountDownLatch countDownLatch = new CountDownLatch(1);zooKeeper = new ZooKeeper(HOST + ":" + PORT, 5000, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {System.out.println("连接成功");countDownLatch.countDown();}if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {try {if ("/userInfo/username".equals(watchedEvent.getPath())) {updateUsername(zooKeeper);} else if ("/userInfo/password".equals(watchedEvent.getPath() )) {updatePassword(zooKeeper);} else if ("/userInfo/url".equals(watchedEvent.getPath())) {updateUrl(zooKeeper);}} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}}});countDownLatch.await();updateUsername(zooKeeper);updatePassword(zooKeeper);updateUrl(zooKeeper);System.out.println(zooKeeper.getSessionId());System.in.read();zooKeeper.close();} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}public static void updateUsername(ZooKeeper zookeeper) throws KeeperException, InterruptedException {byte[] data = zookeeper.getData("/userInfo/username", true, null);System.out.println("更新用户名 -> " + myConfig.getUsername() + ":" + new String(data));myConfig.setUsername(new String(data));}public static void updatePassword(ZooKeeper zookeeper) throws KeeperException, InterruptedException {byte[] data = zookeeper.getData("/userInfo/password", true, null);System.out.println("更新密码 -> " + myConfig.getPassword() + ":" + new String(data));myConfig.setPassword(new String(data));}public static void updateUrl(ZooKeeper zookeeper) throws KeeperException, InterruptedException {byte[] data = zookeeper.getData("/userInfo/url", true, null);System.out.println("更新URL -> " + myConfig.getUrl() + ":" + new String(data));myConfig.setUrl(new String(data));}}
