1. Zookeeper入门

Zookeeper 是一个开源的分布式的,为分布式框架提供协调服务的 Apache 项目。

1.1 概述

Zookeeper从设计模式角度来理解:是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然 后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper就将负责通知已经在Zookeeper上注册的那些观察者做出相应的反应。用于服务注册与发现。
image.png

1.2 特点

image.png
1)Zookeeper:一个领导者(Leader),多个跟随者(Follower)组成的集群。
2)服务器集群中只要有半数以上节点存活,Zookeeper集群就能正常服务。所以Zookeeper适合安装奇数台服务器。
3)全局数据一致:每个Server保存一份相同的数据副本,Client无论连接到哪个Server,数据都是一致的。
4)更新请求顺序执行,来自同一个Client的更新请求按其发送顺序依次执行。
5)数据更新原子性,一次数据更新要么成功,要么失败。
6)实时性,在一定时间范围内,Client能读到最新数据。

1.3 数据结构

image.png
ZooKeeper 数据模型的结构与 Unix 文件系统很类似,整体上可以看作是一棵树,每个节点(Server)称做一个 ZNode。每一个 ZNode 默认能够存储 1MB 的数据,每个 ZNode 都可以通过其路径唯一标识表示。(就是都可以通过唯一路径找到该节点)

1.4 应用场景

提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等。

1.4.1 统一命名服务

image.png
在分布式环境下,经常需要对应用/服务进行统一命名,便于识别。例如:IP不容易记住,而域名容易记住。

1.4.2 统一配置管理

image.png
1)分布式环境下,配置文件同步非常常见。

  • 一般要求一个集群中,所有节点的配置信息是一致的,比如 Kafka 集群。
  • 对配置文件修改后,希望能够快速同步到各个节点上。

2)配置管理可交由ZooKeeper实现。

  • 可将配置信息写入ZooKeeper上的一个Znode。
  • 各个客户端服务器监听这个Znode。
  • 一 旦Znode中的数据被修改,ZooKeeper将通知各个客户端服务器。

    1.4.3 统一集群管理

    1)分布式环境中,实时掌握每个节点的状态是必要的。

  • 可根据节点实时状态做出一些调整。

2)ZooKeeper可以实现实时监控节点状态变化

  • 可将节点信息写入ZooKeeper上的一个ZNode。(状态信息等等)
  • 监听这个ZNode可获取它的实时状态变化。

    1.4.4 服务器动态上下线

    image.png

    1.4.5 软负载均衡

    在Zookeeper中记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端请求

2. Zookeeper下载及安装

需要提前安装jdk
下载网址:https://zookeeper.apache.org/
将下载的安装包复制到linux下

2.1 安装

2.1.1 安装前准备

解压到指定目录

tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /usr/local/module/

修改解压文件夹名称(可省略)

mv apache-zookeeper-3.5.7-bin zookeeper-3.5.7

2.1.2 配置修改

将/opt/module/zookeeper-3.5.7/conf 这个路径下的 zoo_sample.cfg 修改为zoo.cfg;

mv zoo_sample.cfg zoo.cfg

在/usr/local/module/zookeeper-3.5.7目录下创建zkData文件夹

mkdir zkData

打开 zoo.cfg 文件,修改 dataDir 路径成以下内容:

dataDir=/usr/local/module/zookeeper-3.5.7/zkData

*端口占用问题

zookeeper v3.5启动后会占用8080端口,需要对端口进行修改,以免跟tomcat等发生冲突

  • 方式一:禁用AdminServer;在zoo.cnf 配置文件中增加配置 admin.enableServer=false,这样就无法访问Zookeeper AdminServer了。访问方式:http//:localhost:port/commands
  • 方式二:修改端口号;在zoo.cnf 配置文件中增加admin.serverPort=未占用的端口号,如8090,我这里改成了8022

新版Zookeeper v3.5启动后为什么占用8080端口,如何修改端口?

2.1.3 操作zookeeper

启动 Zookeeper ./zkServer.sh start 或者通过路径方式启动 bin/zkServer.sh start
查看进程是否启动 jps
查看状态 bin/zkServer.sh status 或者 ./zkServer.sh status
启动客户端 bin/zkCli.sh ./zkCli.sh
客户端内退出 quit
停止Zookeeper bin/zkServer.sh stop ./zkServer.sh stop

