缘起
最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军 , 2019.1)
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop
raft分布式一致性算法
分布式存储系统通常会通过维护多个副本来进行容错,以提高系统的可用性。这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?Raft算法把这个问题分解成了四个子问题:1. 领袖选举(leader election)、2. 日志复制(log replication)、3. 安全性(safety)、4. 成员关系变化(membership changes)。
目标
- 根据raft协议,实现高可用分布式强一致的kv存储
子目标(Day 8)
- 简化rpc连接管理器tRaftClientService的实现
- 剥离IRaftLSM的内部支持接口到iRaftStateContext接口
- 完成Candidate状态的处理逻辑
设计
- IRaftLSM:raft有限状态机接口
- iRaftStateContext:提供状态模式下的上下文支持
- tCandidateState:Candidate(候选人)状态的实现。基于事件驱动的逻辑编排,基于读写分离的字段管理。
- tRaftClient:管理到指定raft节点的rpc连接
- tRaftClientService:管理当前节点到其他raft节点的rpc连接
IRaftLSM.go
raft有限状态机接口
package lsmimport ("learning/gooop/etcd/raft/rpc")// IRaftLSM raft有限状态自动机type IRaftLSM interface {rpc.IRaftRPCiRaftStateContextState() IRaftState}
iRaftStateContext.go
提供状态模式下的上下文支持
package lsmimport ("learning/gooop/etcd/raft/config""learning/gooop/etcd/raft/rpc/client""learning/gooop/etcd/raft/store")type iRaftStateContext interface {Config() config.IRaftConfigStore() store.ILogStoreHandleStateChanged(state IRaftState)RaftClientService() client.IRaftClientService}
tCandidateState.go
Candidate(候选人)状态的实现。基于事件驱动的逻辑编排,基于读写分离的字段管理。
package lsm

import (
	"sync"
	"time"

	"learning/gooop/etcd/raft/roles"
	"learning/gooop/etcd/raft/rpc"
	"learning/gooop/etcd/raft/timeout"
)

// tCandidateState implements the Candidate role of the raft state machine.
//
// The logic is event driven. Every field is written only by its own
// hookEventsFor* handler set ("write only logic"), except mTerm whose
// initial value is assigned in init(). The when* handlers ("read only
// logic") only read the fields and orchestrate the election flow.
//
// NOTE(review): several fields are read from RPC handlers and background
// goroutines without synchronization (only mTicketCount is guarded by a
// mutex) — confirm whether the surrounding framework serializes access.
type tCandidateState struct {
	tEventDrivenModel
	context    iRaftStateContext
	mInitOnce  sync.Once
	mStartOnce sync.Once

	// mTerm is the candidate's current term.
	// update: init() / ceAskingForVote
	mTerm int64

	// mVotedTerm is the term of the most recent vote cast by this node.
	// update: ceInit / ceAskingForVote / ceVoteToCandidate
	mVotedTerm int64

	// mVotedCandidateID is the ID of the node that received that vote.
	// update: ceInit / ceAskingForVote / ceVoteToCandidate
	mVotedCandidateID string

	// mVotedTimestamp is the time of that vote, in unix nanoseconds.
	// update: ceInit / ceAskingForVote / ceVoteToCandidate
	mVotedTimestamp int64

	// mTicketCount is the set of node IDs (including this node) that
	// voted for this candidate in the current round; guarded by mTicketMutex.
	// update: ceInit / ceAskingForVote / ceReceiveTicket / ceDisposing
	mTicketCount map[string]bool
	mTicketMutex *sync.Mutex

	// mDisposedFlag is set once the candidate has handed over to another state.
	// update: ceInit / ceDisposing
	mDisposedFlag bool
}

