1. 算法基础
1.1 拜占庭将军问题
拜占庭将军问题是一个协议问题,拜占庭帝国军队的将军们必须全体一致的决定是否攻击某一支敌军。
问题是这些将军在地里上是分隔开的,并且将军中存在叛徒。
叛徒可以任意行动以达到以下目标:
- 欺骗某些将军采取仅供行动;
- 促成一个不是所有将军都同意的决定;
- 如当将军们不希望进攻时促成进程行动或者迷惑某些将军,使他们无法作出决定。
如果叛徒达到了这些目的之一,则任何攻击行动的结果都是注定要失败的,只有完全达成一致的努力才能获得胜利。
1.2 Paxos算法
Paxos算法:一种基于消息传递且具有高度容错特性的一致性算法。
解决的问题:如何快速正确的在一个分布式系统中对某个数据值达成一致,并且保证不论发生任何异常,都不会破坏整个系统的一致性。
1.2.1 算法描述
在一个Paxos系统中,首先将所有节点划分为Proposer(提议者)、Acceptor(接收者)和Learner(学习者)。(注意:每个节点都可以身兼数职)。
一个完整的Paxos算法流程可以分为三个阶段:
- Prepare准备阶段
- Proposer向多个Acceptor发出Propose请求Promise(承诺)
- Acceptor针对收到的Propose请求进行Promise(承诺)
- Accpect接受阶段
- Proposer收到多数Acceptor承诺的Promise后,向Acceptor发出Propose请求
- Acceptor针对收到的Propose请求进行Accept处理
- Learn学习阶段
- Proposer将形成的决议发送给所有Learners
1.2.2 算法流程

