1. 算法基础

1.1 拜占庭将军问题

拜占庭将军问题是一个协议问题,拜占庭帝国军队的将军们必须全体一致的决定是否攻击某一支敌军。
问题是这些将军在地里上是分隔开的,并且将军中存在叛徒。

叛徒可以任意行动以达到以下目标:

  • 欺骗某些将军采取仅供行动;
  • 促成一个不是所有将军都同意的决定;
  • 如当将军们不希望进攻时促成进程行动或者迷惑某些将军,使他们无法作出决定。

如果叛徒达到了这些目的之一,则任何攻击行动的结果都是注定要失败的,只有完全达成一致的努力才能获得胜利。

1.2 Paxos算法

Paxos算法:一种基于消息传递且具有高度容错特性的一致性算法。
解决的问题:如何快速正确的在一个分布式系统中对某个数据值达成一致,并且保证不论发生任何异常,都不会破坏整个系统的一致性。

1.2.1 算法描述

在一个Paxos系统中,首先将所有节点划分为Proposer(提议者)、Acceptor(接收者)和Learner(学习者)。(注意:每个节点都可以身兼数职)。
image.png
一个完整的Paxos算法流程可以分为三个阶段:

  1. Prepare准备阶段
    1. Proposer向多个Acceptor发出Propose请求Promise(承诺)
    2. Acceptor针对收到的Propose请求进行Promise(承诺)
  2. Accpect接受阶段
    1. Proposer收到多数Acceptor承诺的Promise后,向Acceptor发出Propose请求
    2. Acceptor针对收到的Propose请求进行Accept处理
  3. Learn学习阶段
    1. Proposer将形成的决议发送给所有Learners

1.2.2 算法流程

image.png

  1. Prepare:Proposer生成全局唯一且递增的Proposal ID,向所有Acceptor发送Propose请求,这里无需携带提案内容,只携带Proposal ID即可。
  2. Promise:Acceptor收到Propose请求后,作出“两个承诺,一个应答”:
    1. 不再接受Proposal ID小于等于当前请求的Propose请求
    2. 不再接受Proposal ID小于当前请求的Accept请求
    3. 不违背以前做出的承诺下,回复已经Accept过的提案中Proposal ID最大的那个天的Value和Proposal ID,没有则返回空值。
  3. Propose:Proposer收到多数Acceptor的Promise应答后,从应答中选择Proposal ID最大的提案Value,作为本次要发起的提案。如果所有应答的提案Value均为空值,则可用自己随意决定提案Value。然后携带当前Proposal ID,向所有Acceptor发送Propose请求。
  4. Accept:Acceptor收到Propose请求后,在不违背自己之前做出的承诺下,接受并持久胡当前Proposal ID和提案Value。
  5. Learn:Proposer收到多数Acceptor的Accept后,决议形成,将形成的决议发送给所有Learner。

例如:有A1~A5 5个议员,就税率问题进行决议

情况1:仅有A1发起提案,将税率定为10%
image.png

  • A1发起1号Proposal的Propose,等待Promise承诺;
  • A2~A5回应Promise;
  • A1在收到两份回复时就会发起税率10%的Proposal;
  • A2~A5回应Accept;
  • 通过Proposal,税率10%。

情况2:假设A1在提出提案的同时,A5决定将税率定为20%
image.png

  • A1、A5同时发起Propose(序号分别为1、2);
  • A2承诺A1,A4承诺A5,A3行为成为关键;
  • A3先收到A1消息,承诺A1;
  • A1发起Proposal(1,10%),A2、A3接受;
  • 之后A3又收到A5消息,回复A1:(1,10%),并承诺A5;
  • A5发起Proposal(2, 20%),A3、A4接受。之后A1、A5同时广播决议。

情况三:假设A1在提出提案的同时,A5决定将税率定为20%
image.png

  • A1、A5同时发起Propose(序号分别为1、2);
  • A2承诺A1,A4承诺A5,A3行为成为关键;
  • A3先收到A1消息,承诺A1。之后立刻收到A5消息,承诺A5;
  • A1发起Proposal(1, 10%),无足够响应,A1重新Propose(序号3),A3再次承诺A1;
  • A5发起Proposal(2, 10%),无足够响应,A5重新Propose(序号4),A3再次承诺A5;
  • ……

造成这种情况的原因是系统中有一个以上的Proposer,多个Proposer相互争夺Acceptor,造成迟迟无法达成一致的情况。针对这种情况,一种改进的Paxos算法被提出:从系统中选出一个节点作为Leader,只有Leader能够发起提案。这样,一次Paxos流程只有一个Proposer,不会出现活锁的情况,此时只会出情况1。

