算法基础

思考:Zookeeper 是如何保证数据一致性的?这也是困扰分布式系统框架的一个难题。

拜占庭将军问题

  1. 拜占庭将军问题是一个协议问题,拜占庭帝国军队的将军们必须全体一致的决定是否攻击某一支敌军。问题是这些将军在地理上是分隔开来的,并且将军中存在叛徒。叛徒可以任意行动以达到以下目标:欺骗某些将军采取进攻行动;促成一个不是所有将军都同意的决定,如当将军们不希望进攻时促成进攻行动;或者迷惑某些将军,使他们无法做出决定。如果叛徒达到了这些目的之一,则任何攻击行动的结果都是注定要失败的,只有完全达成一致的努力才能获得胜利。

Paxos 算法

请查看:Paxos算法

ZAB协议

请查看:ZAB协议

CAP理论

请查看:CAP理论

辅助源码

持久化源码

  1. Leader Follower 中的数据会在内存和磁盘中各保存一份。所以需要将内存中的数据持久化到磁盘中。<br /> `org.apache.zookeeper.server.persistence`包下的相关类都是持久化相关的代码。<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/8427674/1636008658170-d5211ed5-3b5c-42d1-b02d-7833ca0b0d07.png#clientId=u45bca8b6-4981-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=502&id=ua28feb30&margin=%5Bobject%20Object%5D&name=image.png&originHeight=502&originWidth=1079&originalType=binary&ratio=1&rotation=0&showTitle=false&size=147531&status=done&style=none&taskId=ud544ba36-7dd3-4096-8787-4c25efc08de&title=&width=1079)

快照

  1. package org.apache.zookeeper.server.persistence;
  2. public interface SnapShot {
  3. // 反序列化方法
  4. long deserialize(DataTree dt, Map<Long, Integer> sessions)
  5. throws IOException;
  6. // 序列化方法
  7. void serialize(DataTree dt, Map<Long, Integer> sessions,
  8. File name)
  9. throws IOException;
  10. /**
  11. * find the most recent snapshot file
  12. * 查找最近的快照文件
  13. */
  14. File findMostRecentSnapshot() throws IOException;
  15. // 释放资源
  16. void close() throws IOException;
  17. }

操作日志

  1. package org.apache.zookeeper.server.persistence;
  2. public interface TxnLog {
  3. // 设置服务状态
  4. void setServerStats(ServerStats serverStats);
  5. // 滚动日志
  6. void rollLog() throws IOException;
  7. // 追加
  8. boolean append(TxnHeader hdr, Record r) throws IOException;
  9. // 读取数据
  10. TxnIterator read(long zxid) throws IOException;
  11. // 获取最后一个 zxid
  12. long getLastLoggedZxid() throws IOException;
  13. // 删除日志
  14. boolean truncate(long zxid) throws IOException;
  15. // 获取 DbId
  16. long getDbId() throws IOException;
  17. // 提交
  18. void commit() throws IOException;
  19. // 日志同步时间
  20. long getTxnLogSyncElapsedTime();
  21. // 关闭日志
  22. void close() throws IOException;
  23. // 读取日志的接口
  24. public interface TxnIterator {
  25. // 获取头信息
  26. TxnHeader getHeader();
  27. // 获取传输的内容
  28. Record getTxn();
  29. // 下一条记录
  30. boolean next() throws IOException;
  31. // 关闭资源
  32. void close() throws IOException;
  33. // 获取存储的大小
  34. long getStorageSize() throws IOException;
  35. }
  36. }

处理持久化的核心类

image.png

序列化源码

zookeeper-jute 代码是关于 Zookeeper 序列化相关源码。
image.png

序列化和反序列化方法

  1. package org.apache.jute;
  2. public interface Record {
  3. // 序列化方法
  4. public void serialize(OutputArchive archive, String tag)
  5. throws IOException;
  6. // 反序列化方法
  7. public void deserialize(InputArchive archive, String tag)
  8. throws IOException;
  9. }

迭代

  1. public interface Index {
  2. // 结束
  3. public boolean done();
  4. // 下一个
  5. public void incr();
  6. }

序列化支持的数据类型

  1. package org.apache.jute;
  2. /**
  3. * Interface that alll the serializers have to implement.
  4. *
  5. */
  6. public interface OutputArchive {
  7. public void writeByte(byte b, String tag) throws IOException;
  8. public void writeBool(boolean b, String tag) throws IOException;
  9. public void writeInt(int i, String tag) throws IOException;
  10. public void writeLong(long l, String tag) throws IOException;
  11. public void writeFloat(float f, String tag) throws IOException;
  12. public void writeDouble(double d, String tag) throws IOException;
  13. public void writeString(String s, String tag) throws IOException;
  14. public void writeBuffer(byte buf[], String tag)
  15. throws IOException;
  16. public void writeRecord(Record r, String tag) throws IOException;
  17. public void startRecord(Record r, String tag) throws IOException;
  18. public void endRecord(Record r, String tag) throws IOException;
  19. public void startVector(List<?> v, String tag) throws IOException;
  20. public void endVector(List<?> v, String tag) throws IOException;
  21. public void startMap(TreeMap<?,?> v, String tag) throws IOException;
  22. public void endMap(TreeMap<?,?> v, String tag) throws IOException;
  23. }

如:

  • BinaryOutputArchive
  • CsvOutputArchive
  • XmlOutputArchive

    反序列化支持的数据类型

    ```java package org.apache.jute;

/**

  • Interface that all the Deserializers have to implement. / public interface InputArchive { public byte readByte(String tag) throws IOException; public boolean readBool(String tag) throws IOException; public int readInt(String tag) throws IOException; public long readLong(String tag) throws IOException; public float readFloat(String tag) throws IOException; public double readDouble(String tag) throws IOException; public String readString(String tag) throws IOException; public byte[] readBuffer(String tag) throws IOException; public void readRecord(Record r, String tag) throws IOException; public void startRecord(String tag) throws IOException; public void endRecord(String tag) throws IOException; public Index startVector(String tag) throws IOException; public void endVector(String tag) throws IOException; public Index startMap(String tag) throws IOException; public void endMap(String tag) throws IOException; } ``` 如:
  • BinaryInputArchive
  • CsvInputArchive
  • XmlInputArchive

    ZK服务端初始化源码

    image.png

    ZK服务端启动脚本源码

    Zookeeper 服务的启动命令是 zkServer.sh start
    zkServer.sh start 底层的实际执行内容: ```shell nohup “$JAVA”
  • 一堆提交参数
  • $ZOOMAIN(org.apache.zookeeper.server.quorum.QuorumPeerMain)
  • “$ZOOCFG” (zkEnv.sh 文件中 ZOOCFG=”zoo.cfg”) ``` 所以程序的入口是 QuorumPeerMain.java 类 .

    zkServer.sh

    ```shell

    !/usr/bin/env bash

use POSTIX interface, symlink is followed automatically

ZOOBIN=”${BASH_SOURCE-$0}” ZOOBIN=”$(dirname “${ZOOBIN}”)” ZOOBINDIR=”$(cd “${ZOOBIN}”; pwd)”

if [ -e “$ZOOBIN/../libexec/zkEnv.sh” ]; then . “$ZOOBINDIR”/../libexec/zkEnv.sh else . “$ZOOBINDIR”/zkEnv.sh fi

See the following page for extensive details on setting

up the JVM to accept JMX remote management:

http://java.sun.com/javase/6/docs/technotes/guides/management/agent.html

by default we allow local JMX connections

if [ “x$JMXLOCALONLY” = “x” ] then JMXLOCALONLY=false fi

if [ “x$JMXDISABLE” = “x” ] || [ “$JMXDISABLE” = ‘false’ ] then echo “ZooKeeper JMX enabled by default” >&2 if [ “x$JMXPORT” = “x” ] then

# for some reason these two options are necessary on jdk6 on Ubuntu
#   accord to the docs they are not necessary, but otw jconsole cannot
#   do a local attach
ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain"

else if [ “x$JMXAUTH” = “x” ] then JMXAUTH=false fi if [ “x$JMXSSL” = “x” ] then JMXSSL=false fi if [ “x$JMXLOG4J” = “x” ] then JMXLOG4J=true fi echo “ZooKeeper remote JMX Port set to $JMXPORT” >&2 echo “ZooKeeper remote JMX authenticate set to $JMXAUTH” >&2 echo “ZooKeeper remote JMX ssl set to $JMXSSL” >&2 echo “ZooKeeper remote JMX log4j set to $JMXLOG4J” >&2 ZOOMAIN=”-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=$JMXPORT -Dcom.sun.management.jmxremote.authenticate=$JMXAUTH -Dcom.sun.management.jmxremote.ssl=$JMXSSL -Dzookeeper.jmx.log4j.disable=$JMXLOG4J org.apache.zookeeper.server.quorum.QuorumPeerMain” fi else echo “JMX disabled by user request” >&2 ZOOMAIN=”org.apache.zookeeper.server.quorum.QuorumPeerMain” fi

if [ “x$SERVER_JVMFLAGS” != “x” ] then JVMFLAGS=”$SERVER_JVMFLAGS $JVMFLAGS” fi

if [ “x$2” != “x” ] then ZOOCFG=”$ZOOCFGDIR/$2” fi

if we give a more complicated path to the config, don’t screw around in $ZOOCFGDIR

if [ “x$(dirname “$ZOOCFG”)” != “x$ZOOCFGDIR” ] then ZOOCFG=”$2” fi

if $cygwin then ZOOCFG=cygpath -wp "$ZOOCFG"

# cygwin has a "kill" in the shell itself, gets confused
KILL=/bin/kill

else KILL=kill fi

echo “Using config: $ZOOCFG” >&2

case “$OSTYPE” in solaris) GREP=/usr/xpg4/bin/grep ;; ) GREP=grep ;; esac ZOO_DATADIR=”$($GREP “^[[:space:]]dataDir” “$ZOOCFG” | sed -e ‘s/.=//‘)” ZOO_DATADIR=”$(echo -e “${ZOO_DATADIR}” | sed -e ‘s/^[[:space:]]//‘ -e ‘s/[[:space:]]$//‘)” ZOO_DATALOGDIR=”$($GREP “^[[:space:]]dataLogDir” “$ZOOCFG” | sed -e ‘s/.*=//‘)”

iff autocreate is turned off and the datadirs don’t exist fail

immediately as we can’t create the PID file, etc…, anyway.

if [ -n “$ZOO_DATADIR_AUTOCREATE_DISABLE” ]; then if [ ! -d “$ZOO_DATADIR/version-2” ]; then echo “ZooKeeper data directory is missing at $ZOO_DATADIR fix the path or run initialize” exit 1 fi

if [ -n "$ZOO_DATALOGDIR" ] && [ ! -d "$ZOO_DATALOGDIR/version-2" ]; then
    echo "ZooKeeper txnlog directory is missing at $ZOO_DATALOGDIR fix the path or run initialize"
    exit 1
fi
ZOO_DATADIR_AUTOCREATE="-Dzookeeper.datadir.autocreate=false"

fi

if [ -z “$ZOOPIDFILE” ]; then if [ ! -d “$ZOO_DATADIR” ]; then mkdir -p “$ZOO_DATADIR” fi ZOOPIDFILE=”$ZOO_DATADIR/zookeeper_server.pid” else

# ensure it exists, otw stop will fail
mkdir -p "$(dirname "$ZOOPIDFILE")"

fi

if [ ! -w “$ZOO_LOG_DIR” ] ; then mkdir -p “$ZOO_LOG_DIR” fi

ZOO_LOG_FILE=zookeeper-$USER-server-$HOSTNAME.log _ZOO_DAEMON_OUT=”$ZOO_LOG_DIR/zookeeper-$USER-server-$HOSTNAME.out”

case $1 in start) echo -n “Starting zookeeper … “ if [ -f “$ZOOPIDFILE” ]; then if kill -0 cat "$ZOOPIDFILE" > /dev/null 2>&1; then echo $command already running as process cat "$ZOOPIDFILE". exit 1 fi fi nohup “$JAVA” $ZOO_DATADIR_AUTOCREATE “-Dzookeeper.log.dir=${ZOO_LOG_DIR}” \ “-Dzookeeper.log.file=${ZOO_LOG_FILE}” “-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}” \ -XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError=’kill -9 %p’ \ -cp “$CLASSPATH” $JVMFLAGS $ZOOMAIN “$ZOOCFG” > “$_ZOO_DAEMON_OUT” 2>&1 < /dev/null & if [ $? -eq 0 ] then case “$OSTYPE” in solaris) /bin/echo “${!}\c” > “$ZOOPIDFILE” ;; *) /bin/echo -n $! > “$ZOOPIDFILE” ;; esac if [ $? -eq 0 ]; then sleep 1 pid=$(cat “${ZOOPIDFILE}”) if ps -p “${pid}” > /dev/null 2>&1; then echo STARTED else echo FAILED TO START exit 1 fi else echo FAILED TO WRITE PID exit 1 fi else echo SERVER DID NOT START exit 1 fi ;; start-foreground) ZOO_CMD=(exec “$JAVA”) if [ “${ZOO_NOEXEC}” != “” ]; then ZOO_CMD=(“$JAVA”) fi “${ZOO_CMD[@]}” $ZOO_DATADIR_AUTOCREATE “-Dzookeeper.log.dir=${ZOO_LOG_DIR}” \ “-Dzookeeper.log.file=${ZOO_LOG_FILE}” “-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}” \ -XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError=’kill -9 %p’ \ -cp “$CLASSPATH” $JVMFLAGS $ZOOMAIN “$ZOOCFG” ;; print-cmd) echo “\”$JAVA\” $ZOO_DATADIR_AUTOCREATE -Dzookeeper.log.dir=\”${ZOO_LOG_DIR}\” \ -Dzookeeper.log.file=\”${ZOO_LOG_FILE}\” -Dzookeeper.root.logger=\”${ZOO_LOG4J_PROP}\” \ -XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError=’kill -9 %p’ \ -cp \”$CLASSPATH\” $JVMFLAGS $ZOOMAIN \”$ZOOCFG\” > \”$_ZOO_DAEMON_OUT\” 2>&1 < /dev/null” ;; stop) echo -n “Stopping zookeeper … “ if [ ! -f “$ZOOPIDFILE” ] then echo “no zookeeper to stop (could not find file $ZOOPIDFILE)” else $KILL $(cat “$ZOOPIDFILE”) rm “$ZOOPIDFILE” sleep 1 echo STOPPED fi exit 0 ;; restart) shift “$0” stop ${@} sleep 3 “$0” start ${@} ;; status)

# -q is necessary on some versions of linux where nc returns too quickly, and no stat result is output
clientPortAddress=`$GREP "^[[:space:]]*clientPortAddress[^[:alpha:]]" "$ZOOCFG" | sed -e 's/.*=//'`
if ! [ $clientPortAddress ]
then
clientPortAddress="localhost"
fi
clientPort=`$GREP "^[[:space:]]*clientPort[^[:alpha:]]" "$ZOOCFG" | sed -e 's/.*=//'`
if ! [[ "$clientPort"  =~ ^[0-9]+$ ]]
then
   dataDir=`$GREP "^[[:space:]]*dataDir" "$ZOOCFG" | sed -e 's/.*=//'`
   myid=`cat "$dataDir/myid"`
   if ! [[ "$myid" =~ ^[0-9]+$ ]] ; then
     echo "clientPort not found and myid could not be determined. Terminating."
     exit 1
   fi
   clientPortAndAddress=`$GREP "^[[:space:]]*server.$myid=.*;.*" "$ZOOCFG" | sed -e 's/.*=//' | sed -e 's/.*;//'`
   if [ ! "$clientPortAndAddress" ] ; then
       echo "Client port not found in static config file. Looking in dynamic config file."
       dynamicConfigFile=`$GREP "^[[:space:]]*dynamicConfigFile" "$ZOOCFG" | sed -e 's/.*=//'`
       clientPortAndAddress=`$GREP "^[[:space:]]*server.$myid=.*;.*" "$dynamicConfigFile" | sed -e 's/.*=//' | sed -e 's/.*;//'`
   fi
   if [ ! "$clientPortAndAddress" ] ; then
      echo "Client port not found. Terminating."
      exit 1
   fi
   if [[ "$clientPortAndAddress" =~ ^.*:[0-9]+ ]] ; then
      clientPortAddress=`echo "$clientPortAndAddress" | sed -e 's/:.*//'`
   fi
   clientPort=`echo "$clientPortAndAddress" | sed -e 's/.*://'`
   if [ ! "$clientPort" ] ; then
      echo "Client port not found. Terminating."
      exit 1
   fi
fi
echo "Client port found: $clientPort. Client address: $clientPortAddress."
STAT=`"$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" "-Dzookeeper.log.file=${ZOO_LOG_FILE}" \
         -cp "$CLASSPATH" $JVMFLAGS org.apache.zookeeper.client.FourLetterWordMain \
         $clientPortAddress $clientPort srvr 2> /dev/null    \
      | $GREP Mode`
if [ "x$STAT" = "x" ]
then
    echo "Error contacting service. It is probably not running."
    exit 1
else
    echo $STAT
    exit 0
fi
;;

*) echo “Usage: $0 [—config ] {start|start-foreground|stop|restart|status|print-cmd}” >&2

esac

<a name="zkEnv.sh"></a>
#### zkEnv.sh
```shell
#!/usr/bin/env bash

