1. Zookeeper

Zookeeper 是 Apache Hadoop 项目下的一个子项目 , 是一个树形目录服务

是一个分布式 开源的分布式应用程序的协调服务

主要功能包括: 配置管理 分布式锁 集群管理

2. 安装

  1. tar -zxvf /opt/software/apache-zookeeper-3.5.6-bin.tar.gz -C /opt/
  2. mv /opt/apache-zookeeper-3.5.6-bin /opt/zookeeper
  3. mkdir zkdata
  4. cd /opt/zookeeper/conf
  5. cp zoo_sample.cfg zoo.cfg
  6. vim zoo.cfg

修改存储路径

  1. dataDir=/opt/zookeeper/zkdata

启动

  1. cd /opt/zookeeper/bin/
  2. ./zkServer.sh start

3. 数据模型

10. Zookeeper - 图1

4. 服务端常用命令

  1. #启动
  2. ./zkServer.sh start
  3. #状态
  4. ./zkServer.sh status
  5. #停止
  6. ./zkServer.sh stop
  7. #重启
  8. ./zkServer.sh restart

5. 客户端常用命令

  1. #连接 如果是本机直接忽略后面的参数 忽略连接必须为默认端口2181
  2. ./zkCli.sh -server localhost:2181
  • create [节点 -e -s -es] 路径 [内容] 创建文件
  • get [节点] 路径 获取文件内容
  • ls 路径 查看路径下的所有文件
  • set [节点] 路径 内容 修改文件内容
  • delete [节点] 路径 删除文件
  • deleteall [节点] 路径 删除该文件夹下的所有文件
  • quit 退出
  • help 帮助

10. Zookeeper - 图2

6. Curator

ZooKeeper 客户端库

  • 原生Java Api
  • ZkClient
  • Curator

https://curator.apache.org/

坐标

  1. <dependencies>
  2. <dependency>
  3. <groupId>junit</groupId>
  4. <artifactId>junit</artifactId>
  5. <version>4.10</version>
  6. <scope>test</scope>
  7. </dependency>
  8. <!--curator-->
  9. <dependency>
  10. <groupId>org.apache.curator</groupId>
  11. <artifactId>curator-framework</artifactId>
  12. <version>4.0.0</version>
  13. </dependency>
  14. <dependency>
  15. <groupId>org.apache.curator</groupId>
  16. <artifactId>curator-recipes</artifactId>
  17. <version>4.0.0</version>
  18. </dependency>
  19. <!--日志-->
  20. <dependency>
  21. <groupId>org.slf4j</groupId>
  22. <artifactId>slf4j-api</artifactId>
  23. <version>1.7.21</version>
  24. </dependency>
  25. <dependency>
  26. <groupId>org.slf4j</groupId>
  27. <artifactId>slf4j-log4j12</artifactId>
  28. <version>1.7.21</version>
  29. </dependency>
  30. </dependencies>
  31. <build>
  32. <plugins>
  33. <plugin>
  34. <groupId>org.apache.maven.plugins</groupId>
  35. <artifactId>maven-compiler-plugin</artifactId>
  36. <version>3.1</version>
  37. <configuration>
  38. <source>1.8</source>
  39. <target>1.8</target>
  40. </configuration>
  41. </plugin>
  42. </plugins>
  43. </build>

6.1. 获取客户端

  1. //第一种方式
  2. ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(3000,10);
  3. //多个服务端 connectString参数 地址用逗号隔开 连接超时时间 会话超时时间 重试策略
  4. CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.130.124:2181",
  5. 60 * 1000, 15 * 1000, retryPolicy);
  6. client.start(); //开启连接
  7. client.close();
  8. //第二种方式 namespace为设置根节点路径
  9. client2 = CuratorFrameworkFactory.builder().connectString("192.168.130.124:2181").sessionTimeoutMs(60 * 1000).connectionTimeoutMs(15 * 1000).retryPolicy(retryPolicy).namespace("itheima").build();
  10. client2.start();

6.2. 关闭客户端

  1. client2.close();

