Zookeeper可以简单理解一个数据库。但不要把zookeeper当做数据库用,因为zookeeper官网一直强调快(响应快、从不可用选出leader快),快就要求所有节点的传输数据体量要小,这样才能快
zkservice.jpg

zookeeper特点

分布式读写

客户端的所有写入请求都被转发到单个服务器,称为领导者,_ZooKeeper 服务器的其余部分,称为追随者_,接收来自领导者的数据传递。(数据量最好不要超过1M)。

全局数据一致性

每个server节点保存相同的数据副本,Client无论链接哪个server,数据都是一致的。有一种情况是如果Leader写完数据后,正在同步给其他follow节点,数据还没同步完成,此时如果一个Client正在链接这个follow节点,Client就会先阻塞,等leader同步完数据后,Client才能进行访问。

实时性

在一定时间范围内,Client能读到最新数据(一般毫秒级别)

选举过程

http://dockone.io/article/696772
leader肯定有挂的可能性,造成服务不可用。这就会产生一个问题,我们需要有一种机制能够快速的产生一个leader出来,让整个集群达到可用状态,而且这种不可用的状态到达可用的状态越快越好,官方压测200ms就能恢复。

数据模型

zknamespace.jpg
每个节点在zookeeper中叫znode,每个znode有一个唯一的路径标识并且每个节点只能存放一个数据,

节点类型

持久节点(分为:持久节点、持久顺序节点)

相当于数据保存在硬盘中。
创建持久节点

  1. create /rootZnode test1

创建持久顺序节点
用于控制很多客户端Client同时向同一个节点进行写操作,如果写入的节点名称相同会自定生成不同节点的名称。

  1. [zk: localhost:2181(CONNECTED) 3] create -s /sZode
  2. Created /sZode0000000002
  3. [zk: localhost:2181(CONNECTED) 4] create -s /sZode1
  4. Created /sZode10000000003