ZOOBINDIR="${ZOOBINDIR:-/usr/bin}"
ZOOKEEPER_PREFIX="${ZOOBINDIR}/.."

#check to see if the conf dir is given as an optional argument
if [ $# -gt 1 ]
then
    if [ "--config" = "$1" ]
      then
          shift
          confdir=$1
          shift
          ZOOCFGDIR=$confdir
    fi
fi

if [ "x$ZOOCFGDIR" = "x" ]
then
  if [ -e "${ZOOKEEPER_PREFIX}/conf" ]; then
    ZOOCFGDIR="$ZOOBINDIR/../conf"
  else
    ZOOCFGDIR="$ZOOBINDIR/../etc/zookeeper"
  fi
fi

if [ -f "${ZOOCFGDIR}/zookeeper-env.sh" ]; then
  . "${ZOOCFGDIR}/zookeeper-env.sh"
fi

if [ "x$ZOOCFG" = "x" ]
then
    ZOOCFG="zoo.cfg"
fi

ZOOCFG="$ZOOCFGDIR/$ZOOCFG"

if [ -f "$ZOOCFGDIR/java.env" ]
then
    . "$ZOOCFGDIR/java.env"
fi

if [ "x${ZOO_LOG_DIR}" = "x" ]
then
    ZOO_LOG_DIR="$ZOOKEEPER_PREFIX/logs"
fi

if [ "x${ZOO_LOG4J_PROP}" = "x" ]
then
    ZOO_LOG4J_PROP="INFO,CONSOLE"
fi

if [[ -n "$JAVA_HOME" ]] && [[ -x "$JAVA_HOME/bin/java" ]];  then
    JAVA="$JAVA_HOME/bin/java"
elif type -p java; then
    JAVA=java
else
    echo "Error: JAVA_HOME is not set and java could not be found in PATH." 1>&2
    exit 1
fi

#add the zoocfg dir to classpath
CLASSPATH="$ZOOCFGDIR:$CLASSPATH"

for i in "$ZOOBINDIR"/../zookeeper-server/src/main/resources/lib/*.jar
do
    CLASSPATH="$i:$CLASSPATH"
done

#make it work in the binary package
#(use array for LIBPATH to account for spaces within wildcard expansion)
if ls "${ZOOKEEPER_PREFIX}"/share/zookeeper/zookeeper-*.jar > /dev/null 2>&1; then 
  LIBPATH=("${ZOOKEEPER_PREFIX}"/share/zookeeper/*.jar)
else
  #release tarball format
  for i in "$ZOOBINDIR"/../zookeeper-*.jar
  do
    CLASSPATH="$i:$CLASSPATH"
  done
  LIBPATH=("${ZOOBINDIR}"/../lib/*.jar)
fi

for i in "${LIBPATH[@]}"
do
    CLASSPATH="$i:$CLASSPATH"
done

#make it work for developers
for d in "$ZOOBINDIR"/../build/lib/*.jar
do
   CLASSPATH="$d:$CLASSPATH"
done

for d in "$ZOOBINDIR"/../zookeeper-server/target/lib/*.jar
do
   CLASSPATH="$d:$CLASSPATH"
done

#make it work for developers
CLASSPATH="$ZOOBINDIR/../build/classes:$CLASSPATH"

#make it work for developers
CLASSPATH="$ZOOBINDIR/../zookeeper-server/target/classes:$CLASSPATH"

case "`uname`" in
    CYGWIN*|MINGW*) cygwin=true ;;
    *) cygwin=false ;;
esac

if $cygwin
then
    CLASSPATH=`cygpath -wp "$CLASSPATH"`
fi

#echo "CLASSPATH=$CLASSPATH"

# default heap for zookeeper server
ZK_SERVER_HEAP="${ZK_SERVER_HEAP:-1000}"
export SERVER_JVMFLAGS="-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS"

# default heap for zookeeper client
ZK_CLIENT_HEAP="${ZK_CLIENT_HEAP:-256}"
export CLIENT_JVMFLAGS="-Xmx${ZK_CLIENT_HEAP}m $CLIENT_JVMFLAGS"

ZK服务端启动入口

从启动脚本中找到实际执行的类是org.apache.zookeeper.server.quorum.QuorumPeerMain

QuorumPeerMain.java

package org.apache.zookeeper.server.quorum;

public class QuorumPeerMain {
    public static void main(String[] args) {
     // 创建了一个 zk 节点
     QuorumPeerMain main = new QuorumPeerMain();
     try {
     // 初始化节点并运行,args 相当于提交参数中的 zoo.cfg
     main.initializeAndRun(args);
     } catch (IllegalArgumentException e) {
     ... ...
     }
     LOG.info("Exiting normally");
     System.exit(0);
    }
}

initializeAndRun()

    protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException
    {
        // 管理 zk 的配置信息
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            // 1 解析参数,zoo.cfg 和 myid
            config.parse(args[0]);
        }

        // 2 启动定时任务,对过期的快照,执行删除(默认该功能关闭)
        // Start and schedule the the purge task
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
                .getDataDir(), config.getDataLogDir(), config
                .getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();

        if (args.length == 1 && config.isDistributed()) {
            // 3 启动集群
            runFromConfig(config);
        } else {
            LOG.warn("Either no config or no quorum defined in config, running "
                    + " in standalone mode");
            // there is only server in the quorum -- run as standalone
            ZooKeeperServerMain.main(args);
        }
    }

解析参数zoo.cfg和myid

package org.apache.zookeeper.server.quorum;
public class QuorumPeerConfig {
    public void parse(String path) throws ConfigException {
         LOG.info("Reading configuration from: " + path);

         try {
            // 校验文件路径及是否存在
             File configFile = (new VerifyingFileFactory.Builder(LOG)
             .warnForRelativePath()
             .failForNonExistingPath()
             .build()).create(path);

             Properties cfg = new Properties();
             FileInputStream in = new FileInputStream(configFile);
             try {
                // 加载配置文件
                 cfg.load(in);
                 configFileStr = path;
            } finally {
                in.close();
            }
            // 解析配置文件
            parseProperties(cfg);
        } catch (IOException e) {
            throw new ConfigException("Error processing " + path, e);
        } catch (IllegalArgumentException e) {
            throw new ConfigException("Error processing " + path, e);
        } 

        ... ...
    }
}

parseProperties:

    public void parseProperties(Properties zkProp)
    throws IOException, ConfigException {
        int clientPort = 0;
        int secureClientPort = 0;
        String clientPortAddress = null;
        String secureClientPortAddress = null;
        VerifyingFileFactory vff = new VerifyingFileFactory.Builder(LOG).warnForRelativePath().build();
        // 读取 zoo.cfg 文件中的属性值,并赋值给 QuorumPeerConfig 的类对象
        for (Entry<Object, Object> entry : zkProp.entrySet()) {
            String key = entry.getKey().toString().trim();
            String value = entry.getValue().toString().trim();
            if (key.equals("dataDir")) {
                dataDir = vff.create(value);
            } else if (key.equals("dataLogDir")) {
                dataLogDir = vff.create(value);
            } else if (key.equals("clientPort")) {
                clientPort = Integer.parseInt(value);
            } else if (key.equals("localSessionsEnabled")) {
                localSessionsEnabled = Boolean.parseBoolean(value);
            } else if (key.equals("localSessionsUpgradingEnabled")) {
                localSessionsUpgradingEnabled = Boolean.parseBoolean(value);
            } else if (key.equals("clientPortAddress")) {
                clientPortAddress = value.trim();
            } else if (key.equals("secureClientPort")) {
                secureClientPort = Integer.parseInt(value);
            } else if (key.equals("secureClientPortAddress")){
                secureClientPortAddress = value.trim();
            } else if (key.equals("tickTime")) {
                tickTime = Integer.parseInt(value);
            } else if (key.equals("maxClientCnxns")) {
                maxClientCnxns = Integer.parseInt(value);
            } else if (key.equals("minSessionTimeout")) {
                minSessionTimeout = Integer.parseInt(value);
            } else if (key.equals("maxSessionTimeout")) {
                maxSessionTimeout = Integer.parseInt(value);
            } else if (key.equals("initLimit")) {
                initLimit = Integer.parseInt(value);
            } else if (key.equals("syncLimit")) {
                syncLimit = Integer.parseInt(value);
            } else if (key.equals("electionAlg")) {
                electionAlg = Integer.parseInt(value);
            } else if (key.equals("quorumListenOnAllIPs")) {
                quorumListenOnAllIPs = Boolean.parseBoolean(value);
            } else if (key.equals("peerType")) {
                if (value.toLowerCase().equals("observer")) {
                    peerType = LearnerType.OBSERVER;
                } else if (value.toLowerCase().equals("participant")) {
                    peerType = LearnerType.PARTICIPANT;
                } else
                {
                    throw new ConfigException("Unrecognised peertype: " + value);
                }
            } else if (key.equals( "syncEnabled" )) {
                syncEnabled = Boolean.parseBoolean(value);
            } else if (key.equals("dynamicConfigFile")){
                dynamicConfigFileStr = value;
            } else if (key.equals("autopurge.snapRetainCount")) {
                snapRetainCount = Integer.parseInt(value);
            } else if (key.equals("autopurge.purgeInterval")) {
                purgeInterval = Integer.parseInt(value);
            } else if (key.equals("standaloneEnabled")) {
                if (value.toLowerCase().equals("true")) {
                    setStandaloneEnabled(true);
                } else if (value.toLowerCase().equals("false")) {
                    setStandaloneEnabled(false);
                } else {
                    throw new ConfigException("Invalid option " + value + " for standalone mode. Choose 'true' or 'false.'");
                }
            } else if (key.equals("reconfigEnabled")) {
                if (value.toLowerCase().equals("true")) {
                    setReconfigEnabled(true);
                } else if (value.toLowerCase().equals("false")) {
                    setReconfigEnabled(false);
                } else {
                    throw new ConfigException("Invalid option " + value + " for reconfigEnabled flag. Choose 'true' or 'false.'");
                }
            } else if (key.equals("sslQuorum")){
                sslQuorum = Boolean.parseBoolean(value);
            } else if (key.equals("portUnification")){
                shouldUsePortUnification = Boolean.parseBoolean(value);
            } else if (key.equals("sslQuorumReloadCertFiles")) {
                sslQuorumReloadCertFiles = Boolean.parseBoolean(value);
            } else if ((key.startsWith("server.") || key.startsWith("group") || key.startsWith("weight")) && zkProp.containsKey("dynamicConfigFile")) {
                throw new ConfigException("parameter: " + key + " must be in a separate dynamic config file");
            } else if (key.equals(QuorumAuth.QUORUM_SASL_AUTH_ENABLED)) {
                quorumEnableSasl = Boolean.parseBoolean(value);
            } else if (key.equals(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED)) {
                quorumServerRequireSasl = Boolean.parseBoolean(value);
            } else if (key.equals(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED)) {
                quorumLearnerRequireSasl = Boolean.parseBoolean(value);
            } else if (key.equals(QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT)) {
                quorumLearnerLoginContext = value;
            } else if (key.equals(QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT)) {
                quorumServerLoginContext = value;
            } else if (key.equals(QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL)) {
                quorumServicePrincipal = value;
            } else if (key.equals("quorum.cnxn.threads.size")) {
                quorumCnxnThreadsSize = Integer.parseInt(value);
            } else {
                System.setProperty("zookeeper." + key, value);
            }
        }

        if (!quorumEnableSasl && quorumServerRequireSasl) {
            throw new IllegalArgumentException(
                    QuorumAuth.QUORUM_SASL_AUTH_ENABLED
                            + " is disabled, so cannot enable "
                            + QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED);
        }
        if (!quorumEnableSasl && quorumLearnerRequireSasl) {
            throw new IllegalArgumentException(
                    QuorumAuth.QUORUM_SASL_AUTH_ENABLED
                            + " is disabled, so cannot enable "
                            + QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED);
        }
        // If quorumpeer learner is not auth enabled then self won't be able to
        // join quorum. So this condition is ensuring that the quorumpeer learner
        // is also auth enabled while enabling quorum server require sasl.
        if (!quorumLearnerRequireSasl && quorumServerRequireSasl) {
            throw new IllegalArgumentException(
                    QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED
                            + " is disabled, so cannot enable "
                            + QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED);
        }

        // Reset to MIN_SNAP_RETAIN_COUNT if invalid (less than 3)
        // PurgeTxnLog.purge(File, File, int) will not allow to purge less
        // than 3.
        if (snapRetainCount < MIN_SNAP_RETAIN_COUNT) {
            LOG.warn("Invalid autopurge.snapRetainCount: " + snapRetainCount
                    + ". Defaulting to " + MIN_SNAP_RETAIN_COUNT);
            snapRetainCount = MIN_SNAP_RETAIN_COUNT;
        }

        if (dataDir == null) {
            throw new IllegalArgumentException("dataDir is not set");
        }
        if (dataLogDir == null) {
            dataLogDir = dataDir;
        }

        if (clientPort == 0) {
            LOG.info("clientPort is not set");
            if (clientPortAddress != null) {
                throw new IllegalArgumentException("clientPortAddress is set but clientPort is not set");
            }
        } else if (clientPortAddress != null) {
            this.clientPortAddress = new InetSocketAddress(
                    InetAddress.getByName(clientPortAddress), clientPort);
            LOG.info("clientPortAddress is {}", formatInetAddr(this.clientPortAddress));
        } else {
            this.clientPortAddress = new InetSocketAddress(clientPort);
            LOG.info("clientPortAddress is {}", formatInetAddr(this.clientPortAddress));
        }

        if (secureClientPort == 0) {
            LOG.info("secureClientPort is not set");
            if (secureClientPortAddress != null) {
                throw new IllegalArgumentException("secureClientPortAddress is set but secureClientPort is not set");
            }
        } else if (secureClientPortAddress != null) {
            this.secureClientPortAddress = new InetSocketAddress(
                    InetAddress.getByName(secureClientPortAddress), secureClientPort);
            LOG.info("secureClientPortAddress is {}", formatInetAddr(this.secureClientPortAddress));
        } else {
            this.secureClientPortAddress = new InetSocketAddress(secureClientPort);
            LOG.info("secureClientPortAddress is {}", formatInetAddr(this.secureClientPortAddress));
        }
        if (this.secureClientPortAddress != null) {
            configureSSLAuth();
        }

        if (tickTime == 0) {
            throw new IllegalArgumentException("tickTime is not set");
        }

        minSessionTimeout = minSessionTimeout == -1 ? tickTime * 2 : minSessionTimeout;
        maxSessionTimeout = maxSessionTimeout == -1 ? tickTime * 20 : maxSessionTimeout;

        if (minSessionTimeout > maxSessionTimeout) {
            throw new IllegalArgumentException(
                    "minSessionTimeout must not be larger than maxSessionTimeout");
        }          

        // backward compatibility - dynamic configuration in the same file as
        // static configuration params see writeDynamicConfig()
        if (dynamicConfigFileStr == null) {
            setupQuorumPeerConfig(zkProp, true);
            if (isDistributed() && isReconfigEnabled()) {
                // we don't backup static config for standalone mode.
                // we also don't backup if reconfig feature is disabled.
                backupOldConfig();
            }
        }
    }

setupQuorumPeerConfig:

    void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode)
            throws IOException, ConfigException {
        quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode);
        setupMyId();
        setupClientPort();
        setupPeerType();
        checkValidity();
    }

setupMyId:

    private void setupMyId() throws IOException {
        File myIdFile = new File(dataDir, "myid");
        // standalone server doesn't need myid file.
        if (!myIdFile.isFile()) {
            return;
        }
        BufferedReader br = new BufferedReader(new FileReader(myIdFile));
        String myIdString;
        try {
            myIdString = br.readLine();
        } finally {
            br.close();
        }
        try {
            // 将解析 myid 文件中的 id 赋值给 serverId
            serverId = Long.parseLong(myIdString);
            MDC.put("myid", myIdString);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("serverid " + myIdString
                    + " is not a number");
        }
    }

过期快照删除

可以启动定时任务,对过期的快照,执行删除。默认该功能是关闭的。

protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException
{
     // 管理 zk 的配置信息
     QuorumPeerConfig config = new QuorumPeerConfig();
     if (args.length == 1) {
        // 1 解析参数,zoo.cfg 和 myid
     config.parse(args[0]);
     }
    // 2 启动定时任务,对过期的快照,执行删除(默认是关闭)
    // config.getSnapRetainCount() = 3 最少保留的快照个数
    // config.getPurgeInterval() = 0 默认 0 表示关闭
    // Start and schedule the the purge task
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), 
            config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();
    if (args.length == 1 && config.isDistributed()) {
        // 3 启动集群
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running " + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    } 
}
public void start() {
     if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
        LOG.warn("Purge task is already running.");
        return;
     }
     // 默认情况 purgeInterval=0,该任务关闭,直接返回
     // Don't schedule the purge task with zero or negative purge interval.
     if (purgeInterval <= 0) {
        LOG.info("Purge task is not scheduled.");
        return;
     }
    // 创建一个定时器
     timer = new Timer("PurgeTask", true);
    // 创建一个清理快照任务
     TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
    // 如果 purgeInterval 设置的值是 1,表示 1 小时检查一次,判断是否有过期快照,
    有则删除
     timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));
     purgeTaskStatus = PurgeTaskStatus.STARTED;
}
static class PurgeTask extends TimerTask {
    private File logsDir;
    private File snapsDir;
    private int snapRetainCount;
    public PurgeTask(File dataDir, File snapDir, int count) {
     logsDir = dataDir;
     snapsDir = snapDir;
     snapRetainCount = count;
    }
    @Override
    public void run() {
        LOG.info("Purge task started.");
        try {
            // 清理过期的数据
            PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount);
        } catch (Exception e) {
        LOG.error("Error occurred while purging.", e);
        }
            LOG.info("Purge task completed.");
        } 
    }
    public static void purge(File dataDir, File snapDir, int num) throws IOException {
        if (num < 3) {
            throw new IllegalArgumentException(COUNT_ERR_MSG);
        }
        FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);
        List<File> snaps = txnLog.findNRecentSnapshots(num);
        int numSnaps = snaps.size();
        if (numSnaps > 0) {
            purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1));
    } 
}

初始化通信组件

protected void initializeAndRun(String[] args)
 throws ConfigException, IOException, AdminServerException
{
// 管理 zk 的配置信息
 QuorumPeerConfig config = new QuorumPeerConfig();
 if (args.length == 1) {
// 1 解析参数,zoo.cfg 和 myid
 config.parse(args[0]);
 }
 // 2 启动定时任务,对过期的快照,执行删除(默认是关闭)
 // config.getSnapRetainCount() = 3 最少保留的快照个数
 // config.getPurgeInterval() = 0 默认 0 表示关闭
 // Start and schedule the the purge task
 DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
 .getDataDir(), config.getDataLogDir(), config
 .getSnapRetainCount(), config.getPurgeInterval());
 purgeMgr.start();
 if (args.length == 1 && config.isDistributed()) {
    // 3 启动集群(集群模式)
    runFromConfig(config);
 } else {
      LOG.warn("Either no config or no quorum defined in config, running "
     + " in standalone mode");
    // there is only server in the quorum -- run as standalone
    // 本地模式
    ZooKeeperServerMain.main(args);
 } 
}

1、通信协议默认 NIO(可以支持 Netty)

public void runFromConfig(QuorumPeerConfig config)
 throws IOException, AdminServerException
{
 … …
 LOG.info("Starting quorum peer");
 try {
 ServerCnxnFactory cnxnFactory = null;
 ServerCnxnFactory secureCnxnFactory = null;
 // 通信组件初始化,默认是 NIO 通信
 if (config.getClientPortAddress() != null) {
 cnxnFactory = ServerCnxnFactory.createFactory();
 cnxnFactory.configure(config.getClientPortAddress(),
 config.getMaxClientCnxns(), false);
 }
 if (config.getSecureClientPortAddress() != null) {
 secureCnxnFactory = ServerCnxnFactory.createFactory();
 secureCnxnFactory.configure(config.getSecureClientPortAddress(),
 config.getMaxClientCnxns(), true);
 }
 // 把解析的参数赋值给该 zookeeper 节点
 quorumPeer = getQuorumPeer();
 quorumPeer.setTxnFactory(new FileTxnSnapLog(
 config.getDataLogDir(),
 config.getDataDir()));
 quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
 quorumPeer.enableLocalSessionsUpgrading(
 config.isLocalSessionsUpgradingEnabled());
 //quorumPeer.setQuorumPeers(config.getAllMembers());
 quorumPeer.setElectionType(config.getElectionAlg());
 quorumPeer.setMyid(config.getServerId());
 quorumPeer.setTickTime(config.getTickTime());
 quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
 quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
 quorumPeer.setInitLimit(config.getInitLimit());
 quorumPeer.setSyncLimit(config.getSyncLimit());
 quorumPeer.setConfigFileName(config.getConfigFilename());
 // 管理 zk 数据的存储
 quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
 quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
 if (config.getLastSeenQuorumVerifier()!=null) {
 quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(),false);
 }
 quorumPeer.initConfigInZKDatabase();
 // 管理 zk 的通信
 quorumPeer.setCnxnFactory(cnxnFactory);
 quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
 quorumPeer.setSslQuorum(config.isSslQuorum());
 quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
 quorumPeer.setLearnerType(config.getPeerType());
 quorumPeer.setSyncEnabled(config.getSyncEnabled());
 quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
 if (config.sslQuorumReloadCertFiles) {
 quorumPeer.getX509Util().enableCertFileReloading();
 }
… …
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
 quorumPeer.initialize();

 // 启动 zk
 quorumPeer.start();
 quorumPeer.join();
 } catch (InterruptedException e) {
 // warn, but generally this is ok
 LOG.warn("Quorum Peer interrupted", e);
 } }
static public ServerCnxnFactory createFactory() throws IOException {
 String serverCnxnFactoryName =
 System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
 if (serverCnxnFactoryName == null) {
 serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
 }
 try {
 ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) 
Class.forName(serverCnxnFactoryName)
 .getDeclaredConstructor().newInstance();
 LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
 return serverCnxnFactory;
 } catch (Exception e) {
 IOException ioe = new IOException("Couldn't instantiate "
 + serverCnxnFactoryName);
 ioe.initCause(e);
 throw ioe;
 } }
public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = 
"zookeeper.serverCnxnFactory";
zookeeperAdmin.md 文件中
* *serverCnxnFactory* :
(Java system property: **zookeeper.serverCnxnFactory**)
Specifies ServerCnxnFactory implementation. 
 This should be set to `NettyServerCnxnFactory` in order to use TLS based server 
communication.Default is `NIOServerCnxnFactory`.

2、初始化 NIO 服务端 Socket(并未启动)

ctrl + alt +B 查找 configure 实现类,NIOServerCnxnFactory.java

public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException 
{
 if (secure) {
 throw new UnsupportedOperationException("SSL isn't supported in 
NIOServerCnxn");
 }
 configureSaslLogin();
 maxClientCnxns = maxcc;
 sessionlessCnxnTimeout = Integer.getInteger(
 ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
 // We also use the sessionlessCnxnTimeout as expiring interval for
 // cnxnExpiryQueue. These don't need to be the same, but the expiring
 // interval passed into the ExpiryQueue() constructor below should be
 // less than or equal to the timeout.
 cnxnExpiryQueue =
 new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);
 expirerThread = new ConnectionExpirerThread();
 int numCores = Runtime.getRuntime().availableProcessors();
 // 32 cores sweet spot seems to be 4 selector threads
 numSelectorThreads = Integer.getInteger(
 ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
 Math.max((int) Math.sqrt((float) numCores/2), 1));
 if (numSelectorThreads < 1) {
 throw new IOException("numSelectorThreads must be at least 1");
 }
 numWorkerThreads = Integer.getInteger(
 ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
 workerShutdownTimeoutMS = Long.getLong(
 ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);
 ... ...
 for(int i=0; i<numSelectorThreads; ++i) {
 selectorThreads.add(new SelectorThread(i));
 }
// 初始化 NIO 服务端 socket,绑定 2181 端口,可以接收客户端请求
 this.ss = ServerSocketChannel.open();
 ss.socket().setReuseAddress(true);
 LOG.info("binding to port " + addr);
// 绑定 2181 端口
 ss.socket().bind(addr);
 ss.configureBlocking(false);
 acceptThread = new AcceptThread(ss, addr, selectorThreads);
}

ZK服务端加载数据源码

image.png

  1. zk 中的数据模型,是一棵树,DataTree,每个节点,叫做 DataNode
  2. zk 集群中的 DataTree 时刻保持状态同步
  3. Zookeeper 集群中每个 zk 节点中,数据在内存和磁盘中都有一份完整的数据。
    • 内存数据:DataTree
    • 磁盘数据:快照文件 + 编辑日志

image.png

冷启动数据恢复快照数据

1、启动集群

public void runFromConfig(QuorumPeerConfig config)
 throws IOException, AdminServerException
{
 … …
      LOG.info("Starting quorum peer");
 try {
 ServerCnxnFactory cnxnFactory = null;
 ServerCnxnFactory secureCnxnFactory = null;
 // 通信组件初始化,默认是 NIO 通信
 if (config.getClientPortAddress() != null) {
 cnxnFactory = ServerCnxnFactory.createFactory();
 cnxnFactory.configure(config.getClientPortAddress(),
 config.getMaxClientCnxns(), false);
 }
 if (config.getSecureClientPortAddress() != null) {
 secureCnxnFactory = ServerCnxnFactory.createFactory();
 secureCnxnFactory.configure(config.getSecureClientPortAddress(),
 config.getMaxClientCnxns(), true);
 }
 // 把解析的参数赋值给该 Zookeeper 节点
 quorumPeer = getQuorumPeer();
 quorumPeer.setTxnFactory(new FileTxnSnapLog(
 config.getDataLogDir(),
 config.getDataDir()));
 quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
 quorumPeer.enableLocalSessionsUpgrading(
 config.isLocalSessionsUpgradingEnabled());
 //quorumPeer.setQuorumPeers(config.getAllMembers());
 quorumPeer.setElectionType(config.getElectionAlg());
 quorumPeer.setMyid(config.getServerId());
 quorumPeer.setTickTime(config.getTickTime());
 quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
 quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
 quorumPeer.setInitLimit(config.getInitLimit());
 quorumPeer.setSyncLimit(config.getSyncLimit());
 quorumPeer.setConfigFileName(config.getConfigFilename());
 // 管理 zk 数据的存储
 quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
 quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
 if (config.getLastSeenQuorumVerifier()!=null) {
 quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), 
false);
 }
 quorumPeer.initConfigInZKDatabase();
 // 管理 zk 的通信
 quorumPeer.setCnxnFactory(cnxnFactory);
 quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
 quorumPeer.setSslQuorum(config.isSslQuorum());
 quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
 quorumPeer.setLearnerType(config.getPeerType());
 quorumPeer.setSyncEnabled(config.getSyncEnabled());
 quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
 if (config.sslQuorumReloadCertFiles) {
 quorumPeer.getX509Util().enableCertFileReloading();
 }
      quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
 quorumPeer.initialize();

 // 启动 zk
 quorumPeer.start();
 quorumPeer.join();
 } catch (InterruptedException e) {
 // warn, but generally this is ok
 LOG.warn("Quorum Peer interrupted", e);
 } }

2、冷启动恢复数据

QuorumPeer.java

public synchronized void start() {
 if (!getView().containsKey(myid)) {
 throw new RuntimeException("My id " + myid + " not in the peer list");
 }
// 冷启动数据恢复
 loadDataBase();
 startServerCnxnFactory();
 try {
// 启动通信工厂实例对象
 adminServer.start();
 } catch (AdminServerException e) {
 LOG.warn("Problem starting AdminServer", e);
 System.out.println(e);
 }
// 准备选举环境
 startLeaderElection();
// 执行选举
 super.start();
}
private void loadDataBase() {
 try {
// 加载磁盘数据到内存,恢复 DataTree
// zk 的操作分两种:事务操作和非事务操作
// 事务操作:zk.cteate();都会被分配一个全局唯一的 zxid,zxid 组成:64 位:
(前 32 位:epoch 每个 leader 任期的代号;后 32 位:txid 为事务 id)
// 非事务操作:zk.getData()
// 数据恢复过程:
// (1)从快照文件中恢复大部分数据,并得到一个 lastProcessZXid
// (2)再从编辑日志中执行 replay,执行到最后一条日志并更新 lastProcessZXid
// (3)最终得到,datatree 和 lastProcessZXid,表示数据恢复完成
 zkDb.loadDataBase();
 // load the epochs
 long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
 long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
 try {
 currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
 } catch(FileNotFoundException e) {
 // pick a reasonable epoch number
 // this should only happen once when moving to a
 // new code version
 currentEpoch = epochOfZxid;
 LOG.info(CURRENT_EPOCH_FILENAME
 + " not found! Creating with a reasonable default of {}. This should 
only happen when you are upgrading your installation",
 currentEpoch);
 writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
 }
 if (epochOfZxid > currentEpoch) {
 throw new IOException("The current epoch, " + 
ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
 }
 try {
 acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
 } catch(FileNotFoundException e) {
 // pick a reasonable epoch number
 // this should only happen once when moving to a
 // new code version
 acceptedEpoch = epochOfZxid;
 LOG.info(ACCEPTED_EPOCH_FILENAME
 + " not found! Creating with a reasonable default of {}. This should 
only happen when you are upgrading your installation",
 acceptedEpoch);
 writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
 }
 if (acceptedEpoch < currentEpoch) {
 throw new IOException("The accepted epoch, " + 
ZxidUtils.zxidToString(acceptedEpoch) + " is less than the current epoch, " + 
ZxidUtils.zxidToString(currentEpoch));
 }
 } catch(IOException ie) {
 LOG.error("Unable to load database on disk", ie);
 throw new RuntimeException("Unable to run quorum server ", ie);
 } }
public long loadDataBase() throws IOException {
 long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, 
commitProposalPlaybackListener);
 initialized = true;
 return zxid;
}
public long restore(DataTree dt, Map<Long, Integer> sessions,
 PlayBackListener listener) throws IOException {
// 恢复快照文件数据到 DataTree
 long deserializeResult = snapLog.deserialize(dt, sessions);
 FileTxnLog txnLog = new FileTxnLog(dataDir);
 RestoreFinalizer finalizer = () -> {
// 恢复编辑日志数据到 DataTree
 long highestZxid = fastForwardFromEdits(dt, sessions, listener);
 return highestZxid;
 };
 if (-1L == deserializeResult) {
 /* this means that we couldn't find any snapshot, so we need to
 * initialize an empty database (reported in ZOOKEEPER-2325) */
 if (txnLog.getLastLoggedZxid() != -1) {
 // ZOOKEEPER-3056: provides an escape hatch for users upgrading
 // from old versions of zookeeper (3.4.x, pre 3.5.3).
 if (!trustEmptySnapshot) {
 throw new IOException(EMPTY_SNAPSHOT_WARNING + 
"Something is broken!");
 } else {
 LOG.warn("{}This should only be allowed during upgrading.", 
EMPTY_SNAPSHOT_WARNING);
 return finalizer.run();
 }
 }
 /* TODO: (br33d) we should either put a ConcurrentHashMap on restore()
 * or use Map on save() */
 save(dt, (ConcurrentHashMap<Long, Integer>)sessions);
 /* return a zxid of zero, since we the database is empty */
 return 0;
 }
 return finalizer.run();
}

