Master 节点周期性地向集群中的其他节点发送ping, 检查节点是否仍然活跃,如果发现集群中节点数量不足(法定过半),则放弃 master 身份,重新执行选主流程。其他节点也周期性地检查 master 是否活跃,如果得不到 master 的回复则该节点重新执行选主流程。

一、为什么使用主从模式

除了Leader/Follower模式外,另一种是DHT(分布式哈希表),可以支持每小时数千个节点的离开和加入,其可以在不了解底层网络拓扑的异构网络中工作,查询响应时间约为7左右,例如Cassandra(一套开源的NoSQL数据库系统)使用就是这种方案。但是在相对稳定的对等网络中,主从模式会更好。
Elasticsearch经典场景中的另一个简化是集群中没有那么多节点。通常节点的数量远远小于单个节点能够维护的练接数,并且网络环境不必经常处理节点的加入和离开。这就是为什么ES使用主从模式。
我们需要注意的是,所有分布式系统都需要某种方式来处理数据的一致性问题。一般情况下,有两种策略:

  1. 1. 试图避免不一致
  2. 1. 发生不一致之后进行协调

第二种的非常强大,但对于数据模型来说,又比较严格的限制。因此使用前者,主从模式;

二、选举算法

2.1 Bully算法

Leader选举算法之一。基本思想是,它假定所有节点都有一个唯一的ID,使用该ID对节点进行排序。任何时候当前Leader都是参与这个集群的最高的ID节点。优点是易于实现,但是当最大ID节点处于不稳定的状态下时,可能出现假死现象;例如:master负载大假死了,集群中第二大的ID节点选为新主,这是假死的master恢复,再次被选为新主,然后又假死…,如果不同节点之间网络终端,还可能出现脑裂现象。详细讲解,可以查看:https://www.yuque.com/docs/share/11106ca3-a029-46cd-8fea-e043da1e65ae?# 《10 Bully算法》

2.2 Paxos算法

paxos非常强大,在何时、如何进行选举等方面的灵活性比Bully有很多优势,详细讲解,可以查看:https://www.yuque.com/docs/share/57895e39-6c37-43a4-96e0-3be1313bba08?# 《08 Paxos 算法》

三、Elasticsearch选用Bully算法

Elasticsearch 集群中节点数量有限,单个节点能够处理和其他所有节点间的连接,集群中不会出现节点频繁加入和离开的情况,因此 Zen Discovery 中使用了实现比较简单的 Bully 算法。
使用 Bully 算法的集群中有一个 master 维护整个集群的状态信息,并定时向集群中其他节点推送集群状态的版本信息,如果直接使用 Bully 算法,会遇到下面几个问题,因此 Elasticsearch 做了特殊化的处理。

3.1 master节点假死

master 可能因负载过重而处于不稳定的状态,可能无法响应某些节点的请求,但短时间内可以恢复正常,为了避免频繁的选举,ES 中使用了推迟选举的方法,直到 master 失效来解决上述问题。只要当前主节点不挂掉,就不重新选主。但是这样容易产生脑裂,为此,再通过“法定得票人数过半”机制解决脑裂问题。

3.2 脑裂

考虑下面这种情况,原集群由 A、B、C、D、E 组成,主节点是 A。当两个交换机之间连接中断之后,A、B 不能再与C、D、E 进行通信,形成了两个网络分区。A、B 组成的集群中 master 仍然是 A,而C、D、E 会选出一个新的 master,客户端访问 A、B 或者高 C、D、E 时数据不一致。
image.png
image.png

ES采用了设置 “法定得票人数过半” 解决,在选举过程中当节点得票达到 discovery.zen.minimum_master_nodes 的值时才能成为 master,这个值通常设定为:
(master_eligible_node/2)+1 > quorum

discovery.zen.minimum_master_nodes**:最小主节点数,这是防止脑裂、防止数据丢失的及其重要的参数,实际上,它的作用不知表面的含义。除了在选主时用于决定“多数”, 还用于多处重要的判断,如:触发选主、决定master、gateway选举元信息、master发布集群状态

将上例中 discovery.zen.minimum_master_nodes 设置为3。当两个交换机之间连接中断之后,A不能再与C,D,E进行通信,C、D、E所组成的网络分区中不存在活跃的master,因此发起选举。A的集群中只剩下A和B,不足minimum_master_nodes,因此放弃 master 身份。C、D、E进行投票,直到达成一致选出一个 master,形成一个新的集群,其中节点数量为 3,刚好满足 minimum_master_nodes,而A、B不再处理来自客户端的请求。

