环境搭建

pom.xml

  1. <dependencies>
  2. <dependency>
  3. <groupId>junit</groupId>
  4. <artifactId>junit</artifactId>
  5. <version>RELEASE</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.logging.log4j</groupId>
  9. <artifactId>log4j-core</artifactId>
  10. <version>2.8.2</version>
  11. </dependency>
  12. <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
  13. <dependency>
  14. <groupId>org.apache.zookeeper</groupId>
  15. <artifactId>zookeeper</artifactId>
  16. <version>3.5.7</version>
  17. </dependency>
  18. </dependencies>

log4j.properties

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

API

创建ZooKeeper客户端

如果是云服务器的搭建环境,先在本地/etc/hosts配置
Image 1.png

@Before
    public void before() throws IOException {
        //Zookeeper链接地址
        String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";

        //会话超时时间
        int sessionTimeout = 2000;
        //1. 创建一个Zookeeper对象
        zooKeeper = new ZooKeeper(connectString,        //Zookeeper地址
                sessionTimeout,             //超时时间
                new Watcher() {
                    //Zookeeper监听的回调函数
                    //这个就是先前在命令行测试的监听功能
                    //异步通信,我做我的其他事情,我让一个回调函数帮我看着
                    //一旦有反应了,对面就会调用我的回调函数,我的回调函数就会通知我一下
                    //这个回调函数,就不是给mian线程调用的,而是给eventthread调用的
                    public void process(WatchedEvent watchedEvent) {
                    }
                });
    }

创建节点

@Test
    public void create() throws KeeperException, InterruptedException {
        //2. 做事情
        zooKeeper.create(
                "/testAPI",
                "123".getBytes(),
                //访问控制列表,权限控制的
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                //节点类型
                CreateMode.PERSISTENT
        );
    }

Image 3.png

查询子节点并监听

测试监听器的使用

 @Test
public void ls() throws KeeperException, InterruptedException {
    List<String> children = zooKeeper.getChildren(
        "/",
        //这里不写回调函数,写一个true,则调用的是默认的回调函数
        new Watcher() {
            public void process(WatchedEvent event) {
                System.out.println("自定义回调函数:节点变化辣");
            }
        }
    );

    for (String child : children) {
        System.out.println(child);
    }
    //把主线程阻塞住,让监听的子线程得以执行
    Thread.sleep(Long.MAX_VALUE);
}

Image 4.png

修改节点值

主要是dataversion 数据变化号的知识点
乐观锁

 @Test
    public void set() throws KeeperException, InterruptedException {
        String node = "/testAPI";
        Stat stat = zooKeeper.exists(node, false);

        if (stat == null) {
            System.out.println("节点不存在");
        } else {
            System.out.println("修改前version:"+stat.getVersion());
            zooKeeper.setData(
                    "/testAPI",
                    "abc".getBytes(),
                //这里stat.getVersion()其实是一个乐观锁的作用
                //保证我想改的数据,在改的时候,没有被别人修改
                //比如我想改的是4,但是有个人抢先了改了,我改的时候“咦,这个成5了,我改不了
                //异常了”
                //getVersion()的到的是修改的次数,通过这个来判定,修改一次就+1
                    stat.getVersion()
                //防止在修改节点值的时候,出现并发修改出问题的情况,所以有了stat.getVersion()。
                //dataVersion起一个乐观锁的作用,大家都可以修改,但只有一个人能修改成功。
            );
            stat = zooKeeper.exists(node, false);
            System.out.println("修改后version:"+stat.getVersion());
        }
    }

Image 5.png
其他一些API都是类似的操作,就不每一个都写了要用的时候去查就好了。。。。。。。。