2.2 配置参数解读

  • tickTime = 2000:通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒

image.png

  • initLimit = 10:LF初始通信时限,Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量)。例如tickTime=2000,如果在2000*10ms内心跳次数小于10次,那么zookeeper则认为本次连接失败
  • syncLimit = 5:LF同步通信时限,Leader和Follower之间通信时间如果超过syncLimit * tickTime,Leader认为Follwer死掉,从服务器列表中删除Follwer。

image.png

  • dataDir:保存Zookeeper中的数据。注意:默认的tmp目录,容易被Linux系统定期删除,所以一般不用默认的tmp目录。
  • clientPort = 2181:客户端连接端口,通常不做修改。

    3. Zookeeper 集群操作

    3.1 集群操作

    3.1.1 集群安装

    1)集群规划

    在三台虚拟机上的三个节点上都部署 Zookeeper。n台服务器需要部署多少台Zookeeper?

    2)解压安装

    跟之前步骤一致

    3)配置服务器编号

  • 在zookeeper解压目录下(我的是/usr/local/module/zookeeper-3.5.7)的zkData(上一步配置修改的dataDir目录)目录下创建一个myid的文件

    • mkdir zkData
    • vi myid
  • 在myid文件中添加与 server 对应的编号(注意:上下不要有空行,左右不要有空格)

    • image.png
    • 我配置Ubuntu1 是 1,Ubuntu2 是2 CentOS1是3

      4)配置zoo.cnf文件

  • (1) 重命名/opt/module/zookeeper-3.5.7/conf 这个目录下的 zoo_sample.cfg 为 zoo.cfg

  • (2) 打开 zoo.cfg 文件,修改数据存储路径配置
    • dataDir=/usr/local/module/zookeeper-3.5.7/zkData 这里的路径就是你创建的zkData的绝对路径
  • 增加如下配置

    #################cluster

    server.2=hadoop102:2888:3888 server.3=hadoop103:2888:3888 server.4=hadoop104:2888:3888

  • 配置参数解读 server.A=B:C:D

    • A 是一个数字,表示这个是第几号服务器;集群模式下配置一个文件 myid,这个文件在 dataDir 目录下,这个文件里面有一个数据就是 A 的值,Zookeeper 启动时读取此文件,拿到里面的数据与 zoo.cfg 里面的配置信息比较从而判断到底是哪个 server。
    • B 是这个服务器的地址; 可以直接写ip地址
    • C 是这个服务器 Follower 与集群中的 Leader 服务器交换信息的端口;
    • D是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。
  • 同步配置文件

    5)集群操作

    分别启动Zookeeper, bin/zkServer.sh start 查看状态 bin/zkServer.sh status
    注意:启动过程中需要有半数以上的服务启动才能启动成功,需要关闭防火墙
    image.png
    image.png

    3.1.2 选举机制(面试重点)

    1.第一次启动

    image.png
  1. 服务器1启动,发起一次选举。服务器1投自己一票。此时服务器1票数一票,不够半数以上(3票),选举无法完成,服务器1状态为LOOKING
  2. 服务器2启动,再发起一次选举。服务器1和2分别投自己一票并交换选票信息:此时服务器1发现服务器2的myid比自己目前投票推举的(服务器1)大,更改选票为推举服务器2。此时服务器1票数0票,服务器2票数2票,没有半数以上结果,选举无法完成,服务器1,2状态保持为LOOKING
  3. 服务器3启动,发起一次选举。此时服务器1和2都会更改选票为服务器3。此次投票结果:服务器1为0票,服务器2为0票,服务器3为3票。此时服务器3的票数已经超过半数,服务器3当选Leader。服务器1,2更改状态为FOLLOWING,服务器3更改状态为LEADING
  4. 服务器4启动,发起一次选举。此时服务器1,2,3已经不是LOOKING状态,不会更改选票信息。交换选票信息结果:服务器3为3票,服务器4为1票。此时服务器4服从多数,更改选票信息为服务器3,并更改状态为FOLLOWING
  5. 服务器5启动,同4一样当小弟。