// Events raised by tCandidateState. Each comment names the place the event
// is raised from and the arguments passed to its handlers.
const (
	// ceInit is raised by init(). args: none.
	ceInit = "candidate.init"

	// ceStart is raised by Start(). args: none.
	ceStart = "candidate.Start"

	// ceElectionTimeout is raised by whenAskingForVoteThenWatchElectionTimeout().
	// args: none.
	ceElectionTimeout = "candidate.ElectionTimeout"

	// ceLeaderAnnounced is raised by Heartbeat() / AppendLog(). args: none.
	ceLeaderAnnounced = "candidate.LeaderAnnounced"

	// ceVoteToCandidate is raised by RequestVote(). args: *rpc.RequestVoteCmd.
	ceVoteToCandidate = "candidate.VoteToCandidate"

	// ceDisposing is raised by whenLeaderAnnouncedThenSwitchToFollower() and
	// whenWinningTheVoteThenSwitchToLeader(). args: none.
	ceDisposing = "candidate.Disposing"

	// ceAskingForVote is raised by beginAskForVote(). args: none.
	ceAskingForVote = "candidate.AskingForVote"

	// ceReceiveTicket is raised by handleRequestVoteOK(). args: peerID string.
	ceReceiveTicket = "candidate.ReceiveTicket"

	// ceWinningTheVote is raised by whenReceiveTicketThenCheckTicketCount().
	// args: none.
	ceWinningTheVote = "candidate.ceWinningTheVote"
)

// newCandidateState creates a candidate state bound to ctx, starting from
// the given term. The election itself only begins when Start() is called.
func newCandidateState(ctx iRaftStateContext, term int64) IRaftState {
	it := new(tCandidateState)
	it.init(ctx, term)
	return it
}

// init wires the context, the initial term and all event handlers, then
// raises ceInit. It runs at most once per instance.
func (me *tCandidateState) init(ctx iRaftStateContext, term int64) {
	me.mInitOnce.Do(func() {
		me.context = ctx
		me.mTerm = term
		me.initEventHandlers()
		me.raise(ceInit)
	})
}

// initEventHandlers registers the field-maintaining handlers first and the
// flow handlers afterwards.
// NOTE(review): the flow handlers read fields the write handlers update for
// the same event (e.g. mTerm on ceAskingForVote), so this assumes handlers
// run in registration order — confirm against tEventDrivenModel.
func (me *tCandidateState) initEventHandlers() {
	// write only logic
	me.hookEventsForTerm()
	me.hookEventsForVotedTerm()
	me.hookEventsForVotedCandidateID()
	me.hookEventsForVotedTimestamp()
	me.hookEventsForTicketCount()
	me.hookEventsForDisposedFlag()

	// read only logic
	me.hook(ceStart, me.whenStartThenAskForVote)
	me.hook(ceAskingForVote, me.whenAskingForVoteThenWatchElectionTimeout)
	me.hook(ceReceiveTicket, me.whenReceiveTicketThenCheckTicketCount)
	me.hook(ceElectionTimeout, me.whenElectionTimeoutThenAskForVoteAgain)
	me.hook(ceWinningTheVote, me.whenWinningTheVoteThenSwitchToLeader)
	me.hook(ceLeaderAnnounced, me.whenLeaderAnnouncedThenSwitchToFollower)
}

// hookEventsForTerm maintains field: mTerm
// update: ceAskingForVote
func (me *tCandidateState) hookEventsForTerm() {
	me.hook(ceAskingForVote, func(e string, args ...interface{}) {
		// every round of asking for votes runs in a new term
		me.mTerm++
	})
}

// hookEventsForVotedTerm maintains field: mVotedTerm
// update: ceInit / ceAskingForVote / ceVoteToCandidate
func (me *tCandidateState) hookEventsForVotedTerm() {
	me.hook(ceInit, func(e string, args ...interface{}) {
		// initially, vote for itself
		me.mVotedTerm = me.mTerm
	})

	me.hook(ceAskingForVote, func(e string, args ...interface{}) {
		// a new round: vote for itself again in the new term
		me.mVotedTerm = me.mTerm
	})

	me.hook(ceVoteToCandidate, func(e string, args ...interface{}) {
		// after voting for another candidate
		cmd := args[0].(*rpc.RequestVoteCmd)
		me.mVotedTerm = cmd.Term
	})
}

