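Chapter 1 Algorithm Fundamentals

Question: how does ZooKeeper guarantee data consistency? This is a hard problem for distributed-system frameworks in general.

1.1 The Byzantine Generals Problem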


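1.2 The Paxos Algorithm

The following walks through three example scenarios for the flow described above. To keep things simple, no Learner is configured here.
A weakness of Paxos: under complex network conditions, a distributed system built on Paxos may take a long time to converge and can even fall into livelock.
The cause is that the system has more than one Proposer. Multiple Proposers compete for the Acceptors, each higher-numbered proposal preempts the one before it, and agreement keeps getting postponed. An improved variant of Paxos addresses this by electing one node as the Leader and allowing only the Leader to issue proposals. With a single Proposer per Paxos round, livelock cannot occur, and only the first scenario in the examples arises.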
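
The livelock described above can be reproduced with a very small model of a single Acceptor. This is only a sketch under the standard single-decree Paxos rules (promise on a higher prepare number, reject an accept whose number is below the promise); the class and method names `Acceptor`, `prepare`, `accept`, and `DuelingProposers` are hypothetical and do not come from ZooKeeper. It omits Learners, quorums, and the returning of previously accepted values.

```java
// Minimal single-decree Paxos Acceptor (illustrative sketch, hypothetical names).
class Acceptor {
    private long promised = -1;     // highest proposal number promised so far
    private String acceptedValue;   // value of the accepted proposal, null if none

    // Phase 1: promise to ignore proposals numbered below n.
    boolean prepare(long n) {
        if (n <= promised) {
            return false;
        }
        promised = n;
        return true;
    }

    // Phase 2: accept (n, value) unless a higher-numbered prepare was promised.
    boolean accept(long n, String value) {
        if (n < promised) {
            return false;
        }
        promised = n;
        acceptedValue = value;
        return true;
    }

    String acceptedValue() {
        return acceptedValue;
    }
}

class DuelingProposers {
    // Two proposers alternate prepares; returns how many accepts succeeded.
    static int duel(Acceptor acceptor, int rounds) {
        int succeeded = 0;
        long n = 0;
        for (int i = 0; i < rounds; i++) {
            long p1 = ++n;
            acceptor.prepare(p1);              // proposer 1 obtains a promise for p1
            long p2 = ++n;
            acceptor.prepare(p2);              // proposer 2 preempts with a higher number
            if (acceptor.accept(p1, "v1")) {   // proposer 1's accept is now too old
                succeeded++;
            }
            // On the next iteration proposer 1 retries with a higher number,
            // which preempts proposer 2 before it can send its own accept.
        }
        return succeeded;
    }

    public static void main(String[] args) {
        System.out.println("accepts during duel: " + duel(new Acceptor(), 5));
        // With a single proposer (the Leader), both phases go straight through.
        Acceptor single = new Acceptor();
        System.out.println("single proposer ok: " + (single.prepare(1) && single.accept(1, "v1")));
    }
}
```

With one Proposer there is nobody to raise the promised number between the two phases, which is why the Leader-based variant avoids the problem.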

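1.3 The ZAB Protocol

1.3.1 What Is ZAB

Zab borrows from Paxos. It is an atomic broadcast protocol with crash-recovery support, designed specifically for ZooKeeper. Based on this protocol, ZooKeeper lets only one server (the Leader) handle external write (transaction) requests; the Leader then replicates the data to the Follower nodes. In other words, only the single Leader in a ZooKeeper ensemble can issue proposals.

1.3.2 What the Zab Protocol Covers

Zab has two basic modes: message broadcast and crash recovery.
1) Message broadcast
2) Crash recovery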

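1.4 The CAP Theorem

The CAP theorem states that a distributed system cannot satisfy all three of the following basic requirements at the same time:

  • Consistency (C)
  • Availability (A)
  • Partition tolerance (P)

At most two of the three can be satisfied at once. Because P is mandatory for a distributed system, the practical choice is usually between CP and AP.
1) Consistency (C)
In a distributed environment, consistency is the property that data stays identical across multiple replicas. Under this requirement, after an update is applied to a system whose data is in a consistent state, the data must still be in a consistent state.
2) Availability (A)
Availability means the service the system provides must always be usable: every request from a user returns a result within a bounded time.
3) Partition tolerance (P)
When a network partition occurs, the distributed system must still be able to provide service to the outside, unless the entire network has failed.
ZooKeeper guarantees CP:
(1) ZooKeeper cannot guarantee availability for every request. (Note: in extreme conditions ZooKeeper may drop some requests, and the client program has to resend the request to get a result.) In that sense ZooKeeper does not guarantee service availability.
(2) The cluster is unavailable while a Leader election is in progress.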

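Chapter 2 Source Code Walkthrough

2.1 Supporting Source Code

2.1.1 Persistence Source Code

Leader and Follower nodes each keep their data both in memory and on disk, so the in-memory data has to be persisted to disk.
The classes under the org.apache.zookeeper.server.persistence package implement this persistence.
1) Snapshots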

    public interface SnapShot {
        // Deserialization: load a snapshot into the DataTree
        long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException;
        // Serialization: write the DataTree to the given file
        void serialize(DataTree dt, Map<Long, Integer> sessions, File name) throws IOException;
        /**
         * find the most recent snapshot file
         */
        File findMostRecentSnapshot() throws IOException;
        // Release resources
        void close() throws IOException;
    }

2) Transaction log

    public interface TxnLog {
        // Set the ServerStats object
        void setServerStats(ServerStats serverStats);
        // Roll the log over to a new file
        void rollLog() throws IOException;
        // Append a transaction record
        boolean append(TxnHeader hdr, Record r) throws IOException;
        // Read transactions starting from the given zxid
        TxnIterator read(long zxid) throws IOException;
        // Get the last logged zxid
        long getLastLoggedZxid() throws IOException;
        // Truncate the log at the given zxid
        boolean truncate(long zxid) throws IOException;
        // Get the DbId
        long getDbId() throws IOException;
        // Commit the log
        void commit() throws IOException;
        // Time spent syncing the log
        long getTxnLogSyncElapsedTime();
        // Close the log
        void close() throws IOException;

        // Iterator interface for reading the log
        public interface TxnIterator {
            // Get the transaction header
            TxnHeader getHeader();
            // Get the transaction body
            Record getTxn();
            // Advance to the next record
            boolean next() throws IOException;
            // Release resources
            void close() throws IOException;
            // Get the storage size
            long getStorageSize() throws IOException;
        }
    }

3) Core classes that handle persistence

2.1.2 Serialization Source Code

The zookeeper-jute module contains ZooKeeper's serialization code.
1) Serialization and deserialization methods

    public interface Record {
        // Serialization method
        public void serialize(OutputArchive archive, String tag) throws IOException;
        // Deserialization method
        public void deserialize(InputArchive archive, String tag) throws IOException;
    }

2) Iteration

    public interface Index {
        // Whether iteration has finished
        public boolean done();
        // Advance to the next element
        public void incr();
    }

3) Data types supported by serialization

    /**
     * Interface that all the serializers have to implement.
     */
    public interface OutputArchive {
        public void writeByte(byte b, String tag) throws IOException;
        public void writeBool(boolean b, String tag) throws IOException;
        public void writeInt(int i, String tag) throws IOException;
        public void writeLong(long l, String tag) throws IOException;
        public void writeFloat(float f, String tag) throws IOException;
        public void writeDouble(double d, String tag) throws IOException;
        public void writeString(String s, String tag) throws IOException;
        public void writeBuffer(byte buf[], String tag) throws IOException;
        public void writeRecord(Record r, String tag) throws IOException;
        public void startRecord(Record r, String tag) throws IOException;
        public void endRecord(Record r, String tag) throws IOException;
        public void startVector(List<?> v, String tag) throws IOException;
        public void endVector(List<?> v, String tag) throws IOException;
        public void startMap(TreeMap<?, ?> v, String tag) throws IOException;
        public void endMap(TreeMap<?, ?> v, String tag) throws IOException;
    }

4) Data types supported by deserialization

    /**
     * Interface that all the Deserializers have to implement.
     */
    public interface InputArchive {
        public byte readByte(String tag) throws IOException;
        public boolean readBool(String tag) throws IOException;
        public int readInt(String tag) throws IOException;
        public long readLong(String tag) throws IOException;
        public float readFloat(String tag) throws IOException;
        public double readDouble(String tag) throws IOException;
        public String readString(String tag) throws IOException;
        public byte[] readBuffer(String tag) throws IOException;
        public void readRecord(Record r, String tag) throws IOException;
        public void startRecord(String tag) throws IOException;
        public void endRecord(String tag) throws IOException;
        public Index startVector(String tag) throws IOException;
        public void endVector(String tag) throws IOException;
        public Index startMap(String tag) throws IOException;
        public void endMap(String tag) throws IOException;
    }
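
The Record contract boils down to writing fields in a fixed order and reading them back in the same order. The sketch below illustrates that idea using only JDK streams (`DataOutputStream` / `DataInputStream`) in place of the jute archives, so it runs without the ZooKeeper jars. The `NodeStat` class and its fields are hypothetical and exist only for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical record type. It mimics the serialize/deserialize pairing of a
// jute Record, but uses JDK data streams instead of OutputArchive/InputArchive.
class NodeStat {
    long czxid;
    int version;
    String path;

    // Write the fields in a fixed order.
    void serialize(DataOutputStream out) throws IOException {
        out.writeLong(czxid);
        out.writeInt(version);
        out.writeUTF(path);
    }

    // Read the fields back in exactly the same order.
    void deserialize(DataInputStream in) throws IOException {
        czxid = in.readLong();
        version = in.readInt();
        path = in.readUTF();
    }

    // Serialize to a byte array and deserialize into a fresh object.
    static NodeStat roundTrip(NodeStat original) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buffer)) {
                original.serialize(out);
            }
            NodeStat copy = new NodeStat();
            try (DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(buffer.toByteArray()))) {
                copy.deserialize(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If the read order ever differs from the write order, the fields come back scrambled, which is why each jute record keeps both methods side by side.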

2.2 ZK Server Initialization Source Code

2.2.1 The ZK Server Startup Script

1) The ZooKeeper service is started with zkServer.sh start

    #!/usr/bin/env bash
    # use POSIX interface, symlink is followed automatically
    ZOOBIN="${BASH_SOURCE-$0}"
    ZOOBIN="$(dirname "${ZOOBIN}")"
    ZOOBINDIR="$(cd "${ZOOBIN}"; pwd)"
    if [ -e "$ZOOBIN/../libexec/zkEnv.sh" ]; then
      . "$ZOOBINDIR"/../libexec/zkEnv.sh
    else
      # Source zkEnv.sh to pick up its environment variables (ZOOCFG="zoo.cfg")
      . "$ZOOBINDIR"/zkEnv.sh
    fi
    # See the following page for extensive details on setting
    # up the JVM to accept JMX remote management:
    # http://java.sun.com/javase/6/docs/technotes/guides/management/agent.html
    # by default we allow local JMX connections
    if [ "x$JMXLOCALONLY" = "x" ]
    then
      JMXLOCALONLY=false
    fi
    if [ "x$JMXDISABLE" = "x" ] || [ "$JMXDISABLE" = 'false' ]
    then
      echo "ZooKeeper JMX enabled by default" >&2
      if [ "x$JMXPORT" = "x" ]
      then
        # for some reason these two options are necessary on jdk6 on Ubuntu
        # accord to the docs they are not necessary, but otw jconsole cannot
        # do a local attach
        ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain"
      else
        if [ "x$JMXAUTH" = "x" ]
        then
          JMXAUTH=false
        fi
        if [ "x$JMXSSL" = "x" ]
        then
          JMXSSL=false
        fi
        if [ "x$JMXLOG4J" = "x" ]
        then
          JMXLOG4J=true
        fi
        echo "ZooKeeper remote JMX Port set to $JMXPORT" >&2
        echo "ZooKeeper remote JMX authenticate set to $JMXAUTH" >&2
        echo "ZooKeeper remote JMX ssl set to $JMXSSL" >&2
        echo "ZooKeeper remote JMX log4j set to $JMXLOG4J" >&2
        ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=$JMXPORT -Dcom.sun.management.jmxremote.authenticate=$JMXAUTH -Dcom.sun.management.jmxremote.ssl=$JMXSSL -Dzookeeper.jmx.log4j.disable=$JMXLOG4J org.apache.zookeeper.server.quorum.QuorumPeerMain"
      fi
    else
      echo "JMX disabled by user request" >&2
      ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain"
    fi
    if [ "x$SERVER_JVMFLAGS" != "x" ]
    then
      JVMFLAGS="$SERVER_JVMFLAGS $JVMFLAGS"
    fi
    case $1 in
    start)
      echo -n "Starting zookeeper ... "
      if [ -f "$ZOOPIDFILE" ]; then
        if kill -0 `cat "$ZOOPIDFILE"` > /dev/null 2>&1; then
          echo $command already running as process `cat "$ZOOPIDFILE"`.
          exit 1
        fi
      fi
      nohup "$JAVA" $ZOO_DATADIR_AUTOCREATE "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" \
        "-Dzookeeper.log.file=${ZOO_LOG_FILE}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
        -XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -9 %p' \
        -cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev/null &
      ;;
    stop)
      echo -n "Stopping zookeeper ... "
      if [ ! -f "$ZOOPIDFILE" ]
      then
        echo "no zookeeper to stop (could not find file $ZOOPIDFILE)"
      else
        $KILL $(cat "$ZOOPIDFILE")
        rm "$ZOOPIDFILE"
        sleep 1
        echo STOPPED
      fi
      exit 0
      ;;
    restart)
      shift
      "$0" stop ${@}
      sleep 3
      "$0" start ${@}
      ;;
    status)
      ;;
    *)
      echo "Usage: $0 [--config <conf-dir>] {start|start-foreground|stop|restart|status|print-cmd}" >&2
    esac

2) What zkServer.sh start actually executes

    nohup "$JAVA"
      + a set of JVM and logging options
      + $ZOOMAIN    (org.apache.zookeeper.server.quorum.QuorumPeerMain)
      + "$ZOOCFG"   (zkEnv.sh sets ZOOCFG="zoo.cfg")

3) The program entry point is therefore the QuorumPeerMain class (QuorumPeerMain.java).

2.2.2 ZK Server Startup Entry Point

1) Press Ctrl + N and search for QuorumPeerMain

    public static void main(String[] args) {
        // Create a QuorumPeerMain instance (one zk node)
        QuorumPeerMain main = new QuorumPeerMain();
        try {
            // Initialize the node and run it; args carries zoo.cfg from the launch command
            main.initializeAndRun(args);
        } catch (IllegalArgumentException e) {
            ...
        }
        LOG.info("Exiting normally");
        System.exit(0);
    }

2) initializeAndRun

    protected void initializeAndRun(String[] args)
            throws ConfigException, IOException, AdminServerException {
        // Holds the zk configuration
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            // 1. Parse the arguments: zoo.cfg and myid
            config.parse(args[0]);
        }
        // 2. Start the scheduled task that deletes expired snapshots (disabled by default)
        // Start and schedule the purge task
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
                config.getDataDir(), config.getDataLogDir(),
                config.getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();
        if (args.length == 1 && config.isDistributed()) {
            // 3. Start in cluster mode
            runFromConfig(config);
        } else {
            LOG.warn("Either no config or no quorum defined in config, running "
                    + " in standalone mode");
            // there is only server in the quorum -- run as standalone
            ZooKeeperServerMain.main(args);
        }
    }

2.2.3 Parsing zoo.cfg and myid

    // QuorumPeerConfig.java
    public void parse(String path) throws ConfigException {
        LOG.info("Reading configuration from: " + path);
        try {
            // Validate the file path and check that the file exists
            File configFile = (new VerifyingFileFactory.Builder(LOG)
                    .warnForRelativePath()
                    .failForNonExistingPath()
                    .build()).create(path);
            Properties cfg = new Properties();
            FileInputStream in = new FileInputStream(configFile);
            try {
                // Load the configuration file
                cfg.load(in);
                configFileStr = path;
            } finally {
                in.close();
            }
            // Parse the configuration file
            parseProperties(cfg);
        } catch (IOException e) {
            throw new ConfigException("Error processing " + path, e);
        } catch (IllegalArgumentException e) {
            throw new ConfigException("Error processing " + path, e);
        }
        ... ...
    }

    public void parseProperties(Properties zkProp)
            throws IOException, ConfigException {
        int clientPort = 0;
        int secureClientPort = 0;
        String clientPortAddress = null;
        String secureClientPortAddress = null;
        VerifyingFileFactory vff = new
                VerifyingFileFactory.Builder(LOG).warnForRelativePath().build();
        // Read each property from zoo.cfg and assign it to the QuorumPeerConfig object
        for (Entry<Object, Object> entry : zkProp.entrySet()) {
            String key = entry.getKey().toString().trim();
            String value = entry.getValue().toString().trim();
            if (key.equals("dataDir")) {
                dataDir = vff.create(value);
            } else if (key.equals("dataLogDir")) {
                dataLogDir = vff.create(value);
            } else if (key.equals("clientPort")) {
                clientPort = Integer.parseInt(value);
            } else if (key.equals("localSessionsEnabled")) {
                localSessionsEnabled = Boolean.parseBoolean(value);
            } else if (key.equals("localSessionsUpgradingEnabled")) {
                localSessionsUpgradingEnabled = Boolean.parseBoolean(value);
            } else if (key.equals("clientPortAddress")) {
                clientPortAddress = value.trim();
            } else if (key.equals("secureClientPort")) {
                secureClientPort = Integer.parseInt(value);
            } else if (key.equals("secureClientPortAddress")) {
                secureClientPortAddress = value.trim();
            } else if (key.equals("tickTime")) {
                tickTime = Integer.parseInt(value);
            } else if (key.equals("maxClientCnxns")) {
                maxClientCnxns = Integer.parseInt(value);
            } else if (key.equals("minSessionTimeout")) {
                minSessionTimeout = Integer.parseInt(value);
            }
            ... ...
        }
        ... ...
        if (dynamicConfigFileStr == null) {
            setupQuorumPeerConfig(zkProp, true);
            if (isDistributed() && isReconfigEnabled()) {
                // we don't backup static config for standalone mode.
                // we also don't backup if reconfig feature is disabled.
                backupOldConfig();
            }
        }
    }

    void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode)
            throws IOException, ConfigException {
        quorumVerifier =
                parseDynamicConfig(prop, electionAlg, true,
                        configBackwardCompatibilityMode);
        setupMyId();
        setupClientPort();
        setupPeerType();
        checkValidity();
    }

    private void setupMyId() throws IOException {
        File myIdFile = new File(dataDir, "myid");
        // standalone server doesn't need myid file.
        if (!myIdFile.isFile()) {
            return;
        }
        BufferedReader br = new BufferedReader(new FileReader(myIdFile));
        String myIdString;
        try {
            myIdString = br.readLine();
        } finally {
            br.close();
        }
        try {
            // Parse the id in the myid file and assign it to serverId
            serverId = Long.parseLong(myIdString);
            MDC.put("myid", myIdString);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("serverid " + myIdString
                    + " is not a number");
        }
    }
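
The parsing flow above (load a properties file, walk its keys, then read a numeric id from a separate file) can be tried in isolation. The following is a minimal sketch, not ZooKeeper's code: the class `MiniConfig` and its methods are hypothetical, it reads from strings rather than files, it handles only three keys, and unlike `setupMyId` it trims the id before parsing.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical stand-in for the parse step: a handful of zoo.cfg keys only.
class MiniConfig {
    String dataDir;
    int clientPort;
    int tickTime;

    // Load zoo.cfg-style text and copy known keys into typed fields.
    static MiniConfig parse(String cfgText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(cfgText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        MiniConfig cfg = new MiniConfig();
        for (String key : props.stringPropertyNames()) {
            String value = props.getProperty(key).trim();
            if (key.equals("dataDir")) {
                cfg.dataDir = value;
            } else if (key.equals("clientPort")) {
                cfg.clientPort = Integer.parseInt(value);
            } else if (key.equals("tickTime")) {
                cfg.tickTime = Integer.parseInt(value);
            }
            // Unknown keys are ignored in this sketch.
        }
        return cfg;
    }

    // Parse the first line of a myid file into a server id.
    static long parseMyId(String firstLine) {
        try {
            return Long.parseLong(firstLine.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("serverid " + firstLine + " is not a number");
        }
    }

    public static void main(String[] args) {
        MiniConfig cfg = parse("tickTime=2000\ndataDir=/opt/zk/data\nclientPort=2181\n");
        System.out.println(cfg.dataDir + " " + cfg.clientPort + " " + cfg.tickTime);
        System.out.println(parseMyId("2"));
    }
}
```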

2.2.4 Deleting Expired Snapshots

A scheduled task can be started to delete expired snapshots. This feature is disabled by default.

    protected void initializeAndRun(String[] args)
            throws ConfigException, IOException, AdminServerException {
        // Holds the zk configuration
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            // 1. Parse the arguments: zoo.cfg and myid
            config.parse(args[0]);
        }
        // 2. Start the scheduled task that deletes expired snapshots (disabled by default)
        // config.getSnapRetainCount() = 3: minimum number of snapshots to keep
        // config.getPurgeInterval() = 0: the default 0 means disabled
        // Start and schedule the purge task
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
                config.getDataDir(), config.getDataLogDir(),
                config.getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();
        if (args.length == 1 && config.isDistributed()) {
            // 3. Start in cluster mode
            runFromConfig(config);
        } else {
            LOG.warn("Either no config or no quorum defined in config, running "
                    + " in standalone mode");
            // there is only server in the quorum -- run as standalone
            ZooKeeperServerMain.main(args);
        }
    }

    // QuorumPeerConfig.java: default values
    protected int snapRetainCount = 3;
    protected int purgeInterval = 0;

    // DatadirCleanupManager.java
    public void start() {
        if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
            LOG.warn("Purge task is already running.");
            return;
        }
        // With the default purgeInterval=0 the task is disabled, so return immediately
        // Don't schedule the purge task with zero or negative purge interval.
        if (purgeInterval <= 0) {
            LOG.info("Purge task is not scheduled.");
            return;
        }
        // Create a timer
        timer = new Timer("PurgeTask", true);
        // Create the snapshot cleanup task
        TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
        // If purgeInterval is 1, check once per hour for expired snapshots and delete any found
        timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));
        purgeTaskStatus = PurgeTaskStatus.STARTED;
    }

    static class PurgeTask extends TimerTask {
        private File logsDir;
        private File snapsDir;
        private int snapRetainCount;

        public PurgeTask(File dataDir, File snapDir, int count) {
            logsDir = dataDir;
            snapsDir = snapDir;
            snapRetainCount = count;
        }

        @Override
        public void run() {
            LOG.info("Purge task started.");
            try {
                // Clean up expired data
                PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount);
            } catch (Exception e) {
                LOG.error("Error occurred while purging.", e);
            }
            LOG.info("Purge task completed.");
        }
    }

    // PurgeTxnLog.java
    public static void purge(File dataDir, File snapDir, int num) throws IOException {
        if (num < 3) {
            throw new IllegalArgumentException(COUNT_ERR_MSG);
        }
        FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);
        List<File> snaps = txnLog.findNRecentSnapshots(num);
        int numSnaps = snaps.size();
        if (numSnaps > 0) {
            purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1));
        }
    }
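
The retention rule in purge (keep the num most recent snapshots, never fewer than 3, and remove anything older) can be illustrated on file names alone. The sketch below assumes snapshot files are named snapshot.<zxid in hex>; the class `SnapshotRetention` and its methods are hypothetical, and it only selects names rather than deleting files or touching transaction logs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: decides which snapshot file names fall outside the
// retention window. It does not delete anything.
class SnapshotRetention {
    static final String PREFIX = "snapshot.";

    // Extract the zxid encoded (in hex) after the "snapshot." prefix.
    static long zxidOf(String fileName) {
        return Long.parseLong(fileName.substring(PREFIX.length()), 16);
    }

    // Return the names that are older than the retainCount most recent snapshots.
    static List<String> selectForPurge(List<String> fileNames, int retainCount) {
        if (retainCount < 3) {
            throw new IllegalArgumentException("retainCount must be at least 3");
        }
        List<String> sorted = new ArrayList<>(fileNames);
        // Newest (highest zxid) first.
        sorted.sort((a, b) -> Long.compare(zxidOf(b), zxidOf(a)));
        if (sorted.size() <= retainCount) {
            return new ArrayList<>();
        }
        return new ArrayList<>(sorted.subList(retainCount, sorted.size()));
    }

    public static void main(String[] args) {
        List<String> names = new ArrayList<>();
        names.add("snapshot.0");
        names.add("snapshot.a");
        names.add("snapshot.14");
        names.add("snapshot.1e");
        names.add("snapshot.28");
        System.out.println(selectForPurge(names, 3));
    }
}
```

In a real deployment the same rule is driven by the autopurge.snapRetainCount and autopurge.purgeInterval settings in zoo.cfg, which feed the two config getters shown above.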

2.2.5 Initializing the Communication Component

    protected void initializeAndRun(String[] args)
            throws ConfigException, IOException, AdminServerException {
        // Holds the zk configuration
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            // 1. Parse the arguments: zoo.cfg and myid
            config.parse(args[0]);
        }
        // 2. Start the scheduled task that deletes expired snapshots (disabled by default)
        // config.getSnapRetainCount() = 3: minimum number of snapshots to keep
        // config.getPurgeInterval() = 0: the default 0 means disabled
        // Start and schedule the purge task
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
                config.getDataDir(), config.getDataLogDir(),
                config.getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();
        if (args.length == 1 && config.isDistributed()) {
            // 3. Start the cluster (cluster mode)
            runFromConfig(config);
        } else {
            LOG.warn("Either no config or no quorum defined in config, running "
                    + " in standalone mode");
            // there is only server in the quorum -- run as standalone
            // Standalone (local) mode
            ZooKeeperServerMain.main(args);
        }
    }

1) The default transport is NIO (Netty is also supported)

    // QuorumPeerMain.java
    public void runFromConfig(QuorumPeerConfig config)
            throws IOException, AdminServerException {
        LOG.info("Starting quorum peer");
        try {
            ServerCnxnFactory cnxnFactory = null;
            ServerCnxnFactory secureCnxnFactory = null;
            // Initialize the communication component; NIO by default
            if (config.getClientPortAddress() != null) {
                cnxnFactory = ServerCnxnFactory.createFactory();
                cnxnFactory.configure(config.getClientPortAddress(),
                        config.getMaxClientCnxns(), false);
            }
            if (config.getSecureClientPortAddress() != null) {
                secureCnxnFactory = ServerCnxnFactory.createFactory();
                secureCnxnFactory.configure(config.getSecureClientPortAddress(),
                        config.getMaxClientCnxns(), true);
            }
            // Copy the parsed settings onto this ZooKeeper node
            quorumPeer = getQuorumPeer();
            quorumPeer.setTxnFactory(new FileTxnSnapLog(
                    config.getDataLogDir(),
                    config.getDataDir()));
            quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
            quorumPeer.enableLocalSessionsUpgrading(
                    config.isLocalSessionsUpgradingEnabled());
            //quorumPeer.setQuorumPeers(config.getAllMembers());
            quorumPeer.setElectionType(config.getElectionAlg());
            quorumPeer.setMyid(config.getServerId());
            quorumPeer.setTickTime(config.getTickTime());
            quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
            quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
            quorumPeer.setInitLimit(config.getInitLimit());
            quorumPeer.setSyncLimit(config.getSyncLimit());
            quorumPeer.setConfigFileName(config.getConfigFilename());
            // Manages storage of zk data
            quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
            quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
            if (config.getLastSeenQuorumVerifier() != null) {
                quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(),
                        false);
            }
            quorumPeer.initConfigInZKDatabase();
            // Manages zk communication
            quorumPeer.setCnxnFactory(cnxnFactory);
            quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
            quorumPeer.setSslQuorum(config.isSslQuorum());
            quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
            quorumPeer.setLearnerType(config.getPeerType());
            quorumPeer.setSyncEnabled(config.getSyncEnabled());
            quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
            if (config.sslQuorumReloadCertFiles) {
                quorumPeer.getX509Util().enableCertFileReloading();
            }
            quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
            quorumPeer.initialize();
            // Start zk
            quorumPeer.start();
            quorumPeer.join();
        } catch (InterruptedException e) {
            // warn, but generally this is ok
            LOG.warn("Quorum Peer interrupted", e);
        }
    }

    // ServerCnxnFactory.java
    static public ServerCnxnFactory createFactory() throws IOException {
        String serverCnxnFactoryName =
                System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
        if (serverCnxnFactoryName == null) {
            serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
        }
        try {
            ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory)
                    Class.forName(serverCnxnFactoryName)
                            .getDeclaredConstructor().newInstance();
            LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
            return serverCnxnFactory;
        } catch (Exception e) {
            IOException ioe = new IOException("Couldn't instantiate "
                    + serverCnxnFactoryName);
            ioe.initCause(e);
            throw ioe;
        }
    }

    public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory";

From zookeeperAdmin.md:

    * *serverCnxnFactory* :
    (Java system property: **zookeeper.serverCnxnFactory**)
    Specifies ServerCnxnFactory implementation.
    This should be set to `NettyServerCnxnFactory` in order to use TLS based server communication.
    Default is `NIOServerCnxnFactory`.
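
createFactory picks its implementation at run time: it reads a class name from a system property, falls back to a default, and instantiates the class reflectively. The sketch below shows the same pattern with stand-in types so it runs without ZooKeeper. `ConnectionFactory`, `NioFactory`, `NettyFactory`, `FactoryLoader`, and the property name `demo.connectionFactory` are all hypothetical.

```java
// Stand-in types for the pattern; none of these names exist in ZooKeeper.
interface ConnectionFactory {
    String name();
}

class NioFactory implements ConnectionFactory {
    public String name() {
        return "nio";
    }
}

class NettyFactory implements ConnectionFactory {
    public String name() {
        return "netty";
    }
}

class FactoryLoader {
    // Hypothetical property name used only in this example.
    static final String PROPERTY = "demo.connectionFactory";

    // Instantiate the class named by the system property, or the NIO default.
    static ConnectionFactory create() {
        String className = System.getProperty(PROPERTY);
        if (className == null) {
            className = NioFactory.class.getName();
        }
        try {
            return (ConnectionFactory) Class.forName(className)
                    .getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Couldn't instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(create().name());
        System.setProperty(PROPERTY, NettyFactory.class.getName());
        System.out.println(create().name());
    }
}
```

For the real server, the equivalent switch is the zookeeper.serverCnxnFactory system property quoted above, typically passed as a -D option on the JVM command line.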

2) Initializing the NIO server socket (not yet started)
Press Ctrl + Alt + B to find the implementation of configure: NIOServerCnxnFactory.java

    public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException {
        if (secure) {
            throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
        }
        configureSaslLogin();
        maxClientCnxns = maxcc;
        sessionlessCnxnTimeout = Integer.getInteger(
                ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
        // We also use the sessionlessCnxnTimeout as expiring interval for
        // cnxnExpiryQueue. These don't need to be the same, but the expiring
        // interval passed into the ExpiryQueue() constructor below should be
        // less than or equal to the timeout.
        cnxnExpiryQueue =
                new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);
        expirerThread = new ConnectionExpirerThread();
        int numCores = Runtime.getRuntime().availableProcessors();
        // 32 cores sweet spot seems to be 4 selector threads
        numSelectorThreads = Integer.getInteger(
                ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
                Math.max((int) Math.sqrt((float) numCores / 2), 1));
        if (numSelectorThreads < 1) {
            throw new IOException("numSelectorThreads must be at least 1");
        }
        numWorkerThreads = Integer.getInteger(
                ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
        workerShutdownTimeoutMS = Long.getLong(
                ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);
        ... ...
        for (int i = 0; i < numSelectorThreads; ++i) {
            selectorThreads.add(new SelectorThread(i));
        }
        // Initialize the NIO server socket and bind it to the client port
        // (2181 by default) so that it can receive client requests
        this.ss = ServerSocketChannel.open();
        ss.socket().setReuseAddress(true);
        LOG.info("binding to port " + addr);
        // Bind the client port (2181 by default)
        ss.socket().bind(addr);
        ss.configureBlocking(false);
        acceptThread = new AcceptThread(ss, addr, selectorThreads);
    }
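
The default thread counts in configure come from two small formulas: selector threads are max(floor(sqrt(cores / 2)), 1) and worker threads are 2 × cores. The sketch below just evaluates those formulas for a few core counts; the class name `NioThreadSizing` is hypothetical, and the system-property overrides used by the real method are left out.

```java
// Hypothetical helper that reproduces only the default-sizing arithmetic.
class NioThreadSizing {
    // Default number of selector threads for a given core count.
    static int selectorThreads(int numCores) {
        return Math.max((int) Math.sqrt((float) numCores / 2), 1);
    }

    // Default number of worker threads for a given core count.
    static int workerThreads(int numCores) {
        return 2 * numCores;
    }

    public static void main(String[] args) {
        int[] coreCounts = {1, 4, 8, 32, 64};
        for (int cores : coreCounts) {
            System.out.println(cores + " cores: "
                    + selectorThreads(cores) + " selector, "
                    + workerThreads(cores) + " worker");
        }
    }
}
```

For 32 cores the formula gives 4 selector threads, which matches the "sweet spot" comment in the source.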

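2.3 ZK Server Data Loading Source Code

(1) The zk data model is a tree, the DataTree; each node in it is a DataNode.
(2) The DataTree instances across the zk cluster are kept in sync with one another.
(3) Every zk node in a ZooKeeper cluster holds a complete copy of the data both in memory and on disk.

  • In memory: DataTree
  • On disk: snapshot files + transaction log (edit log)

2.3.1 Restoring Snapshot Data on Cold Start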

1) Starting the cluster
runFromConfig is the same method listed in full in 2.2.5. The lines that matter for data loading are:

    public void runFromConfig(QuorumPeerConfig config)
            throws IOException, AdminServerException {
        LOG.info("Starting quorum peer");
        try {
            ... ...
            // Copy the parsed settings onto this ZooKeeper node
            quorumPeer = getQuorumPeer();
            quorumPeer.setTxnFactory(new FileTxnSnapLog(
                    config.getDataLogDir(),
                    config.getDataDir()));
            ... ...
            // Manages storage of zk data
            quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
            ... ...
            quorumPeer.initialize();
            // Start zk
            quorumPeer.start();
            quorumPeer.join();
        } catch (InterruptedException e) {
            // warn, but generally this is ok
            LOG.warn("Quorum Peer interrupted", e);
        }
    }

2) Restoring data on cold start

    // QuorumPeer.java
    public synchronized void start() {
        if (!getView().containsKey(myid)) {
            throw new RuntimeException("My id " + myid + " not in the peer list");
        }
        // Cold-start data recovery
        loadDataBase();
        // Start the connection factory instance
        startServerCnxnFactory();
        try {
            adminServer.start();
        } catch (AdminServerException e) {
            LOG.warn("Problem starting AdminServer", e);
            System.out.println(e);
        }
        // Prepare the election environment
        startLeaderElection();
        // Run the election
        super.start();
    }

    private void loadDataBase() {
        try {
            // Load the on-disk data into memory and rebuild the DataTree.
            // zk operations are either transactional or non-transactional.
            // Transactional, e.g. zk.create(): each one is assigned a globally unique 64-bit zxid
            //   (high 32 bits: epoch, the term number of each leader; low 32 bits: transaction id)
            // Non-transactional, e.g. zk.getData()
            // Recovery steps:
            // (1) Restore most of the data from the snapshot file and obtain a lastProcessedZxid
            // (2) Replay the transaction log up to its last entry, updating lastProcessedZxid
            // (3) The resulting DataTree and lastProcessedZxid mean recovery is complete
            zkDb.loadDataBase();
            // load the epochs
            long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
            long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
            try {
                currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
            } catch (FileNotFoundException e) {
                // pick a reasonable epoch number
                // this should only happen once when moving to a
                // new code version
                currentEpoch = epochOfZxid;
                LOG.info(CURRENT_EPOCH_FILENAME
                        + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                        currentEpoch);
                writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
            }
            if (epochOfZxid > currentEpoch) {
                throw new IOException("The current epoch, " +
                        ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
            }
            try {
                acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
            } catch (FileNotFoundException e) {
                // pick a reasonable epoch number
                // this should only happen once when moving to a
                // new code version
                acceptedEpoch = epochOfZxid;
                LOG.info(ACCEPTED_EPOCH_FILENAME
                        + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                        acceptedEpoch);
                writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
            }
            if (acceptedEpoch < currentEpoch) {
                throw new IOException("The accepted epoch, " +
                        ZxidUtils.zxidToString(acceptedEpoch) + " is less than the current epoch, " +
                        ZxidUtils.zxidToString(currentEpoch));
            }
        } catch (IOException ie) {
            LOG.error("Unable to load database on disk", ie);
            throw new RuntimeException("Unable to run quorum server ", ie);
        }
    }

    // ZKDatabase.java
    public long loadDataBase() throws IOException {
        long zxid = snapLog.restore(dataTree, sessionsWithTimeouts,
                commitProposalPlaybackListener);
        initialized = true;
        return zxid;
    }

    // FileTxnSnapLog.java
    public long restore(DataTree dt, Map<Long, Integer> sessions,
            PlayBackListener listener) throws IOException {
        // Restore the snapshot file data into the DataTree
        long deserializeResult = snapLog.deserialize(dt, sessions);
        FileTxnLog txnLog = new FileTxnLog(dataDir);
        RestoreFinalizer finalizer = () -> {
            // Replay the transaction log into the DataTree
            long highestZxid = fastForwardFromEdits(dt, sessions, listener);
            return highestZxid;
        };
        if (-1L == deserializeResult) {
            /* this means that we couldn't find any snapshot, so we need to
             * initialize an empty database (reported in ZOOKEEPER-2325) */
            if (txnLog.getLastLoggedZxid() != -1) {
                // ZOOKEEPER-3056: provides an escape hatch for users upgrading
                // from old versions of zookeeper (3.4.x, pre 3.5.3).
                if (!trustEmptySnapshot) {
                    throw new IOException(EMPTY_SNAPSHOT_WARNING +
                            "Something is broken!");
                } else {
                    LOG.warn("{}This should only be allowed during upgrading.",
                            EMPTY_SNAPSHOT_WARNING);
                    return finalizer.run();
                }
            }
            /* TODO: (br33d) we should either put a ConcurrentHashMap on restore()
             * or use Map on save() */
            save(dt, (ConcurrentHashMap<Long, Integer>) sessions);
            /* return a zxid of zero, since we the database is empty */
            return 0;
        }
        return finalizer.run();
    }
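
The zxid layout described in the loadDataBase comments (high 32 bits for the epoch, low 32 bits for the per-epoch counter) and the "snapshot first, then replay the log" rule can be shown with plain bit arithmetic. This is a sketch only: the class `Zxid` and its methods are hypothetical names, and `restore` models the transaction log as a bare array of zxids rather than real records.

```java
// Hypothetical helper illustrating the 64-bit zxid layout and the recovery rule.
class Zxid {
    // High 32 bits: the leader epoch.
    static long epochOf(long zxid) {
        return zxid >> 32L;
    }

    // Low 32 bits: the transaction counter within the epoch.
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    // Combine an epoch and a counter into one zxid.
    static long make(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    // Start from the snapshot's zxid, then "replay" every logged zxid that is
    // newer; the result plays the role of lastProcessedZxid after recovery.
    static long restore(long snapshotZxid, long[] loggedZxids) {
        long lastProcessed = snapshotZxid;
        for (long zxid : loggedZxids) {
            if (zxid > lastProcessed) {
                lastProcessed = zxid;
            }
        }
        return lastProcessed;
    }

    public static void main(String[] args) {
        long zxid = make(2, 5);
        System.out.println(Long.toHexString(zxid));
        System.out.println(epochOf(zxid) + " / " + counterOf(zxid));
    }
}
```

Because the epoch sits in the high bits, any zxid from a newer epoch compares greater than every zxid from an older one, which is what lets recovery and election order transactions with a simple numeric comparison.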

ctrl + alt +B 查找 deserialize 实现类 FileSnap.java

  1. public long deserialize(DataTree dt, Map<Long, Integer> sessions)
  2. throws IOException {
  3. // we run through 100 snapshots (not all of them)
  4. // if we cannot get it running within 100 snapshots
  5. // we should give up
  6. List<File> snapList = findNValidSnapshots(100);
  7. if (snapList.size() == 0) {
  8. return -1L;
  9. }
  10. File snap = null;
  11. boolean foundValid = false;
  12. // Go through the snapshots one by one
  13. for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
  14. snap = snapList.get(i);
  15. LOG.info("Reading snapshot " + snap);
  16. // Set up the input streams for deserialization
  17. try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snap));
  18. CheckedInputStream crcIn = new CheckedInputStream(snapIS, new
  19. Adler32())) {
  20. InputArchive ia = BinaryInputArchive.getArchive(crcIn);
  21. // Deserialize and restore the data into the DataTree
  22. deserialize(dt, sessions, ia);
  23. long checkSum = crcIn.getChecksum().getValue();
  24. long val = ia.readLong("val");
  25. if (val != checkSum) {
  26. throw new IOException("CRC corruption in snapshot : " + snap);
  27. }
  28. foundValid = true;
  29. break;
  30. } catch (IOException e) {
  31. LOG.warn("problem reading snap file " + snap, e);
  32. }
  33. }
  34. if (!foundValid) {
  35. throw new IOException("Not able to find valid snapshots in " + snapDir);
  36. }
  37. dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(),
  38. SNAPSHOT_FILE_PREFIX);
  39. return dt.lastProcessedZxid;
  40. }
  41. public void deserialize(DataTree dt, Map<Long, Integer> sessions,
  42. InputArchive ia) throws IOException {
  43. FileHeader header = new FileHeader();
  44. header.deserialize(ia, "fileheader");
  45. if (header.getMagic() != SNAP_MAGIC) {
  46. throw new IOException("mismatching magic headers "
  47. + header.getMagic() +
  48. " != " + FileSnap.SNAP_MAGIC);
  49. }
  50. // Restore the snapshot data into the DataTree
  51. SerializeUtils.deserializeSnapshot(dt, ia, sessions);
  52. }
  53. public static void deserializeSnapshot(DataTree dt, InputArchive ia,
  54. Map<Long, Integer> sessions) throws IOException {
  55. int count = ia.readInt("count");
  56. while (count > 0) {
  57. long id = ia.readLong("id");
  58. int to = ia.readInt("timeout");
  59. sessions.put(id, to);
  60. if (LOG.isTraceEnabled()) {
  61. ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
  62. "loadData --- session in archive: " + id
  63. + " with timeout: " + to);
  64. }
  65. count--;
  66. }
  67. // Restore the snapshot data into the DataTree
  68. dt.deserialize(ia, "tree");
  69. }
  70. public void deserialize(InputArchive ia, String tag) throws IOException {
  71. aclCache.deserialize(ia);
  72. nodes.clear();
  73. pTrie.clear();
  74. String path = ia.readString("path");
  75. // Restore every DataNode stored in the snapshot into the DataTree
  76. while (!"/".equals(path)) {
  77. // Each iteration creates one node object
  78. DataNode node = new DataNode();
  79. ia.readRecord(node, "node");
  80. // Put the DataNode back into the DataTree
  81. nodes.put(path, node);
  82. synchronized (node) {
  83. aclCache.addUsage(node.acl);
  84. }
  85. int lastSlash = path.lastIndexOf('/');
  86. if (lastSlash == -1) {
  87. root = node;
  88. } else {
  89. // Handle the parent node
  90. String parentPath = path.substring(0, lastSlash);
  91. DataNode parent = nodes.get(parentPath);
  92. if (parent == null) {
  93. throw new IOException("Invalid Datatree, unable to find " +
  94. "parent " + parentPath + " of path " + path);
  95. }
  96. // Register this node as a child of its parent
  97. parent.addChild(path.substring(lastSlash + 1));
  98. // Handle ephemeral nodes versus persistent nodes
  99. long eowner = node.stat.getEphemeralOwner();
  100. EphemeralType ephemeralType = EphemeralType.get(eowner);
  101. if (ephemeralType == EphemeralType.CONTAINER) {
  102. containers.add(path);
  103. } else if (ephemeralType == EphemeralType.TTL) {
  104. ttls.add(path);
  105. } else if (eowner != 0) {
  106. HashSet<String> list = ephemerals.get(eowner);
  107. if (list == null) {
  108. list = new HashSet<String>();
  109. ephemerals.put(eowner, list);
  110. }
  111. list.add(path);
  112. }
  113. }
  114. path = ia.readString("path");
  115. }
  116. nodes.put("/", root);
  117. // we are done with deserializing the
  118. // datatree
  119. // update the quotas - create path trie
  120. // and also update the stat nodes
  121. setupQuota();
  122. aclCache.purgeUnused();
  123. }
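
The checksum test in FileSnap.deserialize can be tried in isolation. What follows is a minimal sketch, not ZooKeeper code: the class SnapshotChecksumSketch and its methods write and isValid are hypothetical names, and the payload is a single string rather than a serialized DataTree. It writes a payload followed by its Adler32 value, then reads the payload back through a CheckedInputStream and compares the computed checksum with the stored one, which mirrors the val != checkSum comparison above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.Adler32;
import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream;

// Hypothetical sketch: not part of ZooKeeper.
class SnapshotChecksumSketch {

    // Serialize the payload, then append the Adler32 checksum of the payload bytes.
    static byte[] write(String payload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            CheckedOutputStream checked = new CheckedOutputStream(bytes, new Adler32());
            DataOutputStream out = new DataOutputStream(checked);
            out.writeUTF(payload);
            out.flush();
            // Capture the checksum before the trailer itself is written.
            long checksum = checked.getChecksum().getValue();
            out.writeLong(checksum);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // True only if the payload is readable and its checksum matches the trailer.
    static boolean isValid(byte[] file) {
        try (CheckedInputStream checked =
                 new CheckedInputStream(new ByteArrayInputStream(file), new Adler32());
             DataInputStream in = new DataInputStream(checked)) {
            in.readUTF();
            // Take the checksum after the payload and before the trailer is read.
            long computed = checked.getChecksum().getValue();
            long stored = in.readLong();
            return stored == computed;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        byte[] file = write("hello");
        System.out.println(isValid(file));
        file[2] ^= 0x01; // flip one bit inside the payload
        System.out.println(isValid(file));
    }
}
```

An unreadable or corrupted file is reported as invalid instead of aborting, which is in the spirit of the loop above that logs a warning and moves on to the next snapshot candidate.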

2.3.2 Cold-start data recovery: the transaction log

Return to the restore method in FileTxnSnapLog.java.

  1. public long restore(DataTree dt, Map<Long, Integer> sessions,
  2. PlayBackListener listener) throws IOException {
  3. // Restore the snapshot file data into the DataTree
  4. long deserializeResult = snapLog.deserialize(dt, sessions);
  5. FileTxnLog txnLog = new FileTxnLog(dataDir);
  6. RestoreFinalizer finalizer = () -> {
  7. // Replay the transaction log into the DataTree
  8. long highestZxid = fastForwardFromEdits(dt, sessions, listener);
  9. return highestZxid;
  10. };
  11. ... ...
  12. return finalizer.run();
  13. }
  14. public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions,
  15. PlayBackListener listener) throws IOException {
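  16. // Most of the data has already been restored from the snapshot, so replay only has to start at the snapshot's zxid + 1
  17. TxnIterator itr = txnLog.read(dt.lastProcessedZxid + 1);
  18. // Highest zxid in the snapshot; it is updated while the log is replayed, until every transaction has been applied
  19. long highestZxid = dt.lastProcessedZxid;
  20. TxnHeader hdr;
  21. try {
  22. // Starting after lastProcessedZxid, keep replaying the logged transactions that have not been restored yet
  23. while (true) {
  24. // iterator points to
  25. // the first valid txn when initialized
  26. // Get the transaction header (it carries the zxid)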
  27. hdr = itr.getHeader();
  28. if (hdr == null) {
  29. //empty logs
  30. return dt.lastProcessedZxid;
  31. }
  32. if (hdr.getZxid() < highestZxid && highestZxid != 0) {
  33. LOG.error("{}(highestZxid) > {}(next log) for type {}",
  34. highestZxid, hdr.getZxid(), hdr.getType());
  35. } else {
  36. highestZxid = hdr.getZxid();
  37. }
  38. try {
  39. // Apply the logged transaction to the DataTree; each applied transaction
  40. // moves highestZxid forward (normally to highestZxid + 1)
  41. processTransaction(hdr, dt, sessions, itr.getTxn());
  42. } catch (KeeperException.NoNodeException e) {
  43. throw new IOException("Failed to process transaction type: " +
  44. hdr.getType() + " error: " + e.getMessage(), e);
  45. }
  46. listener.onTxnLoaded(hdr, itr.getTxn());
  47. if (!itr.next())
  48. break;
  49. }
  50. } finally {
  51. if (itr != null) {
  52. itr.close();
  53. }
  54. }
  55. return highestZxid;
  56. }
  57. public void processTransaction(TxnHeader hdr, DataTree dt,
  58. Map<Long, Integer> sessions, Record txn)
  59. throws KeeperException.NoNodeException {
  60. ProcessTxnResult rc;
  61. switch (hdr.getType()) {
  62. case OpCode.createSession:
  63. sessions.put(hdr.getClientId(),
  64. ((CreateSessionTxn) txn).getTimeOut());
  65. if (LOG.isTraceEnabled()) {
  66. ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
  67. "playLog --- create session in log: 0x"
  68. + Long.toHexString(hdr.getClientId())
  69. + " with timeout: "
  70. + ((CreateSessionTxn) txn).getTimeOut());
  71. }
  72. // give dataTree a chance to sync its lastProcessedZxid
  73. rc = dt.processTxn(hdr, txn);
  74. break;
  75. case OpCode.closeSession:
  76. sessions.remove(hdr.getClientId());
  77. if (LOG.isTraceEnabled()) {
  78. ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
  79. "playLog --- close session in log: 0x"
  80. + Long.toHexString(hdr.getClientId()));
  81. }
  82. rc = dt.processTxn(hdr, txn);
  83. break;
  84. default:
  85. // Node creation, node deletion, and all other transaction types
  86. rc = dt.processTxn(hdr, txn);
  87. }
  88. /**
  89. * Snapshots are lazily created. So when a snapshot is in progress,
  90. * there is a chance for later transactions to make into the
  91. * snapshot. Then when the snapshot is restored, NONODE/NODEEXISTS
  92. * errors could occur. It should be safe to ignore these.
  93. */
  94. if (rc.err != Code.OK.intValue()) {
  95. LOG.debug(
  96. "Ignoring processTxn failure hdr: {}, error: {}, path: {}",
  97. hdr.getType(), rc.err, rc.path);
  98. }
  99. }
  100. public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
  101. ProcessTxnResult rc = new ProcessTxnResult();
  102. try {
  103. rc.clientId = header.getClientId();
  104. rc.cxid = header.getCxid();
  105. rc.zxid = header.getZxid();
  106. rc.type = header.getType();
  107. rc.err = 0;
  108. rc.multiResult = null;
  109. switch (header.getType()) {
  110. case OpCode.create:
  111. CreateTxn createTxn = (CreateTxn) txn;
  112. rc.path = createTxn.getPath();
  113. createNode(
  114. createTxn.getPath(),
  115. createTxn.getData(),
  116. createTxn.getAcl(),
  117. createTxn.getEphemeral() ? header.getClientId() : 0,
  118. createTxn.getParentCVersion(),
  119. header.getZxid(), header.getTime(), null);
  120. break;
  121. case OpCode.create2:
  122. CreateTxn create2Txn = (CreateTxn) txn;
  123. rc.path = create2Txn.getPath();
  124. Stat stat = new Stat();
  125. createNode(
  126. create2Txn.getPath(),
  127. create2Txn.getData(),
  128. create2Txn.getAcl(),
  129. create2Txn.getEphemeral() ? header.getClientId() : 0,
  130. create2Txn.getParentCVersion(),
  131. header.getZxid(), header.getTime(), stat);
  132. rc.stat = stat;
  133. break;
  134. case OpCode.createTTL:
  135. CreateTTLTxn createTtlTxn = (CreateTTLTxn) txn;
  136. rc.path = createTtlTxn.getPath();
  137. stat = new Stat();
  138. createNode(
  139. createTtlTxn.getPath(),
  140. createTtlTxn.getData(),
  141. createTtlTxn.getAcl(),
  142. EphemeralType.TTL.toEphemeralOwner(createTtlTxn.getTtl()),
  143. createTtlTxn.getParentCVersion(),
  144. header.getZxid(), header.getTime(), stat);
  145. rc.stat = stat;
  146. break;
  147. case OpCode.createContainer:
  148. CreateContainerTxn createContainerTxn = (CreateContainerTxn) txn;
  149. rc.path = createContainerTxn.getPath();
  150. stat = new Stat();
  151. createNode(
  152. createContainerTxn.getPath(),
  153. createContainerTxn.getData(),
  154. createContainerTxn.getAcl(),
  155. EphemeralType.CONTAINER_EPHEMERAL_OWNER,
  156. createContainerTxn.getParentCVersion(),
  157. header.getZxid(), header.getTime(), stat);
  158. rc.stat = stat;
  159. break;
  160. case OpCode.delete:
  161. case OpCode.deleteContainer:
  162. DeleteTxn deleteTxn = (DeleteTxn) txn;
  163. rc.path = deleteTxn.getPath();
  164. deleteNode(deleteTxn.getPath(), header.getZxid());
  165. break;
  166. case OpCode.reconfig:
  167. case OpCode.setData:
  168. SetDataTxn setDataTxn = (SetDataTxn) txn;
  169. rc.path = setDataTxn.getPath();
  170. rc.stat = setData(setDataTxn.getPath(), setDataTxn
  171. .getData(), setDataTxn.getVersion(), header
  172. .getZxid(), header.getTime());
  173. break;
  174. case OpCode.setACL:
  175. SetACLTxn setACLTxn = (SetACLTxn) txn;
  176. rc.path = setACLTxn.getPath();
  177. rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(),
  178. setACLTxn.getVersion());
  179. break;
  180. case OpCode.closeSession:
  181. killSession(header.getClientId(), header.getZxid());
  182. break;
  183. case OpCode.error:
  184. ErrorTxn errTxn = (ErrorTxn) txn;
  185. rc.err = errTxn.getErr();
  186. break;
  187. case OpCode.check:
  188. CheckVersionTxn checkTxn = (CheckVersionTxn) txn;
  189. rc.path = checkTxn.getPath();
  190. break;
  191. case OpCode.multi:
  192. MultiTxn multiTxn = (MultiTxn) txn;
  193. List<Txn> txns = multiTxn.getTxns();
  194. rc.multiResult = new ArrayList<ProcessTxnResult>();
  195. boolean failed = false;
  196. for (Txn subtxn : txns) {
  197. if (subtxn.getType() == OpCode.error) {
  198. failed = true;
  199. break;
  200. }
  201. }
  202. ... ...
  203. }
  204. } catch (KeeperException e) {
  205. ... ...
  206. } catch (IOException e) {
  207. ... ...
  208. }
  209. ... ...
  210. return rc;
  211. }
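
The recovery order described above (snapshot first, then every logged transaction with a higher zxid) can be sketched with plain collections. This is an illustration under simplifying assumptions, not the ZooKeeper implementation: ReplaySketch, Txn and fastForward are hypothetical names, the tree is a flat map of path to data, and a null value stands for a delete.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: not part of ZooKeeper.
class ReplaySketch {

    // One logged write; data == null stands for a delete.
    static final class Txn {
        final long zxid;
        final String path;
        final String data;

        Txn(long zxid, String path, String data) {
            this.zxid = zxid;
            this.path = path;
            this.data = data;
        }
    }

    // Applies every logged transaction newer than the snapshot and
    // returns the highest zxid that is now reflected in the tree.
    static long fastForward(Map<String, String> tree, long snapshotZxid, List<Txn> log) {
        long highestZxid = snapshotZxid;
        for (Txn txn : log) {
            if (txn.zxid <= snapshotZxid) {
                continue; // already contained in the snapshot
            }
            if (txn.data == null) {
                tree.remove(txn.path);
            } else {
                tree.put(txn.path, txn.data);
            }
            highestZxid = Math.max(highestZxid, txn.zxid);
        }
        return highestZxid;
    }

    public static void main(String[] args) {
        Map<String, String> tree = new HashMap<>();
        tree.put("/a", "1"); // state restored from a snapshot taken at zxid 2
        List<Txn> log = new ArrayList<>();
        log.add(new Txn(1, "/a", "0"));
        log.add(new Txn(2, "/a", "1"));
        log.add(new Txn(3, "/b", "x"));
        log.add(new Txn(4, "/a", null));
        long highest = fastForward(tree, 2, log);
        System.out.println(highest + " " + tree);
    }
}
```

The sketch filters old entries inside the loop, whereas the listing above asks the log for an iterator that already starts at lastProcessedZxid + 1. Both lead to the same final state.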

2.4 ZooKeeper leader election source code walkthrough


2.4.1 Election preparation


  1. public synchronized void start() {
  2. if (!getView().containsKey(myid)) {
  3. throw new RuntimeException("My id " + myid + " not in the peer list");
  4. }
  5. loadDataBase();
  6. startServerCnxnFactory();
  7. try {
  8. adminServer.start();
  9. } catch (AdminServerException e) {
  10. LOG.warn("Problem starting AdminServer", e);
  11. System.out.println(e);
  12. }
  13. // Prepare for the election
  14. startLeaderElection();
  15. super.start();
  16. }
  17. synchronized public void startLeaderElection() {
  18. try {
  19. if (getPeerState() == ServerState.LOOKING) {
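  20. // Create the vote
  21. // (1) Vote fields: epoch (the leader's term number), zxid (number of a transaction executed during a leader's term), myid (server ID)
  22. // (2) At the start of an election, every server first votes for itself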
  23. currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
  24. }
  25. } catch (IOException e) {
  26. RuntimeException re = new RuntimeException(e.getMessage());
  27. re.setStackTrace(e.getStackTrace());
  28. throw re;
  29. }
  30. // if (!getView().containsKey(myid)) {
  31. // throw new RuntimeException("My id " + myid + " not in the peer list");
  32. //}
  33. if (electionType == 0) {
  34. try {
  35. udpSocket = new DatagramSocket(getQuorumAddress().getPort());
  36. responder = new ResponderThread();
  37. responder.start();
  38. } catch (SocketException e) {
  39. throw new RuntimeException(e);
  40. }
  41. }
  42. // Create the election algorithm instance
  43. this.electionAlg = createElectionAlgorithm(electionType);
  44. }
  45. protected Election createElectionAlgorithm(int electionAlgorithm) {
  46. Election le = null;
  47. //TODO: use a factory rather than a switch
  48. switch (electionAlgorithm) {
  49. case 0:
  50. le = new LeaderElection(this);
  51. break;
  52. case 1:
  53. le = new AuthFastLeaderElection(this);
  54. break;
  55. case 2:
  56. le = new AuthFastLeaderElection(this, true);
  57. break;
  58. case 3:
  59. // 1 Create the QuorumCnxManager, which handles all network communication during the election
  60. QuorumCnxManager qcm = createCnxnManager();
  61. QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
  62. if (oldQcm != null) {
  63. LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election ?) ");
  64. oldQcm.halt();
  65. }
  66. QuorumCnxManager.Listener listener = qcm.listener;
  67. if (listener != null) {
  68. // 2 Start the listener thread
  69. listener.start();
  70. // 3 Get ready to start the election
  71. FastLeaderElection fle = new FastLeaderElection(this, qcm);
  72. fle.start();
  73. le = fle;
  74. } else {
  75. LOG.error("Null listener when initializing cnx manager");
  76. }
  77. break;
  78. default:
  79. assert false;
  80. }
  81. return le;
  82. }
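
The vote created in startLeaderElection carries both a zxid and an epoch. In ZooKeeper the 64-bit zxid packs the epoch into its high 32 bits and a per-epoch counter into its low 32 bits, so comparing two zxids also orders transactions across leader terms. The sketch below only illustrates that bit layout; ZxidSketch and its method names are made up for this example and are not the ZooKeeper utility class.

```java
// Hypothetical sketch of the zxid bit layout: not part of ZooKeeper.
class ZxidSketch {

    // High 32 bits: epoch. Low 32 bits: counter within that epoch.
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        long zxid = makeZxid(5, 3);
        System.out.println("0x" + Long.toHexString(zxid)
                + " epoch=" + epochOf(zxid)
                + " counter=" + counterOf(zxid));
    }
}
```

With this layout, any transaction from epoch 6 compares greater than every transaction from epoch 5, regardless of the counter values.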

1) Initializing the network communication component

  1. public QuorumCnxManager createCnxnManager() {
  2. return new QuorumCnxManager(this,
  3. this.getId(),
  4. this.getView(),
  5. this.authServer,
  6. this.authLearner,
  7. this.tickTime * this.syncLimit,
  8. this.getQuorumListenOnAllIPs(),
  9. this.quorumCnxnThreadsSize,
  10. this.isQuorumSaslAuthEnabled());
  11. }
  12. public QuorumCnxManager(QuorumPeer self,
  13. final long mySid,
  14. Map<Long, QuorumPeer.QuorumServer> view,
  15. QuorumAuthServer authServer,
  16. QuorumAuthLearner authLearner,
  17. int socketTimeout,
  18. boolean listenOnAllIPs,
  19. int quorumCnxnThreadsSize,
  20. boolean quorumSaslAuthEnabled) {
  21. // Create the queues
  22. this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
  23. this.queueSendMap
  24. = new ConcurrentHashMap<Long,
  25. ArrayBlockingQueue<ByteBuffer>>();
  26. this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
  27. this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
  28. String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
  29. if (cnxToValue != null) {
  30. this.cnxTO = Integer.parseInt(cnxToValue);
  31. }
  32. this.self = self;
  33. this.mySid = mySid;
  34. this.socketTimeout = socketTimeout;
  35. this.view = view;
  36. this.listenOnAllIPs = listenOnAllIPs;
  37. initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
  38. quorumSaslAuthEnabled);
  39. // Starts listener thread that waits for connection requests
  40. listener = new Listener();
  41. listener.setName("QuorumPeerListener");
  42. }

2) Initializing the listener thread
Open QuorumCnxManager.Listener and find its run method.

  1. public void run() {
  2. int numRetries = 0;
  3. InetSocketAddress addr;
  4. Socket client = null;
  5. Exception exitException = null;
  6. while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
  7. try {
  8. if (self.shouldUsePortUnification()) {
  9. LOG.info("Creating TLS-enabled quorum server socket");
  10. ss = new UnifiedServerSocket(self.getX509Util(), true);
  11. } else if (self.isSslQuorum()) {
  12. LOG.info("Creating TLS-only quorum server socket");
  13. ss = new UnifiedServerSocket(self.getX509Util(), false);
  14. } else {
  15. ss = new ServerSocket();
  16. }
  17. ss.setReuseAddress(true);
  18. if (self.getQuorumListenOnAllIPs()) {
  19. int port = self.getElectionAddress().getPort();
  20. addr = new InetSocketAddress(port);
  21. } else {
  22. // Resolve hostname for this server in case the
  23. // underlying ip address has changed.
  24. self.recreateSocketAddresses(self.getId());
  25. addr = self.getElectionAddress();
  26. }
  27. LOG.info("My election bind port: " + addr.toString());
  28. setName(addr.toString());
  29. // Bind the server address
  30. ss.bind(addr);
  31. // Loop until shutdown is requested
  32. while (!shutdown) {
  33. try {
  34. // Block until a connection request arrives
  35. client = ss.accept();
  36. setSockOpts(client);
  37. LOG.info("Received connection request " +
  38. formatInetAddr((InetSocketAddress) client.getRemoteSocketAddress()));
  39. // Receive and handle the connection request
  40. // asynchronously if the quorum sasl authentication is
  41. // enabled. This is required because sasl server
  42. // authentication process may take few seconds to finish,
  43. // this may delay next peer connection requests.
  44. if (quorumSaslAuthEnabled) {
  45. receiveConnectionAsync(client);
  46. } else {
  47. receiveConnection(client);
  48. }
  49. numRetries = 0;
  50. } catch (SocketTimeoutException e) {
  51. LOG.warn("The socket is listening for the election accepted "
  52. + "and it timed out unexpectedly, but will retry."
  53. + "see ZOOKEEPER-2836");
  54. }
  55. }
  56. } catch (IOException e) {
  57. ... ...
  58. closeSocket(client);
  59. }
  60. }
  61. ... ...
  62. }

3) Election preparation
Open FastLeaderElection.

  public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
      this.stop = false;
      this.manager = manager;
      starter(self, manager);
  }

  private void starter(QuorumPeer self, QuorumCnxManager manager) {
      this.self = self;
      proposedLeader = -1;
      proposedZxid = -1;
      // Initialize the send and receive queues and the messenger
      sendqueue = new LinkedBlockingQueue<ToSend>();
      recvqueue = new LinkedBlockingQueue<Notification>();
      this.messenger = new Messenger(manager);
  }

2.4.2 Running the election


  public synchronized void start() {
      if (!getView().containsKey(myid)) {
          throw new RuntimeException("My id " + myid + " not in the peer list");
      }
      // Cold-start data recovery
      loadDataBase();
      // Start the server connection factory
      startServerCnxnFactory();
      try {
          adminServer.start();
      } catch (AdminServerException e) {
          LOG.warn("Problem starting AdminServer", e);
          System.out.println(e);
      }
      // Prepare the election environment
      startLeaderElection();
      // Run the election
      super.start();
  }

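1) Calling super.start() starts the QuorumPeer thread, which amounts to executing the run() method in QuorumPeer.java.
When ZooKeeper starts, every server is initially in the LOOKING state. The election then makes one server the Leader and the remaining servers Followers.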

  1. public void run() {
  2. updateThreadName();
  3. LOG.debug("Starting quorum peer");
  4. try {
  5. jmxQuorumBean = new QuorumBean(this);
  6. MBeanRegistry.getInstance().register(jmxQuorumBean, null);
  7. for (QuorumServer s : getView().values()) {
  8. ZKMBeanInfo p;
  9. if (getId() == s.id) {
  10. p = jmxLocalPeerBean = new LocalPeerBean(this);
  11. try {
  12. MBeanRegistry.getInstance().register(p, jmxQuorumBean);
  13. } catch (Exception e) {
  14. LOG.warn("Failed to register with JMX", e);
  15. jmxLocalPeerBean = null;
  16. }
  17. } else {
  18. RemotePeerBean rBean = new RemotePeerBean(this, s);
  19. try {
  20. MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
  21. jmxRemotePeerBean.put(s.id, rBean);
  22. } catch (Exception e) {
  23. LOG.warn("Failed to register with JMX", e);
  24. }
  25. }
  26. }
  27. } catch (Exception e) {
  28. LOG.warn("Failed to register with JMX", e);
  29. jmxQuorumBean = null;
  30. }
  31. try {
  32. /*
  33. * Main loop
  34. */
  35. while (running) {
  36. switch (getPeerState()) {
  37. case LOOKING:
  38. LOG.info("LOOKING");
  39. if (Boolean.getBoolean("readonlymode.enabled")) {
  40. LOG.info("Attempting to start ReadOnlyZooKeeperServer");
  41. // Create read-only server but don't start it immediately
  42. final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);
  43. // Instead of starting roZk immediately, wait some grace
  44. // period before we decide we're partitioned.
  45. //
  46. // Thread is used here because otherwise it would require
  47. // changes in each of election strategy classes which is
  48. // unnecessary code coupling.
  49. Thread roZkMgr = new Thread() {
  50. public void run() {
  51. try {
  52. // lower-bound grace period to 2 secs
  53. sleep(Math.max(2000, tickTime));
  54. if (ServerState.LOOKING.equals(getPeerState())) {
  55. roZk.startup();
  56. }
  57. } catch (InterruptedException e) {
  58. LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
  59. } catch (Exception e) {
  60. LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
  61. }
  62. }
  63. };
  64. try {
  65. roZkMgr.start();
  66. reconfigFlagClear();
  67. if (shuttingDownLE) {
  68. shuttingDownLE = false;
  69. startLeaderElection();
  70. }
  71. // Run the election; when it ends, this returns the winning vote,
  72. // that is, the vote for the server that became Leader
  73. setCurrentVote(makeLEStrategy().lookForLeader());
  74. } catch (Exception e) {
  75. LOG.warn("Unexpected exception", e);
  76. setPeerState(ServerState.LOOKING);
  77. } finally {
  78. // If the thread is in the the grace period, interrupt
  79. // to come out of waiting.
  80. roZkMgr.interrupt();
  81. roZk.shutdown();
  82. }
  83. } else {
  84. try {
  85. reconfigFlagClear();
  86. if (shuttingDownLE) {
  87. shuttingDownLE = false;
  88. startLeaderElection();
  89. }
  90. setCurrentVote(makeLEStrategy().lookForLeader());
  91. } catch (Exception e) {
  92. LOG.warn("Unexpected exception", e);
  93. setPeerState(ServerState.LOOKING);
  94. }
  95. }
  96. break;
  97. case OBSERVING:
  98. try {
  99. LOG.info("OBSERVING");
  100. setObserver(makeObserver(logFactory));
  101. observer.observeLeader();
  102. } catch (Exception e) {
  103. LOG.warn("Unexpected exception", e);
  104. } finally {
  105. observer.shutdown();
  106. setObserver(null);
  107. updateServerState();
  108. }
  109. break;
  110. case FOLLOWING:
  111. try {
  112. LOG.info("FOLLOWING");
  113. setFollower(makeFollower(logFactory));
  114. follower.followLeader();
  115. } catch (Exception e) {
  116. LOG.warn("Unexpected exception", e);
  117. } finally {
  118. follower.shutdown();
  119. setFollower(null);
  120. updateServerState();
  121. }
  122. break;
  123. case LEADING:
  124. LOG.info("LEADING");
  125. try {
  126. setLeader(makeLeader(logFactory));
  127. leader.lead();
  128. setLeader(null);
  129. } catch (Exception e) {
  130. LOG.warn("Unexpected exception", e);
  131. } finally {
  132. if (leader != null) {
  133. leader.shutdown("Forcing shutdown");
  134. setLeader(null);
  135. }
  136. updateServerState();
  137. }
  138. break;
  139. }
  140. start_fle = Time.currentElapsedTime();
  141. }
  142. } finally {
  143. ... ...
  144. }
  145. }
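
Stripped of JMX registration and error handling, the main loop above is a small state machine: a LOOKING peer runs the election, and the result decides which role it takes up next. The sketch below is a deliberately reduced model with assumptions stated up front: PeerLoopSketch and its methods are hypothetical, the election result is passed in rather than computed, observers are left out, and the loop stops once a role has been taken up instead of blocking in it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the peer state loop: not part of ZooKeeper.
class PeerLoopSketch {

    enum State { LOOKING, FOLLOWING, LEADING }

    // The elected server leads; every other voting server follows.
    static State afterElection(long myId, long electedLeader) {
        return electedLeader == myId ? State.LEADING : State.FOLLOWING;
    }

    // Runs the dispatch loop and returns the states visited, in order.
    static List<State> run(long myId, long electedLeader) {
        List<State> visited = new ArrayList<>();
        State state = State.LOOKING;
        boolean running = true;
        while (running) {
            visited.add(state);
            switch (state) {
                case LOOKING:
                    // Stand-in for the election call in the LOOKING branch
                    state = afterElection(myId, electedLeader);
                    break;
                case FOLLOWING:
                case LEADING:
                    // A real peer stays in its role until something fails;
                    // this sketch simply stops here.
                    running = false;
                    break;
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(run(1, 3));
        System.out.println(run(3, 3));
    }
}
```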

2) Press Ctrl+Alt+B on lookForLeader() to open its implementation in FastLeaderElection.java.

  1. public Vote lookForLeader() throws InterruptedException {
  2. try {
  3. self.jmxLeaderElectionBean = new LeaderElectionBean();
  4. MBeanRegistry.getInstance().register(
  5. self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
  6. } catch (Exception e) {
  7. LOG.warn("Failed to register with JMX", e);
  8. self.jmxLeaderElectionBean = null;
  9. }
  10. if (self.start_fle == 0) {
  11. self.start_fle = Time.currentElapsedTime();
  12. }
  13. try {
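  14. // During a normal startup, every other server sends me a vote
  15. // Holds the latest valid vote received from each server
  16. HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
  17. // Holds votes that fall outside the current election (for example, from servers that are already FOLLOWING or LEADING)
  18. HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
  19. // Maximum wait time for one election round; the default is 0.2 s
  20. int notTimeout = finalizeWait;
  21. // Every new election round increments logicalclock
  22. // Until a valid epoch is available, the logical clock is used in its place
  23. // Leader election rule: compare epoch (term), then zxid (transaction ID), then serverid (myid); the larger value wins
  24. synchronized (this) {
  25. // Update the logical clock; every election has to advance it
  26. // logicalclock = election epoch
  27. logicalclock.incrementAndGet();
  28. // Update the proposed vote (serverid, zxid, epoch)
  29. updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());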
  30. }
  31. LOG.info("New election. My id = " + self.getId() +
  32. ", proposed zxid=0x" + Long.toHexString(proposedZxid));
  33. // Broadcast the vote: send my own vote to the other servers
  34. sendNotifications();
  35. /*
  36. * Loop in which we exchange notifications until we find a leader
  37. */
  38. // Run election rounds until a leader has been elected
  39. while ((self.getPeerState() == ServerState.LOOKING) &&
  40. (!stop)) {
  41. }
  42. return null;
  43. } finally {
  44. }
  45. }
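
The comparison rule quoted in the comments above (epoch first, then zxid, then server ID, larger value wins) fits in a few lines. The following sketch is illustrative only: VoteSketch, Vote, supersedes and hasQuorum are hypothetical names, and the quorum test assumes a plain majority of the ensemble, which may differ from the quorum verifier configured on a real cluster.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the vote ordering rule: not part of ZooKeeper.
class VoteSketch {

    static final class Vote {
        final long leader; // server ID being voted for
        final long zxid;
        final long epoch;

        Vote(long leader, long zxid, long epoch) {
            this.leader = leader;
            this.zxid = zxid;
            this.epoch = epoch;
        }
    }

    // True if the incoming vote should replace the current one:
    // compare epoch, then zxid, then server ID.
    static boolean supersedes(Vote incoming, Vote current) {
        if (incoming.epoch != current.epoch) {
            return incoming.epoch > current.epoch;
        }
        if (incoming.zxid != current.zxid) {
            return incoming.zxid > current.zxid;
        }
        return incoming.leader > current.leader;
    }

    // True if more than half of the ensemble voted for the candidate.
    static boolean hasQuorum(Map<Long, Vote> received, long candidate, int ensembleSize) {
        int count = 0;
        for (Vote vote : received.values()) {
            if (vote.leader == candidate) {
                count++;
            }
        }
        return count > ensembleSize / 2;
    }

    public static void main(String[] args) {
        Vote mine = new Vote(1, 10, 1);
        Vote theirs = new Vote(3, 10, 1);
        System.out.println(supersedes(theirs, mine)); // same epoch and zxid, higher server ID

        Map<Long, Vote> received = new HashMap<>();
        received.put(1L, theirs); // server 1 switched its vote to server 3
        received.put(3L, theirs); // server 3 voted for itself
        System.out.println(hasQuorum(received, 3, 3));
    }
}
```

In a three-server ensemble started from empty data, this is why the server with the larger ID tends to win once two servers are up: epochs and zxids tie, so the server ID decides.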

3) Open sendNotifications, which broadcasts the vote by sending this server's vote to the other servers.

  private void sendNotifications() {
      // Iterate over the voting members and send the vote to each server
      for (long sid : self.getCurrentAndNextConfigVoters()) {
          QuorumVerifier qv = self.getQuorumVerifier();
          // Build the vote message to send
          ToSend notmsg = new ToSend(ToSend.mType.notification,
                  proposedLeader,
                  proposedZxid,
                  logicalclock.get(),
                  QuorumPeer.ServerState.LOOKING,
                  sid,
                  proposedEpoch, qv.toString().getBytes());
          if (LOG.isDebugEnabled()) {
              LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
                      Long.toHexString(proposedZxid) + " (n.zxid), 0x" +
                      Long.toHexString(logicalclock.get()) +
                      " (n.round), " + sid + " (recipient), " + self.getId() +
                      " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
          }
          // Put the vote message on the send queue
          sendqueue.offer(notmsg);
      }
  }

4) In FastLeaderElection.java, find the WorkerSender thread. The toSend method it calls through manager belongs to QuorumCnxManager.

  1. class WorkerSender extends ZooKeeperThread {
  2. volatile boolean stop;
  3. QuorumCnxManager manager;
  4. WorkerSender(QuorumCnxManager manager) {
  5. super("WorkerSender");
  6. this.stop = false;
  7. this.manager = manager;
  8. }
  9. public void run() {
  10. while (!stop) {
  11. try {
  12. // Blocking poll on the queue, waiting for a vote that needs to be sent
  13. ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
  14. if (m == null) continue;
  15. // Process the vote to be sent
  16. process(m);
  17. } catch (InterruptedException e) {
  18. break;
  19. }
  20. }
  21. LOG.info("WorkerSender is down");
  22. }
  23. /**
  24. * Called by run() once there is a new message to send.
  25. *
  26. * @param m message to send
  27. */
  28. void process(ToSend m) {
  29. ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
  30. m.leader,
  31. m.zxid,
  32. m.electionEpoch,
  33. m.peerEpoch,
  34. m.configData);
  35. // Send the vote
  36. manager.toSend(m.sid, requestBuffer);
  37. }
  38. }
  39. public void toSend(Long sid, ByteBuffer b) {
  40. /*
  41. * If sending message to myself, then simply enqueue it (loopback).
  42. */
  43. // A message addressed to myself goes straight into my own RecvQueue
  44. if (this.mySid == sid) {
  45. b.position(0);
  46. addToRecvQueue(new Message(b.duplicate(), sid));
  47. /*
  48. * Otherwise send to the corresponding thread to send.
  49. */
  50. } else {
  51. /*
  52. * Start a new connection if doesn't have one already.
  53. */
  54. // For another server, create its send queue or fetch the existing one,
  55. // and put the outgoing message into that queue
  56. ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
  57. SEND_CAPACITY);
  58. ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);
  59. if (oldq != null) {
  60. addToSendQueue(oldq, b);
  61. } else {
  62. addToSendQueue(bq, b);
  63. }
  64. // Connect to the peer so that the queued vote can be sent
  65. connectOne(sid);
  66. }
  67. }
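
The routing decision in toSend (loop back to the local receive queue, or enqueue on a per-peer send queue created on first use) can be modelled without any networking. This is a sketch under assumptions: SendRoutingSketch is a hypothetical class, the queue capacities are arbitrary, computeIfAbsent is used in place of the putIfAbsent pattern in the listing, and no connection is opened.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of per-peer send queues: not part of ZooKeeper.
class SendRoutingSketch {

    static final int QUEUE_CAPACITY = 4; // arbitrary value chosen for this sketch

    final long mySid;
    final ArrayBlockingQueue<ByteBuffer> recvQueue =
            new ArrayBlockingQueue<ByteBuffer>(100);
    final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap =
            new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();

    SendRoutingSketch(long mySid) {
        this.mySid = mySid;
    }

    void toSend(long sid, ByteBuffer message) {
        if (sid == mySid) {
            // Loopback: a message to myself never touches the network.
            recvQueue.offer(message.duplicate());
            return;
        }
        // One queue per peer, created atomically the first time it is needed.
        ArrayBlockingQueue<ByteBuffer> queue = queueSendMap.computeIfAbsent(
                sid, key -> new ArrayBlockingQueue<ByteBuffer>(QUEUE_CAPACITY));
        queue.offer(message);
    }

    public static void main(String[] args) {
        SendRoutingSketch router = new SendRoutingSketch(1L);
        router.toSend(1L, ByteBuffer.wrap(new byte[] {1}));
        router.toSend(2L, ByteBuffer.wrap(new byte[] {2}));
        router.toSend(2L, ByteBuffer.wrap(new byte[] {3}));
        System.out.println(router.recvQueue.size() + " " + router.queueSendMap.get(2L).size());
    }
}
```

Keeping a separate queue per peer means that one slow or unreachable server cannot hold up messages destined for the others.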

5) If the message is addressed to this server itself, it is added to the server's own receive queue.

  public void addToRecvQueue(Message msg) {
      synchronized (recvQLock) {
          if (recvQueue.remainingCapacity() == 0) {
              try {
                  recvQueue.remove();
              } catch (NoSuchElementException ne) {
                  // element could be removed by poll()
                  LOG.debug("Trying to remove from an empty " +
                          "recvQueue. Ignoring exception " + ne);
              }
          }
          try {
              // Add the vote addressed to myself to the recvQueue
              recvQueue.add(msg);
          } catch (IllegalStateException ie) {
              // This should never happen
              LOG.error("Unable to insert element in the recvQueue " + ie);
          }
      }
  }

6) Adding a message to a send queue

  private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue,
          ByteBuffer buffer) {
      if (queue.remainingCapacity() == 0) {
          try {
              queue.remove();
          } catch (NoSuchElementException ne) {
              // element could be removed by poll()
              LOG.debug("Trying to remove from an empty " +
                      "Queue. Ignoring exception " + ne);
          }
      }
      try {
          // Add the outgoing message to the send queue
          queue.add(buffer);
      } catch (IllegalStateException ie) {
          // This should never happen
          LOG.error("Unable to insert an element in the queue " + ie);
      }
  }
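
Both addToRecvQueue and addToSendQueue follow the same policy: when the bounded queue is full, the oldest element is discarded to make room for the newest. A compact sketch of that policy is shown below. DropOldestSketch is a hypothetical helper, and it uses poll() and offer() in place of the remove()/add() pair with exception handling from the listings; like the original, it would need external locking if several producers shared one queue.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical sketch of the drop-oldest policy: not part of ZooKeeper.
class DropOldestSketch {

    static <T> void addDroppingOldest(ArrayBlockingQueue<T> queue, T item) {
        if (queue.remainingCapacity() == 0) {
            // Discard the oldest element; poll() returns null instead of throwing
            // if a consumer emptied the queue in the meantime.
            queue.poll();
        }
        queue.offer(item);
    }

    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        addDroppingOldest(queue, 1);
        addDroppingOldest(queue, 2);
        addDroppingOldest(queue, 3); // queue is full, so 1 is dropped
        System.out.println(queue);
    }
}
```

Dropping the oldest entry fits election traffic, where a newer notification from a server makes its earlier ones obsolete.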

7) Establishing a connection to the target server

  1. synchronized void connectOne(long sid) {
  2. if (senderWorkerMap.get(sid) != null) {
  3. LOG.debug("There is a connection already for server " + sid);
  4. return;
  5. }
  6. synchronized (self.QV_LOCK) {
  7. boolean knownId = false;
  8. // Resolve hostname for the remote server before attempting to
  9. // connect in case the underlying ip address has changed.
  10. self.recreateSocketAddresses(sid);
  11. Map<Long, QuorumPeer.QuorumServer> lastCommittedView = self.getView();
  12. QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
  13. Map<Long, QuorumPeer.QuorumServer> lastProposedView =
  14. lastSeenQV.getAllMembers();
  15. if (lastCommittedView.containsKey(sid)) {
  16. knownId = true;
  17. if (connectOne(sid, lastCommittedView.get(sid).electionAddr))
  18. return;
  19. }
  20. if (lastSeenQV != null && lastProposedView.containsKey(sid)
  21. && (!knownId || (lastProposedView.get(sid).electionAddr !=
  22. lastCommittedView.get(sid).electionAddr))) {
  23. knownId = true;
  24. if (connectOne(sid, lastProposedView.get(sid).electionAddr))
  25. return;
  26. }
  27. if (!knownId) {
  28. LOG.warn("Invalid server id: " + sid);
  29. return;
  30. }
  31. }
  32. }
  33. synchronized private boolean connectOne(long sid, InetSocketAddress electionAddr) {
  34. if (senderWorkerMap.get(sid) != null) {
  35. LOG.debug("There is a connection already for server " + sid);
  36. return true;
  37. }
  38. Socket sock = null;
  39. try {
  40. LOG.debug("Opening channel to server " + sid);
  41. if (self.isSslQuorum()) {
  42. SSLSocket sslSock = self.getX509Util().createSSLSocket();
  43. setSockOpts(sslSock);
  44. sslSock.connect(electionAddr, cnxTO);
  45. sslSock.startHandshake();
  46. sock = sslSock;
  47. LOG.info("SSL handshake complete with {} - {} - {}",
  52. sslSock.getRemoteSocketAddress(), sslSock.getSession().getProtocol(),
  53. sslSock.getSession().getCipherSuite());
  54. } else {
  55. sock = new Socket();
  56. setSockOpts(sock);
  57. sock.connect(electionAddr, cnxTO);
  58. }
  59. LOG.debug("Connected to server " + sid);
  60. // Sends connection request asynchronously if the quorum
  61. // sasl authentication is enabled. This is required because
  62. // sasl server authentication process may take few seconds to
  63. // finish, this may delay next peer connection requests.
  64. if (quorumSaslAuthEnabled) {
  65. initiateConnectionAsync(sock, sid);
  66. } else {
  67. // Handle the connection
  68. initiateConnection(sock, sid);
  69. }
  70. return true;
  71. } catch (UnresolvedAddressException e) {
  72. ... ...
  73. }
  74. }
  75. public void initiateConnection(final Socket sock, final Long sid) {
  76. try {
  77. startConnection(sock, sid);
  78. } catch (IOException e) {
  79. LOG.error("Exception while connecting, id: {}, addr: {}, closing learner connection",
  81. new Object[]{sid, sock.getRemoteSocketAddress()}, e);
  82. closeSocket(sock);
  83. return;
  84. }
  85. }

8) Create and start the sender thread and the receiver thread

  1. private boolean startConnection(Socket sock, Long sid)
  2. throws IOException {
  3. DataOutputStream dout = null;
  4. DataInputStream din = null;
  5. try {
  6. // Use BufferedOutputStream to reduce the number of IP packets. This is
  7. // important for x-DC scenarios.
  8. // Send data to the remote server through the output stream
  9. BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
  10. dout = new DataOutputStream(buf);
  11. // Sending id and challenge
  12. // represents protocol version (in other words - message type)
  13. dout.writeLong(PROTOCOL_VERSION);
  14. dout.writeLong(self.getId());
  15. String addr = formatInetAddr(self.getElectionAddress());
  16. byte[] addr_bytes = addr.getBytes();
  17. dout.writeInt(addr_bytes.length);
  18. dout.write(addr_bytes);
  19. dout.flush();
  20. // Input stream used to read the votes sent by the peer
  21. din = new DataInputStream(
  22. new BufferedInputStream(sock.getInputStream()));
  23. } catch (IOException e) {
  24. LOG.warn("Ignoring exception reading or writing challenge: ", e);
  25. closeSocket(sock);
  26. return false;
  27. }
  28. // authenticate learner
  29. QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
  30. if (qps != null) {
  31. // TODO - investigate why reconfig makes qps null.
  32. authLearner.authenticate(sock, qps.hostname);
  33. }
  34. // If lost the challenge, then drop the new connection
  35. // If the peer's id is larger than mine, I am not entitled to initiate a connection to it, so close my own client socket
  37. if (sid > self.getId()) {
  38. LOG.info("Have smaller server identifier, so dropping the " +
  39. "connection: (" + sid + ", " + self.getId() + ")");
  40. closeSocket(sock);
  41. // Otherwise proceed with the connection
  42. } else {
  43. // Initialize the sender and the receiver
  44. SendWorker sw = new SendWorker(sock, sid);
  45. RecvWorker rw = new RecvWorker(sock, din, sid, sw);
  46. sw.setRecv(rw);
  47. SendWorker vsw = senderWorkerMap.get(sid);
  48. if (vsw != null)
  49. vsw.finish();
  50. senderWorkerMap.put(sid, sw);
  51. queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(
  52. SEND_CAPACITY));
  53. // Start the sender thread and the receiver thread
  54. sw.start();
  55. rw.start();
  56. return true;
  57. }
  58. return false;
  59. }
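
The check `sid > self.getId()` in the listing above means that, of any two servers, only the one with the larger id keeps a connection it opened itself. The rule can be sketched in isolation as follows. `ConnectionRule` and `keepsOutgoingConnection` are hypothetical names used only for illustration and are not part of ZooKeeper.

```java
// Hypothetical helper illustrating the tie-break in startConnection();
// it is not a ZooKeeper class.
final class ConnectionRule {
    private ConnectionRule() {}

    /**
     * Returns true when the local server may keep a connection it opened
     * to the remote server, i.e. when the remote id is not larger than ours.
     */
    static boolean keepsOutgoingConnection(long selfId, long remoteSid) {
        return remoteSid <= selfId;
    }

    public static void main(String[] args) {
        // Server 3 dials server 1: 1 <= 3, so the connection is kept.
        System.out.println(keepsOutgoingConnection(3L, 1L));
        // Server 1 dials server 3: 3 > 1, so server 1 closes its socket.
        System.out.println(keepsOutgoingConnection(1L, 3L));
    }
}
```

The apparent intent is that each pair of servers ends up sharing one election channel instead of two competing ones.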

9) Open SendWorker and find the run method of that class

  1. public void run() {
  2. threadCnt.incrementAndGet();
  3. try {
  4. /**
  5. * If there is nothing in the queue to send, then we
  6. * send the lastMessage to ensure that the last message
  7. * was received by the peer. The message could be dropped
  8. * in case self or the peer shutdown their connection
  9. * (and exit the thread) prior to reading/processing
  10. * the last message. Duplicate messages are handled correctly
  11. * by the peer.
  12. *
  13. * If the send queue is non-empty, then we have a recent
  14. * message than that stored in lastMessage. To avoid sending
  15. * stale message, we should send the message in the send queue.
  16. */
  17. ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
  18. if (bq == null || isSendQueueEmpty(bq)) {
  19. ByteBuffer b = lastMessageSent.get(sid);
  20. if (b != null) {
  21. LOG.debug("Attempting to send lastMessage to sid=" + sid);
  22. send(b);
  23. }
  24. }
  25. } catch (IOException e) {
  26. LOG.error("Failed to send last message. Shutting down thread.", e);
  27. this.finish();
  28. }
  29. try {
  30. // As long as the connection is still open
  31. while (running && !shutdown && sock != null) {
  32. ByteBuffer b = null;
  33. try {
  34. ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
  35. .get(sid);
  36. if (bq != null) {
  37. // Keep taking outgoing messages from the send queue and sending them
  38. b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
  39. } else {
  40. LOG.error("No queue of incoming messages for " +
  41. "server " + sid);
  42. break;
  43. }
  44. if(b != null){
  45. // Remember the most recent message sent to server sid
  46. lastMessageSent.put(sid, b);
  47. // Perform the send
  48. send(b);
  49. }
  50. } catch (InterruptedException e) {
  51. LOG.warn("Interrupted while waiting for message on queue",
  52. e);
  53. }
  54. }
  55. } catch (Exception e) {
  56. LOG.warn("Exception when using channel: for id " + sid
  57. + " my id = " + QuorumCnxManager.this.mySid
  58. + " error = " + e);
  59. }
  60. this.finish();
  61. LOG.warn("Send worker leaving thread " + " id " + sid + " my id = " + self.getId());
  62. }
  63. synchronized void send(ByteBuffer b) throws IOException {
  64. byte[] msgBytes = new byte[b.capacity()];
  65. try {
  66. b.position(0);
  67. b.get(msgBytes);
  68. } catch (BufferUnderflowException be) {
  69. LOG.error("BufferUnderflowException ", be);
  70. return;
  71. }
  72. // Write the message to the output stream
  73. dout.writeInt(b.capacity());
  74. dout.write(b.array());
  75. dout.flush();
  76. }
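
The send() method above writes a 4-byte length followed by the message bytes, which is a length-prefixed frame. A minimal sketch of that framing, round-tripped through an in-memory buffer, is shown below. `LengthPrefixedFraming` and its methods are hypothetical names; only the `java.io` stream calls mirror the listing, and the maximum size of 1024 is an arbitrary value chosen for the demo.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of ZooKeeper.
final class LengthPrefixedFraming {
    private LengthPrefixedFraming() {}

    /** Writes one frame: a 4-byte big-endian length followed by the payload. */
    static void writeFrame(DataOutputStream out, byte[] payload) throws IOException {
        out.writeInt(payload.length);
        out.write(payload);
        out.flush();
    }

    /** Reads one frame written by writeFrame, rejecting implausible lengths. */
    static byte[] readFrame(DataInputStream in, int maxSize) throws IOException {
        int length = in.readInt();
        if (length <= 0 || length > maxSize) {
            throw new IOException("Invalid frame length: " + length);
        }
        byte[] payload = new byte[length];
        in.readFully(payload, 0, length);
        return payload;
    }

    /** Sends a message through an in-memory buffer and reads it back. */
    static String roundTrip(String message) {
        try {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            writeFrame(new DataOutputStream(sink), message.getBytes(StandardCharsets.UTF_8));
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(sink.toByteArray()));
            return new String(readFrame(in, 1024), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the reader knows the length before it reads the body, it can call readFully once and never has to scan for a delimiter.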

10) Open RecvWorker and find the run method of that class

  1. public void run() {
  2. threadCnt.incrementAndGet();
  3. try {
  4. // As long as the connection is still open
  5. while (running && !shutdown && sock != null) {
  6. /**
  7. * Reads the first int to determine the length of the
  8. * message
  9. */
  10. int length = din.readInt();
  11. if (length <= 0 || length > PACKETMAXSIZE) {
  12. throw new IOException(
  13. "Received packet with invalid packet: "
  14. + length);
  15. }
  16. /**
  17. * Allocates a new ByteBuffer to receive the message
  18. */
  19. byte[] msgArray = new byte[length];
  20. // Receive the message from the input stream
  21. din.readFully(msgArray, 0, length);
  22. ByteBuffer message = ByteBuffer.wrap(msgArray);
  23. // Accept the vote sent by the peer
  24. addToRecvQueue(new Message(message.duplicate(), sid));
  25. }
  26. } catch (Exception e) {
  27. LOG.warn("Connection broken for id " + sid + ", my id = "
  28. + QuorumCnxManager.this.mySid + ", error = ", e);
  29. } finally {
  30. LOG.warn("Interrupting SendWorker");
  31. sw.finish();
  32. closeSocket(sock);
  33. }
  34. }
  35. public void addToRecvQueue(Message msg) {
  36. synchronized (recvQLock) {
  37. if (recvQueue.remainingCapacity() == 0) {
  38. try {
  39. recvQueue.remove();
  40. } catch (NoSuchElementException ne) {
  41. // element could be removed by poll()
  42. LOG.debug("Trying to remove from an empty " +
  43. "recvQueue. Ignoring exception " + ne);
  44. }
  45. }
  46. try {
  47. // Put the received message into the receive queue
  48. recvQueue.add(msg);
  49. } catch (IllegalStateException ie) {
  50. // This should never happen
  51. LOG.error("Unable to insert element in the recvQueue " + ie);
  52. }
  53. }
  54. }
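
addToRecvQueue() above keeps the receive queue bounded by discarding the oldest message when the queue is full. A minimal sketch of that policy follows. `DropOldestQueue` is a hypothetical class, and it uses `poll()` where the listing uses `remove()` with a caught `NoSuchElementException`; the two differ only in how an already-empty queue is reported.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical illustration of the "drop oldest when full" policy.
final class DropOldestQueue<T> {
    private final ArrayBlockingQueue<T> queue;
    private final Object lock = new Object();

    DropOldestQueue(int capacity) {
        queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Adds an element, evicting the head first when the queue is full. */
    void add(T element) {
        synchronized (lock) {
            if (queue.remainingCapacity() == 0) {
                // poll() returns null instead of throwing if a consumer
                // emptied the queue in the meantime.
                queue.poll();
            }
            queue.add(element);
        }
    }

    /** Removes and returns the head, or null when the queue is empty. */
    T poll() {
        return queue.poll();
    }

    int size() {
        return queue.size();
    }
}
```

With a capacity of 2, adding "a", "b", "c" leaves "b" and "c" in the queue: newer votes are preferred over stale ones.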

11) Find the WorkerReceiver thread in FastLeaderElection.java.

  1. class WorkerReceiver extends ZooKeeperThread {
  2. volatile boolean stop;
  3. QuorumCnxManager manager;
  4. WorkerReceiver(QuorumCnxManager manager) {
  5. super("WorkerReceiver");
  6. this.stop = false;
  7. this.manager = manager;
  8. }
  9. public void run() {
  10. Message response;
  11. while (!stop) {
  12. // Sleeps on receive
  13. try {
  14. // Take an election vote message (sent by another server) from the RecvQueue
  15. response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
  16. } catch (InterruptedException e) {
  17. LOG.warn("Interrupted Exception while waiting for new message" +
  18. e.toString());
  19. }
  20. }
  21. LOG.info("WorkerReceiver is down");
  22. }
  23. }

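2.5 Follower and Leader state synchronization source code

When the election ends, every node updates its state according to its role: the elected Leader sets its state to Leader, and all other nodes set their state to Follower.
Leader entry point for the state update: leader.lead()
Follower entry point for the state update: follower.followLeader()
Notes:
(1) The follower must let the leader know its state: epoch, zxid, and sid. To do so, the follower must
find out which node is the leader;
open a connection to the leader;
send its own information to the leader;
and the leader, once it receives that information, must reply to the follower with the corresponding information.
(2) Once the leader knows the follower's state, it decides which kind of data synchronization is needed: DIFF, TRUNC, or SNAP.
(3) The data synchronization is performed.
(4) After the leader has received ACKs from more than half of the followers, it enters normal operation and cluster startup is complete.
Summary of the synchronization modes:
(1) DIFF: the leader and the follower hold the same data, so nothing needs to be done.
(2) TRUNC: the follower's zxid is greater than the leader's zxid, so the follower must roll back.
(3) COMMIT: the leader's zxid is greater than the follower's zxid, so the leader sends Proposals to the follower for it to commit and apply.
(4) SNAP: if the follower has no data at all, synchronization uses SNAP directly (the entire data set is serialized to the follower).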
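
The four cases in the summary above can be written as a single decision function. This is a simplified model of the summary only: `SyncModeChooser`, `SyncMode`, and `choose` are hypothetical names, treating zxid 0 as "no data" is an assumption made for the sketch, and the real leader also consults its committed log before deciding.

```java
// Simplified model of the summary; not ZooKeeper's actual decision code.
final class SyncModeChooser {
    enum SyncMode { DIFF, TRUNC, COMMIT, SNAP }

    private SyncModeChooser() {}

    static SyncMode choose(long leaderZxid, long followerZxid) {
        if (followerZxid == leaderZxid) {
            return SyncMode.DIFF;   // identical state, nothing to replay
        }
        if (followerZxid == 0L) {
            return SyncMode.SNAP;   // assumption: zxid 0 stands for "no data"
        }
        if (followerZxid > leaderZxid) {
            return SyncMode.TRUNC;  // follower is ahead and must roll back
        }
        return SyncMode.COMMIT;     // follower is behind, replay proposals
    }
}
```

For example, a follower at zxid 7 joining a leader at zxid 5 falls into the TRUNC case, while a follower at zxid 3 falls into the COMMIT case.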

2.5.1 Leader.lead() waits for state-synchronization requests from followers

1) Find the lead() method in Leader.java

  1. void lead() throws IOException, InterruptedException {
  2. self.end_fle = Time.currentElapsedTime();
  3. long electionTimeTaken = self.end_fle - self.start_fle;
  4. self.setElectionTimeTaken(electionTimeTaken);
  5. LOG.info("LEADING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
  6. QuorumPeer.FLE_TIME_UNIT);
  7. self.start_fle = 0;
  8. self.end_fle = 0;
  9. zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
  10. try {
  11. self.tick.set(0);
  12. // Restore the data into memory; it was in fact already loaded at startup
  13. zk.loadData();
  14. leaderStateSummary = new StateSummary(self.getCurrentEpoch(),
  15. zk.getLastProcessedZxid());
  16. // Start thread that waits for connection requests from
  17. // new followers.
  18. // Wait for the other follower nodes to send their sync state to the leader
  19. cnxAcceptor = new LearnerCnxAcceptor();
  20. cnxAcceptor.start();
  21. long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
  22. } finally {
  23. zk.unregisterJMX(this);
  24. }
  25. }

  1. class LearnerCnxAcceptor extends ZooKeeperCriticalThread {
  2. private volatile boolean stop = false;
  3. public LearnerCnxAcceptor() {
  4. super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress(), zk
  5. .getZooKeeperServerListener());
  6. }
  7. @Override
  8. public void run() {
  9. try {
  10. while (!stop) {
  11. Socket s = null;
  12. boolean error = false;
  13. try {
  14. // Wait for a follower's state-synchronization request
  15. s = ss.accept();
  16. // start with the initLimit, once the ack is processed
  17. // in LearnerHandler switch to the syncLimit
  18. s.setSoTimeout(self.tickTime * self.initLimit);
  19. s.setTcpNoDelay(nodelay);
  20. BufferedInputStream is = new BufferedInputStream(
  21. s.getInputStream());
  22. // As soon as a follower's request arrives, create a LearnerHandler to process it
  24. LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
  25. // Start the thread
  26. fh.start();
  27. } catch (SocketException e) {
  28. ... ...
  29. }
  30. ... ...
  31. }
  32. } catch (Exception e) {
  33. LOG.warn("Exception while accepting follower", e.getMessage());
  34. handleException(this.getName(), e);
  35. }
  36. }
  37. public void halt() {
  38. stop = true;
  39. }
  40. }
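
The accepted socket starts with a read timeout of tickTime * initLimit, and the comment in the listing notes that LearnerHandler later switches to the syncLimit. The arithmetic can be sketched as follows. `SyncTimeouts` is a hypothetical helper, and the values 2000, 10, and 5 used in the usage note are example settings, not values taken from this document.

```java
// Hypothetical helper showing how the two socket timeouts are derived.
final class SyncTimeouts {
    private SyncTimeouts() {}

    /** Read timeout in milliseconds while a learner is still synchronizing. */
    static int initTimeoutMillis(int tickTime, int initLimit) {
        return tickTime * initLimit;
    }

    /** Read timeout in milliseconds once the learner's ACK has been processed. */
    static int syncTimeoutMillis(int tickTime, int syncLimit) {
        return tickTime * syncLimit;
    }
}
```

With tickTime = 2000, initLimit = 10, and syncLimit = 5, a learner gets 20000 ms for the initial synchronization and 10000 ms between packets afterwards.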

The ss field used above is the server socket created when the Leader object is constructed:

  1. private final ServerSocket ss;
  2. Leader(QuorumPeer self, LeaderZooKeeperServer zk) throws IOException {
  3. this.self = self;
  4. this.proposalStats = new BufferStats();
  5. try {
  6. if (self.shouldUsePortUnification() || self.isSslQuorum()) {
  7. boolean allowInsecureConnection = self.shouldUsePortUnification();
  8. if (self.getQuorumListenOnAllIPs()) {
  9. ss = new UnifiedServerSocket(self.getX509Util(),
  10. allowInsecureConnection, self.getQuorumAddress().getPort());
  11. } else {
  12. ss = new UnifiedServerSocket(self.getX509Util(),
  13. allowInsecureConnection);
  14. }
  15. } else {
  16. if (self.getQuorumListenOnAllIPs()) {
  17. ss = new ServerSocket(self.getQuorumAddress().getPort());
  18. } else {
  19. ss = new ServerSocket();
  20. }
  21. }
  22. ss.setReuseAddress(true);
  23. if (!self.getQuorumListenOnAllIPs()) {
  24. ss.bind(self.getQuorumAddress());
  25. }
  26. } catch (BindException e) {
  27. ... ...
  28. }
  29. this.zk = zk;
  30. this.learnerSnapshotThrottler = createLearnerSnapshotThrottler(
  31. maxConcurrentSnapshots, maxConcurrentSnapshotTimeout);
  32. }

2.5.2 Follower.followLeader() finds and connects to the Leader

1) Find the followLeader() method in Follower.java

  1. void followLeader() throws InterruptedException {
  2. self.end_fle = Time.currentElapsedTime();
  3. long electionTimeTaken = self.end_fle - self.start_fle;
  4. self.setElectionTimeTaken(electionTimeTaken);
  5. LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
  6. QuorumPeer.FLE_TIME_UNIT);
  7. self.start_fle = 0;
  8. self.end_fle = 0;
  9. fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
  10. try {
  11. // Find the leader
  12. QuorumServer leaderServer = findLeader();
  13. try {
  14. // Connect to the leader
  15. connectToLeader(leaderServer.addr, leaderServer.hostname);
  16. // Register with the leader
  17. long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
  18. if (self.isReconfigStateChange())
  19. throw new Exception("learned about role change");
  20. //check to see if the leader zxid is lower than ours
  21. //this should never happen but is just a safety check
  22. long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
  23. if (newEpoch < self.getAcceptedEpoch()) {
  24. LOG.error("Proposed leader epoch " +
  25. ZxidUtils.zxidToString(newEpochZxid)
  26. + " is less than our accepted epoch " +
  27. ZxidUtils.zxidToString(self.getAcceptedEpoch()));
  28. throw new IOException("Error: Epoch of leader is lower");
  29. }
  30. syncWithLeader(newEpochZxid);
  31. QuorumPacket qp = new QuorumPacket();
  32. while (this.isRunning()) {
  33. readPacket(qp);
  34. processPacket(qp);
  35. }
  36. } catch (Exception e) {
  37. LOG.warn("Exception when following the leader", e);
  38. try {
  39. sock.close();
  40. } catch (IOException e1) {
  41. e1.printStackTrace();
  42. }
  43. // clear pending revalidations
  44. pendingRevalidations.clear();
  45. }
  46. } finally {
  47. zk.unregisterJMX((Learner) this);
  48. }
  49. }
  50. /**
  51. * Returns the address of the node we think is the leader.
  52. */
  53. protected QuorumServer findLeader() {
  54. QuorumServer leaderServer = null;
  55. // Find the leader by id
  56. // The sid of the leader last voted for, recorded during the election
  57. Vote current = self.getCurrentVote();
  58. // Look for that sid among all servers in the current view
  59. for (QuorumServer s : self.getView().values()) {
  60. if (s.id == current.getId()) {
  61. // Ensure we have the leader's correct IP address before
  62. // attempting to connect.
  63. // Refresh the leader's address so that we connect to its correct IP
  64. s.recreateSocketAddresses();
  65. leaderServer = s;
  66. break;
  67. }
  68. }
  69. if (leaderServer == null) {
  70. LOG.warn("Couldn't find the leader with id = "
  71. + current.getId());
  72. }
  73. return leaderServer;
  74. }
  75. protected void connectToLeader(InetSocketAddress addr, String hostname)
  76. throws IOException, InterruptedException, X509Exception {
  77. this.sock = createSocket();
  78. int initLimitTime = self.tickTime * self.initLimit;
  79. int remainingInitLimitTime = initLimitTime;
  80. long startNanoTime = nanoTime();
  81. for (int tries = 0; tries < 5; tries++) {
  82. try {
  83. // recalculate the init limit time because retries sleep for 1000 milliseconds
  84. remainingInitLimitTime = initLimitTime - (int) ((nanoTime() - startNanoTime)
  85. / 1000000);
  86. if (remainingInitLimitTime <= 0) {
  87. LOG.error("initLimit exceeded on retries.");
  88. throw new IOException("initLimit exceeded on retries.");
  89. }
  90. sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit,
  91. remainingInitLimitTime));
  92. if (self.isSslQuorum()) {
  93. ((SSLSocket) sock).startHandshake();
  94. }
  95. sock.setTcpNoDelay(nodelay);
  96. break;
  97. } catch (IOException e) {
  98. ... ...
  99. }
  100. Thread.sleep(1000);
  101. }
  102. self.authLearner.authenticate(sock, hostname);
  103. leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
  104. sock.getInputStream()));
  105. bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
  106. leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
  107. }
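
connectToLeader() above retries the connection while staying inside an overall budget of tickTime * initLimit, and each attempt is capped at tickTime * syncLimit. The timeout calculation for one attempt can be sketched as below. `ConnectBudget` is a hypothetical helper, and returning -1 is a choice made for this sketch; the listing throws an IOException when the budget is exhausted.

```java
// Hypothetical helper reproducing only the timeout arithmetic of the retry loop.
final class ConnectBudget {
    private ConnectBudget() {}

    /**
     * Timeout in milliseconds for the next connect attempt, or -1 when the
     * overall initLimit budget has already been used up.
     */
    static int nextAttemptTimeoutMillis(int tickTime, int initLimit, int syncLimit,
                                        long elapsedMillis) {
        long remaining = (long) tickTime * initLimit - elapsedMillis;
        if (remaining <= 0) {
            return -1;
        }
        return (int) Math.min((long) tickTime * syncLimit, remaining);
    }
}
```

With tickTime = 2000, initLimit = 10, and syncLimit = 5 (example values), the first attempt may take up to 10000 ms, an attempt started after 15000 ms is limited to the remaining 5000 ms, and nothing is left after 20000 ms.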

2.5.3 Leader.lead() creates the LearnerHandler

  1. void lead() throws IOException, InterruptedException {
  2. self.end_fle = Time.currentElapsedTime();
  3. long electionTimeTaken = self.end_fle - self.start_fle;
  4. self.setElectionTimeTaken(electionTimeTaken);
  5. LOG.info("LEADING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
  6. QuorumPeer.FLE_TIME_UNIT);
  7. self.start_fle = 0;
  8. self.end_fle = 0;
  9. zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
  10. try {
  11. self.tick.set(0);
  12. zk.loadData();
  13. leaderStateSummary = new StateSummary(self.getCurrentEpoch(),
  14. zk.getLastProcessedZxid());
  15. // Start thread that waits for connection requests from
  16. // new followers.
  17. cnxAcceptor = new LearnerCnxAcceptor();
  18. cnxAcceptor.start();
  19. ......
  20. } finally {
  21. zk.unregisterJMX(this);
  22. }
  23. }
  24. class LearnerCnxAcceptor extends ZooKeeperCriticalThread {
  25. private volatile boolean stop = false;
  26. ......
  27. @Override
  28. public void run() {
  29. try {
  30. while (!stop) {
  31. Socket s = null;
  32. boolean error = false;
  33. try {
  34. s = ss.accept();
  35. // start with the initLimit, once the ack is processed
  36. // in LearnerHandler switch to the syncLimit
  37. s.setSoTimeout(self.tickTime * self.initLimit);
  38. s.setTcpNoDelay(nodelay);
  39. BufferedInputStream is = new BufferedInputStream(
  40. s.getInputStream());
  41. LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
  42. fh.start();
  43. } catch (SocketException e) {
  44. ... ...
  45. }
  46. }
  47. } catch (Exception e) {
  48. LOG.warn("Exception while accepting follower", e.getMessage());
  49. handleException(this.getName(), e);
  50. }
  51. }
  52. public void halt() {
  53. stop = true;
  54. }
  55. }

LearnerHandler is declared as public class LearnerHandler extends ZooKeeperThread {}, so it is a thread, and fh.start() ends up executing the run() method of LearnerHandler.

  1. public void run() {
  2. try {
  3. leader.addLearnerHandler(this);
  4. // Heartbeat handling
  5. tickOfNextAckDeadline = leader.self.tick.get()
  6. + leader.self.initLimit + leader.self.syncLimit;
  7. ia = BinaryInputArchive.getArchive(bufferedInput);
  8. bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
  9. oa = BinaryOutputArchive.getArchive(bufferedOutput);
  10. // Receive a message from the network and deserialize it into a packet
  11. QuorumPacket qp = new QuorumPacket();
  12. ia.readRecord(qp, "packet");
  13. // After the election, both observers and followers should send the leader an identifying packet: FOLLOWERINFO or OBSERVERINFO
  15. if (qp.getType() != Leader.FOLLOWERINFO && qp.getType() !=
  16. Leader.OBSERVERINFO) {
  17. LOG.error("First packet " + qp.toString()
  18. + " is not FOLLOWERINFO or OBSERVERINFO!");
  19. return;
  20. }
  21. byte learnerInfoData[] = qp.getData();
  22. if (learnerInfoData != null) {
  23. ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
  24. if (learnerInfoData.length >= 8) {
  25. this.sid = bbsid.getLong();
  26. }
  27. if (learnerInfoData.length >= 12) {
  28. this.version = bbsid.getInt(); // protocolVersion
  29. }
  30. if (learnerInfoData.length >= 20) {
  31. long configVersion = bbsid.getLong();
  32. if (configVersion > leader.self.getQuorumVerifier().getVersion()) {
  33. throw new IOException("Follower is ahead of the leader (has a later activated configuration) ");
  35. }
  36. }
  37. } else {
  38. this.sid = leader.followerCounter.getAndDecrement();
  39. }
  40. if (leader.self.getView().containsKey(this.sid)) {
  41. LOG.info("Follower sid: " + this.sid + " : info : "
  42. + leader.self.getView().get(this.sid).toString());
  43. } else {
  44. LOG.info("Follower sid: " + this.sid + " not in the current config " +
  45. Long.toHexString(leader.self.getQuorumVerifier().getVersion()));
  46. }
  47. if (qp.getType() == Leader.OBSERVERINFO) {
  48. learnerType = LearnerType.OBSERVER;
  49. }
  50. // Read the lastAcceptedEpoch sent by the Follower
  51. // The epoch used during the election is still the previous leader's epoch
  52. long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
  53. long peerLastZxid;
  54. StateSummary ss = null;
  55. // Read the zxid sent by the follower
  56. long zxid = qp.getZxid();
  57. // The Leader builds the new epoch from the sid and the old epoch obtained from the Follower
  58. long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
  59. long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);
  60. if (this.getVersion() < 0x10000) {
  61. // we are going to have to extrapolate the epoch information
  62. long epoch = ZxidUtils.getEpochFromZxid(zxid);
  63. ss = new StateSummary(epoch, zxid);
  64. // fake the message
  65. leader.waitForEpochAck(this.getSid(), ss);
  66. } else {
  67. byte ver[] = new byte[4];
  68. ByteBuffer.wrap(ver).putInt(0x10000);
  69. // The Leader sends information to the Follower (carrying the zxid and newEpoch)
  70. QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO,
  71. newLeaderZxid, ver, null);
  72. oa.writeRecord(newEpochPacket, "packet");
  73. bufferedOutput.flush();
  74. QuorumPacket ackEpochPacket = new QuorumPacket();
  75. ia.readRecord(ackEpochPacket, "packet");
  76. if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
  77. LOG.error(ackEpochPacket.toString()
  78. + " is not ACKEPOCH");
  79. return;
  80. }
  81. ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
  82. ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
  83. leader.waitForEpochAck(this.getSid(), ss);
  84. }
  85. peerLastZxid = ss.getLastZxid();
  86. // Take any necessary action if we need to send TRUNC or DIFF
  87. // startForwarding() will be called in all cases
  88. boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(),
  89. leader);
  90. /* if we are not truncating or sending a diff just send a snapshot */
  91. if (needSnap) {
  92. boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
  93. LearnerSnapshot snapshot =
  94. leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
  95. try {
  96. long zxidToSend =
  97. leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
  98. oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null,
  99. null), "packet");
  100. bufferedOutput.flush();
  101. LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
  104. + "send zxid of db as 0x{}, {} concurrent snapshots, "
  105. + "snapshot was {} from throttle",
  106. Long.toHexString(peerLastZxid),
  107. Long.toHexString(leaderLastZxid),
  108. Long.toHexString(zxidToSend),
  109. snapshot.getConcurrentSnapshotNumber(),
  110. snapshot.isEssential() ? "exempt" : "not exempt");
  111. // Dump data to peer
  112. leader.zk.getZKDatabase().serializeSnapshot(oa);
  113. oa.writeString("BenWasHere", "signature");
  114. bufferedOutput.flush();
  115. } finally {
  116. snapshot.close();
  117. }
  118. }
  119. LOG.debug("Sending NEWLEADER message to " + sid);
  120. // the version of this quorumVerifier will be set by leader.lead() in case
  121. // the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
  123. // we got here, so the version was set
  124. if (getVersion() < 0x10000) {
  125. QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
  126. newLeaderZxid, null, null);
  127. oa.writeRecord(newLeaderQP, "packet");
  128. } else {
  129. QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
  130. newLeaderZxid, leader.self.getLastSeenQuorumVerifier()
  131. .toString().getBytes(), null);
  132. queuedPackets.add(newLeaderQP);
  133. }
  134. bufferedOutput.flush();
  135. // Start thread that blast packets in the queue to learner
  136. startSendingPackets();
  137. /*
  138. * Have to wait for the first ACK, wait until
  139. * the leader is ready, and only then we can
  140. * start processing messages.
  141. */
  142. qp = new QuorumPacket();
  143. ia.readRecord(qp, "packet");
  144. if (qp.getType() != Leader.ACK) {
  145. LOG.error("Next packet was supposed to be an ACK,"
  146. + " but received packet: {}", packetToString(qp));
  147. return;
  148. }
  149. if (LOG.isDebugEnabled()) {
  150. LOG.debug("Received NEWLEADER-ACK message from " + sid);
  151. }
  152. leader.waitForNewLeaderAck(getSid(), qp.getZxid());
  153. syncLimitCheck.start();
  154. // now that the ack has been processed expect the syncLimit
  155. sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);
  156. /*
  157. * Wait until leader starts up
  158. */
  159. synchronized (leader.zk) {
  160. while (!leader.zk.isRunning() && !this.isInterrupted()) {
  161. leader.zk.wait(20);
  162. }
  163. }
  164. // Mutation packets will be queued during the serialize,
  165. // so we need to mark when the peer can actually start
  166. // using the data
  167. LOG.debug("Sending UPTODATE message to " + sid);
  168. queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
  169. while (true) {
  170. qp = new QuorumPacket();
  171. ia.readRecord(qp, "packet");
  172. long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
  173. if (qp.getType() == Leader.PING) {
  174. traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
  175. }
  176. if (LOG.isTraceEnabled()) {
  177. ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
  178. }
  179. tickOfNextAckDeadline = leader.self.tick.get() + leader.self.syncLimit;
  180. ByteBuffer bb;
  181. long sessionId;
  182. int cxid;
  183. int type;
  184. switch (qp.getType()) {
  185. case Leader.ACK:
  186. if (this.learnerType == LearnerType.OBSERVER) {
  187. if (LOG.isDebugEnabled()) {
  188. LOG.debug("Received ACK from Observer " + this.sid);
  189. }
  190. }
  191. syncLimitCheck.updateAck(qp.getZxid());
  192. leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
  193. break;
  194. case Leader.PING:
  195. // Process the touches
  196. ByteArrayInputStream bis = new ByteArrayInputStream(qp
  197. .getData());
  198. DataInputStream dis = new DataInputStream(bis);
  199. while (dis.available() > 0) {
  200. long sess = dis.readLong();
  201. int to = dis.readInt();
  202. leader.zk.touch(sess, to);
  203. }
  204. break;
  205. case Leader.REVALIDATE:
  206. bis = new ByteArrayInputStream(qp.getData());
  207. dis = new DataInputStream(bis);
  208. long id = dis.readLong();
  209. int to = dis.readInt();
  210. ByteArrayOutputStream bos = new ByteArrayOutputStream();
  211. DataOutputStream dos = new DataOutputStream(bos);
  212. dos.writeLong(id);
  213. boolean valid = leader.zk.checkIfValidGlobalSession(id, to);
  214. if (valid) {
  215. try {
  216. //set the session owner
  217. // as the follower that
  218. // owns the session
  219. leader.zk.setOwner(id, this);
  220. } catch (SessionExpiredException e) {
  221. LOG.error("Somehow session " + Long.toHexString(id) +
  222. " expired right after being renewed! (impossible)", e);
  223. }
  224. }
  225. if (LOG.isTraceEnabled()) {
  226. ZooTrace.logTraceMessage(LOG,
  227. ZooTrace.SESSION_TRACE_MASK,
  228. "Session 0x" + Long.toHexString(id)
  229. + " is valid: " + valid);
  230. }
  231. dos.writeBoolean(valid);
  232. qp.setData(bos.toByteArray());
  233. queuedPackets.add(qp);
  234. break;
  235. case Leader.REQUEST:
  236. bb = ByteBuffer.wrap(qp.getData());
  237. sessionId = bb.getLong();
  238. cxid = bb.getInt();
  239. type = bb.getInt();
  240. bb = bb.slice();
  241. Request si;
  242. if (type == OpCode.sync) {
  243. si = new LearnerSyncRequest(this, sessionId, cxid, type, bb,
  244. qp.getAuthinfo());
  245. } else {
  246. si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
  247. }
  248. si.setOwner(this);
  249. leader.zk.submitLearnerRequest(si);
  250. break;
  251. default:
  252. LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
  253. break;
  254. }
  255. }
  256. } catch (IOException e) {
  257. ... ...
  258. } finally {
  259. ... ...
  260. }
  261. }
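
At the start of run() above, the handler decodes the learner's payload by length: 8 bytes carry the sid, 12 bytes add the protocol version, and 20 bytes add the configuration version. That decoding can be tried on its own with the sketch below. `LearnerInfoParser`, `Info`, and `encode` are hypothetical names, and using -1 for a missing field is an assumption made for the sketch.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-alone version of the payload decoding in run().
final class LearnerInfoParser {
    /** Parsed fields; a field stays at -1 when the payload is too short to carry it. */
    static final class Info {
        long sid = -1L;
        int protocolVersion = -1;
        long configVersion = -1L;
    }

    private LearnerInfoParser() {}

    static Info parse(byte[] data) {
        Info info = new Info();
        ByteBuffer buf = ByteBuffer.wrap(data);
        if (data.length >= 8) {
            info.sid = buf.getLong();
        }
        if (data.length >= 12) {
            info.protocolVersion = buf.getInt();
        }
        if (data.length >= 20) {
            info.configVersion = buf.getLong();
        }
        return info;
    }

    /** Builds a full 20-byte payload in the same field order, for testing. */
    static byte[] encode(long sid, int protocolVersion, long configVersion) {
        return ByteBuffer.allocate(20)
                .putLong(sid)
                .putInt(protocolVersion)
                .putLong(configVersion)
                .array();
    }
}
```

Checking the length before each read lets one decoder accept both short payloads and full 20-byte payloads.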

2.5.4 Follower.followLeader() calls registerWithLeader

  1. void followLeader() throws InterruptedException {
  2. self.end_fle = Time.currentElapsedTime();
  3. long electionTimeTaken = self.end_fle - self.start_fle;
  4. self.setElectionTimeTaken(electionTimeTaken);
  5. LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
  6. QuorumPeer.FLE_TIME_UNIT);
  7. self.start_fle = 0;
  8. self.end_fle = 0;
  9. fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
  10. try {
  11. // Find the leader
  12. QuorumServer leaderServer = findLeader();
  13. try {
  14. // Connect to the leader
  15. connectToLeader(leaderServer.addr, leaderServer.hostname);
  16. // Register with the leader
  17. long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
  18. if (self.isReconfigStateChange())
  19. throw new Exception("learned about role change");
  20. //check to see if the leader zxid is lower than ours
  21. //this should never happen but is just a safety check
  22. long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
  23. if (newEpoch < self.getAcceptedEpoch()) {
  24. LOG.error("Proposed leader epoch " +
  25. ZxidUtils.zxidToString(newEpochZxid)
  26. + " is less than our accepted epoch " +
  27. ZxidUtils.zxidToString(self.getAcceptedEpoch()));
  28. throw new IOException("Error: Epoch of leader is lower");
  29. }
  30. syncWithLeader(newEpochZxid);
  31. QuorumPacket qp = new QuorumPacket();
  32. // Loop, waiting for incoming messages
  33. while (this.isRunning()) {
  34. // Read the packet
  35. readPacket(qp);
  36. // Process the packet
  37. processPacket(qp);
  38. }
  39. } catch (Exception e) {
  40. LOG.warn("Exception when following the leader", e);
  41. try {
  42. sock.close();
  43. } catch (IOException e1) {
  44. e1.printStackTrace();
  45. }
  46. // clear pending revalidations
  47. pendingRevalidations.clear();
  48. }
  49. } finally {
  50. zk.unregisterJMX((Learner) this);
  51. }
  52. }
  53. protected long registerWithLeader(int pktType) throws IOException {
  54. /*
  55. * Send follower info, including last zxid and sid
  56. */
  57. long lastLoggedZxid = self.getLastLoggedZxid();
  58. QuorumPacket qp = new QuorumPacket();
  59. qp.setType(pktType);
  60. qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
  61. /*
  62. * Add sid to payload
  63. */
  64. LearnerInfo li = new LearnerInfo(self.getId(), 0x10000,
  65. self.getQuorumVerifier().getVersion());
  66. ByteArrayOutputStream bsid = new ByteArrayOutputStream();
  67. BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
  68. boa.writeRecord(li, "LearnerInfo");
  69. qp.setData(bsid.toByteArray());
  70. // Send FollowerInfo to the Leader
  71. writePacket(qp, true);
  72. // Read the Leader's reply: LeaderInfo
  73. readPacket(qp);
  74. final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
  75. // If the reply is LeaderInfo
  76. if (qp.getType() == Leader.LEADERINFO) {
  77. // we are connected to a 1.0 server so accept the new epoch and read the next packet
  78. leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
  79. byte epochBytes[] = new byte[4];
  80. final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
  81. // Accept the leader's epoch
  82. if (newEpoch > self.getAcceptedEpoch()) {
  83. // Store our own previous epoch in wrappedEpochBytes
  84. wrappedEpochBytes.putInt((int) self.getCurrentEpoch());
  85. // Save the epoch sent by the leader
  86. self.setAcceptedEpoch(newEpoch);
  87. } else if (newEpoch == self.getAcceptedEpoch()) {
  88. // since we have already acked an epoch equal to the leaders, we cannot ack
  89. // again, but we still need to send our lastZxid to the leader so that we can
  90. // sync with it if it does assume leadership of the epoch.
  91. // the -1 indicates that this reply should not count as an ack for the new epoch
  92. wrappedEpochBytes.putInt(-1);
  93. } else {
  94. throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
  96. }
  97. // Send ACKEPOCH to the leader (carrying our own epoch and zxid)
  98. QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH,
  99. lastLoggedZxid, epochBytes, null);
  100. writePacket(ackNewEpoch, true);
  101. return ZxidUtils.makeZxid(newEpoch, 0);
  102. } else {
  103. if (newEpoch > self.getAcceptedEpoch()) {
  104. self.setAcceptedEpoch(newEpoch);
  105. }
  106. if (qp.getType() != Leader.NEWLEADER) {
  107. LOG.error("First packet should have been NEWLEADER");
  108. throw new IOException("First packet should have been NEWLEADER");
  109. }
  110. return qp.getZxid();
  111. }
  112. }
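
registerWithLeader() relies on ZxidUtils.makeZxid and ZxidUtils.getEpochFromZxid, whose bodies are not shown in the listing. A zxid is understood to be a 64-bit value with the epoch in the high 32 bits and a per-epoch counter in the low 32 bits. The sketch below illustrates that layout. `ZxidSketch` is a hypothetical stand-in, not the ZooKeeper class itself.

```java
// Hypothetical stand-in showing the assumed zxid bit layout.
final class ZxidSketch {
    private ZxidSketch() {}

    /** Packs an epoch (high 32 bits) and a counter (low 32 bits) into one zxid. */
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    /** Extracts the epoch from the high 32 bits. */
    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    /** Extracts the counter from the low 32 bits. */
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }
}
```

Under this layout, makeZxid(newEpoch, 0) is the smallest zxid of the new epoch, which is why comparing two zxids as plain numbers compares the epoch first and the counter second.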

2.5.5 Leader.lead() receives the Follower's state and sends synchronization messages according to the sync mode

  1. public void run() {
  2. try {
  3. leader.addLearnerHandler(this);
  4. // Heartbeat handling
  5. tickOfNextAckDeadline = leader.self.tick.get()
  6. + leader.self.initLimit + leader.self.syncLimit;
  7. ia = BinaryInputArchive.getArchive(bufferedInput);
  8. bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
  9. oa = BinaryOutputArchive.getArchive(bufferedOutput);
  10. // Read a message from the network and deserialize it into a packet
  11. QuorumPacket qp = new QuorumPacket();
  12. ia.readRecord(qp, "packet");
  13. // After the election, observers and followers must each send the leader an
  14. // identifying packet: FOLLOWERINFO or OBSERVERINFO
  15. if (qp.getType() != Leader.FOLLOWERINFO && qp.getType() !=
  16. Leader.OBSERVERINFO) {
  17. LOG.error("First packet " + qp.toString()
  18. + " is not FOLLOWERINFO or OBSERVERINFO!");
  19. return;
  20. }
  21. byte learnerInfoData[] = qp.getData();
  22. if (learnerInfoData != null) {
  23. ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
  24. if (learnerInfoData.length >= 8) {
  25. this.sid = bbsid.getLong();
  26. }
  27. if (learnerInfoData.length >= 12) {
  28. this.version = bbsid.getInt(); // protocolVersion
  29. }
  30. if (learnerInfoData.length >= 20) {
  31. long configVersion = bbsid.getLong();
  32. if (configVersion > leader.self.getQuorumVerifier().getVersion()) {
  33. throw new IOException("Follower is ahead of the leader"
  34. + " (has a later activated configuration) ");
  35. }
  36. }
  37. } else {
  38. this.sid = leader.followerCounter.getAndDecrement();
  39. }
  40. if (leader.self.getView().containsKey(this.sid)) {
  41. LOG.info("Follower sid: " + this.sid + " : info : "
  42. + leader.self.getView().get(this.sid).toString());
  43. } else {
  44. LOG.info("Follower sid: " + this.sid + " not in the current config " +
  45. Long.toHexString(leader.self.getQuorumVerifier().getVersion()));
  46. }
  47. if (qp.getType() == Leader.OBSERVERINFO) {
  48. learnerType = LearnerType.OBSERVER;
  49. }
  50. // Read the lastAcceptedEpoch sent by the Follower.
  51. // During the election, the epoch in use is still the previous leader's epoch.
  52. long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
  53. long peerLastZxid;
  54. StateSummary ss = null;
  55. // Read the zxid sent by the follower
  56. long zxid = qp.getZxid();
  57. // Get the leader's latest epoch;
  58. // the new leader establishes a new epoch
  59. long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
  60. long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);
  61. if (this.getVersion() < 0x10000) {
  62. // we are going to have to extrapolate the epoch information
  63. long epoch = ZxidUtils.getEpochFromZxid(zxid);
  64. ss = new StateSummary(epoch, zxid);
  65. // fake the message
  66. leader.waitForEpochAck(this.getSid(), ss);
  67. } else {
  68. byte ver[] = new byte[4];
  69. ByteBuffer.wrap(ver).putInt(0x10000);
  70. // The Leader sends LEADERINFO to the Follower (carrying the zxid and newEpoch)
  71. QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO,
  72. newLeaderZxid, ver, null);
  73. oa.writeRecord(newEpochPacket, "packet");
  74. bufferedOutput.flush();
  75. // Receive the ACKEPOCH reply from the Follower
  76. QuorumPacket ackEpochPacket = new QuorumPacket();
  77. ia.readRecord(ackEpochPacket, "packet");
  78. if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
  79. LOG.error(ackEpochPacket.toString()
  80. + " is not ACKEPOCH");
  81. return;
  82. }
  83. ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
  84. // Holds the state of the peer (follower or observer): epoch and zxid
  85. ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
  86. leader.waitForEpochAck(this.getSid(), ss);
  87. }
  88. peerLastZxid = ss.getLastZxid();
  89. // Take any necessary action if we need to send TRUNC or DIFF
  90. // startForwarding() will be called in all cases
  91. // syncFollower() decides how the Leader syncs the Follower; true means a snapshot is needed
  92. boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(),
  93. leader);
  94. /* if we are not truncating or sending a diff just send a snapshot */
  95. if (needSnap) {
  96. boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
  97. LearnerSnapshot snapshot =
  98. leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
  99. try {
  100. long zxidToSend =
  101. leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
  102. oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null,
  103. null), "packet");
  104. bufferedOutput.flush();
  105. // Dump data to peer
  106. leader.zk.getZKDatabase().serializeSnapshot(oa);
  107. oa.writeString("BenWasHere", "signature");
  108. bufferedOutput.flush();
  109. } finally {
  110. snapshot.close();
  111. }
  112. }
  113. if (getVersion() < 0x10000) {
  114. QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
  115. newLeaderZxid, null, null);
  116. oa.writeRecord(newLeaderQP, "packet");
  117. } else {
  118. QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
  119. newLeaderZxid, leader.self.getLastSeenQuorumVerifier()
  120. .toString().getBytes(), null);
  121. queuedPackets.add(newLeaderQP);
  122. }
  123. while (true) {
  124. }
  125. } catch (IOException e) {
  126. ... ...
  127. } finally {
  128. ... ...
  129. }
  130. }
  131. public boolean syncFollower(long peerLastZxid, ZKDatabase db, Leader leader) {
  132. /*
  133. * When leader election is completed, the leader will set its
  134. * lastProcessedZxid to be (epoch << 32). There will be no txn associated
  135. * with this zxid.
  136. *
  137. * The learner will set its lastProcessedZxid to the same value if
  138. * it get DIFF or SNAP from the leader. If the same learner come
  139. * back to sync with leader using this zxid, we will never find this
  140. * zxid in our history. In this case, we will ignore TRUNC logic and
  141. * always send DIFF if we have old enough history
  142. */
  143. boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
  144. // Keep track of the latest zxid which already queued
  145. long currentZxid = peerLastZxid;
  146. boolean needSnap = true;
  147. boolean txnLogSyncEnabled = db.isTxnLogSyncEnabled();
  148. ReentrantReadWriteLock lock = db.getLogLock();
  149. ReadLock rl = lock.readLock();
  150. try {
  151. rl.lock();
  152. long maxCommittedLog = db.getmaxCommittedLog();
  153. long minCommittedLog = db.getminCommittedLog();
  154. long lastProcessedZxid = db.getDataTreeLastProcessedZxid();
  155. LOG.info("Synchronizing with Follower sid: {} maxCommittedLog=0x{}"
  156. + " minCommittedLog=0x{} lastProcessedZxid=0x{}"
  157. + " peerLastZxid=0x{}", getSid(),
  158. Long.toHexString(maxCommittedLog),
  159. Long.toHexString(minCommittedLog),
  160. Long.toHexString(lastProcessedZxid),
  161. Long.toHexString(peerLastZxid));
  162. if (db.getCommittedLog().isEmpty()) {
  163. /*
  164. * It is possible that committedLog is empty. In that case
  165. * setting these value to the latest txn in leader db
  166. * will reduce the case that we need to handle
  167. *
  168. * Here is how each case handle by the if block below
  169. * 1. lastProcessZxid == peerZxid -> Handle by (2)
  170. * 2. lastProcessZxid < peerZxid -> Handle by (3)
  171. * 3. lastProcessZxid > peerZxid -> Handle by (5)
  172. */
  173. minCommittedLog = lastProcessedZxid;
  174. maxCommittedLog = lastProcessedZxid;
  175. }
  176. /*
  177. * Here are the cases that we want to handle
  178. *
  179. * 1. Force sending snapshot (for testing purpose)
  180. * 2. Peer and leader is already sync, send empty diff
  181. * 3. Follower has txn that we haven't seen. This may be old leader
  182. * so we need to send TRUNC. However, if peer has newEpochZxid,
  183. * we cannot send TRUNC since the follower has no txnlog
  184. * 4. Follower is within committedLog range or already in-sync.
  185. * We may need to send DIFF or TRUNC depending on follower's zxid
  186. * We always send empty DIFF if follower is already in-sync
  187. * 5. Follower missed the committedLog. We will try to use on-disk
  188. * txnlog + committedLog to sync with follower. If that fail,
  189. * we will send snapshot
  190. */
  191. if (forceSnapSync) {
  192. // Force leader to use snapshot to sync with follower
  193. LOG.warn("Forcing snapshot sync - should not see this in production");
  194. } else if (lastProcessedZxid == peerLastZxid) {
  195. // Follower is already sync with us, send empty diff
  196. LOG.info("Sending DIFF zxid=0x" + Long.toHexString(peerLastZxid) +
  197. " for peer sid: " + getSid());
  198. queueOpPacket(Leader.DIFF, peerLastZxid);
  199. needOpPacket = false;
  200. needSnap = false;
  201. } else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
  202. // Newer than committedLog, send trunc and done
  203. LOG.debug("Sending TRUNC to follower zxidToSend=0x" +
  204. Long.toHexString(maxCommittedLog) +
  205. " for peer sid:" + getSid());
  206. queueOpPacket(Leader.TRUNC, maxCommittedLog);
  207. currentZxid = maxCommittedLog;
  208. needOpPacket = false;
  209. needSnap = false;
  210. } else if ((maxCommittedLog >= peerLastZxid)
  211. && (minCommittedLog <= peerLastZxid)) {
  212. // Follower is within commitLog range
  213. LOG.info("Using committedLog for peer sid: " + getSid());
  214. Iterator<Proposal> itr = db.getCommittedLog().iterator();
  215. currentZxid = queueCommittedProposals(itr, peerLastZxid,
  216. null, maxCommittedLog);
  217. needSnap = false;
  218. } else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
  219. // Use txnlog and committedLog to sync
  220. // Calculate sizeLimit that we allow to retrieve txnlog from disk
  221. long sizeLimit = db.calculateTxnLogSizeLimit();
  222. // This method can return empty iterator if the requested zxid
  223. // is older than on-disk txnlog
  224. Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(
  225. peerLastZxid, sizeLimit);
  226. if (txnLogItr.hasNext()) {
  227. LOG.info("Use txnlog and committedLog for peer sid: " + getSid());
  228. currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid,
  229. minCommittedLog,
  230. maxCommittedLog);
  231. LOG.debug("Queueing committedLog 0x" +
  232. Long.toHexString(currentZxid));
  233. Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator();
  234. currentZxid = queueCommittedProposals(committedLogItr, currentZxid,
  235. null, maxCommittedLog);
  236. needSnap = false;
  237. }
  238. // closing the resources
  239. if (txnLogItr instanceof TxnLogProposalIterator) {
  240. TxnLogProposalIterator txnProposalItr = (TxnLogProposalIterator)
  241. txnLogItr;
  242. txnProposalItr.close();
  243. }
  244. } else {
  245. LOG.warn("Unhandled scenario for peer sid: " + getSid());
  246. }
  247. LOG.debug("Start forwarding 0x" + Long.toHexString(currentZxid) +
  248. " for peer sid: " + getSid());
  249. leaderLastZxid = leader.startForwarding(this, currentZxid);
  250. } finally {
  251. rl.unlock();
  252. }
  253. if (needOpPacket && !needSnap) {
  254. // This should never happen, but we should fall back to sending
  255. // snapshot just in case.
  256. LOG.error("Unhandled scenario for peer sid: " + getSid() +
  257. " fall back to use snapshot");
  258. needSnap = true;
  259. }
  260. return needSnap;
  261. }
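
The branch order in syncFollower() can be condensed into a small pure function. This is a simplified sketch under stated assumptions, not the ZooKeeper implementation: the class SyncModeSketch, the enum SyncMode, and the boolean txnLogCoversPeer (standing in for "txnlog sync is enabled and the on-disk txnlog iterator is non-empty") are hypothetical names, forceSnapSync and the empty-committedLog adjustment are left out, and the possibility that queueCommittedProposals() sends TRUNC inside the committedLog range is ignored.

```java
// Hypothetical condensation of the branch order in syncFollower(); not ZooKeeper code.
final class SyncModeSketch {
    enum SyncMode { DIFF, TRUNC, SNAP }

    static SyncMode choose(long peerLastZxid, long minCommittedLog,
                           long maxCommittedLog, long lastProcessedZxid,
                           boolean txnLogCoversPeer) {
        // A zxid whose low 32 bits are zero marks the start of an epoch.
        boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
        if (lastProcessedZxid == peerLastZxid) {
            return SyncMode.DIFF;   // already in sync: empty DIFF
        } else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
            return SyncMode.TRUNC;  // peer is ahead: roll it back to maxCommittedLog
        } else if (minCommittedLog <= peerLastZxid && peerLastZxid <= maxCommittedLog) {
            return SyncMode.DIFF;   // peer is inside the in-memory committedLog
        } else if (peerLastZxid < minCommittedLog && txnLogCoversPeer) {
            return SyncMode.DIFF;   // replay on-disk txnlog, then committedLog
        }
        return SyncMode.SNAP;       // nothing else applies: send a full snapshot
    }

    public static void main(String[] args) {
        long min = 0x100000005L;
        long max = 0x10000000aL;
        System.out.println(choose(0x100000007L, min, max, max, false));
    }
}
```

Keeping the decision separate from the queueing side effects makes it easy to see why needSnap starts out true: a snapshot is the fallback whenever none of the cheaper paths matches.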

2.5.6 Follower.followLeader() acknowledges the Leader's sync result

  1. protected void processPacket(QuorumPacket qp) throws Exception {
  2. switch (qp.getType()) {
  3. case Leader.PING:
  4. ping(qp);
  5. break;
  6. case Leader.PROPOSAL:
  7. TxnHeader hdr = new TxnHeader();
  8. Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
  9. if (hdr.getZxid() != lastQueued + 1) {
  10. LOG.warn("Got zxid 0x"
  11. + Long.toHexString(hdr.getZxid())
  12. + " expected 0x"
  13. + Long.toHexString(lastQueued + 1));
  14. }
  15. lastQueued = hdr.getZxid();
  16. if (hdr.getType() == OpCode.reconfig) {
  17. SetDataTxn setDataTxn = (SetDataTxn) txn;
  18. QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
  19. self.setLastSeenQuorumVerifier(qv, true);
  20. }
  21. fzk.logRequest(hdr, txn);
  22. break;
  23. case Leader.COMMIT:
  24. fzk.commit(qp.getZxid());
  25. break;
  26. case Leader.UPTODATE:
  27. LOG.error("Received an UPTODATE message after Follower started");
  28. break;
  29. case Leader.REVALIDATE:
  30. revalidate(qp);
  31. break;
  32. case Leader.SYNC:
  33. fzk.sync();
  34. break;
  35. default:
  36. LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
  37. break;
  38. }
  39. }
  40. public void commit(long zxid) {
  41. if (pendingTxns.size() == 0) {
  42. LOG.warn("Committing " + Long.toHexString(zxid)
  43. + " without seeing txn");
  44. return;
  45. }
  46. long firstElementZxid = pendingTxns.element().zxid;
  47. if (firstElementZxid != zxid) {
  48. LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
  49. + " but next pending txn 0x"
  50. + Long.toHexString(firstElementZxid));
  51. System.exit(12);
  52. }
  53. Request request = pendingTxns.remove();
  54. commitProcessor.commit(request);
  55. }
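
The commit() method above only commits the transaction at the head of pendingTxns, so COMMIT packets must arrive in the same order as the PROPOSAL packets that were logged. A minimal sketch of that in-order check follows. The class PendingCommitSketch and its method names are hypothetical, zxids stand in for full Request objects, and an IllegalStateException replaces the System.exit(12) of the original so that the example can be run safely.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical illustration of the head-of-queue commit check; not ZooKeeper code.
final class PendingCommitSketch {
    private final Queue<Long> pendingTxns = new ArrayDeque<>();
    private final List<Long> committed = new ArrayList<>();

    // Called for each PROPOSAL: remember the zxid until its COMMIT arrives.
    void logRequest(long zxid) {
        pendingTxns.add(zxid);
    }

    // Called for each COMMIT: only the oldest pending zxid may be committed.
    boolean commit(long zxid) {
        if (pendingTxns.isEmpty()) {
            return false; // committing without having seen the txn: ignore
        }
        long firstElementZxid = pendingTxns.element();
        if (firstElementZxid != zxid) {
            throw new IllegalStateException("Committing zxid 0x" + Long.toHexString(zxid)
                    + " but next pending txn 0x" + Long.toHexString(firstElementZxid));
        }
        committed.add(pendingTxns.remove());
        return true;
    }

    List<Long> committed() {
        return committed;
    }

    public static void main(String[] args) {
        PendingCommitSketch follower = new PendingCommitSketch();
        follower.logRequest(1L);
        follower.logRequest(2L);
        follower.commit(1L);
        System.out.println(follower.committed());
    }
}
```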

2.5.7 Leader.lead() replies to the Follower

Because LearnerHandler is declared as public class LearnerHandler extends ZooKeeperThread {}, it is a thread, so fh.start() runs the run() method of LearnerHandler.

public void run() {
    try {
        ... ...
        LOG.debug("Sending UPTODATE message to " + sid);
        queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
        while (true) {
            ... ...
        }
    } catch (IOException e) {
        ... ...
    } finally {
        ... ...
    }
}

2.6 Server-side Leader startup

image.png
ZooKeeperServer
Press Ctrl + N to search globally for Leader, then Ctrl + F to find lead(

// Leader
void lead() throws IOException, InterruptedException {
    ... ...
    // Start the ZooKeeper server
    startZkServer();
    ... ...
}

final LeaderZooKeeperServer zk;

private synchronized void startZkServer() {
    ... ...
    // Start ZooKeeper
    zk.startup();
    ... ...
}

// LeaderZooKeeperServer
@Override
public synchronized void startup() {
    super.startup();
    if (containerManager != null) {
        containerManager.start();
    }
}

// ZooKeeperServer
public synchronized void startup() {
    if (sessionTracker == null) {
        createSessionTracker();
    }
    startSessionTracker();
    setupRequestProcessors();
    registerJMX();
    setState(State.RUNNING);
    notifyAll();
}

protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this,
            finalProcessor);
    ((SyncRequestProcessor) syncProcessor).start();
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor) firstProcessor).start();
}
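
The setupRequestProcessors() listing above wires three processors so that each one hands the request to the next: PrepRequestProcessor, then SyncRequestProcessor, then FinalRequestProcessor. A minimal, synchronous sketch of that chain-of-responsibility wiring follows. The Stage class, the string requests, and the trace list are hypothetical simplifications; in the listing, the prep and sync processors are started as threads, which this sketch does not model.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded illustration of a processor chain; not ZooKeeper code.
final class ProcessorChainSketch {
    interface RequestProcessor {
        void processRequest(String request);
    }

    // One stage records that it saw the request, then forwards it to the next stage.
    static final class Stage implements RequestProcessor {
        private final String name;
        private final List<String> trace;
        private final RequestProcessor next;

        Stage(String name, List<String> trace, RequestProcessor next) {
            this.name = name;
            this.trace = trace;
            this.next = next;
        }

        @Override
        public void processRequest(String request) {
            trace.add(name + ":" + request);
            if (next != null) {
                next.processRequest(request);
            }
        }
    }

    // Build the chain back to front, as setupRequestProcessors() does.
    static List<String> run(String request) {
        List<String> trace = new ArrayList<>();
        RequestProcessor finalProcessor = new Stage("final", trace, null);
        RequestProcessor syncProcessor = new Stage("sync", trace, finalProcessor);
        RequestProcessor firstProcessor = new Stage("prep", trace, syncProcessor);
        firstProcessor.processRequest(request);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run("create"));
    }
}
```

The chain is constructed from the last processor to the first because each constructor needs a reference to its successor.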

Open PrepRequestProcessor and find its run() method

  1. public void run() {
  2. try {
  3. while (true) {
  4. Request request = submittedRequests.take();
  5. long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
  6. if (request.type == OpCode.ping) {
  7. traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
  8. }
  9. if (LOG.isTraceEnabled()) {
  10. ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
  11. }
  12. if (Request.requestOfDeath == request) {
  13. break;
  14. }
  15. pRequest(request);
  16. }
  17. } catch (RequestProcessorException e) {
  18. if (e.getCause() instanceof XidRolloverException) {
  19. LOG.info(e.getCause().getMessage());
  20. }
  21. handleException(this.getName(), e);
  22. } catch (Exception e) {
  23. handleException(this.getName(), e);
  24. }
  25. LOG.info("PrepRequestProcessor exited loop!");
  26. }
  27. protected void pRequest(Request request) throws RequestProcessorException {
  28. // LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
  29. // request.type + " id = 0x" + Long.toHexString(request.sessionId));
  30. request.setHdr(null);
  31. request.setTxn(null);
  32. try {
  33. switch (request.type) {
  34. case OpCode.createContainer:
  35. case OpCode.create:
  36. case OpCode.create2:
  37. CreateRequest create2Request = new CreateRequest();
  38. pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
  39. break;
  40. case OpCode.createTTL:
  41. CreateTTLRequest createTtlRequest = new CreateTTLRequest();
  42. pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest,
  43. true);
  44. break;
  45. case OpCode.deleteContainer:
  46. case OpCode.delete:
  47. DeleteRequest deleteRequest = new DeleteRequest();
  48. pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
  49. break;
  50. case OpCode.setData:
  51. SetDataRequest setDataRequest = new SetDataRequest();
  52. pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
  53. break;
  54. case OpCode.reconfig:
  55. ReconfigRequest reconfigRequest = new ReconfigRequest();
  56. ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);
  57. pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
  58. break;
  59. case OpCode.setACL:
  60. SetACLRequest setAclRequest = new SetACLRequest();
  61. pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
  62. break;
  63. case OpCode.check:
  64. CheckVersionRequest checkRequest = new CheckVersionRequest();
  65. pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
  66. break;
  67. case OpCode.multi:
  68. MultiTransactionRecord multiRequest = new MultiTransactionRecord();
  69. try {
  70. ByteBufferInputStream.byteBuffer2Record(request.request,
  71. multiRequest);
  72. } catch (IOException e) {
  73. request.setHdr(new TxnHeader(request.sessionId, request.cxid,
  74. zks.getNextZxid(),
  75. Time.currentWallTime(), OpCode.multi));
  76. throw e;
  77. }
  78. List<Txn> txns = new ArrayList<Txn>();
  79. //Each op in a multi-op must have the same zxid!
  80. long zxid = zks.getNextZxid();
  81. KeeperException ke = null;
  82. //Store off current pending change records in case we need to rollback
  83. Map<String, ChangeRecord> pendingChanges =
  84. getPendingChanges(multiRequest);
  85. for (Op op : multiRequest) {
  86. Record subrequest = op.toRequestRecord();
  87. int type;
  88. Record txn;
  89. /* If we've already failed one of the ops, don't bother
  90. * trying the rest as we know it's going to fail and it
  91. * would be confusing in the logfiles.
  92. */
  93. if (ke != null) {
  94. type = OpCode.error;
  95. txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
  96. }
  97. /* Prep the request and convert to a Txn */
  98. else {
  99. try {
  100. pRequest2Txn(op.getType(), zxid, request, subrequest, false);
  101. type = request.getHdr().getType();
  102. txn = request.getTxn();
  103. } catch (KeeperException e) {
  104. ke = e;
  105. type = OpCode.error;
  106. txn = new ErrorTxn(e.code().intValue());
  107. if (e.code().intValue() > Code.APIERROR.intValue()) {
  108. LOG.info(
  109. "Got user-level KeeperException when processing {} aborting"
  110. + " remaining multi ops. Error Path:{} Error:{}",
  111. request.toString(),
  112. e.getPath(),
  113. e.getMessage());
  114. }
  115. request.setException(e);
  116. /* Rollback change records from failed multi-op */
  117. rollbackPendingChanges(zxid, pendingChanges);
  118. }
  119. }
  120. //FIXME: I don't want to have to serialize it here and then
  121. // immediately deserialize in next processor. But I'm
  122. // not sure how else to get the txn stored into our list.
  123. ByteArrayOutputStream baos = new ByteArrayOutputStream();
  124. BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
  125. txn.serialize(boa, "request");
  126. ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
  127. txns.add(new Txn(type, bb.array()));
  128. }
  129. request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
  130. Time.currentWallTime(), request.type));
  131. request.setTxn(new MultiTxn(txns));
  132. break;
  133. //create/close session don't require request record
  134. case OpCode.createSession:
  135. case OpCode.closeSession:
  136. if (!request.isLocalSession()) {
  137. pRequest2Txn(request.type, zks.getNextZxid(), request,
  138. null, true);
  139. }
  140. break;
  141. //All the rest don't need to create a Txn - just verify session
  142. case OpCode.sync:
  143. case OpCode.exists:
  144. case OpCode.getData:
  145. case OpCode.getACL:
  146. case OpCode.getChildren:
  147. case OpCode.getChildren2:
  148. case OpCode.ping:
  149. case OpCode.setWatches:
  150. case OpCode.checkWatches:
  151. case OpCode.removeWatches:
  152. zks.sessionTracker.checkSession(request.sessionId,
  153. request.getOwner());
  154. break;
  155. default:
  156. LOG.warn("unknown type " + request.type);
  157. break;
  158. }
  159. } catch (KeeperException e) {
  160. ... ...
  161. } catch (Exception e) {
  162. ... ...
  163. }
  164. request.zxid = zks.getZxid();
  165. nextProcessor.processRequest(request);
  166. }
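
The run() method at the top of the listing above is a blocking-queue consumer loop that stops when it takes the special Request.requestOfDeath object from submittedRequests. The following sketch shows that "poison pill" pattern in isolation. The class PoisonPillSketch, its nested Request type, and the drain() helper are hypothetical; adding the request name to a list stands in for the call to pRequest(request).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration of the consumer loop in run(); not ZooKeeper code.
final class PoisonPillSketch {
    static final class Request {
        final String name;

        Request(String name) {
            this.name = name;
        }
    }

    // Sentinel compared by identity, in the same way as Request.requestOfDeath.
    static final Request REQUEST_OF_DEATH = new Request("requestOfDeath");

    // Take requests until the sentinel arrives; return the names that were handled.
    static List<String> drain(BlockingQueue<Request> submittedRequests) {
        List<String> handled = new ArrayList<>();
        try {
            while (true) {
                Request request = submittedRequests.take();
                if (request == REQUEST_OF_DEATH) {
                    break;
                }
                handled.add(request.name); // stands in for pRequest(request)
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    public static void main(String[] args) {
        BlockingQueue<Request> queue = new LinkedBlockingQueue<>();
        queue.add(new Request("create"));
        queue.add(new Request("setData"));
        queue.add(REQUEST_OF_DEATH);
        System.out.println(drain(queue));
    }
}
```

Requests queued after the sentinel are never processed, which is what makes the sentinel a clean shutdown signal for the processor thread.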

2.7 Server-side Follower startup

image.png
FollowerZooKeeperServer
Press Ctrl + N to search globally for Follower, then Ctrl + F to find followLeader

  1. void followLeader() throws InterruptedException {
  2. self.end_fle = Time.currentElapsedTime();
  3. long electionTimeTaken = self.end_fle - self.start_fle;
  4. self.setElectionTimeTaken(electionTimeTaken);
  5. LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
  6. QuorumPeer.FLE_TIME_UNIT);
  7. self.start_fle = 0;
  8. self.end_fle = 0;
  9. fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
  10. try {
  11. QuorumServer leaderServer = findLeader();
  12. try {
  13. connectToLeader(leaderServer.addr, leaderServer.hostname);
  14. long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
  15. if (self.isReconfigStateChange())
  16. throw new Exception("learned about role change");
  17. //check to see if the leader zxid is lower than ours
  18. //this should never happen but is just a safety check
  19. long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
  20. if (newEpoch < self.getAcceptedEpoch()) {
  21. LOG.error("Proposed leader epoch " +
  22. ZxidUtils.zxidToString(newEpochZxid)
  23. + " is less than our accepted epoch " +
  24. ZxidUtils.zxidToString(self.getAcceptedEpoch()));
  25. throw new IOException("Error: Epoch of leader is lower");
  26. }
  27. syncWithLeader(newEpochZxid);
  28. QuorumPacket qp = new QuorumPacket();
  29. while (this.isRunning()) {
  30. readPacket(qp);
  31. processPacket(qp);
  32. }
  33. } catch (Exception e) {
  34. ... ...
  35. }
  36. } finally {
  37. zk.unregisterJMX((Learner) this);
  38. }
  39. }
  40. void readPacket(QuorumPacket pp) throws IOException {
  41. synchronized (leaderIs) {
  42. leaderIs.readRecord(pp, "packet");
  43. }
  44. if (LOG.isTraceEnabled()) {
  45. final long traceMask =
  46. (pp.getType() == Leader.PING) ? ZooTrace.SERVER_PING_TRACE_MASK
  47. : ZooTrace.SERVER_PACKET_TRACE_MASK;
  48. ZooTrace.logQuorumPacket(LOG, traceMask, 'i', pp);
  49. }
  50. }
  51. protected void processPacket(QuorumPacket qp) throws Exception {
  52. switch (qp.getType()) {
  53. case Leader.PING:
  54. ping(qp);
  55. break;
  56. case Leader.PROPOSAL:
  57. TxnHeader hdr = new TxnHeader();
  58. Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
  59. if (hdr.getZxid() != lastQueued + 1) {
  60. LOG.warn("Got zxid 0x"
  61. + Long.toHexString(hdr.getZxid())
  62. + " expected 0x"
  63. + Long.toHexString(lastQueued + 1));
  64. }
  65. lastQueued = hdr.getZxid();
  66. if (hdr.getType() == OpCode.reconfig) {
  67. SetDataTxn setDataTxn = (SetDataTxn) txn;
  68. QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
  69. self.setLastSeenQuorumVerifier(qv, true);
  70. }
  71. fzk.logRequest(hdr, txn);
  72. break;
  73. case Leader.COMMIT:
  74. fzk.commit(qp.getZxid());
  75. break;
  76. case Leader.COMMITANDACTIVATE:
  77. // get the new configuration from the request
  78. Request request = fzk.pendingTxns.element();
  79. SetDataTxn setDataTxn = (SetDataTxn) request.getTxn();
  80. QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
  81. // get new designated leader from (current) leader's message
  82. ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
  83. long suggestedLeaderId = buffer.getLong();
  84. boolean majorChange =
  85. self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
  86. // commit (writes the new config to ZK tree (/zookeeper/config)
  87. fzk.commit(qp.getZxid());
  88. if (majorChange) {
  89. throw new Exception("changes proposed in reconfig");
  90. }
  91. break;
  92. case Leader.UPTODATE:
  93. LOG.error("Received an UPTODATE message after Follower started");
  94. break;
  95. case Leader.REVALIDATE:
  96. revalidate(qp);
  97. break;
  98. case Leader.SYNC:
  99. fzk.sync();
  100. break;
  101. default:
  102. LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
  103. break;
  104. }
  105. }

2.8 Client startup

image.png

#!/usr/bin/env bash
ZOOBIN="${BASH_SOURCE-$0}"
ZOOBIN="$(dirname "${ZOOBIN}")"
ZOOBINDIR="$(cd "${ZOOBIN}"; pwd)"
if [ -e "$ZOOBIN/../libexec/zkEnv.sh" ]; then
    . "$ZOOBINDIR"/../libexec/zkEnv.sh
else
    . "$ZOOBINDIR"/zkEnv.sh
fi
ZOO_LOG_FILE=zookeeper-$USER-cli-$HOSTNAME.log
"$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" \
    "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
    "-Dzookeeper.log.file=${ZOO_LOG_FILE}" \
    -cp "$CLASSPATH" $CLIENT_JVMFLAGS $JVMFLAGS \
    org.apache.zookeeper.ZooKeeperMain "$@"

When zkCli.sh starts the ZooKeeper client, it invokes ZooKeeperMain.
Press Ctrl + N to find ZooKeeperMain, then locate the program entry point, the main() method.

public static void main(String args[]) throws CliException, IOException, InterruptedException {
    ZooKeeperMain main = new ZooKeeperMain(args);
    main.run();
}

2.8.1 Creating ZooKeeperMain

public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
    cl.parseOptions(args);
    System.out.println("Connecting to " + cl.getOption("server"));
    connectToZK(cl.getOption("server"));
}

protected void connectToZK(String newHost) throws InterruptedException, IOException {
    if (zk != null && zk.getState().isAlive()) {
        zk.close();
    }
    host = newHost;
    boolean readOnly = cl.getOption("readonly") != null;
    if (cl.getOption("secure") != null) {
        System.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
        System.out.println("Secure connection is enabled");
    }
    zk = new ZooKeeperAdmin(host, Integer.parseInt(cl.getOption("timeout")),
            new MyWatcher(), readOnly);
}

public ZooKeeperAdmin(String connectString, int sessionTimeout, Watcher watcher,
        boolean canBeReadOnly) throws IOException {
    super(connectString, sessionTimeout, watcher, canBeReadOnly);
}

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        boolean canBeReadOnly) throws IOException {
    this(connectString, sessionTimeout, watcher, canBeReadOnly,
            createDefaultHostProvider(connectString));
}

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        boolean canBeReadOnly, HostProvider aHostProvider)
        throws IOException {
    this(connectString, sessionTimeout, watcher, canBeReadOnly,
            aHostProvider, null);
}

2.8.2 Initializing the watcher

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        boolean canBeReadOnly, HostProvider aHostProvider,
        ZKClientConfig clientConfig) throws IOException {
    LOG.info("Initiating client connection, connectString=" + connectString
            + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
    if (clientConfig == null) {
        clientConfig = new ZKClientConfig();
    }
    this.clientConfig = clientConfig;
    watchManager = defaultWatchManager();
    // Assign the supplied watcher as the default watcher
    watchManager.defaultWatcher = watcher;
    ConnectStringParser connectStringParser = new ConnectStringParser(
            connectString);
    hostProvider = aHostProvider;
    // The endpoint through which the client talks to the server
    cnxn = createConnection(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            getClientCnxnSocket(), canBeReadOnly);
    cnxn.start();
}

2.8.3 Parsing the connect string

public ConnectStringParser(String connectString) {
    // connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"
    // parse out chroot, if any
    int off = connectString.indexOf('/');
    if (off >= 0) {
        String chrootPath = connectString.substring(off);
        // ignore "/" chroot spec, same as null
        if (chrootPath.length() == 1) {
            this.chrootPath = null;
        } else {
            PathUtils.validatePath(chrootPath);
            this.chrootPath = chrootPath;
        }
        connectString = connectString.substring(0, off);
    } else {
        this.chrootPath = null;
    }
    // Split "hadoop102:2181,hadoop103:2181,hadoop104:2181" on commas
    List<String> hostsList = split(connectString, ",");
    for (String host : hostsList) {
        int port = DEFAULT_PORT;
        int pidx = host.lastIndexOf(':');
        if (pidx >= 0) {
            // otherwise : is at the end of the string, ignore
            if (pidx < host.length() - 1) {
                port = Integer.parseInt(host.substring(pidx + 1));
            }
            host = host.substring(0, pidx);
        }
        serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
    }
}

private final ArrayList<InetSocketAddress> serverAddresses =
        new ArrayList<InetSocketAddress>();

public class InetSocketAddress extends SocketAddress {
    // Private implementation class pointed to by all public methods.
    private static class InetSocketAddressHolder {
        // The hostname of the Socket Address
        private String hostname;
        // The IP address of the Socket Address
        private InetAddress addr;
        // The port number of the Socket Address
        private int port;
        ......
    }
    ......
}
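
The parsing steps in the ConnectStringParser constructor above (strip an optional chroot suffix, split the rest on commas, fall back to a default port) can be tried out with a small standalone sketch. The class ConnectStringSketch is hypothetical and simplified: it uses String.split instead of the split() helper in the listing, skips the PathUtils.validatePath check, and assumes a default port of 2181.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified re-creation of the parsing steps; not ZooKeeper code.
final class ConnectStringSketch {
    static final int DEFAULT_PORT = 2181; // assumed default client port

    final String chrootPath;
    final List<InetSocketAddress> serverAddresses = new ArrayList<>();

    ConnectStringSketch(String connectString) {
        // Everything from the first '/' onward is the chroot; a bare "/" means none.
        int off = connectString.indexOf('/');
        if (off >= 0) {
            String chroot = connectString.substring(off);
            chrootPath = chroot.length() == 1 ? null : chroot;
            connectString = connectString.substring(0, off);
        } else {
            chrootPath = null;
        }
        // The remainder is a comma-separated list of host[:port] entries.
        for (String host : connectString.split(",")) {
            host = host.trim();
            if (host.isEmpty()) {
                continue;
            }
            int port = DEFAULT_PORT;
            int pidx = host.lastIndexOf(':');
            if (pidx >= 0) {
                if (pidx < host.length() - 1) {
                    port = Integer.parseInt(host.substring(pidx + 1));
                }
                host = host.substring(0, pidx);
            }
            // Unresolved: no DNS lookup happens while parsing.
            serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
        }
    }

    public static void main(String[] args) {
        ConnectStringSketch parsed =
                new ConnectStringSketch("hadoop102:2181,hadoop103:2181,hadoop104:2181/app");
        System.out.println(parsed.chrootPath + " " + parsed.serverAddresses.size());
    }
}
```

Because the addresses are created unresolved, a host name that cannot currently be resolved does not cause parsing to fail; resolution is deferred until a connection is attempted.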

2.8.4 Creating the connection

  1. public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
  2. boolean canBeReadOnly, HostProvider aHostProvider,
  3. ZKClientConfig clientConfig) throws IOException {
  4. LOG.info("Initiating client connection, connectString=" + connectString
  5. + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
  6. if (clientConfig == null) {
  7. clientConfig = new ZKClientConfig();
  8. }
  9. this.clientConfig = clientConfig;
  10. watchManager = defaultWatchManager();
  11. // Assign the supplied watcher as the default watcher
  12. watchManager.defaultWatcher = watcher;
  13. ConnectStringParser connectStringParser = new ConnectStringParser(
  14. connectString);
  15. hostProvider = aHostProvider;
  16. // The endpoint through which the client talks to the server
  17. cnxn = createConnection(connectStringParser.getChrootPath(),
  18. hostProvider, sessionTimeout, this, watchManager,
  19. getClientCnxnSocket(), canBeReadOnly);
  20. cnxn.start();
  21. }
  22. private ClientCnxnSocket getClientCnxnSocket() throws IOException {
  23. String clientCnxnSocketName = getClientConfig().getProperty(
  24. ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
  25. if (clientCnxnSocketName == null) {
  26. clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
  27. }
  28. try {
  29. // Obtain a ClientCnxnSocket instance via reflection
  30. Constructor<?> clientCxnConstructor =
  31. Class.forName(clientCnxnSocketName).getDeclaredConstructor(ZKClientConfig.class);
  32. ClientCnxnSocket
  33. clientCxnSocket
  34. = (ClientCnxnSocket)
  35. clientCxnConstructor.newInstance(getClientConfig());
  36. return clientCxnSocket;
  37. } catch (Exception e) {
  38. IOException ioe = new IOException("Couldn't instantiate "
  39. + clientCnxnSocketName);
  40. ioe.initCause(e);
  41. throw ioe;
  42. }
  43. }
  44. public static final String ZOOKEEPER_CLIENT_CNXN_SOCKET =
  45. ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET;
  46. public static final String ZOOKEEPER_CLIENT_CNXN_SOCKET =
  47. "zookeeper.clientCnxnSocket";
protected ClientCnxn createConnection(String chrootPath,
        HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
        ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
        boolean canBeReadOnly) throws IOException {
    return new ClientCnxn(chrootPath, hostProvider, sessionTimeout, this,
            watchManager, clientCnxnSocket, canBeReadOnly);
}
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout,
        ZooKeeper zooKeeper,
        ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
        boolean canBeReadOnly)
        throws IOException {
    this(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher,
            clientCnxnSocket, 0, new byte[16], canBeReadOnly);
}
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout,
        ZooKeeper zooKeeper,
        ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
        long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
    this.zooKeeper = zooKeeper;
    this.watcher = watcher;
    this.sessionId = sessionId;
    this.sessionPasswd = sessionPasswd;
    this.sessionTimeout = sessionTimeout;
    this.hostProvider = hostProvider;
    this.chrootPath = chrootPath;
    connectTimeout = sessionTimeout / hostProvider.size();
    readTimeout = sessionTimeout * 2 / 3;
    readOnly = canBeReadOnly;
    // Create the two worker threads: SendThread and EventThread
    sendThread = new SendThread(clientCnxnSocket);
    eventThread = new EventThread();
    this.clientConfig = zooKeeper.getClientConfig();
    initRequestTimeout();
}
public void start() {
    sendThread.start();
    eventThread.start();
}
SendThread(ClientCnxnSocket clientCnxnSocket) {
    super(makeThreadName("-SendThread()"));
    state = States.CONNECTING;
    this.clientCnxnSocket = clientCnxnSocket;
    setDaemon(true);
}
public ZooKeeperThread(String threadName) {
    super(threadName);
    setUncaughtExceptionHandler(uncaughtExceptionalHandler);
}
public class ZooKeeperThread extends Thread {
}
// ZooKeeperThread is a Thread, so starting it executes its run() method
public void run() {
    clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
    int to;
    long lastPingRwServer = Time.currentElapsedTime();
    final int MAX_SEND_PING_INTERVAL = 10000; // 10 seconds
    InetSocketAddress serverAddress = null;
    // Main loop: keep sending requests and receiving responses
    while (state.isAlive()) {
        try {
            if (!clientCnxnSocket.isConnected()) {
                // don't re-establish connection if we are closing
                if (closing) {
                    break;
                }
                if (rwServerAddress != null) {
                    serverAddress = rwServerAddress;
                    rwServerAddress = null;
                } else {
                    serverAddress = hostProvider.next(1000);
                }
                // Start connecting to the server
                startConnect(serverAddress);
                clientCnxnSocket.updateLastSendAndHeard();
            }
            if (state.isConnected()) {
                ... ...
                to = readTimeout - clientCnxnSocket.getIdleRecv();
            } else {
                to = connectTimeout - clientCnxnSocket.getIdleRecv();
            }
            if (to <= 0) {
                String warnInfo;
                warnInfo = "Client session timed out, have not heard from server in "
                        + clientCnxnSocket.getIdleRecv()
                        + "ms"
                        + " for sessionid 0x"
                        + Long.toHexString(sessionId);
                LOG.warn(warnInfo);
                throw new SessionTimeoutException(warnInfo);
            }
            if (state.isConnected()) {
                // 1000(1 second) is to prevent race condition missing to send the second ping
                // also make sure not to send too many pings when readTimeout is small
                int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
                        ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                // send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() >
                        MAX_SEND_PING_INTERVAL) {
                    sendPing();
                    clientCnxnSocket.updateLastSend();
                } else {
                    if (timeToNextPing < to) {
                        to = timeToNextPing;
                    }
                }
            }
            // If we are in read-only mode, seek for read/write server
            if (state == States.CONNECTEDREADONLY) {
                long now = Time.currentElapsedTime();
                int idlePingRwServer = (int) (now - lastPingRwServer);
                if (idlePingRwServer >= pingRwTimeout) {
                    lastPingRwServer = now;
                    idlePingRwServer = 0;
                    pingRwTimeout =
                            Math.min(2 * pingRwTimeout, maxPingRwTimeout);
                    pingRwServer();
                }
                to = Math.min(to, pingRwTimeout - idlePingRwServer);
            }
            // Receive and process responses from the server
            clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
        } catch (Throwable e) {
            ... ...
        }
    }
    synchronized (state) {
        // When it comes to this point, it guarantees that later queued
        // packet to outgoingQueue will be notified of death.
        cleanup();
    }
    clientCnxnSocket.close();
    if (state.isAlive()) {
        eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
                Event.KeeperState.Disconnected, null));
    }
    eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
            Event.KeeperState.Closed, null));
    ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
            "SendThread exited loop for session: 0x"
                    + Long.toHexString(getSessionId()));
}
private void startConnect(InetSocketAddress addr) throws IOException {
    // initializing it for new connection
    saslLoginFailed = false;
    if (!isFirstConnect) {
        try {
            Thread.sleep(r.nextInt(1000));
        } catch (InterruptedException e) {
            LOG.warn("Unexpected exception", e);
        }
    }
    state = States.CONNECTING;
    String hostPort = addr.getHostString() + ":" + addr.getPort();
    MDC.put("myid", hostPort);
    setName(getName().replaceAll("\\(.*\\)", "(" + hostPort + ")"));
    if (clientConfig.isSaslClientEnabled()) {
        try {
            if (zooKeeperSaslClient != null) {
                zooKeeperSaslClient.shutdown();
            }
            zooKeeperSaslClient = new ZooKeeperSaslClient(
                    SaslServerPrincipal.getServerPrincipal(addr, clientConfig),
                    clientConfig);
        } catch (LoginException e) {
            ... ...
        }
    }
    logStartConnect(addr);
    // Establish the connection
    clientCnxnSocket.connect(addr);
}
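
The timeout arithmetic in the ClientCnxn constructor and in run() is easier to follow with concrete numbers. The following is a minimal sketch that only reproduces the three formulas from the listing; the class name TimeoutSketch, the 30-second session timeout and the three-server ensemble are hypothetical values chosen for illustration, not taken from ZooKeeper.

```java
public class TimeoutSketch {
    // Mirrors: connectTimeout = sessionTimeout / hostProvider.size()
    static int connectTimeout(int sessionTimeoutMs, int serverCount) {
        return sessionTimeoutMs / serverCount;
    }

    // Mirrors: readTimeout = sessionTimeout * 2 / 3
    static int readTimeout(int sessionTimeoutMs) {
        return sessionTimeoutMs * 2 / 3;
    }

    // Mirrors: readTimeout / 2 - idleSend - ((idleSend > 1000) ? 1000 : 0)
    static int timeToNextPing(int readTimeoutMs, int idleSendMs) {
        return readTimeoutMs / 2 - idleSendMs - (idleSendMs > 1000 ? 1000 : 0);
    }

    public static void main(String[] args) {
        int sessionTimeoutMs = 30000; // hypothetical session timeout
        int serverCount = 3;          // hypothetical ensemble size
        int read = readTimeout(sessionTimeoutMs);
        System.out.println("connectTimeout = " + connectTimeout(sessionTimeoutMs, serverCount)); // 10000
        System.out.println("readTimeout    = " + read);                                          // 20000
        System.out.println("ping in (idle 0 ms)    = " + timeToNextPing(read, 0));               // 10000
        System.out.println("ping in (idle 4000 ms) = " + timeToNextPing(read, 4000));            // 5000
    }
}
```

With these assumed values, a client that has sent nothing for 4 seconds would schedule its next ping 5 seconds later, well inside the 20-second read timeout.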

Press Ctrl + Alt + B to find the implementation of connect; it is in ClientCnxnSocketNIO.java.

void connect(InetSocketAddress addr) throws IOException {
    SocketChannel sock = createSock();
    try {
        registerAndConnect(sock, addr);
    } catch (IOException e) {
        LOG.error("Unable to open socket to " + addr);
        sock.close();
        throw e;
    }
    initialized = false;
    /*
     * Reset incomingBuffer
     */
    lenBuffer.clear();
    incomingBuffer = lenBuffer;
}
void registerAndConnect(SocketChannel sock, InetSocketAddress addr)
        throws IOException {
    sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
    boolean immediateConnect = sock.connect(addr);
    if (immediateConnect) {
        sendThread.primeConnection();
    }
}
void primeConnection() throws IOException {
    LOG.info("Socket connection established, initiating session, client: {}, server: {}",
            clientCnxnSocket.getLocalSocketAddress(),
            clientCnxnSocket.getRemoteSocketAddress());
    // Mark that this is no longer the first connection attempt
    isFirstConnect = false;
    ... ...
}
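
registerAndConnect relies on the standard java.nio non-blocking connect pattern: register the channel for OP_CONNECT, call connect(), and finish the handshake later if it did not complete immediately. The sketch below shows that pattern in isolation using only JDK classes. The class NonBlockingConnectSketch, its connect helper and the local listener in main are assumptions made for illustration and are not part of ZooKeeper.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class NonBlockingConnectSketch {
    /** Returns true once a non-blocking connect to addr completes within timeoutMs. */
    static boolean connect(InetSocketAddress addr, long timeoutMs) throws IOException {
        try (Selector selector = Selector.open();
             SocketChannel sock = SocketChannel.open()) {
            sock.configureBlocking(false);
            SelectionKey key = sock.register(selector, SelectionKey.OP_CONNECT);
            // connect() may succeed at once (the immediateConnect case in the listing).
            if (sock.connect(addr)) {
                return true;
            }
            // Otherwise wait for the selector to report OP_CONNECT, then finish the handshake.
            long deadline = System.currentTimeMillis() + timeoutMs;
            long remaining;
            while ((remaining = deadline - System.currentTimeMillis()) > 0) {
                selector.select(remaining);
                boolean ready = selector.selectedKeys().remove(key);
                if (ready && key.isConnectable() && sock.finishConnect()) {
                    return true;
                }
            }
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical stand-in for a server: a listener on an ephemeral loopback port.
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            InetSocketAddress addr = (InetSocketAddress) server.getLocalAddress();
            System.out.println("connected=" + connect(addr, 2000));
        }
    }
}
```

In the ZooKeeper client the second half of this pattern lives in doTransport, which calls finishConnect() when the key reports OP_CONNECT and then invokes primeConnection().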

Press Ctrl + Alt + B to find the implementation of doTransport; it is in ClientCnxnSocketNIO.java.

void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn)
        throws IOException, InterruptedException {
    selector.select(waitTimeOut);
    Set<SelectionKey> selected;
    synchronized (this) {
        selected = selector.selectedKeys();
    }
    // Everything below and until we get back to the select is
    // non blocking, so time is effectively a constant. That is
    // Why we just have to do this once, here
    updateNow();
    for (SelectionKey k : selected) {
        SocketChannel sc = ((SocketChannel) k.channel());
        if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
            if (sc.finishConnect()) {
                updateLastSendAndHeard();
                updateSocketAddresses();
                sendThread.primeConnection();
            }
        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
            doIO(pendingQueue, cnxn);
        }
    }
    if (sendThread.getZkState().isConnected()) {
        if (findSendablePacket(outgoingQueue,
                sendThread.tunnelAuthInProgress()) != null) {
            enableWrite();
        }
    }
    selected.clear();
}
void doIO(List<Packet> pendingQueue, ClientCnxn cnxn)
        throws InterruptedException, IOException {
    SocketChannel sock = (SocketChannel) sockKey.channel();
    if (sock == null) {
        throw new IOException("Socket is null!");
    }
    if (sockKey.isReadable()) {
        int rc = sock.read(incomingBuffer);
        if (rc < 0) {
            throw new EndOfStreamException(
                    "Unable to read additional data from server sessionid 0x"
                            + Long.toHexString(sessionId)
                            + ", likely server has closed socket");
        }
        if (!incomingBuffer.hasRemaining()) {
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                recvCount.getAndIncrement();
                readLength();
            } else if (!initialized) {
                readConnectResult();
                enableRead();
                if (findSendablePacket(outgoingQueue,
                        sendThread.tunnelAuthInProgress()) != null) {
                    // Since SASL authentication has completed (if client is configured to do so),
                    // outgoing packets waiting in the outgoingQueue can now be sent.
                    enableWrite();
                }
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
                initialized = true;
            } else {
                // Read the server's response
                sendThread.readResponse(incomingBuffer);
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
            }
        }
    }
    if (sockKey.isWritable()) {
        Packet p = findSendablePacket(outgoingQueue,
                sendThread.tunnelAuthInProgress());
        if (p != null) {
            updateLastSend();
            // If we already started writing p, p.bb will already exist
            if (p.bb == null) {
                if ((p.requestHeader != null) &&
                        (p.requestHeader.getType() != OpCode.ping) &&
                        (p.requestHeader.getType() != OpCode.auth)) {
                    p.requestHeader.setXid(cnxn.getXid());
                }
                p.createBB();
            }
            sock.write(p.bb);
            if (!p.bb.hasRemaining()) {
                sentCount.getAndIncrement();
                outgoingQueue.removeFirstOccurrence(p);
                if (p.requestHeader != null
                        && p.requestHeader.getType() != OpCode.ping
                        && p.requestHeader.getType() != OpCode.auth) {
                    synchronized (pendingQueue) {
                        pendingQueue.add(p);
                    }
                }
            }
        }
        if (outgoingQueue.isEmpty()) {
            // No more packets to send: turn off write interest flag.
            // Will be turned on later by a later call to enableWrite(),
            // from within ZooKeeperSaslClient (if client is configured
            // to attempt SASL authentication), or in either doIO() or
            // in doTransport() if not.
            disableWrite();
        } else if (!initialized && p != null && !p.bb.hasRemaining()) {
            // On initial connection, write the complete connect request
            // packet, but then disable further writes until after
            // receiving a successful connection response. If the
            // session is expired, then the server sends the expiration
            // response and immediately closes its end of the socket. If
            // the client is simultaneously writing on its end, then the
            // TCP stack may choose to abort with RST, in which case the
            // client would never receive the session expired event. See
            // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
            disableWrite();
        } else {
            // Just in case
            enableWrite();
        }
    }
}
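
The read half of doIO alternates between a 4-byte lenBuffer and a body buffer sized from that length, which is a length-prefixed framing scheme. The sketch below reproduces only that buffer-switching idea on in-memory data. LengthPrefixedReaderSketch, feed, encode and the sample messages are hypothetical names and values for illustration; the real client also validates the length and dispatches each body to readConnectResult or readResponse, which is omitted here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class LengthPrefixedReaderSketch {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incomingBuffer = lenBuffer; // start by expecting a length
    final List<byte[]> frames = new ArrayList<>();

    /** Feeds bytes that may be split at arbitrary points across calls. */
    void feed(ByteBuffer data) {
        while (true) {
            // Fill the current buffer (length or body) as far as the input allows.
            while (data.hasRemaining() && incomingBuffer.hasRemaining()) {
                incomingBuffer.put(data.get());
            }
            if (incomingBuffer.hasRemaining()) {
                return; // current buffer is incomplete: wait for more input
            }
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                int len = lenBuffer.getInt();
                if (len < 0) {
                    throw new IllegalStateException("Negative frame length: " + len);
                }
                incomingBuffer = ByteBuffer.allocate(len); // now expect the body
            } else {
                byte[] body = new byte[incomingBuffer.remaining()];
                incomingBuffer.get(body);
                frames.add(body);
                lenBuffer.clear();          // reset and expect the next length
                incomingBuffer = lenBuffer;
            }
        }
    }

    /** Encodes a string as a 4-byte big-endian length followed by UTF-8 bytes. */
    static byte[] encode(String s) {
        byte[] body = s.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }

    public static void main(String[] args) {
        LengthPrefixedReaderSketch reader = new LengthPrefixedReaderSketch();
        byte[] first = encode("ping");
        byte[] second = encode("hello");
        byte[] stream = new byte[first.length + second.length];
        System.arraycopy(first, 0, stream, 0, first.length);
        System.arraycopy(second, 0, stream, first.length, second.length);
        // Deliver the stream in 3-byte chunks to imitate partial socket reads.
        for (int i = 0; i < stream.length; i += 3) {
            reader.feed(ByteBuffer.wrap(stream, i, Math.min(3, stream.length - i)));
        }
        for (byte[] f : reader.frames) {
            System.out.println(new String(f, StandardCharsets.UTF_8));
        }
    }
}
```

Because the decoder keeps its state in incomingBuffer between calls, a frame split across several reads is reassembled without extra bookkeeping, which is the reason doIO only acts when incomingBuffer has no remaining space.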

2.8.5 Executing run()

public static void main(String args[]) throws CliException, IOException, InterruptedException {
    ZooKeeperMain main = new ZooKeeperMain(args);
    main.run();
}
void run() throws CliException, IOException, InterruptedException {
    if (cl.getCommand() == null) {
        System.out.println("Welcome to ZooKeeper!");
        boolean jlinemissing = false;
        // only use jline if it's in the classpath 
        try {
            Class<?> consoleC = Class.forName("jline.console.ConsoleReader");
            Class<?> completorC =
                    Class.forName("org.apache.zookeeper.JLineZNodeCompleter");
            System.out.println("JLine support is enabled");
            Object console =
                    consoleC.getConstructor().newInstance();
            Object completor =
                    completorC.getConstructor(ZooKeeper.class).newInstance(zk);
            Method addCompletor = consoleC.getMethod("addCompleter",
                    Class.forName("jline.console.completer.Completer"));
            addCompletor.invoke(console, completor);
            String line;
            Method readLine = consoleC.getMethod("readLine", String.class);
            while ((line = (String) readLine.invoke(console, getPrompt())) != null) {
                executeLine(line);
            }
        } catch (ClassNotFoundException e) {
            ... ...
        }
        if (jlinemissing) {
            System.out.println("JLine support is disabled");
            BufferedReader br =
                    new BufferedReader(new InputStreamReader(System.in));
            String line;
            while ((line = br.readLine()) != null) {
                // Read and execute commands one line at a time
                executeLine(line);
            }
        }
    } else {
        // Command line args non-null. Run what was passed. 
        processCmd(cl);
    }
    System.exit(exitCode);
}
public void executeLine(String line) throws CliException, InterruptedException, IOException {
    if (!line.equals("")) {
        cl.parseCommand(line);
        addToHistory(commandCount, line);
        // Process the client command
        processCmd(cl);
        commandCount++;
    }
}
protected boolean processCmd(MyCommandOptions co) throws CliException, IOException,
        InterruptedException {
    boolean watch = false;
    try {
        // Parse and execute the command
        watch = processZKCmd(co);
        exitCode = 0;
    } catch (CliException ex) {
        exitCode = ex.getExitCode();
        System.err.println(ex.getMessage());
    }
    return watch;
}
protected boolean processZKCmd(MyCommandOptions co) throws CliException,
        IOException, InterruptedException {
    String[] args = co.getArgArray();
    String cmd = co.getCommand();
    if (args.length < 1) {
        usage();
        throw new MalformedCommandException("No command entered");
    }
    if (!commandMap.containsKey(cmd)) {
        usage();
        throw new CommandNotFoundException("Command not found " + cmd);
    }
    boolean watch = false;
    LOG.debug("Processing " + cmd);
    if (cmd.equals("quit")) {
        zk.close();
        System.exit(exitCode);
    } else if (cmd.equals("redo") && args.length >= 2) {
        Integer i = Integer.decode(args[1]);
        if (commandCount <= i || i < 0) { // don't allow redoing this redo 
            throw new MalformedCommandException("Command index out of range");
        }
        cl.parseCommand(history.get(i));
        if (cl.getCommand().equals("redo")) {
            throw new MalformedCommandException("No redoing redos");
        }
        history.put(commandCount, history.get(i));
        processCmd(cl);
    } else if (cmd.equals("history")) {
        for (int i = commandCount - 10; i <= commandCount; ++i) {
            if (i < 0) continue;
            System.out.println(i + " - " + history.get(i));
        }
    } else if (cmd.equals("printwatches")) {
        if (args.length == 1) {
            System.out.println("printwatches is " + (printWatches ? "on" : "off"));
        } else {
            printWatches = args[1].equals("on");
        }
    } else if (cmd.equals("connect")) {
        if (args.length >= 2) {
            connectToZK(args[1]);
        } else {
            connectToZK(host);
        }
    }
    // Below commands all need a live connection 
    if (zk == null || !zk.getState().isAlive()) {
        System.out.println("Not connected");
        return false;
    }
    // execute from commandMap 
    CliCommand cliCmd = commandMapCli.get(cmd);
    if (cliCmd != null) {
        cliCmd.setZk(zk);
        watch = cliCmd.parse(args).exec();
    } else if (!commandMap.containsKey(cmd)) {
        usage();
    }
    return watch;
}
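
The tail of processZKCmd looks a command up by name in a map and delegates to the object it finds. The sketch below illustrates that lookup-and-dispatch idea with plain JDK types. CommandDispatchSketch, the Command interface and the echo and count commands are hypothetical stand-ins for illustration; they are not the ZooKeeper CliCommand classes.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

public class CommandDispatchSketch {
    /** Hypothetical stand-in for a CLI command: receives the full argument array. */
    interface Command {
        String exec(String[] args);
    }

    private final Map<String, Command> commands = new LinkedHashMap<>();

    CommandDispatchSketch() {
        // args[0] is the command name, as with co.getArgArray() in the listing.
        commands.put("echo", args -> String.join(" ", Arrays.copyOfRange(args, 1, args.length)));
        commands.put("count", args -> Integer.toString(args.length - 1));
    }

    /** Parses one input line and dispatches it to the matching command. */
    String executeLine(String line) {
        String trimmed = line.trim();
        if (trimmed.isEmpty()) {
            return ""; // blank lines are ignored, like the line.equals("") check
        }
        String[] args = trimmed.split("\\s+");
        Command cmd = commands.get(args[0]);
        if (cmd == null) {
            return "Command not found " + args[0];
        }
        return cmd.exec(args);
    }

    public static void main(String[] args) {
        CommandDispatchSketch cli = new CommandDispatchSketch();
        System.out.println(cli.executeLine("echo a b"));    // a b
        System.out.println(cli.executeLine("count x y z")); // 3
        System.out.println(cli.executeLine("nope"));        // Command not found nope
    }
}
```

Keeping commands in a map means adding a command is a single registration rather than another branch in an if/else chain, which is how the listing separates the few built-in commands (quit, redo, history) from the map-driven ones.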