1. Zookeeper概述

1.1 简介

Zookeeper.jpg
Zookeeper是Apache软件基金会的一个软件项目,它为大型分布式计算提供开源的分布式配置服务、同步服务和命名注册。

1.2 架构

通过冗余服务实现高可用性。

1.3 设计目标

将复杂且容易出错的分布式一致性服务封装起来,构成一个高效可靠的原语集,并以一系列简单易用的接口提供给用户使用。

1.4 功能特性

一个典型的分布式数据一致性的解决方案,分布式应用程序可以直接基于它实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master选举、分布式锁和分布式队列等功能。

1.5 CAP理论

对于一个分布式系统来说,以下三点不能同时满足:

  • Consistency:一致性,一个节点在数据一致的状态下执行更新操作后,应该保证系统的数据仍然处于一致的状态。
  • Availability:可用性,每次请求都能获取到正确的响应,但是不保证获取的数据为最新数据
  • Partion tolerance:分区容错性,分布式系统在遇到任何网络分区故障的时候,仍然需要能够保证对外提供满足一致性和可用性的服务,除非整个网络环境都发生了故障。

三大古老的注册中心对比

组件名 语言 CAP 服务健康检查 对外暴露接口 Spring Cloud集成
Eureka Java AP 可配支持 HTTP 已集成
Consul Go CP 支持 HTTP/DNS 已集成
Zookeeper Java CP 支持 客户端 已集成

1.6 BASE理论

BASE理论是以下三个短语的缩写:

  • Basically Available:基本可用,在分布式系统出现故障,允许损失部分可用性(服务降级)
  • Soft-state:软状态,允许分布式系统出现中间状态,而且中间状态不影响系统的可用性,即允许系统不同节点的数据副本之间进行同步的过程存在时延。
  • Eventually Consistent:最终一致性,系统中所有的数据副本,在经过一段时间的同步后,最终能达到一致的状态。

BASE理论是对CAP中一致性和可用性进行一个权衡的结果,核心思想:无法做到强一致,但每个应用都可以根据自身的业务特点,采用适当的方式来使系统达到最终一致性。

2. Zookeeper安装

注意

3.4版本是基于jdk8构建的,3.5版本之后是基于jdk11构建的。

2.1 Linux安装

  1. # 下载jdk8
  2. $ yum install -y java-1.8.0-openjdk-devel.x86_64
  3. # 配置环境变量
  4. $ vim /etc/profile
  5. export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.292.b10-1.el7_9.x86_64/jre/bin
  6. export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
  7. export PATH=$JAVA_HOME/bin:$PATH
  8. # 使配置生效
  9. source /etc/profile
  10. # 下载源码
  11. $ wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/zookeeper-3.5.8.tar.gz
  12. # 解压安装包
  13. $ tar -zxvf zookeeper-3.5.8.tar.gz
  14. # 修改配置
  15. $ cd conf/ && cp zoo_sample.cfg zoo.cfg
  16. # 启动服务端
  17. $ cd ../bin/ && sh zkServer.sh start

2.2 Docker安装

  1. # 拉取镜像
  2. $ docker pull zookeeper:3.5.8
  3. # 启动服务
  4. $ docker run -d -p 2181:2181 -d --name zookeeper zookeeper:3.5.8

3. Zookeeper数据模型

3.1 模型结构

structure.jpg

3.2 模型特点

  • 每个子目录如/app1都被称作一个znode(节点),这个znode是被它所在的路径唯一标识
  • znode可以有子节点目录,并且每个znode可以存储数据
  • znode是有版本的,每个znode中存储的数据可以有多个版本,也就是一个访问路径中可以存储多份数据
  • znode可以被监控,包括这个目录中存储的数据修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端

3.3 节点分类

