缘起
最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军 , 2019.1)
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop
raft分布式一致性算法
分布式存储系统通常会通过维护多个副本来进行容错,以提高系统的可用性。这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?Raft算法把问题分解成了领袖选举(leader election)、日志复制(log replication)、安全性(safety)和成员关系变化(membership changes)这几个子问题。Raft算法的基本操作只需2种RPC即可完成。RequestVote RPC是在选举过程中由候选人(Candidate)触发的,AppendEntries RPC是领导人触发的,目的是向其他节点复制日志条目和发送心跳(heartbeat)。
目标
- 根据raft协议,实现高可用分布式强一致的kv存储
子目标(Day 5)
- 重新设计RPC接口,将原有浓缩的两个接口分解为更易于理解和实现的四个接口( 尽信书则不如无书 -_-|| )
- 根据新RPC接口重写Follower状态的实现
设计
- IRaftRPC: 将原有浓缩的两个接口分解为更易于理解和实现的四个接口
- IRaftLSM: 添加部分包内支持接口
- iEventDrivenModel:抽取并实现事件驱动型的逻辑编排
- ILogStore:改造适配新分解的RPC接口
- tBoltDBStore:基于boltdb实现日志暂存,提交和应用
- tFollowerState:根据新分解的RPC接口,重写Follower状态的实现(未完成)
IRaftRPC.go
将原有浓缩的两个接口分解为更易于理解和实现的四个接口。尽信书则不如无书-_-||
package rpcimport "learning/gooop/etcd/raft/model"type IRaftRPC interface {// leader to followerHeartbeat(cmd *HeartbeatCmd, ret *HeartbeatRet) error// leader to followerAppendLog(cmd *AppendLogCmd, ret *AppendLogRet) error// leader to followerCommitLog(cmd *CommitLogCmd, ret *CommitLogRet) error// candidate to followerRequestVote(cmd *RequestVoteCmd, ret *RequestVoteRet) error}type HeartbeatCmd struct {LeaderID stringTerm int64}type HeartbeatRet struct {Code HBCodeTerm int64}type HBCode intconst (HBOk HBCode = iotaHBTermMismatch HBCode = iota)type RequestVoteCmd struct {CandidateID stringTerm int64LastLogIndex int64LastLogTerm int64}type RequestVoteRet struct {Code RVCodeTerm int64}type RVCode intconst (RVOk RVCode = iotaRVLogMismatch RVCode = iotaRVTermMismatch RVCode = iotaRVVotedAnother RVCode = iota)type AppendLogCmd struct {LeaderID stringTerm int64Entry *model.LogEntry}type AppendLogRet struct {Code ALCodeTerm int64PrevLogIndex int64PrevLogTerm int64}type ALCode intconst (ALOk ALCode = iotaALTermMismatch ALCode = iotaALIndexMismatch ALCode = iotaALInternalError ALCode = iota)type CommitLogCmd struct {LeaderID stringTerm int64Index int64}type CommitLogRet struct {Code CLCode}type CLCode intconst (CLOk CLCode = iotaCLLogNotFound CLCode = iotaCLInternalError CLCode = iota)
IRaftLSM.go
添加部分包内支持接口
package lsm

import (
	"learning/gooop/etcd/raft/config"
	"learning/gooop/etcd/raft/rpc"
	"learning/gooop/etcd/raft/store"
)

// IRaftLSM is the raft finite state machine.
//
// It embeds rpc.IRaftRPC, so it can be called with the four raft RPCs, and
// exposes the current state via State. The unexported methods are
// package-internal support hooks: because they are unexported, only types
// declared in this package can implement the interface.
type IRaftLSM interface {
	rpc.IRaftRPC

	// State returns the state object currently held by the machine.
	State() IRaftState

	// config returns the raft configuration.
	config() config.IRaftConfig

	// store returns the log store.
	store() store.ILogStore

	// handleStateChanged receives a state; presumably it is the hook through
	// which a state hands control to its successor — confirm against the
	// implementation.
	handleStateChanged(state IRaftState)
}
iEventDrivenModel.go
抽取并实现事件驱动型的逻辑编排
package lsmtype tEventHandleFunc func(e string, args... interface{})type iEventDrivenModel interface {hook(e string, handleFunc tEventHandleFunc)raise(e string, args... interface{})}type tEventDrivenModel struct {items map[string][]tEventHandleFunc}func (me *tEventDrivenModel) hook(e string, handler tEventHandleFunc) {arr, ok := me.items[e]if ok {me.items[e] = append(arr, handler)} else {me.items[e] = []tEventHandleFunc{handler }}}func (me *tEventDrivenModel) raise(e string, args... interface{}) {if handlers, ok := me.items[e];ok {for _,it := range handlers {it(e, args...)}}}
ILogStore.go
改造适配新分解的RPC接口
package store

import "learning/gooop/etcd/raft/model"

// ILogStore is the raft log storage abstraction: entries are first appended
// and later committed by index.
type ILogStore interface {
	// LastAppendedTerm returns the term of the last appended entry.
	LastAppendedTerm() int64

	// LastAppendedIndex returns the index of the last appended entry.
	LastAppendedIndex() int64

	// LastCommittedTerm returns the term of the last committed entry.
	LastCommittedTerm() int64

	// LastCommittedIndex returns the index of the last committed entry.
	LastCommittedIndex() int64

	// Append stores entry as a not-yet-committed log entry.
	Append(entry *model.LogEntry) error

	// Commit commits the previously appended entry with the given index.
	Commit(index int64) error

	// GetLog looks up the entry with the given index.
	// NOTE(review): the error is the FIRST return value, which is the reverse
	// of the usual Go convention; callers must destructure accordingly.
	GetLog(index int64) (error, *model.LogEntry)
}
tBoltDBStore.go
基于boltdb实现日志暂存,提交和应用
package store

import (
	"encoding/binary"
	"errors"

	"github.com/boltdb/bolt"

	"learning/gooop/etcd/raft/model"
)

// tBoltDBStore is the boltdb-backed ILogStore.
//
// Appended entries are staged in the "unstable" bucket; Commit applies the
// entry's command and moves the entry to the "committed" bucket, recording
// the committed index/term in the "meta" bucket.
type tBoltDBStore struct {
	// file is the path the database was opened from.
	file string

	// lastAppendedTerm/lastAppendedIndex describe the last entry staged by Append.
	// NOTE(review): they start at zero after every open, even though the
	// committed values are reloaded — confirm whether they should be seeded
	// from the committed index/term.
	lastAppendedTerm  int64
	lastAppendedIndex int64

	// lastCommittedTerm/lastCommittedIndex mirror the values persisted in the meta bucket.
	lastCommittedTerm  int64
	lastCommittedIndex int64

	// db is the open bolt handle. It must be a pointer: bolt.Open returns
	// *bolt.DB and the struct is not meant to be copied.
	db *bolt.DB
}

// NewBoltStore opens (creating if necessary) the bolt database at file and
// prepares its buckets.
//
// The committed index/term are loaded from the meta bucket, or initialised to
// their defaults on first use. The unstable bucket is recreated empty on every
// open, so entries that were appended but never committed are discarded.
//
// NOTE: the error is the first return value, matching the rest of this package.
func NewBoltStore(file string) (error, ILogStore) {
	db, err := bolt.Open(file, 0600, nil)
	if err != nil {
		return err, nil
	}

	store := &tBoltDBStore{file: file, db: db}
	err = db.Update(func(tx *bolt.Tx) error {
		meta, e := tx.CreateBucketIfNotExists(gMetaBucket)
		if e != nil {
			return e
		}

		store.lastCommittedTerm, e = loadOrInitMeta(meta, gKeyCommittedTerm, gDefaultTerm)
		if e != nil {
			return e
		}
		store.lastCommittedIndex, e = loadOrInitMeta(meta, gKeyCommittedIndex, gDefaultIndex)
		if e != nil {
			return e
		}

		if _, e = tx.CreateBucketIfNotExists(gDataBucket); e != nil {
			return e
		}

		// Reset the unstable bucket. On a brand-new database the bucket does
		// not exist yet, which DeleteBucket reports as ErrBucketNotFound; that
		// is not a failure here.
		if e = tx.DeleteBucket(gUnstableBucket); e != nil && !errors.Is(e, bolt.ErrBucketNotFound) {
			return e
		}
		if _, e = tx.CreateBucket(gUnstableBucket); e != nil {
			return e
		}

		_, e = tx.CreateBucketIfNotExists(gCommittedBucket)
		return e
	})
	if err != nil {
		// Release the handle opened above; the setup error is the one worth
		// reporting, so a secondary Close error is deliberately dropped.
		_ = db.Close()
		return err, nil
	}

	return nil, store
}

// loadOrInitMeta returns the int64 stored under key in bucket b, writing and
// returning def when the key is absent.
func loadOrInitMeta(b *bolt.Bucket, key []byte, def int64) (int64, error) {
	if v := b.Get(key); v != nil {
		return bytesToInt64(v), nil
	}
	if e := b.Put(key, int64ToBytes(def)); e != nil {
		return 0, e
	}
	return def, nil
}

// int64ToBytes encodes i as exactly 8 big-endian bytes.
func int64ToBytes(i int64) []byte {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, uint64(i))
	return buf
}

