zookeeper集群配置
准备3台服务器:
n1:192.168.1.101
n2:192.168.1.102
n3:192.168.1.103
安装zookeeper
# Download ZooKeeper 3.4.10 into /opt, unpack it, and rename the directory to /opt/zookeeper.
cd /opt
wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper-3.4.10.tar.gz
tar -zxvf zookeeper-3.4.10.tar.gz
mv zookeeper-3.4.10 zookeeper
配置zookeeper
# Create zoo.cfg from the shipped sample and open it for editing.
cd zookeeper/conf
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg

# Entries to set in zoo.cfg. Keep comments on their own lines: zoo.cfg is read
# as a Java properties file, so "# ..." placed after a value becomes part of
# that value and breaks parsing.

# ZooKeeper data directory
dataDir=/var/zookeeper
# Cluster members, one entry per server id
server.1=192.168.1.101:2888:3888
server.2=192.168.1.102:2888:3888
server.3=192.168.1.103:2888:3888
创建zookeeper数据目录和myid文件:
# Create the data directory configured as dataDir, then create the myid file inside it.
cd /var
mkdir zookeeper
cd zookeeper
# myid must contain only this server's number from its server.N entry in zoo.cfg.
vi myid
myid里键入各自在配置文件中的服务器编号即可。
启动zookeeper
# zkServer.sh is shipped in the bin directory (not conf), so start it from there.
cd /opt/zookeeper/bin
./zkServer.sh start
验证状态
telnet 192.168.1.101 2181
使用stat命令获取zookeeper状态
stat
输出:This ZooKeeper instance is not currently serving requests
再启动第二台,超过1/2的zookeeper集群节点正常工作后,zookeeper就可以提供服务了。
重新telnet后用stat查看n1:
Latency min/avg/max: 0/0/0
Received: 2
Sent: 1
Connections: 1
Outstanding: 0
Zxid: 0x0
Mode: follower
Node count: 4
查看n2:
Latency min/avg/max: 0/0/0
Received: 2
Sent: 1
Connections: 1
Outstanding: 0
Zxid: 0x100000000
Mode: leader
Node count: 4
可以看到n1是follower角色,n2是leader角色。
zkCli.sh的使用
# Start the command-line client against node n1 with a 5000 ms timeout.
cd /opt/zookeeper/bin
./zkCli.sh -timeout 5000 -server 192.168.1.101:2181
输入h显示客户端可用命令:
h
ZooKeeper -server host:port cmd args
    stat path [watch]
    set path data [version]
    ls path [watch]
    delquota [-n|-b] path
    ls2 path [watch]
    setAcl path acl
    setquota -n|-b val path
    history
    redo cmdno
    printwatches on|off
    delete path [version]
    sync path
    listquota path
    rmr path
    get path [watch]
    create [-s] [-e] path data acl
    addauth scheme auth
    quit
    getAcl path
    close
    connect host:port
开源客户端ZkClient
引入maven依赖
<!-- ZooKeeper client library -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>
<!-- ZkClient wrapper -->
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.5</version>
</dependency>
会话创建
/**
 * Opens a ZkClient session to the ZooKeeper server and reports that the
 * connection was established.
 */
public class CreateSession {
    public static void main(String[] args) {
        // Arguments: server address, session timeout (ms), connection timeout (ms), serializer.
        ZkClient zc = new ZkClient("192.168.1.105:2181", 10000, 10000, new SerializableSerializer());
        try {
            System.out.println("conneted ok!");
        } finally {
            // Release the session explicitly instead of leaving it to expire on the server.
            zc.close();
        }
    }
}
节点创建
/**
 * Creates the persistent node {@code /jike5} holding a serialized {@code User}
 * and prints the path returned by the server.
 */
public class CreateNode {
    public static void main(String[] args) {
        ZkClient zc = new ZkClient("192.168.1.105:2181", 10000, 10000, new SerializableSerializer());
        try {
            System.out.println("conneted ok!");

            User u = new User();
            u.setId(1);
            u.setName("test");

            String path = zc.create("/jike5", u, CreateMode.PERSISTENT);
            System.out.println("created path:" + path);
        } finally {
            // Release the session explicitly instead of leaving it to expire on the server.
            zc.close();
        }
    }
}
获取节点
/**
 * Reads the {@code User} stored at {@code /jike5} together with the node's
 * {@code Stat} and prints both.
 */
public class GetData {
    public static void main(String[] args) {
        ZkClient zc = new ZkClient("192.168.1.105:2181", 10000, 10000, new SerializableSerializer());
        try {
            System.out.println("conneted ok!");

            // readData fills the supplied Stat with the node's metadata.
            Stat stat = new Stat();
            User u = zc.readData("/jike5", stat);

            // println(Object) prints "null" for a node without data instead of throwing.
            System.out.println(u);
            System.out.println(stat);
        } finally {
            // Release the session explicitly instead of leaving it to expire on the server.
            zc.close();
        }
    }
}
获取子节点
/**
 * Lists the children of {@code /jike5} and prints them.
 */