// hookEventsForVotedCandidateID maintains field: mVotedCandidateID
// update: ceInit / ceAskingForVote / ceVoteToCandidate
func (me *tCandidateState) hookEventsForVotedCandidateID() {
	me.hook(ceInit, func(e string, args ...interface{}) {
		// initially, vote for itself
		me.mVotedCandidateID = me.context.Config().ID()
	})

	me.hook(ceAskingForVote, func(e string, args ...interface{}) {
		me.mVotedCandidateID = me.context.Config().ID()
	})

	me.hook(ceVoteToCandidate, func(e string, args ...interface{}) {
		// after voting for another candidate
		cmd := args[0].(*rpc.RequestVoteCmd)
		me.mVotedCandidateID = cmd.CandidateID
	})
}

// hookEventsForVotedTimestamp maintains field: mVotedTimestamp
// update: ceInit / ceAskingForVote / ceVoteToCandidate
func (me *tCandidateState) hookEventsForVotedTimestamp() {
	me.hook(ceInit, func(e string, args ...interface{}) {
		// initially, vote for itself
		me.mVotedTimestamp = time.Now().UnixNano()
	})

	me.hook(ceAskingForVote, func(e string, args ...interface{}) {
		me.mVotedTimestamp = time.Now().UnixNano()
	})

	me.hook(ceVoteToCandidate, func(e string, args ...interface{}) {
		// after voting for another candidate
		me.mVotedTimestamp = time.Now().UnixNano()
	})
}

// hookEventsForTicketCount maintains fields: mTicketCount / mTicketMutex
// update: ceInit / ceAskingForVote / ceReceiveTicket / ceDisposing
func (me *tCandidateState) hookEventsForTicketCount() {
	me.hook(ceInit, func(e string, args ...interface{}) {
		me.mTicketMutex = new(sync.Mutex)
		me.mTicketCount = make(map[string]bool)
		// the candidate's own vote
		me.mTicketCount[me.context.Config().ID()] = true
	})

	me.hook(ceAskingForVote, func(e string, args ...interface{}) {
		me.mTicketMutex.Lock()
		defer me.mTicketMutex.Unlock()

		// a new round starts over with only the candidate's own vote
		me.mTicketCount = make(map[string]bool)
		me.mTicketCount[me.context.Config().ID()] = true
	})

	me.hook(ceReceiveTicket, func(e string, args ...interface{}) {
		peerID := args[0].(string)

		me.mTicketMutex.Lock()
		defer me.mTicketMutex.Unlock()
		me.mTicketCount[peerID] = true
	})

	me.hook(ceDisposing, func(e string, args ...interface{}) {
		me.mTicketMutex.Lock()
		defer me.mTicketMutex.Unlock()
		me.mTicketCount = make(map[string]bool)
	})
}

// hookEventsForDisposedFlag maintains field: mDisposedFlag
// update: ceInit / ceDisposing
func (me *tCandidateState) hookEventsForDisposedFlag() {
	me.hook(ceInit, func(e string, args ...interface{}) {
		me.mDisposedFlag = false
	})

	me.hook(ceDisposing, func(e string, args ...interface{}) {
		me.mDisposedFlag = true
	})
}

// Heartbeat handles a leader heartbeat received while campaigning.
//
// A heartbeat with a term lower than the candidate's own is rejected with
// rpc.HBTermMismatch. Otherwise the sender is accepted as the leader:
// ceLeaderAnnounced is raised and rpc.HBOk is returned. A leader elected in
// the candidate's own term is legitimate, so an equal term is accepted.
// The returned error is always nil.
func (me *tCandidateState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
	// check term
	if cmd.Term < me.mTerm {
		// stale leader
		ret.Code = rpc.HBTermMismatch
		return nil
	}

	// new leader
	me.raise(ceLeaderAnnounced)

	// return ok
	ret.Code = rpc.HBOk
	return nil
}

