1. Leader 选举过程

Leader 选举是保证分布式数据一致性的关键所在。当 Zookeeper 集群中的一台服务器出现以下两种情况之一时,需要进入 Leader 选举:

  • 服务器初始化启动。
  • 服务器运行期间无法和 Leader 保持连接。

下面就两种情况进行分析讲解。

1.1 服务器启动时期的 Leader 选举过程

若进行 Leader 选举,则至少需要 2 台机器,这里选取 3 台机器组成的服务器集群为例。在集群初始化阶段,当有一台服务器 Server1 启动时,其单独无法进行和完成 Leader 选举,当第二台服务器 Server2 启动时,此时两台机器可以相互通信,每台机器都试图找到 Leader,于是进入 Leader 选举过程。选举过程如下

  1. 每个 Server 发出一个投票。由于是初始情况,Server1 和 Server2 都会将自己作为 Leader 服务器来进行投票,每次投票会包含所推举的服务器的 myidZXID(事务ID),使用 (myid, ZXID) 来表示,此时 Server1 的投票为(1, 0),Server2 的投票为(2, 0),然后各自将这个投票发给集群中其他机器。
  2. 接受来自各个服务器的投票。集群的每个服务器收到投票后,首先判断该投票的有效性,如检查是否是本轮投票、是否来自 LOOKING 状态的服务器。
  3. 处理投票。针对每一个投票,服务器都需要将别人的投票和自己的投票进行 PK,PK规则如下
    1. 优先检查 ZXID,ZXID 比较大的服务器优先作为 Leader。
    2. 如果 ZXID 相同,那么就比较 myid。myid 较大的服务器作为Leader服务器。


  • 对于 Server1 而言,它的投票是(1, 0),接收 Server2 的投票为(2, 0),首先会比较两者的 ZXID,均为0,再比较 myid,此时 Server2 的 myid 最大,于是更新自己的投票为(2, 0),然后重新投票
  • 对于 Server2 而言,其无须更新自己的投票,只是再次向集群中所有机器发出上一次投票信息即可。
  1. 统计投票。每次投票后,服务器都会统计投票信息,判断是否已经有过半机器接受到相同的投票信息,对于 Server1、Server2 而言,都统计出集群中已经有两台机器接受了 (2, 0) 的投票信息,此时便认为已经选出了 Leader。

**注意**:如果启动顺序是 1、2、3,则第二台启动的时候,因为 2号的 myid > 1,2号票数又过半,此时就已经选举了2号为leader,3号启动直接为 follower。

  1. 改变服务器状态。一旦确定了 Leader,每个服务器就会更新自己的状态,如果是 Follower,那么就变更为 FOLLOWING,如果是 Leader,就变更为 LEADING

1.2 服务器运行时期的 Leader 选举

在 Zookeeper 运行期间,Leader 与非 Leader 服务器各司其职,即便当有非 Leader 服务器宕机或新加入,此时也不会影响 Leader,但是一旦 Leader 服务器挂了,那么整个集群将**暂停**对外服务,进入新一轮 Leader 选举,其过程和启动时期的 Leader 选举过程基本一致。假设正在运行的有 Server1、Server2、Server3 三台服务器,当前 Leader 是 Server2,若某一时刻 Leader 挂了,此时便开始 Leader 选举。选举过程如下

  1. 变更状态。Leader 挂后,余下的非 Observer 服务器都会讲自己的服务器状态变更为 LOOKING,然后开始进入 Leader 选举过程。
  2. 每个 Server 会发出一个投票。在运行期间,每个服务器上的 ZXID 可能不同,此时假定 Server1 的 ZXID 为 123,Server3 的 ZXID为 122;在第一轮投票中,Server1 和 Server3 都会投自己,产生投票 (1, 123),(3, 122),然后各自将投票发送给集群中所有机器。
  3. 接收来自各个服务器的投票。与启动时过程相同。
  4. 处理投票。与启动时过程相同,此时,Server1 将会成为 Leader。
  5. 统计投票。与启动时过程相同。
  6. 改变服务器的状态。与启动时过程相同。

