1. 场景

有一个向外提供的服务, 服务必须 7*24 小时提供服务, 不能有单点故障, 所以采用集群的方式, 采用 master、slave 的结构. 一台主机多台备机, 主机向外提供服务, 备机负责监听主机的状态, 一旦主机宕机, 备机要迅速接代主机继续向外提供服务, 从备机选择一台作为主机, 这就是 master 重新选举 / leader 重新选举;

2.Zookeeper 集群中的角色

如图:
Zookeeper的Leader选举 - 图1

Zookeeper的Leader选举 - 图2

说明:

①.ZooKeeper 集群的所有机器通过一个 Leader 选举过程来选定一台被称为[Leader]的机器, Leader 服务器为客户端提供读和写服务; ②.Follower 和 Observer 都能提供读服务, 不能提供写服务. 两者唯一的区别在于, Observer 机器不参与 Leader 选举过程, 也不参与写操作的[过半写成功]策略, 因此 Observer 可以在不影响写性能的情况下提升集群的读性能;

3.Zookeeper 工作原理

1>.Zookeeper 的核心是原子广播机制, 这个机制保证了各个 Server 之间的同步. 实现这个机制的协议叫做 Zab 协议.Zab 协议有两种模式, 它们分别是恢复模式 (选主) 和广播模式(同步);

2>. 当服务启动或者在领导者崩溃后, Zab 协议就进入了恢复模式, 当 Leader 被选举出来, 且大多数 Server 完成了和 Leader 的状态同步以后, 恢复模式就结束了. 状态同步保证了 Leader 和 Server 具有相同的系统状态;

3>. 一旦 Leader 已经和多数的 Follower 进行了状态同步后, 他就可以开始广播消息了, 即进入广播状态. 这时候当一个 Server 加入 Zookeeper 服务中, 它会在恢复模式下启动, 发现 Leader, 并和 Leader 进行状态同步. 待到同步结束, 它也参与消息广播. Zookeeper 服务一直维持在 Broadcast 状态, 直到 Leader 崩溃了或者 Leader 失去了大部分的 Followers 支持;

4>. 广播模式需要保证 Proposal 被按顺序处理, 因此 ZK 采用了递增的事务 id 号 (ZXID) 来保证所有的提议 (Proposal) 都在被提出的时候加上了 ZXID. 实现中 ZXID 是一个 64 位的数字, 它的高 32 位是 Epoch 用来标识 Leader 关系是否改变, 每次一个 Leader 被选出来, 它都会有一个新的 Epoch, 标识当前属于那个 Leader 的统治时期. 而低 32 位用于递增计数;

5>. 当 Leader 崩溃或者 Leader 失去大多数的 Follower, 这时候 ZK 进入恢复模式, 恢复模式需要重新选举出一个新的 Leader, 让所有的 Server 都恢复到一个正确的状态

6>.Leader 选举

①. 每个 Server 启动以后都询问其它的 Server 它要投票给谁; ②. 对于其他 Server 的询问, Server 每次根据自己的状态都回复自己推荐的 Leader 的 id 和上一次处理事务的 zxid(系统启动时每个 server 都会推荐自己); ③. 收到所有 Server 回复以后, 就计算出 zxid 最大的那个 Server, 并将这个 Server 相关信息设置成下一次要投票的 Server; ④. 计算这过程中获得票数最多的的 Server 为获胜者, 如果获胜者的票数超过半数, 则该 Server 被选为 Leader. 否则, 继续这个过程, 直到 Leader 被选举出来; ⑤.Leader 被选举出来之后就会开始等待 Server 连接; ⑥.Follower 连接 Leader, 将最大的 zxid 发送给 Leader; ⑦.Leader 根据 Follower 的 zxid 确定同步点, 进行状态同步; ⑧. 完成同步后通知 Follower 已经成为 uptodate 状态; ⑨.Follower 收到 uptodate 消息后, 又可以重新接受 client 的请求进行服务了;

7>. 每个 Zookeeper 服务在工作过程中有四种状态:

①.LOOKING: 竞选状态. 当前 Server 不知道 Leader 是谁, 正在搜寻;
②.LEADING: 领导者状态. 表明当前服务器角色是 Leader;
③.FOLLOWING: 随从状态. 表明当前服务器角色是 Follower, 同步 Leader 状态, 参与投票;
④.OBSERVING: 观察状态. 表明当前服务器角色是 Observer, 同步 Leader 状态, 不参与投票;

4.Zookeeper 的 Leader 选举原理分析

4.1.Leader 选举的前提条件

①. 只有服务器状态在 LOOKING(竞选状态) 状态才会去执行选举算法; ②.Zookeeper 的集群规模至少是 2 台机器, 才可以选举 Leader(这里以 5 台机器集群为例); ③. 当集群中只有一台服务器启动是不能选举的, 等第二台服务器启动后, 两台机器之间可以互相通信, 才可以进行 Leader 选举; ④. 服务器运行期间无法和 Leader 保持连接的时候, 也需要进行 Leader 选举;

4.2. 选举机制中的一些基本概念

1>.SID: 服务器 ID

即 myid, 表示服务器的编号; 编号越大在选择算法中的权重越大;

2>.ZXID:zookeeper 事务 id

zookeeper 状态的每一次改变, 都对应着一个递增的 Transaction ID, 该 id 称为 zxid, 他展示了所有的 zookeeper 的变更顺序. 由于 zxid 的递增性质, 如果 zxid1 小于 zxid2, 那么 zxid1 肯定先于 zxid2 发生

3>.Epoch: 逻辑时钟或者叫投票的次数

同一轮投票过程中的逻辑时钟值是相同的.每投完一次票这个数据就会增加, 然后与接收到的其它服务器返回的投票信息中的数值相比, 根据不同的值做出不同的判断; 如果收到低于当前轮次的投票结果, 该投票无效, 需更新到当前轮次和当前的投票结果;