总的来说:当启动的服务器数量没有达到一半以上时,选票会集中投给myid大的服务器,直到达到一半以上时选出leader,一旦选出了leader,无论后面的服务器myid如何,均为follower。

  • SID:服务器ID。用来唯一标识一台ZooKeeper集群中的机器,每台机器不能重复,和myid一致。
  • ZXID:事务ID。ZXID是一个事务ID,用来标识一次服务器状态的变更。在某一时刻,集群中的每台机器的ZXID值不一定完全一致,这和ZooKeeper服务器对于客户端“更新请求”的处理逻辑有关。
  • Epoch:每个Leader任期的代号。没有Leader时同一轮投票过程中的逻辑时钟值是相同的。每投完一次票这个数据就会增加。

    2. 非第一次启动

    1)当ZooKeeper集群中的一台服务器出现以下两种情况之一时,就会开始进入Leader选举:

  • 服务器初始化启动。

  • 服务器运行期间无法和Leader保持连接。

2)而当一台机器进入Leader选举流程时,当前集群也可能会处于以下两种状态:

  • 集群中本来就已经存在一个Leader。
    • 对于第一种已经存在Leader的情况,机器试图去选举Leader时,会被告知当前服务器的Leader信息,对于该机器来说,仅仅需要和Leader机器建立连接,并进行状态同步即可。
  • 集群中确实不存在Leader。

image.png

3.1.3 zk集群启动停止脚本

1)在VMUbuntu1的/home/mrlinxi/bin 目录下创建zk.sh脚本

sudo vim zk.sh

  1. #!/bin/bash
  2. case $1 in
  3. "start"){
  4. for i in 192.168.190.128 192.168.190.129 192.168.190.131
  5. do
  6. echo ----------------- zookeeper-3 $i 启动 --------------------
  7. ssh $i "sudo /usr/local/module/zookeeper-3.5.7/bin/zkServer.sh start"
  8. done
  9. }
  10. ;;
  11. "stop"){
  12. for i in 192.168.190.128 192.168.190.129 192.168.190.131
  13. do
  14. echo ----------------- zookeeper-3 $i 停止 --------------------
  15. ssh $i "sudo /usr/local/module/zookeeper-3.5.7/bin/zkServer.sh stop"
  16. done
  17. }
  18. ;;
  19. "status"){
  20. for i in 192.168.190.128 192.168.190.129 192.168.190.131
  21. do
  22. echo ----------------- zookeeper-3 $i 状态 --------------------
  23. ssh $i "sudo /usr/local/module/zookeeper-3.5.7/bin/zkServer.sh status"
  24. done
  25. }
  26. ;;
  27. esac

解决CentOS Zookeeper JAVA_HOME is not set and java could not be found in PATH 问题

2)增加脚本执行权限

chmod 777 zk.sh

3)Zookeeper 集群启动脚本

./zk.sh start

4)Zookeeper 集群停止脚本

./zk.sh stop

3.2 客户端命令行操作

3.2.1 命令行语法

  • 1)启动客户端 bin/zkCli.sh -server VMUbuntu1:2181 这里VMUbuntu1是服务器域名也就是ip的别名(192.168.190.128)
  • 2)显示所有操作命令 help

    3.2.2 znode节点数据信息

  • 1)查看当前znode中所包含的内容 ls /

  • 2)查看当前节点详细数据 ls -s /

    • image.png

      3.2.3 节点类型(持久/短暂/有序号/无序号)

  • 持久(Persistent):客户端和服务器端断开连接后,创建的节点(服务器)不删除

    • (1)持久化目录节点:客户端与Zookeeper断开连接后,该节点依旧存在
    • (2)持久化顺序编号目录节点:客户端与Zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号
  • 短暂(Ephemeral):客户端和服务器端断开连接后,创建的节点(服务器)自己删除
    • (3)临时目录节点:客户端与Zookeeper断开连接后,该节点被删除
    • (4)临时顺序编号目录节点:客户端与 Zookeeper 断开连接后 , 该 节 点 被 删 除 , 只 是Zookeeper给该节点名称进行顺序编号。

image.png
说明:创建znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护
注意:在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序

1)分别创建2个普通节点(永久节点 + 不带序号)

create /sanguo “diaochan”
create /sanguo/shuguo “liubei”

2)获得节点的值

get -s /sanguo

3)创建带序号的节点(永久节点 + 带序号)

(1)先创建一个普通的根节点/sanguo/weiguo

create /sanguo/weiguo “caocao”

(2)创建带序号的节点