1.3 ZAB协议

1.3.1 ZAB算法

Zab算法借鉴了Paxos,是特别为Zookeeper设计的支持崩溃恢复的原子广播协议。基于该协议,Zookeeper设计为只有一台客户端(Leader)负责处理外部的写事务请求,然后Leader客户端将数据同步到其他Follower节点。即Zookeeper只有一个Leader可用发起提案。

1.3.2 协议内容

Zab协议包括两种基本的模式:消息广播、崩溃修复。

消息广播
Zab协议针对事务请求的处理过程类似于一个两阶段提交过程:

  1. 广播事务阶段
  2. 广播提交操作

两阶段提交模型如下:
image.png

  1. 客户端发起一个写操作请求;
  2. Leader服务器将客户端的请求转化为事务Proposal提案,同时为每个Proposal分配一个全局的ID,即zxid;
  3. Leader服务器为每个Follower服务器分配一个单独的队列,然后将需要广播的Proposal依次放到队列中去,并且根据FIFO策略进行消息发送;
  4. Follower接收到Proposal后,会首先将其以事务日志的方式写入到本地磁盘中,写入成功后向Leader反馈一个Ack响应消息;
  5. Leader接收到超过半数以上Follower的Ack响应消息后,即认为消息发送成功,可用发送commit消息;
  6. Leader向所有Follower广播commit消息,同时自身也会完成事务提交。Follower接收到commit消息后,会将上一条事务提交。

两阶段提交模型也有缺陷,有可能因为Leader宕机带来数据不一致,比如:

  1. Leader发起一个事务Proposal1后就宕机,Follower都没有Proposal1;
  2. Leader收到半数ACK宕机,没来得及向Follower发送Commit。

Zookeeper采用Zab协议的核心,就是只要有一台服务器提交了Proposal,就要确保所有的服务器最终都能正确提交Proposal。

崩溃修复
一旦Leader服务器出现崩溃或者由于网络原因导致Leader服务器失去了与过半Follower的联系,那么就会进入崩溃恢复模式。
image (1).png
(1)假设两种服务器异常情况

  1. 假设一个事务在Leader提出之后,Leader挂了;
  2. 一个事务在Leader上提交了,并且过半的Follwer都响应Ack了,但是Leader在Commit消息发出之前挂了。

(2)Zab协议崩溃恢复要求满足以下两个要求

  1. 确保已经Leader提交的提案Proposal,必须最终被所有的Follower服务器提交;
  2. 确保丢弃已经被Leader提出的,但是没有被提交的Proposal。

崩溃恢复主要包括两部分:Leader选举和数据恢复。
image (1).png
Leader选举,Zab协议需要保证选举出来的Leader满足以下条件:

  1. 新选举出来的Leader不能包含未提交的Proposal。即新Leader必须都是已经提交了Proposal的Follower服务器节点。
  2. 新选举的Leader节点含有最大的zxid。这样做的好处是可用避免Leader服务器检查Proposal提交和丢弃工作。

数据同步:

  1. 完成Leader选举后,在正式开始工作之前(接收事务请求,然后提出新的Proposal),Leader服务器会首先确认事务日志中所有的Proposal是否已经被集群中过半的服务器Commit。
  2. Leader服务器需要确保所有的Follower服务器能够接收到每一条事务的Proposal,并且能将所有已经提交的事务Proposal应用到内存数据中。等到Follower将所有尚未同步的事务Proposal都从Leader服务器上同步过,并且应用到内存数据中以后,Leader才会把该Follower加入到真正可用的Follower列表中。

1.4 CAP理论

CAP理论,一个分布式系统不可能同时满足以下三种:

  • C(Consistency,一致性):系统用在数据一致的状态下执行更新操作后,应该保证系统的数据仍然处于一致的状态。
  • A(Availability,可用性):系统必须一直处于可用的状态,对于每个操作请求都能在有限的时间内返回。
  • P(Partion Tolerance,分区容错性):系统在任何网络分区故障的时候,仍然保证对外提供满足一致性和可用性的服务。

Zookeeper保证的是CP,不能保证每次请求服务的可用性,进行Leader选举时集群都是不可用。

2. 源码分析

2.1 辅助源码

2.1.1 持久化源码

Leader和Follower中的数据会在内存和磁盘中各保存一份,所以需要将内存中的数据持久化到磁盘中。
org.apache.zookeeper.server.persistence包下的相关类都是序列化相关的代码。
image.png

