Part 2A 和 Part 2B 部分基本已经把整个 Raft 的框架搭好了。如果基于Raft的服务器重新启动,则应从中断的位置恢复服务。这就要求 Raft 保持持久状态,使其在重启后仍然有效。
论文图 2 直接告诉了我们 Persistent state on all servers: currentTerm、votedFor 和 log[]。在这个部分里面,我们不需要负责把数据写到磁盘上的文件里,而是只需要通过 Persister 对象 实现 persist() 和 readPersist() 函数保存和恢复状态。
func (rf *Raft) persist() {// Your code here (2C).w := new(bytes.Buffer)e := labgob.NewEncoder(w)e.Encode(rf.currentTerm)e.Encode(rf.voteFor)e.Encode(rf.log)data := w.Bytes()rf.persister.SaveRaftState(data)}func (rf *Raft) readPersist(data []byte) {//// Your code here (2C).if data == nil || len(data) < 1 {return}r := bytes.NewBuffer(data)d := labgob.NewDecoder(r)rf.mu.Lock()d.Decode(&rf.currentTerm)d.Decode(&rf.voteFor)d.Decode(&rf.log)rf.mu.Unlock()}
之后我们要想清楚哪里需要加persist,有这几个地方
- 向 Leader 添加一个新的需要同步的 LogEntry 时
- Leader 处理 AppendEntries 的回复,并且需要改变自身 term 时
- Candidate 处理 RequestVote 的回复,并且需要改变自身 term 时
- Receiver 处理完 AppendEntries 或者 RequestVote 时
我们加了之后,运行 go test -run 2C之后可以通过除unreliable之外的测试了。
其实基本都没问题了,但是我们需要优化一下,减少RPC的调用才能通过。
后来又读了一遍实验要求,发现这么一条 Hint:
You will probably need the optimization that backs up nextIndex by more than one entry at a time. Look at the extended Raft paper starting at the bottom of page 7 and top of page 8 (marked by a gray line). The paper is vague about the details; you will need to fill in the gaps, perhaps with the help of the 6.824 Raft lectures.
简而言之就是在之前的实现里面,每次当 AppendEntries 因为不一致的 log 而返回 false 时,我们只让 nextIndex 往回退了一个位置。对于有很多不一致的 entry 的话这样做显然不够 efficient。因此需要优化这一步骤,具体的做法是在 AppendEntriesReply 结构中增加一个 conflictIndex 和conflictTerm项。
Student’s Guide to Raft
// leaderSendEntrieselse {if reply.Term > rf.currentTerm {rf.currentTerm = reply.Termrf.state = Follower} else { // 对应Append的2、3// 如果走到这个这里,那就往前推,找到彼此匹配的index//rf.nextIndex[serverId] = args.PrevLogIndex - 1rf.nextIndex[serverId] = reply.ConflictIndexif reply.ConflictTerm != -1 {for i := args.PrevLogIndex; i >= 1; i-- {if rf.log[i-1].Term == reply.ConflictTerm {rf.nextIndex[serverId] = ibreak}}}}}// AppendEntries// 2. Reply false if log does not contain an entry at prevLogIndex// whose term matches prevLogTerm (5.3)// args.PrevLogIndex一开始最大是Leader的len(log)+1,如果比从机大,那就-1 或后期lenlastLogIndex := len(rf.log) - 1if lastLogIndex < args.PrevLogIndex {reply.Success = falsereply.Term = rf.currentTermreply.ConflictTerm = -1reply.ConflictIndex = len(rf.log)return}// 3. If an existing entry conflicts with a new one (same index// but diffent term), delete the existing entry and all that// follow it (5.3)// 日志长度能匹配上了,但是对应的term不一样,那也不行if rf.log[(args.PrevLogIndex)].Term != args.PrevLogTerm {reply.Success = falsereply.Term = rf.currentTermreply.ConflictTerm = rf.log[args.PrevLogIndex].Termreply.ConflictIndex = -1return}

