环境搭建
1)创建一个Maven工程
2)添加pom文件
<dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>RELEASE</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.8.2</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper --><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.7</version></dependency></dependencies>
3)拷贝log4j.properties文件到项目根目录
需要在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。
log4j.rootLogger=INFO, stdoutlog4j.appender.stdout=org.apache.log4j.ConsoleAppenderlog4j.appender.stdout.layout=org.apache.log4j.PatternLayoutlog4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%nlog4j.appender.logfile=org.apache.log4j.FileAppenderlog4j.appender.logfile.File=target/spring.loglog4j.appender.logfile.layout=org.apache.log4j.PatternLayoutlog4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
在API中使用指令
import org.apache.zookeeper.*;import org.apache.zookeeper.data.Stat;import org.junit.After;import org.junit.Before;import org.junit.Test;import java.io.IOException;import java.util.List;public class ZKDemo {private ZooKeeper zk;/*通过代码操作Zookeeper1.创建Zookeeper的客户端2.具体API操作3.关闭资源*/@Beforepublic void before() throws IOException {//1.创建Zookeeper的客户端//zookeeper集群中节点的地址String hostName = "hadoop102:2181,hadoop103:2181,hadoop104:2181";//session超时时间int sessionTimeout = 4000;//Watcher对象是用来接收服务器端的响应事件(总响应事件--一般不用)zk = new ZooKeeper(hostName, sessionTimeout, new Watcher() {public void process(WatchedEvent event) {}});}@Afterpublic void after(){// 3.关闭资源if (zk != null){try {zk.close();} catch (InterruptedException e) {e.printStackTrace();}}}//创建子节点@Testpublic void test() throws KeeperException, InterruptedException {/*create(final String path, byte data[], List<ACL> acl,CreateMode createMode)path : 目标节点的路径data : 节点存放的数据acl : 节点的权限createMode : 节点类型(持久有序列号,持久无序列号,临时有序列号,临时无序列号)*/zk.create("/sanguo", "caocaobushiren".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}//获取子节点并监听节点变化@Testpublic void test2() throws KeeperException, InterruptedException {try {listener();} catch (Exception e) {e.printStackTrace();}//因为主线程一旦死掉,那么用来监听响应事件的线程也会死掉。Thread.sleep(Long.MAX_VALUE);}public void listener() throws Exception {/*getChildren(final String path, Watcher watcher)path : 目标节点的路径watcher : 监听器对象--接收响应事件*/List<String> children = zk.getChildren("/xiyouji", new Watcher() {//process方法是真正用来接收响应事件的方法public void process(WatchedEvent event) {//1.处理响应事件System.out.println("节点发生了改变");//2.再次注册监听try {listener();} catch (Exception e) {e.printStackTrace();}}});for (String child : children) {System.out.println(child);}}//判断节点是否存在@Testpublic void test3() throws KeeperException, InterruptedException {/*exists(String path, boolean watch)path:目标节点的路径watch : 是否使用总监听器对象如果节点存在则返回Stat类型的对象,如果节点不存在则返回null*/Stat exists = zk.exists("/sanguo2", false);System.out.println((exists==null)?"false":"true");}}
在API实现服务器动态上下线。
说明:
创建的ZKClient是ZKServer的客户端
ZKClient和ZKServer都是ZooKeeper的客户端
代码:
1)创建服务器,复制3台服务器副本,设置不同的参数
import org.apache.zookeeper.*;import org.apache.zookeeper.data.Stat;import java.io.IOException;/*具体步骤:1.创建一个临时节点作为Zookeeper客户端2.先判断父节点是否存在,如果不存在则创建3.创建临时节点*/public class ZKServer {public static void main(String[] args){ZooKeeper zk = null;try {//1.创建Zookeeper客户端String hostName = "hadoop102:2181,hadoop103:2181,hadoop104:2181";int sessionTimeout = 4000;zk = new ZooKeeper(hostName, sessionTimeout, new Watcher() {public void process(WatchedEvent event) {}});//2.判断父节点是否存在Stat exists = zk.exists("/server", false);if (exists == null) {//父节点不存在,则创建--永久节点(父节点必须一直存在,client会监听此节点)zk.create("/server", "".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}//3.创建临时节点---不同的服务器临时节点是不一样的所以不能写死。//因为服务器断开节点就得消失所以节点必须是临时节点zk.create("/server/" + args[0], args[1].getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);Thread.sleep(Long.MAX_VALUE);//程序不能结束。}catch (Exception e){e.printStackTrace();}finally {//关闭资源try {zk.close();} catch (InterruptedException e) {e.printStackTrace();}}}}
2)创建客户端
import org.apache.zookeeper.*;import org.apache.zookeeper.data.Stat;import java.io.IOException;import java.util.List;/*客户端具体步骤:1.创建Zookeeper的客户端2.判断父节点是否存在(因为是先启动服务器-服务器中已经判断过了所以该操作不用做了)如果允许先启动客户端那么必须做该操作。3.获取/server节点下的子节点的内容并监听此节点*/public class ZKClient {private static ZooKeeper zk;public static void main(String[] args){try {//1.创建Zookeeper客户端String hostName = "hadoop102:2181,hadoop103:2181,hadoop104:2181";int sessionTimeout = 4000;zk = new ZooKeeper(hostName, sessionTimeout, new Watcher() {public void process(WatchedEvent event) {}});//2.判断父节点是否存在(允许先起动客户端)Stat exists = zk.exists("/server", false);if (exists == null) {//父节点不存在,则创建--永久节点(父节点必须一直存在,client会监听此节点)zk.create("/server", "".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}//3.获取子节点并监听(一直可以监听-递归)listener();//4.程序需要一直运行Thread.sleep(Long.MAX_VALUE);}catch (Exception e){e.printStackTrace();}finally {if (zk != null){try {zk.close();} catch (InterruptedException e) {e.printStackTrace();}}}}public static void listener() throws Exception {List<String> children = zk.getChildren("/server", new Watcher() {//一旦/server节点的子节点发生变化那么就会触发该方法public void process(WatchedEvent event) {//1.一旦子节点发生变化那么就重新获取该节点中的内容(那么就知道现在在线的有哪台服务器)//2.重新注册监听try {listener();} catch (Exception e) {e.printStackTrace();}}});for (String child : children) {System.out.println(child);}System.out.println("=====================================================");}}
