概述❓

开源的,分布式,为分布式框架提供协调服务的Apache项目。
工作机制: 基于观察者模式的分布式服务管理框架,负责存储和管理数据,接受观察者的注册。一旦数据状态发生变化,Zookeeper负责通知注册的观察者做出相应反应。

  1. 一个Leader,多个Follower组成的集群。
  2. 只要有半数以上的节点存活,Zookeeper集群就能正常服务。因此zk集群适合安排奇数台服务器。
  3. 全局数据一致:每个Server保存一份相同的数据副本,Client无论连接到哪个Server,数据都是一致
  4. 数据更新原子性,要么更新成功,要么失败。
  5. 实时性。一定时间范围内,Client读到最新数据
  6. 更新请求顺序执行,来自同一个Client的更新请求按其发送顺序依次执行。

    数据结构⚓️

    类似于Unix文件系统。每个节点称作一个ZNode。每个ZNode默认能存储1MB的数据,通过其路径唯一标识。

    应用场景

    统一命名服务
    分布式环境,对应用/服务进行统一命名,便于识别。
    统一配置管理
    配置文件同步,一个集群中所有节点的配置信息是一致的(Kafka集群),对于配置文件修改后,希望可以快速同步到各个节点。可将配置信息写入zk中的一个ZNode,各客户端服务器监听这个Znode,截屏2022-02-02 下午7.13.11.png
    统一集群管理
    分布式环境中需要实时掌握各个节点状态,根据节点状态实时做出调整。可以将2节点信息写入ZNode,监听Znode获取实时状态变化。
    服务器节点动态上下线
    截屏2022-02-02 下午7.15.14.png
    软负载均衡
    zk中记录每台服务器的访问数,让访问数最少的服务器处理最新的客户端请求。

    下载配置

    解压

    [atguigu@hadoop102 software]$ tar -zxvf apache-zookeeper-3.5.7- bin.tar.gz -C /opt/module/

    配置修改

    (1)将/opt/module/zookeeper-3.5.7/conf 这个路径下的 zoo_sample.cfg 修改为 zoo.cfg; [atguigu@hadoop102 conf]$ mv zoo_sample.cfg zoo.cfg
    (2)打开 zoo.cfg 文件,修改 dataDir 路径:
    [atguigu@hadoop102 zookeeper-3.5.7]$ vim zoo.cfg
    修改如下内容:
    dataDir=/opt/module/zookeeper-3.5.7/zkData
    (3)在/opt/module/zookeeper-3.5.7/这个目录上创建 zkData 文件夹
    [atguigu@hadoop102 zookeeper-3.5.7]$ mkdir zkData

    操作 Zookeeper

    (1)启动 Zookeeper
    [atguigu@hadoop102 zookeeper-3.5.7]$ bin/zkServer.sh start
    (2)查看进程是否启动
    [atguigu@hadoop102 zookeeper-3.5.7]$ jps 4020 Jps
    4001 QuorumPeerMain
    (3)查看状态
    [atguigu@hadoop102 zookeeper-3.5.7]$ bin/zkServer.sh status ZooKeeper JMX enabled by default
    Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfgMode: standalone
    (4)启动客户端
    [atguigu@hadoop102 zookeeper-3.5.7]$ bin/zkCli.sh
    (5)退出客户端:
    [zk: localhost:2181(CONNECTED) 0] quit
    (6)停止 Zookeeper
    [atguigu@hadoop102 zookeeper-3.5.7]$ bin/zkServer.sh stop
    FF5D}R5KIQJSNG`5ZR{R3WY.png

    集群操作

    🖖选举机制

    第一次启动(5台zk服务器集群)

  7. 服务器1启动后发起一次选举,将票投给自己,不够半数以上,服务器1状态保持为LOOKING

  8. 服务器2启动后发起一次选举。服务器1和2分别投自己一票并交换选票信息。服务器1发现服务器2的myid比自己目前投票选举(服务器1)的myid大,更改投票推举服务器2。服务器1 0票,服务器2 2票,没有半数以上,选举无法完成,服务器1 2 状态保持为LOOKING
  9. 服务器3启动后发起一次选举。服务器1,2都会投票给服务器3.此时服务器1服务器2服务器3的票数情况为0,0,3。服务器3的票数超过半数,当选Leader。服务器1服务器2的状态更改为FOLLOWING,服务器3的状态为LEADING
  10. 服务器4启动后发起一次选举。服务器123不是LOOKING状态,不会改变选票信息。因此服务器3为3票,服务器4一票,服务器4服从多数更改选票信息为服务器3,并更改状态为FOLLOWING.
  11. 服务器5启动后发起一次选举。同服务器4一样,更改状态为FOLLOWING。

👓SID: 服务器ID。唯一标识一台zk集群中的机器,每台机器不能重复,与myid一致。
👓ZXID:事务ID。标识一次服务器状态的变更。某一时刻,集群中每台机器的ZXID值不一定完全一致,这与ZK服务器对于客户端更新请求的处理器逻辑有关。
👓Epoch: 每个Leader任期的代号。没有Leader时同一轮投票过程中的逻辑时钟值是相同的,每投完票逻辑时钟值会增加。

第二次启动

  1. 当zk集群中的一台服务器出现以下情况会进行Leader选举:1. 服务器初始化启动2. 服务器运行时期无法与Leader保持连接。
  2. 当一台机器进入Leader选举流程,集群也可能会处于以下两种状态:
    1. 集群中本来存在一个Leader:会被告知当前服务器的Leader信息,仅需与Leader建立连接,进行状态同步。
    2. 集群中不存在Leader:假设zk集群五台服务器,SID分别为1/2/3/4/5,ZXID分别为8/8/8/7/7,此时SID为3的服务器是Leader。某一时刻3和5服务器出现故障,进行Leader选举。 | SID的投票情况 | EPOCH,ZXID,SID | 1.EPOCH大的直接胜出 | | —- | —- | —- | | 1 | 1,8,1 | 2.EPOCH相同,事务id大的胜出 | | 2 | 1,8,2 | 3.事务id相同,服务器id大的胜出 | | 4 | 1,7,4 | EPOCH>ZXID>SID |

客户端操作

znode节点数据信息

指定主机和端口号启动客户端:bin/zkCli.sh -server node1:2181

  1. [zk: node1:2181(CONNECTED) 1] ls -s /
  2. [zookeeper]cZxid = 0x0
  3. # znode被创建时的毫秒数
  4. ctime = Thu Jan 01 08:00:00 CST 1970
  5. # znode最后更新的事务zxid
  6. mZxid = 0x0
  7. # znode最后修改的毫秒数
  8. mtime = Thu Jan 01 08:00:00 CST 1970
  9. # znode最后更新的子节点zxid
  10. pZxid = 0x0
  11. # znode子节点修改次数
  12. cversion = -1
  13. # znode数据变化号
  14. dataVersion = 0
  15. # znode访问控制列表的变化号
  16. aclVersion = 0
  17. ephemeralOwner = 0x0
  18. # znode数据长度
  19. dataLength = 0
  20. # znode子节点个数
  21. numChildren = 1

节点类型

持久(Persistent

客户端和服务器端断开连接后,创建节点不删除
持久化目录节点
持久化顺序编号目录节点:断开连接后,zk给节点名称进行顺序编号。顺序号单调递增,由父节点维护。
🔣顺序号可以被用于为所有事件全局排序,客户端通过顺序号推断事件顺序。

短暂(Ephemeral

客户端和服务器端断开连接后,创建节点立即删除
临时目录节点
临时顺序编号目录节点:断开连接后节点被删除,只是zk给该节点名称进行顺序编号

  1. ls / #查看所有节点 [zookeeper]
  2. create /sanguo "diaochan" #创建永久节点三国 赋值diaochan [sanguo,zookeeper]
  3. create /sanguo/shuguo "liubei" # c创建sanguo的子节点shuguo 赋值liubei [sanguo,zookeeper]
  4. ls /sanguo #[shuguo]
  5. get -s /sanguo #diaochan
  6. create /sanguo/weiguo "caocao"
  7. ls /sanguo #[shuguo,weiguo]
  8. create -s /sanguo/weiguo/zhangliao "zhangliao" #-s创建永久节点带序号
  9. ls /sanguo/weiguo [zhangliao00000000000] #带序号的节点不允许重复
  10. # 退出客户端后不会被删除
  11. create -e /sanguo/wuguo "zhouyu" #-e创建临时节点zhouyu
  12. ls /sanguo [shuguo,weiguo,wuguo]
  13. create -e -s /sanguo/wuguo "zhouyu"
  14. ls /sanguo/wuguo []
  15. ls /sanguo #[shuguo,weiguo,wuguo,wuguo00000000003]
  16. # 临时节点退出客户端后会被删除
  17. set /sanguo/weiguo "simayi" #修改节点值

监听器原理

  1. main()线程
  2. 在main()中创建Zookeeper客户端,这时会创建两个线程,一个负责网络连接通信(connect),一个负责监听(listener)
  3. 通过connect线程将注册的监听事件发送给Zookeeper
  4. 在Zookeeper的注册监听器列表中将注册的监听事件添加到列表中
  5. Zookeeper监听到有数据或路径变化,会把消息发送给listener线程
  6. listener线程内部调用process()方法

    客户端API操作

    连接、增加节点

    ```java public class zkClient {
  1. String connectString="192.168.213.100:2181,192.168.213.101:2181,192.168.213.120:2181";
  2. int sessionTimeout=2000;
  3. ZooKeeper zkClient;
  4. @Before
  5. public void init() throws IOException {
  6. zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher(){
  7. public void process(WatchedEvent watchedEvent) {
  8. }
  9. });
  10. }
  11. @Test
  12. public void create() throws KeeperException, InterruptedException {
  13. String nodeCreated = zkClient.create("/atcompany", "ss.avi".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  14. }

}

  1. <a name="eGJgL"></a>
  2. #### 获取子节点并监听状态变化
  3. <a name="WSZv0"></a>
  4. #### [判断某个节点是否存在](https://zookeeper.apache.org/)
  5. ```java
  6. //此时需关闭监听process()
  7. @Test
  8. public void isExist() throws KeeperException, InterruptedException {
  9. Stat stat = zkClient.exists("/atcompany", false);
  10. System.out.println(stat==null?"not exist":"exist");
  11. }

服务器动态上下线监听案例

  1. package com.atcompany.case1;
  2. import org.apache.zookeeper.*;
  3. import java.io.IOException;
  4. import static java.lang.Long.MAX_VALUE;
  5. //服务器
  6. public class DistributeServer {
  7. ZooKeeper zk;
  8. public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
  9. DistributeServer server = new DistributeServer();
  10. //连接zk
  11. String connectString = "192.168.213.100:2181,192.168.213.101:2181,192.168.213.120:2181";
  12. int sessionTimeout = 2000;
  13. server.getConnect(connectString,sessionTimeout);
  14. //2.注册服务器到zk集群
  15. server.register(args[0]);
  16. //3.启动业务逻辑
  17. server.business();
  18. }
  19. private void business() throws InterruptedException {
  20. Thread.sleep(MAX_VALUE);
  21. }
  22. private void register(String hostname) throws KeeperException, InterruptedException {
  23. String create = zk.create("/servers/"+hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
  24. System.out.println(hostname+"is online");
  25. }
  26. private void getConnect(String connectString,int sessionTimeout) throws IOException {
  27. zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
  28. public void process(WatchedEvent watchedEvent) {
  29. }
  30. });
  31. }
  32. }

客户端监听状态变化

  1. package com.atcompany.case1;
  2. import org.apache.zookeeper.KeeperException;
  3. import org.apache.zookeeper.WatchedEvent;
  4. import org.apache.zookeeper.Watcher;
  5. import org.apache.zookeeper.ZooKeeper;
  6. import java.io.IOException;
  7. import java.util.ArrayList;
  8. import java.util.List;
  9. public class DistributeClient {
  10. String connectString= "192.168.213.100:2181,192.168.213.101:2181,192.168.213.120:2181";
  11. int sessionTimeout = 2000;
  12. ZooKeeper zk;
  13. public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
  14. DistributeClient client = new DistributeClient();
  15. //1. zk连接
  16. client.getConnect();
  17. //2.监听/servers下的子节点的增加和删除
  18. client.getServerList();
  19. //3. 业务逻辑
  20. client.business();
  21. }
  22. private void business() throws InterruptedException {
  23. Thread.sleep(Long.MAX_VALUE);
  24. }
  25. private void getServerList() throws KeeperException, InterruptedException {
  26. ArrayList<String> servers = new ArrayList<String>();
  27. List<String> children = zk.getChildren("/servers", true);
  28. for(String child:children){
  29. byte[] data = zk.getData("/servers/" + child, false, null);
  30. servers.add(new String(data));
  31. }
  32. System.out.println(servers);
  33. }
  34. private void getConnect() throws IOException {
  35. zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
  36. public void process(WatchedEvent watchedEvent) {
  37. //保证时刻监听节点状态
  38. try {
  39. getServerList();
  40. } catch (KeeperException e) {
  41. e.printStackTrace();
  42. } catch (InterruptedException e) {
  43. e.printStackTrace();
  44. }
  45. }
  46. });
  47. }
  48. }

截屏2022-02-03 下午6.15.20.png

分布式锁

🔐分布式锁:进程使用资源时,先去获得锁。获得锁之后对资源保持独占,这样其他进程无法访问该资源。进程使用完资源后释放锁,让其他进程来获取。保证分布式系统多个进程有序的访问临界资源。

  1. 接收到请求后,在/locks节点下创建一个临时 顺序 节点
  2. 判断当前节点是否为最小节点:是,获取到锁;不是,对前一个节点进行监听。
  3. 处理完业务后,delete节点,释放锁,下面的节点收到通知,重复第二步。

    原生Zookeeper实现分布式锁

    ```java package com.atcompany.lock;

