代码实现
全部以异步回调的方式实现。
DistributeCfgClient类
import org.apache.zookeeper.ZooKeeper;import java.io.IOException;import java.util.concurrent.CountDownLatch;public class DistributeCfgClient { //zookeeper集群,字符串最后跟上一个路径/testConfig表示连接后只能看到/testConfig目录下的内容,其它兄弟节点是看不到的 private static final String ZK_SERVER_LIST = "120.53.119.107:2181,120.53.119.107:2182,120.53.119.107:2183/testConfig"; //设置超时时间,当节点3s后没反应就认该节点已经挂了,比如创建临时节点客户端关闭链接后3s,临时节点才移除 private static final int sessionTimeout = 3000; //创建new ZooKeeper时传递的watch private static DefaultWatch defaultWatch = new DefaultWatch(); private static CountDownLatch countDownLatch = new CountDownLatch(1); public ZooKeeper zooKeeper() throws IOException, IOException, InterruptedException { defaultWatch.setCountDownLatch(countDownLatch); //这个会立即返回一个ZooKeeper对象,如果别人直接拿过去用,可能还没有连接成功,所以我们在这里要阻塞一下,当连接成功后再返回 ZooKeeper zk = new ZooKeeper(ZK_SERVER_LIST,sessionTimeout,defaultWatch); countDownLatch.await(); return zk; } public static void main(String[] args) throws InterruptedException, IOException { DistributeCfgClient distributeCfgClient = new DistributeCfgClient(); ZooKeeper zk = distributeCfgClient.zooKeeper(); WallStatCallBack wallStatCallBack = new WallStatCallBack(); wallStatCallBack.setZk(zk); ReceiveDataConf receiveDataConf = new ReceiveDataConf(); wallStatCallBack.setConfigData(receiveDataConf); //获取数据 wallStatCallBack.getConfigDataAwait(); while (true){ //业务代码,以后只需关心配置和这部分代码就可以了 if (receiveDataConf.getConf().equals("")){ //等待获取数据 wallStatCallBack.getConfigDataAwait(); System.out.println(receiveDataConf.getConf()); }else { System.out.println(receiveDataConf.getConf()); } Thread.sleep(2000); } }}
DefaultWatch类
import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import java.util.concurrent.CountDownLatch;/** * 创建zookeeper对象时使用的watch,可以理解session级别的watch */public class DefaultWatch implements Watcher { private CountDownLatch countDownLatch; public void setCountDownLatch(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } @Override public void process(WatchedEvent event) { System.out.println(event.toString()); Event.KeeperState state = event.getState(); switch (state) { case Unknown -> { } case Disconnected -> { } case NoSyncConnected -> { } case SyncConnected -> { //连接成功 countDownLatch.countDown(); } case AuthFailed -> { } case ConnectedReadOnly -> { } case SaslAuthenticated -> { } case Expired -> { } case Closed -> { } } }}
ReceiveDataConf类(需要获取的配置数据)
/**
 * Holder for the configuration string fetched from ZooKeeper.
 *
 * <p>The value starts as {@code null} until {@link #setConf(String)} is called.
 */
public class ReceiveDataConf {

    /**
     * Current configuration value. Declared {@code volatile} because it is written
     * by the ZooKeeper callbacks and read by the polling loop in the client's
     * {@code main}, so each update must be visible to the reader.
     */
    private volatile String conf;

    /**
     * Returns the current configuration value.
     *
     * @return the last value set, or {@code null} if none has been set
     */
    public String getConf() {
        return conf;
    }

    /**
     * Replaces the configuration value.
     *
     * @param conf the new configuration value
     */
    public void setConf(String conf) {
        this.conf = conf;
    }
}
WallStatCallBack核心类
import org.apache.zookeeper.AsyncCallback;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.data.Stat;import java.util.concurrent.CountDownLatch;/** * WallStatCallBack扮演Watcher角色、StatCallback角色、DataCallback角色 */public class WallStatCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback { private ZooKeeper zk; private ReceiveDataConf configData; //接受的数据 private CountDownLatch awaitDataLatch = new CountDownLatch(1); public void setConfigData(ReceiveDataConf configData) { this.configData = configData; } public void setZk(ZooKeeper zk) { this.zk = zk; } public void getConfigDataAwait() throws InterruptedException { //这里统一使用异步的方式进行开发,如果节点存在就会异步调用StateCallback zk.exists("/DataBaseConf", this, this,"abc"); awaitDataLatch.await(); } /** * watch的Callback,监听到数据的变化后调用 * @param event */ @Override public void process(WatchedEvent event) { Event.EventType type = event.getType(); switch (type) { case None -> { } case NodeCreated -> { System.out.println("NodeCreated===="); zk.getData("/DataBaseConf",this,this,"abc"); } case NodeDeleted -> { //如果删除,可以自己根据业务情况实现,容忍性 configData.setConf(""); //设置获取的配置数据为空 awaitDataLatch = new CountDownLatch(1); //让外界继续阻塞 } case NodeDataChanged -> { zk.getData("/DataBaseConf",this,this,"abc"); } case NodeChildrenChanged -> { } case DataWatchRemoved -> { } case ChildWatchRemoved -> { } case PersistentWatchRemoved -> { } } } /** * 数据返回的callback,比如zk.getData,获取完数据后异步的方式返回数据 * @param rc * @param path * @param ctx * @param data * @param stat */ @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { if (data != null){ String s = new String(data); configData.setConf(s); awaitDataLatch.countDown(); } } /** *节点状态的callback,比如判断节点是否存在等 * @param rc * @param path * @param ctx * @param stat */ @Override public void processResult(int rc, String path, Object ctx, Stat stat) { if (stat != null){ //有需要的节点 
zk.getData("/DataBaseConf",this,this,"abc"); } }}