3、FileSnap

ctrl + alt +B 查找 deserialize 实现类 FileSnap.java

public long deserialize(DataTree dt, Map<Long, Integer> sessions)
 throws IOException {
 // we run through 100 snapshots (not all of them)
 // if we cannot get it running within 100 snapshots
 // we should give up
 List<File> snapList = findNValidSnapshots(100);
 if (snapList.size() == 0) {
 return -1L;
 }
 File snap = null;
boolean foundValid = false;
// 依次遍历每一个快照的数据
 for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
 snap = snapList.get(i);
 LOG.info("Reading snapshot " + snap);
 // 反序列化环境准备
 try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snap));
 CheckedInputStream crcIn = new CheckedInputStream(snapIS, new 
Adler32())) {
 InputArchive ia = BinaryInputArchive.getArchive(crcIn);
// 反序列化,恢复数据到 DataTree
 deserialize(dt, sessions, ia);
 long checkSum = crcIn.getChecksum().getValue();
 long val = ia.readLong("val");
 if (val != checkSum) {
 throw new IOException("CRC corruption in snapshot : " + snap);
 }
 foundValid = true;
 break;
 } catch (IOException e) {
 LOG.warn("problem reading snap file " + snap, e);
 }
 }
 if (!foundValid) {
 throw new IOException("Not able to find valid snapshots in " + snapDir);
 }
 dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), 
