概念

在Raft集群中,存在三种角色,分别为:

  • leader(领导者)
  • candidate(候选人)
  • follower(跟随者)

其Client先发送 entry 给 server 。若当前 server 不是 leader,则直接拒绝请求。leader 收到 entry 后,对集群内其他 server 发送 entry,若该 entry 在大多数服务器上都存在,则认为该 entry 是安全了,是提交的。

当 leader 自己创建的 entry 在大多数 server 时,则之前的 entry 是可以应用的。

Client 流程

  1. Leader 收到 client 发送的 request
  2. Leader 添加到自己的 logs 中
  3. Leader 将对应的 log entry 发送给其他的 follower
  4. 若大多数节点提交了 log entry,则 apply
  5. Leader 返回结果给 client
  6. Leader 告诉 follower 可以 apply 刚刚的 log

选举

因为集群中经常可能会出现问题,导致 leader 是经常更换了。我们使用 term 来记录 server 的任期(是否较新)。

leader 若遇到问题不能正常工作,则集群则要从其他的 server 中选取新的 leader 来保证集群的正常工作。

follower 使用超时选举时间来判断 leader 是否正常工作,若超时未收到 leader 发送的信息,则转换为 candidate 进行选举。

选举时,candidate 对增加自身的 term ,发送请求给其他 server。其他 server 根据情况来决定是否投票给该 candidate,只有当 candidate 拿到大多数 follower 的选票时,candidate 才可以成为 leader。

结合服务器交互的特点,在一个 term 内,不会同时存在两个 leader。leader 收到 term 比自己高的 server 信息,会自动转换为 follower。因此在一个集群内,不可能存在两台正常工作的 leader,从而避免了脑裂情况。

日志

server 使用 logs[]来记录接收 entry。

若 leader 与其他 server 的第idx条日志的 term 是一致的,则其第idx条及其之前的日志是与 leader 的日志一致的。因为当 leader 发送日志时,会附带上一条日志的 term 信息。若一致则可以添加,否则表示之前的日志不一致,需要重新发送。

candidate 选举时,会发送接收到最新日志的 idxterm,当 follower 发现自己的日志比 candidate 日志更新时,拒绝投票。该方法保证提交成功的 entry 的安全性。

使用nextIndex[]维护下一个应该要发送给 server 的日志。当 server 发现不匹配时,则减少,否则增加。在论文中的方法是失败时,nextIndex[k] -= 1,但是当 follower 失联太久,可能需要上千次重试才能得到正确的结果。下面会介绍快速重传的内容。

持久化

Server中以下数据是需要持久化的(persistent)

  • currentTerm:因为当 currentTerm 与 log 一致时,才允许提交
  • votedFor:为了避免在任期内出现两个 Leader。若超过大半的Server重启,且 Leader 存活,出现某个Server开始选举,导致Server在同一任期内投票两次,同一任期出现两个 Leader 。
  • votedFor

机器硬盘旋转一次需要10ms,因此频繁的同步硬盘会有巨大的性能消耗。leader 可以在收到 client 多条 entry 时才进行同步,然后 RPC 发送出去。

注意的是在调用write并不会真正写入,还需要调用fsync同步。

对于状态机,其不一定是持久化的,因此在重启后,还需要重新应用到状态机,因此不需要保持 lastApplied等信息。

因为写入磁盘需要时间,因此将写入和发送给 Follower 并行处理。

日志压缩

随着时间的运行,logs将会变得十分巨大,将其重新执行可能需要花费数小时时间。

快照:将应用程序将它自己的 state 的副本作为特定的日志条目保存。

Raft 向应用程序拿取 snapshot reference,raft会从日志中选择一个idx ,要求应用程序制作日志 idx之前内容的一个快照。制作完成后,我们仅需要保存快照,idx之前的日志可以完全丢弃。

重启时,raft会找到磁盘最新的 snapshot-log pair,提交快照给应用程序还原。

当抛弃日志时,可能存在 follower 没有存储该日志中的信息,因此我们需要一些额外处理。

  • 当所有的 follower 已经存储idx之前的信息,才允许抛弃日志(若存在 server 断开一周以上呢?)
  • InstallSnapshot RPC:发送快照给 follower

服务器主要状态

名称 描述
currentTerm 当前服务器的任期
votedFor 当前任期投票者,不存在为 -1
logs[] 日志
serverType 服务器类型:Leader, Candidate, Follower
commitIndex 表示logs[1..commitIndex]是已经提交的
lastApplied 表示logs[1..lastApplied]是已经应用到状态机
nextIndex[] 下一条需要发送的log index
matchIndex[] 表示确定服务器已经收到的index
snapshotData 快照数据
snapshotIndex 快照最后的log对应的Index
snapshotTerm 快照最后的log对应的Term

服务器交互

1,所有服务器

  • 收到 Term > currentTerm 的情况,则更新 currentTerm 并且自己转变为 Follower。
  • 收到 Term < currentTerm 的情况,无视该请求。

2,Follower

  • Follower超时时间内没有收到 Leader 的信息,则转变为 Candidate。并开始进行选举。

3,Candidate

选举实现

  1. type RequestVoteArgs struct {
  2. Term int
  3. CandidateId int
  4. // Candida最后Log的信息
  5. LastLogIndex int
  6. LastLogTerm int
  7. }
  8. type RequestVoteReply struct {
  9. Term int
  10. VoteGranted bool // 0拒绝,1同意
  11. }

