缘起
最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军 , 2019.1)
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop
raft分布式一致性算法
分布式存储系统通常会通过维护多个副本来进行容错,以提高系统的可用性。这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?Raft算法把这个问题分解成了四个子问题:1. 领袖选举(leader election);2. 日志复制(log replication);3. 安全性(safety);4. 成员关系变化(membership changes)。
目标
- 根据raft协议,实现高可用分布式强一致的kv存储
子目标(Day 6)
大幅重构,提升代码的可理解/可管理性:
- random:为各种超时时间添加随机性
- tFollowerState:基于事件驱动重构Follower状态的逻辑编排,各字段实施读写分离管理
- tCandidateState:基于事件驱动重构Candidate状态的逻辑编排,各字段实施读写分离管理
random.go
为各种超时时间添加随机性
package lsm

import (
	"math/rand"
	"sync"
	"time"
)

var (
	// gRand is the package-private random source, seeded once from the
	// wall clock when the package is initialized.
	gRand = rand.New(rand.NewSource(time.Now().UnixNano()))

	// gRandLock serializes access to gRand: a *rand.Rand created with
	// rand.New is not safe for concurrent use by multiple goroutines.
	gRandLock sync.Mutex
)

// fnRandomizeInt64 returns v stretched by a random whole percentage in
// [0, 29], i.e. a value from v to v*1.29 (computed with integer division).
// It is safe to call from multiple goroutines.
func fnRandomizeInt64(v int64) int64 {
	gRandLock.Lock()
	percent := gRand.Int63n(30)
	gRandLock.Unlock()

	// Same result as v + v*percent/100, but v is split into its hundreds
	// and its remainder so the intermediate product cannot overflow int64
	// for large v.
	return v + v/100*percent + v%100*percent/100
}

// fnRandomizeDuration returns a duration from v to v*1.29; see
// fnRandomizeInt64.
func fnRandomizeDuration(v time.Duration) time.Duration {
	return time.Duration(fnRandomizeInt64(int64(v)))
}
tFollowerState.go
基于事件驱动重构Follower状态的逻辑编排,各字段实施读写分离管理
package lsm

import (
	"learning/gooop/etcd/raft/roles"
	"learning/gooop/etcd/raft/rpc"
	"learning/gooop/etcd/raft/timeout"
	"sync"
	"time"
)

// tFollowerState presents a follower node.
//
// The state fields below are written only by the event hooks installed in
// initEventHandlers ("write only logic"); the RPC methods and the timeout
// watcher only read them and raise events.
type tFollowerState struct {
	tEventDrivenModel

	context    IRaftLSM
	mInitOnce  sync.Once
	mStartOnce sync.Once

	// update: feInit / feLeaderHeartbeat
	mTerm int64

	// update: feInit / feLeaderHeartbeat / feLeaderHeartbeatTimeout
	mLeaderHeartbeatTimestamp int64

	// update: feLeaderHeartbeat / feLeaderHeartbeatTimeout
	mLeaderID string

	// update: feCandidateRequestVote / feVoteToCandidate
	mLastVotedTerm int64

	// update: feCandidateRequestVote / feVoteToCandidate
	mLastVotedCandidateID string

	// update: feCandidateRequestVote / feVoteToCandidate
	mLastVotedTimestamp int64

	// update: feInit / feDisposing
	mDisposedFlag bool
}

const (
	// trigger: init()
	// args: empty
	feInit = "follower.init"

	// trigger: Start()
	// args: empty
	feStart = "follower.Start"

	// trigger: Heartbeat()
	// args: *rpc.HeartbeatCmd
	feLeaderHeartbeat = "follower.LeaderHeartbeat"

	// trigger: whenStartThenBeginWatchLeaderTimeout()
	// args: empty
	feLeaderHeartbeatTimeout = "follower.LeaderHeartbeatTimeout"

	// trigger: RequestVote(), before the request is examined
	// args: *rpc.RequestVoteCmd
	feCandidateRequestVote = "candidate.RequestVote"

	// trigger: RequestVote(), once the vote is granted
	// args: *rpc.RequestVoteCmd
	feVoteToCandidate = "follower.CandidateRequestVote"

	// trigger: whenLeaderHeartbeatTimeoutThenSwitchToCandidateState()
	// args: empty
	feDisposing = "follower.Disposing"
)

