环境搭建

1)创建一个Maven工程
2)添加pom文件

  1. <dependencies>
  2. <dependency>
  3. <groupId>junit</groupId>
  4. <artifactId>junit</artifactId>
  5. <version>RELEASE</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.logging.log4j</groupId>
  9. <artifactId>log4j-core</artifactId>
  10. <version>2.8.2</version>
  11. </dependency>
  12. <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
  13. <dependency>
  14. <groupId>org.apache.zookeeper</groupId>
  15. <artifactId>zookeeper</artifactId>
  16. <version>3.5.7</version>
  17. </dependency>
  18. </dependencies>

3)拷贝log4j.properties文件到项目根目录
需要在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。

  1. log4j.rootLogger=INFO, stdout
  2. log4j.appender.stdout=org.apache.log4j.ConsoleAppender
  3. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
  4. log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
  5. log4j.appender.logfile=org.apache.log4j.FileAppender
  6. log4j.appender.logfile.File=target/spring.log
  7. log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
  8. log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

在API中使用指令

  1. import org.apache.zookeeper.*;
  2. import org.apache.zookeeper.data.Stat;
  3. import org.junit.After;
  4. import org.junit.Before;
  5. import org.junit.Test;
  6. import java.io.IOException;
  7. import java.util.List;
  8. public class ZKDemo {
  9. private ZooKeeper zk;
  10. /*
  11. 通过代码操作Zookeeper
  12. 1.创建Zookeeper的客户端
  13. 2.具体API操作
  14. 3.关闭资源
  15. */
  16. @Before
  17. public void before() throws IOException {
  18. //1.创建Zookeeper的客户端
  19. //zookeeper集群中节点的地址
  20. String hostName = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
  21. //session超时时间
  22. int sessionTimeout = 4000;
  23. //Watcher对象是用来接收服务器端的响应事件(总响应事件--一般不用)
  24. zk = new ZooKeeper(hostName, sessionTimeout, new Watcher() {
  25. public void process(WatchedEvent event) {
  26. }
  27. });
  28. }
  29. @After
  30. public void after(){
  31. // 3.关闭资源
  32. if (zk != null){
  33. try {
  34. zk.close();
  35. } catch (InterruptedException e) {
  36. e.printStackTrace();
  37. }
  38. }
  39. }
  40. //创建子节点
  41. @Test
  42. public void test() throws KeeperException, InterruptedException {
  43. /*
  44. create(final String path, byte data[], List<ACL> acl,
  45. CreateMode createMode)
  46. path : 目标节点的路径
  47. data : 节点存放的数据
  48. acl : 节点的权限
  49. createMode : 节点类型(持久有序列号,持久无序列号,临时有序列号,临时无序列号)
  50. */
  51. zk.create("/sanguo", "caocaobushiren".getBytes(),
  52. ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  53. }
  54. //获取子节点并监听节点变化
  55. @Test
  56. public void test2() throws KeeperException, InterruptedException {
  57. try {
  58. listener();
  59. } catch (Exception e) {
  60. e.printStackTrace();
  61. }
  62. //因为主线程一旦死掉,那么用来监听响应事件的线程也会死掉。
  63. Thread.sleep(Long.MAX_VALUE);
  64. }
  65. public void listener() throws Exception {
  66. /*
  67. getChildren(final String path, Watcher watcher)
  68. path : 目标节点的路径
  69. watcher : 监听器对象--接收响应事件
  70. */
  71. List<String> children = zk.getChildren("/xiyouji", new Watcher() {
  72. //process方法是真正用来接收响应事件的方法
  73. public void process(WatchedEvent event) {
  74. //1.处理响应事件
  75. System.out.println("节点发生了改变");
  76. //2.再次注册监听
  77. try {
  78. listener();
  79. } catch (Exception e) {
  80. e.printStackTrace();
  81. }
  82. }
  83. });
  84. for (String child : children) {
  85. System.out.println(child);
  86. }
  87. }
  88. //判断节点是否存在
  89. @Test
  90. public void test3() throws KeeperException, InterruptedException {
  91. /*
  92. exists(String path, boolean watch)
  93. path:目标节点的路径
  94. watch : 是否使用总监听器对象
  95. 如果节点存在则返回Stat类型的对象,如果节点不存在则返回null
  96. */
  97. Stat exists = zk.exists("/sanguo2", false);
  98. System.out.println((exists==null)?"false":"true");
  99. }
  100. }

在API实现服务器动态上下线。

说明:
创建的ZKClient是ZKServer的客户端
ZKClient和ZKServer都是ZooKeeper的客户端

代码:
1)创建服务器,复制3台服务器副本,设置不同的参数

  1. import org.apache.zookeeper.*;
  2. import org.apache.zookeeper.data.Stat;
  3. import java.io.IOException;
  4. /*
  5. 具体步骤:
  6. 1.创建一个临时节点作为Zookeeper客户端
  7. 2.先判断父节点是否存在,如果不存在则创建
  8. 3.创建临时节点
  9. */
  10. public class ZKServer {
  11. public static void main(String[] args){
  12. ZooKeeper zk = null;
  13. try {
  14. //1.创建Zookeeper客户端
  15. String hostName = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
  16. int sessionTimeout = 4000;
  17. zk = new ZooKeeper(hostName, sessionTimeout, new Watcher() {
  18. public void process(WatchedEvent event) {
  19. }
  20. });
  21. //2.判断父节点是否存在
  22. Stat exists = zk.exists("/server", false);
  23. if (exists == null) {
  24. //父节点不存在,则创建--永久节点(父节点必须一直存在,client会监听此节点)
  25. zk.create("/server", "".getBytes(),
  26. ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  27. }
  28. //3.创建临时节点---不同的服务器临时节点是不一样的所以不能写死。
  29. //因为服务器断开节点就得消失所以节点必须是临时节点
  30. zk.create("/server/" + args[0], args[1].getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
  31. CreateMode.EPHEMERAL);
  32. Thread.sleep(Long.MAX_VALUE);//程序不能结束。
  33. }catch (Exception e){
  34. e.printStackTrace();
  35. }finally {
  36. //关闭资源
  37. try {
  38. zk.close();
  39. } catch (InterruptedException e) {
  40. e.printStackTrace();
  41. }
  42. }
  43. }
  44. }

2)创建客户端

  1. import org.apache.zookeeper.*;
  2. import org.apache.zookeeper.data.Stat;
  3. import java.io.IOException;
  4. import java.util.List;
  5. /*
  6. 客户端具体步骤:
  7. 1.创建Zookeeper的客户端
  8. 2.判断父节点是否存在(因为是先启动服务器-服务器中已经判断过了所以该操作不用做了)
  9. 如果允许先启动客户端那么必须做该操作。
  10. 3.获取/server节点下的子节点的内容并监听此节点
  11. */
  12. public class ZKClient {
  13. private static ZooKeeper zk;
  14. public static void main(String[] args){
  15. try {
  16. //1.创建Zookeeper客户端
  17. String hostName = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
  18. int sessionTimeout = 4000;
  19. zk = new ZooKeeper(hostName, sessionTimeout, new Watcher() {
  20. public void process(WatchedEvent event) {
  21. }
  22. });
  23. //2.判断父节点是否存在(允许先起动客户端)
  24. Stat exists = zk.exists("/server", false);
  25. if (exists == null) {
  26. //父节点不存在,则创建--永久节点(父节点必须一直存在,client会监听此节点)
  27. zk.create("/server", "".getBytes(),
  28. ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  29. }
  30. //3.获取子节点并监听(一直可以监听-递归)
  31. listener();
  32. //4.程序需要一直运行
  33. Thread.sleep(Long.MAX_VALUE);
  34. }catch (Exception e){
  35. e.printStackTrace();
  36. }finally {
  37. if (zk != null){
  38. try {
  39. zk.close();
  40. } catch (InterruptedException e) {
  41. e.printStackTrace();
  42. }
  43. }
  44. }
  45. }
  46. public static void listener() throws Exception {
  47. List<String> children = zk.getChildren("/server", new Watcher() {
  48. //一旦/server节点的子节点发生变化那么就会触发该方法
  49. public void process(WatchedEvent event) {
  50. //1.一旦子节点发生变化那么就重新获取该节点中的内容(那么就知道现在在线的有哪台服务器)
  51. //2.重新注册监听
  52. try {
  53. listener();
  54. } catch (Exception e) {
  55. e.printStackTrace();
  56. }
  57. }
  58. });
  59. for (String child : children) {
  60. System.out.println(child);
  61. }
  62. System.out.println("=====================================================");
  63. }
  64. }