[zk: VMUbuntu1:2181(CONNECTED) 14] create -s /sanguo/weiguo/zhangliao “zhangliao” Created /sanguo/weiguo/zhangliao0000000000 [zk: VMUbuntu1:2181(CONNECTED) 15] create -s /sanguo/weiguo/zhangliao “zhangliao” Created /sanguo/weiguo/zhangliao0000000001

如果原来没有序号节点,序号从 0 开始依次递增。如果原节点下已有 2 个节点,则再排序时从 2 开始,以此类推。

4)创建短暂节点(短暂节点 + 不带序号 or 带序号)

(1)创建短暂的不带序号的节点

[zk: VMUbuntu1:2181(CONNECTED) 2] create -e /sanguo/wuguo “zhouyu” Created /sanguo/wuguo

(2)创建短暂的带序号的节点

[zk: VMUbuntu1:2181(CONNECTED) 4] create -e -s /sanguo/wuguo “zhouyu” Created /sanguo/wuguo0000000003

5)修改节点数据值

将weiguo的caocao改为simayi

[zk: VMUbuntu1:2181(CONNECTED) 3] set /sanguo/weiguo “simayi”

3.2.4 *监听器原理

1、监听器原理详解

1)首先要有一个main()线程
2)在main线程中创建Zookeeper客户端,这时就会创建两个线程,一个负责网络连接通信(connet),一个负责监听(listener)。
3)通过connect线程将注册的监听事件发送给Zookeeper。(告诉服务端我要监听什么)
4)在Zookeeper的注册监听器列表中将注册的监听事件添加到列表中。
5)Zookeeper监听到有数据或路径变化,就会将这个消息发送给listener线程。(告诉客户端监听的东西发生了变化)
6)listener线程内部调用了process()方法;进行数据获取的一个处理。
image.png

2、常见的监听

1)监听节点数据的变化

get path [watch]

2)监听子节点增减的变化

ls path [watch]

1)节点的值变化监听

在VMCentOS1主机上监听/sanguo 节点数据变化

[zk: localhost:2181(CONNECTED) 1] get -w /sanguo

在 VMUbuntu2 主机上修改/sanguo 节点的数据

[zk: localhost:2181(CONNECTED) 0] set /sanguo “xishi”

观察 VMCentOS1 主机收到数据变化的监听

WATCHER:: WatchedEvent state:SyncConnected type:NodeDataChanged path:/sanguo

注意:在VMUbuntu2 上多次修改/sanguo的值,VMCentOS1上不会再收到监听。因为注册一次,只能监听一次。想再次监听,需要再次注册。

2)节点的子节点变化监听(路径变化)

在 VMCentOS1主机上注册监听/sanguo 节点的子节点变化

[zk: localhost:2181(CONNECTED) 3] ls -w /sanguo [shuguo, weiguo]

在 VMUbuntu2 主机/sanguo 节点上创建子节点

[zk: localhost:2181(CONNECTED) 4] create /sanguo/jin “simayi” Created /sanguo/jin1

观察 VMCentOS1主机收到子节点变化的监听

WATCHER:: WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/sanguo

注意:同节点值一样,节点的路径变化,也是注册一次,生效一次。想多次生效,就需要多次注册。

3.2.5 节点删除与查看

1)删除节点

[zk: localhost:2181(CONNECTED) 7] delete /sanguo/jin

2)递归删除节点

删除weiguo,包括weiguo下的所有节点

[zk: localhost:2181(CONNECTED) 5] deleteall /sanguo/weiguo

3)查看节点状态

[zk: localhost:2181(CONNECTED) 7] stat /sanguo cZxid = 0x500000008 ctime = Tue Sep 14 01:47:48 UTC 2021 mZxid = 0x500000018 mtime = Tue Sep 14 02:31:42 UTC 2021 pZxid = 0x500000023 cversion = 10 dataVersion = 3 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 5 numChildren = 2

3.3 客户端 API 操作

前提:保证 hadoop102、hadoop103、hadoop104 服务器上 Zookeeper 集群服务端启动。

3.3.1 IDEA 环境搭建

1)创建一个工程:zookeeper
2)添加pom文件

  1. <dependencies>
  2. <dependency>
  3. <groupId>junit</groupId>
  4. <artifactId>junit</artifactId>
  5. <version>RELEASE</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.logging.log4j</groupId>
  9. <artifactId>log4j-core</artifactId>
  10. <version>2.8.2</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.apache.zookeeper</groupId>
  14. <artifactId>zookeeper</artifactId>
  15. <version>3.5.7</version>
  16. </dependency>
  17. </dependencies>