// AppendLog handles an append-log request received while campaigning.
//
// A request with a term lower than the candidate's own is rejected with
// rpc.ALTermMismatch. Otherwise the sender is accepted as the leader and
// ceLeaderAnnounced is raised; the request itself is not applied by the
// candidate and rpc.ALInternalError is returned.
// The returned error is always nil.
func (me *tCandidateState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
	// check term
	if cmd.Term < me.mTerm {
		// stale leader
		ret.Code = rpc.ALTermMismatch
		return nil
	}

	// new leader
	me.raise(ceLeaderAnnounced)

	// ignore and return
	ret.Code = rpc.ALInternalError
	return nil
}

// CommitLog is not served by a candidate: it always answers
// rpc.CLInternalError and returns a nil error.
func (me *tCandidateState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
	// ignore and return
	ret.Code = rpc.CLInternalError
	return nil
}

// RequestVote handles a vote request from another candidate.
//
//   - cmd.Term older than the last voted term: rpc.RVTermMismatch.
//   - cmd.Term equal to the last voted term: rpc.RVOk if the vote already
//     went to cmd.CandidateID (or to nobody), otherwise rpc.RVVotedAnother.
//   - cmd.Term newer: the vote is granted (ceVoteToCandidate, rpc.RVOk) when
//     cmd.LastLogIndex is not behind the local last committed index,
//     otherwise rpc.RVLogMismatch.
//
// The returned error is always nil.
func (me *tCandidateState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
	switch {
	case cmd.Term < me.mVotedTerm:
		ret.Code = rpc.RVTermMismatch

	case cmd.Term == me.mVotedTerm:
		if me.mVotedCandidateID != "" && me.mVotedCandidateID != cmd.CandidateID {
			// already voted for another candidate
			ret.Code = rpc.RVVotedAnother
		} else {
			// already voted for this candidate
			ret.Code = rpc.RVOk
		}

	case cmd.LastLogIndex >= me.context.Store().LastCommittedIndex():
		// new term, good log
		me.raise(ceVoteToCandidate, cmd)
		ret.Code = rpc.RVOk

	default:
		// new term, bad log
		ret.Code = rpc.RVLogMismatch
	}

	return nil
}

// Role reports roles.Candidate.
func (me *tCandidateState) Role() roles.RaftRole {
	return roles.Candidate
}

// Start begins the election by raising ceStart. It has an effect only on
// the first call.
func (me *tCandidateState) Start() {
	me.mStartOnce.Do(func() {
		// must be the candidate's own start event: whenStartThenAskForVote
		// is hooked on ceStart
		me.raise(ceStart)
	})
}

// whenLeaderAnnouncedThenSwitchToFollower disposes the candidate and hands
// over to a new follower state.
func (me *tCandidateState) whenLeaderAnnouncedThenSwitchToFollower(_ string, _ ...interface{}) {
	me.raise(ceDisposing)
	me.context.HandleStateChanged(newFollowerState(me.context))
}

// whenElectionTimeoutThenAskForVoteAgain starts another round of voting.
func (me *tCandidateState) whenElectionTimeoutThenAskForVoteAgain(_ string, _ ...interface{}) {
	me.beginAskForVote()
}

// whenStartThenAskForVote starts the first round of voting.
func (me *tCandidateState) whenStartThenAskForVote(_ string, _ ...interface{}) {
	me.beginAskForVote()
}