// bytesToInt64 decodes the first 8 bytes of data as a big-endian int64.
// It returns 0 when data is shorter than 8 bytes.
func bytesToInt64(data []byte) int64 {
	if len(data) < 8 {
		return 0
	}
	return int64(binary.BigEndian.Uint64(data))
}

// LastCommittedTerm returns the term of the last committed entry.
func (me *tBoltDBStore) LastCommittedTerm() int64 {
	return me.lastCommittedTerm
}

// LastCommittedIndex returns the index of the last committed entry.
func (me *tBoltDBStore) LastCommittedIndex() int64 {
	return me.lastCommittedIndex
}

// LastAppendedTerm returns the term of the last entry staged by Append.
func (me *tBoltDBStore) LastAppendedTerm() int64 {
	return me.lastAppendedTerm
}

// LastAppendedIndex returns the index of the last entry staged by Append.
func (me *tBoltDBStore) LastAppendedIndex() int64 {
	return me.lastAppendedIndex
}

// Append stages entry in the unstable bucket under its index and, on success,
// records it as the last appended entry.
func (me *tBoltDBStore) Append(entry *model.LogEntry) error {
	// NOTE(review): the command is decoded but the outcome is not inspected;
	// presumably this is meant as validation — confirm what Unmarshal returns
	// and whether OfTag can yield nil for an unknown tag.
	cmd := gCmdFactory.OfTag(entry.Tag)
	cmd.Unmarshal(entry.Command)

	e, entryData := entry.Marshal()
	if e != nil {
		return e
	}

	e = me.db.Update(func(tx *bolt.Tx) error {
		// save log to unstable
		return tx.Bucket(gUnstableBucket).Put(int64ToBytes(entry.Index), entryData)
	})
	if e != nil {
		return e
	}

	// Only advance the in-memory markers once the write is durable.
	me.lastAppendedIndex = entry.Index
	me.lastAppendedTerm = entry.Term
	return nil
}

