zookeeper集群配置

准备3台服务器:
n1:192.168.1.101
n2:192.168.1.102
n3:192.168.1.103

安装zookeeper

  1. cd /opt
  2. wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper-3.4.10.tar.gz
  3. tar -zxvf zookeeper-3.4.10.tar.gz
  4. mv zookeeper-3.4.10 zookeeper

配置zookeeper

  1. cd zookeeper/conf
  2. cp zoo_sample.cfg zoo.cfg
  3. vi zoo.cfg
  4. dataDir=/var/zookeeper # 修改zookeeper的数据目录
  5. server.1=192.168.1.101:2888:3888 # 指定集群节点
  6. server.2=192.168.1.102:2888:3888 # 指定集群节点
  7. server.3=192.168.1.103:2888:3888 # 指定集群节点

创建zookeeper数据目录和myid文件:

  1. cd /var
  2. mkdir zookeeper
  3. cd zookeeper
  4. vi myid

myid里键入各自在配置文件中的服务器编号即可(即zoo.cfg中server.N的N):例如n1(192.168.1.101)的myid内容为1,n2为2,n3为3。

启动zookeeper

  1. cd /opt/zookeeper/bin
  2. ./zkServer.sh start

验证状态

  1. telnet 192.168.1.101 2181

stat获取zookeeper状态

  1. stat

输出:This ZooKeeper instance is not currently serving requests

再启动第二台。3个节点中有2个正常工作,即超过1/2的zookeeper集群节点可用后,zookeeper就可以提供服务了。

重新telnet后用stat查看n1:

  1. Latency min/avg/max: 0/0/0
  2. Received: 2
  3. Sent: 1
  4. Connections: 1
  5. Outstanding: 0
  6. Zxid: 0x0
  7. Mode: follower
  8. Node count: 4

查看n2:

  1. Latency min/avg/max: 0/0/0
  2. Received: 2
  3. Sent: 1
  4. Connections: 1
  5. Outstanding: 0
  6. Zxid: 0x100000000
  7. Mode: leader
  8. Node count: 4

可以看到n1是follower角色,n2是leader角色。

zkCli.sh的使用

  1. cd /opt/zookeeper/bin
  2. ./zkCli.sh -timeout 5000 -server 192.168.1.101:2181

输入h显示客户端可用命令:

  1. h
  2. ZooKeeper -server host:port cmd args
  3. stat path [watch]
  4. set path data [version]
  5. ls path [watch]
  6. delquota [-n|-b] path
  7. ls2 path [watch]
  8. setAcl path acl
  9. setquota -n|-b val path
  10. history
  11. redo cmdno
  12. printwatches on|off
  13. delete path [version]
  14. sync path
  15. listquota path
  16. rmr path
  17. get path [watch]
  18. create [-s] [-e] path data acl
  19. addauth scheme auth
  20. quit
  21. getAcl path
  22. close
  23. connect host:port

开源客户端ZkClient

引入maven依赖

  1. <dependency>
  2. <groupId>org.apache.zookeeper</groupId>
  3. <artifactId>zookeeper</artifactId>
  4. <version>3.4.6</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>com.101tec</groupId>
  8. <artifactId>zkclient</artifactId>
  9. <version>0.5</version>
  10. </dependency>

会话创建

  1. public class CreateSession {
  2. public static void main(String[] args) {
  3. ZkClient zc = new ZkClient("192.168.1.105:2181",10000,10000,new SerializableSerializer());
  4. System.out.println("conneted ok!");
  5. }
  6. }

节点创建

  1. public class CreateNode {
  2. public static void main(String[] args) {
  3. ZkClient zc = new ZkClient("192.168.1.105:2181",10000,10000,new SerializableSerializer());
  4. System.out.println("conneted ok!");
  5. User u = new User();
  6. u.setId(1);
  7. u.setName("test");
  8. String path = zc.create("/jike5", u, CreateMode.PERSISTENT);
  9. System.out.println("created path:"+path);
  10. }
  11. }

获取节点

  1. public class GetData {
  2. public static void main(String[] args) {
  3. ZkClient zc = new ZkClient("192.168.1.105:2181",10000,10000,new SerializableSerializer());
  4. System.out.println("conneted ok!");
  5. Stat stat = new Stat();
  6. User u = zc.readData("/jike5",stat);
  7. System.out.println(u.toString());
  8. System.out.println(stat);
  9. }
  10. }