4>.Vote: 投票

Leader 选举, 顾名思义必须通过投票来实现. 当集群中的机器发现自己无法检测到 Leader 机器的时候, 就会开始尝试进行投票;

5>.Quorum: 过半机器数

这是整个 Leader 选举算法中最重要的一个术语, 我们可以把这个术语理解为是一个量词, 指的是 ZooKeeper 集群中过半的机器数, 如果集群中总的机器数是 n 的话, 那么可以通过下面这个公式来计算 quorum 的值:
quorum = (n/2 + 1) 例如如果集群机器总数是 3, 那么 quorum 就是 2;

6>.state:server 选举状态

LOOKING: 竞选状态;
FOLLOWING: 随从状态, 同步 Leader 状态, 参与投票;
OBSERVING: 观察状态, 同步 Leader 状态, 不参与投票;
LEADING: 领导者状态;

4.3. 服务器启动时期的 Leader 选举

zookeeper 默认的算法是 FastLeaderElection. 采用投票数大于半数则胜出的逻辑

如图:
Zookeeper的Leader选举 - 图3

选举过程说明: 假设 zookeeper 集群有五台机器, 都是最新启动的没有历史数据

①. 服务器 1 启动, 读取自身的 zxid, 然后进行投票. 首先给自己投票, 然后再发出一个投票信息给集群中其他机器(投票信息包含两个最基本信息: 所选举 Leader 的 ServerID,Zxid); 由于其它机器还没有启动所以服务器 1 收不到反馈信息, 服务器 1 的状态一直属于 Looking 状态; ②. 服务器 2 启动, 首先给自己投票, 然后再发出一个投票信息给集群中其他机器; 服务器 1 收到投票信息后, 首先判断该投票的有效性 (如检查是否是本轮投票、是否来自 LOOKING 状态的服务器), 然后要处理投票 (对每一个投票, 服务器都需要将别人的投票和自己的投票进行 PK)
投票处理规则如下:

A. 优先检查 ZXID.ZXID 比较大的服务器优先作为 Leader;
B. 如果 ZXID 相同, 那么就比较所选举的 Leader 的 ServerID.ServerID 较大的服务器作为 Leader 服务器;

由于服务器 2 的所选举 Leader 的 ServerID 属性值大, 所以服务器 2 胜出, 之后服务器 1 更新自己的投票为 (2,0), 然后重新投票; 而对于服务器 2 而言, 其无须更新自己的投票, 只是再次向集群中所有机器发出上一次投票信息即可;每次投票后, 服务器都会统计投票信息, 判断是否已经有过半机器接受到相同的投票信息,但此时还没有超过半数机器接收到相同的投票信息, 所以两个服务器的状态依然是 LOOKING; ③. 服务器 3 启动, 先给自己投票, 然后再发出一个投票给集群中其他机器, 此时服务器 3 的投票为 (3,0); 服务器 1 和服务器 2 收到投票后, 首先判断该投票的有效性, 然后要处理投票, 由于服务器 3 的 myid 属性值最大, 所以服务器 3 胜出, 之后服务器 1 和服务器 2 更新自己的投票信息为 (3,0), 然后服务器经过统计发现有超过半数机器接收到相同的投票信息, 所以服务器 3 当选 Leader, 状态变成 LEADING; 服务器 1,2 的状态会变成 FOLLOWING; ④. 服务器 4 启动, 先给自己投票, 然后再发出一个投票给集群中其他机器, 但是其他服务器都不是 LOOKING 状态, 无法再参与投票, 尽管服务器 4 的 myid 属性值大, 但之前服务器 3 已经获得了 3 票, 服务器 4 会遵从半数原则 (/ 少数服从多数), 将自己的 1 票投给服务器 3, 而服务器 4 的状态会变成 FOLLOWING; ⑤. 服务器 5 启动, 先给自己投票, 然后再发出一个投票给集群中其他机器, 但是其他服务器都不是 LOOKING 状态, 无法再参与投票, 尽管服务器 5 的 myid 属性值大, 但之前服务器 3 已经获得了 4 票, 服务器 5 会遵从半数原则 (/ 少数服从多数), 将自己的 1 票投给服务器 3, 而服务器 5 的状态会变成 FOLLOWING; ⑥. 等到所有的服务器都启动完成, 服务器 3 会成为 Leader, 其他的服务器会变成 Follower; * 注意:

  • 集群中每个 Server 启动时要先进行 Leader 选举, 先投自己一票, 然后在发出一个投票信息给集群中的其他 Server;
  • 每个 Server 在接收到集群中其他机器的投票信息之后, 首先判断该投票的有效性 (如检查是否是本轮投票、是否来自 LOOKING 状态的服务器), 然后要处理投票 (对每一个投票, 服务器都需要将别人的投票和自己的投票进行 PK);
  • 每次投票后, 服务器都会统计投票信息, 判断是否已经有过半机器接受到相同的投票信息;
  • 一旦确定了 Leader, 每个服务器就会更新自己的状态, 如果是 Follower, 那么就变更为 FOLLOWING, 如果是 Leader, 就变更为 LEADING;

4.4.Zookeeper 在运行期的 Leader 选举

在 Zookeeper 运行期间, Leader 与非 Leader 服务器各司其职, 即便当有非 Leader 服务器宕机或新加入, 此时也不会影响 Leader, 但是一旦 Leader 服务器挂了, 那么整个集群将暂停对外服务, 进入新一轮 Leader 选举, 其过程和启动时期的 Leader 选举过程基本一致. 假设正在运行的有 Server1、Server2、Server3 三台服务器, 当前 Leader 是 Server2, 若某一时刻 Leader 挂了, 此时便开始 Leader 选举. 选举过程如下:

①.Leader 宕机后, 余下的非 Observer 服务器都会将自己的服务器状态变更为 LOOKING, 然后开始进入 Leader 选举过程; ②. 每个 Server 会发出一个投票. 在运行期间, 每个服务器上的 ZXID 可能不同, 此时假定 Server1 的 ZXID 为 123,Server3 的 ZXID 为 122; 在第一轮投票中, Server1 和 Server3 都会投自己, 然后各自将投票发送给集群中所有机器; ③. 每个 Server 会接收来自各个服务器的投票. 与启动时过程相同; ④. 处理投票. 与启动时过程相同, 此时, Server1 将会成为 Leader; ⑤. 统计投票. 与启动时过程相同; ⑥. 改变服务器的状态. 与启动时过程相同;

4.5.Leader 选举过程中的一些细节

1>. 每个投票中包含了两个最基本的信息: 所推举服务器的 SID 和 ZXID, 投票 (Vote) 在 Zookeeper 中包含字段如下:

①.id: 被推举的 Leader 的 SID; ②.zxid:被推举的 Leader 事务 ID; ③.electionEpoch: 逻辑时钟, 用来判断多个投票是否在同一轮选举周期中, 该值在服务端是一个自增序列, 每次进入新一轮的投票后, 都会对该值进行加 1 操作; ④.peerEpoch: 被推举的 Leader 的 epoch; ⑤.state: 当前服务器的状态;

2>. 选举 Leader 过程中集群各个节点在接收到其他节点的投票信息时首先要判断本次投票的有效性, 具体流程如下:

①. 如果服务器 B 接收到服务器 A 的数据[服务器 A 处于选举状态 (LOOKING 状态)]

A. 首先, 判断逻辑时钟值: 1>>.如果发送过来的逻辑时钟 Epoch 大于目前的逻辑时钟.首先, 更新本逻辑时钟 Epoch, 同时清空本轮逻辑时钟收集到的来自其他 Server 的选举数据. 然后判断是否需要更新当前自己的选举 Leader 的 ServerID.[判断规则 rules judging: 使用保存的 zxid 最大值和 Leader 的 ServerID 来进行判断的. 先看数据 zxid, 数据 zxid 大者胜出; 其次再判断 Leader 的 ServerID,Leader 的 ServerID 大者胜出;]然后再将自身最新的选举结果 (也就是上面提到的几种数据(被推举 Leader 的 ServerID,Zxid,Epoch) 广播给其他 Server); 2>>.如果发送过来的逻辑时钟 Epoch 小于目前的逻辑时钟.说明对方 Server 在一个相对较早的 Epoch 中, 这里只需要将本机的几种数据 (被推举 Leader 的 ServerID,Zxid,Epoch) 发送过去就行; 3>>.如果发送过来的逻辑时钟 Epoch 等于目前的逻辑时钟.再根据上述判断规则 rules judging 来选举 Leader, 然后再将自身最新的选举结果 (也就是上面提到的几种数据(被推举 Leader 的 ServerID,Zxid,Epoch) 广播给其他 Server);

B. 其次, 判断服务器是不是已经收集到了所有服务器的选举状态: 若是, 根据选举结果设置自己的角色 (FOLLOWING 还是 LEADER), 退出选举过程就是了; C. 最后, 若没有收到没有收集到所有服务器的选举状态: 也可以判断一下根据以上过程之后最新的选举 Leader 是不是得到了超过半数以上服务器的支持, 如果是, 那么尝试在 200ms 内接收一下数据, 如果没有新的数据到来, 说明大家都已经默认了这个结果, 同样也设置角色退出选举过程;

②. 如果所接收的服务器 A 处在其它状态 (FOLLOWING 或者 LEADING)

A. 如果逻辑时钟 Epoch 等于目前的逻辑时钟, 将该数据保存到 recvset. 此时 Server 已经处于 LEADING 状态, 说明此时这个 Server 已经投票选出结果. 若此时这个接收服务器宣称自己是 Leader, 那么将判断是不是有半数以上的服务器选举它, 如果是, 则设置选举状态, 然后退出选举过程; B. 否则这是一条与当前逻辑时钟不符合的投票消息, 那么说明在另一个选举过程中已经有了选举结果, 于是将该选举结果加入到 outofelection 集合中, 再根据 outofelection 来判断是否可以结束选举, 如果可以也是保存逻辑时钟, 设置选举状态, 退出选举过程;

3>.QuorumCnxManager: 网络 I/O
每台服务器在启动的过程中, 会启动一个 QuorumPeerManager, 负责各台服务器之间的底层 Leader 选举过程中的网络通信

①. 消息队列
QuorumCnxManager 内部维护了一系列的队列, 用来保存接收到的、待发送的消息以及消息的发送器, 除接收队列以外, 其他队列都按照 SID 分组形成队列集合, 如一个集群中除了自身还有 3 台机器, 那么就会为这 3 台机器分别创建一个发送队列, 互不干扰;

1>>.recvQueue: 消息接收队列, 用于存放那些从其他服务器接收到的消息; 2>>.queueSendMap: 消息发送队列, 用于保存那些待发送的消息, 按照 SID 进行分组; 3>>.senderWorkerMap: 发送器集合, 每个 SenderWorker 消息发送器, 都对应一台远程 Zookeeper 服务器, 负责消息的发送, 也按照 SID 进行分组; 4>>.lastMessageSent: 最近发送过的消息, 为每个 SID 保留最近发送过的一个消息;