- Prepare:Proposer生成全局唯一且递增的Proposal ID,向所有Acceptor发送Propose请求,这里无需携带提案内容,只携带Proposal ID即可。
- Promise:Acceptor收到Propose请求后,作出“两个承诺,一个应答”:
- 不再接受Proposal ID小于等于当前请求的Propose请求
- 不再接受Proposal ID小于当前请求的Accept请求
- 不违背以前做出的承诺下,回复已经Accept过的提案中Proposal ID最大的那个天的Value和Proposal ID,没有则返回空值。
- Propose:Proposer收到多数Acceptor的Promise应答后,从应答中选择Proposal ID最大的提案Value,作为本次要发起的提案。如果所有应答的提案Value均为空值,则可用自己随意决定提案Value。然后携带当前Proposal ID,向所有Acceptor发送Propose请求。
- Accept:Acceptor收到Propose请求后,在不违背自己之前做出的承诺下,接受并持久胡当前Proposal ID和提案Value。
- Learn:Proposer收到多数Acceptor的Accept后,决议形成,将形成的决议发送给所有Learner。
例如:有A1~A5 5个议员,就税率问题进行决议
情况1:仅有A1发起提案,将税率定为10%
- A1发起1号Proposal的Propose,等待Promise承诺;
- A2~A5回应Promise;
- A1在收到两份回复时就会发起税率10%的Proposal;
- A2~A5回应Accept;
- 通过Proposal,税率10%。
情况2:假设A1在提出提案的同时,A5决定将税率定为20%
- A1、A5同时发起Propose(序号分别为1、2);
- A2承诺A1,A4承诺A5,A3行为成为关键;
- A3先收到A1消息,承诺A1;
- A1发起Proposal(1,10%),A2、A3接受;
- 之后A3又收到A5消息,回复A1:(1,10%),并承诺A5;
- A5发起Proposal(2, 20%),A3、A4接受。之后A1、A5同时广播决议。
情况三:假设A1在提出提案的同时,A5决定将税率定为20%
- A1、A5同时发起Propose(序号分别为1、2);
- A2承诺A1,A4承诺A5,A3行为成为关键;
- A3先收到A1消息,承诺A1。之后立刻收到A5消息,承诺A5;
- A1发起Proposal(1, 10%),无足够响应,A1重新Propose(序号3),A3再次承诺A1;
- A5发起Proposal(2, 10%),无足够响应,A5重新Propose(序号4),A3再次承诺A5;
- ……
造成这种情况的原因是系统中有一个以上的Proposer,多个Proposer相互争夺Acceptor,造成迟迟无法达成一致的情况。针对这种情况,一种改进的Paxos算法被提出:从系统中选出一个节点作为Leader,只有Leader能够发起提案。这样,一次Paxos流程只有一个Proposer,不会出现活锁的情况,此时只会出情况1。
1.3 ZAB协议
1.3.1 ZAB算法
Zab算法借鉴了Paxos,是特别为Zookeeper设计的支持崩溃恢复的原子广播协议。基于该协议,Zookeeper设计为只有一台客户端(Leader)负责处理外部的写事务请求,然后Leader客户端将数据同步到其他Follower节点。即Zookeeper只有一个Leader可用发起提案。
1.3.2 协议内容
Zab协议包括两种基本的模式:消息广播、崩溃修复。
消息广播
Zab协议针对事务请求的处理过程类似于一个两阶段提交过程:
- 广播事务阶段
- 广播提交操作
两阶段提交模型如下:
- 客户端发起一个写操作请求;
- Leader服务器将客户端的请求转化为事务Proposal提案,同时为每个Proposal分配一个全局的ID,即zxid;
- Leader服务器为每个Follower服务器分配一个单独的队列,然后将需要广播的Proposal依次放到队列中去,并且根据FIFO策略进行消息发送;
- Follower接收到Proposal后,会首先将其以事务日志的方式写入到本地磁盘中,写入成功后向Leader反馈一个Ack响应消息;
- Leader接收到超过半数以上Follower的Ack响应消息后,即认为消息发送成功,可用发送commit消息;
- Leader向所有Follower广播commit消息,同时自身也会完成事务提交。Follower接收到commit消息后,会将上一条事务提交。
两阶段提交模型也有缺陷,有可能因为Leader宕机带来数据不一致,比如:
- Leader发起一个事务Proposal1后就宕机,Follower都没有Proposal1;
- Leader收到半数ACK宕机,没来得及向Follower发送Commit。
Zookeeper采用Zab协议的核心,就是只要有一台服务器提交了Proposal,就要确保所有的服务器最终都能正确提交Proposal。
崩溃修复
一旦Leader服务器出现崩溃或者由于网络原因导致Leader服务器失去了与过半Follower的联系,那么就会进入崩溃恢复模式。
(1)假设两种服务器异常情况
- 假设一个事务在Leader提出之后,Leader挂了;
- 一个事务在Leader上提交了,并且过半的Follwer都响应Ack了,但是Leader在Commit消息发出之前挂了。
(2)Zab协议崩溃恢复要求满足以下两个要求
- 确保已经Leader提交的提案Proposal,必须最终被所有的Follower服务器提交;
- 确保丢弃已经被Leader提出的,但是没有被提交的Proposal。
崩溃恢复主要包括两部分:Leader选举和数据恢复。
Leader选举,Zab协议需要保证选举出来的Leader满足以下条件:
- 新选举出来的Leader不能包含未提交的Proposal。即新Leader必须都是已经提交了Proposal的Follower服务器节点。
- 新选举的Leader节点含有最大的zxid。这样做的好处是可用避免Leader服务器检查Proposal提交和丢弃工作。
数据同步:
- 完成Leader选举后,在正式开始工作之前(接收事务请求,然后提出新的Proposal),Leader服务器会首先确认事务日志中所有的Proposal是否已经被集群中过半的服务器Commit。
- Leader服务器需要确保所有的Follower服务器能够接收到每一条事务的Proposal,并且能将所有已经提交的事务Proposal应用到内存数据中。等到Follower将所有尚未同步的事务Proposal都从Leader服务器上同步过,并且应用到内存数据中以后,Leader才会把该Follower加入到真正可用的Follower列表中。
1.4 CAP理论
CAP理论,一个分布式系统不可能同时满足以下三种:
- C(Consistency,一致性):系统用在数据一致的状态下执行更新操作后,应该保证系统的数据仍然处于一致的状态。
- A(Availability,可用性):系统必须一直处于可用的状态,对于每个操作请求都能在有限的时间内返回。
- P(Partion Tolerance,分区容错性):系统在任何网络分区故障的时候,仍然保证对外提供满足一致性和可用性的服务。
Zookeeper保证的是CP,不能保证每次请求服务的可用性,进行Leader选举时集群都是不可用。
2. 源码分析
2.1 辅助源码
2.1.1 持久化源码
Leader和Follower中的数据会在内存和磁盘中各保存一份,所以需要将内存中的数据持久化到磁盘中。
在org.apache.zookeeper.server.persistence包下的相关类都是序列化相关的代码。
(1)快照
public interface SnapShot {// 反序列化方法long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException;// 序列化方法void serialize(DataTree dt, Map<Long, Integer> sessions, File name, boolean fsync) throws IOException;// 查找最近的快照文件File findMostRecentSnapshot() throws IOException;// 释放资源void close() throws IOException;}
(2)操作日志
public interface TxnLog extends Closeable {// 设置服务状态void setServerStats(ServerStats serverStats);// 滚动日志void rollLog() throws IOException;// 追加boolean append(TxnHeader hdr, Record r) throws IOException;// 读取数据TxnIterator read(long zxid) throws IOException;// 获取最后一个zxidlong getLastLoggedZxid() throws IOException;// 删除日志boolean truncate(long zxid) throws IOException;// 获取DbIdlong getDbId() throws IOException;// 提交void commit() throws IOException;// 日志同步时间long getTxnLogSyncElapsedTime();// 关闭日志void close() throws IOException;// 读取日志的接口interface TxnIterator extends Closeable {// 获取头信息TxnHeader getHeader();// 获取传输的内容Record getTxn();// 下一条记录boolean next() throws IOException;// 关闭资源void close() throws IOException;// 获取存储的大小long getStorageSize() throws IOException;}}
2.1.2 序列化源码
zookeeper-jute这个模块的代码是关于Zookeeper序列化相关的源码。
(1)序列化
public interface InputArchive {byte readByte(String tag) throws IOException;boolean readBool(String tag) throws IOException;int readInt(String tag) throws IOException;long readLong(String tag) throws IOException;float readFloat(String tag) throws IOException;double readDouble(String tag) throws IOException;String readString(String tag) throws IOException;byte[] readBuffer(String tag) throws IOException;void readRecord(Record r, String tag) throws IOException;void startRecord(String tag) throws IOException;void endRecord(String tag) throws IOException;Index startVector(String tag) throws IOException;void endVector(String tag) throws IOException;Index startMap(String tag) throws IOException;void endMap(String tag) throws IOException;}
反序列化:
public interface OutputArchive {void writeByte(byte b, String tag) throws IOException;void writeBool(boolean b, String tag) throws IOException;void writeInt(int i, String tag) throws IOException;void writeLong(long l, String tag) throws IOException;void writeFloat(float f, String tag) throws IOException;void writeDouble(double d, String tag) throws IOException;void writeString(String s, String tag) throws IOException;void writeBuffer(byte[] buf, String tag)throws IOException;void writeRecord(Record r, String tag) throws IOException;void startRecord(Record r, String tag) throws IOException;void endRecord(Record r, String tag) throws IOException;void startVector(List<?> v, String tag) throws IOException;void endVector(List<?> v, String tag) throws IOException;void startMap(TreeMap<?, ?> v, String tag) throws IOException;void endMap(TreeMap<?, ?> v, String tag) throws IOException;}
2.2 服务端初始化源码
2.2.1 ZK服务端启动脚本分析
(1)从bin的zkServer.sh脚本入手,提取关键部分:
ZOOBIN="${BASH_SOURCE-$0}"ZOOBIN="$(dirname "${ZOOBIN}")"ZOOBINDIR="$(cd "${ZOOBIN}"; pwd)"ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain"nohup "$JAVA" $ZOO_DATADIR_AUTOCREATE "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" \"-Dzookeeper.log.file=${ZOO_LOG_FILE}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \-XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -9 %p' \-cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev/null &
可以看到主启动类是QuorumPeerMain,那么源码从这个类开始着手。
