环境搭建
1)创建一个Maven工程
2)添加pom文件
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.7</version>
</dependency>
</dependencies>
3)拷贝log4j.properties文件到项目根目录
需要在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
在API中使用指令
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
public class ZKDemo {
private ZooKeeper zk;
/*
通过代码操作Zookeeper
1.创建Zookeeper的客户端
2.具体API操作
3.关闭资源
*/
@Before
public void before() throws IOException {
//1.创建Zookeeper的客户端
//zookeeper集群中节点的地址
String hostName = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
//session超时时间
int sessionTimeout = 4000;
//Watcher对象是用来接收服务器端的响应事件(总响应事件--一般不用)
zk = new ZooKeeper(hostName, sessionTimeout, new Watcher() {
public void process(WatchedEvent event) {
}
});
}
@After
public void after(){
// 3.关闭资源
if (zk != null){
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//创建子节点
@Test
public void test() throws KeeperException, InterruptedException {
/*
create(final String path, byte data[], List<ACL> acl,
CreateMode createMode)
path : 目标节点的路径
data : 节点存放的数据
acl : 节点的权限
createMode : 节点类型(持久有序列号,持久无序列号,临时有序列号,临时无序列号)
*/
zk.create("/sanguo", "caocaobushiren".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
//获取子节点并监听节点变化
@Test
public void test2() throws KeeperException, InterruptedException {
try {
listener();
} catch (Exception e) {
e.printStackTrace();
}
//因为主线程一旦死掉,那么用来监听响应事件的线程也会死掉。
Thread.sleep(Long.MAX_VALUE);
}
public void listener() throws Exception {
/*
getChildren(final String path, Watcher watcher)
path : 目标节点的路径
watcher : 监听器对象--接收响应事件
*/
List<String> children = zk.getChildren("/xiyouji", new Watcher() {
//process方法是真正用来接收响应事件的方法
public void process(WatchedEvent event) {
//1.处理响应事件
System.out.println("节点发生了改变");
//2.再次注册监听
try {
listener();
} catch (Exception e) {
e.printStackTrace();
}
}
});
for (String child : children) {
System.out.println(child);
}
}
//判断节点是否存在
@Test
public void test3() throws KeeperException, InterruptedException {
/*
exists(String path, boolean watch)
path:目标节点的路径
watch : 是否使用总监听器对象
如果节点存在则返回Stat类型的对象,如果节点不存在则返回null
*/
Stat exists = zk.exists("/sanguo2", false);
System.out.println((exists==null)?"false":"true");
}
}
在API实现服务器动态上下线。
说明:
创建的ZKClient是ZKServer的客户端
ZKClient和ZKServer都是ZooKeeper的客户端
代码:
1)创建服务器,复制3台服务器副本,设置不同的参数
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
/*
具体步骤:
1.创建一个临时节点作为Zookeeper客户端
2.先判断父节点是否存在,如果不存在则创建
3.创建临时节点
*/
public class ZKServer {
public static void main(String[] args){
ZooKeeper zk = null;
try {
//1.创建Zookeeper客户端
String hostName = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
int sessionTimeout = 4000;
zk = new ZooKeeper(hostName, sessionTimeout, new Watcher() {
public void process(WatchedEvent event) {
}
});
//2.判断父节点是否存在
Stat exists = zk.exists("/server", false);
if (exists == null) {
//父节点不存在,则创建--永久节点(父节点必须一直存在,client会监听此节点)
zk.create("/server", "".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
//3.创建临时节点---不同的服务器临时节点是不一样的所以不能写死。
//因为服务器断开节点就得消失所以节点必须是临时节点
zk.create("/server/" + args[0], args[1].getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
Thread.sleep(Long.MAX_VALUE);//程序不能结束。
}catch (Exception e){
e.printStackTrace();
}finally {
//关闭资源
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
2)创建客户端
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.List;
/*
客户端具体步骤:
1.创建Zookeeper的客户端
2.判断父节点是否存在(因为是先启动服务器-服务器中已经判断过了所以该操作不用做了)
如果允许先启动客户端那么必须做该操作。
3.获取/server节点下的子节点的内容并监听此节点
*/
public class ZKClient {
private static ZooKeeper zk;
public static void main(String[] args){
try {
//1.创建Zookeeper客户端
String hostName = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
int sessionTimeout = 4000;
zk = new ZooKeeper(hostName, sessionTimeout, new Watcher() {
public void process(WatchedEvent event) {
}
});
//2.判断父节点是否存在(允许先起动客户端)
Stat exists = zk.exists("/server", false);
if (exists == null) {
//父节点不存在,则创建--永久节点(父节点必须一直存在,client会监听此节点)
zk.create("/server", "".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
//3.获取子节点并监听(一直可以监听-递归)
listener();
//4.程序需要一直运行
Thread.sleep(Long.MAX_VALUE);
}catch (Exception e){
e.printStackTrace();
}finally {
if (zk != null){
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void listener() throws Exception {
List<String> children = zk.getChildren("/server", new Watcher() {
//一旦/server节点的子节点发生变化那么就会触发该方法
public void process(WatchedEvent event) {
//1.一旦子节点发生变化那么就重新获取该节点中的内容(那么就知道现在在线的有哪台服务器)
//2.重新注册监听
try {
listener();
} catch (Exception e) {
e.printStackTrace();
}
}
});
for (String child : children) {
System.out.println(child);
}
System.out.println("=====================================================");
}
}