②. 为了能够相互投票, Zookeeper 集群中的所有机器都需要两两建立起网络连接. QuorumCnxManager 在启动时会创建一个 ServerSocket 来监听 Leader 选举的通信端口 (默认为 3888). 开启监听后, Zookeeper 能够不断地接收到来自其他服务器的创建连接请求, 在接收到其他服务器的 TCP 连接请求时会进行处理. 为了避免两台机器之间重复地创建 TCP 连接, Zookeeper 只允许 SID 大的服务器主动和其他机器建立连接, 否则断开连接. 在接收到创建连接请求后, 服务器通过对比自己和远程服务器的 SID 值来判断是否接收连接请求, 如果当前服务器发现自己的 SID 更大, 那么会断开当前连接, 然后自己主动和远程服务器建立连接. 一旦连接建立, 就会根据远程服务器的 SID 来创建相应的消息发送器 SendWorker 和消息接收器 RecvWorker, 并启动; ③. 消息接收与消息发送

1>>. 消息接收 由消息接收器 RecvWorker 负责, 由于 Zookeeper 为每个远程服务器都分配一个单独的 RecvWorker, 因此, 每个 RecvWorker 只需要不断地从这个 TCP 连接中读取消息, 并将其保存到 recvQueue 队列中; 2>>. 消息发送 由于 Zookeeper 为每个远程服务器都分配一个单独的 SendWorker, 因此, 每个 SendWorker 只需要不断地从对应的消息发送队列中获取出一个消息发送即可, 同时将这个消息放入 lastMessageSent 中. 在 SendWorker 中, 一旦 Zookeeper 发现针对当前服务器的消息发送队列为空, 那么此时需要从 lastMessageSent 中取出一个最近发送过的消息来进行再次发送, 这是为了解决接收方在消息接收前或者接收到消息后服务器挂了, 导致消息尚未被正确处理. 同时, Zookeeper 能够保证接收方在处理消息时, 会对重复消息进行正确的处理;

4.6.Leader 选举算法分析

1>. 在 ZooKeeper 中, 提供了三种 Leader 选举的算法, 分别是:

①.1 对应: LeaderElection 算法;
②.2 对应: AuthFastLeaderElection 算法;
③.3 对应: FastLeaderElection 默认的算法; 注意: 可以通过 zoo.cfg 配置文件中的 electionAlg 属性指定 (1-3)!

在 3.4.x 版本之后的 Zookeeper 只保留了 TCP 版本的 FastLeaderElection 选举算法.当一台机器进入 Leader 选举时当前集群可能会处于以下两种状态:

①. 集群中已存在 Leader;
②. 集群中不存在 Leader;

2>. 对于集群中已经存在 Leader 而言, 此种情况一般都是某台机器启动得较晚, 在其启动之前, 集群已经在正常工作, 对这种情况, 该机器试图去选举 Leader 时, 会被告知当前服务器的 Leader 信息, 对于该机器而言, 仅仅需要和 Leader 机器建立起连接, 并进行状态同步即可;

3>. 而对于集群中不存在 Leader 情况下则会相对复杂, 其步骤如下:

①.第一次投票.无论哪种 (启动时 / 运行期) 导致进行 Leader 选举, 集群的所有机器都处于试图选举出一个 Leader 的状态, 即 LOOKING 状态, 处于 LOOKING 状态的机器会向所有其他机器发送消息, 该消息称为投票. 投票中包含了 SID(服务器的唯一标识)和 ZXID(事务 ID)两个最基本信息, 用 (SID, ZXID) 形式来标识一次投票信息. 假定 Zookeeper 由 5 台机器组成, SID 分别为 1、2、3、4、5,ZXID 分别为 9、9、9、8、8, 并且此时 SID 为 2 的机器是 Leader 机器. 某一时刻, 1、2 所在机器出现故障, 因此集群开始进行 Leader 选举, 在第一次投票时, 每台机器都会将自己作为投票对象, 于是 SID 为 3、4、5 的机器投票情况分别为(3, 9),(4, 8),(5, 8); ②.变更投票.每台机器发出投票后, 也会收到其他机器的投票, 每台机器会根据一定规则来处理收到的其他机器的投票, 并以此来决定是否需要变更自己的投票, 这个规则也是整个 Leader 选举算法的核心所在, 其中术语描述如下:

1>>.vote_sid: 接收到的投票中所推举 Leader 服务器的 SID; 2>>.vote_zxid: 接收到的投票中所推举 Leader 服务器的 ZXID; 3>>.self_sid: 当前服务器自己的 SID; 4>>.self_zxid: 当前服务器自己的 ZXID;

每次对收到的投票的处理, 都是对 (vote_sid, vote_zxid) 和 (self_sid, self_zxid) 对比的过程:

1>>. 规则一: 如果 vote_zxid 大于 self_zxid, 就认可当前收到的投票, 并再次将该投票发送出去; 2>>. 规则二: 如果 vote_zxid 小于 self_zxid, 那么坚持自己的投票, 不做任何变更: 3>>. 规则三: 如果 vote_zxid 等于 self_zxid, 那么就对比两者的 SID, 如果 vote_sid 大于 self_sid, 那么就认可当前收到的投票, 并再次将该投票发送出去; 4>>. 规则四: 如果 vote_zxid 等于 self_zxid, 并且 vote_sid 小于 self_sid, 那么坚持自己的投票, 不做任何变更;

结合上面规则, 给出下面的集群变更过程:
Zookeeper的Leader选举 - 图4 ③.确定 Leader.经过第二轮投票后, 集群中的每台机器都会再次接收到其他机器的投票, 然后开始统计投票, 如果一台机器收到了超过半数的相同投票, 那么这个投票对应的 SID 机器即为 Leader. 此时 Server3 将成为 Leader; 由上面规则可知, 通常那台服务器上的数据越新 (ZXID 会越大), 其成为 Leader 的可能性越大, 也就越能够保证数据的恢复. 如果 ZXID 相同, 则 SID 越大机会越大;