import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat;

import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch;

public class DistributeLock {

  1. public final String connectString = "192.168.213.100:2181,192.168.213.101:2181,192.168.213.120:2181";
  2. int sessionTimeout = 2000;
  3. String waitPath = "";
  4. ZooKeeper zk;

//java的计数器,用来进行线程唤醒,且为原子性操作 //wait()休眠方法,count+1继续执行,count==0 继续休眠 private CountDownLatch countDownLatch = new CountDownLatch(1); //等待前一步骤执行完毕 private CountDownLatch waitDownLatch = new CountDownLatch(1); private String currentMode;

  1. public DistributeLock() throws IOException, InterruptedException, KeeperException {
  2. //获取连接
  3. zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
  4. public void process(WatchedEvent watchedEvent) {
  5. //进行监听
  6. //zk连接 释放countDownLatch
  7. if(watchedEvent.getState()==Event.KeeperState.SyncConnected){
  8. countDownLatch.countDown();
  9. }
  10. //前一个路径对比
  11. if(watchedEvent.getType()==Event.EventType.NodeDeleted&&watchedEvent.getPath().equals(waitPath)){
  12. waitDownLatch.countDown();
  13. }
  14. }
  15. });
  16. //等待zookeeper连接后继续运行程序
  17. countDownLatch.await();
  18. //判断根结点/locks是否存在
  19. //false表示不监听
  20. Stat stat = zk.exists("/locks",false);
  21. if(stat==null){
  22. //在/locks下创建子节点
  23. String create = zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  24. }
  25. }
  26. /**
  27. * 加锁操作
  28. */
  29. public void zklock(){
  30. //在/locks节点下创建临时顺序节
  31. try {
  32. final String currentMode = zk.create("/locks" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
  33. List<String> children = zk.getChildren("/locks", false);
  34. if(children.size()==1){
  35. //children只有一个值 直接获取锁
  36. return ;
  37. }else {
  38. Collections.sort(children);
  39. //获取节点名称 seq-000000
  40. String substring = currentMode.substring("/locks/".length());
  41. int index = children.indexOf(substring);
  42. if(index==-1){
  43. System.out.println("error");
  44. }else if(index==0){
  45. //就一个节点
  46. return ;
  47. }else{
  48. //需要监听 前一个节点变化
  49. waitPath = "/locks" + children.get(index - 1);
  50. zk.getData(waitPath,true,null);
  51. waitDownLatch.wait();
  52. return ;
  53. }
  54. }
  55. } catch (KeeperException e) {
  56. e.printStackTrace();
  57. } catch (InterruptedException e) {
  58. e.printStackTrace();
  59. }
  60. //判断创建节点序号是否最小:是,获取锁;不是,监听序号前一个节点
  61. }
  62. /**
  63. * 解锁操作
  64. */
  65. public void unlock() throws KeeperException, InterruptedException {
  66. //删除当前节点
  67. zk.delete(currentMode,-1);
  68. }

}

  1. 测试案例:线程lock1 lock2对锁的竞争
  2. ```java
  3. package com.atcompany.lock;
  4. import org.apache.zookeeper.KeeperException;
  5. import java.io.IOException;
  6. public class DistributedLockTest {
  7. public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
  8. final DistributeLock lock1 = new DistributeLock();
  9. final DistributeLock lock2 = new DistributeLock();
  10. new Thread(new Runnable() {
  11. public void run() {
  12. try {
  13. lock1.zklock();
  14. System.out.println("线程1启动,获取到锁");
  15. Thread.sleep(5*1000);
  16. lock1.unlock();
  17. } catch (KeeperException e) {
  18. e.printStackTrace();
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. System.out.println("线程1释放锁");
  23. }
  24. }).start();
  25. new Thread(new Runnable() {
  26. public void run() {
  27. try {
  28. lock2.zklock();
  29. System.out.println("线程1启动,获取到锁");
  30. Thread.sleep(5*1000);
  31. lock2.unlock();
  32. } catch (KeeperException e) {
  33. e.printStackTrace();
  34. } catch (InterruptedException e) {
  35. e.printStackTrace();
  36. }
  37. System.out.println("线程1释放锁");
  38. }
  39. }).start();
  40. }
  41. }

Curator框架实现分布式锁

❎原生java api的问题:

  1. 会话异步连接(CountDownLatch
  2. Watch需重复注册
  3. 开发复杂性高
  4. 不支持多节点的删除与创建
    1. <dependency>
    2. <groupId>org.apache.curator</groupId>
    3. <artifactId>curator-framework</artifactId>
    4. <version>4.3.0</version>
    5. </dependency>
    6. <dependency>
    7. <groupId>org.apache.curator</groupId>
    8. <artifactId>curator-recipes</artifactId>
    9. <version>4.3.0</version>
    10. </dependency>
    11. <dependency>
    12. <groupId>org.apache.curator</groupId>
    13. <artifactId>curator-client</artifactId>
    14. <version>4.3.0</version>
    15. </dependency>
    ```java package com.atcompany.curator;