// newFollowerState creates and initializes a follower state bound to ctx.
func newFollowerState(ctx IRaftLSM) IRaftState {
	it := new(tFollowerState)
	it.init(ctx)
	return it
}

// init binds the context, installs the event hooks and raises feInit.
// It takes effect at most once per instance.
func (me *tFollowerState) init(ctx IRaftLSM) {
	me.mInitOnce.Do(func() {
		me.context = ctx
		me.initEventHandlers()

		// Without this the feInit hooks never run: mLeaderHeartbeatTimestamp
		// would stay 0 and the watcher would report a leader timeout on its
		// very first tick.
		me.raise(feInit)
	})
}

// initEventHandlers installs the per-field writer hooks first and the
// read-only reaction hooks afterwards.
// NOTE(review): this relies on hooks of one event running in registration
// order; confirm against tEventDrivenModel.
func (me *tFollowerState) initEventHandlers() {
	// write only logic
	me.hookEventsForTerm()
	me.hookEventsForLeaderHeartbeatTimestamp()
	me.hookEventsForLeaderID()
	me.hookEventsForLastVoteExpiry()
	me.hookEventsForLastVotedTerm()
	me.hookEventsForLastVotedCandidateID()
	me.hookEventsForLastVotedTimestamp()
	me.hookEventsForDisposedFlag()

	// read only logic
	me.hook(feStart,
		me.whenStartThenBeginWatchLeaderTimeout)
	me.hook(feLeaderHeartbeatTimeout,
		me.whenLeaderHeartbeatTimeoutThenSwitchToCandidateState)
}

// hookEventsForTerm maintains field: mTerm
// update: feInit / feLeaderHeartbeat
func (me *tFollowerState) hookEventsForTerm() {
	me.hook(feInit, func(e string, args ...interface{}) {
		me.mTerm = me.context.store().LastCommittedTerm()
	})

	me.hook(feLeaderHeartbeat, func(e string, args ...interface{}) {
		cmd := args[0].(*rpc.HeartbeatCmd)
		me.mTerm = cmd.Term
	})
}

// hookEventsForLeaderHeartbeatTimestamp maintains field: mLeaderHeartbeatTimestamp
// update: feInit / feLeaderHeartbeat / feLeaderHeartbeatTimeout
func (me *tFollowerState) hookEventsForLeaderHeartbeatTimestamp() {
	me.hook(feInit, func(e string, args ...interface{}) {
		me.mLeaderHeartbeatTimestamp = time.Now().UnixNano()
	})

	me.hook(feLeaderHeartbeat, func(e string, args ...interface{}) {
		me.mLeaderHeartbeatTimestamp = time.Now().UnixNano()
	})

	me.hook(feLeaderHeartbeatTimeout, func(e string, args ...interface{}) {
		me.mLeaderHeartbeatTimestamp = 0
	})
}

// hookEventsForLeaderID maintains field: mLeaderID
// update: feLeaderHeartbeat / feLeaderHeartbeatTimeout
func (me *tFollowerState) hookEventsForLeaderID() {
	me.hook(feLeaderHeartbeat, func(e string, args ...interface{}) {
		cmd := args[0].(*rpc.HeartbeatCmd)
		me.mLeaderID = cmd.LeaderID
	})

	me.hook(feLeaderHeartbeatTimeout, func(e string, args ...interface{}) {
		me.mLeaderID = ""
	})
}

// hookEventsForLastVoteExpiry forgets the previous vote (term, candidate id
// and timestamp together) once it is older than a randomized election
// timeout. It runs on feCandidateRequestVote, i.e. before RequestVote()
// inspects the vote fields.
//
// The three fields are cleared by a single hook with a single random draw so
// that the expiry decision is taken exactly once per request.
func (me *tFollowerState) hookEventsForLastVoteExpiry() {
	me.hook(feCandidateRequestVote, func(e string, args ...interface{}) {
		elapsed := time.Duration(time.Now().UnixNano() - me.mLastVotedTimestamp)
		if elapsed >= fnRandomizeDuration(timeout.ElectionTimeout) {
			// timeout, reset to empty
			me.mLastVotedTerm = 0
			me.mLastVotedCandidateID = ""
			me.mLastVotedTimestamp = 0
		}
	})
}