6.3. 创建节点

  1. //创建节点 节点默持久化的节点 内容默认将当前客户端ip作为内容
  2. String path = client2.create().forPath("/app1");
  3. System.out.println(path);
  4. //带内容创建
  5. client2.create().forPath("/app2","hello".getBytes(StandardCharsets.UTF_8));
  6. //设置节点类型 使用withMode方法 值为一个枚举值
  7. client2.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
  8. //创建多级节点 creatingParentContainersIfNeeded如果父节点不存在则自动创建
  9. client2.create().creatingParentContainersIfNeeded().forPath("/app4/p1");

6.4. 查询节点

  1. //查询节点
  2. byte[] bytes = client2.getData().forPath("/app1");
  3. System.out.println(new String(bytes));
  4. //查询子节点
  5. List<String> path = client2.getChildren().forPath("/app4");
  6. System.out.println(path);
  7. //查询节点状态信息 ls -s 需要一个Stat类
  8. Stat stat =new Stat();
  9. client2.getData().storingStatIn(stat).forPath("/app1");
  10. System.out.println(stat);

6.5. 修改节点

  1. //修改内容
  2. client2.setData().forPath("/app1","hello".getBytes(StandardCharsets.UTF_8));
  3. //根据版本修改
  4. Stat stat =new Stat();
  5. client2.getData().storingStatIn(stat).forPath("/app1");
  6. System.out.println(stat);
  7. int version = stat.getVersion();
  8. //withVersion 查询版本与之前版本是否相同 相同则修改 不相同则不执行
  9. client2.setData().withVersion(version).forPath("/app1","hello world".getBytes(StandardCharsets.UTF_8));

6.6. 删除节点

  1. //删除节点
  2. client2.delete().forPath("/app1");
  3. //删除带有子节点的节点
  4. client2.delete().deletingChildrenIfNeeded().forPath("/app4");
  5. //必须删除成功
  6. client2.delete().guaranteed().forPath("/app2");
  7. //回调
  8. client2.delete().guaranteed().inBackground(new BackgroundCallback() {
  9. @Override
  10. public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
  11. //curatorEvent 存储删除后的信息 如路径 是否成功
  12. System.out.println(curatorEvent);
  13. }
  14. });

6.7. Watch事件监听

ZooKeeper 允许用户在指定节点上注册一些Watcher,并且在一些特定事情触发的时候,ZooKeeper服务端会将事件通知到感兴趣的客户端上去,该机制是ZooKeeper实现分布式协调服务的重要特征

Watch机制来实现了发布/订阅功能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者

Curator引入了Cache来实现对ZooKeeper服务端事件的监听

  • NodeCache 只是监听某一个特点的节点
  • PathChildrenCache 监控一个ZNdoe的子节点
  • TreeCache 可以监控整个树上的所有节点 类似于NodeCache 和 PathChildrenCache 结合

6.8. NodeCache

  1. //创建NodeCache对象
  2. final NodeCache nodeCache = new NodeCache(client2, "/app1");
  3. //注册监听
  4. nodeCache.getListenable().addListener(new NodeCacheListener() {
  5. @Override
  6. public void nodeChanged() throws Exception {
  7. System.out.println("节点发生变化");
  8. //获取修改节点后的数据
  9. byte[] data = nodeCache.getCurrentData().getData();
  10. System.out.println(new String(data));
  11. }
  12. });
  13. //开启监听 如果为true 则开启监听时,加载缓冲数据
  14. nodeCache.start(true);

发生增删改操作都会触发监听 并且当此方法(线程)结束后不再监听 一般我们持续让此线程存活

6.9. PathChildrenCache

  1. //创建监听器
  2. PathChildrenCache pathChildrenCache = new PathChildrenCache(client2,"/app2",true);
  3. pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
  4. @Override
  5. public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
  6. System.out.println("子节点发生变化");
  7. System.out.println(pathChildrenCacheEvent);
  8. //监听子节点的数据变更,并获取到对应的数据
  9. //1.获取类型
  10. PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
  11. //判断类型
  12. if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
  13. byte[] data = pathChildrenCacheEvent.getData().getData();
  14. System.out.println(new String(data));
  15. }
  16. }
  17. });
  18. pathChildrenCache.start();

当发生连接时 childEvent会执行一次 我们可以通过type来判断

