引入maven:
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>{服务器的zk是什么版本这里就引入什么版本}</version>
</dependency>
</dependencies>
java:
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class TestZookeeperClient {
static CountDownLatch cd = new CountDownLatch(1);
static Stat stat=new Stat();
static ZooKeeper zk;
static {
try {
zk = new ZooKeeper("192.168.235.133:2181,192.168.235.133:2182,192.168.235.133:2183,192.168.235.133:2184",
3000, new SessionWatcher() );
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
System.out.println( "Hello World!" );
//zk是有session概念的,没有连接池的概念
//watch:观察,回调
//watch的注册值发生在 读类型调用,get,exites。。。
//第一类:new zk 时候,传入的watch,这个watch,session级别的,跟path 、node没有关系。
System.out.println("程序卡主等待连接zk...");
cd.await();
ZooKeeper.States state = zk.getState();
switch (state) {
case CONNECTING:
System.out.println("ing......");
break;
case ASSOCIATING:
break;
case CONNECTED:
System.out.println("程序已连接zk......");
break;
case CONNECTEDREADONLY:
break;
case CLOSED:
break;
case AUTH_FAILED:
break;
case NOT_CONNECTED:
break;
}
//创建/ooxx的临时目录。存放数据为 "olddata"
String pathName = zk.create("/ooxx", "olddata".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
//获取/ooxx下的数据,并在数据修改时触发一次new OOXXWatcher()
byte[] node = zk.getData("/ooxx", new OOXXWatcher() , stat);
System.out.println("/ooxx下的数据为" + new String(node));
//触发回调
Stat stat1 = zk.setData("/ooxx", "newdata".getBytes(), 0);
//再次在/ooxx的临时目录下设置新的数据
Stat stat2 = zk.setData("/ooxx", "newdata01".getBytes(), stat1.getVersion());
System.out.println("-------async start----------");
zk.getData("/ooxx", false, new GetOOXXCallBack("/ooxx"),"abc");
System.out.println("-------async over----------");
//程序阻塞住,防止程序session丢失
System.in.read();
}
/**
* Session级别的 Watcher
*/
private static class SessionWatcher implements Watcher{
//Watch 的回调方法!
@Override
public void process(WatchedEvent event) {
Event.KeeperState state = event.getState();
Event.EventType type = event.getType();
String path = event.getPath();
System.out.println("new zk watch: "+ event.toString());
switch (state) {
case Unknown:
break;
case Disconnected:
break;
case NoSyncConnected:
break;
case SyncConnected:
System.out.println("SessionWatcher监听到到程序已连接zk......");
cd.countDown();
break;
case AuthFailed:
break;
case ConnectedReadOnly:
break;
case SaslAuthenticated:
break;
case Expired:
break;
}
switch (type) {
case None:
break;
case NodeCreated:
break;
case NodeDeleted:
break;
case NodeDataChanged:
break;
case NodeChildrenChanged:
break;
}
}
}
/**
* 目录级别的 Watcher
*/
private static class OOXXWatcher implements Watcher{
@Override
public void process(WatchedEvent event) {
System.out.println("触发 OOXXWatcher : "+event.toString());
try {
//true default Watch 被重新注册 new zk的那个watch
System.out.println("再次注册/ooxx下的Watcher为 OOXXWatcher");
zk.getData("/ooxx",this ,stat);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* /OOXX目录下的数据更新 异步回调
*/
private static class GetOOXXCallBack implements AsyncCallback.DataCallback{
private String path;
GetOOXXCallBack(String path){
this.path = path;
}
@Override
public void processResult(int i, String s, Object ctx, byte[] data, Stat stat) {
System.out.println("-------async call back----------");
System.out.println(ctx.toString());
System.out.println(this.path+"目录下的数据为:"+new String(data));
}
}
}