// hookEventsForLastVotedTerm maintains field: mLastVotedTerm
// update: feVoteToCandidate (expiry is handled by hookEventsForLastVoteExpiry)
func (me *tFollowerState) hookEventsForLastVotedTerm() {
	me.hook(feVoteToCandidate, func(e string, args ...interface{}) {
		cmd := args[0].(*rpc.RequestVoteCmd)
		me.mLastVotedTerm = cmd.Term
	})
}

// hookEventsForLastVotedCandidateID maintains field: mLastVotedCandidateID
// update: feVoteToCandidate (expiry is handled by hookEventsForLastVoteExpiry)
func (me *tFollowerState) hookEventsForLastVotedCandidateID() {
	me.hook(feVoteToCandidate, func(e string, args ...interface{}) {
		cmd := args[0].(*rpc.RequestVoteCmd)
		me.mLastVotedCandidateID = cmd.CandidateID
	})
}

// hookEventsForLastVotedTimestamp maintains field: mLastVotedTimestamp
// update: feVoteToCandidate (expiry is handled by hookEventsForLastVoteExpiry)
func (me *tFollowerState) hookEventsForLastVotedTimestamp() {
	me.hook(feVoteToCandidate, func(e string, args ...interface{}) {
		me.mLastVotedTimestamp = time.Now().UnixNano()
	})
}

// hookEventsForDisposedFlag maintains field: mDisposedFlag
// update: feInit / feDisposing
func (me *tFollowerState) hookEventsForDisposedFlag() {
	me.hook(feInit, func(e string, args ...interface{}) {
		me.mDisposedFlag = false
	})

	me.hook(feDisposing, func(e string, args ...interface{}) {
		me.mDisposedFlag = true
	})
}

// Start raises feStart; only the first call has any effect.
func (me *tFollowerState) Start() {
	me.mStartOnce.Do(func() {
		me.raise(feStart)
	})
}

// whenStartThenBeginWatchLeaderTimeout starts a goroutine that periodically
// compares the time since the last leader heartbeat against a randomized
// heartbeat timeout and raises feLeaderHeartbeatTimeout when it is exceeded.
// The goroutine ends after raising the timeout or once the state is disposed.
//
// NOTE(review): the goroutine reads mDisposedFlag and
// mLeaderHeartbeatTimestamp without synchronization in this file; confirm
// whether tEventDrivenModel or the caller provides the necessary ordering.
func (me *tFollowerState) whenStartThenBeginWatchLeaderTimeout(e string, args ...interface{}) {
	go func() {
		iCheckingTimeoutInterval := fnRandomizeDuration(timeout.HeartbeatTimeout / 3)
		if iCheckingTimeoutInterval <= 0 {
			// time.NewTicker panics on a non-positive interval; with such a
			// configuration no timeout can ever be detected anyway.
			return
		}

		// A ticker obtained from time.Tick cannot be stopped; use NewTicker
		// so it is released as soon as the goroutine returns.
		ticker := time.NewTicker(iCheckingTimeoutInterval)
		defer ticker.Stop()

		for range ticker.C {
			if me.mDisposedFlag {
				return
			}

			now := time.Now().UnixNano()
			iHeartbeatTimeoutNanos := fnRandomizeInt64(int64(timeout.HeartbeatTimeout / time.Nanosecond))
			if now-me.mLeaderHeartbeatTimestamp >= iHeartbeatTimeoutNanos {
				me.raise(feLeaderHeartbeatTimeout)
				return
			}
		}
	}()
}

// whenLeaderHeartbeatTimeoutThenSwitchToCandidateState disposes this state
// and hands the context a new candidate state for term mTerm+1.
func (me *tFollowerState) whenLeaderHeartbeatTimeoutThenSwitchToCandidateState(_ string, args ...interface{}) {
	me.raise(feDisposing)
	me.context.handleStateChanged(newCandidateState(me.context, me.mTerm+1))
}