6.10. TreeCache

  1. //创建监听器
  2. TreeCache treeCache = new TreeCache(client2, "/app2");
  3. treeCache.getListenable().addListener(new TreeCacheListener() {
  4. @Override
  5. public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
  6. System.out.println("节点发生变化");
  7. byte[] data = treeCacheEvent.getData().getData();
  8. System.out.println(new String(data));
  9. }
  10. });
  11. treeCache.start();

7. 分布式锁

在以前我们涉及并发同步的时候,我们往往采用synchronized或者Lock的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个JVM之下

我们分布式集群工作时,属于多JVM下的工作环境,跨JVM之间已经无法通过多线程的锁解决同步问题

分布式锁———处理跨机器的进程之间的数据同步问题

7.1. Zookeeper分布式锁原理

核心思想: 当客户端要获取锁,则创建节点,使用完锁,则删除该节点

  1. 客户端获取锁时,在lock节点下创建临时顺序节点
  2. 然后获取lock下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就任务该客户端获取到了锁.使用完锁后,将该节点删除
  3. 如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件
  4. 如果发现比自己小的那个节点被删除,则客户端的Watcher会收到相应的通知,此时再次判断自己创建的节点是否是lock子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听

7.1.1. Curator 实现分布式锁

在Curator中有五种锁方案

  • InterProcessSemaphoreMutex 分布式排它锁(非可重入锁)
  • InterProcessMutex 分布式可重入排它锁
  • InterProcessReadWriteLock 分布式读写锁
  • InterProcessMultiLock 将多个锁作为单个实体管理的容器
  • InterProcessSemaphoreV2 共享信号量

获取锁对象

  1. private InterProcessMutex lock;
  2. public Ticket() {
  3. ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(3000, 10);
  4. CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.130.124:2181").sessionTimeoutMs(60 * 1000).connectionTimeoutMs(15 * 1000).retryPolicy(retryPolicy).namespace("itheima").build();
  5. client.start();
  6. lock = new InterProcessMutex(client, "lock");
  7. }

获取锁

  1. lock.acquire(3, TimeUnit.SECONDS);

释放锁

  1. lock.release();

8. 集群搭建

先安装ZooKeeper

在每个ZooKeeper的data目录下创建一个myid文件,内容分别是1 2 3 这个文件就是记录每个服务器的ID

  1. #1
  2. echo "1" > /opt/zookeeper/zkdata/myid
  3. #2
  4. echo "2" > /opt/zookeeper/zkdata/myid
  5. #3
  6. echo "2" > /opt/zookeeper/zkdata/myid

配置每一台的ZooKeeper 的 zoo.cfg

  1. vim /opt/zookeeper/conf/zoo.cfg

添加集群地址 ip和端口根据配置修改 .后面为 设置的myid

  1. server.1=realtime-1:2888:3888
  2. server.2=realtime-2:2888:3888
  3. server.3=realtime-3:2888:3888

server.服务器ID=服务器IP地址:服务器之间通讯端口:服务器之间投票选举端口

启动集群

  1. #1
  2. ./opt/zookeeper/bin/zkServer.sh start
  3. #2
  4. ./opt/zookeeper/bin/zkServer.sh start
  5. #3
  6. ./opt/zookeeper/bin/zkServer.sh start

8.1. Leader选举

Serverid:服务器id 编号越大在选择算法的权重越大

Zxid: 数据id 值越大数据越新 在选举中数据越新 权重越大

如果某台ZooKeeper获得超过半数的选票 则此ZooKeeper就可以成为Leader了

9. 模拟集群异常

关闭3号集群

  1. ./opt/zookeeper/bin/zkServer.sh stop

1号和2号状态没有变化

再关闭1号机器

  1. ./opt/zookeeper/bin/zkServer.sh stop

此时只有2号机器 并没有关闭 但处于休眠状态

再把1号机器重新启动

  1. ./opt/zookeeper/bin/zkServer.sh start

2号机器重新运行,

再把3号重新开启, 然后停掉2号

此时重新选举leader 3号超过半数票成为leader

10. 集群角色

  • Leader 领导者

    1. 处理事务请求
    2. 集群内部各服务器的调度者
  • Follower 跟随者

    1. 处理客户端非事务请求,转发事务请求给Leader服务器
    2. 参与Leader选举投票
  • Observer 观察者

    1. 处理客户端非事务请求,转发事务请求给Leader服务器

10. Zookeeper - 图3