Zookeeper(源码分析)
第 1 章 算法基础
思考:Zookeeper 是如何保证数据一致性的?这也是困扰分布式系统框架的一个难题。
1.1 拜占庭将军问题
1.2 Paxos 算法
Paxos算法描述:
下面我们针对上述描述做三种情况的推演举例:为了简化流程,我们这里不设置 Learner。
情况一:
情况二:
Paxos 算法缺陷:在网络复杂的情况下,一个应用 Paxos 算法的分布式系统,可能很久无法收敛,甚至陷入活锁的情况。
情况三:
造成这种情况的原因是系统中有一个以上的 Proposer,多个 Proposers 相互争夺 Acceptor,造成迟迟无法达成一致的情况。针对这种情况,一种改进的 Paxos 算法被提出:从系统中选出一个节点作为 Leader,只有 Leader 能够发起提案。这样,一次 Paxos 流程中只有一个Proposer,不会出现活锁的情况,此时只会出现例子中第一种情况。
1.3 ZAB 协议
1.3.1 什么是 ZAB 算法
Zab 借鉴了 Paxos 算法,是特别为 Zookeeper 设计的支持崩溃恢复的原子广播协议。基于该协议,Zookeeper 设计为只有一台客户端(Leader)负责处理外部的写事务请求,然后Leader 客户端将数据同步到其他 Follower 节点。即 Zookeeper 只有一个 Leader 可以发起提案。
1.3.2 Zab 协议内容
Zab 协议包括两种基本的模式:消息广播、崩溃恢复。
消息广播:
崩溃恢复——异常假设
崩溃恢复——Leader选举
崩溃恢复——数据恢复
1.4 CAP
CAP理论告诉我们,一个分布式系统不可能同时满足以下三种
CAP理论
⚫ 一致性(C:Consistency)
⚫ 可用性(A:Available)
⚫ 分区容错性(P:Partition Tolerance)
这三个基本需求,最多只能同时满足其中的两项,因为P是必须的,因此往往选择就在CP或者AP中。
1)一致性(C:Consistency)
在分布式环境中,一致性是指数据在多个副本之间是否能够保持数据一致的特性。在一致性的需求下,当一个系统在数据一致的状态下执行更新操作后,应该保证系统的数据仍然处于一致的状态。
2)可用性(A:Available)
可用性是指系统提供的服务必须一直处于可用的状态,对于用户的每一个操作请求总是能够在有限的时间内返回结果。
3)分区容错性(P:Partition Tolerance)
分布式系统在遇到任何网络分区故障的时候,仍然需要能够保证对外提供满足一致性和可用性的服务,除非是整个网络环境都发生了故障。
ZooKeeper保证的是CP
(1)ZooKeeper不能保证每次服务请求的可用性。(注:在极端环境下,ZooKeeper可能会丢弃一些请求,消费者程序需要重新请求才能获得结果)。所以说,ZooKeeper不能保证服务可用性。
(2)进行Leader选举时集群都是不可用。
第 2 章 源码详解
2.1 辅助源码
2.1.1 持久化源码
Leader 和 Follower 中的数据会在内存和磁盘中各保存一份。所以需要将内存中的数据持久化到磁盘中。
在 org.apache.zookeeper.server.persistence 包下的相关类都是序列化相关的代码。
1)快照
public interface SnapShot {
// 反序列化方法
long deserialize(DataTree dt, Map<Long, Integer> sessions)
throws IOException;
// 序列化方法
void serialize(DataTree dt, Map<Long, Integer> sessions,
File name)
throws IOException;
/**
* find the most recent snapshot file
* 查找最近的快照文件
*/
File findMostRecentSnapshot() throws IOException;
// 释放资源
void close() throws IOException;
}
2)操作日志
public interface TxnLog {
// 设置服务状态
void setServerStats(ServerStats serverStats);
// 滚动日志
void rollLog() throws IOException;
// 追加
boolean append(TxnHeader hdr, Record r) throws IOException;
// 读取数据
TxnIterator read(long zxid) throws IOException;
// 获取最后一个 zxid
long getLastLoggedZxid() throws IOException;
// 删除日志
boolean truncate(long zxid) throws IOException;
// 获取 DbId
long getDbId() throws IOException;
// 提交
void commit() throws IOException;
// 日志同步时间
long getTxnLogSyncElapsedTime();
// 关闭日志
void close() throws IOException;
// 读取日志的接口
public interface TxnIterator {
// 获取头信息
TxnHeader getHeader();
// 获取传输的内容
Record getTxn();
// 下一条记录
boolean next() throws IOException;
// 关闭资源
void close() throws IOException;
// 获取存储的大小
long getStorageSize() throws IOException;
}
}
3)处理持久化的核心类
2.1.2 序列化源码
zookeeper-jute 代码是关于 Zookeeper 序列化相关源码
1)序列化和反序列化方法
public interface Record {
// 序列化方法
public void serialize(OutputArchive archive, String tag)
throws IOException;
// 反序列化方法
public void deserialize(InputArchive archive, String tag)
throws IOException;
}
2)迭代
public interface Index {
// 结束
public boolean done();
// 下一个
public void incr();
}
3)序列化支持的数据类型
/**
* Interface that alll the serializers have to implement.
*
*/
public interface OutputArchive {
public void writeByte(byte b, String tag) throws IOException;
public void writeBool(boolean b, String tag) throws IOException;
public void writeInt(int i, String tag) throws IOException;
public void writeLong(long l, String tag) throws IOException;
public void writeFloat(float f, String tag) throws IOException;
public void writeDouble(double d, String tag) throws IOException;
public void writeString(String s, String tag) throws IOException;
public void writeBuffer(byte buf[], String tag) throws IOException;
public void writeRecord(Record r, String tag) throws IOException;
public void startRecord(Record r, String tag) throws IOException;
public void endRecord(Record r, String tag) throws IOException;
public void startVector(List<?> v, String tag) throws IOException;
public void endVector(List<?> v, String tag) throws IOException;
public void startMap(TreeMap<?,?> v, String tag) throws IOException;
public void endMap(TreeMap<?,?> v, String tag) throws IOException;
}
4)反序列化支持的数据类型
/**
* Interface that all the Deserializers have to implement.
*
*/
public interface InputArchive {
public byte readByte(String tag) throws IOException;
public boolean readBool(String tag) throws IOException;
public int readInt(String tag) throws IOException;
public long readLong(String tag) throws IOException;
public float readFloat(String tag) throws IOException;
public double readDouble(String tag) throws IOException;
public String readString(String tag) throws IOException;
public byte[] readBuffer(String tag) throws IOException;
public void readRecord(Record r, String tag) throws IOException;
public void startRecord(String tag) throws IOException;
public void endRecord(String tag) throws IOException;
public Index startVector(String tag) throws IOException;
public void endVector(String tag) throws IOException;
public Index startMap(String tag) throws IOException;
public void endMap(String tag) throws IOException;
}
2.2 ZK 服务端初始化源码解析
2.2.1 ZK 服务端启动脚本分析
1)Zookeeper 服务的启动命令是 zkServer.sh start
zkServer.sh
#!/usr/bin/env bash
# use POSTIX interface, symlink is followed automatically
ZOOBIN="${BASH_SOURCE-$0}"
ZOOBIN="$(dirname "${ZOOBIN}")"
ZOOBINDIR="$(cd "${ZOOBIN}"; pwd)"
if [ -e "$ZOOBIN/../libexec/zkEnv.sh" ]; then
. "$ZOOBINDIR"/../libexec/zkEnv.sh
else
. "$ZOOBINDIR"/zkEnv.sh //相当于获取 zkEnv.sh 中的环境变量(ZOOCFG="zoo.cfg")
fi
# See the following page for extensive details on setting
# up the JVM to accept JMX remote management:
# http://java.sun.com/javase/6/docs/technotes/guides/management/agent.html
# by default we allow local JMX connections
if [ "x$JMXLOCALONLY" = "x" ]
then
JMXLOCALONLY=false
fi
if [ "x$JMXDISABLE" = "x" ] || [ "$JMXDISABLE" = 'false' ]
then
echo "ZooKeeper JMX enabled by default" >&2
if [ "x$JMXPORT" = "x" ]
then
# for some reason these two options are necessary on jdk6 on Ubuntu
# accord to the docs they are not necessary, but otw jconsole cannot
# do a local attach
ZOOMAIN="-Dcom.sun.management.jmxremote -
Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY
org.apache.zookeeper.server.quorum.QuorumPeerMain"
else
if [ "x$JMXAUTH" = "x" ]
then
JMXAUTH=false
fi
if [ "x$JMXSSL" = "x" ]
then
JMXSSL=false
fi
if [ "x$JMXLOG4J" = "x" ]
then
JMXLOG4J=true
fi
echo "ZooKeeper remote JMX Port set to $JMXPORT" >&2
echo "ZooKeeper remote JMX authenticate set to $JMXAUTH" >&2
echo "ZooKeeper remote JMX ssl set to $JMXSSL" >&2
echo "ZooKeeper remote JMX log4j set to $JMXLOG4J" >&2
ZOOMAIN="-Dcom.sun.management.jmxremote -
Dcom.sun.management.jmxremote.port=$JMXPORT -
Dcom.sun.management.jmxremote.authenticate=$JMXAUTH -
Dcom.sun.management.jmxremote.ssl=$JMXSSL -
Dzookeeper.jmx.log4j.disable=$JMXLOG4J
org.apache.zookeeper.server.quorum.QuorumPeerMain"
fi
else
echo "JMX disabled by user request" >&2
ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain"
fi
if [ "x$SERVER_JVMFLAGS" != "x" ]
then
JVMFLAGS="$SERVER_JVMFLAGS $JVMFLAGS"
fi
… …
case $1 in
start)
echo -n "Starting zookeeper ... "
if [ -f "$ZOOPIDFILE" ]; then
if kill -0 `cat "$ZOOPIDFILE"` > /dev/null 2>&1; then
echo $command already running as process `cat "$ZOOPIDFILE"`
exit 1
fi
fi
nohup "$JAVA" $ZOO_DATADIR_AUTOCREATE "-
Dzookeeper.log.dir=${ZOO_LOG_DIR}" \
"-Dzookeeper.log.file=${ZOO_LOG_FILE}" "-
Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
-XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -9 %p' \
-cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" >
"$_ZOO_DAEMON_OUT" 2>&1 < /dev/null &
… …
;;
stop)
echo -n "Stopping zookeeper ... "
if [ ! -f "$ZOOPIDFILE" ]
then
echo "no zookeeper to stop (could not find file $ZOOPIDFILE)"
else
$KILL $(cat "$ZOOPIDFILE")
rm "$ZOOPIDFILE"
sleep 1
echo STOPPED
fi
exit 0
;;
restart)
shift
"$0" stop ${@}
sleep 3
"$0" start ${@}
;;
status)
… …
;;
*)
echo "Usage: $0 [--config <conf-dir>] {start|start-foreground|stop|restart|status|printcmd}" >&2
esac
2)zkServer.sh start 底层的实际执行内容
nohup "$JAVA"
+ 一堆提交参数
+ $ZOOMAIN(org.apache.zookeeper.server.quorum.QuorumPeerMain) + "$ZOOCFG" (zkEnv.sh 文件中 ZOOCFG="zoo.cfg")
3)所以程序的入口是 QuorumPeerMain.java 类
2.2.2 ZK 服务端启动入口
1)ctrl + n,查找 QuorumPeerMain
QuorumPeerMain.java
public static void main(String[] args) {
// 创建了一个 zk 节点
QuorumPeerMain main = new QuorumPeerMain();
try {
// 初始化节点并运行,args 相当于提交参数中的 zoo.cfg
main.initializeAndRun(args);
} catch (IllegalArgumentException e) {
... ...
}
LOG.info("Exiting normally");
System.exit(0);
}
2)initializeAndRun
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
// 管理 zk 的配置信息
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
// 1 解析参数,zoo.cfg 和 myid
config.parse(args[0]);
}
// 2 启动定时任务,对过期的快照,执行删除(默认该功能关闭)
// Start and schedule the the purge task
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();
if (args.length == 1 && config.isDistributed()) {
// 3 启动集群
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// there is only server in the quorum -- run as standalone
ZooKeeperServerMain.main(args);
}
}
2.2.3 解析参数 zoo.cfg 和 myid
QuorumPeerConfig.java
public void parse(String path) throws ConfigException {
LOG.info("Reading configuration from: " + path);
try {
// 校验文件路径及是否存在
File configFile = (new VerifyingFileFactory.Builder(LOG)
.warnForRelativePath()
.failForNonExistingPath()
.build()).create(path);
Properties cfg = new Properties();
FileInputStream in = new FileInputStream(configFile);
try {
// 加载配置文件
cfg.load(in);
configFileStr = path;
} finally {
in.close();
}
// 解析配置文件
parseProperties(cfg);
} catch (IOException e) {
throw new ConfigException("Error processing " + path, e);
} catch (IllegalArgumentException e) {
throw new ConfigException("Error processing " + path, e);
}
... ...
}
QuorumPeerConfig.java
public void parseProperties(Properties zkProp)
throws IOException, ConfigException {
int clientPort = 0;
int secureClientPort = 0;
String clientPortAddress = null;
String secureClientPortAddress = null;
VerifyingFileFactory vff = new
VerifyingFileFactory.Builder(LOG).warnForRelativePath().build();
// 读取 zoo.cfg 文件中的属性值,并赋值给 QuorumPeerConfig 的类对象
for (Entry<Object, Object> entry : zkProp.entrySet()) {
String key = entry.getKey().toString().trim();
String value = entry.getValue().toString().trim();
if (key.equals("dataDir")) {
dataDir = vff.create(value);
} else if (key.equals("dataLogDir")) {
dataLogDir = vff.create(value);
} else if (key.equals("clientPort")) {
clientPort = Integer.parseInt(value);
} else if (key.equals("localSessionsEnabled")) {
localSessionsEnabled = Boolean.parseBoolean(value);
} else if (key.equals("localSessionsUpgradingEnabled")) {
localSessionsUpgradingEnabled = Boolean.parseBoolean(value);
} else if (key.equals("clientPortAddress")) {
clientPortAddress = value.trim();
} else if (key.equals("secureClientPort")) {
secureClientPort = Integer.parseInt(value);
} else if (key.equals("secureClientPortAddress")){
secureClientPortAddress = value.trim();
} else if (key.equals("tickTime")) {
tickTime = Integer.parseInt(value);
} else if (key.equals("maxClientCnxns")) {
maxClientCnxns = Integer.parseInt(value);
} else if (key.equals("minSessionTimeout")) {
minSessionTimeout = Integer.parseInt(value);
}
... ...
}
... ...
if (dynamicConfigFileStr == null) {
setupQuorumPeerConfig(zkProp, true);
if (isDistributed() && isReconfigEnabled()) {
// we don't backup static config for standalone mode.
// we also don't backup if reconfig feature is disabled.
backupOldConfig();
}
}
}
QuorumPeerConfig.java
void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode)
throws IOException, ConfigException {
quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode);
setupMyId();
setupClientPort();
setupPeerType();
checkValidity();
}
QuorumPeerConfig.java
private void setupMyId() throws IOException {
File myIdFile = new File(dataDir, "myid");
// standalone server doesn't need myid file.
if (!myIdFile.isFile()) {
return;
}
BufferedReader br = new BufferedReader(new FileReader(myIdFile));
String myIdString;
try {
myIdString = br.readLine();
} finally {
br.close();
}
try {
// 将解析 myid 文件中的 id 赋值给 serverId
serverId = Long.parseLong(myIdString);
MDC.put("myid", myIdString);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("serverid " + myIdString + " is not a number");
}
}
2.2.4 过期快照删除
可以启动定时任务,对过期的快照,执行删除。默认该功能时关闭的
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException{
// 管理 zk 的配置信息
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
// 1 解析参数,zoo.cfg 和 myid
config.parse(args[0]);
}
// 2 启动定时任务,对过期的快照,执行删除(默认是关闭)
// config.getSnapRetainCount() = 3 最少保留的快照个数
// config.getPurgeInterval() = 0 默认 0 表示关闭
// Start and schedule the the purge task
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();
if (args.length == 1 && config.isDistributed()) {
// 3 启动集群
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// there is only server in the quorum -- run as standalone
ZooKeeperServerMain.main(args);
}
}
protected int snapRetainCount = 3;
protected int purgeInterval = 0;
public void start() {
if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
LOG.warn("Purge task is already running.");
return;
}
// 默认情况 purgeInterval=0,该任务关闭,直接返回
// Don't schedule the purge task with zero or negative purge interval.
if (purgeInterval <= 0) {
LOG.info("Purge task is not scheduled.");
return;
}
// 创建一个定时器
timer = new Timer("PurgeTask", true);
// 创建一个清理快照任务
TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
// 如果 purgeInterval 设置的值是 1,表示 1 小时检查一次,判断是否有过期快照,
有则删除
timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));
purgeTaskStatus = PurgeTaskStatus.STARTED;
}
static class PurgeTask extends TimerTask {
private File logsDir;
private File snapsDir;
private int snapRetainCount;
public PurgeTask(File dataDir, File snapDir, int count) {
logsDir = dataDir;
snapsDir = snapDir;
snapRetainCount = count;
}
@Override
public void run() {
LOG.info("Purge task started.");
try {
// 清理过期的数据
PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount);
} catch (Exception e) {
LOG.error("Error occurred while purging.", e);
}
LOG.info("Purge task completed.");
}
}
public static void purge(File dataDir, File snapDir, int num) throws IOException {
if (num < 3) {
throw new IllegalArgumentException(COUNT_ERR_MSG);
}
FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);
List<File> snaps = txnLog.findNRecentSnapshots(num);
int numSnaps = snaps.size();
if (numSnaps > 0) {
purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1));
}
}