代码实现

全部以异步回调的方式实现。

DistributeCfgClient类

  1. import org.apache.zookeeper.ZooKeeper;
  2. import java.io.IOException;
  3. import java.util.concurrent.CountDownLatch;
  4. public class DistributeCfgClient {
  5. //zookeeper集群,字符串最后跟上一个路径/testConfig表示连接后只能看到/testConfig目录下的内容,其它兄弟节点是看不到的
  6. private static final String ZK_SERVER_LIST = "120.53.119.107:2181,120.53.119.107:2182,120.53.119.107:2183/testConfig";
  7. //设置超时时间,当节点3s后没反应就认该节点已经挂了,比如创建临时节点客户端关闭链接后3s,临时节点才移除
  8. private static final int sessionTimeout = 3000;
  9. //创建new ZooKeeper时传递的watch
  10. private static DefaultWatch defaultWatch = new DefaultWatch();
  11. private static CountDownLatch countDownLatch = new CountDownLatch(1);
  12. public ZooKeeper zooKeeper() throws IOException, IOException, InterruptedException {
  13. defaultWatch.setCountDownLatch(countDownLatch);
  14. //这个会立即返回一个ZooKeeper对象,如果别人直接拿过去用,可能还没有连接成功,所以我们在这里要阻塞一下,当连接成功后再返回
  15. ZooKeeper zk = new ZooKeeper(ZK_SERVER_LIST,sessionTimeout,defaultWatch);
  16. countDownLatch.await();
  17. return zk;
  18. }
  19. public static void main(String[] args) throws InterruptedException, IOException {
  20. DistributeCfgClient distributeCfgClient = new DistributeCfgClient();
  21. ZooKeeper zk = distributeCfgClient.zooKeeper();
  22. WallStatCallBack wallStatCallBack = new WallStatCallBack();
  23. wallStatCallBack.setZk(zk);
  24. ReceiveDataConf receiveDataConf = new ReceiveDataConf();
  25. wallStatCallBack.setConfigData(receiveDataConf);
  26. //获取数据
  27. wallStatCallBack.getConfigDataAwait();
  28. while (true){ //业务代码,以后只需关心配置和这部分代码就可以了
  29. if (receiveDataConf.getConf().equals("")){
  30. //等待获取数据
  31. wallStatCallBack.getConfigDataAwait();
  32. System.out.println(receiveDataConf.getConf());
  33. }else {
  34. System.out.println(receiveDataConf.getConf());
  35. }
  36. Thread.sleep(2000);
  37. }
  38. }
  39. }

DefaultWatch类

  1. import org.apache.zookeeper.WatchedEvent;
  2. import org.apache.zookeeper.Watcher;
  3. import java.util.concurrent.CountDownLatch;
  4. /**
  5. * 创建zookeeper对象时使用的watch,可以理解session级别的watch
  6. */
  7. public class DefaultWatch implements Watcher {
  8. private CountDownLatch countDownLatch;
  9. public void setCountDownLatch(CountDownLatch countDownLatch) {
  10. this.countDownLatch = countDownLatch;
  11. }
  12. @Override
  13. public void process(WatchedEvent event) {
  14. System.out.println(event.toString());
  15. Event.KeeperState state = event.getState();
  16. switch (state) {
  17. case Unknown -> {
  18. }
  19. case Disconnected -> {
  20. }
  21. case NoSyncConnected -> {
  22. }
  23. case SyncConnected -> { //连接成功
  24. countDownLatch.countDown();
  25. }
  26. case AuthFailed -> {
  27. }
  28. case ConnectedReadOnly -> {
  29. }
  30. case SaslAuthenticated -> {
  31. }
  32. case Expired -> {
  33. }
  34. case Closed -> {
  35. }
  36. }
  37. }
  38. }

需要获取的配置数据

  1. public class ReceiveDataConf {
  2. private String conf;
  3. public String getConf() {
  4. return conf;
  5. }
  6. public void setConf(String conf) {
  7. this.conf = conf;
  8. }
  9. }

WallStatCallBack核心类

  1. import org.apache.zookeeper.AsyncCallback;
  2. import org.apache.zookeeper.WatchedEvent;
  3. import org.apache.zookeeper.Watcher;
  4. import org.apache.zookeeper.ZooKeeper;
  5. import org.apache.zookeeper.data.Stat;
  6. import java.util.concurrent.CountDownLatch;
  7. /**
  8. * WallStatCallBack扮演Watcher角色、StatCallback角色、DataCallback角色
  9. */
  10. public class WallStatCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {
  11. private ZooKeeper zk;
  12. private ReceiveDataConf configData; //接受的数据
  13. private CountDownLatch awaitDataLatch = new CountDownLatch(1);
  14. public void setConfigData(ReceiveDataConf configData) {
  15. this.configData = configData;
  16. }
  17. public void setZk(ZooKeeper zk) {
  18. this.zk = zk;
  19. }
  20. public void getConfigDataAwait() throws InterruptedException {
  21. //这里统一使用异步的方式进行开发,如果节点存在就会异步调用StateCallback
  22. zk.exists("/DataBaseConf", this, this,"abc");
  23. awaitDataLatch.await();
  24. }
  25. /**
  26. * watch的Callback,监听到数据的变化后调用
  27. * @param event
  28. */
  29. @Override
  30. public void process(WatchedEvent event) {
  31. Event.EventType type = event.getType();
  32. switch (type) {
  33. case None -> {
  34. }
  35. case NodeCreated -> {
  36. System.out.println("NodeCreated====");
  37. zk.getData("/DataBaseConf",this,this,"abc");
  38. }
  39. case NodeDeleted -> {
  40. //如果删除,可以自己根据业务情况实现,容忍性
  41. configData.setConf(""); //设置获取的配置数据为空
  42. awaitDataLatch = new CountDownLatch(1); //让外界继续阻塞
  43. }
  44. case NodeDataChanged -> {
  45. zk.getData("/DataBaseConf",this,this,"abc");
  46. }
  47. case NodeChildrenChanged -> {
  48. }
  49. case DataWatchRemoved -> {
  50. }
  51. case ChildWatchRemoved -> {
  52. }
  53. case PersistentWatchRemoved -> {
  54. }
  55. }
  56. }
  57. /**
  58. * 数据返回的callback,比如zk.getData,获取完数据后异步的方式返回数据
  59. * @param rc
  60. * @param path
  61. * @param ctx
  62. * @param data
  63. * @param stat
  64. */
  65. @Override
  66. public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
  67. if (data != null){
  68. String s = new String(data);
  69. configData.setConf(s);
  70. awaitDataLatch.countDown();
  71. }
  72. }
  73. /**
  74. *节点状态的callback,比如判断节点是否存在等
  75. * @param rc
  76. * @param path
  77. * @param ctx
  78. * @param stat
  79. */
  80. @Override
  81. public void processResult(int rc, String path, Object ctx, Stat stat) {
  82. if (stat != null){ //有需要的节点
  83. zk.getData("/DataBaseConf",this,this,"abc");
  84. }
  85. }
  86. }