(1)持久节点(PERSISTENT)
是指在节点创建后,就一直存在,直到有删除操作来主动删除这个节点——不会因为创建该节点的客户端会话失效而消失。
(2)持久顺序节点(PERSISTENT_SEQUENTIAL)
这类节点的基本特性和上面的节点类型是一致的。额外的特性是,在ZK中,每个父节点会为它的第一级子节点维护一份时序,会记录每个子节点创建的先后顺序。基于这个特性,在创建子节点的时候,可以设置这个属性,那么在创建节点过程中,ZK会自动为给定节点名加上一个数字后缀,作为新的节点名。这个数字后缀的范围是整形的最大值。
(3)临时节点(EPHEMERAL)
和持久化节点不同的是,临时节点的生命周期和客户端会话绑定。也就是说,如果客户端会话失效,那么这个节点就会自动被清除掉。注意,这里提到的是会话失效,而非连接断开。另外,在临时节点下面不能创建子节点。
(4)临时顺序节点(EPHEMERAL SEQUETIAL)
具有临时节点特点,额外的特性是,每个父节点会为它的第一级子节点维护一份时序,这点和刚才提到的持久顺序节点类似。

4. zookeeper配置文件

4.1 zoo.cfg

  1. # The number of milliseconds of each tick
  2. tickTime=2000
  3. # The number of ticks that the initial
  4. # synchronization phase can take
  5. initLimit=10
  6. # The number of ticks that can pass between
  7. # sending a request and getting an acknowledgement
  8. syncLimit=5
  9. # the directory where the snapshot is stored.
  10. # do not use /tmp for storage, /tmp here is just
  11. # example sakes.
  12. dataDir=/tmp/zookeeper
  13. # the port at which the clients will connect
  14. clientPort=2181
  15. # the maximum number of client connections.
  16. # increase this if you need to handle more clients
  17. #maxClientCnxns=60
  18. # Be sure to read the maintenance section of the
  19. # administrator guide before turning on autopurge.
  20. # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
  21. # The number of snapshots to retain in dataDir
  22. #autopurge.snapRetainCount=3
  23. # Purge task interval in hours
  24. # Set to "0" to disable auto purge feature
  25. #autopurge.purgeInterval=1
  26. ## Metrics Providers
  27. # https://prometheus.io Metrics Exporter
  28. #metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
  29. #metricsProvider.httpHost=0.0.0.0
  30. #metricsProvider.httpPort=7000
  31. #metricsProvider.exportJvmInfo=true

4.2 参数说明

参数 说明
tickTime 集群服务器节点之间或者服务器与客户端之间维持心跳的时间间隔,即每隔tickTime会发生心跳包,时间单位为ms,并且最小session超时时间为2 * tickTime。
initLimit 初始化集群时Leader与Follower之间的最多心跳数,限定Follower连接到Leader的时限为initLimit * tickTime。
syncLimit 集群运行时Leader与Follower之间同步的最大响应时间单位,即响应时间超过syncLimit * tickTime,Leader认为Follower死亡,从服务器列表删除Follower。
dataDir 数据存储位置。
clientPort 服务监听端口。
maxClientCnxns 最大客户端连接数量。
autopurge.snapRetainCount 快照保存数量,到达这个数量自动进行快照合并。
autopurge.purgeInterval 快照清理频率,单位为小时,即这个时间间隔内快照数量到达上述指定数量,就进行快照合并。

5. ZooKeeper基本指令

进入docker内部客户端

  1. $ docker exec -it zookeeper bash
  2. $ ./bin/zkCli.sh

5.1 基本指令

  1. $ ls path # 查看特定节点下面的子节点
  2. $ create path data # 创建一个节点,并给节点绑定数据(默认是持久性节点)
  3. $ create -s path data # 创建持久性顺序节点
  4. $ create -e path data # 创建临时性节点(临时性节点不能含有任何子节点)
  5. $ create -e -s path data # 创建临时性顺序节点(临时性节点不能含有任何子节点)
  6. $ stat path # 查看节点状态
  7. $ set path data # 修改节点数据
  8. $ ls2 path # 查看节点下孩子和当前节点的状态
  9. $ history # 查看操作历史
  10. $ get path # 获得节点上绑定的数据信息
  11. $ delete path # 删除节点(删除节点不能含有子节点)
  12. $ rmr path # 递归删除节点(会将当前节点下所有节点删除)
  13. $ quit # 退出当前会话