SNAPSHOT_FILE_PREFIX);
 return dt.lastProcessedZxid;
}
public void deserialize(DataTree dt, Map<Long, Integer> sessions,
 InputArchive ia) throws IOException {
 FileHeader header = new FileHeader();
 header.deserialize(ia, "fileheader");
 if (header.getMagic() != SNAP_MAGIC) {
 throw new IOException("mismatching magic headers "
 + header.getMagic() +
 " != " + FileSnap.SNAP_MAGIC);
 }
// 恢复快照数据到 DataTree
 SerializeUtils.deserializeSnapshot(dt,ia,sessions);
}
public static void deserializeSnapshot(DataTree dt,InputArchive ia,
 Map<Long, Integer> sessions) throws IOException {
 int count = ia.readInt("count");
 while (count > 0) {
 long id = ia.readLong("id");
 int to = ia.readInt("timeout");
 sessions.put(id, to);
 if (LOG.isTraceEnabled()) {
 ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
 "loadData --- session in archive: " + id
 + " with timeout: " + to);
 }
 count--;
 }
// 恢复快照数据到 DataTree
 dt.deserialize(ia, "tree");
}
public void deserialize(InputArchive ia, String tag) throws IOException {
 aclCache.deserialize(ia);
 nodes.clear();
 pTrie.clear();
 String path = ia.readString("path");
// 从快照中恢复每一个 datanode 节点数据到 DataTree
while (!"/".equals(path)) {
// 每次循环创建一个节点对象
 DataNode node = new DataNode();
 ia.readRecord(node, "node");
// 将 DataNode 恢复到 DataTree
 nodes.put(path, node);
 synchronized (node) {
 aclCache.addUsage(node.acl);
 }
 int lastSlash = path.lastIndexOf('/');
 if (lastSlash == -1) {
 root = node;
 } else {
// 处理父节点
 String parentPath = path.substring(0, lastSlash);
 DataNode parent = nodes.get(parentPath);
 if (parent == null) {
 throw new IOException("Invalid Datatree, unable to find " +
 "parent " + parentPath + " of path " + path);
 }
// 处理子节点
 parent.addChild(path.substring(lastSlash + 1));
// 处理临时节点和永久节点
 long eowner = node.stat.getEphemeralOwner();
 EphemeralType ephemeralType = EphemeralType.get(eowner);
 if (ephemeralType == EphemeralType.CONTAINER) {
 containers.add(path);
 } else if (ephemeralType == EphemeralType.TTL) {
 ttls.add(path);
 } else if (eowner != 0) {
 HashSet<String> list = ephemerals.get(eowner);
 if (list == null) {
 list = new HashSet<String>();
 ephemerals.put(eowner, list);
 }
 list.add(path);
 }
 }
 path = ia.readString("path");
 }
 nodes.put("/", root);
 // we are done with deserializing the
 // the datatree
 // update the quotas - create path trie
 // and also update the stat nodes
 setupQuota();
 aclCache.purgeUnused();
}

冷启动数据恢复编辑日志

回到 FileTxnSnapLog.java 类中的 restore 方法

public long restore(DataTree dt, Map<Long, Integer> sessions,
 PlayBackListener listener) throws IOException {
// 恢复快照文件数据到 DataTree
 long deserializeResult = snapLog.deserialize(dt, sessions);
 FileTxnLog txnLog = new FileTxnLog(dataDir);
 RestoreFinalizer finalizer = () -> {
// 恢复编辑日志数据到 DataTree
 long highestZxid = fastForwardFromEdits(dt, sessions, listener);
 return highestZxid;
 };
 … …
 return finalizer.run();
}
public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions,
 PlayBackListener listener) throws IOException {
// 在此之前,已经从快照文件中恢复了大部分数据,接下来只需从快照的 zxid + 1
位置开始恢复
 TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
// 快照中最大的 zxid,在执行编辑日志时,这个值会不断更新,直到所有操作执行
完
 long highestZxid = dt.lastProcessedZxid;
 TxnHeader hdr;
 try {
// 从 lastProcessedZxid 事务编号器开始,不断的从编辑日志中恢复剩下的还
没有恢复的数据
 while (true) {
 // iterator points to
 // the first valid txn when initialized
// 获取事务头信息(有 zxid)
 hdr = itr.getHeader();
 if (hdr == null) {
 //empty logs
 return dt.lastProcessedZxid;
 }
 if (hdr.getZxid() < highestZxid && highestZxid != 0) {
 LOG.error("{}(highestZxid) > {}(next log) for type {}",
 highestZxid, hdr.getZxid(), hdr.getType());
 } else {
 highestZxid = hdr.getZxid();
 }
 try {
// 根据编辑日志恢复数据到 DataTree,每执行一次,对应的事务 id,
highestZxid + 1
 processTransaction(hdr,dt,sessions, itr.getTxn());
 } catch(KeeperException.NoNodeException e) {
 throw new IOException("Failed to process transaction type: " +
 hdr.getType() + " error: " + e.getMessage(), e);
 }
 listener.onTxnLoaded(hdr, itr.getTxn());
 if (!itr.next())
 break;
 }
 } finally {
 if (itr != null) {
 itr.close();
 }
 }
 return highestZxid;
}
public void processTransaction(TxnHeader hdr,DataTree dt,
 Map<Long, Integer> sessions, Record txn)
 throws KeeperException.NoNodeException {
 ProcessTxnResult rc;
 switch (hdr.getType()) {
 case OpCode.createSession:
 sessions.put(hdr.getClientId(),
 ((CreateSessionTxn) txn).getTimeOut());
 if (LOG.isTraceEnabled()) {
 ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
 "playLog --- create session in log: 0x"
 + Long.toHexString(hdr.getClientId())
 + " with timeout: "
 + ((CreateSessionTxn) txn).getTimeOut());
 }
 // give dataTree a chance to sync its lastProcessedZxid
 rc = dt.processTxn(hdr, txn);
 break;
 case OpCode.closeSession:
 sessions.remove(hdr.getClientId());
 if (LOG.isTraceEnabled()) {
 ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
 "playLog --- close session in log: 0x"
 + Long.toHexString(hdr.getClientId()));
 }
 rc = dt.processTxn(hdr, txn);
 break;
 default:
// 创建节点、删除节点和其他的各种事务操作等
 rc = dt.processTxn(hdr, txn);
 }
 /**
 * Snapshots are lazily created. So when a snapshot is in progress,
 * there is a chance for later transactions to make into the
 * snapshot. Then when the snapshot is restored, NONODE/NODEEXISTS
 * errors could occur. It should be safe to ignore these.
 */
 if (rc.err != Code.OK.intValue()) {
 LOG.debug(
 "Ignoring processTxn failure hdr: {}, error: {}, path: {}",
 hdr.getType(), rc.err, rc.path);
 } }
public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn)
{
 ProcessTxnResult rc = new ProcessTxnResult();
 try {
 rc.clientId = header.getClientId();
 rc.cxid = header.getCxid();
 rc.zxid = header.getZxid();
 rc.type = header.getType();
 rc.err = 0;
 rc.multiResult = null;
 switch (header.getType()) {
 case OpCode.create:
 CreateTxn createTxn = (CreateTxn) txn;
 rc.path = createTxn.getPath();
 createNode(
 createTxn.getPath(),
 createTxn.getData(),
 createTxn.getAcl(),
 createTxn.getEphemeral() ? header.getClientId() : 0,
 createTxn.getParentCVersion(),
 header.getZxid(), header.getTime(), null);
 break;
 case OpCode.create2:
 CreateTxn create2Txn = (CreateTxn) txn;
 rc.path = create2Txn.getPath();
 Stat stat = new Stat();
 createNode(
 create2Txn.getPath(),
 create2Txn.getData(),
 create2Txn.getAcl(),
 create2Txn.getEphemeral() ? header.getClientId() : 0,
 create2Txn.getParentCVersion(),
 header.getZxid(), header.getTime(), stat);
 rc.stat = stat;
 break;
 case OpCode.createTTL:
 CreateTTLTxn createTtlTxn = (CreateTTLTxn) txn;
 rc.path = createTtlTxn.getPath();
 stat = new Stat();
 createNode(
 createTtlTxn.getPath(),
 createTtlTxn.getData(),
 createTtlTxn.getAcl(),

EphemeralType.TTL.toEphemeralOwner(createTtlTxn.getTtl()),
 createTtlTxn.getParentCVersion(),
 header.getZxid(), header.getTime(), stat);
 rc.stat = stat;
 break;
 case OpCode.createContainer:
 CreateContainerTxn createContainerTxn = (CreateContainerTxn) txn;
 rc.path = createContainerTxn.getPath();
 stat = new Stat();
 createNode(
 createContainerTxn.getPath(),
      createContainerTxn.getData(),
 createContainerTxn.getAcl(),
 EphemeralType.CONTAINER_EPHEMERAL_OWNER,
 createContainerTxn.getParentCVersion(),
 header.getZxid(), header.getTime(), stat);
 rc.stat = stat;
 break;
 case OpCode.delete:
 case OpCode.deleteContainer:
 DeleteTxn deleteTxn = (DeleteTxn) txn;
 rc.path = deleteTxn.getPath();
 deleteNode(deleteTxn.getPath(), header.getZxid());
 break;
 case OpCode.reconfig:
 case OpCode.setData:
 SetDataTxn setDataTxn = (SetDataTxn) txn;
 rc.path = setDataTxn.getPath();
 rc.stat = setData(setDataTxn.getPath(), setDataTxn
 .getData(), setDataTxn.getVersion(), header
 .getZxid(), header.getTime());
 break;
 case OpCode.setACL:
 SetACLTxn setACLTxn = (SetACLTxn) txn;
 rc.path = setACLTxn.getPath();
 rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(),
 setACLTxn.getVersion());
 break;
 case OpCode.closeSession:
 killSession(header.getClientId(), header.getZxid());
 break;
 case OpCode.error:
 ErrorTxn errTxn = (ErrorTxn) txn;
 rc.err = errTxn.getErr();
 break;
 case OpCode.check:
 CheckVersionTxn checkTxn = (CheckVersionTxn) txn;
 rc.path = checkTxn.getPath();
 break;
 case OpCode.multi:
 MultiTxn multiTxn = (MultiTxn) txn ;
 List<Txn> txns = multiTxn.getTxns();
 rc.multiResult = new ArrayList<ProcessTxnResult>();
 boolean failed = false;
 for (Txn subtxn : txns) {
 if (subtxn.getType() == OpCode.error) {
 failed = true;
 break;
 }
 }
 .. …
 }
 } catch (KeeperException e) {
 ... ...
 } catch (IOException e) {
 ... ...
 }
     ... ...
 return rc;
}

ZK选举源码

Zookeeper选举机制

第一次启动

image.png

非第一次启动

image.png

选举源码逻辑

image.png

选举准备

image.png
QuorumPeer.java

public synchronized void start() {
 if (!getView().containsKey(myid)) {
 throw new RuntimeException("My id " + myid + " not in the peer list");
 }
 loadDataBase();
 startServerCnxnFactory();
 try {
 adminServer.start();
 } catch (AdminServerException e) {
 LOG.warn("Problem starting AdminServer", e);
 System.out.println(e);
 }
    // 选举准备
 startLeaderElection();
 super.start();
}
synchronized public void startLeaderElection() {
 try {
 if (getPeerState() == ServerState.LOOKING) {
 // 创建选票
 // (1)选票组件:epoch(leader 的任期代号)、zxid(某个 leader 当选
期间执行的事务编号)、myid(serverid)
 // (2)开始选票时,都是先投自己
 currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
 }
 } catch(IOException e) {
 RuntimeException re = new RuntimeException(e.getMessage());
 re.setStackTrace(e.getStackTrace());
 throw re;
 }
 // if (!getView().containsKey(myid)) {
 // throw new RuntimeException("My id " + myid + " not in the peer list");
 //}
 if (electionType == 0) {
 try {
 udpSocket = new DatagramSocket(getQuorumAddress().getPort());
 responder = new ResponderThread();
 responder.start();
 } catch (SocketException e) {
 throw new RuntimeException(e);
 }
 }
// 创建选举算法实例
 this.electionAlg = createElectionAlgorithm(electionType);
}
protected Election createElectionAlgorithm(int electionAlgorithm){
 Election le=null;
 //TODO: use a factory rather than a switch
 switch (electionAlgorithm) {
 case 0:
 le = new LeaderElection(this);
 break;
 case 1:
 le = new AuthFastLeaderElection(this);
 break;
 case 2:
 le = new AuthFastLeaderElection(this, true);
 break;
 case 3:
// 1 创建 QuorumCnxnManager,负责选举过程中的所有网络通信
 QuorumCnxManager qcm = createCnxnManager();
 QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
 if (oldQcm != null) {
      LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader 
election?)");
 oldQcm.halt();
 }
 QuorumCnxManager.Listener listener = qcm.listener;
 if(listener != null){
// 2 启动监听线程
 listener.start();
// 3 准备开始选举
 FastLeaderElection fle = new FastLeaderElection(this, qcm);
 fle.start();
 le = fle;
 } else {
 LOG.error("Null listener when initializing cnx manager");
 }
 break;
 default:
 assert false;
 }
 return le;
}

1、网络通信组件初始化

public QuorumCnxManager createCnxnManager() {
 return new QuorumCnxManager(this,
 this.getId(),
 this.getView(),
 this.authServer,
 this.authLearner,
 this.tickTime * this.syncLimit,
 this.getQuorumListenOnAllIPs(),
 this.quorumCnxnThreadsSize,
 this.isQuorumSaslAuthEnabled());
}
public QuorumCnxManager(QuorumPeer self,
 final long mySid,
 Map<Long,QuorumPeer.QuorumServer> view,
 QuorumAuthServer authServer,
 QuorumAuthLearner authLearner,
 int socketTimeout,
 boolean listenOnAllIPs,
 int quorumCnxnThreadsSize,
 boolean quorumSaslAuthEnabled) {
// 创建各种队列
 this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
 this.queueSendMap = new ConcurrentHashMap<Long, 
ArrayBlockingQueue<ByteBuffer>>();
 this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
 this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
 String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
 if(cnxToValue != null){
 this.cnxTO = Integer.parseInt(cnxToValue);
 }
     this.self = self;
 this.mySid = mySid;
 this.socketTimeout = socketTimeout;
 this.view = view;
 this.listenOnAllIPs = listenOnAllIPs;
 initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
 quorumSaslAuthEnabled);
 // Starts listener thread that waits for connection requests
 listener = new Listener();
 listener.setName("QuorumPeerListener");
}

2、监听线程初始化

点击 QuorumCnxManager.Listener,找到对应的 run 方法

public void run() {
 int numRetries = 0;
 InetSocketAddress addr;
 Socket client = null;
 Exception exitException = null;
 while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
 try {
 if (self.shouldUsePortUnification()) {
 LOG.info("Creating TLS-enabled quorum server socket");
 ss = new UnifiedServerSocket(self.getX509Util(), true);
 } else if (self.isSslQuorum()) {
 LOG.info("Creating TLS-only quorum server socket");
 ss = new UnifiedServerSocket(self.getX509Util(), false);
 } else {
 ss = new ServerSocket();
 }
 ss.setReuseAddress(true);
 if (self.getQuorumListenOnAllIPs()) {
 int port = self.getElectionAddress().getPort();
 addr = new InetSocketAddress(port);
 } else {
 // Resolve hostname for this server in case the
 // underlying ip address has changed.
 self.recreateSocketAddresses(self.getId());
 addr = self.getElectionAddress();
 }
 LOG.info("My election bind port: " + addr.toString());
 setName(addr.toString());
// 绑定服务器地址
 ss.bind(addr);
// 死循环
 while (!shutdown) {
 try {
// 阻塞,等待处理请求
      client = ss.accept();
 setSockOpts(client);
 LOG.info("Received connection request "
 + 
formatInetAddr((InetSocketAddress)client.getRemoteSocketAddress()));
 // Receive and handle the connection request
 // asynchronously if the quorum sasl authentication is
 // enabled. This is required because sasl server
 // authentication process may take few seconds to finish,
 // this may delay next peer connection requests.
 if (quorumSaslAuthEnabled) {
 receiveConnectionAsync(client);
 } else {
 receiveConnection(client);
 }
 numRetries = 0;
 } catch (SocketTimeoutException e) {
 LOG.warn("The socket is listening for the election accepted "
 + "and it timed out unexpectedly, but will retry."
 + "see ZOOKEEPER-2836");
 }
 }
 } catch (IOException e) {
 ... ...
 closeSocket(client);
 }
 }
 ... ...
}

