缘起
最近阅读《云原生分布式存储基石：etcd深入解析》（杜军，2019.1）
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop
raft分布式一致性算法
分布式存储系统通常会通过维护多个副本来进行容错,以提高系统的可用性。这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?Raft算法把问题分解成了领袖选举(leader election)、日志复制(log replication)、安全性(safety)和成员关系变化(membership changes)这几个子问题。Raft算法的基本操作只需2种RPC即可完成:RequestVote RPC由候选人(Candidate)在选举过程中发起,用于向其他节点拉取选票;AppendEntries RPC由领导人(Leader)发起,目的是向其他节点复制日志条目和发送心跳(heartbeat)。
目标
- 根据raft协议,实现高可用分布式强一致的kv存储
子目标(Day 3)
- 继续完善raft状态机之Follower状态的处理逻辑
- 继续完善raft状态机之Candidate状态的处理逻辑
设计
- tFollowerState:
  - 监视Leader心跳是否超时
  - 如果Leader心跳超时,则切换到Candidate状态,竞选新leader
  - 添加RequestVote和AppendEntries两个RPC接口的响应
- tCandidateState:
  - 注册启动、选举超时、获得足够选票三类事件的处理器(具体逻辑待实现)
  - 暂不响应RequestVote和AppendEntries请求(直接返回错误)
import (
	"sync"
	"time"

	"learning/gooop/etcd/raft/config"
	"learning/gooop/etcd/raft/roles"
	"learning/gooop/etcd/raft/rpc"
	"learning/gooop/etcd/raft/timeout"
)
type tFollowerState struct { tRaftStateBase
mInitOnce sync.OncemStartOnce sync.OncemVotedLeaderID stringmLeaderHeartbeatClock int64mStateChangedHandler StateChangedHandleFuncmEventMap map[tFollowerEvent][]tFollowerEventHandler
}
// JobFunc is a parameterless callback that returns nothing.
// NOTE(review): not referenced by any of the visible code.
type (
	JobFunc func()
)
// tFollowerEvent enumerates the internal events of the follower state.
type tFollowerEvent int

const (
	// evFollowerStart is raised by Start.
	evFollowerStart tFollowerEvent = iota
	// evFollowerLeaderHeartbeatTimeout is raised by the heartbeat watcher
	// when no leader heartbeat was recorded within the timeout.
	evFollowerLeaderHeartbeatTimeout
)

