本篇文章要讲一下基于 ZooKeeper
的配置中心。简单来说就是发布者对 ZooKeeper
的一个/多个节点进行修改时,通知订阅者进行数据订阅,从而实现动态获取新数据的目的,实现配置的集中式管理和动态更新。
基于 ZooKeeper
的配置中心实现思路:
当应用启动时,从 ZooKeeper
的某节点上拉取配置数据,并对该节点注册一个 Watch
监听;但凡 ZooKeeper
上的保存配置数据的节点发生了变化,应用就能收到通知并重新拉取新的配置文件。
点击查看【processon】
这部分代码比较简单,如下所示:
public class ZookeeperConfigApp {
public static final String HOST = "192.168.136.128";
public static final String PORT = "2181";
public static MyConfig myConfig = new MyConfig();
private static ZooKeeper zooKeeper;
public static void main(String[] args) {
try {
// 因为Zookeeper的连接是异步的,需要加上同步工具
final CountDownLatch countDownLatch = new CountDownLatch(1);
zooKeeper = new ZooKeeper(HOST + ":" + PORT, 5000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接成功");
countDownLatch.countDown();
}
if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {
try {
if ("/userInfo/username".equals(watchedEvent.getPath())) {
updateUsername(zooKeeper);
} else if ("/userInfo/password".equals(watchedEvent.getPath() )) {
updatePassword(zooKeeper);
} else if ("/userInfo/url".equals(watchedEvent.getPath())) {
updateUrl(zooKeeper);
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
countDownLatch.await();
updateUsername(zooKeeper);
updatePassword(zooKeeper);
updateUrl(zooKeeper);
System.out.println(zooKeeper.getSessionId());
System.in.read();
zooKeeper.close();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
public static void updateUsername(ZooKeeper zookeeper) throws KeeperException, InterruptedException {
byte[] data = zookeeper.getData("/userInfo/username", true, null);
System.out.println("更新用户名 -> " + myConfig.getUsername() + ":" + new String(data));
myConfig.setUsername(new String(data));
}
public static void updatePassword(ZooKeeper zookeeper) throws KeeperException, InterruptedException {
byte[] data = zookeeper.getData("/userInfo/password", true, null);
System.out.println("更新密码 -> " + myConfig.getPassword() + ":" + new String(data));
myConfig.setPassword(new String(data));
}
public static void updateUrl(ZooKeeper zookeeper) throws KeeperException, InterruptedException {
byte[] data = zookeeper.getData("/userInfo/url", true, null);
System.out.println("更新URL -> " + myConfig.getUrl() + ":" + new String(data));
myConfig.setUrl(new String(data));
}
}