5.2 stat结果详解

  1. 1) czxid - 创建节点的事务的zxid
  2. 2) ctime - 创建节点的毫秒数(从1970年开始)
  3. 3) mzxid - 更新节点的事务zxid
  4. 4) mtime - 最后更新的毫秒数(从1970年开始)
  5. 5) pZxid - 最后更新子节点的事务zxid
  6. 6) cversion - 子节点变化号,子节点修改次数
  7. 7) dataversion - 数据变化号
  8. 8) aclVersion - 访问控制列表的变化号
  9. 9) ephemeralOwner - 如果是临时节点,这个是znode拥有者的session id;如果不是临时节点则是0
  10. 10) dataLength - 数据长度
  11. 11) numChildren - 子节点数量

5.3 节点监听机制

客户端可以监测znode节点的变化,znode节点的变化触发相应的事件,然后清除对该节点的监测。当监测一个znode节点时候,zookeeper会发送通知给监测节点。一个watch事件是一个一次性的触发器,当被设置了watch的数据获取目录发生了改变的时候,则服务器将这个改变发送给设置了watch的客户端以便通知它们。

  1. $ ls /path true # 监听节点目录的变化
  2. $ get /path true # 监听节点数据的变化

6. Zookeeper集群搭建

6.1 docker搭建伪集群

cluster.jpg

  1. # 查看最小容器ip
  2. $ docker network inspect bridge
  3. # 创建三个zk的ip分别在这个最小的ip上加上1,2,3
  4. # zk1: 172.17.0.7
  5. # zk2: 172.17.0.8
  6. # zk3: 172.17.0.9
  7. # 创建集群的挂载目录
  8. $ mkdir -p /opt/zookeeper/cluster/
  9. # 创建配置和数据目录
  10. $ mkdir -p node1/conf node1/data node2/conf node2/data node3/conf node3/data
  11. # 创建配置文件和myid
  12. $ touch node1/conf/zoo.cfg node2/conf/zoo.cfg node3/conf/zoo.cfg node1/data/myid node2/data/myid node3/data/myid
  13. # 编辑配置文件
  14. # 三个文件相同,内容如下
  15. dataDir=/data
  16. dataLogDir=/data/log
  17. tickTime=2000
  18. initLimit=5
  19. syncLimit=2
  20. autopurge.snapRetainCount=3
  21. autopurge.purgeInterval=0
  22. maxClientCnxns=60
  23. standaloneEnabled=true
  24. admin.enableServer=true
  25. 4lw.commands.whitelist=*
  26. clientPort=2181
  27. server.1=172.17.0.7:2888:3888
  28. server.2=172.17.0.8:2888:3888
  29. server.3=172.17.0.9:2888:3888
  30. # 编辑myid
  31. $ echo "1" >> node1/data/myid
  32. $ echo "2" >> node2/data/myid
  33. $ echo "3" >> node3/data/myid
  34. # 启动三个容器
  35. $ docker run -d -p 2182:2181 -v /opt/zookeeper/cluster/node1/conf:/conf -v /opt/zookeeper/cluster/node1/data:/data --name zk1 zookeeper:3.4.14
  36. $ docker run -d -p 2183:2181 -v /opt/zookeeper/cluster/node2/conf:/conf -v /opt/zookeeper/cluster/node2/data:/data --name zk2 zookeeper:3.4.14
  37. $ docker run -d -p 2184:2181 -v /opt/zookeeper/cluster/node3/conf:/conf -v /opt/zookeeper/cluster/node3/data:/data --name zk3 zookeeper:3.4.14