import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorLockTest {

  1. public static void main(String[] args) {
  2. //创建分布式锁1
  3. final InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(),"/locks");
  4. //分布式锁2
  5. final InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(),"/locks");
  6. new Thread(new Runnable() {
  7. public void run() {
  8. try {
  9. lock1.acquire();
  10. System.out.println("线程1 获取到锁");
  11. lock1.acquire();
  12. System.out.println("线程1 再次获取到锁");
  13. lock1.release();
  14. System.out.println("线程1 释放锁");
  15. lock1.release();
  16. } catch (Exception e) {
  17. e.printStackTrace();
  18. }
  19. System.out.println("线程1 再次释放锁");
  20. }
  21. }).start();
  22. new Thread(new Runnable() {
  23. public void run() {
  24. try {
  25. lock2.acquire();
  26. System.out.println("线程1 获取到锁");
  27. lock2.acquire();
  28. System.out.println("线程1 再次获取到锁");
  29. lock2.release();
  30. System.out.println("线程1 释放锁");
  31. lock2.release();
  32. } catch (Exception e) {
  33. e.printStackTrace();
  34. }
  35. System.out.println("线程1 再次释放锁");
  36. }
  37. }).start();
  38. }
  39. private static CuratorFramework getCuratorFramework() {
  40. ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
  41. CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.213.100:2181,192.168.213.101:2181,192.168.213.120:2181")
  42. .connectionTimeoutMs(2000)
  43. .sessionTimeoutMs(2000)
  44. .retryPolicy(policy)
  45. .build();
  46. //启动客户端
  47. client.start();
  48. System.out.println("zookeeper启动成功");
  49. return client;
  50. }

}