四、选主过程

4.1 通过 ZenDiscovery::findMaster 确定临时master

该函数查找当前集群活跃Master,或者从候选者中选举新的Master。如果选主成功,则返回选定的Master,否则返回空。
为什么是临时Master?因为还需要等待下一个步骤,该节点的得票数足够时,才确立为真正的Master。
image.png
选举过程如下:

  1. ping所有节点,获取节点列表(fullPingResponses),ping节点不包含本节点,把本节点单独添加到节点列表中。
  2. 构建2个列表。
    1. activeMasters列表:储存集群当前活跃的Master列表,正常情况下只有一个。在这个过程中,如果配置了discovery.zen.master_election.ignore_non_master_pings为true,默认为false,而节点又不具备Master资格,则跳过该节点。在构建过程中,如果节点不具备Master资格,则通过ignore_non_master_pings选项忽略它认为的那个Master。
    2. masterCandidates列表:存储master候选者列表。遍历第一步获取的列表,去掉不具备master资格的节点,添加到这个列表中。
  3. 如果activeMasters为空,则从masterCandidates中选举,结果可能成功,也可能失败。如果activeMasters不为空,可以从中选择最合适的Master。

选举源码如下:

  1. private DiscoveryNode findMaster() {
  2. // 1. PING 所有节点,获取各节点保存的集群信息
  3. List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();
  4. // 2. 由于上面是获取的其他节点的信息,这里需要将本节点加上
  5. final DiscoveryNode localNode = transportService.getLocalNode();
  6. fullPingResponses.add(new ZenPing.PingResponse(localNode, null, this.clusterState()));
  7. // 3. 若设置了 master_election_ignore_non_masters 则去掉没有 master 资格(node.master: false)的节点
  8. final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);
  9. // 4. 将各节点认为的 master 加入 activeMasters 列表
  10. List<DiscoveryNode> activeMasters = new ArrayList<>();
  11. for (ZenPing.PingResponse pingResponse : pingResponses) {
  12. // 避免未经其他节点检查就将本节点选为 master
  13. if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {
  14. activeMasters.add(pingResponse.master());
  15. }
  16. }
  17. // 5. 将 PING 到的具有 master 资格的节点加入 masterCandidates 列表作为候选节点
  18. List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();
  19. for (ZenPing.PingResponse pingResponse : pingResponses) {
  20. if (pingResponse.node().isMasterNode()) {
  21. masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
  22. }
  23. }
  24. if (activeMasters.isEmpty()) {
  25. // 6. 没有活跃的 master
  26. if (electMaster.hasEnoughCandidates(masterCandidates)) {
  27. // 7. 拥有足够的候选节点,则进行选举
  28. final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
  29. return winner.getNode();
  30. } else {
  31. // 8. 无法选举,无法得到 master,返回 null
  32. return null;
  33. }
  34. } else {
  35. assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";
  36. // 9. 有活跃的 master,从 activeMasters 中选择
  37. return electMaster.tieBreakActiveMasters(activeMasters);
  38. }
  39. }

在没有活跃的 master 时使用,上面第 7 步执行 master 的选举,通过 MasterCandidate::compare 对候选节点进行比较

  1. // ElectMasterService.MasterCandidate::compare
  2. public static int compare(MasterCandidate c1, MasterCandidate c2) {
  3. // 优先选择集群状态版本最新的节点,先比较集群状态版本,注意此处c2在前,c1在后
  4. int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
  5. //如果版本号相同,则比较节点ID
  6. if (ret == 0) {
  7. ret = compareNodes(c1.getNode(), c2.getNode());
  8. }
  9. return ret;
  10. }
  11. // ElectMasterService::compareNodes
  12. private static int compareNodes(DiscoveryNode o1, DiscoveryNode o2) {
  13. // 集群状态版本相同时优先选择具有 master 资格的节点
  14. if (o1.isMasterNode() && !o2.isMasterNode()) {
  15. return -1;
  16. }
  17. if (!o1.isMasterNode() && o2.isMasterNode()) {
  18. return 1;
  19. }
  20. // 以上条件都相同时选择 ID 较小的节点
  21. return o1.getId().compareTo(o2.getId());
  22. }