// Role reports roles.Follower.
func (me *tFollowerState) Role() roles.RaftRole {
	return roles.Follower
}

// Heartbeat leader to follower.
// A heartbeat with a term lower than mTerm is answered with
// rpc.HBTermMismatch and the current term; otherwise feLeaderHeartbeat is
// raised and rpc.HBOk returned. The returned error is always nil.
func (me *tFollowerState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
	// check term
	if cmd.Term < me.mTerm {
		// invalid leader
		ret.Code = rpc.HBTermMismatch
		ret.Term = me.mTerm
		return nil
	}

	// raise LeaderHeartbeat
	me.raise(feLeaderHeartbeat, cmd)

	// return
	ret.Code = rpc.HBOk
	return nil
}

// AppendLog leader to follower.
// The outcome is reported through ret.Code; the returned error is always nil.
func (me *tFollowerState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
	ret.Term = me.mTerm

	if cmd.Term < me.mTerm {
		// invalid leader
		ret.Code = rpc.ALTermMismatch
		return nil
	}

	store := me.context.store()
	entry := cmd.Entry

	// check log: expecting appending action follows previous committing action
	if entry.PrevIndex != store.LastCommittedIndex() || entry.PrevTerm != store.LastCommittedTerm() {
		// the entry does not extend the last committed log; look it up
		e, log := store.GetLog(entry.Index)
		if e != nil {
			ret.Code = rpc.ALInternalError
			return nil
		}

		if log == nil || log.PrevIndex != entry.PrevIndex || log.PrevTerm != entry.PrevTerm {
			// bad log: tell the leader where this node's committed log ends
			ret.Code = rpc.ALIndexMismatch
			ret.PrevLogIndex = store.LastCommittedIndex()
			ret.PrevLogTerm = store.LastCommittedTerm()
			return nil
		}

		// good log, but old, just ignore it
		ret.Code = rpc.ALOk
		return nil
	}

	// good log
	if e := store.Append(entry); e != nil {
		ret.Code = rpc.ALInternalError
		return nil
	}

	ret.Code = rpc.ALOk
	return nil
}

// CommitLog leader to follower.
// Only the last appended entry (matching index and term) can be committed;
// anything else is answered with rpc.CLLogNotFound. The returned error is
// always nil.
func (me *tFollowerState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
	store := me.context.store()
	if cmd.Index != store.LastAppendedIndex() || cmd.Term != store.LastAppendedTerm() {
		// bad index
		ret.Code = rpc.CLLogNotFound
		return nil
	}

	if e := store.Commit(cmd.Index); e != nil {
		ret.Code = rpc.CLInternalError
		return nil
	}

	ret.Code = rpc.CLOk
	return nil
}

// RequestVote candidate to follower.
// The vote is refused when the candidate's term is not greater than mTerm,
// when a different candidate already holds an unexpired vote for the same or
// a later term, or when the candidate's last log index is behind this node's
// last committed index. The returned error is always nil.
//
// NOTE(review): the checks below read fields that the feCandidateRequestVote
// hooks may have just reset, so raise() is assumed to dispatch synchronously;
// confirm against tEventDrivenModel.
func (me *tFollowerState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
	// before voting
	me.raise(feCandidateRequestVote, cmd)

	// check term
	if cmd.Term <= me.mTerm {
		ret.Term = me.mTerm
		ret.Code = rpc.RVTermMismatch
		return nil
	}

	// check if already voted another
	if me.mLastVotedTerm >= cmd.Term && me.mLastVotedCandidateID != "" && me.mLastVotedCandidateID != cmd.CandidateID {
		ret.Code = rpc.RVVotedAnother
		return nil
	}

	// check log index
	if cmd.LastLogIndex < me.context.store().LastCommittedIndex() {
		ret.Code = rpc.RVLogMismatch
		return nil
	}

	// vote ok
	me.raise(feVoteToCandidate, cmd)
	ret.Term = cmd.Term
	ret.Code = rpc.RVOk

	return nil
}
tCandidateState.go
基于事件驱动重构Candidate状态的逻辑编排,各字段实施读写分离管理
package lsm