1,Candidate

  1. Term += 1
  2. 将选票投给自己:votedFor = rf.me
  3. 重置选举时间
  4. 发送请求给所有服务器,若存在过半的票数,则转换为 Leader
  5. 转换Leader后:
    1. 重设所有 nextIndex[i] = lastLogIndex + 1
    2. 重设所有 matchIndex[i] = 0
    3. 立即发送心跳请求

2,Follower

当满足以下情况时,投票给Candidate,并重设超时时间

  1. Term满足以下其中一种情况(确保当前任期不存在两个Leader)
    1. args.Term > currentTerm
    2. Term == currentTerm && votedFor == -1
  2. Logs满足以下其中一种情况(设 lastLog 为当前服务器最后一个 Entries)
    1. args.LastLogTerm > lastLog.Term
    2. args.LastLogTerm == lastLog.term && args.LastLogIndex > lastLog.index

使用比较比较任期号和Index,确定哪个Server收到的Log是较新的。因此对于已经提交的Log来说,一定是安全的,因为存在未提交的Server的Log对于大部分的Server来说是较旧的,无法拿到Leader。

心跳/追加Entries实现

这是类型不同的请求,不过使用一个RPC实现:

  • 心跳请求:不包含Entries,仅仅用于表示 Leader 存活使用。要求1s内不超过10次心跳请求。
  • 追加请求:包含Entries,用于增加条目。这里我通过两个事件发送:
    • 发送心跳请求时附带请求。
    • Leader收到Client发送的请求时发送。
type AppendEntriesArgs struct {
    Term         int
    LeaderId     int

    // 发送新的Log的上一条Log信息,即nextIndex[i] - 1的信息
    PrevLogIndex int
    PrevLogTerm  int

    // 一次性发送多条Log(分块发送或全部发送)
    Entries      []Entry

    // 可以提交的Index
    LeaderCommit int
}

type AppendEntriesReply struct {
    Term    int

    // Log是否正确
    Success bool

    // 用于快速恢复
    XTerm  int
    XIndex int
    XLen   int
}

1,Leader

  1. 根据 nextIndex[i] 的信息,发送 Entries。
  2. 若成功发送信息,则更新 nextIndex[i]matchIndex[i]。并且判断是否可以更新 commitIndex。
  3. 若发送失败,表示Log状态不一致,需要发送之前的Log。通过逐步减少 nextIndex[i] 实现。

若某条Log是大半服务器已经收到了,且该Log的Term与服务器的Term一致,则表示它和之前的log是可以提交的。

2,Follower

判断 PrevLogTerm 是否与当前存储的 Log 一致,若一致则添加 Log。若不一致则需要Leader重新发送。

根据归纳法可以得到,若当前Log与Leader对于的Log一致,其之前的Log也一致。

3,快速恢复

若对 nextIndex[i] 逐步进行减少,若落后Leader非常多Log的话,需要大量的请求。

以下方法根据Term的任期进行快速恢复。当Log不一致时,跳到该冲突Term出现的首个Log进行重传。

实现快速恢复的参数为下:

  • XTerm:当前冲突Log的任期。若不存在对应的PrevLog时,该参数为 -1
  • XIndex:当前冲突的首个Log。若不存在对应的PrevLog时,该参数为 -1
  • XLen:当前服务器的Log长度

Leader针对以下情况进行处理:

  1. Follower缺少对应的Log: nextIndex[i] = XLen + 1
  2. Leader存在对应的 XTerm:nextIndex[i] = lastIdxlastIdx为Leader中该任期的最后一条Log。因为Follower 存在Leader中对应XTerm的全部Log(如果Follower缺少部分Log,会转换为规则1进行处理)。
  3. Leader不存在对应的XTerm:nextIndex[i] = XIndex 。表示Follower中该任期的全部Log是多余的,重传对应位置开始的Log。
// Follower的处理
// 不冲突,但缺少部分日志
if rf.lastLogIndex+rf.snapshotIndex < args.PrevLogIndex {
    reply.XTerm = -1
    reply.XIndex = -1
    reply.XLen = rf.lastLogIndex + rf.snapshotIndex
    return
}

// 日志冲突
if rf.lastLogIndex > 0 && rf.logs[rf.lastLogIndex].Term != args.PrevLogTerm {
    reply.XLen = rf.lastLogIndex + rf.snapshotIndex
    reply.XTerm = rf.logs[rf.lastLogIndex].Term

    // 计算对应任期的第一条槽位号
    reply.XIndex = rf.lastLogIndex
    for reply.XIndex >= 2 && rf.logs[reply.XIndex-1].Term == reply.XTerm {
        reply.XIndex -= 1
    }
    reply.XIndex += rf.snapshotIndex
    return
}

// Leader收到后的处理
if reply.XTerm == -1 {
        // 不冲突,但是缺少部分日志
        rf.nextIndex[server] = reply.XLen + 1
    } else {
        // 搜索Logs内是否包含对应 Term
        idx := rf.lastLogIndex
        for idx > 0 && rf.logs[idx].Term > reply.XTerm {
            idx--
        }

        if idx == 0 {
            // 需要重新开始发送
            rf.nextIndex[server] = 1
        } else if rf.logs[idx].Term == reply.XTerm {
            // Logs包含日志对应的Term
            rf.nextIndex[server] = idx + rf.snapshotIndex + 1
        } else {
            // Logs不包含日志对应的Term
            rf.nextIndex[server] = reply.XIndex
        }
    }

Lab总结

  • 请求可能是乱序到达的,因此需要核对每个请求的状态,才允许其进行操作。
  • 无法保证两个原子操作的状态发生了变更。
  • 在应用至状态机时,可能存在堵塞,需要使用新的协程进行,而且请求可能乱序,只能使用一条协程进行所有提交。