4.7.Leader 选举源码分析

4.7.1.QuorumPeer 类

主要看这个类,只有 LOOKING 状态才会去执行选举算法.每个服务器在启动时都会选择自己做为领导, 然后将投票信息发送出去, 循环一直到选举出领导为止;

  1. @Override
  2. public void run() {
  3. updateThreadName();
  4. LOG.debug("Starting quorum peer");
  5. try {
  6. jmxQuorumBean = new QuorumBean(this);
  7. MBeanRegistry.getInstance().register(jmxQuorumBean, null);
  8. for(QuorumServer s: getView().values()){
  9. ZKMBeanInfo p;
  10. if (getId() == s.id) {
  11. p = jmxLocalPeerBean = new LocalPeerBean(this);
  12. try {
  13. MBeanRegistry.getInstance().register(p, jmxQuorumBean);
  14. } catch (Exception e) {
  15. LOG.warn("Failed to register with JMX", e);
  16. jmxLocalPeerBean = null;
  17. }
  18. } else {
  19. RemotePeerBean rBean = new RemotePeerBean(this, s);
  20. try {
  21. MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
  22. jmxRemotePeerBean.put(s.id, rBean);
  23. } catch (Exception e) {
  24. LOG.warn("Failed to register with JMX", e);
  25. }
  26. }
  27. }
  28. } catch (Exception e) {
  29. LOG.warn("Failed to register with JMX", e);
  30. jmxQuorumBean = null;
  31. }
  32. try {
  33. while (running) {
  34. switch (getPeerState()) {
  35. case LOOKING:
  36. LOG.info("LOOKING");
  37. ServerMetrics.getMetrics().LOOKING_COUNT.add(1);
  38. if (Boolean.getBoolean("readonlymode.enabled")) {
  39. LOG.info("Attempting to start ReadOnlyZooKeeperServer");
  40. final ReadOnlyZooKeeperServer roZk =
  41. new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);
  42. Thread roZkMgr = new Thread() {
  43. public void run() {
  44. try {
  45. sleep(Math.max(2000, tickTime));
  46. if (ServerState.LOOKING.equals(getPeerState())) {
  47. roZk.startup();
  48. }
  49. } catch (InterruptedException e) {
  50. LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
  51. } catch (Exception e) {
  52. LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
  53. }
  54. }
  55. };
  56. try {
  57. roZkMgr.start();
  58. reconfigFlagClear();
  59. if (shuttingDownLE) {
  60. shuttingDownLE = false;
  61. startLeaderElection();
  62. }
  63. setCurrentVote(makeLEStrategy().lookForLeader());
  64. } catch (Exception e) {
  65. LOG.warn("Unexpected exception", e);
  66. setPeerState(ServerState.LOOKING);
  67. } finally {
  68. roZkMgr.interrupt();
  69. roZk.shutdown();
  70. }
  71. } else {
  72. try {
  73. reconfigFlagClear();
  74. if (shuttingDownLE) {
  75. shuttingDownLE = false;
  76. startLeaderElection();
  77. }
  78. setCurrentVote(makeLEStrategy().lookForLeader());
  79. } catch (Exception e) {
  80. LOG.warn("Unexpected exception", e);
  81. setPeerState(ServerState.LOOKING);
  82. }
  83. }
  84. break;
  85. case OBSERVING:
  86. try {
  87. LOG.info("OBSERVING");
  88. setObserver(makeObserver(logFactory));
  89. observer.observeLeader();
  90. } catch (Exception e) {
  91. LOG.warn("Unexpected exception",e );
  92. } finally {
  93. observer.shutdown();
  94. setObserver(null);
  95. updateServerState();
  96. if (isRunning()) {
  97. Observer.waitForObserverElectionDelay();
  98. }
  99. }
  100. break;
  101. case FOLLOWING:
  102. try {
  103. LOG.info("FOLLOWING");
  104. setFollower(makeFollower(logFactory));
  105. follower.followLeader();
  106. } catch (Exception e) {
  107. LOG.warn("Unexpected exception",e);
  108. } finally {
  109. follower.shutdown();
  110. setFollower(null);
  111. updateServerState();
  112. }
  113. break;
  114. case LEADING:
  115. LOG.info("LEADING");
  116. try {
  117. setLeader(makeLeader(logFactory));
  118. leader.lead();
  119. setLeader(null);
  120. } catch (Exception e) {
  121. LOG.warn("Unexpected exception",e);
  122. } finally {
  123. if (leader != null) {
  124. leader.shutdown("Forcing shutdown");
  125. setLeader(null);
  126. }
  127. updateServerState();
  128. }
  129. break;
  130. }
  131. }
  132. } finally {
  133. LOG.warn("QuorumPeer main thread exited");
  134. MBeanRegistry instance = MBeanRegistry.getInstance();
  135. instance.unregister(jmxQuorumBean);
  136. instance.unregister(jmxLocalPeerBean);
  137. for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) {
  138. instance.unregister(remotePeerBean);
  139. }
  140. jmxQuorumBean = null;
  141. jmxLocalPeerBean = null;
  142. jmxRemotePeerBean = null;
  143. }
  144. }

说明:

核心逻辑在 while 循环中, 判断节点的状态, 分为 LOOKING 、OBSERVING 、FOLLOWING 、LEADING, 当某个 QuorumPeer 刚启动时, 状态为 LOOKING, 启动线程将 zk 节点启动, 然后进行 leader 选举, 这是 zookeeper 的选举算法的核心, leader 的选举在org.apache.zookeeper.server.quorum.FastLeaderElection 的 lookForLeader 方法中

4.7.2.FastLeaderElection 算法

它是 zookeeper 默认提供的选举算法, 默认是采用投票数大于半数则胜出的逻辑,核心方法如下:

1>. 流程图:
Zookeeper的Leader选举 - 图5

2>. 源码:FastLeaderElection 的 lookForLeader() 方法

  1. public Vote lookForLeader() throws InterruptedException {
  2. try {
  3. self.jmxLeaderElectionBean = new LeaderElectionBean();
  4. MBeanRegistry.getInstance().register(
  5. self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
  6. } catch (Exception e) {
  7. LOG.warn("Failed to register with JMX", e);
  8. self.jmxLeaderElectionBean = null;
  9. }
  10. self.start_fle = Time.currentElapsedTime();
  11. try {
  12. Map<Long, Vote> recvset = new HashMap<Long, Vote>();
  13. Map<Long, Vote> outofelection = new HashMap<Long, Vote>();
  14. int notTimeout = minNotificationInterval;
  15. synchronized(this){
  16. logicalclock.incrementAndGet();
  17. updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
  18. }
  19. LOG.info("New election. My id = " + self.getId() +
  20. ", proposed zxid=0x" + Long.toHexString(proposedZxid));
  21. sendNotifications();
  22. SyncedLearnerTracker voteSet;
  23. while ((self.getPeerState() == ServerState.LOOKING) &&
  24. (!stop)){
  25. Notification n = recvqueue.poll(notTimeout,
  26. TimeUnit.MILLISECONDS);
  27. if(n == null){
  28. if(manager.haveDelivered()){
  29. sendNotifications();
  30. } else {
  31. manager.connectAll();
  32. }
  33. int tmpTimeOut = notTimeout*2;
  34. notTimeout = (tmpTimeOut < maxNotificationInterval?
  35. tmpTimeOut : maxNotificationInterval);
  36. LOG.info("Notification time out: " + notTimeout);
  37. }
  38. else if (validVoter(n.sid) && validVoter(n.leader)) {
  39. switch (n.state) {
  40. case LOOKING:
  41. if (getInitLastLoggedZxid() == -1) {
  42. LOG.debug("Ignoring notification as our zxid is -1");
  43. break;
  44. }
  45. if (n.zxid == -1) {
  46. LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
  47. break;
  48. }
  49. if (n.electionEpoch > logicalclock.get()) {
  50. logicalclock.set(n.electionEpoch);
  51. recvset.clear();
  52. if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
  53. getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
  54. updateProposal(n.leader, n.zxid, n.peerEpoch);
  55. } else {
  56. updateProposal(getInitId(),
  57. getInitLastLoggedZxid(),
  58. getPeerEpoch());
  59. }
  60. sendNotifications();
  61. } else if (n.electionEpoch < logicalclock.get()) {
  62. if(LOG.isDebugEnabled()){
  63. LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
  64. + Long.toHexString(n.electionEpoch)
  65. + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
  66. }
  67. break;
  68. } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
  69. proposedLeader, proposedZxid, proposedEpoch)) {
  70. updateProposal(n.leader, n.zxid, n.peerEpoch);
  71. sendNotifications();
  72. }
  73. if(LOG.isDebugEnabled()){
  74. LOG.debug("Adding vote: from=" + n.sid +
  75. ", proposed leader=" + n.leader +
  76. ", proposed zxid=0x" + Long.toHexString(n.zxid) +
  77. ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
  78. }
  79. recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
  80. voteSet = getVoteTracker(
  81. recvset, new Vote(proposedLeader, proposedZxid,
  82. logicalclock.get(), proposedEpoch));
  83. if (voteSet.hasAllQuorums()) {
  84. while((n = recvqueue.poll(finalizeWait,
  85. TimeUnit.MILLISECONDS)) != null){
  86. if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
  87. proposedLeader, proposedZxid, proposedEpoch)){
  88. recvqueue.put(n);
  89. break;
  90. }
  91. }
  92. if (n == null) {
  93. setPeerState(proposedLeader, voteSet);
  94. Vote endVote = new Vote(proposedLeader,
  95. proposedZxid, logicalclock.get(),
  96. proposedEpoch);
  97. leaveInstance(endVote);
  98. return endVote;
  99. }
  100. }
  101. break;
  102. case OBSERVING:
  103. LOG.debug("Notification from observer: {}", n.sid);
  104. break;
  105. case FOLLOWING:
  106. case LEADING:
  107. if(n.electionEpoch == logicalclock.get()){
  108. recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
  109. voteSet = getVoteTracker(recvset, new Vote(n.version,
  110. n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
  111. if (voteSet.hasAllQuorums() &&
  112. checkLeader(outofelection, n.leader, n.electionEpoch)) {
  113. setPeerState(n.leader, voteSet);
  114. Vote endVote = new Vote(n.leader,
  115. n.zxid, n.electionEpoch, n.peerEpoch);
  116. leaveInstance(endVote);
  117. return endVote;
  118. }
  119. }
  120. outofelection.put(n.sid, new Vote(n.version, n.leader,
  121. n.zxid, n.electionEpoch, n.peerEpoch, n.state));
  122. voteSet = getVoteTracker(outofelection, new Vote(n.version,
  123. n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
  124. if (voteSet.hasAllQuorums() &&
  125. checkLeader(outofelection, n.leader, n.electionEpoch)) {
  126. synchronized(this){
  127. logicalclock.set(n.electionEpoch);
  128. setPeerState(n.leader, voteSet);
  129. }
  130. Vote endVote = new Vote(n.leader, n.zxid,
  131. n.electionEpoch, n.peerEpoch);
  132. leaveInstance(endVote);
  133. return endVote;
  134. }
  135. break;
  136. default:
  137. LOG.warn("Notification state unrecoginized: " + n.state
  138. + " (n.state), " + n.sid + " (n.sid)");
  139. break;
  140. }
  141. } else {
  142. if (!validVoter(n.leader)) {
  143. LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
  144. }
  145. if (!validVoter(n.sid)) {
  146. LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
  147. }
  148. }
  149. }
  150. return null;
  151. } finally {
  152. try {
  153. if(self.jmxLeaderElectionBean != null){
  154. MBeanRegistry.getInstance().unregister(
  155. self.jmxLeaderElectionBean);
  156. }
  157. } catch (Exception e) {
  158. LOG.warn("Failed to unregister with JMX", e);
  159. }
  160. self.jmxLeaderElectionBean = null;
  161. LOG.debug("Number of connection processing threads: {}",
  162. manager.getConnectionThreadCount());
  163. }
  164. }

3>. 源码:FastLeaderElection 的 sendNotifications() 方法

  1. private void sendNotifications() {
  2. for (long sid : self.getCurrentAndNextConfigVoters()) {
  3. QuorumVerifier qv = self.getQuorumVerifier();
  4. ToSend notmsg = new ToSend(ToSend.mType.notification,
  5. proposedLeader,
  6. proposedZxid,
  7. logicalclock.get(),
  8. QuorumPeer.ServerState.LOOKING,
  9. sid,
  10. proposedEpoch, qv.toString().getBytes());
  11. if(LOG.isDebugEnabled()){
  12. LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
  13. Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) +
  14. " (n.round), " + sid + " (recipient), " + self.getId() +
  15. " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
  16. }
  17. sendqueue.offer(notmsg);
  18. }
  19. }

