Java Client

Maven dependency

```xml
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.5.8</version>
</dependency>
```

Usage example

```java
@Slf4j
public class ConfigCenter {

    private final static String CONNECT_STR = "192.168.109.200:2181";
    private final static Integer SESSION_TIMEOUT = 30 * 1000;
    private static ZooKeeper zooKeeper = null;
    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        zooKeeper = new ZooKeeper(CONNECT_STR, SESSION_TIMEOUT, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.None
                        && event.getState() == Event.KeeperState.SyncConnected) {
                    log.info("Connection established");
                    countDownLatch.countDown();
                }
            }
        });
        countDownLatch.await();
        MyConfig myConfig = new MyConfig();
        myConfig.setKey("anykey");
        myConfig.setName("anyName");
        ObjectMapper objectMapper = new ObjectMapper();
        byte[] bytes = objectMapper.writeValueAsBytes(myConfig);
        String s = zooKeeper.create("/myconfig", bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        Watcher watcher = new Watcher() {
            @SneakyThrows
            @Override
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeDataChanged
                        && event.getPath() != null && event.getPath().equals("/myconfig")) {
                    log.info("PATH: {} data changed", event.getPath());
                    // Re-register this watcher so later changes keep firing events
                    byte[] data = zooKeeper.getData("/myconfig", this, null);
                    MyConfig newConfig = objectMapper.readValue(new String(data), MyConfig.class);
                    log.info("Data changed: {}", newConfig);
                }
            }
        };
        byte[] data = zooKeeper.getData("/myconfig", watcher, null);
        MyConfig originalMyConfig = objectMapper.readValue(new String(data), MyConfig.class);
        log.info("Original data: {}", originalMyConfig);
        TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
    }
}
```

ZooKeeper API

The main constructor overloads:

```java
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, ZKClientConfig conf)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider hostProvider)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider hostProvider, ZKClientConfig conf)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, ZKClientConfig conf)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly, HostProvider hostProvider)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly, HostProvider hostProvider, ZKClientConfig conf)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
```

Parameter explanations

| Parameter | Meaning |
| --- | --- |
| connectString | The ZooKeeper server list: a comma-separated string of `host:port` pairs, each representing one ZooKeeper machine, e.g. `host1:port1,host2:port2,host3:port3`. You can also set a root directory for the client by appending it to the connect string, e.g. `host1:port1,host2:port2,host3:port3/zk-base`; after the client connects, all of its ZooKeeper operations are resolved relative to this root. For example, a client operation on `/sub-node` actually creates `/zk-base/sub-node`. This root is called a chroot and gives the client an isolated namespace. |
| sessionTimeout | The session timeout, in milliseconds. ZooKeeper has the notion of a session: within a session, the client and server keep the session alive through a heartbeat mechanism. If no valid heartbeat occurs within sessionTimeout, the session expires. |
| watcher | The Watcher event handler. May be null. |
| canBeReadOnly | A boolean indicating whether this session supports "read-only" mode. By default, if a machine in a ZooKeeper cluster loses its network connection to half or more of the cluster, it stops serving client requests (both reads and writes). In some scenarios, however, we still want such a server to keep serving reads (writes are of course impossible); this is ZooKeeper's "read-only" mode. |
| sessionId and sessionPasswd | The session ID and session password. Together they uniquely identify a session, and the client can use them to reuse a session, i.e. to recover it. Concretely, after connecting to a ZooKeeper server for the first time, obtain the current session's ID and password via `long getSessionId()` and `byte[] getSessionPasswd()` on the ZooKeeper instance, then pass these two values to the constructor the next time you create a ZooKeeper instance. |
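The chroot behaviour described above can be made concrete with a small self-contained sketch. `ChrootDemo` and `toServerPath` are illustrative names, not part of the ZooKeeper API; the real client does this resolution internally.

```java
// Illustrative sketch: how a chroot suffix on the connect string maps
// client-visible paths to the paths actually stored on the server.
public class ChrootDemo {
    static String toServerPath(String connectString, String clientPath) {
        // The chroot, if present, starts at the first '/' in the connect string
        int slash = connectString.indexOf('/');
        String chroot = slash >= 0 ? connectString.substring(slash) : "";
        return chroot + clientPath;
    }

    public static void main(String[] args) {
        // With chroot /zk-base, an operation on /sub-node targets /zk-base/sub-node
        System.out.println(toServerPath("host1:2181,host2:2181/zk-base", "/sub-node"));
        // Without a chroot, paths are used as-is
        System.out.println(toServerPath("host1:2181", "/sub-node"));
    }
}
```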

Creating a node synchronously

```java
@Test
public void createTest() throws KeeperException, InterruptedException {
    String path = zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    log.info("created path: {}", path);
}
```

Creating a node asynchronously

```java
@Test
public void createAsyncTest() throws InterruptedException {
    zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT,
            (rc, path, ctx, name) -> log.info("rc {}, path {}, ctx {}, name {}", rc, path, ctx, name), "context");
    TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}
```

Updating a node

```java
@Test
public void setTest() throws KeeperException, InterruptedException {
    Stat stat = new Stat();
    byte[] data = zooKeeper.getData(ZK_NODE, false, stat);
    log.info("before update: {}", new String(data));
    // Pass the current version so the update fails if someone else changed the node
    zooKeeper.setData(ZK_NODE, "changed!".getBytes(), stat.getVersion());
    byte[] dataAfter = zooKeeper.getData(ZK_NODE, false, stat);
    log.info("after update: {}", new String(dataAfter));
}
```

Apache Curator client

The recommended client. It wraps higher-level recipes such as:

1. Leader election
2. Distributed counters
3. Distributed locks
Maven dependency

```xml
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.0.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```

Core class

CuratorFramework: a ZooKeeper client object.

Creating the client

```java
@Before
public void init() {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(5000, 30);
    curatorFramework = CuratorFrameworkFactory.builder().connectString("192.168.109.200:2181")
            // retry policy
            .retryPolicy(retryPolicy)
            // session timeout, enforced on the server side
            .sessionTimeoutMs(sessionTimeoutMs)
            // connection timeout, enforced on the client side
            .connectionTimeoutMs(connectionTimeoutMs)
            .canBeReadOnly(true)
            .build();
    curatorFramework.getConnectionStateListenable().addListener((client, newState) -> {
        if (newState == ConnectionState.CONNECTED) {
            log.info("connected!");
        }
    });
    log.info("connecting......");
    curatorFramework.start();
}
```

RetryPolicy: retry strategies

| Policy | Description |
| --- | --- |
| ExponentialBackoffRetry | Retries a set number of times, with an increasing sleep between retries |
| RetryNTimes | Retries up to a maximum number of times |
| RetryOneTime | Retries only once |
| RetryUntilElapsed | Retries until a given amount of time has elapsed |
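To make the ExponentialBackoffRetry row concrete, here is a self-contained sketch of the exponential-backoff idea. The exact formula inside Curator may differ; the upper bound of `2^(retryCount+1) - 1` base intervals used here is an assumption for illustration.

```java
import java.util.Random;

public class BackoffDemo {
    // Sleep grows roughly exponentially with the retry count:
    // a random multiple of baseSleepTimeMs in [1, 2^(retryCount+1) - 1]
    static long sleepMs(int baseSleepTimeMs, int retryCount, Random random) {
        return (long) baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)));
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int retry = 0; retry < 5; retry++) {
            System.out.println("retry " + retry + " -> sleep " + sleepMs(1000, retry, random) + " ms");
        }
    }
}
```

The randomness spreads out reconnect attempts so that many clients do not all retry at the same instant after an outage.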

Creating nodes

```java
// forPath takes the path and, optionally, the data
String path = curatorFramework.create().forPath("/curator-node");
// withMode specifies the node type: persistent, ephemeral, etc.
curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/curator-node", "some-data".getBytes());
// Create a node together with its parent hierarchy
String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);
log.info("curator create node :{} successfully.", path);
```

Reading data

```java
// getData returns a byte array
byte[] bytes = curatorFramework.getData().forPath("/curator-node");
log.info("get data from node :{} successfully.", new String(bytes));

// inBackground reads asynchronously;
// executorService specifies the thread pool for the callback (optional)
ExecutorService executorService = Executors.newSingleThreadExecutor();
String ZK_NODE = "/zk-node";
byte[] bytes = curatorFramework.getData().inBackground((client, event) -> {
    log.info(" background: {}", event);
}, executorService).forPath(ZK_NODE);
```

Updating a node

```java
curatorFramework.setData().forPath("/curator-node", "changed!".getBytes());
```

Deleting nodes

```java
String pathWithParent = "/node-parent";
// guaranteed: as long as this client session is valid, Curator keeps
// retrying the delete in the background until it succeeds
// deletingChildrenIfNeeded: recursively delete child nodes
curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);
```

Asynchronous interface

BackgroundCallback

```java
public interface BackgroundCallback
{
    /**
     * Called when the async background operation completes
     *
     * @param client the client
     * @param event operation result details
     * @throws Exception errors
     */
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception;
}
```

Curator listener:

```java
public interface CuratorListener
{
    /**
     * Called when a background task has completed or a watch has triggered
     *
     * @param client client
     * @param event the event
     * @throws Exception any errors
     */
    public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;
}
```

The inBackground methods deliver their results asynchronously through these callbacks.

Curator caches:

Listeners on server-side events; internally Curator compares a locally cached view against the server-side view. Unlike raw ZooKeeper watches, these listeners keep firing repeatedly without manual re-registration.

1. NodeCache: watches a single node

```java
NodeCache nodeCache = new NodeCache(curatorFramework, NODE_CACHE);
nodeCache.getListenable().addListener(new NodeCacheListener() {
    @Override
    public void nodeChanged() throws Exception {
        log.info("{} path nodeChanged: ", NODE_CACHE);
        CuratorFramework curatorFramework = getCuratorFramework();
        byte[] bytes = curatorFramework.getData().forPath(NODE_CACHE);
        log.info("data: {}", new String(bytes));
    }
});
nodeCache.start();
```
2. PathChildrenCache: watches child nodes

```java
PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, PATH, true);
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
    @Override
    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
        log.info("event: {}", event);
    }
});
// Passing true caches the node contents on initial start
pathChildrenCache.start(true);
```

3. TreeCache: watches an entire subtree

TreeCache maintains a tree structure and recursively watches data changes on all child nodes.

```java
@Test
public void testTreeCache() throws Exception {
    createIfNeed(TREE_CACHE);
    TreeCache treeCache = new TreeCache(curatorFramework, TREE_CACHE);
    treeCache.getListenable().addListener(new TreeCacheListener() {
        @Override
        public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
            log.info(" tree cache: {}", event);
        }
    });
    treeCache.start();
}
```

Watcher-based listening

```java
private static Watcher watcher = new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.None
                && event.getState() == Event.KeeperState.SyncConnected) {
            countDownLatch.countDown();
            log.info("Connection established");
            // start to watch config
            try {
                log.info("Start watching: {}", ZooDefs.CONFIG_NODE);
                zookeeper.getConfig(true, null);
            } catch (KeeperException | InterruptedException e) {
                e.printStackTrace();
            }
        } else if (event.getPath() != null && event.getPath().equals(ZooDefs.CONFIG_NODE)) {
            try {
                byte[] config = zookeeper.getConfig(this, null);
                String clientConfigStr = ConfigUtils.getClientConfigStr(new String(config));
                log.info("Configuration changed: {}", clientConfigStr);
                zookeeper.updateServerList(clientConfigStr.split(" ")[1]);
            } catch (KeeperException | InterruptedException | IOException e) {
                e.printStackTrace();
            }
        }
    }
};
```

ZooKeeper cluster

Three server roles:

1. Leader: handles write requests (and can also serve reads); there is only one Leader
2. Follower: serves read requests only, and participates in Leader election
3. Observer: serves read requests only; does not vote in elections and does not count toward the write quorum
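The "over half" write rule can be made concrete: only the participants (Leader plus Followers) count toward the quorum, and a write must be acknowledged by a strict majority of them. A minimal sketch (`QuorumDemo` is an illustrative name):

```java
public class QuorumDemo {
    // Strict majority of voting members; Observers are excluded entirely
    static int quorum(int participants) {
        return participants / 2 + 1;
    }

    public static void main(String[] args) {
        System.out.println("3 participants -> quorum of " + quorum(3)); // 2
        System.out.println("4 participants -> quorum of " + quorum(4)); // 3
        System.out.println("5 participants -> quorum of " + quorum(5)); // 3
    }
}
```

Note that 4 participants tolerate no more failures than 3 do, which is why odd-sized voting ensembles are the usual recommendation; adding Observers instead scales read capacity without raising the quorum.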

Step 1: edit the configuration file

```shell
vim conf/zoo1.cfg
```

```
dataDir=/usr/local/data/zookeeper-1
# port for client connections
clientPort=2181
# server.X=<host>:<quorumPort>:<electionPort>[:<role>]
# the first port is for quorum communication, the second for leader election;
# the role defaults to participant, so ":participant" may be omitted
server.1=127.0.0.1:2001:3001:participant
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer
```
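How a `server.X` line breaks down can be sketched with a small helper (`ServerLineDemo` and `parse` are illustrative only, not ZooKeeper code):

```java
import java.util.Arrays;

public class ServerLineDemo {
    // server.<id>=<host>:<quorumPort>:<electionPort>[:<role>]
    static String[] parse(String line) {
        String value = line.substring(line.indexOf('=') + 1); // drop "server.N="
        String[] parts = value.split(":");
        // The role field is optional and defaults to participant
        String role = parts.length >= 4 ? parts[3] : "participant";
        return new String[] { parts[0], parts[1], parts[2], role };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(parse("server.4=127.0.0.1:2004:3004:observer")));
        System.out.println(Arrays.toString(parse("server.1=127.0.0.1:2001:3001")));
    }
}
```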

Step 2: set the server id

Under the dataDir path (/usr/local/data/zookeeper-1), create a file named myid containing the `1` from the configured `server.1` line.

Step 3: create the remaining files in the same way

That is, zoo2.cfg, zoo3.cfg, and zoo4.cfg, along with their dataDir directories and myid files.
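Laying out the remaining dataDir directories and myid files can be scripted. A minimal sketch; the `./data` base directory stands in for `/usr/local/data` and is an assumption, adjust it to your environment:

```shell
# Create the per-server dataDir and myid file for servers 2-4
BASE="${BASE:-./data}"
for i in 2 3 4; do
  mkdir -p "$BASE/zookeeper-$i"
  # myid must contain the N from the matching server.N line
  echo "$i" > "$BASE/zookeeper-$i/myid"
done
cat "$BASE/zookeeper-2/myid"
```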

Step 4: start the servers

```shell
bin/zkServer.sh start conf/zoo1.cfg
bin/zkServer.sh start conf/zoo2.cfg
bin/zkServer.sh start conf/zoo3.cfg
bin/zkServer.sh start conf/zoo4.cfg
```

Step 5: check that the servers are up

```shell
bin/zkServer.sh status conf/zoo1.cfg
```

Connect a client:

```shell
bin/zkCli.sh -server ip1:port1,ip2:port2,ip3:port3
```

ZooKeeper 3.5.0 new feature: dynamic cluster configuration