上述代码中,最后一行可以看出,这里只是将节点排序后选择足校的节点作为Master。但是排序时使用自定义的比较函数MasterCandidate::compare,早期的版本中只是对节点ID进行排序,现在会优先把集群状态版本号高的节点放在前边。

在有活跃的 master 时,上面第 9 步通过 ElectMasterService::tieBreakAcitveMasters 使用 ElectMasterService::compareNodes 的规则从 activeMasters 中选择。选择过程很简单,取列表中的最小值。

4.2 根据临时master投票

在ES中,发送投票就是发送加入集群(requiredJoins)请求。得票就是申请加入该节点的请求的数量。
选举出的临时Master有两种情况:该临时Master是本节点或者非本节点。

  1. 如果临时Master是本节点
    1. 等待足够多的具备Master资格的节点加入本节点(投票达到法定人数),以完成选举;
    2. 超时(默认30s)后还没有满足数量的join请求,则选举失败,需要进行新一轮的选举;
    3. 成功后发布新的ClusterState
  2. 如果其他节点被选为Master
    1. 不再接收其他节点的join请求
    2. 向Master发送加入请求,并等待回复。超时时间默认为1分钟。如果遇到异常,则默认重试3次。这个步骤在joinElectedMaster方法中实现。
    3. 最终当选的Master会被发布集群状态,才确认客户的join请求。因此,JoinElectedMaster返回代表收到了join请求的确认,并且已经收到了集群状态。本步骤检查收到的集群状态中的Master节点如果为空,或者当选的Master不是之前选择的节点,则重新选举。
  1. if (transportService.getLocalNode().equals(masterNode)) {
  2. // 选出的临时 master 是本节点,则等待被选举为真正的 master
  3. final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1);
  4. nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
  5. new NodeJoinController.ElectionCallback() {
  6. @Override
  7. public void onElectedAsMaster(ClusterState state) {
  8. // 成功被选举为 master
  9. synchronized (stateMutex) {
  10. joinThreadControl.markThreadAsDone(currentThread);
  11. }
  12. }
  13. @Override
  14. public void onFailure(Throwable t) {
  15. // 等待超时,重新开始选举流程
  16. synchronized (stateMutex) {
  17. joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
  18. }
  19. }
  20. }
  21. );
  22. } else {
  23. // 选出的临时 master 不是本节点,不再接收其他节点的 join 请求
  24. nodeJoinController.stopElectionContext(masterNode + " elected");
  25. // 向临时节点发送 join 请求(投票),被选举的临时 master 在确认成为 master 并发布新的集群状态后才会返回
  26. final boolean success = joinElectedMaster(masterNode);
  27. // 成功加入之前选择的临时 master 节点,则结束线程,否则重新选举
  28. synchronized (stateMutex) {
  29. if (success) {
  30. DiscoveryNode currentMasterNode = this.clusterState().getNodes().getMasterNode();
  31. if (currentMasterNode == null) {
  32. joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
  33. } else if (currentMasterNode.equals(masterNode) == false) {
  34. joinThreadControl.stopRunningThreadAndRejoin("master_switched_while_finalizing_join");
  35. }
  36. joinThreadControl.markThreadAsDone(currentThread);
  37. } else {
  38. joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
  39. }
  40. }
  41. }
  42. 复制代码

4.3 选举完成

NodeJoinController::checkPendingJoinsAndElectIfNeeded 在节点获得足够的得票时使节点成为 master,并发布新的集群状态

  1. private synchronized void checkPendingJoinsAndElectIfNeeded() {
  2. // 计算节点得票数
  3. final int pendingMasterJoins = electionContext.getPendingMasterJoinsCount();
  4. if (electionContext.isEnoughPendingJoins(pendingMasterJoins) == false) {
  5. ...
  6. } else {
  7. // 得票数足够,成为 master
  8. electionContext.closeAndBecomeMaster();
  9. }
  10. }
  11. public synchronized int getPendingMasterJoinsCount() {
  12. int pendingMasterJoins = 0;
  13. // 统计节点得票数,只计算拥有 master 资格节点的投票
  14. for (DiscoveryNode node : joinRequestAccumulator.keySet()) {
  15. if (node.isMasterNode()) {
  16. pendingMasterJoins++;
  17. }
  18. }
  19. return pendingMasterJoins;
  20. }