本篇文章要讲一下基于 ZooKeeper 的配置中心。简单来说就是发布者对 ZooKeeper 的一个/多个节点进行修改时,通知订阅者进行数据订阅,从而实现动态获取新数据的目的,实现配置的集中式管理和动态更新。

    基于 ZooKeeper 的配置中心实现思路:
    当应用启动时,从 ZooKeeper 的某节点上拉取配置数据,并对该节点注册一个 Watch 监听;但凡 ZooKeeper 上的保存配置数据的节点发生了变化,应用就能收到通知并重新拉取新的配置文件。
    点击查看【processon】
    这部分代码比较简单,如下所示:

    1. public class ZookeeperConfigApp {
    2. public static final String HOST = "192.168.136.128";
    3. public static final String PORT = "2181";
    4. public static MyConfig myConfig = new MyConfig();
    5. private static ZooKeeper zooKeeper;
    6. public static void main(String[] args) {
    7. try {
    8. // 因为Zookeeper的连接是异步的,需要加上同步工具
    9. final CountDownLatch countDownLatch = new CountDownLatch(1);
    10. zooKeeper = new ZooKeeper(HOST + ":" + PORT, 5000, new Watcher() {
    11. @Override
    12. public void process(WatchedEvent watchedEvent) {
    13. if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
    14. System.out.println("连接成功");
    15. countDownLatch.countDown();
    16. }
    17. if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {
    18. try {
    19. if ("/userInfo/username".equals(watchedEvent.getPath())) {
    20. updateUsername(zooKeeper);
    21. } else if ("/userInfo/password".equals(watchedEvent.getPath() )) {
    22. updatePassword(zooKeeper);
    23. } else if ("/userInfo/url".equals(watchedEvent.getPath())) {
    24. updateUrl(zooKeeper);
    25. }
    26. } catch (KeeperException e) {
    27. e.printStackTrace();
    28. } catch (InterruptedException e) {
    29. e.printStackTrace();
    30. }
    31. }
    32. }
    33. });
    34. countDownLatch.await();
    35. updateUsername(zooKeeper);
    36. updatePassword(zooKeeper);
    37. updateUrl(zooKeeper);
    38. System.out.println(zooKeeper.getSessionId());
    39. System.in.read();
    40. zooKeeper.close();
    41. } catch (IOException e) {
    42. e.printStackTrace();
    43. } catch (InterruptedException e) {
    44. e.printStackTrace();
    45. } catch (KeeperException e) {
    46. e.printStackTrace();
    47. }
    48. }
    49. public static void updateUsername(ZooKeeper zookeeper) throws KeeperException, InterruptedException {
    50. byte[] data = zookeeper.getData("/userInfo/username", true, null);
    51. System.out.println("更新用户名 -> " + myConfig.getUsername() + ":" + new String(data));
    52. myConfig.setUsername(new String(data));
    53. }
    54. public static void updatePassword(ZooKeeper zookeeper) throws KeeperException, InterruptedException {
    55. byte[] data = zookeeper.getData("/userInfo/password", true, null);
    56. System.out.println("更新密码 -> " + myConfig.getPassword() + ":" + new String(data));
    57. myConfig.setPassword(new String(data));
    58. }
    59. public static void updateUrl(ZooKeeper zookeeper) throws KeeperException, InterruptedException {
    60. byte[] data = zookeeper.getData("/userInfo/url", true, null);
    61. System.out.println("更新URL -> " + myConfig.getUrl() + ":" + new String(data));
    62. myConfig.setUrl(new String(data));
    63. }
    64. }