4.7.3.FastLeaderElection 算法说明

1>. 术语

①. 外部投票: 特指其他服务器发来的投票;
②. 内部投票: 服务器自身当前的投票;
③. 选举轮次: Zookeeper 服务器 Leader 选举的轮次, 即 logicalclock;
④.PK: 对内部投票和外部投票进行对比来确定是否需要变更内部投票;

2>. 选票管理

①.sendqueue: 选票发送队列, 用于保存待发送的选票; ②.recvqueue: 选票接收队列, 用于保存接收到的外部投票; ③.WorkerReceiver: 选票接收器, 他会不断地从 QuorumCnxManager 中获取其他服务器发来的选举消息, 并将其转换成一个选票, 然后保存到 recvqueue 中, 在选票接收过程中, 如果发现该外部选票的选举轮次小于当前服务器的, 那么忽略该外部投票, 同时立即发送自己的内部投票; ④.WorkerSender: 选票发送器, 不断地从 sendqueue 中获取待发送的选票, 并将其传递到底层 QuorumCnxManager 中;

3>. 算法核心
下图展示了 FastLeaderElection 模块是如何与底层网络 I/O 进行交互的
Zookeeper的Leader选举 - 图6

4>.Leader 选举的基本流程如下:

①.自增选举轮次.Zookeeper 规定所有有效的投票都必须在同一轮次中, 在开始新一轮投票时, 会首先对 logicalclock 进行自增操作; ②.初始化选票.在开始进行新一轮投票之前, 每个服务器都会初始化自身的选票, 并且在初始化阶段, 每台服务器都会将自己推举为 Leader; ③.发送初始化选票.完成选票的初始化后, 服务器就会发起第一次投票. Zookeeper 会将刚刚初始化好的选票放入 sendqueue 中, 由发送器 WorkerSender 负责发送出去; ④.接收外部投票.每台服务器会不断地从 recvqueue 队列中获取外部选票. 如果服务器发现无法获取到任何外部投票, 那么就会立即确认自己是否和集群中其他服务器保持着有效的连接, 如果没有连接, 则马上建立连接; 如果已经建立了连接, 则再次发送自己当前的内部投票; ⑤.判断选举轮次.在发送完初始化选票之后, 接着开始处理外部投票. 在处理外部投票时, 会根据选举轮次来进行不同的处理:

1>>.外部投票的选举轮次大于内部投票.若服务器自身的选举轮次落后于该外部投票对应服务器的选举轮次, 那么就会立即更新自己的选举轮次 (logicalclock), 并且清空所有已经收到的投票, 然后使用初始化的投票来进行 PK 以确定是否变更内部投票. 最终再将内部投票发送出去; 2>>.外部投票的选举轮次小于内部投票.若服务器接收的外选票的选举轮次落后于自身的选举轮次, 那么 Zookeeper 就会直接忽略该外部投票, 不做任何处理, 并返回步骤 “④”; 3>>.外部投票的选举轮次等于内部投票.此时可以开始进行选票 PK;

⑥.选票 PK.在进行选票 PK 时. 符合任意一个条件就需要变更投票

1>>. 若外部投票中推举的 Leader 服务器的选举轮次大于内部投票, 那么需要变更投票; 2>>. 若选举轮次一致, 那么就对比两者的 ZXID, 若外部投票的 ZXID 大, 那么需要变更投票; 3>>. 若两者的 ZXID 一致, 那么就对比两者的 SID, 若外部投票的 SID 大, 那么就需要变更投票;

⑦.变更投票.经过 PK 后, 若确定了外部投票优于内部投票, 那么就变更投票, 即使用外部投票的选票信息来覆盖内部投票, 变更完成后, 再次将这个变更后的内部投票发送出去; ⑧.选票归档.无论是否变更了投票, 都会将刚刚收到的那份外部投票放入选票集合 recvset 中进行归档. recvset 用于记录当前服务器在本轮次的 Leader 选举中收到的所有外部投票 (按照服务队的 SID 区分, 如 {(1, vote1), (2, vote2)…}); ⑨.统计投票.完成选票归档后, 就可以开始统计投票, 统计投票是为了统计集群中是否已经有过半的服务器认可了当前的内部投票, 如果确定已经有过半服务器认可了该投票, 则终止投票. 否则返回步骤 “④”; ⑩.更新服务器状态.若已经确定可以终止投票, 那么就开始更新服务器状态, 服务器首先判断当前被过半服务器认可的投票所对应的 Leader 服务器是否是自己, 若是自己, 则将自己的服务器状态更新为 LEADING; 若不是, 则根据具体情况来确定自己是 FOLLOWING 或是 OBSERVING;