3)拷贝log4j.properties文件到项目根目录
需要在项目的 src/main/resources 目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。

  1. log4j.rootLogger=INFO, stdout
  2. log4j.appender.stdout=org.apache.log4j.ConsoleAppender
  3. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
  4. log4j.appender.stdout.layout.ConversionPattern=%d %p [%c]- %m%n
  5. log4j.appender.logfile=org.apache.log4j.FileAppender
  6. log4j.appender.logfile.File=target/spring.log
  7. log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
  8. log4j.appender.logfile.layout.ConversionPattern=%d %p [%c]- %m%n

4)创建包名com.atguigu.zk
5)创建类名称ZkClient

3.3.2 创建 ZooKeeper 客户端

  1. public class ZkClient {
  2. //这里注意,前后不能有空格
  3. private String connectString = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.131:2181";
  4. private int sessionTimeout = 200000;
  5. private ZooKeeper zooKeeper;
  6. //创建 ZooKeeper 客户端
  7. @Before
  8. public void init() throws IOException {
  9. zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
  10. @Override
  11. public void process(WatchedEvent watchedEvent) {
  12. }
  13. });
  14. }
  15. }

3.3.3 创建子节点

  1. @Test
  2. public void create() throws InterruptedException, KeeperException {
  3. String nodeCreated = zooKeeper.create("/atguigu", "ss.avi".getBytes(),
  4. ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  5. }

3.3.4 获取子节点并监听节点变化

  1. @Before
  2. public void init() throws IOException {
  3. zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
  4. //process中再次配置监听,因为每次发生变化后监听都会失效,需要重新监听,而process方法是在每次监听内容发生变化后执行
  5. //因此在process中再次进行监听,可以达到反复监听的效果
  6. @Override
  7. public void process(WatchedEvent watchedEvent) {
  8. System.out.println("--------------------------------------");
  9. List<String> children = null;
  10. try {
  11. children = zooKeeper.getChildren("/", true);
  12. for (String child : children) {
  13. System.out.println(child);
  14. }
  15. System.out.println("--------------------------------------");
  16. } catch (KeeperException e) {
  17. e.printStackTrace();
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. }
  22. });
  23. }
  24. @Test
  25. public void getChildren() throws InterruptedException, KeeperException {
  26. List<String> children = zooKeeper.getChildren("/", true);
  27. for (String child : children) {
  28. System.out.println(child);
  29. }
  30. // 延时阻塞
  31. Thread.sleep(Long.MAX_VALUE);
  32. }

3.3.5 判断 Znode 是否存在

  1. // 判断 znode 是否存在
  2. @Test
  3. public void exist() throws InterruptedException, KeeperException {
  4. Stat stat = zooKeeper.exists("/atguigu", false);
  5. System.out.println(stat == null ? "不存在" : "存在");
  6. }

3.4 客户端向服务端写数据流程

3.4.1 写流程之写入请求直接发送给Leader节点

image.png
客户端发送写请求给Leader,Leader收到后会执行写请求,然后将写请求发送给Follower让其执行(同步),Follower同步后会返回一个ack给Leader;当集群上超过半数服务器完成了写请求,那么Leader会发送ack通知Client已完成请求;剩下的Follower会继续写数据同步信息,直到集群中所有的服务器均完成同步。

3.4.2 写流程之写入请求发送给follower节点

image.png
客户端发送写请求给Follower,Follower将写请求转发给Leader,Leader处理写请求,然后将写请求发送给Follower让其执行(同步),Follower同步后会返回一个ack给Leader;当集群上超过半数服务器完成了写请求,Leader会发送一个ack给接收Client的Follower告诉其半数服务器已完成同步,随后Follower会发送ack通知Client已完成请求;剩下的Follower会继续写数据同步信息,直到集群中所有的服务器均完成同步。

第 4 章 服务器动态上下线监听案例

4.1 需求

某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。

4.2 需求分析

image.png

4.3 具体实现

(1)先在集群上创建/servers 节点

[zk: VMUbuntu1:2181(CONNECTED) 20] create /servers “servers”

(2)在 Idea 中创建包名:com.atguigu.zkcase1

(3)服务器端客户端向 Zookeeper 注册代码

