1. Zookeeper入门
Zookeeper 是一个开源的分布式的,为分布式框架提供协调服务的 Apache 项目。
1.1 概述
Zookeeper从设计模式角度来理解:是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然 后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper就将负责通知已经在Zookeeper上注册的那些观察者做出相应的反应。用于服务注册与发现。
1.2 特点
1)Zookeeper:一个领导者(Leader),多个跟随者(Follower)组成的集群。
2)服务器集群中只要有半数以上节点存活,Zookeeper集群就能正常服务。所以Zookeeper适合安装奇数台服务器。
3)全局数据一致:每个Server保存一份相同的数据副本,Client无论连接到哪个Server,数据都是一致的。
4)更新请求顺序执行,来自同一个Client的更新请求按其发送顺序依次执行。
5)数据更新原子性,一次数据更新要么成功,要么失败。
6)实时性,在一定时间范围内,Client能读到最新数据。
1.3 数据结构
ZooKeeper 数据模型的结构与 Unix 文件系统很类似,整体上可以看作是一棵树,每个节点(Server)称做一个 ZNode。每一个 ZNode 默认能够存储 1MB 的数据,每个 ZNode 都可以通过其路径唯一标识表示。(就是都可以通过唯一路径找到该节点)
1.4 应用场景
提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等。
1.4.1 统一命名服务
在分布式环境下,经常需要对应用/服务进行统一命名,便于识别。例如:IP不容易记住,而域名容易记住。
1.4.2 统一配置管理
1)分布式环境下,配置文件同步非常常见。
- 一般要求一个集群中,所有节点的配置信息是一致的,比如 Kafka 集群。
- 对配置文件修改后,希望能够快速同步到各个节点上。
2)配置管理可交由ZooKeeper实现。
- 可将配置信息写入ZooKeeper上的一个Znode。
- 各个客户端服务器监听这个Znode。
一 旦Znode中的数据被修改,ZooKeeper将通知各个客户端服务器。
1.4.3 统一集群管理
1)分布式环境中,实时掌握每个节点的状态是必要的。
可根据节点实时状态做出一些调整。
2)ZooKeeper可以实现实时监控节点状态变化
- 可将节点信息写入ZooKeeper上的一个ZNode。(状态信息等等)
- 监听这个ZNode可获取它的实时状态变化。
1.4.4 服务器动态上下线
1.4.5 软负载均衡
在Zookeeper中记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端请求
2. Zookeeper下载及安装
需要提前安装jdk
下载网址:https://zookeeper.apache.org/
将下载的安装包复制到linux下
2.1 安装
2.1.1 安装前准备
解压到指定目录
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /usr/local/module/
修改解压文件夹名称(可省略)
mv apache-zookeeper-3.5.7-bin zookeeper-3.5.7
2.1.2 配置修改
将/opt/module/zookeeper-3.5.7/conf 这个路径下的 zoo_sample.cfg 修改为zoo.cfg;
mv zoo_sample.cfg zoo.cfg
在/usr/local/module/zookeeper-3.5.7目录下创建zkData文件夹
mkdir zkData
打开 zoo.cfg 文件,修改 dataDir 路径成以下内容:
dataDir=/usr/local/module/zookeeper-3.5.7/zkData
*端口占用问题
zookeeper v3.5启动后会占用8080端口,需要对端口进行修改,以免跟tomcat等发生冲突
- 方式一:禁用AdminServer;在zoo.cnf 配置文件中增加配置 admin.enableServer=false,这样就无法访问Zookeeper AdminServer了。访问方式:http//:localhost:port/commands
- 方式二:修改端口号;在zoo.cnf 配置文件中增加admin.serverPort=未占用的端口号,如8090,我这里改成了8022
新版Zookeeper v3.5启动后为什么占用8080端口,如何修改端口?
2.1.3 操作zookeeper
启动 Zookeeper ./zkServer.sh start
或者通过路径方式启动 bin/zkServer.sh start
查看进程是否启动 jps
查看状态 bin/zkServer.sh status
或者 ./zkServer.sh status
启动客户端 bin/zkCli.sh
./zkCli.sh
客户端内退出 quit
停止Zookeeper bin/zkServer.sh stop
./zkServer.sh stop
2.2 配置参数解读
- tickTime = 2000:通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒
- initLimit = 10:LF初始通信时限,Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量)。例如tickTime=2000,如果在2000*10ms内心跳次数小于10次,那么zookeeper则认为本次连接失败
- syncLimit = 5:LF同步通信时限,Leader和Follower之间通信时间如果超过syncLimit * tickTime,Leader认为Follwer死掉,从服务器列表中删除Follwer。
- dataDir:保存Zookeeper中的数据。注意:默认的tmp目录,容易被Linux系统定期删除,所以一般不用默认的tmp目录。
clientPort = 2181:客户端连接端口,通常不做修改。
3. Zookeeper 集群操作
3.1 集群操作
3.1.1 集群安装
1)集群规划
在三台虚拟机上的三个节点上都部署 Zookeeper。n台服务器需要部署多少台Zookeeper?
2)解压安装
3)配置服务器编号
在zookeeper解压目录下(我的是/usr/local/module/zookeeper-3.5.7)的zkData(上一步配置修改的dataDir目录)目录下创建一个myid的文件
- mkdir zkData
- vi myid
在myid文件中添加与 server 对应的编号(注意:上下不要有空行,左右不要有空格)
(1) 重命名/opt/module/zookeeper-3.5.7/conf 这个目录下的 zoo_sample.cfg 为 zoo.cfg
- (2) 打开 zoo.cfg 文件,修改数据存储路径配置
dataDir=/usr/local/module/zookeeper-3.5.7/zkData
这里的路径就是你创建的zkData的绝对路径
增加如下配置
#################cluster
server.2=hadoop102:2888:3888 server.3=hadoop103:2888:3888 server.4=hadoop104:2888:3888
配置参数解读 server.A=B:C:D
- A 是一个数字,表示这个是第几号服务器;集群模式下配置一个文件 myid,这个文件在 dataDir 目录下,这个文件里面有一个数据就是 A 的值,Zookeeper 启动时读取此文件,拿到里面的数据与 zoo.cfg 里面的配置信息比较从而判断到底是哪个 server。
- B 是这个服务器的地址; 可以直接写ip地址
- C 是这个服务器 Follower 与集群中的 Leader 服务器交换信息的端口;
- D是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。
- 同步配置文件
5)集群操作
分别启动Zookeeper,bin/zkServer.sh start
查看状态bin/zkServer.sh status
注意:启动过程中需要有半数以上的服务启动才能启动成功,需要关闭防火墙3.1.2 选举机制(面试重点)
1.第一次启动
- 服务器1启动,发起一次选举。服务器1投自己一票。此时服务器1票数一票,不够半数以上(3票),选举无法完成,服务器1状态为LOOKING;
- 服务器2启动,再发起一次选举。服务器1和2分别投自己一票并交换选票信息:此时服务器1发现服务器2的myid比自己目前投票推举的(服务器1)大,更改选票为推举服务器2。此时服务器1票数0票,服务器2票数2票,没有半数以上结果,选举无法完成,服务器1,2状态保持为LOOKING;
- 服务器3启动,发起一次选举。此时服务器1和2都会更改选票为服务器3。此次投票结果:服务器1为0票,服务器2为0票,服务器3为3票。此时服务器3的票数已经超过半数,服务器3当选Leader。服务器1,2更改状态为FOLLOWING,服务器3更改状态为LEADING;
- 服务器4启动,发起一次选举。此时服务器1,2,3已经不是LOOKING状态,不会更改选票信息。交换选票信息结果:服务器3为3票,服务器4为1票。此时服务器4服从多数,更改选票信息为服务器3,并更改状态为FOLLOWING;
- 服务器5启动,同4一样当小弟。
总的来说:当启动的服务器数量没有达到一半以上时,选票会集中投给myid大的服务器,直到达到一半以上时选出leader,一旦选出了leader,无论后面的服务器myid如何,均为follower。
- SID:服务器ID。用来唯一标识一台ZooKeeper集群中的机器,每台机器不能重复,和myid一致。
- ZXID:事务ID。ZXID是一个事务ID,用来标识一次服务器状态的变更。在某一时刻,集群中的每台机器的ZXID值不一定完全一致,这和ZooKeeper服务器对于客户端“更新请求”的处理逻辑有关。
Epoch:每个Leader任期的代号。没有Leader时同一轮投票过程中的逻辑时钟值是相同的。每投完一次票这个数据就会增加。
2. 非第一次启动
1)当ZooKeeper集群中的一台服务器出现以下两种情况之一时,就会开始进入Leader选举:
服务器初始化启动。
- 服务器运行期间无法和Leader保持连接。
2)而当一台机器进入Leader选举流程时,当前集群也可能会处于以下两种状态:
- 集群中本来就已经存在一个Leader。
- 对于第一种已经存在Leader的情况,机器试图去选举Leader时,会被告知当前服务器的Leader信息,对于该机器来说,仅仅需要和Leader机器建立连接,并进行状态同步即可。
- 集群中确实不存在Leader。
3.1.3 zk集群启动停止脚本
1)在VMUbuntu1的/home/mrlinxi/bin 目录下创建zk.sh脚本
sudo vim zk.sh
#!/bin/bash
case $1 in
"start"){
for i in 192.168.190.128 192.168.190.129 192.168.190.131
do
echo ----------------- zookeeper-3 $i 启动 --------------------
ssh $i "sudo /usr/local/module/zookeeper-3.5.7/bin/zkServer.sh start"
done
}
;;
"stop"){
for i in 192.168.190.128 192.168.190.129 192.168.190.131
do
echo ----------------- zookeeper-3 $i 停止 --------------------
ssh $i "sudo /usr/local/module/zookeeper-3.5.7/bin/zkServer.sh stop"
done
}
;;
"status"){
for i in 192.168.190.128 192.168.190.129 192.168.190.131
do
echo ----------------- zookeeper-3 $i 状态 --------------------
ssh $i "sudo /usr/local/module/zookeeper-3.5.7/bin/zkServer.sh status"
done
}
;;
esac
解决CentOS Zookeeper JAVA_HOME is not set and java could not be found in PATH 问题
2)增加脚本执行权限
3)Zookeeper 集群启动脚本
4)Zookeeper 集群停止脚本
3.2 客户端命令行操作
3.2.1 命令行语法
- 1)启动客户端
bin/zkCli.sh -server VMUbuntu1:2181
这里VMUbuntu1是服务器域名也就是ip的别名(192.168.190.128) -
3.2.2 znode节点数据信息
1)查看当前znode中所包含的内容
ls /
2)查看当前节点详细数据
ls -s /
持久(Persistent):客户端和服务器端断开连接后,创建的节点(服务器)不删除
- (1)持久化目录节点:客户端与Zookeeper断开连接后,该节点依旧存在
- (2)持久化顺序编号目录节点:客户端与Zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号
- 短暂(Ephemeral):客户端和服务器端断开连接后,创建的节点(服务器)自己删除
- (3)临时目录节点:客户端与Zookeeper断开连接后,该节点被删除
- (4)临时顺序编号目录节点:客户端与 Zookeeper 断开连接后 , 该 节 点 被 删 除 , 只 是Zookeeper给该节点名称进行顺序编号。
说明:创建znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护
注意:在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序
1)分别创建2个普通节点(永久节点 + 不带序号)
create /sanguo “diaochan”
create /sanguo/shuguo “liubei”
2)获得节点的值
get -s /sanguo
3)创建带序号的节点(永久节点 + 带序号)
(1)先创建一个普通的根节点/sanguo/weiguo
create /sanguo/weiguo “caocao”
(2)创建带序号的节点
[zk: VMUbuntu1:2181(CONNECTED) 14] create -s /sanguo/weiguo/zhangliao “zhangliao” Created /sanguo/weiguo/zhangliao0000000000 [zk: VMUbuntu1:2181(CONNECTED) 15] create -s /sanguo/weiguo/zhangliao “zhangliao” Created /sanguo/weiguo/zhangliao0000000001
如果原来没有序号节点,序号从 0 开始依次递增。如果原节点下已有 2 个节点,则再排序时从 2 开始,以此类推。
4)创建短暂节点(短暂节点 + 不带序号 or 带序号)
(1)创建短暂的不带序号的节点
[zk: VMUbuntu1:2181(CONNECTED) 2] create -e /sanguo/wuguo “zhouyu” Created /sanguo/wuguo
(2)创建短暂的带序号的节点
[zk: VMUbuntu1:2181(CONNECTED) 4] create -e -s /sanguo/wuguo “zhouyu” Created /sanguo/wuguo0000000003
5)修改节点数据值
将weiguo的caocao改为simayi
[zk: VMUbuntu1:2181(CONNECTED) 3] set /sanguo/weiguo “simayi”
3.2.4 *监听器原理
1、监听器原理详解
1)首先要有一个main()线程
2)在main线程中创建Zookeeper客户端,这时就会创建两个线程,一个负责网络连接通信(connet),一个负责监听(listener)。
3)通过connect线程将注册的监听事件发送给Zookeeper。(告诉服务端我要监听什么)
4)在Zookeeper的注册监听器列表中将注册的监听事件添加到列表中。
5)Zookeeper监听到有数据或路径变化,就会将这个消息发送给listener线程。(告诉客户端监听的东西发生了变化)
6)listener线程内部调用了process()方法;进行数据获取的一个处理。
2、常见的监听
1)监听节点数据的变化
get path [watch]
2)监听子节点增减的变化
ls path [watch]
1)节点的值变化监听
在VMCentOS1主机上监听/sanguo 节点数据变化
[zk: localhost:2181(CONNECTED) 1] get -w /sanguo
在 VMUbuntu2 主机上修改/sanguo 节点的数据
[zk: localhost:2181(CONNECTED) 0] set /sanguo “xishi”
观察 VMCentOS1 主机收到数据变化的监听
WATCHER:: WatchedEvent state:SyncConnected type:NodeDataChanged path:/sanguo
注意:在VMUbuntu2 上多次修改/sanguo的值,VMCentOS1上不会再收到监听。因为注册一次,只能监听一次。想再次监听,需要再次注册。
2)节点的子节点变化监听(路径变化)
在 VMCentOS1主机上注册监听/sanguo 节点的子节点变化
[zk: localhost:2181(CONNECTED) 3] ls -w /sanguo [shuguo, weiguo]
在 VMUbuntu2 主机/sanguo 节点上创建子节点
[zk: localhost:2181(CONNECTED) 4] create /sanguo/jin “simayi” Created /sanguo/jin1
观察 VMCentOS1主机收到子节点变化的监听
WATCHER:: WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/sanguo
注意:同节点值一样,节点的路径变化,也是注册一次,生效一次。想多次生效,就需要多次注册。
3.2.5 节点删除与查看
1)删除节点
[zk: localhost:2181(CONNECTED) 7] delete /sanguo/jin
2)递归删除节点
删除weiguo,包括weiguo下的所有节点
[zk: localhost:2181(CONNECTED) 5] deleteall /sanguo/weiguo
3)查看节点状态
[zk: localhost:2181(CONNECTED) 7] stat /sanguo cZxid = 0x500000008 ctime = Tue Sep 14 01:47:48 UTC 2021 mZxid = 0x500000018 mtime = Tue Sep 14 02:31:42 UTC 2021 pZxid = 0x500000023 cversion = 10 dataVersion = 3 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 5 numChildren = 2
3.3 客户端 API 操作
前提:保证 hadoop102、hadoop103、hadoop104 服务器上 Zookeeper 集群服务端启动。
3.3.1 IDEA 环境搭建
1)创建一个工程:zookeeper
2)添加pom文件
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.7</version>
</dependency>
</dependencies>
3)拷贝log4j.properties文件到项目根目录
需要在项目的 src/main/resources 目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c]- %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c]- %m%n
4)创建包名com.atguigu.zk
5)创建类名称ZkClient
3.3.2 创建 ZooKeeper 客户端
public class ZkClient {
//这里注意,前后不能有空格
private String connectString = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.131:2181";
private int sessionTimeout = 200000;
private ZooKeeper zooKeeper;
//创建 ZooKeeper 客户端
@Before
public void init() throws IOException {
zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
});
}
}
3.3.3 创建子节点
@Test
public void create() throws InterruptedException, KeeperException {
String nodeCreated = zooKeeper.create("/atguigu", "ss.avi".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
3.3.4 获取子节点并监听节点变化
@Before
public void init() throws IOException {
zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
//process中再次配置监听,因为每次发生变化后监听都会失效,需要重新监听,而process方法是在每次监听内容发生变化后执行
//因此在process中再次进行监听,可以达到反复监听的效果
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("--------------------------------------");
List<String> children = null;
try {
children = zooKeeper.getChildren("/", true);
for (String child : children) {
System.out.println(child);
}
System.out.println("--------------------------------------");
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
@Test
public void getChildren() throws InterruptedException, KeeperException {
List<String> children = zooKeeper.getChildren("/", true);
for (String child : children) {
System.out.println(child);
}
// 延时阻塞
Thread.sleep(Long.MAX_VALUE);
}
3.3.5 判断 Znode 是否存在
// 判断 znode 是否存在
@Test
public void exist() throws InterruptedException, KeeperException {
Stat stat = zooKeeper.exists("/atguigu", false);
System.out.println(stat == null ? "不存在" : "存在");
}
3.4 客户端向服务端写数据流程
3.4.1 写流程之写入请求直接发送给Leader节点
客户端发送写请求给Leader,Leader收到后会执行写请求,然后将写请求发送给Follower让其执行(同步),Follower同步后会返回一个ack给Leader;当集群上超过半数服务器完成了写请求,那么Leader会发送ack通知Client已完成请求;剩下的Follower会继续写数据同步信息,直到集群中所有的服务器均完成同步。
3.4.2 写流程之写入请求发送给follower节点
客户端发送写请求给Follower,Follower将写请求转发给Leader,Leader处理写请求,然后将写请求发送给Follower让其执行(同步),Follower同步后会返回一个ack给Leader;当集群上超过半数服务器完成了写请求,Leader会发送一个ack给接收Client的Follower告诉其半数服务器已完成同步,随后Follower会发送ack通知Client已完成请求;剩下的Follower会继续写数据同步信息,直到集群中所有的服务器均完成同步。
第 4 章 服务器动态上下线监听案例
4.1 需求
某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。
4.2 需求分析
4.3 具体实现
(1)先在集群上创建/servers 节点
[zk: VMUbuntu1:2181(CONNECTED) 20] create /servers “servers”
(2)在 Idea 中创建包名:com.atguigu.zkcase1
(3)服务器端客户端向 Zookeeper 注册代码
服务器端
package com.atguigu.zkcase1;
import org.apache.zookeeper.*;
import java.io.IOException;
public class DistributeServer {
private String connectString = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.131:2181";
private int sessionTimeout = 200000;
private ZooKeeper zk;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
DistributeServer server = new DistributeServer();
//1 获取zk连接
server.getConnection();
//2 注册服务器到zk集群
server.regist(args[0]);
//3 启动业务逻辑(sleep)
server.business();
}
private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
private void regist(String hostName) throws InterruptedException, KeeperException {
String create = zk.create("/servers/" + hostName, hostName.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostName + "已经上线********");
}
private void getConnection() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
});
}
}
客户端
package com.atguigu.zkcase1;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class DistributeClient {
private String connectString = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.131:2181";
private int sessionTimeout = 200000;
private ZooKeeper zk;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
DistributeClient client = new DistributeClient();
//1 获取zk连接
client.getConnect();
//2 监听/servers下面子节点的增加和删除
client.getServerList();
//3 业务逻辑(sleep)
client.business();
}
// 业务功能
private void business() throws InterruptedException {
System.out.println("client is working ...");
Thread.sleep(Long.MAX_VALUE);
}
// 获取服务器列表信息
private void getServerList() throws InterruptedException, KeeperException {
// 1 获取服务器子节点信息,并且对父节点进行监听
List<String> children = zk.getChildren("/servers", true);
// 2 存储服务器信息列表
ArrayList<String> servers = new ArrayList<>();
// 3 遍历所有节点,获取节点中的主机名称信息
for (String child : children) {
byte[] data = zk.getData("/servers/" + child, false, null);
servers.add(new String(data));
}
// 4 打印服务器列表信息
System.out.println(servers);
}
// 创建到 zk 的客户端连接
private void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
try {
getServerList();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
});
}
}
4.4 测试
第 5 章 ZooKeeper 分布式锁案例
什么叫做分布式锁呢?
比如说”进程 1”在使用该资源的时候,会先去获得锁,”进程 1”获得锁以后会对该资源 尚硅谷技术之 Zookeeper保持独占,这样其他进程就无法访问该资源,”进程 1”用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。
5.1 原生 Zookeeper 实现分布式锁案例
1)分布式锁实现
package com.atguigu.zkdistributelock;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class DistributedLock {
private final String connectString = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.131:2181";
private final int sessionTimeout = 200000;
private final ZooKeeper zooKeeper;
private CountDownLatch countDownLatch = new CountDownLatch(1);
private CountDownLatch waitLatch = new CountDownLatch(1);
private String waitPath;
private String curNode;
public DistributedLock() throws IOException, InterruptedException, KeeperException {
//获取连接
zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//countDownLatch 如果连接上zk 可以释放
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
//waitPath 需要释放
//当前一个节点被删除的时候waitLatch释放
//判断操作是否是删除节点,同时删除的节点是当前节点的前一个(监听的那个),如果是则释放
if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
waitLatch.countDown();
}
}
});
//等待zk正常连接后,往下走
countDownLatch.await();
//判断根节点/locks是否存在
Stat stat = zooKeeper.exists("/locks", false);
if (stat == null) {
//创建根节点
zooKeeper.create("/locks", "locks".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
//对zk加锁
public void zkLock() {
//创建对应的临时带序号节点
try {
curNode = zooKeeper.create("/locks/" + "seq-", null,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
//判断创建的节点是否是序号最小的那个,如果是获取到所;
//如果不是,监听他序号前一个节点
//(应该是监听他序号前的所有节点?因为前一个节点可能还没拿到锁就挂了)
// 获取锁应该有超时时间,应该监听前面所有的节点,防止中间节点超时退出,
// 导致后面的节点异常获取锁.
List<String> children = zooKeeper.getChildren("/locks", false);
//如果children只有一个值,那就直接获取锁;如果有多个节点,需要判断谁最小
if (children.size() == 1) {
return;
}
Collections.sort(children);
//获取节点名称 seq-000000000x
String thisNode = curNode.substring("/locks/".length());
//通过seq-000000000x获取到当前节点在集合中的位置
int index = children.indexOf(thisNode);
//判断
if (index == -1) {
System.out.println("数据异常");
} else if (index == 0) {
//当前节点序号最小,可以获取锁
return;
} else {
//需要监听当前节点的前一个节点
waitPath = "/locks/" + children.get(index - 1);
zooKeeper.getData(waitPath, true, null);
//等待监听
waitLatch.await();
return;
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//释放锁
public void unzkLock() throws InterruptedException, KeeperException {
//删除节点
zooKeeper.delete(curNode, -1);
}
}
2)分布式锁测试
有不清楚的兄弟,记住:
- 1、这里的线程是模拟多个客户端同时访问集群数据
- 2、lock1和lock2实际代表的是两个客户端
- 3、这个原理实现的核心有几个:临时顺序节点(两个作用看前一节),还有就是await方法,监听节点删除事件 ```java package com.atguigu.zkdistributelock;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
public class DistributedLockTest { public static void main(String[] args) throws IOException, InterruptedException, KeeperException { DistributedLock lock1 = new DistributedLock(); DistributedLock lock2 = new DistributedLock();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock1.zkLock();
System.out.println("线程1启动");
Thread.sleep(5 * 1000);
lock1.unzkLock();
System.out.println("线程1释放锁");
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock2.zkLock();
System.out.println("线程2启动");
Thread.sleep(5 * 1000);
lock2.unzkLock();
System.out.println("线程2释放锁");
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
}).start();
}
}
<a name="qrBNT"></a>
## 5.2 Curator 框架实现分布式锁案例
详情请查看官方文档:[https://curator.apache.org/index.html](https://curator.apache.org/index.html)
<a name="H6791"></a>
### 1)原生的 Java API 开发存在的问题
- (1)会话连接是异步的,需要自己去处理。比如使用 CountDownLatch
- (2)Watch 需要重复注册,不然就不能生效
- (3)开发的复杂性还是比较高的
- (4)不支持多节点删除和创建。需要自己去递归
<a name="bgEHC"></a>
### 2)Curator 案例实操
<a name="ghfsp"></a>
#### (1)添加依赖
```xml
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.3.0</version>
</dependency>
(2)代码实现
package com.atguigu.curatorframwork;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorLockTest {
private static final String CONNECT_STRING = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.131:2181";
public static void main(String[] args) {
//创建分布式锁1
InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
//创建分布式锁2
InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
new Thread(new Runnable() {
@Override
public void run() {
try {
lock1.acquire();
System.out.println("线程1获取到锁");
lock1.acquire();
System.out.println("线程1再次获取到锁");
Thread.sleep(5000);
lock1.release();
System.out.println("线程1释放锁");
lock1.release();
System.out.println("线程1再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock2.acquire();
System.out.println("线程2获取到锁");
lock2.acquire();
System.out.println("线程2再次获取到锁");
Thread.sleep(5000);
lock2.release();
System.out.println("线程2释放锁");
lock2.release();
System.out.println("线程2再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
private static CuratorFramework getCuratorFramework() {
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(3000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(CONNECT_STRING, 200000, 200000, retry);
// CuratorFramework client = CuratorFrameworkFactory.builder().connectString(CONNECT_STRING)
// .connectionTimeoutMs(200000).sessionTimeoutMs(200000).retryPolicy(retry).build();
client.start();
System.out.println("zookeeper 启动成功");
return client;
}
}
第 6 章 企业面试真题(面试重点)
6.1 选举机制
半数机制,超过半数的投票通过,即通过。
(1)第一次启动选举规则:
投票过半数时,服务器 id 大的胜出
(2)第二次启动选举规则:
①EPOCH 大的直接胜出
②EPOCH 相同,事务 id 大的胜出
③事务 id 相同,服务器 id 大的胜出
6.2 生产集群安装多少 zk 合适?
安装奇数台。
生产经验:
- 10 台服务器:3 台 zk;
- 20 台服务器:5 台 zk;
- 100 台服务器:11 台 zk;
- 200 台服务器:11 台 zk
服务器台数多:好处,提高可靠性;坏处:提高通信延时