2. Leader选举算法分析

在 3.4.0 后的 Zookeeper 的版本只保留了 TCP 版本的 FastLeaderElection 选举算法。当一台机器进入 Leader 选举时,当前集群可能会处于以下两种状态

  • 集群中已经存在 Leader。
  • 集群中不存在 Leader。

对于集群中已经存在 Leader 而言,此种情况一般都是某台机器启动得较晚,在其启动之前,集群已经在正常工作,对这种情况,该机器试图去选举 Leader 时,会被告知当前服务器的 Leader 信息,对于该机器而言,仅仅需要和 Leader 机器建立起连接,并进行状态同步即可。

而在集群中不存在 Leader 情况下则会相对复杂,其步骤如下:

(1) 第一次投票

无论哪种导致进行 Leader 选举,集群的所有机器都处于试图选举出一个 Leader 的状态,即 LOOKING 状态,LOOKING 机器会向所有其他机器发送消息,该消息称为投票。投票中包含了 **SID**(服务器的唯一标识)和 **ZXID**(事务ID),(SID, ZXID)形式来标识一次投票信息

假定 Zookeeper 由5台机器组成,SID 分别为 1、2、3、4、5,ZXID 分别为 9、9、9、8、8,并且此时 SID 为 2 的机器是 Leader 机器,某一时刻,1、2 所在机器出现故障,因此集群开始进行 Leader 选举。在第一次投票时,每台机器都会将自己作为投票对象,于是 SID 为 3、4、5 的机器投票情况分别为 (3, 9),(4, 8), (5, 8)。

(2) 变更投票。每台机器发出投票后,也会收到其他机器的投票,每台机器会根据一定规则来处理收到的其他机器的投票,并以此来决定是否需要变更自己的投票,这个规则也是整个 Leader 选举算法的核心所在,其中术语描述如下

  • vote_sid:接收到的投票中所推举 Leader 服务器的 SID。
  • vote_zxid:接收到的投票中所推举 Leader 服务器的 ZXID。
  • self_sid:当前服务器自己的 SID。
  • self_zxid:当前服务器自己的 ZXID。

每次对收到的投票的处理,都是对 (vote_sid, vote_zxid)(self_sid, self_zxid) 对比的过程。

  • 规则一:如果 vote_zxid 大于 self_zxid,就认可当前收到的投票,并再次将该投票发送出去。
  • 规则二:如果 vote_zxid 小于 self_zxid,那么坚持自己的投票,不做任何变更。
  • 规则三:如果 vote_zxid 等于 self_zxid,那么就对比两者的 SID,
    • 如果 vote_sid 大于 self_sid,那么就认可当前收到的投票,并再次将该投票发送出去。
    • 如果 vote_sid 小于 self_sid,那么坚持自己的投票,不做任何变更。

结合上面规则,给出下面的集群变更过程。
image.png

(3) 确定Leader。经过第二轮投票后,集群中的每台机器都会再次接收到其他机器的投票,然后开始统计投票,如果一台机器收到了超过半数的相同投票,那么这个投票对应的 SID 机器即为 Leader。此时 Server3 将成为 Leader。

由上面规则可知,通常那台服务器上的数据越新(ZXID会越大),其成为 Leader 的可能性越大,也就越能够保证数据的恢复。如果 ZXID 相同,则 SID 越大机会越大。

3. Leader选举实现细节

3.1 服务器状态

服务器具有四种状态,分别是 LOOKING、FOLLOWING、LEADING、OBSERVING。

  • LOOKING:寻找 Leader 状态。当服务器处于该状态时,它会认为当前集群中没有 Leader,因此需要进入 Leader 选举状态。
  • FOLLOWING:跟随者状态。表明当前服务器角色是 Follower。
  • LEADING:领导者状态。表明当前服务器角色是 Leader。
  • OBSERVING:观察者状态。表明当前服务器角色是 Observer。