// Commit commits the staged entry with the given index in one transaction:
// it applies the entry's command, copies the entry to the committed bucket,
// persists the committed index/term and removes the entry from the unstable
// bucket. It returns gErrorCommitLogNotFound when no such entry is staged.
func (me *tBoltDBStore) Commit(index int64) error {
	var committed *model.LogEntry

	err := me.db.Update(func(tx *bolt.Tx) error {
		// read unstable log
		ub := tx.Bucket(gUnstableBucket)
		k := int64ToBytes(index)
		data := ub.Get(k)
		if data == nil {
			return gErrorCommitLogNotFound
		}

		entry := new(model.LogEntry)
		e := entry.Unmarshal(data)
		if e != nil {
			return e
		}

		// apply cmd
		cmd := gCmdFactory.OfTag(entry.Tag)
		cmd.Unmarshal(entry.Command)
		e = cmd.Apply(tx)
		if e != nil {
			return e
		}

		// save to committed log
		e = tx.Bucket(gCommittedBucket).Put(k, data)
		if e != nil {
			return e
		}

		// update committed.index, committed.term
		mb := tx.Bucket(gMetaBucket)
		e = mb.Put(gKeyCommittedIndex, int64ToBytes(index))
		if e != nil {
			return e
		}
		e = mb.Put(gKeyCommittedTerm, int64ToBytes(entry.Term))
		if e != nil {
			return e
		}

		// del unstable.index
		e = ub.Delete(k)
		if e != nil {
			return e
		}

		committed = entry
		return nil
	})
	if err != nil {
		return err
	}

	// Update the in-memory markers only after the transaction has committed,
	// so a failed commit cannot leave them ahead of what is on disk.
	me.lastCommittedIndex = committed.Index
	me.lastCommittedTerm = committed.Term
	return nil
}

