概念
在Raft集群中,存在三种角色,分别为:
- leader(领导者)
- candidate(候选人)
- follower(跟随者)
其Client先发送 entry 给 server 。若当前 server 不是 leader,则直接拒绝请求。leader 收到 entry 后,对集群内其他 server 发送 entry,若该 entry 在大多数服务器上都存在,则认为该 entry 是安全了,是提交的。
当 leader 自己创建的 entry 在大多数 server 时,则之前的 entry 是可以应用的。
Client 流程
- Leader 收到 client 发送的 request
- Leader 添加到自己的 logs 中
- Leader 将对应的 log entry 发送给其他的 follower
- 若大多数节点提交了 log entry,则 apply
- Leader 返回结果给 client
- Leader 告诉 follower 可以 apply 刚刚的 log
选举
因为集群中经常可能会出现问题,导致 leader 是经常更换了。我们使用 term 来记录 server 的任期(是否较新)。
leader 若遇到问题不能正常工作,则集群则要从其他的 server 中选取新的 leader 来保证集群的正常工作。
follower 使用超时选举时间来判断 leader 是否正常工作,若超时未收到 leader 发送的信息,则转换为 candidate 进行选举。
选举时,candidate 对增加自身的 term ,发送请求给其他 server。其他 server 根据情况来决定是否投票给该 candidate,只有当 candidate 拿到大多数 follower 的选票时,candidate 才可以成为 leader。
结合服务器交互的特点,在一个 term 内,不会同时存在两个 leader。leader 收到 term 比自己高的 server 信息,会自动转换为 follower。因此在一个集群内,不可能存在两台正常工作的 leader,从而避免了脑裂情况。
日志
server 使用 logs[]
来记录接收 entry。
若 leader 与其他 server 的第idx
条日志的 term 是一致的,则其第idx
条及其之前的日志是与 leader 的日志一致的。因为当 leader 发送日志时,会附带上一条日志的 term 信息。若一致则可以添加,否则表示之前的日志不一致,需要重新发送。
candidate 选举时,会发送接收到最新日志的 idx
和 term
,当 follower 发现自己的日志比 candidate 日志更新时,拒绝投票。该方法保证提交成功的 entry 的安全性。
使用nextIndex[]
维护下一个应该要发送给 server 的日志。当 server 发现不匹配时,则减少,否则增加。在论文中的方法是失败时,nextIndex[k] -= 1
,但是当 follower 失联太久,可能需要上千次重试才能得到正确的结果。下面会介绍快速重传的内容。
持久化
Server中以下数据是需要持久化的(persistent)
currentTerm
:因为当currentTerm
与 log 一致时,才允许提交votedFor
:为了避免在任期内出现两个 Leader。若超过大半的Server重启,且 Leader 存活,出现某个Server开始选举,导致Server在同一任期内投票两次,同一任期出现两个 Leader 。votedFor
机器硬盘旋转一次需要10ms,因此频繁的同步硬盘会有巨大的性能消耗。leader 可以在收到 client 多条 entry 时才进行同步,然后 RPC 发送出去。
注意的是在调用write
并不会真正写入,还需要调用fsync
同步。
对于状态机,其不一定是持久化的,因此在重启后,还需要重新应用到状态机,因此不需要保持 lastApplied
等信息。
因为写入磁盘需要时间,因此将写入和发送给 Follower 并行处理。
日志压缩
随着时间的运行,logs
将会变得十分巨大,将其重新执行可能需要花费数小时时间。
快照:将应用程序将它自己的 state 的副本作为特定的日志条目保存。
Raft 向应用程序拿取 snapshot reference,raft会从日志中选择一个idx
,要求应用程序制作日志 idx
之前内容的一个快照。制作完成后,我们仅需要保存快照,idx
之前的日志可以完全丢弃。
重启时,raft会找到磁盘最新的 snapshot-log pair,提交快照给应用程序还原。
当抛弃日志时,可能存在 follower 没有存储该日志中的信息,因此我们需要一些额外处理。
- 当所有的 follower 已经存储
idx
之前的信息,才允许抛弃日志(若存在 server 断开一周以上呢?) - InstallSnapshot RPC:发送快照给 follower
服务器主要状态
名称 | 描述 |
---|---|
currentTerm | 当前服务器的任期 |
votedFor | 当前任期投票者,不存在为 -1 |
logs[] | 日志 |
serverType | 服务器类型:Leader, Candidate, Follower |
commitIndex | 表示logs[1..commitIndex]是已经提交的 |
lastApplied | 表示logs[1..lastApplied]是已经应用到状态机 |
nextIndex[] | 下一条需要发送的log index |
matchIndex[] | 表示确定服务器已经收到的index |
snapshotData | 快照数据 |
snapshotIndex | 快照最后的log对应的Index |
snapshotTerm | 快照最后的log对应的Term |
服务器交互
1,所有服务器
- 收到
Term > currentTerm
的情况,则更新currentTerm
并且自己转变为 Follower。 - 收到
Term < currentTerm
的情况,无视该请求。
2,Follower
- Follower超时时间内没有收到 Leader 的信息,则转变为 Candidate。并开始进行选举。
3,Candidate
选举实现
type RequestVoteArgs struct {
Term int
CandidateId int
// Candida最后Log的信息
LastLogIndex int
LastLogTerm int
}
type RequestVoteReply struct {
Term int
VoteGranted bool // 0拒绝,1同意
}
1,Candidate
- Term += 1
- 将选票投给自己:
votedFor = rf.me
- 重置选举时间
- 发送请求给所有服务器,若存在过半的票数,则转换为 Leader
- 转换Leader后:
- 重设所有
nextIndex[i] = lastLogIndex + 1
- 重设所有
matchIndex[i] = 0
- 立即发送心跳请求
- 重设所有
2,Follower
当满足以下情况时,投票给Candidate,并重设超时时间
- Term满足以下其中一种情况(确保当前任期不存在两个Leader)
args.Term > currentTerm
Term == currentTerm && votedFor == -1
- Logs满足以下其中一种情况(设
lastLog
为当前服务器最后一个 Entries)args.LastLogTerm > lastLog.Term
args.LastLogTerm == lastLog.term && args.LastLogIndex > lastLog.index
使用比较比较任期号和Index,确定哪个Server收到的Log是较新的。因此对于已经提交的Log来说,一定是安全的,因为存在未提交的Server的Log对于大部分的Server来说是较旧的,无法拿到Leader。
心跳/追加Entries实现
这是类型不同的请求,不过使用一个RPC实现:
- 心跳请求:不包含Entries,仅仅用于表示 Leader 存活使用。要求1s内不超过10次心跳请求。
- 追加请求:包含Entries,用于增加条目。这里我通过两个事件发送:
- 发送心跳请求时附带请求。
- Leader收到Client发送的请求时发送。
type AppendEntriesArgs struct {
Term int
LeaderId int
// 发送新的Log的上一条Log信息,即nextIndex[i] - 1的信息
PrevLogIndex int
PrevLogTerm int
// 一次性发送多条Log(分块发送或全部发送)
Entries []Entry
// 可以提交的Index
LeaderCommit int
}
type AppendEntriesReply struct {
Term int
// Log是否正确
Success bool
// 用于快速恢复
XTerm int
XIndex int
XLen int
}
1,Leader
- 根据
nextIndex[i]
的信息,发送 Entries。 - 若成功发送信息,则更新
nextIndex[i]
和matchIndex[i]
。并且判断是否可以更新 commitIndex。 - 若发送失败,表示Log状态不一致,需要发送之前的Log。通过逐步减少
nextIndex[i]
实现。
若某条Log是大半服务器已经收到了,且该Log的Term与服务器的Term一致,则表示它和之前的log是可以提交的。
2,Follower
判断 PrevLogTerm
是否与当前存储的 Log 一致,若一致则添加 Log。若不一致则需要Leader重新发送。
根据归纳法可以得到,若当前Log与Leader对于的Log一致,其之前的Log也一致。
3,快速恢复
若对 nextIndex[i]
逐步进行减少,若落后Leader非常多Log的话,需要大量的请求。
以下方法根据Term的任期进行快速恢复。当Log不一致时,跳到该冲突Term出现的首个Log进行重传。
实现快速恢复的参数为下:
- XTerm:当前冲突Log的任期。若不存在对应的PrevLog时,该参数为
-1
。 - XIndex:当前冲突的首个Log。若不存在对应的PrevLog时,该参数为
-1
。 - XLen:当前服务器的Log长度
Leader针对以下情况进行处理:
- Follower缺少对应的Log:
nextIndex[i] = XLen + 1
- Leader存在对应的 XTerm:
nextIndex[i] = lastIdx
。lastIdx
为Leader中该任期的最后一条Log。因为Follower 存在Leader中对应XTerm的全部Log(如果Follower缺少部分Log,会转换为规则1进行处理)。 - Leader不存在对应的XTerm:
nextIndex[i] = XIndex
。表示Follower中该任期的全部Log是多余的,重传对应位置开始的Log。
// Follower的处理
// 不冲突,但缺少部分日志
if rf.lastLogIndex+rf.snapshotIndex < args.PrevLogIndex {
reply.XTerm = -1
reply.XIndex = -1
reply.XLen = rf.lastLogIndex + rf.snapshotIndex
return
}
// 日志冲突
if rf.lastLogIndex > 0 && rf.logs[rf.lastLogIndex].Term != args.PrevLogTerm {
reply.XLen = rf.lastLogIndex + rf.snapshotIndex
reply.XTerm = rf.logs[rf.lastLogIndex].Term
// 计算对应任期的第一条槽位号
reply.XIndex = rf.lastLogIndex
for reply.XIndex >= 2 && rf.logs[reply.XIndex-1].Term == reply.XTerm {
reply.XIndex -= 1
}
reply.XIndex += rf.snapshotIndex
return
}
// Leader收到后的处理
if reply.XTerm == -1 {
// 不冲突,但是缺少部分日志
rf.nextIndex[server] = reply.XLen + 1
} else {
// 搜索Logs内是否包含对应 Term
idx := rf.lastLogIndex
for idx > 0 && rf.logs[idx].Term > reply.XTerm {
idx--
}
if idx == 0 {
// 需要重新开始发送
rf.nextIndex[server] = 1
} else if rf.logs[idx].Term == reply.XTerm {
// Logs包含日志对应的Term
rf.nextIndex[server] = idx + rf.snapshotIndex + 1
} else {
// Logs不包含日志对应的Term
rf.nextIndex[server] = reply.XIndex
}
}
Lab总结
- 请求可能是乱序到达的,因此需要核对每个请求的状态,才允许其进行操作。
- 无法保证两个原子操作的状态发生了变更。
- 在应用至状态机时,可能存在堵塞,需要使用新的协程进行,而且请求可能乱序,只能使用一条协程进行所有提交。