1、zookeeper介绍
1.1、zookeeper概念
zookeeper是一个开源的分布式协调服务框架,主要解决分布式集群中应用系统的一致性问题和数据管理问题。
1.2、zookeeper特点
zookeeper本质上是一个分布式文件系统,适合存放小文件,也可以理解为一个数据库
zookeeper中存放的其实是一个又一个的znode,znode是zookeeper中的节点
znode是有路径的,例如:/data/host1,/data/host2,这个路径也可以理解为是znode的Name
znode也可以携带数据,例如说某个znode的路径是:/data/host1,其值是一个字符串“192.168.0.1”
正因为znode的特性,所以zookeeper可以对外提供出一个类似于文件系统的视图,可以通过操作文件系统的方式操作zookeeper
使用路径获取znode
获取znode携带的数据
修改znode携带的数据
删除znode
添加znode
2、zookeeper架构
zookeeper集群是一个基于主从架构的高可用集群
每个服务器承担以下三种角色中的某一种:
Leader:一个zookeeper集群同一时间知会有一个实际工作的Leader,它会发起并维护与各Follower
和Observer之间的心跳,所有的写操作必须要通过Leader完成再由Leader将写操作广播给其他服务
器。
Follower:一个zookeeper集群中可能同时存在多个Follower,它会影响Leader的心跳。
Leader可直接处理并返回给客户端的读请求,同时会将写请求转发给Leader处理,并且负责在Leader写请求时对请求进行投票。
Observer:Observer和Follower类似,但无投票权。
3、zookeeper的应用场景
3.1、数据发布/订阅
数据发布/订阅系统,需要发布者将数据发布到zookeeper节点上,供订阅者进行数据订阅,进而达到动态获取数据的目的,实现配置信息的集中式管理和数据的动态更新。
发布/订阅一般有两种设计模式:推模式和拉模式,服务端主动将数据更新发布给所有订阅者的客户端称为推模式;客户端主动请求获取最新数据称为拉模式。
zookeeper采用了推拉相结合的模式,客户端向服务端注册自己需要关注的节点,一旦该节点的数据发生变化,那么服务端就会向相应的客户端推送watcher事件通知,客户端接收到此通知后,主动到服务端获取最新的数据。
3.2、命名服务
3.3、分布式协调/通知
①心跳检测
②工作进度汇报
③系统调度
3.4、分布式锁
分布式锁用于控制分布式系统之间同步访问共享资源的一种方式,可以保证不同系统访问一个或一组资源时的一致性,主要分为排他锁和共享锁。
(1)排他锁又称为写锁或独占锁:若事务T1对数据对象O1加上了排他锁,那么在整个加锁期间,仅允许事务T1对O1进行读取和更新操作,其他任何事物不能再对这个数据对象进行任何类型的操作,直到T1释放了排他锁。
①读取锁:在需要获取排他锁时,所有客户端通过调用接口,在/exclusive_lock节点下面创建临时子节点/exclusive_lock/lock。zookeeper可以保证只有一个客户端能创建成功,没有成功的客户端需要注册/exclusive_lock节点监听。
②释放锁:当获取到排他锁的客户端宕机或者完成任务都会导致临时节点的删除,此时,所有在/exclusive_lock节点监听的客户端都会收到通知,可以重新发起分布式锁获取。
(2)共享锁又称为读锁:若事务T1对数据对象O1加上共享锁,那么当前事务只能对O1进行读取操作,其它事务也只能对这个对象加共享锁,直到该数据上的所有共享锁都被释放,在需要获取共享锁时,所有客户端都会到/shared_lock下面创建一个临时顺序节点。
4、zookeeper的选举机制
Leader选举时保证分布式数据一致性的关键性所在,当zookeeper集群中的一台服务器出现以下两种情况之一时,需要进行Leader选举。
4.1、服务器启动时期的Leader选举

补充:其中myid相当于每个服务器自己的id标识,ZXID的值越大,说明数据越新。所以myid和ZXID越大被选的可能性越高。