// tFollowerEventHandler reacts to a follower event; args carries any
// event-specific payload passed to raise.
type tFollowerEventHandler func(e tFollowerEvent, args ...interface{})
func newFollowerState(term int, cfg config.IRaftConfig, handler StateChangedHandleFunc) IRaftState { it := new(tFollowerState) it.init(term, cfg, handler)
return it
}
func (me tFollowerState) init(term int, cfg config.IRaftConfig, handler StateChangedHandleFunc) { me.mInitOnce.Do(func() { me.tRaftStateBase = newRaftStateBase(term, cfg) me.role = roles.Follower me.mStateChangedHandler = handler
// init event mapme.mEventMap = make(map[tFollowerEvent][]tFollowerEventHandler)me.registerEventHandlers()})
}
func (me *tFollowerState) raise(e tFollowerEvent, args …interface{}) { if handlers, ok := me.mEventMap[e]; ok { for _, it := range handlers { it(e, args…) } } }
func (me *tFollowerState) registerEventHandlers() { me.mEventMap[evFollowerStart] = []tFollowerEventHandler{ me.whenStartThenBeginWatchLeaderTimeout, } me.mEventMap[evFollowerLeaderHeartbeatTimeout] = []tFollowerEventHandler { me.whenLeaderHeartbeatTimeoutThenSwitchToCandidateState, } }
// Start raises evFollowerStart exactly once; subsequent calls do nothing.
func (me *tFollowerState) Start() {
	fireStart := func() {
		me.raise(evFollowerStart)
	}
	me.mStartOnce.Do(fireStart)
}
func (me *tFollowerState) whenStartThenBeginWatchLeaderTimeout(e tFollowerEvent, args… interface{}) { go func() { iCheckingTimeoutInterval := timeout.HeartbeatTimeout / 3 iHeartbeatTimeoutNanos := int64(timeout.HeartbeatTimeout / time.Nanosecond) for range time.Tick(iCheckingTimeoutInterval) { now := time.Now().UnixNano() if now - me.mLeaderHeartbeatClock >= iHeartbeatTimeoutNanos { me.raise(evFollowerLeaderHeartbeatTimeout) return } } }() }
func (me *tFollowerState) whenLeaderHeartbeatTimeoutThenSwitchToCandidateState(_ tFollowerEvent, args… interface{}) { fn := me.mStateChangedHandler if fn == nil { return }
state := newCandidateState(me.cfg, me.term, me.mStateChangedHandler)fn(state)
}
// Role identifies this state as roles.Follower.
func (*tFollowerState) Role() roles.RaftRole {
	return roles.Follower
}
func (me tFollowerState) RequestVote(cmd rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error { if cmd.Term <= me.term { ret.Term = me.term ret.VoteGranted = false return nil }
if me.mVotedLeaderID != "" && me.mVotedLeaderID != cmd.CandidateID {ret.Term = me.termret.VoteGranted = falsereturn nil}me.mVotedLeaderID = cmd.CandidateIDret.Term = cmd.Termret.VoteGranted = truereturn nil
}
func (me tFollowerState) AppendEntries(cmd rpc.AppendEntriesCmd, ret *rpc.AppendEntriesRet) error { if cmd.Term < me.term { ret.Term = me.term ret.Success = false return nil }
me.term = cmd.Termme.leaderID = cmd.LeaderIDme.mLeaderHeartbeatClock = time.Now().UnixNano()if len(cmd.Entries) <= 0 {// just heartbeat packageret.Term = cmd.Termret.Success = truereturn nil}// todo: append logsreturn nil
}
// StateChangedHandler replaces the callback that receives the next state
// when this follower decides to hand over (e.g. on heartbeat timeout).
func (me *tFollowerState) StateChangedHandler(handler StateChangedHandleFunc) {
	me.mStateChangedHandler = handler
}
<a name="cmAzS"></a># tCandidateState.go继续完善raft状态机之Candidate状态的处理逻辑```gopackage lsmimport ("errors""learning/gooop/etcd/raft/config""learning/gooop/etcd/raft/rpc""sync")type tCandidateState struct {tRaftStateBasemInitOnce sync.OncemStartOnce sync.OncemStateChangedHandler StateChangedHandleFuncmEventMap map[tCandidateEvent][]tCandidateEventHandler}func (me *tCandidateState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {return gErrorCandidateWontReplyRequestVote}func (me *tCandidateState) AppendEntries(cmd *rpc.AppendEntriesCmd, ret *rpc.AppendEntriesRet) error {return gErrorCandidateWontReplyAppendEntries}func (me *tCandidateState) StateChangedHandler(handler StateChangedHandleFunc) {me.mStateChangedHandler = handler}type tCandidateEvent intconst (evCandidateStart tCandidateEvent = iotaevCandidateElectionTimeout tCandidateEvent = iotaevCandidateGotEnoughVotes tCandidateEvent = iota)type tCandidateEventHandler func(e tCandidateEvent, args ...interface{})func newCandidateState(cfg config.IRaftConfig, term int, handler StateChangedHandleFunc) IRaftState {it := new(tCandidateState)it.init(cfg, term, handler)return it}func (me *tCandidateState) init(cfg config.IRaftConfig, term int, handler StateChangedHandleFunc) {me.mInitOnce.Do(func() {me.cfg = cfgme.term = termme.mStateChangedHandler = handler// init event mapme.mEventMap = make(map[tCandidateEvent][]tCandidateEventHandler)me.registerEventHandlers()})}func (me *tCandidateState) registerEventHandlers() {me.mEventMap[evCandidateStart] = []tCandidateEventHandler{me.whenStartThenRequestVote,me.whenStartThenWatchElectionTimeout,}me.mEventMap[evCandidateElectionTimeout] = []tCandidateEventHandler{me.whenElectionTimeoutThenRequestVoteAgain,}me.mEventMap[evCandidateGotEnoughVotes] = []tCandidateEventHandler{me.whenGotEnoughVotesThenSwitchToLeader,}}func (me *tCandidateState) raise(e tCandidateEvent, args ...interface{}) {if handlers, ok := me.mEventMap[e]; ok {for _, it := range handlers {it(e, 
args...)}}}func (me *tCandidateState) Start() {me.mStartOnce.Do(func() {me.raise(evCandidateStart)})}func (me *tCandidateState) whenStartThenRequestVote(_ tCandidateEvent, _... interface{}) {// todo: fixmepanic("implements me")}func (me *tCandidateState) whenStartThenWatchElectionTimeout(_ tCandidateEvent, _... interface{}) {// todo: fixmepanic("implements me")}func (me *tCandidateState) whenElectionTimeoutThenRequestVoteAgain(_ tCandidateEvent, _... interface{}) {// todo: fixmepanic("implements me")}func (me *tCandidateState) whenGotEnoughVotesThenSwitchToLeader(_ tCandidateEvent, _... interface{}) {// todo: fixmepanic("implements me")}var gErrorCandidateWontReplyRequestVote = errors.New("candidate won't reply RequestVote RPC")var gErrorCandidateWontReplyAppendEntries = errors.New("candidate won't reply AppendEntries RPC")
(未完待续)
