代码实现
全部以异步回调的方式实现。
DistributeCfgClient类
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
/**
 * Demo entry point: connects to ZooKeeper, loads the configuration through
 * {@code WallStatCallBack} and prints it every two seconds.
 */
public class DistributeCfgClient {
    // ZooKeeper ensemble. The trailing /testConfig path means that, once connected, this
    // client only sees what lives under /testConfig; sibling nodes are not visible.
    private static final String ZK_SERVER_LIST = "120.53.119.107:2181,120.53.119.107:2182,120.53.119.107:2183/testConfig";
    // Session timeout in milliseconds: a node that stays silent for 3s is considered dead,
    // e.g. an ephemeral node is removed 3s after its client closes the connection.
    private static final int SESSION_TIMEOUT_MS = 3000;
    // Watcher handed to the ZooKeeper constructor.
    private static final DefaultWatch defaultWatch = new DefaultWatch();
    // Released by defaultWatch once the connection is established.
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);

    /**
     * Creates a ZooKeeper client and blocks until it is connected.
     *
     * <p>The {@code ZooKeeper} constructor returns immediately, possibly before the
     * connection exists, so this method waits on the latch released by the default watcher
     * before handing the client out.
     *
     * @return the client, after the default watcher has released the latch
     * @throws IOException if the client cannot be created
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public ZooKeeper zooKeeper() throws IOException, InterruptedException {
        defaultWatch.setCountDownLatch(countDownLatch);
        ZooKeeper zk = new ZooKeeper(ZK_SERVER_LIST, SESSION_TIMEOUT_MS, defaultWatch);
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            // Nobody will receive this handle, so release it before propagating.
            zk.close();
            throw e;
        }
        return zk;
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        ZooKeeper zk = new DistributeCfgClient().zooKeeper();

        WallStatCallBack wallStatCallBack = new WallStatCallBack();
        wallStatCallBack.setZk(zk);
        ReceiveDataConf receiveDataConf = new ReceiveDataConf();
        wallStatCallBack.setConfigData(receiveDataConf);

        // Initial load.
        wallStatCallBack.getConfigDataAwait();

        // Business loop: from here on only the configuration and this code matter.
        while (true) {
            String conf = receiveDataConf.getConf();
            if (conf == null || conf.isEmpty()) {
                // Configuration is gone; block until it is available again.
                wallStatCallBack.getConfigDataAwait();
            }
            System.out.println(receiveDataConf.getConf());
            Thread.sleep(2000);
        }
    }
}
DefaultWatch类
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.util.concurrent.CountDownLatch;
/**
 * Watcher passed when the ZooKeeper object is created; it can be thought of as the
 * session-level watcher. It prints every event and releases the configured latch when
 * the session reaches {@code SyncConnected}.
 */
public class DefaultWatch implements Watcher {
    private CountDownLatch countDownLatch;

    /**
     * @param countDownLatch latch to count down when the connection succeeds
     */
    public void setCountDownLatch(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    /**
     * Prints the event and, on {@code SyncConnected}, counts the latch down.
     * All other session states are ignored.
     *
     * @param event the event delivered by the ZooKeeper client
     */
    @Override
    public void process(WatchedEvent event) {
        System.out.println(event.toString());
        if (event.getState() != Event.KeeperState.SyncConnected) {
            return;
        }
        // Connected. The latch is optional: without this guard a watcher that was never
        // given one would throw from the callback.
        CountDownLatch latch = countDownLatch;
        if (latch != null) {
            latch.countDown();
        }
    }
}
ReceiveDataConf类(保存需要获取的配置数据)
/**
 * Holder for the configuration string received from ZooKeeper.
 *
 * <p>An empty string means "no configuration available". The holder starts in that state
 * so that readers can call {@code getConf().equals("")} before the first value arrives.
 */
public class ReceiveDataConf {
    // volatile: the value is published by one thread and polled by another.
    private volatile String conf = "";

    /**
     * @return the most recently stored configuration; {@code ""} until one has been set
     */
    public String getConf() {
        return conf;
    }

    /**
     * @param conf the new configuration value
     */
    public void setConf(String conf) {
        this.conf = conf;
    }
}
WallStatCallBack核心类
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.CountDownLatch;
/**
* WallStatCallBack扮演Watcher角色、StatCallback角色、DataCallback角色
*/
public class WallStatCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {
private ZooKeeper zk;
private ReceiveDataConf configData; //接受的数据
private CountDownLatch awaitDataLatch = new CountDownLatch(1);
public void setConfigData(ReceiveDataConf configData) {
this.configData = configData;
}
public void setZk(ZooKeeper zk) {
this.zk = zk;
}
public void getConfigDataAwait() throws InterruptedException {
//这里统一使用异步的方式进行开发,如果节点存在就会异步调用StateCallback
zk.exists("/DataBaseConf", this, this,"abc");
awaitDataLatch.await();
}
/**
* watch的Callback,监听到数据的变化后调用
* @param event
*/
@Override
public void process(WatchedEvent event) {
Event.EventType type = event.getType();
switch (type) {
case None -> {
}
case NodeCreated -> {
System.out.println("NodeCreated====");
zk.getData("/DataBaseConf",this,this,"abc");
}
case NodeDeleted -> {
//如果删除,可以自己根据业务情况实现,容忍性
configData.setConf(""); //设置获取的配置数据为空
awaitDataLatch = new CountDownLatch(1); //让外界继续阻塞
}
case NodeDataChanged -> {
zk.getData("/DataBaseConf",this,this,"abc");
}
case NodeChildrenChanged -> {
}
case DataWatchRemoved -> {
}
case ChildWatchRemoved -> {
}
case PersistentWatchRemoved -> {
}
}
}
/**
* 数据返回的callback,比如zk.getData,获取完数据后异步的方式返回数据
* @param rc
* @param path
* @param ctx
* @param data
* @param stat
*/
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
if (data != null){
String s = new String(data);
configData.setConf(s);
awaitDataLatch.countDown();
}
}
/**
*节点状态的callback,比如判断节点是否存在等
* @param rc
* @param path
* @param ctx
* @param stat
*/
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
if (stat != null){ //有需要的节点
zk.getData("/DataBaseConf",this,this,"abc");
}
}
}