引入 Maven 依赖:

    <dependencies>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <!-- Placeholder: replace with the ZooKeeper version your server runs,
                 so the client library version matches the server version. -->
            <version>{服务器的zk是什么版本这里就引入什么版本}</version>
        </dependency>
    </dependencies>

Java:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.CountDownLatch;
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    5. public class TestZookeeperClient {
    6. static CountDownLatch cd = new CountDownLatch(1);
    7. static Stat stat=new Stat();
    8. static ZooKeeper zk;
    9. static {
    10. try {
    11. zk = new ZooKeeper("192.168.235.133:2181,192.168.235.133:2182,192.168.235.133:2183,192.168.235.133:2184",
    12. 3000, new SessionWatcher() );
    13. } catch (IOException e) {
    14. e.printStackTrace();
    15. }
    16. }
    17. public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
    18. System.out.println( "Hello World!" );
    19. //zk是有session概念的,没有连接池的概念
    20. //watch:观察,回调
    21. //watch的注册值发生在 读类型调用,get,exites。。。
    22. //第一类:new zk 时候,传入的watch,这个watch,session级别的,跟path 、node没有关系。
    23. System.out.println("程序卡主等待连接zk...");
    24. cd.await();
    25. ZooKeeper.States state = zk.getState();
    26. switch (state) {
    27. case CONNECTING:
    28. System.out.println("ing......");
    29. break;
    30. case ASSOCIATING:
    31. break;
    32. case CONNECTED:
    33. System.out.println("程序已连接zk......");
    34. break;
    35. case CONNECTEDREADONLY:
    36. break;
    37. case CLOSED:
    38. break;
    39. case AUTH_FAILED:
    40. break;
    41. case NOT_CONNECTED:
    42. break;
    43. }
    44. //创建/ooxx的临时目录。存放数据为 "olddata"
    45. String pathName = zk.create("/ooxx", "olddata".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    46. //获取/ooxx下的数据,并在数据修改时触发一次new OOXXWatcher()
    47. byte[] node = zk.getData("/ooxx", new OOXXWatcher() , stat);
    48. System.out.println("/ooxx下的数据为" + new String(node));
    49. //触发回调
    50. Stat stat1 = zk.setData("/ooxx", "newdata".getBytes(), 0);
    51. //再次在/ooxx的临时目录下设置新的数据
    52. Stat stat2 = zk.setData("/ooxx", "newdata01".getBytes(), stat1.getVersion());
    53. System.out.println("-------async start----------");
    54. zk.getData("/ooxx", false, new GetOOXXCallBack("/ooxx"),"abc");
    55. System.out.println("-------async over----------");
    56. //程序阻塞住,防止程序session丢失
    57. System.in.read();
    58. }
    59. /**
    60. * Session级别的 Watcher
    61. */
    62. private static class SessionWatcher implements Watcher{
    63. //Watch 的回调方法!
    64. @Override
    65. public void process(WatchedEvent event) {
    66. Event.KeeperState state = event.getState();
    67. Event.EventType type = event.getType();
    68. String path = event.getPath();
    69. System.out.println("new zk watch: "+ event.toString());
    70. switch (state) {
    71. case Unknown:
    72. break;
    73. case Disconnected:
    74. break;
    75. case NoSyncConnected:
    76. break;
    77. case SyncConnected:
    78. System.out.println("SessionWatcher监听到到程序已连接zk......");
    79. cd.countDown();
    80. break;
    81. case AuthFailed:
    82. break;
    83. case ConnectedReadOnly:
    84. break;
    85. case SaslAuthenticated:
    86. break;
    87. case Expired:
    88. break;
    89. }
    90. switch (type) {
    91. case None:
    92. break;
    93. case NodeCreated:
    94. break;
    95. case NodeDeleted:
    96. break;
    97. case NodeDataChanged:
    98. break;
    99. case NodeChildrenChanged:
    100. break;
    101. }
    102. }
    103. }
    104. /**
    105. * 目录级别的 Watcher
    106. */
    107. private static class OOXXWatcher implements Watcher{
    108. @Override
    109. public void process(WatchedEvent event) {
    110. System.out.println("触发 OOXXWatcher : "+event.toString());
    111. try {
    112. //true default Watch 被重新注册 new zk的那个watch
    113. System.out.println("再次注册/ooxx下的Watcher为 OOXXWatcher");
    114. zk.getData("/ooxx",this ,stat);
    115. } catch (KeeperException e) {
    116. e.printStackTrace();
    117. } catch (InterruptedException e) {
    118. e.printStackTrace();
    119. }
    120. }
    121. }
    122. /**
    123. * /OOXX目录下的数据更新 异步回调
    124. */
    125. private static class GetOOXXCallBack implements AsyncCallback.DataCallback{
    126. private String path;
    127. GetOOXXCallBack(String path){
    128. this.path = path;
    129. }
    130. @Override
    131. public void processResult(int i, String s, Object ctx, byte[] data, Stat stat) {
    132. System.out.println("-------async call back----------");
    133. System.out.println(ctx.toString());
    134. System.out.println(this.path+"目录下的数据为:"+new String(data));
    135. }
    136. }
    137. }