概述❓
开源的,分布式,为分布式框架提供协调服务的Apache项目。
工作机制: 基于观察者模式的分布式服务管理框架,负责存储和管理数据,接受观察者的注册。一旦数据状态发生变化,Zookeeper负责通知注册的观察者做出相应反应。
- 一个Leader,多个Follower组成的集群。
- 只要有半数以上的节点存活,Zookeeper集群就能正常服务。因此zk集群适合安排奇数台服务器。
- 全局数据一致:每个Server保存一份相同的数据副本,Client无论连接到哪个Server,数据都是一致
- 数据更新原子性,要么更新成功,要么失败。
- 实时性。一定时间范围内,Client读到最新数据
更新请求顺序执行,来自同一个Client的更新请求按其发送顺序依次执行。
数据结构⚓️
类似于Unix文件系统。每个节点称作一个ZNode。每个ZNode默认能存储1MB的数据,通过其路径唯一标识。
应用场景
统一命名服务
分布式环境,对应用/服务进行统一命名,便于识别。
统一配置管理
配置文件同步,一个集群中所有节点的配置信息是一致的(Kafka集群),对于配置文件修改后,希望可以快速同步到各个节点。可将配置信息写入zk中的一个ZNode,各客户端服务器监听这个Znode,
统一集群管理
分布式环境中需要实时掌握各个节点状态,根据节点状态实时做出调整。可以将2节点信息写入ZNode,监听Znode获取实时状态变化。
服务器节点动态上下线
软负载均衡
zk中记录每台服务器的访问数,让访问数最少的服务器处理最新的客户端请求。下载配置
解压
[atguigu@hadoop102 software]$ tar -zxvf apache-zookeeper-3.5.7- bin.tar.gz -C /opt/module/
配置修改
(1)将/opt/module/zookeeper-3.5.7/conf 这个路径下的 zoo_sample.cfg 修改为 zoo.cfg; [atguigu@hadoop102 conf]$ mv zoo_sample.cfg zoo.cfg
(2)打开 zoo.cfg 文件,修改 dataDir 路径:
[atguigu@hadoop102 zookeeper-3.5.7]$ vim zoo.cfg
修改如下内容:
dataDir=/opt/module/zookeeper-3.5.7/zkData
(3)在/opt/module/zookeeper-3.5.7/这个目录上创建 zkData 文件夹
[atguigu@hadoop102 zookeeper-3.5.7]$ mkdir zkData操作 Zookeeper
(1)启动 Zookeeper
[atguigu@hadoop102 zookeeper-3.5.7]$ bin/zkServer.sh start
(2)查看进程是否启动
[atguigu@hadoop102 zookeeper-3.5.7]$ jps 4020 Jps
4001 QuorumPeerMain
(3)查看状态
[atguigu@hadoop102 zookeeper-3.5.7]$ bin/zkServer.sh status ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfgMode: standalone
(4)启动客户端
[atguigu@hadoop102 zookeeper-3.5.7]$ bin/zkCli.sh
(5)退出客户端:
[zk: localhost:2181(CONNECTED) 0] quit
(6)停止 Zookeeper
[atguigu@hadoop102 zookeeper-3.5.7]$ bin/zkServer.sh stop集群操作
🖖选举机制
第一次启动(5台zk服务器集群)
服务器1启动后发起一次选举,将票投给自己,不够半数以上,服务器1状态保持为LOOKING
- 服务器2启动后发起一次选举。服务器1和2分别投自己一票并交换选票信息。服务器1发现服务器2的myid比自己目前投票选举(服务器1)的myid大,更改投票推举服务器2。服务器1 0票,服务器2 2票,没有半数以上,选举无法完成,服务器1 2 状态保持为LOOKING
- 服务器3启动后发起一次选举。服务器1,2都会投票给服务器3.此时服务器1服务器2服务器3的票数情况为0,0,3。服务器3的票数超过半数,当选Leader。服务器1服务器2的状态更改为FOLLOWING,服务器3的状态为LEADING
- 服务器4启动后发起一次选举。服务器123不是LOOKING状态,不会改变选票信息。因此服务器3为3票,服务器4一票,服务器4服从多数更改选票信息为服务器3,并更改状态为FOLLOWING.
- 服务器5启动后发起一次选举。同服务器4一样,更改状态为FOLLOWING。
👓SID: 服务器ID。唯一标识一台zk集群中的机器,每台机器不能重复,与myid一致。
👓ZXID:事务ID。标识一次服务器状态的变更。某一时刻,集群中每台机器的ZXID值不一定完全一致,这与ZK服务器对于客户端更新请求的处理器逻辑有关。
👓Epoch: 每个Leader任期的代号。没有Leader时同一轮投票过程中的逻辑时钟值是相同的,每投完票逻辑时钟值会增加。
第二次启动
- 当zk集群中的一台服务器出现以下情况会进行Leader选举:1. 服务器初始化启动2. 服务器运行时期无法与Leader保持连接。
- 当一台机器进入Leader选举流程,集群也可能会处于以下两种状态:
- 集群中本来存在一个Leader:会被告知当前服务器的Leader信息,仅需与Leader建立连接,进行状态同步。
- 集群中不存在Leader:假设zk集群五台服务器,SID分别为1/2/3/4/5,ZXID分别为8/8/8/7/7,此时SID为3的服务器是Leader。某一时刻3和5服务器出现故障,进行Leader选举。 | SID的投票情况 | EPOCH,ZXID,SID | 1.EPOCH大的直接胜出 | | —- | —- | —- | | 1 | 1,8,1 | 2.EPOCH相同,事务id大的胜出 | | 2 | 1,8,2 | 3.事务id相同,服务器id大的胜出 | | 4 | 1,7,4 | EPOCH>ZXID>SID |
客户端操作
znode节点数据信息
指定主机和端口号启动客户端:bin/zkCli.sh -server node1:2181
[zk: node1:2181(CONNECTED) 1] ls -s /
[zookeeper]cZxid = 0x0
# znode被创建时的毫秒数
ctime = Thu Jan 01 08:00:00 CST 1970
# znode最后更新的事务zxid
mZxid = 0x0
# znode最后修改的毫秒数
mtime = Thu Jan 01 08:00:00 CST 1970
# znode最后更新的子节点zxid
pZxid = 0x0
# znode子节点修改次数
cversion = -1
# znode数据变化号
dataVersion = 0
# znode访问控制列表的变化号
aclVersion = 0
ephemeralOwner = 0x0
# znode数据长度
dataLength = 0
# znode子节点个数
numChildren = 1
节点类型
持久(Persistent
客户端和服务器端断开连接后,创建节点不删除
持久化目录节点
持久化顺序编号目录节点:断开连接后,zk给节点名称进行顺序编号。顺序号单调递增,由父节点维护。
🔣顺序号可以被用于为所有事件全局排序,客户端通过顺序号推断事件顺序。
短暂(Ephemeral
客户端和服务器端断开连接后,创建节点立即删除
临时目录节点
临时顺序编号目录节点:断开连接后节点被删除,只是zk给该节点名称进行顺序编号
ls / #查看所有节点 [zookeeper]
create /sanguo "diaochan" #创建永久节点三国 赋值diaochan [sanguo,zookeeper]
create /sanguo/shuguo "liubei" # c创建sanguo的子节点shuguo 赋值liubei [sanguo,zookeeper]
ls /sanguo #[shuguo]
get -s /sanguo #diaochan
create /sanguo/weiguo "caocao"
ls /sanguo #[shuguo,weiguo]
create -s /sanguo/weiguo/zhangliao "zhangliao" #-s创建永久节点带序号
ls /sanguo/weiguo [zhangliao00000000000] #带序号的节点不允许重复
# 退出客户端后不会被删除
create -e /sanguo/wuguo "zhouyu" #-e创建临时节点zhouyu
ls /sanguo [shuguo,weiguo,wuguo]
create -e -s /sanguo/wuguo "zhouyu"
ls /sanguo/wuguo []
ls /sanguo #[shuguo,weiguo,wuguo,wuguo00000000003]
# 临时节点退出客户端后会被删除
set /sanguo/weiguo "simayi" #修改节点值
监听器原理
- main()线程
- 在main()中创建Zookeeper客户端,这时会创建两个线程,一个负责网络连接通信(connect),一个负责监听(listener)
- 通过connect线程将注册的监听事件发送给Zookeeper
- 在Zookeeper的注册监听器列表中将注册的监听事件添加到列表中
- Zookeeper监听到有数据或路径变化,会把消息发送给listener线程
- listener线程内部调用process()方法
客户端API操作
连接、增加节点
```java public class zkClient {
String connectString="192.168.213.100:2181,192.168.213.101:2181,192.168.213.120:2181";
int sessionTimeout=2000;
ZooKeeper zkClient;
@Before
public void init() throws IOException {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher(){
public void process(WatchedEvent watchedEvent) {
}
});
}
@Test
public void create() throws KeeperException, InterruptedException {
String nodeCreated = zkClient.create("/atcompany", "ss.avi".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
<a name="eGJgL"></a>
#### 获取子节点并监听状态变化
<a name="WSZv0"></a>
#### [判断某个节点是否存在](https://zookeeper.apache.org/)
```java
//此时需关闭监听process()
@Test
public void isExist() throws KeeperException, InterruptedException {
Stat stat = zkClient.exists("/atcompany", false);
System.out.println(stat==null?"not exist":"exist");
}
服务器动态上下线监听案例
package com.atcompany.case1;
import org.apache.zookeeper.*;
import java.io.IOException;
import static java.lang.Long.MAX_VALUE;
//服务器
public class DistributeServer {
ZooKeeper zk;
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
DistributeServer server = new DistributeServer();
//连接zk
String connectString = "192.168.213.100:2181,192.168.213.101:2181,192.168.213.120:2181";
int sessionTimeout = 2000;
server.getConnect(connectString,sessionTimeout);
//2.注册服务器到zk集群
server.register(args[0]);
//3.启动业务逻辑
server.business();
}
private void business() throws InterruptedException {
Thread.sleep(MAX_VALUE);
}
private void register(String hostname) throws KeeperException, InterruptedException {
String create = zk.create("/servers/"+hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname+"is online");
}
private void getConnect(String connectString,int sessionTimeout) throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {
}
});
}
}
客户端监听状态变化
package com.atcompany.case1;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class DistributeClient {
String connectString= "192.168.213.100:2181,192.168.213.101:2181,192.168.213.120:2181";
int sessionTimeout = 2000;
ZooKeeper zk;
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
DistributeClient client = new DistributeClient();
//1. zk连接
client.getConnect();
//2.监听/servers下的子节点的增加和删除
client.getServerList();
//3. 业务逻辑
client.business();
}
private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
private void getServerList() throws KeeperException, InterruptedException {
ArrayList<String> servers = new ArrayList<String>();
List<String> children = zk.getChildren("/servers", true);
for(String child:children){
byte[] data = zk.getData("/servers/" + child, false, null);
servers.add(new String(data));
}
System.out.println(servers);
}
private void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {
//保证时刻监听节点状态
try {
getServerList();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
分布式锁
🔐分布式锁:进程使用资源时,先去获得锁。获得锁之后对资源保持独占,这样其他进程无法访问该资源。进程使用完资源后释放锁,让其他进程来获取。保证分布式系统多个进程有序的访问临界资源。
- 接收到请求后,在/locks节点下创建一个临时 顺序 节点
- 判断当前节点是否为最小节点:是,获取到锁;不是,对前一个节点进行监听。
- 处理完业务后,delete节点,释放锁,下面的节点收到通知,重复第二步。
原生Zookeeper实现分布式锁
```java package com.atcompany.lock;
import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat;
import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch;
public class DistributeLock {
public final String connectString = "192.168.213.100:2181,192.168.213.101:2181,192.168.213.120:2181";
int sessionTimeout = 2000;
String waitPath = "";
ZooKeeper zk;
//java的计数器,用来进行线程唤醒,且为原子性操作 //wait()休眠方法,count+1继续执行,count==0 继续休眠 private CountDownLatch countDownLatch = new CountDownLatch(1); //等待前一步骤执行完毕 private CountDownLatch waitDownLatch = new CountDownLatch(1); private String currentMode;
public DistributeLock() throws IOException, InterruptedException, KeeperException {
//获取连接
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {
//进行监听
//zk连接 释放countDownLatch
if(watchedEvent.getState()==Event.KeeperState.SyncConnected){
countDownLatch.countDown();
}
//前一个路径对比
if(watchedEvent.getType()==Event.EventType.NodeDeleted&&watchedEvent.getPath().equals(waitPath)){
waitDownLatch.countDown();
}
}
});
//等待zookeeper连接后继续运行程序
countDownLatch.await();
//判断根结点/locks是否存在
//false表示不监听
Stat stat = zk.exists("/locks",false);
if(stat==null){
//在/locks下创建子节点
String create = zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
/**
* 加锁操作
*/
public void zklock(){
//在/locks节点下创建临时顺序节
try {
final String currentMode = zk.create("/locks" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
List<String> children = zk.getChildren("/locks", false);
if(children.size()==1){
//children只有一个值 直接获取锁
return ;
}else {
Collections.sort(children);
//获取节点名称 seq-000000
String substring = currentMode.substring("/locks/".length());
int index = children.indexOf(substring);
if(index==-1){
System.out.println("error");
}else if(index==0){
//就一个节点
return ;
}else{
//需要监听 前一个节点变化
waitPath = "/locks" + children.get(index - 1);
zk.getData(waitPath,true,null);
waitDownLatch.wait();
return ;
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
//判断创建节点序号是否最小:是,获取锁;不是,监听序号前一个节点
}
/**
* 解锁操作
*/
public void unlock() throws KeeperException, InterruptedException {
//删除当前节点
zk.delete(currentMode,-1);
}
}
测试案例:线程lock1 和 lock2对锁的竞争
```java
package com.atcompany.lock;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
public class DistributedLockTest {
public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
final DistributeLock lock1 = new DistributeLock();
final DistributeLock lock2 = new DistributeLock();
new Thread(new Runnable() {
public void run() {
try {
lock1.zklock();
System.out.println("线程1启动,获取到锁");
Thread.sleep(5*1000);
lock1.unlock();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程1释放锁");
}
}).start();
new Thread(new Runnable() {
public void run() {
try {
lock2.zklock();
System.out.println("线程1启动,获取到锁");
Thread.sleep(5*1000);
lock2.unlock();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程1释放锁");
}
}).start();
}
}
Curator框架实现分布式锁
❎原生java api的问题:
- 会话异步连接(CountDownLatch
- Watch需重复注册
- 开发复杂性高
- 不支持多节点的删除与创建
```java package com.atcompany.curator;<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.3.0</version>
</dependency>
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorLockTest {
public static void main(String[] args) {
//创建分布式锁1
final InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(),"/locks");
//分布式锁2
final InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(),"/locks");
new Thread(new Runnable() {
public void run() {
try {
lock1.acquire();
System.out.println("线程1 获取到锁");
lock1.acquire();
System.out.println("线程1 再次获取到锁");
lock1.release();
System.out.println("线程1 释放锁");
lock1.release();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("线程1 再次释放锁");
}
}).start();
new Thread(new Runnable() {
public void run() {
try {
lock2.acquire();
System.out.println("线程1 获取到锁");
lock2.acquire();
System.out.println("线程1 再次获取到锁");
lock2.release();
System.out.println("线程1 释放锁");
lock2.release();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("线程1 再次释放锁");
}
}).start();
}
private static CuratorFramework getCuratorFramework() {
ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.213.100:2181,192.168.213.101:2181,192.168.213.120:2181")
.connectionTimeoutMs(2000)
.sessionTimeoutMs(2000)
.retryPolicy(policy)
.build();
//启动客户端
client.start();
System.out.println("zookeeper启动成功");
return client;
}
}
源码分析
PAXOS算法
基于消息传递且具有高度容错性的一致性算法。快速正确的在一个分布式系统中对某个数据值达成一致,且保证不论发生任何异常,都不会破坏系统一致性。
将Paxos系统中,将所有节点划分为Proposer(提议者),Acceptor(接受者),Learner(学习者)。每个节点可以身兼数职。
一个完整的Paxos算法流程的三个阶段:
- Prepare
- Proposer向多个Acceptor发出Propose
- Acceptor根据收到的Proprose请求进行Promise
- Accept
- Proposer收到多数Acceptor承诺的Promise后,向Acceptor发出Propose请求
- Acceptor针对收到的Propose请求进行Accept处理
- Learn
- Proposer将形成的决议发送给所有Learners
- Prepare:Proposer生成全局唯一且递增的Proposal ID,向所有Acceptor发送Propose请求,无需携带提案内容,只携带proposal ID
- Promise:Accept收到Propose请求,做出两个承诺,一个应答
- 不再接受 <= 当前请求Proposal ID的Propose请求
- 不再接受 < 当前请求的Accept请求
- 不违背以前做出的承诺下,回复已经Accept过的提案中的Proposal ID最大的提案的Value和Proposal ID,没有则返回控制。
- Propose: Proposer收到多数Acceptor的Promise应答后,从应答中选择Proposal ID最大的提案的Value,作为本次要发起的提案。如果所有应答的提案Value均为空值,可以自己随意决定提案Value。然后携带当前Proposal ID,向所有Acceptor发送Propose请求
- Accept:Acceptor收到Propose,不违背已做出的承诺下,接受并持久化当前Proposal ID和提案Value
- Learn:Proposal收到多数Accepor的accept后,决议生成,将形成的决议发送给所有Learner。
当多个Proposers相互争夺Acceptor,会无法一致。因此改进Paxos算法:从系统中选出一个节点作为Leader。只有Leader能够发起提案。
ZAB协议
特别为zk设计的支持崩溃恢复的原子广播协议。zk设计为只有一个Leader负责处理外部的写事务请求,Leader客户端将数据同步到其他Follower节点。即zk只有一个Leader可以发起提案。
消息广播
- 客户端发起写操作请求
- Leader将客户端的请求转化为事务Proposal提案,同时为每个Proposal分配一个全局ID,即zxid
- Leader为每个Follower服务器分配一个单独的队列,将需要广播的Proposal依次放到队列,根据FIFO策略进行消息发送。
- Follower接收到Proposal后,会首先将其以事务日志的方式写入本地磁盘,写入成功后向Leader反馈一个Ack响应消息。
- Leader接收到超过半数以上Follower的Ack响应消息后,认为消息发送成功,可以发送commit消息。
- Leader向所有Follower广播commit,同时自身完成事务提交。Follower接收到commit提交,会将上一条事务提交
- zk即一台服务器提交了Proposal,确保所有服务器都能正确提交Proposal
崩溃恢复
Leader服务器出现崩溃或由于网络原因导致Leader服务器失去了过半Follower联系。
- Leader提出失误后宕机
- 事务在Leader提交后,过半的Follower响应Ack。但是Leader在Commit消息后宕机
前提:已经被Leader提交的Proposal,必须最终被Follower提交;丢弃被Leader提出但没有提交的Proposal。
Leader选举
- 新的Leader必须都是已经提交了Proposal的Follower服务器节点,不能包含未提交的Proposal。
-
数据同步
确定事务日志后所有的Proposal是否被集群中过半的服务器commit
- 等Follower将所有未同步的事务Proposal都从Leader服务器同步过,且应用到内存数据后,Leader才会把该Follower加入到真正可用的Follower列表。
CAP📚
一个分布式系统不可能同时满足 一致性Consistency 可用性Available 分区容错性Partition Tolerance。最多只能满足其中两项。
一致性:多个副本之间的数据一致的特性
可用性:系统提供的服务一直处于可用状态,对于用户的操作请求总是在有限时间内返回结果。
分区容错性:分布式系统遇到任何网络分区故障,仍然对外保证提供满足一致性和可用性的服务,除非整个网络都发生故障。
zk保证了cp,不能保证每次服务请求的可用性。进行Leader选举时集群都是不可用的。