获取子节点

  1. public class GetChild {
  2. public static void main(String[] args) {
  3. ZkClient zc = new ZkClient("192.168.1.105:2181",10000,10000,new SerializableSerializer());
  4. System.out.println("conneted ok!");
  5. List<String> cList = zc.getChildren("/jike5");
  6. System.out.println(cList.toString());
  7. }
  8. }

检测节点

  1. public class NodeExists {
  2. public static void main(String[] args) {
  3. ZkClient zc = new ZkClient("192.168.1.105:2181",10000,10000,new SerializableSerializer());
  4. System.out.println("conneted ok!");
  5. boolean e = zc.exists("/jike5");
  6. System.out.println(e);
  7. }
  8. }

节点删除

  1. public class NodeDel {
  2. public static void main(String[] args) {
  3. ZkClient zc = new ZkClient("192.168.1.105:2181",10000,10000,new SerializableSerializer());
  4. System.out.println("conneted ok!");
  5. boolean e1 = zc.delete("/jike5");
  6. boolean e2 = zc.deleteRecursive("/jike5");
  7. }
  8. }

数据修改

  1. public class WriteData {
  2. public static void main(String[] args) {
  3. ZkClient zc = new ZkClient("192.168.1.105:2181",10000,10000,new SerializableSerializer());
  4. System.out.println("conneted ok!");
  5. User u = new User();
  6. u.setId(2);
  7. u.setName("test2");
  8. zc.writeData("/jike5", u, 1);
  9. }
  10. }

订阅子节点列表变化

  1. public class SubscribeChildChanges {
  2. private static class ZkChildListener implements IZkChildListener{
  3. public void handleChildChange(String parentPath,
  4. List<String> currentChilds) throws Exception {
  5. System.out.println(parentPath);
  6. System.out.println(currentChilds.toString());
  7. }
  8. }
  9. public static void main(String[] args) throws InterruptedException {
  10. ZkClient zc = new ZkClient("192.168.1.105:2181",10000,10000,new SerializableSerializer());
  11. System.out.println("conneted ok!");
  12. // 除子节点变化外,节点本身创建和删除也会收到通知
  13. zc.subscribeChildChanges("/jike20", new ZkChildListener());
  14. Thread.sleep(Integer.MAX_VALUE);
  15. }
  16. }

订阅数据内容变化

  1. public class SubscribeDataChanges {
  2. private static class ZkDataListener implements IZkDataListener{
  3. public void handleDataChange(String dataPath, Object data)
  4. throws Exception {
  5. System.out.println(dataPath+":"+data.toString());
  6. }
  7. public void handleDataDeleted(String dataPath) throws Exception {
  8. System.out.println(dataPath);
  9. }
  10. }
  11. public static void main(String[] args) throws InterruptedException {
  12. ZkClient zc = new ZkClient("192.168.1.105:2181",10000,10000,new BytesPushThroughSerializer());
  13. System.out.println("conneted ok!");
  14. zc.subscribeDataChanges("/jike20", new ZkDataListener());
  15. Thread.sleep(Integer.MAX_VALUE);
  16. }
  17. }

开源客户端Curator

引入maven依赖

  1. <dependency>
  2. <groupId>org.apache.curator</groupId>
  3. <artifactId>curator-framework</artifactId>
  4. <version>2.8.0</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.curator</groupId>
  8. <artifactId>curator-recipes</artifactId>
  9. <version>2.8.0</version>
  10. </dependency>

创建会话

  1. public class CreateSession {
  2. public static void main(String[] args) throws InterruptedException {
  3. //RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
  4. //RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
  5. RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
  6. // CuratorFramework client = CuratorFrameworkFactory
  7. // .newClient("192.168.1.105:2181",5000,5000, retryPolicy);
  8. CuratorFramework client = CuratorFrameworkFactory
  9. .builder()
  10. .connectString("192.168.1.105:2181")
  11. .sessionTimeoutMs(5000)
  12. .connectionTimeoutMs(5000)
  13. .retryPolicy(retryPolicy)
  14. .build();
  15. client.start();
  16. Thread.sleep(Integer.MAX_VALUE);
  17. }
  18. }