import (
	"learning/gooop/etcd/raft/roles"
	"learning/gooop/etcd/raft/rpc"
	"sync"
	"time"
)

// tCandidateState presents a candidate node.
//
// The state fields below are written only by init() and by the event hooks
// installed in initEventHandlers; the RPC methods only read them and raise
// events.
type tCandidateState struct {
	tEventDrivenModel

	context    IRaftLSM
	mInitOnce  sync.Once
	mStartOnce sync.Once

	// update: init() / ceElectionTimeout
	mTerm int64

	// update: ceInit / ceElectionTimeout / ceVoteToCandidate
	mVotedTerm int64

	// update: ceInit / ceElectionTimeout / ceVoteToCandidate
	mVotedCandidateID string

	// update: ceInit / ceElectionTimeout / ceVoteToCandidate
	mVotedTimestamp int64
}

const (
	// trigger: init()
	// args: empty
	ceInit = "candidate.init"

	// trigger: Start()
	// args: empty
	ceStart = "candidate.Start"

	// trigger: whenStartThenWatchElectionTimeout()
	// NOTE(review): that function is not defined in this file yet.
	// args: empty
	ceElectionTimeout = "candidate.ElectionTimeout"

	// trigger: Heartbeat() / AppendLog()
	// args: empty
	ceLeaderAnnounced = "candidate.LeaderAnnounced"

	// trigger: RequestVote()
	// args: *rpc.RequestVoteCmd
	ceVoteToCandidate = "candidate.VoteToCandidate"

	// trigger: whenLeaderAnnouncedThenSwitchToFollower()
	// args: empty
	ceDisposing = "candidate.Disposing"
)

// newCandidateState creates and initializes a candidate state bound to ctx
// that campaigns for the given term.
func newCandidateState(ctx IRaftLSM, term int64) IRaftState {
	it := new(tCandidateState)
	it.init(ctx, term)
	return it
}

// init binds the context and term, installs the event hooks and raises
// ceInit. It takes effect at most once per instance.
func (me *tCandidateState) init(ctx IRaftLSM, term int64) {
	me.mInitOnce.Do(func() {
		me.context = ctx
		me.mTerm = term
		me.initEventHandlers()

		me.raise(ceInit)
	})
}

// initEventHandlers installs the per-field writer hooks first and the
// read-only reaction hooks afterwards.
func (me *tCandidateState) initEventHandlers() {
	// write only logic
	me.hookEventsForTerm()
	me.hookEventsForVotedTerm()
	me.hookEventsForVotedCandidateID()
	me.hookEventsForVotedTimestamp()

	// read only logic
	me.hook(ceLeaderAnnounced,
		me.whenLeaderAnnouncedThenSwitchToFollower)
	me.hook(ceElectionTimeout,
		me.whenElectionTimeoutThenRequestVoteAgain)
}

// hookEventsForTerm maintains field: mTerm
// update: ceElectionTimeout
func (me *tCandidateState) hookEventsForTerm() {
	me.hook(ceElectionTimeout, func(e string, args ...interface{}) {
		// when election timeout, term++ and request vote again
		me.mTerm++
	})
}

// hookEventsForVotedTerm maintains field: mVotedTerm
// update: ceInit / ceElectionTimeout / ceVoteToCandidate
func (me *tCandidateState) hookEventsForVotedTerm() {
	me.hook(ceInit, func(e string, args ...interface{}) {
		// initially, vote to itself
		me.mVotedTerm = me.mTerm
	})

	me.hook(ceElectionTimeout, func(e string, args ...interface{}) {
		// when timeout, reset to itself
		me.mVotedTerm = me.mTerm
	})

	me.hook(ceVoteToCandidate, func(e string, args ...interface{}) {
		// after vote to candidate
		cmd := args[0].(*rpc.RequestVoteCmd)
		me.mVotedTerm = cmd.Term
	})
}