4.2、服务器运行时期的Leader选举
在zookeeper运行期间,Leader与非Leader服务器各司其职,即便有非Leader服务器宕机或者新的服务器加入,此时也不会影响Leader,但是一旦Leader服务器挂了,那么整个集群将暂停对外服务,进入新一轮的选举,其过程和启动时期的Leader选举过程基本一致,过程相同。
5、zookeeper环境搭建
第一步:上传压缩包
(1)rz -E 上传本地文件
(2)ls 查看是否上传成功
(3)mv zookeeper-3.4.9.tar.gz /export/softwares/ 将zookeeper压缩包移动
到/export/softwares/目录下
(4)cd /export/softwares/
(5)ls 查看是否移动成功
第二步:解压
(6)tar -zxvf zookeeper-3.4.9.tar.gz -C ../servers/ 将zookeeper解压到servers下
cd ..
cd servers/ 到servers目录下
(7)ls 查看是否解压成功
第三步:修改配置文件
(8)cd /export/servers/zookeeper-3.4.9/conf/
(9)cp zoo_sample.cfg zoo.cfg
(10)mkdir -p /export/servers/zookeeper-3.4.9/zkdatas 先创建zkdatas
(11)vim zoo.cfg
修改或添加以下内容:
第一个在文件中找到dataDir 并将后面的内容填上即可
最后三行加在配置文件的后面就行了
剩下的两个直接在文件中找到,然后取消注释即可
第四步:添加myid的配置
(12)cd /export/servers/zookeeper-3.4.9/zkdatas/ 到zkdatas下创建文件myid
(13)vim myid
在里面写一个 1 然后保存退出即可
这个1就是第一台机子的myid
第五步:安装包分发并修改myid的值
scp -r /export/servers/zookeeper-3.4.9/ node02:/export/servers/
scp -r /export/servers/zookeeper-3.4.9/ node03:/export/servers/
第二台机子上myid的值修改为2
到node02中:
cd /export/servers/ 切换到这个目录
ls 查看是否拷贝成功
cd zookeeper-3.4.9/
ls 查看是否有zkdatas文件存在
cd zkdatas/ 切换到zkdatas目录下
ls 查看myid这个文件是否存在
vim myid 进去将 1 改为 2
第三台机子同样操作
第六步:三台机器启动zookeeper服务
这个命令三台机器都要执行:
开启zookeeper服务:
/export/servers/zookeeper-3.4.9/bin/zkServer.sh start
在zookeeper-3.4.9下执行jps可以查看是否启动成功
若看到:
四个数字 QuorumPeerMain 则代表启动成功
查看启动状态:
/export/servers/zookeeper-3.4.9/bin/zkServer.sh status
6、zookeeper的数据模型
zookeeper的数据模型,在结构上和标准文件系统的非常相似,拥有一个层次的命名空间,都是采用树形层次结构。
zookeeper树中的每个节点被称为一个Znode,和文件系统的目录树一样,zookeeper树中的每个节点可以拥有子节点。
但也有不同之处:
(1)Znode兼具文件和目录两种特点:
(2)Znode存储数据大小有限制,每个Znode数据大小最多为1M
(3)Znode通过路径引用,如同Unix中的文件路劲。路径必须是绝对的
(4)每个Znode节点由三部分组成:
stat:此为状态信息,存放Znode的版本、权限等信息。
data:与该Znode关联的数据。
children:该Znode下的子节点
7、zookeeper的节点特性:
7.1、Znode有两种
分别为临时节点和永久节点,结点的类型在创建时就已经被确定,并且不能改变。
临时节点:该节点的生命周期依赖于创建他们的会话,一旦会话结束,临时节点将被自动删除,也可以手动删除,临时节点不允许有子节点
永久节点:该节点的生命周期不依赖于会话,并且只有在客户端显示执行删除操作的时候,他们才能被删除。
7.2、Znode还有一个序列化的特性
8、zookeeper的shell客户端操作
8.1、登陆zookeeper客户端
命令:
先进入到zookeeper里面,然后执行:
bin/zkCli.sh -server node01:2181
其中bin/zkCli.sh是zookeeper的一个文件路径,进去之后选择想要连接的客户端
出现zk:node01:2181(CONNECTED)就说明连接成功
输入quit可以退出
然后在zookeeper里操作一些命令
8.2、zookeeper客户端操作命令
8.3、操作实例
列出path下所有的znode
ls /
创建永久节点
create /hello world world为给这个永久节点hello携带的数据
创建临时节点
create -e /tmp world
创建永久序列化节点
create -s /hello1 world
创建临时序列化节点
create -s -e /tmp1 world
给节点创建子节点
create /hello/aaa world aaa为hello的子节点
修改节点数据
set /hello nihao
获取节点属性
get /hello
删除节点,如果要删除的节点有子Znode,就无法删除
delete /hello
删除节点,如果有子节点就递归删除
rmr /abc
列出历史记录命令
history
8.4、查看Znode节点属性
每个znode都包含了一系列的属性,通过命令get,可以获得节点的属性
其中一些比较重要的属性:
8.5、zookeeper的watch机制
总体来说:watch机制有两个作用:
(1)发布和订阅
(2)监控集群中主机的存活状态
通知类似于数据库中的触发器,对某个Znode设置Watcher,当Znode发生变化的时候,WatchManager会调用对应的Watcher
当Znode发生删除、修改、创建,子节点修改的时候,对应的Watcher会收到通知。
Watcher的特点:
一次性触发:一个Watcher只会被触发一次,如果需要继续监听,则需要再次添加Watcher
事件封装:Watcher得到的事件是被封装过的,包括三个内容:keeperstate、eventType、path
9、zookeeper的JavaAPI操作
这里操作zookeeper的JavaAPI使用的是一套zookeeper客户端框架Curator,解决了很多zookeeper客户端非常底层的细节开发工作
Curator包含几个包:
curator-framework:对zookeeper的底层api的一些封装
curator-recipes:封装了一些高级特性
9.1、创建java工程,导入jar包
创建maven java工程,导入jar包
9.2、节点的操作
9.2.1、创建永久节点:
创建一个测试方法:
package com.itheima.zookeeper_api;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.junit.Test;
public class ZookeeperAPITest {
@Test<br /> public void createZnode() throws Exception {<br /> //1、定制一个重试策略<br /> /*<br /> * param1:重试的时间间隔<br /> * param2:最大的重试次数<br /> * */<br /> RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);<br /> //2、获取一个客户端对象<br /> /*<br /> * param1:要连接的zookeeper服务器列表<br /> * param2:会话的超时时间<br /> * param3:链接的超时时间<br /> * param4:重试策略<br /> * */<br /> String connectionStr = "192.168.174.100:2181,192.168.174.110:2181,192.168.174.120:2181";<br /> CuratorFramework client = CuratorFrameworkFactory._newClient_(connectionStr, 8000, 8000, retryPolicy);<br /> //3、开启客户端<br /> client.start();<br /> //4、创建节点<br /> client.create().creatingParentsIfNeeded().withMode(CreateMode._PERSISTENT_).forPath("/hello2","world".getBytes());<br /> //5、关闭客户端<br /> client.close();<br /> }<br />}
9.2.2、创建临时节点:
创建一个测试方法:
package com.itheima.zookeeper_api;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.junit.Test;
public class ZookeeperAPITmpTest {
@Test<br /> public void createZnode() throws Exception {<br /> //1、定制一个重试策略<br /> /*<br /> * param1:重试的时间间隔<br /> * param2:最大的重试次数<br /> * */<br /> RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);<br /> //2、获取一个客户端对象<br /> /*<br /> * param1:要连接的zookeeper服务器列表<br /> * param2:会话的超时时间<br /> * param3:链接的超时时间<br /> * param4:重试策略<br /> * */<br /> String connectionStr = "192.168.174.100:2181,192.168.174.110:2181,192.168.174.120:2181";<br /> CuratorFramework client = CuratorFrameworkFactory._newClient_(connectionStr, 8000, 8000, retryPolicy);<br /> //3、开启客户端<br /> client.start();<br /> //4、创建节点<br /> client.create().creatingParentsIfNeeded().withMode(CreateMode._EPHEMERAL_).forPath("/hello4","world".getBytes());<br /> //5、关闭客户端<br /> client.close();<br /> }<br />}<br />只需要改变withMode里面的参数,因为这个方法是指定创建的节点类型的
9.2.3、设置节点数据:
@Test
public void setZnodeData() throws Exception {
//1、定制一个重试策略
/
param1:重试的时间间隔
param2:最大的重试次数
/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);
//2、获取一个客户端对象
/
param1:要连接的zookeeper服务器列表
param2:会话的超时时间
param3:链接的超时时间
param4:重试策略
/
String connectionStr = “192.168.174.100:2181,192.168.174.110:2181,192.168.174.120:2181”;
CuratorFramework client = CuratorFrameworkFactory.newClient(connectionStr, 8000, 8000, retryPolicy);
//3、开启客户端
client.start();
//4、创建节点
client.setData().forPath(“/hello2”,”zookeeper”.getBytes());
//5、关闭客户端
client.close();
}
9.2.4、查看节点数据
@Test
public void getZnodeData() throws Exception {
//1、定制一个重试策略
/
param1:重试的时间间隔
param2:最大的重试次数
/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);
//2、获取一个客户端对象
/
param1:要连接的zookeeper服务器列表
param2:会话的超时时间
param3:链接的超时时间
param4:重试策略
/
String connectionStr = “192.168.174.100:2181,192.168.174.110:2181,192.168.174.120:2181”;
CuratorFramework client = CuratorFrameworkFactory.newClient(connectionStr, 8000, 8000, retryPolicy);
//3、开启客户端
client.start();
//4、创建节点
byte[] forPath = client.getData().forPath(“/hello2”);
System.out.println(new String(forPath));
//5、关闭客户端
client.close();
}
9.2.5、节点watch机制:
@Test
public void watchZnode() throws Exception {
//1、定制一个重试策略
/
param1:重试的时间间隔
param2:最大的重试次数
/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);
//2、获取一个客户端对象
/
param1:要连接的zookeeper服务器列表
param2:会话的超时时间
param3:链接的超时时间
param4:重试策略
/
String connectionStr = “192.168.174.100:2181,192.168.174.110:2181,192.168.174.120:2181”;
CuratorFramework client = CuratorFrameworkFactory.newClient(connectionStr, 8000, 8000, retryPolicy);
//3、开启客户端
client.start();
//4、创建一个TreeCache对象,指定要监控的节点路径
TreeCache treeCache = new TreeCache(client, “/hello2”);
//5、自定义一个监听器
treeCache.getListenable().addListener(new TreeCacheListener() {
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
ChildData data = treeCacheEvent.getData();
if(data != null){ //不为空,就说明有变化
switch (treeCacheEvent.getType()){
case NODE_ADDED:
System.out.println(“监控到有新增节点”);
break;
case NODE_REMOVED:
System.out.println(“监控到有节点被移除”);
break;
case NODE_UPDATED:
System.out.println(“监控到有节点被更新”);
break;
default:
break;
}
}
}
});
//开始监听
treeCache.start();
//让程序挂起,做一个延时,时间越长越好
Thread.sleep(1000000);
}