创建节点

  1. public class CreateNode {
  2. public static void main(String[] args) throws Exception {
  3. //RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
  4. //RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
  5. RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
  6. // CuratorFramework client = CuratorFrameworkFactory
  7. // .newClient("192.168.1.105:2181",5000,5000, retryPolicy);
  8. CuratorFramework client = CuratorFrameworkFactory
  9. .builder()
  10. .connectString("192.168.1.105:2181")
  11. .sessionTimeoutMs(5000)
  12. .connectionTimeoutMs(5000)
  13. .retryPolicy(retryPolicy)
  14. .build();
  15. client.start();
  16. String path = client.create()
  17. .creatingParentsIfNeeded()
  18. .withMode(CreateMode.EPHEMERAL)
  19. .forPath("/jike/1","123".getBytes());
  20. System.out.println(path);
  21. Thread.sleep(Integer.MAX_VALUE);
  22. }
  23. }

节点删除

  1. public class DelNode {
  2. public static void main(String[] args) throws Exception {
  3. //RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
  4. //RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
  5. RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
  6. // CuratorFramework client = CuratorFrameworkFactory
  7. // .newClient("192.168.1.105:2181",5000,5000, retryPolicy);
  8. CuratorFramework client = CuratorFrameworkFactory
  9. .builder()
  10. .connectString("192.168.1.105:2181")
  11. .sessionTimeoutMs(5000)
  12. .connectionTimeoutMs(5000)
  13. .retryPolicy(retryPolicy)
  14. .build();
  15. client.start();
  16. client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(-1).forPath("/jike20");
  17. Thread.sleep(Integer.MAX_VALUE);
  18. }
  19. }

获取子节点

  1. public class GetChildren {
  2. public static void main(String[] args) throws Exception {
  3. //RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
  4. //RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
  5. RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
  6. // CuratorFramework client = CuratorFrameworkFactory
  7. // .newClient("192.168.1.105:2181",5000,5000, retryPolicy);
  8. CuratorFramework client = CuratorFrameworkFactory
  9. .builder()
  10. .connectString("192.168.1.105:2181")
  11. .sessionTimeoutMs(5000)
  12. .connectionTimeoutMs(5000)
  13. .retryPolicy(retryPolicy)
  14. .build();
  15. client.start();
  16. List<String> cList = client.getChildren().forPath("/jike20");
  17. System.out.println(cList.toString());
  18. }
  19. }

获取节点内容

  1. public class GetData {
  2. public static void main(String[] args) throws Exception {
  3. //RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
  4. //RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
  5. RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
  6. // CuratorFramework client = CuratorFrameworkFactory
  7. // .newClient("192.168.1.105:2181",5000,5000, retryPolicy);
  8. CuratorFramework client = CuratorFrameworkFactory
  9. .builder()
  10. .connectString("192.168.1.105:2181")
  11. .sessionTimeoutMs(5000)
  12. .connectionTimeoutMs(5000)
  13. .retryPolicy(retryPolicy)
  14. .build();
  15. client.start();
  16. Stat stat = new Stat();
  17. byte[] ret = client.getData().storingStatIn(stat).forPath("/jike");
  18. System.out.println(new String(ret));
  19. System.out.println(stat);
  20. }
  21. }

节点修改

  1. public class UpdateData {
  2. public static void main(String[] args) throws Exception {
  3. //RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
  4. //RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
  5. RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
  6. // CuratorFramework client = CuratorFrameworkFactory
  7. // .newClient("192.168.1.105:2181",5000,5000, retryPolicy);
  8. CuratorFramework client = CuratorFrameworkFactory
  9. .builder()
  10. .connectString("192.168.1.105:2181")
  11. .sessionTimeoutMs(5000)
  12. .connectionTimeoutMs(5000)
  13. .retryPolicy(retryPolicy)
  14. .build();
  15. client.start();
  16. Stat stat = new Stat();
  17. client.getData().storingStatIn(stat).forPath("/jike");
  18. client.setData().withVersion(stat.getVersion()).forPath("/jike", "123".getBytes());
  19. }
  20. }