5. 数据一致性与 paxos 算法

5.1. 如何保持数据的一致性

这里有个原则就是:

一个分布式数据库系统中, 如果各节点的初始状态一致, 每个节点都执行相同的操作序列, 那么他们最后能得到一个一致的状态;

5.2.Paxos 算法解决的什么问题呢?

解决的就是保证每个节点执行相同的操作序列. master 维护一个全局写队列, 所有写操作都必须放入这个队列编号, 那么无论我们写多少个节点, 只要写操作是按编号来的, 就能保证一致性.

如果 master 挂了呢?

Paxos 算法通过投票来对写操作进行全局编号, 同一时刻, 只有一个写操作被批准, 同时并发的写操作要去争取选票,只有获得过半数选票的写操作才会被批准 (所以永远只会有一个写操作得到批准),其他的写操作竞争失败只好再发起一轮投票, 就这样, 在一轮又一轮的投票中, 所有写操作都被严格编号排序. 编号严格递增, 当一个节点接受了一个编号为 100 的写操作, 之后又接受到编号为 99 的写操作 (因为网络延迟等很多不可预见原因), 它马上能意识到自己数据不一致了, 自动停止对外服务并重启同步过程. 任何一个节点挂掉都不会影响整个集群的数据一致性 (总 2n+1 台, 除非挂掉大于 n 台)

6. 为什么 Zookeeper 集群的数目一般为奇数个?

1>.Leader 选举算法采用了 Paxos 协议;

2>.Paxos 核心思想:当多数 Server 写成功, 则任务数据写成功;如果有 3 个 Server, 则两个写成功即可; 如果有 4 或 5 个 Server, 则三个写成功即可;

3>.Server 数目一般为奇数 (3、5、7) 如果有 3 个 Server, 则最多允许 1 个 Server 挂掉; 如果有 4 个 Server, 则同样最多允许 1 个 Server 挂掉; 由此我们看出 3 台服务器和 4 台服务器的的容灾能力是一样的, 所以为了节省服务器资源, 一般我们采用奇数个数, 作为服务器部署个数;

7.Zookeeper 的 Quorum 机制

①.zookeeper cluster 的节点数目必须是奇数;
②.zookeeper 集群中必须超过半数节点 (Majority) 可用, 整个集群才能对外可用;

7.1.ZooKeeper 提供了几种方式来认定整个集群是否可用

所谓整个集群是否可用, 隐含的一个意思就是整个集群还能够选举出一个 "Leader"!

①.Majority Quorums
②.Weight
③.Hierarchy of Groups

*注意: ZooKeeper 默认设置的是采用 Majority Qunroms 的方式来支持 Leader 选举.

7.2. 在 ZooKeeper 中 Quorums 有 2 个作用:

①. 集群中最少的节点数用来选举 / 认可 Leader 保证集群可用;
②. 通知客户端数据已经安全保存前集群中最少数量的节点数已经保存了该数据. 一旦这些节点保存了该数据, 客户端将被通知已经安全保存了, 可以继续其他任务. 而集群中剩余的节点将会最终也保存了该数据;

7.3. 采用 Quoroms 投票的方式来选举 Leader 主要是为了解决脑裂 (“Split-Brain”) 问题

1>.Split-Brain 问题说的是一个集群如果发生了网络故障, 很可能出现一个集群分成了两部分, 而这两个部分都不知道对方是否存活, 不知道到底是网络问题还是直接机器 down 了, 所以这两部分都要选举一个 Leader, 而一旦两部分都选出了 Leader, 并且网络又恢复了, 那么就会出现两个 Brain/Leader 的情况, 整个集群的行为不一致了; 所以集群要防止出现 Split-Brain 的问题出现,Quoroms 是一种方式, 即只有集群中超过半数节点投票 / 认可才能选举出 Leader; 这样的方式可以确保 leader 的唯一性, 要么选出唯一的一个 leader, 要么选举失败;

2>.ZooKeeper 默认采用了这种方式. 更广义地解决 Split-Brain 的问题, 一般有 3 种方式:

①.Quorums ②. 采用 Redundant Communications, 冗余通信的方式, 集群中采用多种通信方式, 防止一种通信方式失效导致集群中的节点无法通信; ③.Fencing, 共享资源的方式, 比如能看到共享资源就表示在集群中, 能够获得共享资源的锁的就是 Leader, 看不到共享资源的, 就不在集群中;

7.4. 节点数配置成奇数的集群的容忍度更高

比如 3 个节点的集群, Quorums = 2, 也就是说集群可以容忍 1 个节点失效, 这时候还能选举出 1 个 leader, 集群还可用;

比如 4 个节点的集群, 它的 Quorums = 3,Quorums 要超过 3, 相当于集群的容忍度还是 1, 如果 2 个节点失效, 那么整个集群还是无效的;

所以 4 个节点的集群的容忍度 = 3 个节点的集群的容忍度, 但是 4 个节点的集群多了 1 个节点, 相当于浪费了资源;

更极端的例子是 100 个节点的集群, 如果网络问题导致分为两个部分, 50 个节点和 50 个节点, 这样整个集群还是不可用的, 因为按照 Quorums 的方式必须 51 个节点才能保证选出 1 个 Leader. 这时候可以采用 Weight 加权的方式, 有些节点的权值高, 有些节点的权值低, 最后计算权值, 只要权值过半, 也能选出 1 个 Leader;

原文

https://blog.csdn.net/xp_xpxp/article/details/100568354