3.2 投票数据结构

每个投票中包含了两个最基本的信息,所推举服务器的 SID 和 ZXID,投票(Vote)在Zookeeper中包含字段如下

  • id:被推举的 Leader 的 SID。
  • zxid:被推举的 Leader事务 ID。
  • electionEpoch:逻辑时钟,用来判断多个投票是否在同一轮选举周期中,该值在服务端是一个自增序列,每次进入新一轮的投票后,都会对该值进行加 1 操作(即投票数)。
  • peerEpoch:被推举的 Leader 的 epoch。
  • state:当前服务器的状态。

3.3 QuorumCnxManager:网络I/O

每台服务器在启动的过程中,会启动一个 QuorumPeerManager,负责各台服务器之间的底层 Leader 选举过程中的网络通信。

(1) 消息队列。

QuorumCnxManager 内部维护了一系列的队列,用来保存接收到的、待发送的消息以及消息的发送器。除接收队列以外,其他队列都按照 SID 分组形成队列集合,如一个集群中除了自身还有 3 台机器,那么就会为这 3 台机器分别创建一个发送队列,互不干扰。

  • recvQueue:消息接收队列,用于存放那些从其他服务器接收到的消息。
  • queueSendMap:消息发送队列,用于保存那些待发送的消息,按照 SID 进行分组。
  • senderWorkerMap:发送器集合,每个 SenderWorker 消息发送器,都对应一台远程 Zookeeper 服务器,负责消息的发送,也按照 SID 进行分组。
  • lastMessageSent:最近发送过的消息,为每个 SID 保留最近发送过的一个消息。

(2) 建立连接。

为了能够相互投票,Zookeeper 集群中的所有机器都需要两两建立起网络连接。QuorumCnxManager 在启动时会创建一个 ServerSocket 来监听 Leader 选举的通信端口(默认为3888)。开启监听后,Zookeeper 能够不断地接收到来自其他服务器的创建连接请求,在接收到其他服务器的 TCP 连接请求时,会进行处理。为了避免两台机器之间重复地创建 TCP 连接,Zookeeper 只允许 SID 大的服务器主动和其他机器建立连接,否则断开连接。在接收到创建连接请求后,服务器通过对比自己和远程服务器的 SID 值来判断是否接收连接请求,如果当前服务器发现自己的 SID 更大,那么会断开当前连接,然后自己主动和远程服务器建立连接。一旦连接建立,就会根据远程服务器的 SID 来创建相应的消息发送器 SendWorker 和消息接收器 RecvWorker,并启动。

(3) 消息接收与发送。

消息接收:由消息接收器 RecvWorker 负责,由于 Zookeeper 为每个远程服务器都分配一个单独的 RecvWorker,因此,每个 RecvWorker 只需要不断地从这个 TCP 连接中读取消息,并将其保存到 recvQueue 队列中。消息发送:由于 Zookeeper 为每个远程服务器都分配一个单独的 SendWorker,因此,每个 SendWorker 只需要不断地从对应的消息发送队列中获取出一个消息发送即可,同时将这个消息放入 lastMessageSent中。在SendWorker中,一旦Zookeeper发现针对当前服务器的消息发送队列为空,那么此时需要从lastMessageSent 中取出一个最近发送过的消息来进行再次发送,这是为了解决接收方在消息接收前或者接收到消息后服务器挂了,导致消息尚未被正确处理。同时,Zookeeper能够保证接收方在处理消息时,会对重复消息进行正确的处理。

4. FastLeaderElection:选举算法核心

  • 外部投票:特指其他服务器发来的投票。
  • 内部投票:服务器自身当前的投票。
  • 选举轮次:Zookeeper 服务器 Leader 选举的轮次,即 logicalclock。
  • PK:对内部投票和外部投票进行对比来确定是否需要变更内部投票。

