Zookeeper可以简单理解一个数据库。但不要把zookeeper当做数据库用,因为zookeeper官网一直强调快(响应快、从不可用选出leader快),快就要求所有节点的传输数据体量要小,这样才能快
zookeeper特点
分布式读写
客户端的所有写入请求都被转发到单个服务器,称为领导者,_ZooKeeper 服务器的其余部分,称为追随者_,接收来自领导者的数据传递。(数据量最好不要超过1M)。
全局数据一致性
每个server节点保存相同的数据副本,Client无论链接哪个server,数据都是一致的。有一种情况是如果Leader写完数据后,正在同步给其他follow节点,数据还没同步完成,此时如果一个Client正在链接这个follow节点,Client就会先阻塞,等leader同步完数据后,Client才能进行访问。
实时性
在一定时间范围内,Client能读到最新数据(一般毫秒级别)
选举过程
http://dockone.io/article/696772
leader肯定有挂的可能性,造成服务不可用。这就会产生一个问题,我们需要有一种机制能够快速的产生一个leader出来,让整个集群达到可用状态,而且这种不可用的状态到达可用的状态越快越好,官方压测200ms就能恢复。
数据模型
每个节点在zookeeper中叫znode,每个znode有一个唯一的路径标识并且每个节点只能存放一个数据,
节点类型
持久节点(分为:持久节点、持久顺序节点)
相当于数据保存在硬盘中。
创建持久节点
create /rootZnode test1
创建持久顺序节点
用于控制很多客户端Client同时向同一个节点进行写操作,如果写入的节点名称相同会自定生成不同节点的名称。
[zk: localhost:2181(CONNECTED) 3] create -s /sZode
Created /sZode0000000002
[zk: localhost:2181(CONNECTED) 4] create -s /sZode1
Created /sZode10000000003
临时节点(分为:临时节点、临时顺序节点)
- 数据保存在内存中,会话(session)断开,数据就会删除,每当一个客户端连接到follow或master的时候就会创建一个session连接,并且这个session会被leader进行写操作,写完后还会同步到其他follow,所以当一个Client连接的follow挂掉后,Client还能连接到其他follow并且复用之前的session。
创建临时节点
create -e /tempZnode tempTest1
不允许创建子节点
create -e /tempZnode/tZonde1 tempTest1
Ephemerals cannot have children: /tempZnode/tZonde1
查看创建的Znode节点
[zk: localhost:2181(CONNECTED) 0] ls -R /
/
/rootZnode
/sZode0000000002
/sZode10000000003
/zookeeper
/zookeeper/config
/zookeeper/quota ls /
[rootZnode, tempZnode, zookeeper]
关闭当前的Client,临时节点会删除。
quit
./zkCli.sh
ls /
[rootZnode, zookeeper]
API操作
API的操作相对简单,包括连接zookeeper集群、创建节点、更新节点、获取节点、删除节点。
<!-- 官方自带的客户端操作 -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.3</version>
</dependency>
可以把服务端的log文件放到客户端里面,能够详细看到zookeeper连接、断开以及操作数据的详细过程。 ```java
import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.io.IOException; import java.util.concurrent.CountDownLatch;
@Configuration public class ZookeeperAPI { //zookeeper集群,zookeeper的ip与端口,这里测试一台机器开启三个zookeeper,会随机链接一台 private static final String ZK_SERVER_LIST = “120.53.119.107:2181,120.53.119.107:2182,120.53.119.107:2183”; //设置超时时间,当节点3s后没反应就认该节点已经挂了,比如创建临时节点客户端关闭链接后3s,临时节点才移除 private static final int sessionTimeout = 3000; private ZooKeeper zk;
@Bean
public ZooKeeper zooKeeper1() throws IOException, InterruptedException, KeeperException {
/**
* watch分为两类:第一类:创建ZooKeeper对象(new ZooKeeper)时候传入的watch,这个watch为session级别的watch。
* 当ZooKeeper链接成功或断开等会监听到,如果断开连接,还有其他服务可用,会自动连接到其他节点,并收到通知,而且session是不变的。
* 如果ZooKeeper节点变化删除、更新等是监听不到的。
*/
CountDownLatch countDownLatch = new CountDownLatch(1);
zk = new ZooKeeper(ZK_SERVER_LIST, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
Event.KeeperState state = event.getState();
Event.EventType type = event.getType();
System.out.println("new ZooKeeper Watcher连接成功");
//状态
switch (state) {
case Unknown -> {
}
case Disconnected -> {
}
case NoSyncConnected -> {
}
case SyncConnected -> {
System.out.println("new ZooKeeper Watcher连接成功");
countDownLatch.countDown();
}
case AuthFailed -> {
}
case ConnectedReadOnly -> {
}
case SaslAuthenticated -> {
}
case Expired -> {
}
case Closed -> {
}
}
}
});
countDownLatch.await();//在这里会阻塞
//创建节点的方式大概可以分为两类API,一个是阻塞的,创建节点成功才返回值、一个是不阻塞创建成功使用回调的函数返回结果值。
String pathName = zk.create("/znodexx","test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
//取数据的同时注册了一个观察,取完数据后,未来如果该节点数据发生变化,就会回调这个Watch
Stat stat = new Stat();//获取源数据,zookeeper内部会对其赋值
byte[] znodData = zk.getData("/znodexx", new Watcher() {//这个只会调用1次,需要重复注册才能解决修改该节点后再次调用
@Override
public void process(WatchedEvent event) { //什么时候节点变化才会回调,并且是在另一个线程回调
System.out.println("哈哈getData watch : " + event.toString());
//注意这种重复注册监听,第二个参数传true,会调用创建new ZooKeeper时的watch
//zk.getData("/znodexx",true,stat);
try {
//第二个参数传递的是this,当该节点变化时,会调用当前watch而不是创建ZooKeeper时的watch
zk.getData("/znodexx",this,stat);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},stat);
System.out.println("获取到数据");
System.out.println(new String(znodData));
Stat stat1 = zk.setData("/znodexx","hhh".getBytes(),0);//触发上面的回调
zk.setData("/znodexx","00hhh00".getBytes(),stat1.getVersion());//触发上面的回调
System.out.println("async --------begin");
//异步回调的使用,拿到结果数据会回调processResult函数,代码不阻塞,继续执行
zk.getData("/znodexx", false, new AsyncCallback.DataCallback() {
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
System.out.println("path = " + path); //路径
System.out.println("ctx = " + ctx.toString()); //传入的上下文信息
System.out.println("data = " + new String(data)); //返回的数据
System.out.println("stat = " + stat.toString());//返回源数据信息
}
},"xxxx");
System.out.println("async --------end");
return zk;
}
}