public class GetChild {
    public static void main(String[] args) {
        ZkClient zc = new ZkClient("192.168.1.105:2181", 10000, 10000, new SerializableSerializer());
        try {
            System.out.println("conneted ok!");

            List<String> cList = zc.getChildren("/jike5");
            System.out.println(cList);
        } finally {
            // Release the session explicitly instead of leaving it to expire on the server.
            zc.close();
        }
    }
}
检测节点
/**
 * Checks whether the node {@code /jike5} exists and prints the result.
 */
public class NodeExists {
    public static void main(String[] args) {
        ZkClient zc = new ZkClient("192.168.1.105:2181", 10000, 10000, new SerializableSerializer());
        try {
            System.out.println("conneted ok!");

            boolean e = zc.exists("/jike5");
            System.out.println(e);
        } finally {
            // Release the session explicitly instead of leaving it to expire on the server.
            zc.close();
        }
    }
}
节点删除
/**
 * Demonstrates the two ZkClient delete calls on {@code /jike5}: a plain delete
 * followed by a recursive delete.
 */
public class NodeDel {
    public static void main(String[] args) {
        ZkClient zc = new ZkClient("192.168.1.105:2181", 10000, 10000, new SerializableSerializer());
        try {
            System.out.println("conneted ok!");

            // Both calls return a boolean; the results were never used, so they are not stored.
            zc.delete("/jike5");
            zc.deleteRecursive("/jike5");
        } finally {
            // Release the session explicitly instead of leaving it to expire on the server.
            zc.close();
        }
    }
}
数据修改
/**
 * Overwrites the data of {@code /jike5} with a new {@code User}, passing 1 as
 * the expected version of the node.
 */