6.2 查看节点znode状态

  1. $ docker exec -it zk<i> bash
  2. $ ./bin/zkServer.sh status
  3. # zk1
  4. ZooKeeper JMX enabled by default
  5. Using config: /conf/zoo.cfg
  6. Mode: follower
  7. # zk2
  8. ZooKeeper JMX enabled by default
  9. Using config: /conf/zoo.cfg
  10. Mode: follower
  11. # zk3
  12. ZooKeeper JMX enabled by default
  13. Using config: /conf/zoo.cfg
  14. Mode: leader

可以看到leader结点为zk3,zk1和zk2为follower节点。

7. Java客户端操作

7.1 引入依赖

  1. <dependency>
  2. <groupId>com.101tec</groupId>
  3. <artifactId>zkclient</artifactId>
  4. <version>0.10</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>junit</groupId>
  8. <artifactId>junit</artifactId>
  9. <version>4.12</version>
  10. </dependency>

7.2 日志配置

  1. log4j.rootLogger = INFO,CONSOLE,FILE,HIGHNESS,
  2. # 输出日志到控制台
  3. log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
  4. log4j.appender.CONSOLE.Threshold=INFO
  5. log4j.appender.CONSOLE.Target=System.out
  6. log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
  7. log4j.appender.CONSOLE.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss.SSS} HIGHNESS %-5p [%c] [%t] [%F:%L]- %m%n
  8. # 输出日志到文件
  9. log4j.appender.FILE=org.apache.log4j.FileAppender
  10. log4j.appender.FILE.File=log/project.log
  11. log4j.appender.FILE.Append=false
  12. log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
  13. log4j.appender.FILE.layout.ConversionPattern=%d HIGHNESS %-5p [%c] - %m%n
  14. # 每日一个日志文件
  15. log4j.appender.HIGHNESS=org.apache.log4j.DailyRollingFileAppender
  16. # 日志最低的输出级别
  17. log4j.appender.HIGHNESS.Threshold=INFO
  18. # 日志日期格式
  19. log4j.appender.HIGHNESS.DatePattern='_'yyyy-MM-dd
  20. # 日志编码格式
  21. log4j.appender.HIGHNESS.encoding=UTF-8
  22. # 有日志时立即输出
  23. log4j.appender.HIGHNESS.ImmediateFlush=true
  24. # 日志文件的保存位置及文件名
  25. log4j.appender.HIGHNESS.File=log/ProjectDaily.log
  26. # 日志文件的最大大小
  27. log4j.appender.HIGHNESS.maxFileSize=10KB
  28. # 日志布局方式
  29. log4j.appender.HIGHNESS.layout=org.apache.log4j.PatternLayout
  30. # 日志文件中日志的格式
  31. log4j.appender.HIGHNESS.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss.SSS} HIGHNESS %-5p [%c]- %m%n

7.3 搭建客户端

  1. private final Logger log = LoggerFactory.getLogger(ZKClient.class);
  2. private ZkClient zkClient;
  3. @Before
  4. public void before() {
  5. // 服务器ip:port
  6. // 会话超时时间
  7. // 连接超时时间
  8. // 序列化方式
  9. zkClient = new ZkClient("192.168.117.155:2181", 6000 * 30, 6000, new SerializableSerializer());
  10. }
  11. @After
  12. public void close() {
  13. zkClient.close();
  14. }
  15. private static class User implements Serializable {
  16. private final Integer id;
  17. private final String name;
  18. public User(Integer id, String name) {
  19. this.id = id;
  20. this.name = name;
  21. }
  22. @Override
  23. public String toString() {
  24. return "User{" + "id=" + id + ", name='" + name + '\'' + '}';
  25. }
  26. }