临时节点(分为:临时节点、临时顺序节点)

  • 数据保存在内存中,会话(session)断开,数据就会删除,每当一个客户端连接到follow或master的时候就会创建一个session连接,并且这个session会被leader进行写操作,写完后还会同步到其他follow,所以当一个Client连接的follow挂掉后,Client还能连接到其他follow并且复用之前的session。
  • 创建临时节点

    1. create -e /tempZnode tempTest1
  • 不允许创建子节点

    1. create -e /tempZnode/tZonde1 tempTest1
    2. Ephemerals cannot have children: /tempZnode/tZonde1

    查看创建的Znode节点

    1. [zk: localhost:2181(CONNECTED) 0] ls -R /
    2. /
    3. /rootZnode
    4. /sZode0000000002
    5. /sZode10000000003
    6. /zookeeper
    7. /zookeeper/config
    8. /zookeeper/quota ls /
    9. [rootZnode, tempZnode, zookeeper]

    关闭当前的Client,临时节点会删除。

    1. quit
    2. ./zkCli.sh
    3. ls /
    4. [rootZnode, zookeeper]

    API操作

    API的操作相对简单,包括连接zookeeper集群、创建节点、更新节点、获取节点、删除节点。

    1. <!-- 官方自带的客户端操作 -->
    2. <dependency>
    3. <groupId>org.apache.zookeeper</groupId>
    4. <artifactId>zookeeper</artifactId>
    5. <version>3.6.3</version>
    6. </dependency>

    Snipaste_2022-02-09_09-05-17.png
    可以把服务端的log文件放到客户端里面,能够详细看到zookeeper连接、断开以及操作数据的详细过程。 ```java

import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.io.IOException; import java.util.concurrent.CountDownLatch;

@Configuration public class ZookeeperAPI { //zookeeper集群,zookeeper的ip与端口,这里测试一台机器开启三个zookeeper,会随机链接一台 private static final String ZK_SERVER_LIST = “120.53.119.107:2181,120.53.119.107:2182,120.53.119.107:2183”; //设置超时时间,当节点3s后没反应就认该节点已经挂了,比如创建临时节点客户端关闭链接后3s,临时节点才移除 private static final int sessionTimeout = 3000; private ZooKeeper zk;

  1. @Bean
  2. public ZooKeeper zooKeeper1() throws IOException, InterruptedException, KeeperException {
  3. /**
  4. * watch分为两类:第一类:创建ZooKeeper对象(new ZooKeeper)时候传入的watch,这个watch为session级别的watch。
  5. * 当ZooKeeper链接成功或断开等会监听到,如果断开连接,还有其他服务可用,会自动连接到其他节点,并收到通知,而且session是不变的。
  6. * 如果ZooKeeper节点变化删除、更新等是监听不到的。
  7. */
  8. CountDownLatch countDownLatch = new CountDownLatch(1);
  9. zk = new ZooKeeper(ZK_SERVER_LIST, sessionTimeout, new Watcher() {
  10. @Override
  11. public void process(WatchedEvent event) {
  12. Event.KeeperState state = event.getState();
  13. Event.EventType type = event.getType();
  14. System.out.println("new ZooKeeper Watcher连接成功");
  15. //状态
  16. switch (state) {
  17. case Unknown -> {
  18. }
  19. case Disconnected -> {
  20. }
  21. case NoSyncConnected -> {
  22. }
  23. case SyncConnected -> {
  24. System.out.println("new ZooKeeper Watcher连接成功");
  25. countDownLatch.countDown();
  26. }
  27. case AuthFailed -> {
  28. }
  29. case ConnectedReadOnly -> {
  30. }
  31. case SaslAuthenticated -> {
  32. }
  33. case Expired -> {
  34. }
  35. case Closed -> {
  36. }
  37. }
  38. }
  39. });
  40. countDownLatch.await();//在这里会阻塞
  41. //创建节点的方式大概可以分为两类API,一个是阻塞的,创建节点成功才返回值、一个是不阻塞创建成功使用回调的函数返回结果值。
  42. String pathName = zk.create("/znodexx","test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
  43. //取数据的同时注册了一个观察,取完数据后,未来如果该节点数据发生变化,就会回调这个Watch
  44. Stat stat = new Stat();//获取源数据,zookeeper内部会对其赋值
  45. byte[] znodData = zk.getData("/znodexx", new Watcher() {//这个只会调用1次,需要重复注册才能解决修改该节点后再次调用
  46. @Override
  47. public void process(WatchedEvent event) { //什么时候节点变化才会回调,并且是在另一个线程回调
  48. System.out.println("哈哈getData watch : " + event.toString());
  49. //注意这种重复注册监听,第二个参数传true,会调用创建new ZooKeeper时的watch
  50. //zk.getData("/znodexx",true,stat);
  51. try {
  52. //第二个参数传递的是this,当该节点变化时,会调用当前watch而不是创建ZooKeeper时的watch
  53. zk.getData("/znodexx",this,stat);
  54. } catch (KeeperException e) {
  55. e.printStackTrace();
  56. } catch (InterruptedException e) {
  57. e.printStackTrace();
  58. }
  59. }
  60. },stat);
  61. System.out.println("获取到数据");
  62. System.out.println(new String(znodData));
  63. Stat stat1 = zk.setData("/znodexx","hhh".getBytes(),0);//触发上面的回调
  64. zk.setData("/znodexx","00hhh00".getBytes(),stat1.getVersion());//触发上面的回调
  65. System.out.println("async --------begin");
  66. //异步回调的使用,拿到结果数据会回调processResult函数,代码不阻塞,继续执行
  67. zk.getData("/znodexx", false, new AsyncCallback.DataCallback() {
  68. @Override
  69. public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
  70. System.out.println("path = " + path); //路径
  71. System.out.println("ctx = " + ctx.toString()); //传入的上下文信息
  72. System.out.println("data = " + new String(data)); //返回的数据
  73. System.out.println("stat = " + stat.toString());//返回源数据信息
  74. }
  75. },"xxxx");
  76. System.out.println("async --------end");
  77. return zk;
  78. }

}

```