3、选举准备

点击 FastLeaderElection

public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
 this.stop = false;
 this.manager = manager;
 starter(self, manager);
}
private void starter(QuorumPeer self, QuorumCnxManager manager) {
 this.self = self;
 proposedLeader = -1;
 proposedZxid = -1;
// 初始化队列和信息
 sendqueue = new LinkedBlockingQueue<ToSend>();
 recvqueue = new LinkedBlockingQueue<Notification>();
 this.messenger = new Messenger(manager);
}

选举执行

image.png
QuorumPeer.java

public synchronized void start() {
 if (!getView().containsKey(myid)) {
 throw new RuntimeException("My id " + myid + " not in the peer list");
 }
// 冷启动数据恢复
 loadDataBase();
 startServerCnxnFactory();
 try {
// 启动通信工厂实例对象
 adminServer.start();
 } catch (AdminServerException e) {
 LOG.warn("Problem starting AdminServer", e);
 System.out.println(e);
 }
// 准备选举环境
 startLeaderElection();
// 执行选举
 super.start();
}
  1. 执行 super.start();就相当于执行 QuorumPeer.java 类中的 run()方法,当 Zookeeper 启动后,首先都是 Looking 状态,通过选举,让其中一台服务器成为 Leader,其他的服务器成为 Follower。

QuorumPeer.java

public void run() {
 updateThreadName();
 LOG.debug("Starting quorum peer");
 try {
 jmxQuorumBean = new QuorumBean(this);
 MBeanRegistry.getInstance().register(jmxQuorumBean, null);
 for(QuorumServer s: getView().values()){
 ZKMBeanInfo p;
 if (getId() == s.id) {
 p = jmxLocalPeerBean = new LocalPeerBean(this);
 try {
 MBeanRegistry.getInstance().register(p, jmxQuorumBean);
 } catch (Exception e) {
 LOG.warn("Failed to register with JMX", e);
 jmxLocalPeerBean = null;
 }
 } else {
 RemotePeerBean rBean = new RemotePeerBean(this, s);
 try {
 MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
 jmxRemotePeerBean.put(s.id, rBean);
 } catch (Exception e) {
 LOG.warn("Failed to register with JMX", e);
 }
 }
 }
 } catch (Exception e) {
 LOG.warn("Failed to register with JMX", e);
 jmxQuorumBean = null;
 }
 try {
 /*
 * Main loop
 */
 while (running) {
 switch (getPeerState()) {
 case LOOKING:
 LOG.info("LOOKING");
 if (Boolean.getBoolean("readonlymode.enabled")) {
 LOG.info("Attempting to start ReadOnlyZooKeeperServer");
 // Create read-only server but don't start it immediately
 final ReadOnlyZooKeeperServer roZk =
 new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);
 // Instead of starting roZk immediately, wait some grace
 // period before we decide we're partitioned.
 //
 // Thread is used here because otherwise it would require
 // changes in each of election strategy classes which is
 // unnecessary code coupling.
 Thread roZkMgr = new Thread() {
 public void run() {
 try {
 // lower-bound grace period to 2 secs
 sleep(Math.max(2000, tickTime));
 if (ServerState.LOOKING.equals(getPeerState())) {
 roZk.startup();
 }
 } catch (InterruptedException e) {
     LOG.info("Interrupted while attempting to start 
ReadOnlyZooKeeperServer, not started");
 } catch (Exception e) {
 LOG.error("FAILED to start 
ReadOnlyZooKeeperServer", e);
 }
 }
 };
 try {
 roZkMgr.start();
 reconfigFlagClear();
 if (shuttingDownLE) {
 shuttingDownLE = false;
 startLeaderElection();
 }
 // 进行选举,选举结束,返回最终成为 Leader 胜选的那张
选票
 setCurrentVote(makeLEStrategy().lookForLeader());
 } catch (Exception e) {
 LOG.warn("Unexpected exception", e);
 setPeerState(ServerState.LOOKING);
 } finally {
 // If the thread is in the the grace period, interrupt
 // to come out of waiting.
 roZkMgr.interrupt();
 roZk.shutdown();
 }
 } else {
 try {
 reconfigFlagClear();
 if (shuttingDownLE) {
 shuttingDownLE = false;
 startLeaderElection();
 }
 setCurrentVote(makeLEStrategy().lookForLeader());
 } catch (Exception e) {
 LOG.warn("Unexpected exception", e);
 setPeerState(ServerState.LOOKING);
 } 
 }
 break;
 case OBSERVING:
 try {
 LOG.info("OBSERVING");
 setObserver(makeObserver(logFactory));
 observer.observeLeader();
 } catch (Exception e) {
 LOG.warn("Unexpected exception",e );
 } finally {
 observer.shutdown();
 setObserver(null); 
 updateServerState();
 }
 break;
 case FOLLOWING:
 try {
     LOG.info("FOLLOWING");
 setFollower(makeFollower(logFactory));
 follower.followLeader();
 } catch (Exception e) {
 LOG.warn("Unexpected exception",e);
 } finally {
 follower.shutdown();
 setFollower(null);
 updateServerState();
 }
 break;
 case LEADING:
 LOG.info("LEADING");
 try {
 setLeader(makeLeader(logFactory));
 leader.lead();
 setLeader(null);
 } catch (Exception e) {
 LOG.warn("Unexpected exception",e);
 } finally {
 if (leader != null) {
 leader.shutdown("Forcing shutdown");
 setLeader(null);
 }
 updateServerState();
 }
 break;
 }
 start_fle = Time.currentElapsedTime();
 }
 } finally {
 ... ...
 } }

ctrl+alt+b 点击 lookForLeader()的实现类 FastLeaderElection.java

public Vote lookForLeader() throws InterruptedException {
 try {
 self.jmxLeaderElectionBean = new LeaderElectionBean();
 MBeanRegistry.getInstance().register(
 self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
 } catch (Exception e) {
 LOG.warn("Failed to register with JMX", e);
 self.jmxLeaderElectionBean = null;
 }
 if (self.start_fle == 0) {
 self.start_fle = Time.currentElapsedTime();
 }
 try {
// 正常启动中,所有其他服务器,都会给我发送一个投票
// 保存每一个服务器的最新合法有效的投票
 HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
// 存储合法选举之外的投票结果
 HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
// 一次选举的最大等待时间,默认值是 0.2s
 int notTimeout = finalizeWait;
// 每发起一轮选举,logicalclock++
// 在没有合法的 epoch 数据之前,都使用逻辑时钟代替
// 选举 leader 的规则:依次比较 epoch(任期) zxid(事务 id) serverid
(myid) 谁大谁当选 leader
 synchronized(this){
// 更新逻辑时钟,每进行一次选举,都需要更新逻辑时钟
// logicalclock = epoch 
 logicalclock.incrementAndGet();
// 更新选票(serverid, zxid, epoch),
 updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
 }
 LOG.info("New election. My id = " + self.getId() +
 ", proposed zxid=0x" + Long.toHexString(proposedZxid));
// 广播选票,把自己的选票发给其他服务器
 sendNotifications();
 /*
 * Loop in which we exchange notifications until we find a leader
 */
// 一轮一轮的选举直到选举成功
 while ((self.getPeerState() == ServerState.LOOKING) &&
 (!stop)){
 … …
 }
 return null;
 } finally {
 … …
 } }

点击 sendNotifications,广播选票,把自己的选票发给其他服务器

private void sendNotifications() {
// 遍历投票参与者,给每台服务器发送选票
 for (long sid : self.getCurrentAndNextConfigVoters()) {
 QuorumVerifier qv = self.getQuorumVerifier();
// 创建发送选票
 ToSend notmsg = new ToSend(ToSend.mType.notification,
 proposedLeader,
 proposedZxid,
 logicalclock.get(),
 QuorumPeer.ServerState.LOOKING,
 sid,
 proposedEpoch, qv.toString().getBytes());
 if(LOG.isDebugEnabled()){
 LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
 Long.toHexString(proposedZxid) + " (n.zxid), 0x" + 
Long.toHexString(logicalclock.get()) +
 " (n.round), " + sid + " (recipient), " + self.getId() +
 " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
 }
     // 把发送选票放入发送队列
 sendqueue.offer(notmsg);
 } }

在 FastLeaderElection.java 类中查找 WorkerSender 线程。

class WorkerSender extends ZooKeeperThread {
 volatile boolean stop;
 QuorumCnxManager manager;
 WorkerSender(QuorumCnxManager manager){
 super("WorkerSender");
 this.stop = false;
 this.manager = manager;
 }
 public void run() {
 while (!stop) {
 try {
// 队列阻塞,时刻准备接收要发送的选票
 ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
 if(m == null) continue;
// 处理要发送的选票
 process(m);
 } catch (InterruptedException e) {
 break;
 }
 }
 LOG.info("WorkerSender is down");
 }
 /**
 * Called by run() once there is a new message to send.
 * @param m message to send
 */
 void process(ToSend m) {
 ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
 m.leader,
 m.zxid,
 m.electionEpoch,
 m.peerEpoch,
 m.configData);
// 发送选票
 manager.toSend(m.sid, requestBuffer);
 } }
public void toSend(Long sid, ByteBuffer b) {
 /*
 * If sending message to myself, then simply enqueue it (loopback).
 */
// 判断如果是发给自己的消息,直接进入自己的 RecvQueue
 if (this.mySid == sid) {
 b.position(0);
 addToRecvQueue(new Message(b.duplicate(), sid));
      /*
 * Otherwise send to the corresponding thread to send.
 */
 } else {
 /*
 * Start a new connection if doesn't have one already.
 */
// 如果是发给其他服务器,创建对应的发送队列或者获取已经存在的发送队
列
// ,并把要发送的消息放入该队列
 ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
 SEND_CAPACITY);
 ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);
 if (oldq != null) {
 addToSendQueue(oldq, b);
 } else {
 addToSendQueue(bq, b);
 }
// 将选票发送出去
 connectOne(sid);
 } }

如果数据是发送给自己的,添加到自己的接收队列

public void addToRecvQueue(Message msg) {
 synchronized(recvQLock) {
 if (recvQueue.remainingCapacity() == 0) {
 try {
 recvQueue.remove();
 } catch (NoSuchElementException ne) {
 // element could be removed by poll()
 LOG.debug("Trying to remove from an empty " +
 "recvQueue. Ignoring exception " + ne);
 }
 }
 try {
// 将发送给自己的选票添加到 recvQueue 队列
 recvQueue.add(msg);
 } catch (IllegalStateException ie) {
 // This should never happen
 LOG.error("Unable to insert element in the recvQueue " + ie);
 }
 } }

数据添加到发送队列

private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue,
 ByteBuffer buffer) {
 if (queue.remainingCapacity() == 0) {
 try {
 queue.remove();
 } catch (NoSuchElementException ne) {
 // element could be removed by poll()
 LOG.debug("Trying to remove from an empty " +
 "Queue. Ignoring exception " + ne);
 }
 }
 try {
// 将要发送的消息添加到发送队列
 queue.add(buffer);
 } catch (IllegalStateException ie) {
 // This should never happen
 LOG.error("Unable to insert an element in the queue " + ie);
 } }

与要发送的服务器节点建立通信连接

synchronized void connectOne(long sid){
 if (senderWorkerMap.get(sid) != null) {
 LOG.debug("There is a connection already for server " + sid);
 return;
 }
 synchronized (self.QV_LOCK) {
 boolean knownId = false;
 // Resolve hostname for the remote server before attempting to
 // connect in case the underlying ip address has changed.
 self.recreateSocketAddresses(sid);
 Map<Long, QuorumPeer.QuorumServer> lastCommittedView = self.getView();
 QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
 Map<Long, QuorumPeer.QuorumServer> lastProposedView = 
lastSeenQV.getAllMembers();
 if (lastCommittedView.containsKey(sid)) {
 knownId = true;
 if (connectOne(sid, lastCommittedView.get(sid).electionAddr))
 return;
 }
 if (lastSeenQV != null && lastProposedView.containsKey(sid)
 && (!knownId || (lastProposedView.get(sid).electionAddr !=
 lastCommittedView.get(sid).electionAddr))) {
 knownId = true;
 if (connectOne(sid, lastProposedView.get(sid).electionAddr))
 return;
 }
 if (!knownId) {
 LOG.warn("Invalid server id: " + sid);
 return;
 }
 } }
synchronized private boolean connectOne(long sid, InetSocketAddress electionAddr){
 if (senderWorkerMap.get(sid) != null) {
 LOG.debug("There is a connection already for server " + sid);
 return true;
 }
 Socket sock = null;
 try {
 LOG.debug("Opening channel to server " + sid);
 if (self.isSslQuorum()) {
      SSLSocket sslSock = self.getX509Util().createSSLSocket();
 setSockOpts(sslSock);
 sslSock.connect(electionAddr, cnxTO);
 sslSock.startHandshake();
 sock = sslSock;
 LOG.info("SSL handshake complete with {} - {} - {}", 
sslSock.getRemoteSocketAddress(), sslSock.getSession().getProtocol(), 
sslSock.getSession().getCipherSuite());
 } else {
 sock = new Socket();
 setSockOpts(sock);
 sock.connect(electionAddr, cnxTO);
 }
 LOG.debug("Connected to server " + sid);
 // Sends connection request asynchronously if the quorum
 // sasl authentication is enabled. This is required because
 // sasl server authentication process may take few seconds to
 // finish, this may delay next peer connection requests.
 if (quorumSaslAuthEnabled) {
 initiateConnectionAsync(sock, sid);
 } else {
// 处理连接
 initiateConnection(sock, sid);
 }
 return true;
 } catch (UnresolvedAddressException e) {
 ... ...
 } }
public void initiateConnection(final Socket sock, final Long sid) {
 try {
 startConnection(sock, sid);
 } catch (IOException e) {
 LOG.error("Exception while connecting, id: {}, addr: {}, closing learner 
connection",
 new Object[] { sid, sock.getRemoteSocketAddress() }, e);
 closeSocket(sock);
 return;
 } }

创建并启动发送器线程和接收器线程

private boolean startConnection(Socket sock, Long sid)
 throws IOException {
 DataOutputStream dout = null;
 DataInputStream din = null;
 try {
 // Use BufferedOutputStream to reduce the number of IP packets. This is
 // important for x-DC scenarios.
// 通过输出流,向服务器发送数据
 BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
 dout = new DataOutputStream(buf);
 // Sending id and challenge
 // represents protocol version (in other words - message type)
 dout.writeLong(PROTOCOL_VERSION);
 dout.writeLong(self.getId());
 String addr = formatInetAddr(self.getElectionAddress());
 byte[] addr_bytes = addr.getBytes();
 dout.writeInt(addr_bytes.length);
 dout.write(addr_bytes);
 dout.flush();
// 通过输入流读取对方发送过来的选票
 din = new DataInputStream(
 new BufferedInputStream(sock.getInputStream()));
 } catch (IOException e) {
 LOG.warn("Ignoring exception reading or writing challenge: ", e);
 closeSocket(sock);
 return false;
 }
 // authenticate learner
 QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
 if (qps != null) {
 // TODO - investigate why reconfig makes qps null.
 authLearner.authenticate(sock, qps.hostname);
 }
 // If lost the challenge, then drop the new connection
// 如果对方的 id 比我的大,我是没有资格给对方发送连接请求的,直接关闭自
己的客户端
 if (sid > self.getId()) {
 LOG.info("Have smaller server identifier, so dropping the " +
 "connection: (" + sid + ", " + self.getId() + ")");
 closeSocket(sock);
 // Otherwise proceed with the connection
 } else {
// 初始化,发送器 和 接收器
 SendWorker sw = new SendWorker(sock, sid);
 RecvWorker rw = new RecvWorker(sock, din, sid, sw);
 sw.setRecv(rw);
 SendWorker vsw = senderWorkerMap.get(sid);
 if(vsw != null)
 vsw.finish();
 senderWorkerMap.put(sid, sw);
 queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(
 SEND_CAPACITY));
// 启动发送器线程和接收器线程
 sw.start();
 rw.start();
 return true;
 }
 return false;
}

点击 SendWorker,并查找该类下的 run 方法