7.4 创建节点

  1. @Test // 创建节点
  2. public void create() {
  3. // 1. 持久节点
  4. String k1 = zkClient.create("/para1", "k1", CreateMode.PERSISTENT);
  5. log.info("创建持久节点: {}", k1);
  6. // 2. 持久顺序节点
  7. String k2 = zkClient.create("/para2", "k2", CreateMode.PERSISTENT_SEQUENTIAL);
  8. log.info("创建持久顺序节点: {}", k2);
  9. // 3. 临时节点
  10. String k3 = zkClient.create("/para3", "k3", CreateMode.EPHEMERAL);
  11. log.info("创建临时节点: {}", k3);
  12. // 4. 临时顺序节点
  13. String k4 = zkClient.create("/para4", "k4", CreateMode.EPHEMERAL_SEQUENTIAL);
  14. log.info("创建临时顺序节点:{}",k4);
  15. }

7.5 查询节点数据

  1. @Test // 查询节点数据
  2. public void get() {
  3. Object data = zkClient.readData("/para1");
  4. log.info("读取/para1数据:{}", data);
  5. }

7.6 查询节点状态

  1. @Test // 查询节点状态
  2. public void stat() {
  3. Stat stat = new Stat();
  4. Object o = zkClient.readData("/para1", stat);
  5. log.info("节点/para1状态:{}", stat);
  6. log.info("创建节点的事务zxID:{}", stat.getCzxid());
  7. log.info("创建毫秒数:{}", stat.getCtime());
  8. log.info("更新节点的事务zxID:{}", stat.getMzxid());
  9. log.info("更新毫秒数:{}", stat.getMtime());
  10. log.info("数据长度: {}", stat.getDataLength());
  11. log.info("访问控制列表变化号: {}", stat.getAversion());
  12. log.info("更新子节点的事务zxID:{}", stat.getPzxid());
  13. log.info("子节点的修改次数:{}", stat.getCversion());
  14. log.info("子节点数量:{}", stat.getNumChildren());
  15. }

7.7 修改节点数据

  1. @Test // 修改节点数据
  2. public void set() {
  3. zkClient.writeData("/para1", new User(1, "KHighness"));
  4. User user = zkClient.readData("/para1");
  5. log.info("修改后的/para1: {}", user);
  6. }

7.8 删除节点

  1. @Test // 删除节点
  2. public void delete() {
  3. boolean delete = zkClient.delete("/para1");
  4. log.info("delete /para1: {}", delete);
  5. }

7.9 监听节点数据的变化

  1. @Test // 监听节点数据的变化,非一次性,永久监听
  2. public void getTrue() throws IOException {
  3. zkClient.subscribeDataChanges("/para1", new IZkDataListener() {
  4. // nodeName:当前修改节点的名称,result:节点修改之后的数据
  5. public void handleDataChange(String nodeName, Object result) throws Exception {
  6. log.info("修改节点的名称:{}", nodeName);
  7. log.info("修改后节点数据:{}", result);
  8. }
  9. // nodeName:当前修改节点的名称
  10. public void handleDataDeleted(String nodeName) throws Exception {
  11. log.info("删除节点的名称:{}", nodeName);
  12. }
  13. });
  14. // 阻塞客户端
  15. System.in.read();
  16. }

7.10 监听节点目录的变化

  1. @Test // 监听节点目录的变化,非一次性,永久监听
  2. public void lsTrue() throws IOException {
  3. zkClient.subscribeChildChanges("/para1", new IZkChildListener() {
  4. // nodeName:当前修改节点的名称,list:发生修改的所有子节点名称
  5. public void handleChildChange(String nodeName, List<String> list) throws Exception {
  6. log.info("修改节点的名称:{}", nodeName);
  7. log.info("发生修改的所有子节点名称:{}", list.toString());
  8. }
  9. });
  10. // 阻塞客户端
  11. System.in.read();
  12. }

7.11 操作集群

  1. @Test // 集群操作
  2. public void cluster() {
  3. // 可以将所有节点的ip:port都放入构造函数,中间用逗号隔开,不要加空格
  4. ZkClient cluster = new ZkClient("192.168.117.155:2182,192.168.117.155:2183,192.168.117.155:2184");
  5. Object o = zkClient.readData("/para1");
  6. log.info("集群读取/para1: {}", o.toString());
  7. }