public class WriteData {
    public static void main(String[] args) {
        ZkClient zc = new ZkClient("192.168.1.105:2181", 10000, 10000, new SerializableSerializer());
        try {
            System.out.println("conneted ok!");

            User u = new User();
            u.setId(2);
            u.setName("test2");

            // Third argument is the expected version passed to writeData.
            zc.writeData("/jike5", u, 1);
        } finally {
            // Release the session explicitly instead of leaving it to expire on the server.
            zc.close();
        }
    }
}
订阅子节点列表变化
public class SubscribeChildChanges {private static class ZkChildListener implements IZkChildListener{public void handleChildChange(String parentPath,List<String> currentChilds) throws Exception {System.out.println(parentPath);System.out.println(currentChilds.toString());}}public static void main(String[] args) throws InterruptedException {ZkClient zc = new ZkClient("192.168.1.105:2181",10000,10000,new SerializableSerializer());System.out.println("conneted ok!");// 除子节点变化外,节点本身创建和删除也会收到通知zc.subscribeChildChanges("/jike20", new ZkChildListener());Thread.sleep(Integer.MAX_VALUE);}}
订阅数据内容变化
public class SubscribeDataChanges {private static class ZkDataListener implements IZkDataListener{public void handleDataChange(String dataPath, Object data)throws Exception {System.out.println(dataPath+":"+data.toString());}public void handleDataDeleted(String dataPath) throws Exception {System.out.println(dataPath);}}public static void main(String[] args) throws InterruptedException {ZkClient zc = new ZkClient("192.168.1.105:2181",10000,10000,new BytesPushThroughSerializer());System.out.println("conneted ok!");zc.subscribeDataChanges("/jike20", new ZkDataListener());Thread.sleep(Integer.MAX_VALUE);}}
开源客户端Curator
引入maven依赖
<!-- Curator core client -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.8.0</version>
</dependency>
<!-- Curator recipes (NodeCache, PathChildrenCache, ...) -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.8.0</version>
</dependency>
创建会话
public class CreateSession {public static void main(String[] args) throws InterruptedException {//RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);//RetryPolicy retryPolicy = new RetryNTimes(5, 1000);RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);// CuratorFramework client = CuratorFrameworkFactory// .newClient("192.168.1.105:2181",5000,5000, retryPolicy);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.1.105:2181").sessionTimeoutMs(5000).connectionTimeoutMs(5000).retryPolicy(retryPolicy).build();client.start();Thread.sleep(Integer.MAX_VALUE);}}
创建节点
public class CreateNode {public static void main(String[] args) throws Exception {//RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);//RetryPolicy retryPolicy = new RetryNTimes(5, 1000);RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);// CuratorFramework client = CuratorFrameworkFactory// .newClient("192.168.1.105:2181",5000,5000, retryPolicy);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.1.105:2181").sessionTimeoutMs(5000).connectionTimeoutMs(5000).retryPolicy(retryPolicy).build();client.start();String path = client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/jike/1","123".getBytes());System.out.println(path);Thread.sleep(Integer.MAX_VALUE);}}
节点删除
public class DelNode {public static void main(String[] args) throws Exception {//RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);//RetryPolicy retryPolicy = new RetryNTimes(5, 1000);RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);// CuratorFramework client = CuratorFrameworkFactory// .newClient("192.168.1.105:2181",5000,5000, retryPolicy);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.1.105:2181").sessionTimeoutMs(5000).connectionTimeoutMs(5000).retryPolicy(retryPolicy).build();client.start();client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(-1).forPath("/jike20");Thread.sleep(Integer.MAX_VALUE);}}
获取子节点
public class GetChildren {public static void main(String[] args) throws Exception {//RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);//RetryPolicy retryPolicy = new RetryNTimes(5, 1000);RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);// CuratorFramework client = CuratorFrameworkFactory// .newClient("192.168.1.105:2181",5000,5000, retryPolicy);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.1.105:2181").sessionTimeoutMs(5000).connectionTimeoutMs(5000).retryPolicy(retryPolicy).build();client.start();List<String> cList = client.getChildren().forPath("/jike20");System.out.println(cList.toString());}}
获取节点内容
public class GetData {public static void main(String[] args) throws Exception {//RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);//RetryPolicy retryPolicy = new RetryNTimes(5, 1000);RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);// CuratorFramework client = CuratorFrameworkFactory// .newClient("192.168.1.105:2181",5000,5000, retryPolicy);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.1.105:2181").sessionTimeoutMs(5000).connectionTimeoutMs(5000).retryPolicy(retryPolicy).build();client.start();Stat stat = new Stat();byte[] ret = client.getData().storingStatIn(stat).forPath("/jike");System.out.println(new String(ret));System.out.println(stat);}}
节点修改
public class UpdateData {public static void main(String[] args) throws Exception {//RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);//RetryPolicy retryPolicy = new RetryNTimes(5, 1000);RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);// CuratorFramework client = CuratorFrameworkFactory// .newClient("192.168.1.105:2181",5000,5000, retryPolicy);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.1.105:2181").sessionTimeoutMs(5000).connectionTimeoutMs(5000).retryPolicy(retryPolicy).build();client.start();Stat stat = new Stat();client.getData().storingStatIn(stat).forPath("/jike");client.setData().withVersion(stat.getVersion()).forPath("/jike", "123".getBytes());}}
检测节点
public class checkexists {public static void main(String[] args) throws Exception {ExecutorService es = Executors.newFixedThreadPool(5);//RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);//RetryPolicy retryPolicy = new RetryNTimes(5, 1000);RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);// CuratorFramework client = CuratorFrameworkFactory// .newClient("192.168.1.105:2181",5000,5000, retryPolicy);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.1.105:2181").sessionTimeoutMs(5000).connectionTimeoutMs(5000).retryPolicy(retryPolicy).build();client.start();// Stat s = client.checkExists().forPath("/jike");client.checkExists().inBackground(new BackgroundCallback() {public void processResult(CuratorFramework arg0, CuratorEvent arg1)throws Exception {Stat stat = arg1.getStat();System.out.println(stat);System.out.println(arg1.getContext());}},"123",es).forPath("/jike");Thread.sleep(Integer.MAX_VALUE);}}
节点监听
public class NodeListener {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.1.105:2181").sessionTimeoutMs(5000).connectionTimeoutMs(5000).retryPolicy(retryPolicy).build();client.start();final NodeCache cache = new NodeCache(client,"/jike");cache.start();cache.getListenable().addListener(new NodeCacheListener() {public void nodeChanged() throws Exception {byte[] ret = cache.getCurrentData().getData();System.out.println("new data:"+new String(ret));}});Thread.sleep(Integer.MAX_VALUE);}}
监听子节点
public class NodeChildrenListener {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.1.105:2181").sessionTimeoutMs(5000).connectionTimeoutMs(5000).retryPolicy(retryPolicy).build();client.start();final PathChildrenCache cache = new PathChildrenCache(client,"/jike",true);cache.start();cache.getListenable().addListener(new PathChildrenCacheListener() {public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)throws Exception {switch (event.getType()) {case CHILD_ADDED:System.out.println("CHILD_ADDED:"+event.getData());break;case CHILD_UPDATED:System.out.println("CHILD_UPDATED:"+event.getData());break;case CHILD_REMOVED:System.out.println("CHILD_REMOVED:"+event.getData());break;default:break;}}});Thread.sleep(Integer.MAX_VALUE);}}
指定权限
public class CreateNodeAuth {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.1.105:2181").sessionTimeoutMs(5000).connectionTimeoutMs(5000).retryPolicy(retryPolicy).build();client.start();ACL aclIp = new ACL(Perms.READ,new Id("ip","192.168.1.105"));ACL aclDigest = new ACL(Perms.READ|Perms.WRITE,new Id("digest",DigestAuthenticationProvider.generateDigest("jike:123456")));ArrayList<ACL> acls = new ArrayList<ACL>();acls.add(aclDigest);acls.add(aclIp);String path = client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(acls).forPath("/jike/3","123".getBytes());System.out.println(path);Thread.sleep(Integer.MAX_VALUE);}}