QuorumCnxManager.java
public void run() {
 threadCnt.incrementAndGet();
 try {
 /**
 * If there is nothing in the queue to send, then we
 * send the lastMessage to ensure that the last message
 * was received by the peer. The message could be dropped
 * in case self or the peer shutdown their connection
 * (and exit the thread) prior to reading/processing
 * the last message. Duplicate messages are handled correctly
 * by the peer.
 *
 * If the send queue is non-empty, then we have a recent
 * message than that stored in lastMessage. To avoid sending
 * stale message, we should send the message in the send queue.
 */
 ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
 if (bq == null || isSendQueueEmpty(bq)) {
 ByteBuffer b = lastMessageSent.get(sid);
 if (b != null) {
 LOG.debug("Attempting to send lastMessage to sid=" + sid);
 send(b);
 }
 }
 } catch (IOException e) {
 LOG.error("Failed to send last message. Shutting down thread.", e);
 this.finish();
 }
 try {
// 只要连接没有断开
 while (running && !shutdown && sock != null) {
 ByteBuffer b = null;
 try {
 ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
 .get(sid);
 if (bq != null) {
// 不断从发送队列 SendQueue 中,获取发送消息,并执行发送
 b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
 } else {
 LOG.error("No queue of incoming messages for " +
 "server " + sid);
 break;
 }
 if(b != null){
// 更新对于 sid 这台服务器的最近一条消息
 lastMessageSent.put(sid, b);
// 执行发送
 send(b);
      }
 } catch (InterruptedException e) {
 LOG.warn("Interrupted while waiting for message on queue",
 e);
 }
 }
 } catch (Exception e) {
 LOG.warn("Exception when using channel: for id " + sid
 + " my id = " + QuorumCnxManager.this.mySid
 + " error = " + e);
 }
 this.finish();
 LOG.warn("Send worker leaving thread " + " id " + sid + " my id = " + self.getId());
}
synchronized void send(ByteBuffer b) throws IOException {
 byte[] msgBytes = new byte[b.capacity()];
 try {
 b.position(0);
 b.get(msgBytes);
 } catch (BufferUnderflowException be) {
 LOG.error("BufferUnderflowException ", be);
 return;
 }
 // 输出流向外发送
 dout.writeInt(b.capacity());
 dout.write(b.array());
 dout.flush();
}

点击 RecvWorker,并查找该类下的 run 方法

QuorumCnxManager.java
public void run() {
 threadCnt.incrementAndGet();
 try {
// 只要连接没有断开
 while (running && !shutdown && sock != null) {
 /**
 * Reads the first int to determine the length of the
 * message
 */
 int length = din.readInt();
 if (length <= 0 || length > PACKETMAXSIZE) {
 throw new IOException(
 "Received packet with invalid packet: "
 + length);
 }
 /**
 * Allocates a new ByteBuffer to receive the message
 */
 byte[] msgArray = new byte[length];
 // 输入流接收消息
 din.readFully(msgArray, 0, length);
 ByteBuffer message = ByteBuffer.wrap(msgArray);
// 接收对方发送过来的选票
      addToRecvQueue(new Message(message.duplicate(), sid));
 }
 } catch (Exception e) {
 LOG.warn("Connection broken for id " + sid + ", my id = "
 + QuorumCnxManager.this.mySid + ", error = " , e);
 } finally {
 LOG.warn("Interrupting SendWorker");
 sw.finish();
 closeSocket(sock);
 } }
public void addToRecvQueue(Message msg) {
 synchronized(recvQLock) {
 if (recvQueue.remainingCapacity() == 0) {
 try {
 recvQueue.remove();
 } catch (NoSuchElementException ne) {
 // element could be removed by poll()
 LOG.debug("Trying to remove from an empty " +
 "recvQueue. Ignoring exception " + ne);
 }
 }
 try {
// 将接收到的消息,放入接收消息队列
 recvQueue.add(msg);
 } catch (IllegalStateException ie) {
 // This should never happen
 LOG.error("Unable to insert element in the recvQueue " + ie);
 }
 } }

在 FastLeaderElection.java 类中查找 WorkerReceiver 线程。

class WorkerReceiver extends ZooKeeperThread {
 volatile boolean stop;
 QuorumCnxManager manager;
 WorkerReceiver(QuorumCnxManager manager) {
 super("WorkerReceiver");
 this.stop = false;
 this.manager = manager;
 }
 public void run() {
 Message response;
 while (!stop) {
 // Sleeps on receive
 try {
// 从 RecvQueue 中取出选举投票消息(其他服务器发送过来的)
 response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
 … …
 } catch (InterruptedException e) {
 LOG.warn("Interrupted Exception while waiting for new message" +
 e.toString());
      }
 }
 LOG.info("WorkerReceiver is down");
 } }

Follow和Leader状态同步源码

当选举结束后,每个节点都需要根据自己的角色更新自己的状态。

  • 选举出的 Leader 更新自己状态为 Leader,其他节点更新自己状态为 Follower。
  • Leader 更新状态入口:leader.lead()
  • Follower 更新状态入口:follower.followerLeader()

注意:

  1. follower 必须要让 leader 知道自己的状态:epoch、zxid、sid
    1. 必须要找出谁是 leader;
    2. 发起请求连接 leader;
    3. 发送自己的信息给 leader;
  2. leader 接收到信息,必须要返回对应的信息给 follower。 (2)当leader得知follower的状态了,就确定需要做何种方式的数据同步DIFF、TRUNC、
    SNAP
  3. 执行数据同步
  4. 当 leader 接收到超过半数 follower 的 ack 之后,进入正常工作状态,集群启动完成了

最终总结同步的方式:

  1. DIFF 咱两一样,不需要做什么
  2. TRUNC follower 的 zxid 比 leader 的 zxid 大,所以 Follower 要回滚
  3. COMMIT leader 的 zxid 比 follower 的 zxid 大,发送 Proposal 给 foloower 提交执行
  4. 如果 follower 并没有任何数据,直接使用 SNAP 的方式来执行数据同步(直接把数据全部序列到 follower)

Follower和Leader状态同步源码解析

image.png
image.png

Leader.lead()等待接收 follower 的状态同步申请

1、在 Leader.java 种查找 lead()方法
void lead() throws IOException, InterruptedException {
 self.end_fle = Time.currentElapsedTime();
 long electionTimeTaken = self.end_fle - self.start_fle;
 self.setElectionTimeTaken(electionTimeTaken);
 LOG.info("LEADING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
 QuorumPeer.FLE_TIME_UNIT);
 self.start_fle = 0;
 self.end_fle = 0;
 zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
 try {
 self.tick.set(0);
     // 恢复数据到内存,启动时,其实已经加载过了
 zk.loadData();
 leaderStateSummary = new StateSummary(self.getCurrentEpoch(), 
zk.getLastProcessedZxid());
 // Start thread that waits for connection requests from
 // new followers.
// 等待其他 follower 节点向 leader 节点发送同步状态
 cnxAcceptor = new LearnerCnxAcceptor();
 cnxAcceptor.start();
 long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
 … …
 } finally {
 zk.unregisterJMX(this);
 } }
class LearnerCnxAcceptor extends ZooKeeperCriticalThread {
 private volatile boolean stop = false;
 public LearnerCnxAcceptor() {
 super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress(), zk
 .getZooKeeperServerListener());
 }
 @Override
 public void run() {
 try {
 while (!stop) {
 Socket s = null;
 boolean error = false;
 try {
// 等待接收 follower 的状态同步申请
 s = ss.accept();
 // start with the initLimit, once the ack is processed
 // in LearnerHandler switch to the syncLimit
 s.setSoTimeout(self.tickTime * self.initLimit);
 s.setTcpNoDelay(nodelay);
 BufferedInputStream is = new BufferedInputStream(
 s.getInputStream());
// 一旦接收到 follower 的请求,就创建 LearnerHandler 对象,
处理请求
 LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
// 启动线程
 fh.start();
 } catch (SocketException e) {
 ... ...
 } 
... ...
 }
      } catch (Exception e) {
 LOG.warn("Exception while accepting follower", e.getMessage());
 handleException(this.getName(), e);
 }
 }
 public void halt() {
 stop = true;
 } }

其中 ss 的初始化是在创建 Leader 对象时,创建的 socket

private final ServerSocket ss;
Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException {
 this.self = self;
 this.proposalStats = new BufferStats();
 try {
 if (self.shouldUsePortUnification() || self.isSslQuorum()) {
 boolean allowInsecureConnection = self.shouldUsePortUnification();
 if (self.getQuorumListenOnAllIPs()) {
 ss = new UnifiedServerSocket(self.getX509Util(), 
allowInsecureConnection, self.getQuorumAddress().getPort());
 } else {
 ss = new UnifiedServerSocket(self.getX509Util(), 
allowInsecureConnection);
 }
 } else {
 if (self.getQuorumListenOnAllIPs()) {
 ss = new ServerSocket(self.getQuorumAddress().getPort());
 } else {
 ss = new ServerSocket();
 }
 }
 ss.setReuseAddress(true);
 if (!self.getQuorumListenOnAllIPs()) {
 ss.bind(self.getQuorumAddress());
 }
 } catch (BindException e) {
 ... ...
 }
 this.zk = zk;
 this.learnerSnapshotThrottler = createLearnerSnapshotThrottler(
 maxConcurrentSnapshots, maxConcurrentSnapshotTimeout);
}

2、Follower.lead()查找并连接 Leader

在 Follower.java 种查找 followLeader()方法

void followLeader() throws InterruptedException {
 self.end_fle = Time.currentElapsedTime();
 long electionTimeTaken = self.end_fle - self.start_fle;
 self.setElectionTimeTaken(electionTimeTaken);
 LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
 QuorumPeer.FLE_TIME_UNIT);
 self.start_fle = 0;
     self.end_fle = 0;
 fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
 try {
// 查找 leader
 QuorumServer leaderServer = findLeader(); 
 try {
 // 连接 leader
 connectToLeader(leaderServer.addr, leaderServer.hostname);
 // 向 leader 注册
 long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
 if (self.isReconfigStateChange())
 throw new Exception("learned about role change");
 //check to see if the leader zxid is lower than ours
 //this should never happen but is just a safety check
 long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
 if (newEpoch < self.getAcceptedEpoch()) {
 LOG.error("Proposed leader epoch " + 
ZxidUtils.zxidToString(newEpochZxid)
 + " is less than our accepted epoch " + 
ZxidUtils.zxidToString(self.getAcceptedEpoch()));
 throw new IOException("Error: Epoch of leader is lower");
 }
 syncWithLeader(newEpochZxid); 
 QuorumPacket qp = new QuorumPacket();
 while (this.isRunning()) {
 readPacket(qp);
 processPacket(qp);
 }
 } catch (Exception e) {
 LOG.warn("Exception when following the leader", e);
 try {
 sock.close();
 } catch (IOException e1) {
 e1.printStackTrace();
 }
 // clear pending revalidations
 pendingRevalidations.clear();
 }
 } finally {
 zk.unregisterJMX((Learner)this);
 } }
/**
* Returns the address of the node we think is the leader.
*/
protected QuorumServer findLeader() {
 QuorumServer leaderServer = null;
 // Find the leader by id
// 选举投票的时候记录的,最后推荐的 leader 的 sid
 Vote current = self.getCurrentVote();
// 如果这个 sid 在启动的所有服务器范围中
     for (QuorumServer s : self.getView().values()) {
 if (s.id == current.getId()) {
 // Ensure we have the leader's correct IP address before
 // attempting to connect.
// 尝试连接 leader 的正确 IP 地址
 s.recreateSocketAddresses();
 leaderServer = s;
 break;
 }
 }
 if (leaderServer == null) {
 LOG.warn("Couldn't find the leader with id = "
 + current.getId());
 }
 return leaderServer;
}
protected void connectToLeader(InetSocketAddress addr, String hostname)
 throws IOException, InterruptedException, X509Exception {
 this.sock = createSocket();
 int initLimitTime = self.tickTime * self.initLimit;
 int remainingInitLimitTime = initLimitTime;
 long startNanoTime = nanoTime();
 for (int tries = 0; tries < 5; tries++) {
 try {
 // recalculate the init limit time because retries sleep for 1000 milliseconds
 remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) 
/ 1000000);
 if (remainingInitLimitTime <= 0) {
 LOG.error("initLimit exceeded on retries.");
 throw new IOException("initLimit exceeded on retries.");
 }
 sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, 
remainingInitLimitTime));
 if (self.isSslQuorum()) {
 ((SSLSocket) sock).startHandshake();
 }
 sock.setTcpNoDelay(nodelay);
 break;
 } catch (IOException e) {
 ... ...
 }
 Thread.sleep(1000);
 }
 self.authLearner.authenticate(sock, hostname);
 leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
 sock.getInputStream()));
 bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
 leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
}

3、Leader.lead()创建 LearnerHandler
void lead() throws IOException, InterruptedException {
 self.end_fle = Time.currentElapsedTime();
 long electionTimeTaken = self.end_fle - self.start_fle;
 self.setElectionTimeTaken(electionTimeTaken);
 LOG.info("LEADING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
 QuorumPeer.FLE_TIME_UNIT);
 self.start_fle = 0;
 self.end_fle = 0;
 zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
 try {
 self.tick.set(0);
 zk.loadData();
 leaderStateSummary = new StateSummary(self.getCurrentEpoch(), 
zk.getLastProcessedZxid());
 // Start thread that waits for connection requests from
 // new followers.
 cnxAcceptor = new LearnerCnxAcceptor();
 cnxAcceptor.start();
 ......
 } finally {
 zk.unregisterJMX(this);
 } }
class LearnerCnxAcceptor extends ZooKeeperCriticalThread {
 private volatile boolean stop = false;
 ... ...
 @Override
 public void run() {
 try {
 while (!stop) {
 Socket s = null;
 boolean error = false;
 try {
 s = ss.accept();
 // start with the initLimit, once the ack is processed
 // in LearnerHandler switch to the syncLimit
 s.setSoTimeout(self.tickTime * self.initLimit);
 s.setTcpNoDelay(nodelay);
 BufferedInputStream is = new BufferedInputStream(
 s.getInputStream());
 LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
 fh.start();
 } catch (SocketException e) {
 ... ...
}
      }
 } catch (Exception e) {
 LOG.warn("Exception while accepting follower", e.getMessage());
 handleException(this.getName(), e);
 }
 }
 public void halt() {
 stop = true;
 } }

由于 public class LearnerHandler extends ZooKeeperThread{},说明 LearnerHandler 是一个线程。所以 fh.start()执行的是 LearnerHandler 中的 run()方法。

public void run() {
 try {
 leader.addLearnerHandler(this);
// 心跳处理
 tickOfNextAckDeadline = leader.self.tick.get()
 + leader.self.initLimit + leader.self.syncLimit;
 ia = BinaryInputArchive.getArchive(bufferedInput);
 bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
 oa = BinaryOutputArchive.getArchive(bufferedOutput);
// 从网络中接收消息,并反序列化为 packet
 QuorumPacket qp = new QuorumPacket();
 ia.readRecord(qp, "packet");
// 选举结束后,observer 和 follower 都应该给 leader 发送一个标志信息:
FOLLOWERINFO 或者 OBSERVERINFO
 if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != 
Leader.OBSERVERINFO){
 LOG.error("First packet " + qp.toString()
 + " is not FOLLOWERINFO or OBSERVERINFO!");
 return;
 }
 byte learnerInfoData[] = qp.getData();
 if (learnerInfoData != null) {
 ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
 if (learnerInfoData.length >= 8) {
 this.sid = bbsid.getLong();
 }
 if (learnerInfoData.length >= 12) {
 this.version = bbsid.getInt(); // protocolVersion
 }
 if (learnerInfoData.length >= 20) {
 long configVersion = bbsid.getLong();
 if (configVersion > leader.self.getQuorumVerifier().getVersion()) {
 throw new IOException("Follower is ahead of the leader (has a later 
activated configuration)");
 }
 }
 } else {
 this.sid = leader.followerCounter.getAndDecrement();
 }
      if (leader.self.getView().containsKey(this.sid)) {
 LOG.info("Follower sid: " + this.sid + " : info : "
 + leader.self.getView().get(this.sid).toString());
 } else {
 LOG.info("Follower sid: " + this.sid + " not in the current config " + 
Long.toHexString(leader.self.getQuorumVerifier().getVersion()));
 }

 if (qp.getType() == Leader.OBSERVERINFO) {
 learnerType = LearnerType.OBSERVER;
 }
// 读取 Follower 发送过来的 lastAcceptedEpoch
// 选举过程中,所使用的 epoch,其实还是上一任 leader 的 epoch
 long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
 long peerLastZxid;
 StateSummary ss = null;
// 读取 follower 发送过来的 zxid
 long zxid = qp.getZxid();
// Leader 根据从 Follower 获取 sid 和旧的 epoch,构建新的 epoch
 long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
 long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);
 if (this.getVersion() < 0x10000) {
 // we are going to have to extrapolate the epoch information
 long epoch = ZxidUtils.getEpochFromZxid(zxid);
 ss = new StateSummary(epoch, zxid);
 // fake the message
 leader.waitForEpochAck(this.getSid(), ss);
 } else {
 byte ver[] = new byte[4];
 ByteBuffer.wrap(ver).putInt(0x10000);
// Leader 向 Follower 发送信息(包含:zxid 和 newEpoch)
 QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, 
newLeaderZxid, ver, null);
 oa.writeRecord(newEpochPacket, "packet");
 bufferedOutput.flush();
 QuorumPacket ackEpochPacket = new QuorumPacket();
 ia.readRecord(ackEpochPacket, "packet");
 if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
 LOG.error(ackEpochPacket.toString()
 + " is not ACKEPOCH");
 return;
}
 ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
 ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
 leader.waitForEpochAck(this.getSid(), ss);
 }
 peerLastZxid = ss.getLastZxid();

 // Take any necessary action if we need to send TRUNC or DIFF
 // startForwarding() will be called in all cases
      boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), 