(1) 选票管理

  • sendqueue:选票发送队列,用于保存待发送的选票。
  • recvqueue:选票接收队列,用于保存接收到的外部投票。
  • WorkerReceiver:选票接收器。其会不断地从 QuorumCnxManager 中获取其他服务器发来的选举消息,并将其转换成一个选票,然后保存到 recvqueue 中,在选票接收过程中,如果发现该外部选票的选举轮次小于当前服务器的,那么忽略该外部投票,同时立即发送自己的内部投票。
  • WorkerSender:选票发送器,不断地从 sendqueue 中获取待发送的选票,并将其传递到底层 QuorumCnxManager 中。

(2) 算法核心

image.png

上图展示了FastLeaderElection模块是如何与底层网络I/O进行交互的。Leader选举的基本流程如下

  1. 自增选举轮次。Zookeeper 规定所有有效的投票都必须在同一轮次中,在开始新一轮投票时,会首先对logicalclock 进行自增操作。
  2. 初始化选票。在开始进行新一轮投票之前,每个服务器都会初始化自身的选票,并且在初始化阶段,每台服务器都会将自己推举为Leader。
  3. 发送初始化选票。完成选票的初始化后,服务器就会发起第一次投票。Zookeeper 会将刚刚初始化好的选票放入sendqueue中,由发送器 WorkerSender 负责发送出去。
  4. 接收外部投票。每台服务器会不断地从 recvqueue 队列中获取外部选票。如果服务器发现无法获取到任何外部投票,那么就会立即确认自己是否和集群中其他服务器保持着有效的连接,如果没有连接,则马上建立连接,如果已经建立了连接,则再次发送自己当前的内部投票。
  5. 判断选举轮次。在发送完初始化选票之后,接着开始处理外部投票。在处理外部投票时,会根据选举轮次来进行不同的处理。
    1. 外部投票的选举轮次大于内部投票。若服务器自身的选举轮次落后于该外部投票对应服务器的选举轮次,那么就会立即更新自己的选举轮次(logicalclock),并且清空所有已经收到的投票,然后使用初始化的投票来进行PK以确定是否变更内部投票。最终再将内部投票发送出去。
    2. 外部投票的选举轮次小于内部投票。若服务器接收的外选票的选举轮次落后于自身的选举轮次,那么Zookeeper就会直接忽略该外部投票,不做任何处理,并返回步骤4。
    3. 外部投票的选举轮次等于内部投票。此时可以开始进行选票PK。
  6. 选票PK。在进行选票PK时,符合任意一个条件就需要变更投票。
    1. 若外部投票中推举的Leader服务器的选举轮次大于内部投票,那么需要变更投票。
    2. 若选举轮次一致,那么就对比两者的ZXID,若外部投票的ZXID大,那么需要变更投票。
    3. 若两者的ZXID一致,那么就对比两者的SID,若外部投票的SID大,那么就需要变更投票。
  7. 变更投票。经过PK后,若确定了外部投票优于内部投票,那么就变更投票,即使用外部投票的选票信息来覆盖内部投票,变更完成后,再次将这个变更后的内部投票发送出去。
  8. 选票归档。无论是否变更了投票,都会将刚刚收到的那份外部投票放入选票集合recvset中进行归档。recvset用于记录当前服务器在本轮次的Leader选举中收到的所有外部投票(按照服务队的SID区别,如{(1, vote1), (2, vote2)…})。
  9. 统计投票。完成选票归档后,就可以开始统计投票,统计投票是为了统计集群中是否已经有过半的服务器认可了当前的内部投票,如果确定已经有过半服务器认可了该投票,则终止投票。否则返回步骤4。
  10. 更新服务器状态。若已经确定可以终止投票,那么就开始更新服务器状态,服务器首选判断当前被过半服务器认可的投票所对应的Leader服务器是否是自己,若是自己,则将自己的服务器状态更新为LEADING,若不是,则根据具体情况来确定自己是FOLLOWING或是OBSERVING。

以上10个步骤就是 FastLeaderElection 的核心,其中步骤 4-9 会经过几轮循环,直到有 Leader 选举产生。