// GetLog returns the committed entry with the given index. When no committed
// entry has that index it returns (nil, nil). Entries that are only staged in
// the unstable bucket are not consulted.
//
// NOTE: the error is the first return value, as required by ILogStore.
func (me *tBoltDBStore) GetLog(index int64) (error, *model.LogEntry) {
	var found *model.LogEntry

	err := me.db.View(func(tx *bolt.Tx) error {
		v := tx.Bucket(gCommittedBucket).Get(int64ToBytes(index))
		if v == nil {
			return nil
		}

		entry := new(model.LogEntry)
		if e := entry.Unmarshal(v); e != nil {
			return e
		}

		found = entry
		return nil
	})

	return err, found
}

// Bucket names, meta keys, defaults and sentinel errors used by tBoltDBStore.
var (
	gMetaBucket      = []byte("meta")
	gUnstableBucket  = []byte("unstable")
	gCommittedBucket = []byte("committed")
	// gDataBucket is created on open but not otherwise touched in this file;
	// presumably it is the bucket commands write to in Apply — confirm.
	gDataBucket = []byte("data")

	gKeyCommittedIndex = []byte("committed.index")
	gKeyCommittedTerm  = []byte("committed.term")

	gDefaultTerm  int64 = 0
	gDefaultIndex int64 = 0

	gErrorCommitLogNotFound = errors.New("committing log not found")
)
tFollowerState.go
根据新分解的RPC接口,重写Follower状态的实现(未完成)
package lsm

import (
	"sync"
	"sync/atomic"
	"time"

	"learning/gooop/etcd/raft/roles"
	"learning/gooop/etcd/raft/rpc"
	"learning/gooop/etcd/raft/timeout"
)

// tFollowerState presents a follower node.
type tFollowerState struct {
	// mLeaderHeartbeatClock is the UnixNano timestamp of the last accepted
	// leader heartbeat (or of the moment watching started). It is written by
	// Heartbeat and read by the watcher goroutine, so it is only accessed via
	// sync/atomic. It is kept as the first field because the first word of an
	// allocated struct is guaranteed to be 64-bit aligned, which the atomic
	// int64 functions require on 32-bit platforms.
	mLeaderHeartbeatClock int64

	tEventDrivenModel

	context IRaftLSM

	mInitOnce    sync.Once
	mStartOnce   sync.Once
	mDisposeOnce sync.Once

	// mTerm is set to store.LastCommittedTerm() on init and raised when a
	// heartbeat from a leader with a newer term arrives.
	mTerm int64

	mVotedLeaderID  string
	mVotedTimestamp int64
}

// Event names raised by tFollowerState.
const feStart string = "follower.Start"
const feLeaderHeartbeatTimeout string = "follower.LeaderHeartbeatTimeout"

// newFollowerState creates an initialised follower state bound to ctx.
func newFollowerState(ctx IRaftLSM) IRaftState {
	it := new(tFollowerState)
	it.init(ctx)
	return it
}

// init binds the state to ctx, seeds the term from the store's last committed
// term and registers the event handlers. It runs at most once.
func (me *tFollowerState) init(ctx IRaftLSM) {
	me.mInitOnce.Do(func() {
		me.context = ctx
		me.mTerm = ctx.store().LastCommittedTerm()
		atomic.StoreInt64(&me.mLeaderHeartbeatClock, 0)
		me.initEventHandlers()
	})
}

// initEventHandlers wires each follower event to its handler.
func (me *tFollowerState) initEventHandlers() {
	me.hook(feStart,
		me.whenStartThenBeginWatchLeaderTimeout)
	me.hook(feLeaderHeartbeatTimeout,
		me.whenLeaderHeartbeatTimeoutThenSwitchToCandidateState)
}

// Start raises feStart; repeated calls are ignored.
func (me *tFollowerState) Start() {
	me.mStartOnce.Do(func() {
		me.raise(feStart)
	})
}