leader);

 /* if we are not truncating or sending a diff just send a snapshot */
 if (needSnap) {
 boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
 LearnerSnapshot snapshot = 

leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
 try {
 long zxidToSend = 
leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
 oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, 
null), "packet");
 bufferedOutput.flush();
 LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid of leader is 
0x{}, "
 + "send zxid of db as 0x{}, {} concurrent snapshots, " 
 + "snapshot was {} from throttle",
 Long.toHexString(peerLastZxid), 
 Long.toHexString(leaderLastZxid),
 Long.toHexString(zxidToSend), 
 snapshot.getConcurrentSnapshotNumber(),
 snapshot.isEssential() ? "exempt" : "not exempt");
 // Dump data to peer
 leader.zk.getZKDatabase().serializeSnapshot(oa);
 oa.writeString("BenWasHere", "signature");
 bufferedOutput.flush();
 } finally {
 snapshot.close();
 }
 }
 LOG.debug("Sending NEWLEADER message to " + sid);
 // the version of this quorumVerifier will be set by leader.lead() in case
 // the leader is just being established. waitForEpochAck makes sure that readyToStart 
is true if
 // we got here, so the version was set
 if (getVersion() < 0x10000) {
 QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
 newLeaderZxid, null, null);
 oa.writeRecord(newLeaderQP, "packet");
 } else {
 QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
 newLeaderZxid, leader.self.getLastSeenQuorumVerifier()
 .toString().getBytes(), null);
 queuedPackets.add(newLeaderQP);
 }
 bufferedOutput.flush();
 // Start thread that blast packets in the queue to learner
 startSendingPackets();

 /*
 * Have to wait for the first ACK, wait until
 * the leader is ready, and only then we can
  * start processing messages.
 */
 qp = new QuorumPacket();
 ia.readRecord(qp, "packet");
 if(qp.getType() != Leader.ACK){
 LOG.error("Next packet was supposed to be an ACK,"
 + " but received packet: {}", packetToString(qp));
 return;
 }
 if(LOG.isDebugEnabled()){
 LOG.debug("Received NEWLEADER-ACK message from " + sid); 
 }
 leader.waitForNewLeaderAck(getSid(), qp.getZxid());
 syncLimitCheck.start();

 // now that the ack has been processed expect the syncLimit
 sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);
 /*
 * Wait until leader starts up
 */
 synchronized(leader.zk){
 while(!leader.zk.isRunning() && !this.isInterrupted()){
 leader.zk.wait(20);
 }
 }
 // Mutation packets will be queued during the serialize,
 // so we need to mark when the peer can actually start
 // using the data
 //
 LOG.debug("Sending UPTODATE message to " + sid); 
 queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
 while (true) {
 qp = new QuorumPacket();
 ia.readRecord(qp, "packet");
 long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
 if (qp.getType() == Leader.PING) {
 traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
 }
 if (LOG.isTraceEnabled()) {
 ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
 }
 tickOfNextAckDeadline = leader.self.tick.get() + leader.self.syncLimit;
 ByteBuffer bb;
 long sessionId;
 int cxid;
 int type;
 switch (qp.getType()) {
 case Leader.ACK:
 if (this.learnerType == LearnerType.OBSERVER) {
      if (LOG.isDebugEnabled()) {
 LOG.debug("Received ACK from Observer " + this.sid);
 }
 }
 syncLimitCheck.updateAck(qp.getZxid());
 leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
 break;
 case Leader.PING:
 // Process the touches
 ByteArrayInputStream bis = new ByteArrayInputStream(qp
 .getData());
 DataInputStream dis = new DataInputStream(bis);
 while (dis.available() > 0) {
 long sess = dis.readLong();
 int to = dis.readInt();
 leader.zk.touch(sess, to);
 }
 break;
 case Leader.REVALIDATE:
 bis = new ByteArrayInputStream(qp.getData());
 dis = new DataInputStream(bis);
 long id = dis.readLong();
 int to = dis.readInt();
 ByteArrayOutputStream bos = new ByteArrayOutputStream();
 DataOutputStream dos = new DataOutputStream(bos);
 dos.writeLong(id);
 boolean valid = leader.zk.checkIfValidGlobalSession(id, to);
 if (valid) {
 try {
 //set the session owner
 // as the follower that
 // owns the session
 leader.zk.setOwner(id, this);
 } catch (SessionExpiredException e) {
 LOG.error("Somehow session " + Long.toHexString(id) +
 " expired right after being renewed! (impossible)", e);
 }
 }
 if (LOG.isTraceEnabled()) {
 ZooTrace.logTraceMessage(LOG,

ZooTrace.SESSION_TRACE_MASK,
 "Session 0x" + Long.toHexString(id)
 + " is valid: "+ valid);
 }
 dos.writeBoolean(valid);
 qp.setData(bos.toByteArray());
 queuedPackets.add(qp);
 break;
 case Leader.REQUEST:
 bb = ByteBuffer.wrap(qp.getData());
 sessionId = bb.getLong();
 cxid = bb.getInt();
 type = bb.getInt();
 bb = bb.slice();
 Request si;
 if(type == OpCode.sync){
     si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, 
qp.getAuthinfo());
 } else {
 si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
 }
 si.setOwner(this);
 leader.zk.submitLearnerRequest(si);
 break;
 default:
 LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
 break;
 }
 }
 } catch (IOException e) {
 ... ...
 } finally {
 ... ...
 } }

4、Follower.lead()创建 registerWithLeader
void followLeader() throws InterruptedException {
 self.end_fle = Time.currentElapsedTime();
 long electionTimeTaken = self.end_fle - self.start_fle;
 self.setElectionTimeTaken(electionTimeTaken);
 LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
 QuorumPeer.FLE_TIME_UNIT);
 self.start_fle = 0;
 self.end_fle = 0;
 fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
 try {
 // 查找 leader
 QuorumServer leaderServer = findLeader(); 
 try {
// 连接 leader
 connectToLeader(leaderServer.addr, leaderServer.hostname);
// 向 leader 注册
 long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
 if (self.isReconfigStateChange())
 throw new Exception("learned about role change");
 //check to see if the leader zxid is lower than ours
 //this should never happen but is just a safety check
 long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
 if (newEpoch < self.getAcceptedEpoch()) {
 LOG.error("Proposed leader epoch " + 
ZxidUtils.zxidToString(newEpochZxid)
 + " is less than our accepted epoch " + 
ZxidUtils.zxidToString(self.getAcceptedEpoch()));
 throw new IOException("Error: Epoch of leader is lower");
 }
 syncWithLeader(newEpochZxid); 
 QuorumPacket qp = new QuorumPacket();
// 循环等待接收消息
     while (this.isRunning()) {
// 读取 packet 信息
 readPacket(qp);
// 处理 packet 消息
 processPacket(qp);
 }
 } catch (Exception e) {
 LOG.warn("Exception when following the leader", e);
 try {
 sock.close();
 } catch (IOException e1) {
 e1.printStackTrace();
 }
 // clear pending revalidations
 pendingRevalidations.clear();
 }
 } finally {
 zk.unregisterJMX((Learner)this);
 } }
protected long registerWithLeader(int pktType) throws IOException{
 /*
 * Send follower info, including last zxid and sid
 */
long lastLoggedZxid = self.getLastLoggedZxid();
 QuorumPacket qp = new QuorumPacket(); 
 qp.setType(pktType);
 qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));

 /*
 * Add sid to payload
 */
 LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, 
self.getQuorumVerifier().getVersion());
 ByteArrayOutputStream bsid = new ByteArrayOutputStream();
 BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
 boa.writeRecord(li, "LearnerInfo");
 qp.setData(bsid.toByteArray());

// 发送 FollowerInfo 给 Leader
 writePacket(qp, true);
// 读取 Leader 返回的结果:LeaderInfo
 readPacket(qp); 

 final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
// 如果接收到 LeaderInfo
if (qp.getType() == Leader.LEADERINFO) {
 // we are connected to a 1.0 server so accept the new epoch and read the next packet
 leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
 byte epochBytes[] = new byte[4];
 final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
// 接收 leader 的 epoch
    if (newEpoch > self.getAcceptedEpoch()) {
// 把自己原来的 epoch 保存在 wrappedEpochBytes 里
 wrappedEpochBytes.putInt((int)self.getCurrentEpoch());
// 把 leader 发送过来的 epoch 保存起来
 self.setAcceptedEpoch(newEpoch);
 } else if (newEpoch == self.getAcceptedEpoch()) {
 // since we have already acked an epoch equal to the leaders, we cannot ack
 // again, but we still need to send our lastZxid to the leader so that we can
 // sync with it if it does assume leadership of the epoch.
 // the -1 indicates that this reply should not count as an ack for the new epoch
 wrappedEpochBytes.putInt(-1);
 } else {
 throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted 
epoch, " + self.getAcceptedEpoch());
 }
// 发送 ackepoch 给 leader(包含了自己的:epoch 和 zxid)
 QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, 
lastLoggedZxid, epochBytes, null);
 writePacket(ackNewEpoch, true);
 return ZxidUtils.makeZxid(newEpoch, 0);
 } else {
 if (newEpoch > self.getAcceptedEpoch()) {
 self.setAcceptedEpoch(newEpoch);
 }
 if (qp.getType() != Leader.NEWLEADER) {
 LOG.error("First packet should have been NEWLEADER");
 throw new IOException("First packet should have been NEWLEADER");
 }
 return qp.getZxid();
 } }

5、Leader.lead()接收 Follwer 状态,根据同步方式发送同步消息
public void run() {
 try {
 leader.addLearnerHandler(this);
// 心跳处理
 tickOfNextAckDeadline = leader.self.tick.get()
 + leader.self.initLimit + leader.self.syncLimit;
 ia = BinaryInputArchive.getArchive(bufferedInput);
 bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
 oa = BinaryOutputArchive.getArchive(bufferedOutput);
// 从网络中接收消息,并反序列化为 packet
 QuorumPacket qp = new QuorumPacket();
 ia.readRecord(qp, "packet");
// 选举结束后,observer 和 follower 都应该给 leader 发送一个标志信息:
FOLLOWERINFO 或者 OBSERVERINFO
 if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != 
Leader.OBSERVERINFO){
 LOG.error("First packet " + qp.toString()
 + " is not FOLLOWERINFO or OBSERVERINFO!");
 return;
     }
 byte learnerInfoData[] = qp.getData();
 if (learnerInfoData != null) {
 ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
 if (learnerInfoData.length >= 8) {
 this.sid = bbsid.getLong();
 }
 if (learnerInfoData.length >= 12) {
 this.version = bbsid.getInt(); // protocolVersion
 }
 if (learnerInfoData.length >= 20) {
 long configVersion = bbsid.getLong();
 if (configVersion > leader.self.getQuorumVerifier().getVersion()) {
 throw new IOException("Follower is ahead of the leader (has a later 
activated configuration)");
 }
 }
 } else {
 this.sid = leader.followerCounter.getAndDecrement();
 }
 if (leader.self.getView().containsKey(this.sid)) {
 LOG.info("Follower sid: " + this.sid + " : info : "
 + leader.self.getView().get(this.sid).toString());
 } else {
 LOG.info("Follower sid: " + this.sid + " not in the current config " + 
Long.toHexString(leader.self.getQuorumVerifier().getVersion()));
 }

 if (qp.getType() == Leader.OBSERVERINFO) {
 learnerType = LearnerType.OBSERVER;
 }
// 读取 Follower 发送过来的 lastAcceptedEpoch
// 选举过程中,所使用的 epoch,其实还是上一任 leader 的 epoch
 long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
 long peerLastZxid;
 StateSummary ss = null;
// 读取 follower 发送过来的 zxid
 long zxid = qp.getZxid();
// 获取 leader 的最新 epoch
// 新的 leader 会构建一个新的 epoch
 long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
 long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);
 if (this.getVersion() < 0x10000) {
 // we are going to have to extrapolate the epoch information
 long epoch = ZxidUtils.getEpochFromZxid(zxid);
 ss = new StateSummary(epoch, zxid);
 // fake the message
 leader.waitForEpochAck(this.getSid(), ss);
 } else {
 byte ver[] = new byte[4];
     ByteBuffer.wrap(ver).putInt(0x10000);
// Leader 向 Follower 发送信息(包含:zxid 和 newEpoch)
 QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, 
newLeaderZxid, ver, null);
 oa.writeRecord(newEpochPacket, "packet");
 bufferedOutput.flush();
// 接收到 Follower 应答的 ackepoch
 QuorumPacket ackEpochPacket = new QuorumPacket();
 ia.readRecord(ackEpochPacket, "packet");
 if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
 LOG.error(ackEpochPacket.toString()
 + " is not ACKEPOCH");
 return;
}
 ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
// 保存了对方 follower 或者 observer 的状态:epoch 和 zxid
 ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
 leader.waitForEpochAck(this.getSid(), ss);
 }
 peerLastZxid = ss.getLastZxid();

 // Take any necessary action if we need to send TRUNC or DIFF
 // startForwarding() will be called in all cases
// 方法判断 Leader 和 Follower 是否需要同步
 boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), 
leader);

 /* if we are not truncating or sending a diff just send a snapshot */
 if (needSnap) {
 boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
 LearnerSnapshot snapshot = 

leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
 try {
 long zxidToSend = 
leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
 oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, 
null), "packet");
 bufferedOutput.flush();
 … …
 // Dump data to peer
 leader.zk.getZKDatabase().serializeSnapshot(oa);
 oa.writeString("BenWasHere", "signature");
 bufferedOutput.flush();
 } finally {
 snapshot.close();
 }
 }
 if (getVersion() < 0x10000) {
     QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
 newLeaderZxid, null, null);
 oa.writeRecord(newLeaderQP, "packet");
 } else {
 QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
 newLeaderZxid, leader.self.getLastSeenQuorumVerifier()
 .toString().getBytes(), null);
 queuedPackets.add(newLeaderQP);
 }
 … …
 while (true) {
 … …
 }
 } catch (IOException e) {
 ... ...
 } finally {
 ... ...
 } }