服务器端

  1. package com.atguigu.zkcase1;
  2. import org.apache.zookeeper.*;
  3. import java.io.IOException;
  4. public class DistributeServer {
  5. private String connectString = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.131:2181";
  6. private int sessionTimeout = 200000;
  7. private ZooKeeper zk;
  8. public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
  9. DistributeServer server = new DistributeServer();
  10. //1 获取zk连接
  11. server.getConnection();
  12. //2 注册服务器到zk集群
  13. server.regist(args[0]);
  14. //3 启动业务逻辑(sleep)
  15. server.business();
  16. }
  17. private void business() throws InterruptedException {
  18. Thread.sleep(Long.MAX_VALUE);
  19. }
  20. private void regist(String hostName) throws InterruptedException, KeeperException {
  21. String create = zk.create("/servers/" + hostName, hostName.getBytes(),
  22. ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
  23. System.out.println(hostName + "已经上线********");
  24. }
  25. private void getConnection() throws IOException {
  26. zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
  27. @Override
  28. public void process(WatchedEvent watchedEvent) {
  29. }
  30. });
  31. }
  32. }

客户端

  1. package com.atguigu.zkcase1;
  2. import org.apache.zookeeper.KeeperException;
  3. import org.apache.zookeeper.WatchedEvent;
  4. import org.apache.zookeeper.Watcher;
  5. import org.apache.zookeeper.ZooKeeper;
  6. import java.io.IOException;
  7. import java.util.ArrayList;
  8. import java.util.List;
  9. public class DistributeClient {
  10. private String connectString = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.131:2181";
  11. private int sessionTimeout = 200000;
  12. private ZooKeeper zk;
  13. public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
  14. DistributeClient client = new DistributeClient();
  15. //1 获取zk连接
  16. client.getConnect();
  17. //2 监听/servers下面子节点的增加和删除
  18. client.getServerList();
  19. //3 业务逻辑(sleep)
  20. client.business();
  21. }
  22. // 业务功能
  23. private void business() throws InterruptedException {
  24. System.out.println("client is working ...");
  25. Thread.sleep(Long.MAX_VALUE);
  26. }
  27. // 获取服务器列表信息
  28. private void getServerList() throws InterruptedException, KeeperException {
  29. // 1 获取服务器子节点信息,并且对父节点进行监听
  30. List<String> children = zk.getChildren("/servers", true);
  31. // 2 存储服务器信息列表
  32. ArrayList<String> servers = new ArrayList<>();
  33. // 3 遍历所有节点,获取节点中的主机名称信息
  34. for (String child : children) {
  35. byte[] data = zk.getData("/servers/" + child, false, null);
  36. servers.add(new String(data));
  37. }
  38. // 4 打印服务器列表信息
  39. System.out.println(servers);
  40. }
  41. // 创建到 zk 的客户端连接
  42. private void getConnect() throws IOException {
  43. zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
  44. @Override
  45. public void process(WatchedEvent watchedEvent) {
  46. try {
  47. getServerList();
  48. } catch (InterruptedException e) {
  49. e.printStackTrace();
  50. } catch (KeeperException e) {
  51. e.printStackTrace();
  52. }
  53. }
  54. });
  55. }
  56. }

4.4 测试

image.png红框处可以给main传参

第 5 章 ZooKeeper 分布式锁案例

什么叫做分布式锁呢?
比如说”进程 1”在使用该资源的时候,会先去获得锁,”进程 1”获得锁以后会对该资源 尚硅谷技术之 Zookeeper保持独占,这样其他进程就无法访问该资源,”进程 1”用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。
image.png

5.1 原生 Zookeeper 实现分布式锁案例