// beginAskForVote raises ceAskingForVote and then sends a RequestVote to
// every other configured node, one goroutine per node. Granted votes are
// reported through handleRequestVoteOK; failed or refused requests are
// ignored.
func (me *tCandidateState) beginAskForVote() {
	// raise ceAskingForVote
	me.raise(ceAskingForVote)

	// build the request after the event, so it carries the updated term
	cmd := new(rpc.RequestVoteCmd)
	cmd.CandidateID = me.context.Config().ID()
	cmd.Term = me.mTerm

	store := me.context.Store()
	cmd.LastLogIndex = store.LastCommittedIndex()
	cmd.LastLogTerm = store.LastCommittedTerm()

	// remember the term of this round, so late replies can be recognized
	term := me.mTerm

	// for each node, call node.RequestVote
	for _, node := range me.context.Config().Nodes() {
		if node.ID() == me.context.Config().ID() {
			continue
		}

		peerID := node.ID()
		go func() {
			ret := new(rpc.RequestVoteRet)
			err := me.context.RaftClientService().Using(peerID, func(client rpc.IRaftRPC) error {
				return client.RequestVote(cmd, ret)
			})

			if err == nil && ret.Code == rpc.RVOk {
				me.handleRequestVoteOK(peerID, term)
			}
		}()
	}
}

// whenAskingForVoteThenWatchElectionTimeout waits one random election
// timeout in the background and raises ceElectionTimeout if, by then, the
// candidate is still in the same round and has not collected a majority.
func (me *tCandidateState) whenAskingForVoteThenWatchElectionTimeout(_ string, _ ...interface{}) {
	term := me.mTerm
	go func() {
		time.Sleep(timeout.RandElectionTimeout())

		// the candidate was disposed, or a newer round has already started
		if me.mDisposedFlag || me.mTerm != term {
			return
		}

		tc := me.getTicketCount()
		if tc < len(me.context.Config().Nodes())/2+1 {
			me.raise(ceElectionTimeout)
		}
	}()
}

// handleRequestVoteOK records a vote granted by peerID for the round
// identified by term. Votes arriving after the candidate was disposed, or
// for an earlier round, are dropped.
func (me *tCandidateState) handleRequestVoteOK(peerID string, term int64) {
	if me.mDisposedFlag || me.mTerm != term {
		return
	}

	me.raise(ceReceiveTicket, peerID)
}

// whenReceiveTicketThenCheckTicketCount raises ceWinningTheVote once the
// votes reach a majority of the configured nodes.
func (me *tCandidateState) whenReceiveTicketThenCheckTicketCount(_ string, _ ...interface{}) {
	tc := me.getTicketCount()
	if tc >= len(me.context.Config().Nodes())/2+1 {
		// win the vote
		me.raise(ceWinningTheVote)
	}
}

// getTicketCount returns the number of votes collected so far.
func (me *tCandidateState) getTicketCount() int {
	me.mTicketMutex.Lock()
	defer me.mTicketMutex.Unlock()
	return len(me.mTicketCount)
}

// whenWinningTheVoteThenSwitchToLeader disposes the candidate and hands
// over to a new leader state for the current term.
func (me *tCandidateState) whenWinningTheVoteThenSwitchToLeader(_ string, _ ...interface{}) {
	me.raise(ceDisposing)
	me.context.HandleStateChanged(newLeaderState(me.context, me.mTerm))
}
tRaftClient.go
管理到指定raft节点的rpc连接
package client

import (
	"net/rpc"

	"learning/gooop/etcd/raft/config"
	rrpc "learning/gooop/etcd/raft/rpc"
)

// tRaftClient manages the rpc connection to one raft node. Every call is
// forwarded to the client's current iClientState.
type tRaftClient struct {
	cfg   config.IRaftNodeConfig
	conn  *rpc.Client
	state iClientState
}

// newRaftClient creates the client for the node described by cfg. conn may
// be nil, in which case the client starts in the broken state.
func newRaftClient(cfg config.IRaftNodeConfig, conn *rpc.Client) IRaftClient {
	c := &tRaftClient{}
	c.init(cfg, conn)
	return c
}

// init stores cfg and conn, picks the initial state (broken without a
// connection, connected otherwise) and starts it.
func (me *tRaftClient) init(cfg config.IRaftNodeConfig, conn *rpc.Client) {
	me.cfg = cfg
	me.conn = conn

	var initial iClientState
	if conn != nil {
		initial = newConnectedState(me)
	} else {
		initial = newBrokenState(me)
	}

	me.state = initial
	me.state.Start()
}