// hookEventsForVotedCandidateID maintains field: mVotedCandidateID
// update: ceInit / ceElectionTimeout / ceVoteToCandidate
func (me *tCandidateState) hookEventsForVotedCandidateID() {
	me.hook(ceInit, func(e string, args ...interface{}) {
		// initially, vote to itself
		me.mVotedCandidateID = me.context.config().ID()
	})

	me.hook(ceElectionTimeout, func(e string, args ...interface{}) {
		// when timeout, reset to itself
		me.mVotedCandidateID = me.context.config().ID()
	})

	me.hook(ceVoteToCandidate, func(e string, args ...interface{}) {
		// after vote to candidate
		cmd := args[0].(*rpc.RequestVoteCmd)
		me.mVotedCandidateID = cmd.CandidateID
	})
}

// hookEventsForVotedTimestamp maintains field: mVotedTimestamp
// update: ceInit / ceElectionTimeout / ceVoteToCandidate
func (me *tCandidateState) hookEventsForVotedTimestamp() {
	me.hook(ceInit, func(e string, args ...interface{}) {
		// initially, vote to itself
		me.mVotedTimestamp = time.Now().UnixNano()
	})

	me.hook(ceElectionTimeout, func(e string, args ...interface{}) {
		// when timeout, reset to itself
		me.mVotedTimestamp = time.Now().UnixNano()
	})

	me.hook(ceVoteToCandidate, func(e string, args ...interface{}) {
		// after vote to candidate
		me.mVotedTimestamp = time.Now().UnixNano()
	})
}

// Heartbeat leader to candidate.
// A heartbeat whose term is lower than mTerm is answered with
// rpc.HBTermMismatch. A term at least as large as the candidate's own
// identifies a legitimate leader (Raft, section 5.2): ceLeaderAnnounced is
// raised, which switches this node back to follower, and rpc.HBOk returned.
// The returned error is always nil.
func (me *tCandidateState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
	// check term
	if cmd.Term < me.mTerm {
		// bad leader
		ret.Code = rpc.HBTermMismatch
		return nil
	}

	// new leader
	me.raise(ceLeaderAnnounced)

	// return ok
	ret.Code = rpc.HBOk
	return nil
}

// AppendLog leader to candidate.
// A term lower than mTerm is answered with rpc.ALTermMismatch. Otherwise the
// sender is accepted as leader and ceLeaderAnnounced is raised, but the entry
// itself is not stored by the candidate: rpc.ALInternalError is returned.
// The returned error is always nil.
func (me *tCandidateState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
	// check term
	if cmd.Term < me.mTerm {
		// bad leader
		ret.Code = rpc.ALTermMismatch
		return nil
	}

	// new leader
	me.raise(ceLeaderAnnounced)

	// ignore and return
	ret.Code = rpc.ALInternalError
	return nil
}

// CommitLog leader to candidate.
// A candidate never commits: it always answers rpc.CLInternalError.
func (me *tCandidateState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
	// ignore and return
	ret.Code = rpc.CLInternalError
	return nil
}

// RequestVote candidate to candidate. Not implemented yet; it panics.
func (me *tCandidateState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
	// todo: fixme
	panic("implements me")
}

// Role reports roles.Candidate.
func (me *tCandidateState) Role() roles.RaftRole {
	return roles.Candidate
}

// Start raises ceStart; only the first call has any effect.
func (me *tCandidateState) Start() {
	me.mStartOnce.Do(func() {
		// the candidate's own start event, not the follower's feStart
		me.raise(ceStart)
	})
}

// whenLeaderAnnouncedThenSwitchToFollower raises ceDisposing and hands the
// context a fresh follower state.
func (me *tCandidateState) whenLeaderAnnouncedThenSwitchToFollower(_ string, _ ...interface{}) {
	me.raise(ceDisposing)
	me.context.handleStateChanged(newFollowerState(me.context))
}

// whenElectionTimeoutThenRequestVoteAgain is meant to restart the election
// after ceElectionTimeout. Not implemented yet; it panics.
func (me *tCandidateState) whenElectionTimeoutThenRequestVoteAgain(_ string, _ ...interface{}) {
	// todo: fixme
	panic("implements me")
}
(未完待续)