1)分布式锁实现

  1. package com.atguigu.zkdistributelock;
  2. import org.apache.zookeeper.*;
  3. import org.apache.zookeeper.data.Stat;
  4. import java.io.IOException;
  5. import java.util.Collections;
  6. import java.util.List;
  7. import java.util.concurrent.CountDownLatch;
  8. public class DistributedLock {
  9. private final String connectString = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.131:2181";
  10. private final int sessionTimeout = 200000;
  11. private final ZooKeeper zooKeeper;
  12. private CountDownLatch countDownLatch = new CountDownLatch(1);
  13. private CountDownLatch waitLatch = new CountDownLatch(1);
  14. private String waitPath;
  15. private String curNode;
  16. public DistributedLock() throws IOException, InterruptedException, KeeperException {
  17. //获取连接
  18. zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
  19. @Override
  20. public void process(WatchedEvent watchedEvent) {
  21. //countDownLatch 如果连接上zk 可以释放
  22. if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
  23. countDownLatch.countDown();
  24. }
  25. //waitPath 需要释放
  26. //当前一个节点被删除的时候waitLatch释放
  27. //判断操作是否是删除节点,同时删除的节点是当前节点的前一个(监听的那个),如果是则释放
  28. if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
  29. waitLatch.countDown();
  30. }
  31. }
  32. });
  33. //等待zk正常连接后,往下走
  34. countDownLatch.await();
  35. //判断根节点/locks是否存在
  36. Stat stat = zooKeeper.exists("/locks", false);
  37. if (stat == null) {
  38. //创建根节点
  39. zooKeeper.create("/locks", "locks".getBytes(),
  40. ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  41. }
  42. }
  43. //对zk加锁
  44. public void zkLock() {
  45. //创建对应的临时带序号节点
  46. try {
  47. curNode = zooKeeper.create("/locks/" + "seq-", null,
  48. ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
  49. //判断创建的节点是否是序号最小的那个,如果是获取到所;
  50. //如果不是,监听他序号前一个节点
  51. //(应该是监听他序号前的所有节点?因为前一个节点可能还没拿到锁就挂了)
  52. // 获取锁应该有超时时间,应该监听前面所有的节点,防止中间节点超时退出,
  53. // 导致后面的节点异常获取锁.
  54. List<String> children = zooKeeper.getChildren("/locks", false);
  55. //如果children只有一个值,那就直接获取锁;如果有多个节点,需要判断谁最小
  56. if (children.size() == 1) {
  57. return;
  58. }
  59. Collections.sort(children);
  60. //获取节点名称 seq-000000000x
  61. String thisNode = curNode.substring("/locks/".length());
  62. //通过seq-000000000x获取到当前节点在集合中的位置
  63. int index = children.indexOf(thisNode);
  64. //判断
  65. if (index == -1) {
  66. System.out.println("数据异常");
  67. } else if (index == 0) {
  68. //当前节点序号最小,可以获取锁
  69. return;
  70. } else {
  71. //需要监听当前节点的前一个节点
  72. waitPath = "/locks/" + children.get(index - 1);
  73. zooKeeper.getData(waitPath, true, null);
  74. //等待监听
  75. waitLatch.await();
  76. return;
  77. }
  78. } catch (KeeperException e) {
  79. e.printStackTrace();
  80. } catch (InterruptedException e) {
  81. e.printStackTrace();
  82. }
  83. }
  84. //释放锁
  85. public void unzkLock() throws InterruptedException, KeeperException {
  86. //删除节点
  87. zooKeeper.delete(curNode, -1);
  88. }
  89. }

2)分布式锁测试