```

源码分析

PAXOS算法

基于消息传递且具有高度容错性的一致性算法。快速正确的在一个分布式系统中对某个数据值达成一致,且保证不论发生任何异常,都不会破坏系统一致性。
将Paxos系统中,将所有节点划分为Proposer(提议者),Acceptor(接受者),Learner(学习者)。每个节点可以身兼数职。
一个完整的Paxos算法流程的三个阶段:

  1. Prepare
    1. Proposer向多个Acceptor发出Propose
    2. Acceptor根据收到的Proprose请求进行Promise

  2. Accept
    1. Proposer收到多数Acceptor承诺的Promise后,向Acceptor发出Propose请求
    2. Acceptor针对收到的Propose请求进行Accept处理
  3. Learn
    1. Proposer将形成的决议发送给所有Learners
  • Prepare:Proposer生成全局唯一且递增的Proposal ID,向所有Acceptor发送Propose请求,无需携带提案内容,只携带proposal ID
  • Promise:Accept收到Propose请求,做出两个承诺,一个应答
    • 不再接受 <= 当前请求Proposal ID的Propose请求
    • 不再接受 < 当前请求的Accept请求
    • 不违背以前做出的承诺下,回复已经Accept过的提案中的Proposal ID最大的提案的Value和Proposal ID,没有则返回控制。
  • Propose: Proposer收到多数Acceptor的Promise应答后,从应答中选择Proposal ID最大的提案的Value,作为本次要发起的提案。如果所有应答的提案Value均为空值,可以自己随意决定提案Value。然后携带当前Proposal ID,向所有Acceptor发送Propose请求
  • Accept:Acceptor收到Propose,不违背已做出的承诺下,接受并持久化当前Proposal ID和提案Value
  • Learn:Proposal收到多数Accepor的accept后,决议生成,将形成的决议发送给所有Learner。

当多个Proposers相互争夺Acceptor,会无法一致。因此改进Paxos算法:从系统中选出一个节点作为Leader。只有Leader能够发起提案。

ZAB协议

特别为zk设计的支持崩溃恢复的原子广播协议。zk设计为只有一个Leader负责处理外部的写事务请求,Leader客户端将数据同步到其他Follower节点。即zk只有一个Leader可以发起提案。

消息广播

  1. 客户端发起写操作请求
  2. Leader将客户端的请求转化为事务Proposal提案,同时为每个Proposal分配一个全局ID,即zxid
  3. Leader为每个Follower服务器分配一个单独的队列,将需要广播的Proposal依次放到队列,根据FIFO策略进行消息发送。
  4. Follower接收到Proposal后,会首先将其以事务日志的方式写入本地磁盘,写入成功后向Leader反馈一个Ack响应消息。
  5. Leader接收到超过半数以上Follower的Ack响应消息后,认为消息发送成功,可以发送commit消息。
  6. Leader向所有Follower广播commit,同时自身完成事务提交。Follower接收到commit提交,会将上一条事务提交
  7. zk即一台服务器提交了Proposal,确保所有服务器都能正确提交Proposal

    崩溃恢复

    Leader服务器出现崩溃或由于网络原因导致Leader服务器失去了过半Follower联系。
  • Leader提出失误后宕机
  • 事务在Leader提交后,过半的Follower响应Ack。但是Leader在Commit消息后宕机

前提:已经被Leader提交的Proposal,必须最终被Follower提交;丢弃被Leader提出但没有提交的Proposal。

Leader选举

  1. 新的Leader必须都是已经提交了Proposal的Follower服务器节点,不能包含未提交的Proposal。
  2. 新选举的Leader含有最大的zxid

    数据同步

  3. 确定事务日志后所有的Proposal是否被集群中过半的服务器commit

  4. 等Follower将所有未同步的事务Proposal都从Leader服务器同步过,且应用到内存数据后,Leader才会把该Follower加入到真正可用的Follower列表。

    CAP📚

    一个分布式系统不可能同时满足 一致性Consistency 可用性Available 分区容错性Partition Tolerance。最多只能满足其中两项。
    一致性:多个副本之间的数据一致的特性
    可用性:系统提供的服务一直处于可用状态,对于用户的操作请求总是在有限时间内返回结果。
    分区容错性:分布式系统遇到任何网络分区故障,仍然对外保证提供满足一致性和可用性的服务,除非整个网络都发生故障。
    zk保证了cp,不能保证每次服务请求的可用性。进行Leader选举时集群都是不可用的。