本文使用 ZooKeeper 原生 Java API 进行操作。
也可以改用封装好的客户端 Curator 来操作 ZooKeeper。

0、依赖

  1. <!-- zookeeper-->
  2. <dependency>
  3. <groupId>org.apache.zookeeper</groupId>
  4. <artifactId>zookeeper</artifactId>
  5. <version>3.4.12</version>
  6. </dependency>

1、客户端操作测试

ZooKeeperClientTest

  1. package com.lms.zookeeper;
  2. import org.apache.zookeeper.*;
  3. import org.apache.zookeeper.data.Stat;
  4. import org.junit.Before;
  5. import org.junit.Test;
  6. import java.io.IOException;
  7. import java.util.List;
  8. /**
  9. * @Author: 李孟帅
  10. * @CreateTime: 2020-08-20 22:07
  11. * @Description:
  12. */
  13. public class ZooKeeperClientTest {
  14. private static final String CONNECTION_STRING = "192.168.189.100:2181,192.168.189.101:2181,192.168.189.102:2181";
  15. private static final int SESSION_TIMEOUT = 2000;
  16. private ZooKeeper zk = null;
  17. /**
  18. * @Author 李孟帅
  19. * @Description 获取客户端连接
  20. * @Date 2020/8/20
  21. **/
  22. @Before
  23. public void getConnection() throws IOException {
  24. zk = new ZooKeeper(CONNECTION_STRING, SESSION_TIMEOUT, new Watcher() {
  25. @Override
  26. public void process(WatchedEvent event) {
  27. System.out.println(event);
  28. try {
  29. zk.getChildren("/",true);
  30. } catch (KeeperException | InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. }
  34. });
  35. }
  36. /**
  37. * @Author 李孟帅
  38. * @Description 创建节点
  39. * @Date 2020/8/20
  40. **/
  41. @Test
  42. public void createNode() throws KeeperException, InterruptedException {
  43. String s = zk.create("/app1", "this is a test!".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  44. System.out.println(s);
  45. }
  46. /**
  47. * @Author 李孟帅
  48. * @Description 获取节点数据
  49. * @Date 2020/8/20
  50. **/
  51. @Test
  52. public void getData() throws KeeperException, InterruptedException {
  53. byte[] data = zk.getData("/app1", false, null);
  54. System.out.println(new String(data));
  55. }
  56. /**
  57. * @Author 李孟帅
  58. * @Description 获取子节点
  59. * @Date 2020/8/20
  60. **/
  61. @Test
  62. public void getChildren() throws KeeperException, InterruptedException {
  63. List<String> children = zk.getChildren("/app1", true);
  64. for (String child : children) {
  65. System.out.println(child);
  66. }
  67. Thread.sleep(Long.MAX_VALUE);
  68. }
  69. /**
  70. * @Author 李孟帅
  71. * @Description 修改节点数据
  72. * @Date 2020/8/20
  73. **/
  74. @Test
  75. public void setData() throws KeeperException, InterruptedException {
  76. Stat stat = zk.setData("/app1", "new data".getBytes(), -1);
  77. System.out.println(stat);
  78. }
  79. /**
  80. * @Author 李孟帅
  81. * @Description 删除节点
  82. * @Date 2020/8/20
  83. **/
  84. @Test
  85. public void deleteNode () throws KeeperException, InterruptedException {
  86. zk.delete("/app1",-1);
  87. }
  88. /**
  89. * @Author 李孟帅
  90. * @Description 判断节点是否存在
  91. * @Date 2020/8/21
  92. **/
  93. @Test
  94. public void exist() throws KeeperException, InterruptedException {
  95. Stat exists = zk.exists("/app2", true);
  96. System.out.println(exists);
  97. }
  98. }

2、服务器动态上线下线监听 服务端

DistributedServer

  1. package com.lms.zookeeper;
  2. import org.apache.zookeeper.*;
  3. import java.io.IOException;
  4. /**
  5. * @Author: 李孟帅
  6. * @CreateTime: 2020-08-21 08:29
  7. * @Description:
  8. */
  9. public class DistributedServer {
  10. private static final String CONNECTION_STRING = "192.168.189.100:2181,192.168.189.101:2181,192.168.189.102:2181";
  11. private static final int SESSION_TIMEOUT = 2000;
  12. private ZooKeeper zk = null;
  13. /**
  14. * @Author 李孟帅
  15. * @Description 获取客户端连接
  16. * @Date 2020/8/20
  17. **/
  18. public void getConnection() throws IOException {
  19. zk = new ZooKeeper(CONNECTION_STRING, SESSION_TIMEOUT, new Watcher() {
  20. @Override
  21. public void process(WatchedEvent event) {
  22. System.out.println(event);
  23. }
  24. });
  25. }
  26. /**
  27. * @Author 李孟帅
  28. * @Description 注册
  29. * @Date 2020/8/21
  30. **/
  31. public void registryServer(String hostname) throws KeeperException, InterruptedException {
  32. String create = zk.create("/servers/"+hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
  33. System.out.println(hostname + " is online ," + create);
  34. }
  35. /**
  36. * @Author 李孟帅
  37. * @Description 业务功能
  38. * @Date 2020/8/21
  39. **/
  40. public void handleBusiness(String hostname) throws InterruptedException {
  41. System.out.println(hostname+" start working...");
  42. Thread.sleep(Long.MAX_VALUE);
  43. }
  44. public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
  45. //获取zk连接
  46. DistributedServer distributedServer = new DistributedServer();
  47. distributedServer.getConnection();
  48. //利用zk注册服务器信息
  49. distributedServer.registryServer(args[0]);
  50. //启动业务功能
  51. distributedServer.handleBusiness(args[0]);
  52. }
  53. }

3、服务器动态上线下线监听 客户端

DistributedClient

  1. package com.lms.zookeeper;
  2. import org.apache.zookeeper.KeeperException;
  3. import org.apache.zookeeper.WatchedEvent;
  4. import org.apache.zookeeper.Watcher;
  5. import org.apache.zookeeper.ZooKeeper;
  6. import java.io.IOException;
  7. import java.util.ArrayList;
  8. import java.util.List;
  9. /**
  10. * @Author: 李孟帅
  11. * @CreateTime: 2020-08-21 08:48
  12. * @Description:
  13. */
  14. public class DistributedClient {
  15. private static final String CONNECTION_STRING = "192.168.189.100:2181,192.168.189.101:2181,192.168.189.102:2181";
  16. private static final int SESSION_TIMEOUT = 2000;
  17. private ZooKeeper zk = null;
  18. private volatile ArrayList<String> serverList=new ArrayList<String>();
  19. /**
  20. * @Author 李孟帅
  21. * @Description 获取客户端连接
  22. * @Date 2020/8/20
  23. **/
  24. public void getConnection() throws IOException {
  25. zk = new ZooKeeper(CONNECTION_STRING, SESSION_TIMEOUT, new Watcher() {
  26. @Override
  27. public void process(WatchedEvent event) {
  28. try {
  29. getServerList();
  30. } catch (KeeperException | InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. }
  34. });
  35. }
  36. /**
  37. * @Author 李孟帅
  38. * @Description 获取服务器列表
  39. * @Date 2020/8/21
  40. **/
  41. public void getServerList() throws KeeperException, InterruptedException {
  42. //获取子节点信息,并监听父节点
  43. List<String> children = zk.getChildren("/servers", true);
  44. ArrayList<String> servers = new ArrayList<>();
  45. for (String child : children) {
  46. byte[] data = zk.getData("/servers/" + child, false, null);
  47. servers.add(new String(data));
  48. }
  49. serverList=servers;
  50. System.out.println(serverList);
  51. }
  52. //业务功能
  53. public void handleBusiness() throws InterruptedException {
  54. System.out.println("client start working...");
  55. Thread.sleep(Long.MAX_VALUE);
  56. }
  57. public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
  58. //获取zk连接
  59. DistributedClient distributedClient = new DistributedClient();
  60. distributedClient.getConnection();
  61. //获取/servers子节点信息(并监听),从中获取服务器列表
  62. distributedClient.getServerList();
  63. //业务功能
  64. distributedClient.handleBusiness();
  65. }
  66. }