// whenStartThenBeginWatchLeaderTimeout starts a goroutine that checks, three
// times per heartbeat timeout, whether a full timeout.HeartbeatTimeout has
// elapsed since the last heartbeat. When it has, feLeaderHeartbeatTimeout is
// raised once and the goroutine exits.
func (me *tFollowerState) whenStartThenBeginWatchLeaderTimeout(e string, args ...interface{}) {
	// Start the countdown from now. Leaving the clock at its initial zero
	// would make the very first check report a timeout before the leader had
	// a full timeout period to send a heartbeat.
	atomic.StoreInt64(&me.mLeaderHeartbeatClock, time.Now().UnixNano())

	go func() {
		// assumes timeout.HeartbeatTimeout is a positive time.Duration — TODO confirm
		iCheckingTimeoutInterval := timeout.HeartbeatTimeout / 3
		iHeartbeatTimeoutNanos := int64(timeout.HeartbeatTimeout / time.Nanosecond)

		// Use an explicit ticker so it can be stopped when the watcher exits.
		ticker := time.NewTicker(iCheckingTimeoutInterval)
		defer ticker.Stop()

		for range ticker.C {
			last := atomic.LoadInt64(&me.mLeaderHeartbeatClock)
			if time.Now().UnixNano()-last >= iHeartbeatTimeoutNanos {
				me.raise(feLeaderHeartbeatTimeout)
				return
			}
		}
	}()
}

// whenLeaderHeartbeatTimeoutThenSwitchToCandidateState is not implemented yet.
func (me *tFollowerState) whenLeaderHeartbeatTimeoutThenSwitchToCandidateState(_ string, args ...interface{}) {
	panic("implements me")
}

// Role reports roles.Follower.
func (me *tFollowerState) Role() roles.RaftRole {
	return roles.Follower
}

// Heartbeat leader to follower.
//
// A heartbeat whose term is older than the follower's is rejected with
// HBTermMismatch. Otherwise the follower adopts a newer term if there is one,
// refreshes its heartbeat clock and answers HBOk. ret.Term always carries the
// follower's term.
func (me *tFollowerState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
	if cmd.Term < me.mTerm {
		// invalid leader
		ret.Code = rpc.HBTermMismatch
		ret.Term = me.mTerm
		return nil
	}

	if cmd.Term > me.mTerm {
		// new leader
		me.mTerm = cmd.Term
	}

	// update heartbeat clock and return
	atomic.StoreInt64(&me.mLeaderHeartbeatClock, time.Now().UnixNano())
	ret.Code = rpc.HBOk
	ret.Term = me.mTerm
	return nil
}

// AppendLog leader to follower.
//
// The entry is appended when its PrevIndex/PrevTerm equal the store's last
// committed index/term. Otherwise the store is asked for the log at the
// entry's index: if one exists with the same PrevIndex/PrevTerm the request is
// treated as an old duplicate and acknowledged; if not, ALIndexMismatch is
// returned together with the store's last committed index/term.
func (me *tFollowerState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
	ret.Term = me.mTerm

	if cmd.Term < me.mTerm {
		// invalid leader
		ret.Code = rpc.ALTermMismatch
		return nil
	}

	entry := cmd.Entry
	if entry == nil {
		// A request without an entry cannot be checked or stored; report it
		// instead of dereferencing nil.
		ret.Code = rpc.ALInternalError
		return nil
	}

	store := me.context.store()

	// check log: expecting appending action follows previous committing action
	if entry.PrevIndex != store.LastCommittedIndex() || entry.PrevTerm != store.LastCommittedTerm() {
		// check log
		e, existing := store.GetLog(entry.Index)
		if e != nil {
			ret.Code = rpc.ALInternalError
			return nil
		}

		if existing == nil || existing.PrevIndex != entry.PrevIndex || existing.PrevTerm != entry.PrevTerm {
			// bad log
			ret.Code = rpc.ALIndexMismatch
			ret.PrevLogIndex = store.LastCommittedIndex()
			ret.PrevLogTerm = store.LastCommittedTerm()
			return nil
		}

		// good log, but old, just ignore it
		ret.Code = rpc.ALOk
		return nil
	}

	// good log
	if e := store.Append(entry); e != nil {
		ret.Code = rpc.ALInternalError
		return nil
	}

	ret.Code = rpc.ALOk
	return nil
}

// CommitLog leader to follower.
//
// The commit is accepted only when cmd.Index/cmd.Term equal the store's last
// appended index/term; otherwise CLLogNotFound is returned. A store failure is
// reported as CLInternalError.
func (me *tFollowerState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
	store := me.context.store()

	if cmd.Index != store.LastAppendedIndex() || cmd.Term != store.LastAppendedTerm() {
		// bad index
		ret.Code = rpc.CLLogNotFound
		return nil
	}

	if e := store.Commit(cmd.Index); e != nil {
		ret.Code = rpc.CLInternalError
		return nil
	}

	ret.Code = rpc.CLOk
	return nil
}

// RequestVote candidate to follower. Not implemented yet.
func (me *tFollowerState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
	panic("implements me")
}
(未完待续)