// Config returns the configuration of the peer node.
func (me *tRaftClient) Config() config.IRaftNodeConfig {
	return me.cfg
}

// GetConn returns the current rpc connection, which may be nil.
func (me *tRaftClient) GetConn() *rpc.Client {
	return me.conn
}

// SetConn replaces the current rpc connection.
func (me *tRaftClient) SetConn(conn *rpc.Client) {
	me.conn = conn
}

// HandleStateChanged makes state the current state and starts it.
func (me *tRaftClient) HandleStateChanged(state iClientState) {
	me.state = state
	state.Start()
}

// Heartbeat forwards the call to the current state.
func (me *tRaftClient) Heartbeat(cmd *rrpc.HeartbeatCmd, ret *rrpc.HeartbeatRet) error {
	return me.state.Heartbeat(cmd, ret)
}

// AppendLog forwards the call to the current state.
func (me *tRaftClient) AppendLog(cmd *rrpc.AppendLogCmd, ret *rrpc.AppendLogRet) error {
	return me.state.AppendLog(cmd, ret)
}

// CommitLog forwards the call to the current state.
func (me *tRaftClient) CommitLog(cmd *rrpc.CommitLogCmd, ret *rrpc.CommitLogRet) error {
	return me.state.CommitLog(cmd, ret)
}

// RequestVote forwards the call to the current state.
func (me *tRaftClient) RequestVote(cmd *rrpc.RequestVoteCmd, ret *rrpc.RequestVoteRet) error {
	return me.state.RequestVote(cmd, ret)
}

// Ping forwards the call to the current state.
func (me *tRaftClient) Ping(cmd *PingCmd, ret *PingRet) error {
	return me.state.Ping(cmd, ret)
}
tRaftClientService.go
管理当前节点到其他raft节点的rpc连接
package client

import (
	"errors"
	netrpc "net/rpc"

	"learning/gooop/etcd/raft/config"
	"learning/gooop/etcd/raft/rpc"
)

// tRaftClientService manages the rpc clients from the current node to the
// raft nodes listed in the configuration, keyed by node ID.
type tRaftClientService struct {
	cfg     config.IRaftConfig
	clients map[string]IRaftClient
}

// NewRaftClientService creates the service and one client per node listed
// in cfg.
func NewRaftClientService(cfg config.IRaftConfig) IRaftClientService {
	svc := &tRaftClientService{}
	svc.init(cfg)
	return svc
}

// init stores cfg and builds the client map, dialing every configured node.
func (me *tRaftClientService) init(cfg config.IRaftConfig) {
	me.cfg = cfg
	me.clients = make(map[string]IRaftClient)

	for _, nodeCfg := range cfg.Nodes() {
		me.clients[nodeCfg.ID()] = me.createRaftClient(nodeCfg)
	}
}

// createRaftClient dials the node's endpoint over tcp and wraps the result.
// A failed dial is not an error here: the client is created without a
// connection instead.
func (me *tRaftClientService) createRaftClient(nodeCfg config.IRaftNodeConfig) IRaftClient {
	// dial to peer
	conn, err := netrpc.Dial("tcp", nodeCfg.Endpoint())
	if err != nil {
		conn = nil
	}
	return newRaftClient(nodeCfg, conn)
}

// Using runs action with the client of the node identified by peerID and
// returns the action's result. It returns gErrorUnknownRaftPeer, without
// running action, when no client is registered for peerID.
func (me *tRaftClientService) Using(peerID string, action func(client rpc.IRaftRPC) error) error {
	peer, found := me.clients[peerID]
	if !found {
		return gErrorUnknownRaftPeer
	}
	return action(peer)
}

// gErrorUnknownRaftPeer is returned by Using for an unregistered peer ID.
var gErrorUnknownRaftPeer = errors.New("unknown raft peer")
(未完待续)