有不清楚的兄弟,记住:

  • 1、这里的线程是模拟多个客户端同时访问集群数据
  • 2、lock1和lock2实际代表的是两个客户端
  • 3、这个原理实现的核心有几个:临时顺序节点(两个作用看前一节),还有就是await方法,监听节点删除事件 ```java package com.atguigu.zkdistributelock;

import org.apache.zookeeper.KeeperException;

import java.io.IOException;

public class DistributedLockTest { public static void main(String[] args) throws IOException, InterruptedException, KeeperException { DistributedLock lock1 = new DistributedLock(); DistributedLock lock2 = new DistributedLock();

  1. new Thread(new Runnable() {
  2. @Override
  3. public void run() {
  4. try {
  5. lock1.zkLock();
  6. System.out.println("线程1启动");
  7. Thread.sleep(5 * 1000);
  8. lock1.unzkLock();
  9. System.out.println("线程1释放锁");
  10. } catch (InterruptedException | KeeperException e) {
  11. e.printStackTrace();
  12. }
  13. }
  14. }).start();
  15. new Thread(new Runnable() {
  16. @Override
  17. public void run() {
  18. try {
  19. lock2.zkLock();
  20. System.out.println("线程2启动");
  21. Thread.sleep(5 * 1000);
  22. lock2.unzkLock();
  23. System.out.println("线程2释放锁");
  24. } catch (InterruptedException | KeeperException e) {
  25. e.printStackTrace();
  26. }
  27. }
  28. }).start();
  29. }

}

  1. <a name="qrBNT"></a>
  2. ## 5.2 Curator 框架实现分布式锁案例
  3. 详情请查看官方文档:[https://curator.apache.org/index.html](https://curator.apache.org/index.html)
  4. <a name="H6791"></a>
  5. ### 1)原生的 Java API 开发存在的问题
  6. - (1)会话连接是异步的,需要自己去处理。比如使用 CountDownLatch
  7. - (2)Watch 需要重复注册,不然就不能生效
  8. - (3)开发的复杂性还是比较高的
  9. - (4)不支持多节点删除和创建。需要自己去递归
  10. <a name="bgEHC"></a>
  11. ### 2)Curator 案例实操
  12. <a name="ghfsp"></a>
  13. #### (1)添加依赖
  14. ```xml
  15. <dependency>
  16. <groupId>org.apache.curator</groupId>
  17. <artifactId>curator-framework</artifactId>
  18. <version>4.3.0</version>
  19. </dependency>
  20. <dependency>
  21. <groupId>org.apache.curator</groupId>
  22. <artifactId>curator-recipes</artifactId>
  23. <version>4.3.0</version>
  24. </dependency>
  25. <dependency>
  26. <groupId>org.apache.curator</groupId>
  27. <artifactId>curator-client</artifactId>
  28. <version>4.3.0</version>
  29. </dependency>

(2)代码实现

  1. package com.atguigu.curatorframwork;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.curator.framework.CuratorFrameworkFactory;
  4. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
  5. import org.apache.curator.retry.ExponentialBackoffRetry;
  6. public class CuratorLockTest {
  7. private static final String CONNECT_STRING = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.131:2181";
  8. public static void main(String[] args) {
  9. //创建分布式锁1
  10. InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
  11. //创建分布式锁2
  12. InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
  13. new Thread(new Runnable() {
  14. @Override
  15. public void run() {
  16. try {
  17. lock1.acquire();
  18. System.out.println("线程1获取到锁");
  19. lock1.acquire();
  20. System.out.println("线程1再次获取到锁");
  21. Thread.sleep(5000);
  22. lock1.release();
  23. System.out.println("线程1释放锁");
  24. lock1.release();
  25. System.out.println("线程1再次释放锁");
  26. } catch (Exception e) {
  27. e.printStackTrace();
  28. }
  29. }
  30. }).start();
  31. new Thread(new Runnable() {
  32. @Override
  33. public void run() {
  34. try {
  35. lock2.acquire();
  36. System.out.println("线程2获取到锁");
  37. lock2.acquire();
  38. System.out.println("线程2再次获取到锁");
  39. Thread.sleep(5000);
  40. lock2.release();
  41. System.out.println("线程2释放锁");
  42. lock2.release();
  43. System.out.println("线程2再次释放锁");
  44. } catch (Exception e) {
  45. e.printStackTrace();
  46. }
  47. }
  48. }).start();
  49. }
  50. private static CuratorFramework getCuratorFramework() {
  51. ExponentialBackoffRetry retry = new ExponentialBackoffRetry(3000, 3);
  52. CuratorFramework client = CuratorFrameworkFactory.newClient(CONNECT_STRING, 200000, 200000, retry);
  53. // CuratorFramework client = CuratorFrameworkFactory.builder().connectString(CONNECT_STRING)
  54. // .connectionTimeoutMs(200000).sessionTimeoutMs(200000).retryPolicy(retry).build();
  55. client.start();
  56. System.out.println("zookeeper 启动成功");
  57. return client;
  58. }
  59. }

第 6 章 企业面试真题(面试重点)

6.1 选举机制

半数机制,超过半数的投票通过,即通过。
(1)第一次启动选举规则:
投票过半数时,服务器 id 大的胜出
(2)第二次启动选举规则:
①EPOCH 大的直接胜出
②EPOCH 相同,事务 id 大的胜出
③事务 id 相同,服务器 id 大的胜出

6.2 生产集群安装多少 zk 合适?

安装奇数台。
生产经验:

  • 10 台服务器:3 台 zk;
  • 20 台服务器:5 台 zk;
  • 100 台服务器:11 台 zk;
  • 200 台服务器:11 台 zk

服务器台数多:好处,提高可靠性;坏处:提高通信延时