(1)快照

  1. public interface SnapShot {
  2. // 反序列化方法
  3. long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException;
  4. // 序列化方法
  5. void serialize(DataTree dt, Map<Long, Integer> sessions, File name, boolean fsync) throws IOException;
  6. // 查找最近的快照文件
  7. File findMostRecentSnapshot() throws IOException;
  8. // 释放资源
  9. void close() throws IOException;
  10. }

(2)操作日志

  1. public interface TxnLog extends Closeable {
  2. // 设置服务状态
  3. void setServerStats(ServerStats serverStats);
  4. // 滚动日志
  5. void rollLog() throws IOException;
  6. // 追加
  7. boolean append(TxnHeader hdr, Record r) throws IOException;
  8. // 读取数据
  9. TxnIterator read(long zxid) throws IOException;
  10. // 获取最后一个zxid
  11. long getLastLoggedZxid() throws IOException;
  12. // 删除日志
  13. boolean truncate(long zxid) throws IOException;
  14. // 获取DbId
  15. long getDbId() throws IOException;
  16. // 提交
  17. void commit() throws IOException;
  18. // 日志同步时间
  19. long getTxnLogSyncElapsedTime();
  20. // 关闭日志
  21. void close() throws IOException;
  22. // 读取日志的接口
  23. interface TxnIterator extends Closeable {
  24. // 获取头信息
  25. TxnHeader getHeader();
  26. // 获取传输的内容
  27. Record getTxn();
  28. // 下一条记录
  29. boolean next() throws IOException;
  30. // 关闭资源
  31. void close() throws IOException;
  32. // 获取存储的大小
  33. long getStorageSize() throws IOException;
  34. }
  35. }

2.1.2 序列化源码

zookeeper-jute这个模块的代码是关于Zookeeper序列化相关的源码。
image.png

(1)序列化

  1. public interface InputArchive {
  2. byte readByte(String tag) throws IOException;
  3. boolean readBool(String tag) throws IOException;
  4. int readInt(String tag) throws IOException;
  5. long readLong(String tag) throws IOException;
  6. float readFloat(String tag) throws IOException;
  7. double readDouble(String tag) throws IOException;
  8. String readString(String tag) throws IOException;
  9. byte[] readBuffer(String tag) throws IOException;
  10. void readRecord(Record r, String tag) throws IOException;
  11. void startRecord(String tag) throws IOException;
  12. void endRecord(String tag) throws IOException;
  13. Index startVector(String tag) throws IOException;
  14. void endVector(String tag) throws IOException;
  15. Index startMap(String tag) throws IOException;
  16. void endMap(String tag) throws IOException;
  17. }

反序列化:

  1. public interface OutputArchive {
  2. void writeByte(byte b, String tag) throws IOException;
  3. void writeBool(boolean b, String tag) throws IOException;
  4. void writeInt(int i, String tag) throws IOException;
  5. void writeLong(long l, String tag) throws IOException;
  6. void writeFloat(float f, String tag) throws IOException;
  7. void writeDouble(double d, String tag) throws IOException;
  8. void writeString(String s, String tag) throws IOException;
  9. void writeBuffer(byte[] buf, String tag)
  10. throws IOException;
  11. void writeRecord(Record r, String tag) throws IOException;
  12. void startRecord(Record r, String tag) throws IOException;
  13. void endRecord(Record r, String tag) throws IOException;
  14. void startVector(List<?> v, String tag) throws IOException;
  15. void endVector(List<?> v, String tag) throws IOException;
  16. void startMap(TreeMap<?, ?> v, String tag) throws IOException;
  17. void endMap(TreeMap<?, ?> v, String tag) throws IOException;
  18. }

2.2 服务端初始化源码

2.2.1 ZK服务端启动脚本分析

(1)从bin的zkServer.sh脚本入手,提取关键部分:

  1. ZOOBIN="${BASH_SOURCE-$0}"
  2. ZOOBIN="$(dirname "${ZOOBIN}")"
  3. ZOOBINDIR="$(cd "${ZOOBIN}"; pwd)"
  4. ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain"
  5. nohup "$JAVA" $ZOO_DATADIR_AUTOCREATE "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" \
  6. "-Dzookeeper.log.file=${ZOO_LOG_FILE}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
  7. -XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -9 %p' \
  8. -cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev/null &

可以看到主启动类是QuorumPeerMain,那么源码从这个类开始着手。

Zookeeper.svg