检测节点

  1. public class checkexists {
  2. public static void main(String[] args) throws Exception {
  3. ExecutorService es = Executors.newFixedThreadPool(5);
  4. //RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
  5. //RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
  6. RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
  7. // CuratorFramework client = CuratorFrameworkFactory
  8. // .newClient("192.168.1.105:2181",5000,5000, retryPolicy);
  9. CuratorFramework client = CuratorFrameworkFactory
  10. .builder()
  11. .connectString("192.168.1.105:2181")
  12. .sessionTimeoutMs(5000)
  13. .connectionTimeoutMs(5000)
  14. .retryPolicy(retryPolicy)
  15. .build();
  16. client.start();
  17. // Stat s = client.checkExists().forPath("/jike");
  18. client.checkExists().inBackground(new BackgroundCallback() {
  19. public void processResult(CuratorFramework arg0, CuratorEvent arg1)
  20. throws Exception {
  21. Stat stat = arg1.getStat();
  22. System.out.println(stat);
  23. System.out.println(arg1.getContext());
  24. }
  25. },"123",es).forPath("/jike");
  26. Thread.sleep(Integer.MAX_VALUE);
  27. }
  28. }

节点监听

  1. public class NodeListener {
  2. public static void main(String[] args) throws Exception {
  3. RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
  4. CuratorFramework client = CuratorFrameworkFactory
  5. .builder()
  6. .connectString("192.168.1.105:2181")
  7. .sessionTimeoutMs(5000)
  8. .connectionTimeoutMs(5000)
  9. .retryPolicy(retryPolicy)
  10. .build();
  11. client.start();
  12. final NodeCache cache = new NodeCache(client,"/jike");
  13. cache.start();
  14. cache.getListenable().addListener(new NodeCacheListener() {
  15. public void nodeChanged() throws Exception {
  16. byte[] ret = cache.getCurrentData().getData();
  17. System.out.println("new data:"+new String(ret));
  18. }
  19. });
  20. Thread.sleep(Integer.MAX_VALUE);
  21. }
  22. }

监听子节点

  1. public class NodeChildrenListener {
  2. public static void main(String[] args) throws Exception {
  3. RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
  4. CuratorFramework client = CuratorFrameworkFactory
  5. .builder()
  6. .connectString("192.168.1.105:2181")
  7. .sessionTimeoutMs(5000)
  8. .connectionTimeoutMs(5000)
  9. .retryPolicy(retryPolicy)
  10. .build();
  11. client.start();
  12. final PathChildrenCache cache = new PathChildrenCache(client,"/jike",true);
  13. cache.start();
  14. cache.getListenable().addListener(new PathChildrenCacheListener() {
  15. public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
  16. throws Exception {
  17. switch (event.getType()) {
  18. case CHILD_ADDED:
  19. System.out.println("CHILD_ADDED:"+event.getData());
  20. break;
  21. case CHILD_UPDATED:
  22. System.out.println("CHILD_UPDATED:"+event.getData());
  23. break;
  24. case CHILD_REMOVED:
  25. System.out.println("CHILD_REMOVED:"+event.getData());
  26. break;
  27. default:
  28. break;
  29. }
  30. }
  31. });
  32. Thread.sleep(Integer.MAX_VALUE);
  33. }
  34. }

指定权限

  1. public class CreateNodeAuth {
  2. public static void main(String[] args) throws Exception {
  3. RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
  4. CuratorFramework client = CuratorFrameworkFactory
  5. .builder()
  6. .connectString("192.168.1.105:2181")
  7. .sessionTimeoutMs(5000)
  8. .connectionTimeoutMs(5000)
  9. .retryPolicy(retryPolicy)
  10. .build();
  11. client.start();
  12. ACL aclIp = new ACL(Perms.READ,new Id("ip","192.168.1.105"));
  13. ACL aclDigest = new ACL(Perms.READ|Perms.WRITE,new Id("digest",DigestAuthenticationProvider.generateDigest("jike:123456")));
  14. ArrayList<ACL> acls = new ArrayList<ACL>();
  15. acls.add(aclDigest);
  16. acls.add(aclIp);
  17. String path = client.create()
  18. .creatingParentsIfNeeded()
  19. .withMode(CreateMode.PERSISTENT)
  20. .withACL(acls)
  21. .forPath("/jike/3","123".getBytes());
  22. System.out.println(path);
  23. Thread.sleep(Integer.MAX_VALUE);
  24. }
  25. }