public boolean syncFollower(long peerLastZxid, ZKDatabase db, Leader leader) {
 /*
 * When leader election is completed, the leader will set its
 * lastProcessedZxid to be (epoch < 32). There will be no txn associated
 * with this zxid.
 *
 * The learner will set its lastProcessedZxid to the same value if
 * it get DIFF or SNAP from the leader. If the same learner come
 * back to sync with leader using this zxid, we will never find this
 * zxid in our history. In this case, we will ignore TRUNC logic and
 * always send DIFF if we have old enough history
 */
 boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
 // Keep track of the latest zxid which already queued
 long currentZxid = peerLastZxid;
 boolean needSnap = true;
 boolean txnLogSyncEnabled = db.isTxnLogSyncEnabled();
 ReentrantReadWriteLock lock = db.getLogLock();
 ReadLock rl = lock.readLock();
 try {
 rl.lock();
 long maxCommittedLog = db.getmaxCommittedLog();
 long minCommittedLog = db.getminCommittedLog();
 long lastProcessedZxid = db.getDataTreeLastProcessedZxid();
 LOG.info("Synchronizing with Follower sid: {} maxCommittedLog=0x{}"
 + " minCommittedLog=0x{} lastProcessedZxid=0x{}"
 + " peerLastZxid=0x{}", getSid(),
 Long.toHexString(maxCommittedLog),
 Long.toHexString(minCommittedLog),
 Long.toHexString(lastProcessedZxid),
 Long.toHexString(peerLastZxid));
 if (db.getCommittedLog().isEmpty()) {
 /*
 * It is possible that committedLog is empty. In that case
 * setting these value to the latest txn in leader db
 * will reduce the case that we need to handle
 *
 * Here is how each case handle by the if block below
 * 1. lastProcessZxid == peerZxid -> Handle by (2)
 * 2. lastProcessZxid < peerZxid -> Handle by (3)
 * 3. lastProcessZxid > peerZxid -> Handle by (5)
 */
 minCommittedLog = lastProcessedZxid;
 maxCommittedLog = lastProcessedZxid;
 }
 /*
 * Here are the cases that we want to handle
 *
 * 1. Force sending snapshot (for testing purpose)
 * 2. Peer and leader is already sync, send empty diff
 * 3. Follower has txn that we haven't seen. This may be old leader
 * so we need to send TRUNC. However, if peer has newEpochZxid,
 * we cannot send TRUNC since the follower has no txnlog
 * 4. Follower is within committedLog range or already in-sync.
 * We may need to send DIFF or TRUNC depending on follower's zxid
 * We always send empty DIFF if follower is already in-sync
 * 5. Follower missed the committedLog. We will try to use on-disk
 * txnlog + committedLog to sync with follower. If that fail,
 * we will send snapshot
 */
 if (forceSnapSync) {
 // Force leader to use snapshot to sync with follower
 LOG.warn("Forcing snapshot sync - should not see this in production");
 } else if (lastProcessedZxid == peerLastZxid) {
 // Follower is already sync with us, send empty diff
 LOG.info("Sending DIFF zxid=0x" + Long.toHexString(peerLastZxid) +
 " for peer sid: " + getSid());
 queueOpPacket(Leader.DIFF, peerLastZxid);
 needOpPacket = false;
 needSnap = false;
 } else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
 // Newer than committedLog, send trunc and done
 LOG.debug("Sending TRUNC to follower zxidToSend=0x" +
 Long.toHexString(maxCommittedLog) +
 " for peer sid:" + getSid());
 queueOpPacket(Leader.TRUNC, maxCommittedLog);
 currentZxid = maxCommittedLog;
 needOpPacket = false;
 needSnap = false;
 } else if ((maxCommittedLog >= peerLastZxid)
 && (minCommittedLog <= peerLastZxid)) {
 // Follower is within commitLog range
 LOG.info("Using committedLog for peer sid: " + getSid());
 Iterator<Proposal> itr = db.getCommittedLog().iterator();
 currentZxid = queueCommittedProposals(itr, peerLastZxid,
 null, maxCommittedLog);
 needSnap = false;
 } else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
     // Use txnlog and committedLog to sync
 // Calculate sizeLimit that we allow to retrieve txnlog from disk
 long sizeLimit = db.calculateTxnLogSizeLimit();
 // This method can return empty iterator if the requested zxid
 // is older than on-disk txnlog
 Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(
 peerLastZxid, sizeLimit);
 if (txnLogItr.hasNext()) {
 LOG.info("Use txnlog and committedLog for peer sid: " + getSid());
 currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid,
 minCommittedLog, 
maxCommittedLog);
 LOG.debug("Queueing committedLog 0x" + 
Long.toHexString(currentZxid));
 Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator();
 currentZxid = queueCommittedProposals(committedLogItr, currentZxid,
 null, maxCommittedLog);
 needSnap = false;
 }
 // closing the resources
 if (txnLogItr instanceof TxnLogProposalIterator) {
 TxnLogProposalIterator txnProposalItr = (TxnLogProposalIterator) 
txnLogItr;
 txnProposalItr.close();
 }
 } else {
 LOG.warn("Unhandled scenario for peer sid: " + getSid());
 }
 LOG.debug("Start forwarding 0x" + Long.toHexString(currentZxid) +
 " for peer sid: " + getSid());
 leaderLastZxid = leader.startForwarding(this, currentZxid);
 } finally {
 rl.unlock();
 }
 if (needOpPacket && !needSnap) {
 // This should never happen, but we should fall back to sending
 // snapshot just in case.
 LOG.error("Unhandled scenario for peer sid: " + getSid() +
 " fall back to use snapshot");
 needSnap = true;
 }
 return needSnap;
}

6、Follower.lead()应答 Leader 同步结果
protected void processPacket(QuorumPacket qp) throws Exception{
 switch (qp.getType()) {
 case Leader.PING: 
 ping(qp); 
 break;
 case Leader.PROPOSAL: 
 TxnHeader hdr = new TxnHeader();
          Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
 if (hdr.getZxid() != lastQueued + 1) {
 LOG.warn("Got zxid 0x"
 + Long.toHexString(hdr.getZxid())
 + " expected 0x"
 + Long.toHexString(lastQueued + 1));
 }
 lastQueued = hdr.getZxid();

 if (hdr.getType() == OpCode.reconfig){
 SetDataTxn setDataTxn = (SetDataTxn) txn; 
 QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
 self.setLastSeenQuorumVerifier(qv, true); 
 }

 fzk.logRequest(hdr, txn);
 break;
 case Leader.COMMIT:
 fzk.commit(qp.getZxid());
 break;

 … …
 case Leader.UPTODATE:
 LOG.error("Received an UPTODATE message after Follower started");
 break;
 case Leader.REVALIDATE:
 revalidate(qp);
 break;
 case Leader.SYNC:
 fzk.sync();
 break;
 default:
 LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
 break;
 } }
public void commit(long zxid) {
 if (pendingTxns.size() == 0) {
 LOG.warn("Committing " + Long.toHexString(zxid)
 + " without seeing txn");
 return;
 }
 long firstElementZxid = pendingTxns.element().zxid;
 if (firstElementZxid != zxid) {
 LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
 + " but next pending txn 0x"
 + Long.toHexString(firstElementZxid));
 System.exit(12);
 }
 Request request = pendingTxns.remove();
 commitProcessor.commit(request);
}

7、Leader.lead()应答 Follower

由于 public class LearnerHandler extends ZooKeeperThread{},说明 LearnerHandler 是一个线程。所以 fh.start()执行的是 LearnerHandler 中的 run()方法。

public void run() {
 … …
 //
 LOG.debug("Sending UPTODATE message to " + sid); 
 queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
 while (true) {
 … …
 }
 } catch (IOException e) {
 ... ...
 } finally {
 ... ...
 } }

服务端 Leader 启动

image.png
ZooKeeperServer
Ctrl + n 全局查找 Leader,然后 ctrl + f 查找 lead
Leader.java

void lead() throws IOException, InterruptedException {
... ...
// 启动 zookeeper 服务
 startZkServer();
... ...
}
final LeaderZooKeeperServer zk;
private synchronized void startZkServer() {
 ... ...
     // 启动 Zookeeper
 zk.startup();
 ... ...
}

LeaderZooKeeperServer.java

@Override
public synchronized void startup() {
 super.startup();
 if (containerManager != null) {
 containerManager.start();
 } }

ZookeeperServer.java

public synchronized void startup() {
 if (sessionTracker == null) {
 createSessionTracker();
 }
 startSessionTracker();
// 
 setupRequestProcessors();
 registerJMX();
 setState(State.RUNNING);
 notifyAll();
}
protected void setupRequestProcessors() {
 RequestProcessor finalProcessor = new FinalRequestProcessor(this);
 RequestProcessor syncProcessor = new SyncRequestProcessor(this,
 finalProcessor);
 ((SyncRequestProcessor)syncProcessor).start();
 firstProcessor = new PrepRequestProcessor(this, syncProcessor);
 ((PrepRequestProcessor)firstProcessor).start();
}

点击 PrepRequestProcessor,并查找它的 run 方法

public void run() {
 try {
 while (true) {
 Request request = submittedRequests.take();
 long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
 if (request.type == OpCode.ping) {
 traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
 }
 if (LOG.isTraceEnabled()) {
 ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
 }
 if (Request.requestOfDeath == request) {
 break;
 }
 pRequest(request);
 }
 } catch (RequestProcessorException e) {
 if (e.getCause() instanceof XidRolloverException) {
      LOG.info(e.getCause().getMessage());
 }
 handleException(this.getName(), e);
 } catch (Exception e) {
 handleException(this.getName(), e);
 }
 LOG.info("PrepRequestProcessor exited loop!");
}
protected void pRequest(Request request) throws RequestProcessorException {
 // LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
 // request.type + " id = 0x" + Long.toHexString(request.sessionId));
 request.setHdr(null);
 request.setTxn(null);
 try {
 switch (request.type) {
 case OpCode.createContainer:
 case OpCode.create:
 case OpCode.create2:
 CreateRequest create2Request = new CreateRequest();
 pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
 break;
 case OpCode.createTTL:
 CreateTTLRequest createTtlRequest = new CreateTTLRequest();
 pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest, 
true);
 break;
 case OpCode.deleteContainer:
 case OpCode.delete:
 DeleteRequest deleteRequest = new DeleteRequest();
 pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
 break;
 case OpCode.setData:
 SetDataRequest setDataRequest = new SetDataRequest(); 
 pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
 break;
 case OpCode.reconfig:
 ReconfigRequest reconfigRequest = new ReconfigRequest();
 ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);
 pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
 break;
 case OpCode.setACL:
 SetACLRequest setAclRequest = new SetACLRequest(); 
 pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
 break;
 case OpCode.check:
 CheckVersionRequest checkRequest = new CheckVersionRequest(); 
 pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
 break;
 case OpCode.multi:
 MultiTransactionRecord multiRequest = new MultiTransactionRecord();
 try {
 ByteBufferInputStream.byteBuffer2Record(request.request, 
multiRequest);
 } catch(IOException e) {
 request.setHdr(new TxnHeader(request.sessionId, request.cxid,
zks.getNextZxid(),
 Time.currentWallTime(), OpCode.multi));
 throw e;
 }
 List<Txn> txns = new ArrayList<Txn>();
 //Each op in a multi-op must have the same zxid!
 long zxid = zks.getNextZxid();
 KeeperException ke = null;
 //Store off current pending change records in case we need to rollback
 Map<String, ChangeRecord> pendingChanges = 
getPendingChanges(multiRequest);
 for(Op op: multiRequest) {
 Record subrequest = op.toRequestRecord();
 int type;
 Record txn;
 /* If we've already failed one of the ops, don't bother
 * trying the rest as we know it's going to fail and it
 * would be confusing in the logfiles.
 */
 if (ke != null) {
 type = OpCode.error;
 txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
 }
 /* Prep the request and convert to a Txn */
 else {
 try {
 pRequest2Txn(op.getType(), zxid, request, subrequest, false);
 type = request.getHdr().getType();
 txn = request.getTxn();
 } catch (KeeperException e) {
 ke = e;
 type = OpCode.error;
 txn = new ErrorTxn(e.code().intValue());
 if (e.code().intValue() > Code.APIERROR.intValue()) {
 LOG.info("Got user-level KeeperException when 
processing {} aborting" +
 " remaining multi ops. Error Path:{} Error:{}",
 request.toString(), e.getPath(), e.getMessage());
 }
 request.setException(e);
 /* Rollback change records from failed multi-op */
 rollbackPendingChanges(zxid, pendingChanges);
 }
 }
 //FIXME: I don't want to have to serialize it here and then
 // immediately deserialize in next processor. But I'm
 // not sure how else to get the txn stored into our list.
 ByteArrayOutputStream baos = new ByteArrayOutputStream();
 BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
     txn.serialize(boa, "request") ;
 ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
 txns.add(new Txn(type, bb.array()));
 }
 request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
 Time.currentWallTime(), request.type));
 request.setTxn(new MultiTxn(txns));
 break;
 //create/close session don't require request record
 case OpCode.createSession:
 case OpCode.closeSession:
 if (!request.isLocalSession()) {
 pRequest2Txn(request.type, zks.getNextZxid(), request,
 null, true);
 }
 break;
 //All the rest don't need to create a Txn - just verify session
 case OpCode.sync:
 case OpCode.exists:
 case OpCode.getData:
 case OpCode.getACL:
 case OpCode.getChildren:
 case OpCode.getChildren2:
 case OpCode.ping:
 case OpCode.setWatches:
 case OpCode.checkWatches:
 case OpCode.removeWatches:
 zks.sessionTracker.checkSession(request.sessionId,
 request.getOwner());
 break;
 default:
 LOG.warn("unknown type " + request.type);
 break;
 }
 } catch (KeeperException e) {
 ... ...
 } catch (Exception e) {
 ... ...
 }
 request.zxid = zks.getZxid();
 nextProcessor.processRequest(request);
}

服务端 Follower 启动

image.png
FollowerZooKeeperServer
Ctrl + n 全局查找 Follower,然后 ctrl + f 查找 followLeader(

void followLeader() throws InterruptedException {
 self.end_fle = Time.currentElapsedTime();
 long electionTimeTaken = self.end_fle - self.start_fle;
 self.setElectionTimeTaken(electionTimeTaken);
 LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
 QuorumPeer.FLE_TIME_UNIT);
 self.start_fle = 0;
 self.end_fle = 0;
 fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
 try {
 QuorumServer leaderServer = findLeader(); 
 try {
 connectToLeader(leaderServer.addr, leaderServer.hostname);
 long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
 if (self.isReconfigStateChange())
 throw new Exception("learned about role change");
 //check to see if the leader zxid is lower than ours
 //this should never happen but is just a safety check
 long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
 if (newEpoch < self.getAcceptedEpoch()) {
 LOG.error("Proposed leader epoch " + 
ZxidUtils.zxidToString(newEpochZxid)
 + " is less than our accepted epoch " + 
ZxidUtils.zxidToString(self.getAcceptedEpoch()));
 throw new IOException("Error: Epoch of leader is lower");
 }
 syncWithLeader(newEpochZxid); 
 QuorumPacket qp = new QuorumPacket();
 while (this.isRunning()) {
 readPacket(qp);
 processPacket(qp);
     }
 } catch (Exception e) {
 ... ...
 }
 } finally {
 zk.unregisterJMX((Learner)this);
 } }
void readPacket(QuorumPacket pp) throws IOException {
 synchronized (leaderIs) {
 leaderIs.readRecord(pp, "packet");
 }
 if (LOG.isTraceEnabled()) {
 final long traceMask =
 (pp.getType() == Leader.PING) ? ZooTrace.SERVER_PING_TRACE_MASK
 : ZooTrace.SERVER_PACKET_TRACE_MASK;
 ZooTrace.logQuorumPacket(LOG, traceMask, 'i', pp);
 } }
protected void processPacket(QuorumPacket qp) throws Exception{
 switch (qp.getType()) {
 case Leader.PING: 
 ping(qp); 
 break;
 case Leader.PROPOSAL: 
 TxnHeader hdr = new TxnHeader();
 Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
 if (hdr.getZxid() != lastQueued + 1) {
 LOG.warn("Got zxid 0x"
 + Long.toHexString(hdr.getZxid())
 + " expected 0x"
 + Long.toHexString(lastQueued + 1));
 }
 lastQueued = hdr.getZxid();

 if (hdr.getType() == OpCode.reconfig){
 SetDataTxn setDataTxn = (SetDataTxn) txn; 
 QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
 self.setLastSeenQuorumVerifier(qv, true); 
 }

 fzk.logRequest(hdr, txn);
 break;
 case Leader.COMMIT:
 fzk.commit(qp.getZxid());
 break;

 case Leader.COMMITANDACTIVATE:
 // get the new configuration from the request
 Request request = fzk.pendingTxns.element();
 SetDataTxn setDataTxn = (SetDataTxn) request.getTxn(); 
 QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData())); 
 // get new designated leader from (current) leader's message
         ByteBuffer buffer = ByteBuffer.wrap(qp.getData()); 
 long suggestedLeaderId = buffer.getLong();
 boolean majorChange = 
 self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
 // commit (writes the new config to ZK tree (/zookeeper/config) 
 fzk.commit(qp.getZxid());
 if (majorChange) {
 throw new Exception("changes proposed in reconfig");
 }
 break;
 case Leader.UPTODATE:
 LOG.error("Received an UPTODATE message after Follower started");
 break;
 case Leader.REVALIDATE:
 revalidate(qp);
 break;
 case Leader.SYNC:
 fzk.sync();
 break;
 default:
 LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
 break;
 } }

客户端初始化源码解析

image.png

客户端启动

ZkCli.sh

#!/usr/bin/env bash
ZOOBIN="${BASH_SOURCE-$0}"
ZOOBIN="$(dirname "${ZOOBIN}")"
ZOOBINDIR="$(cd "${ZOOBIN}"; pwd)"
if [ -e "$ZOOBIN/../libexec/zkEnv.sh" ]; then
 . "$ZOOBINDIR"/../libexec/zkEnv.sh
else
     . "$ZOOBINDIR"/zkEnv.sh
fi
ZOO_LOG_FILE=zookeeper-$USER-cli-$HOSTNAME.log
"$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-
Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" "-
Dzookeeper.log.file=${ZOO_LOG_FILE}" \
 -cp "$CLASSPATH" $CLIENT_JVMFLAGS $JVMFLAGS \
 org.apache.zookeeper.ZooKeeperMain "$@"

在 ZkCli.sh 启动 Zookeeper 时,会调用 ZooKeeperMain.java
Ctrl + n 查找 ZooKeeperMain,找到程序的入口 main()方法

public static void main(String args[]) throws CliException, IOException, InterruptedException
{
 ZooKeeperMain main = new ZooKeeperMain(args);
 main.run();
}

创建 ZookeeperMain