缘起

最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军 , 2019.1)
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop

raft分布式一致性算法

  1. 分布式存储系统通常会通过维护多个副本来进行容错,以提高系统的可用性。
  2. 这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?
  3. Raft算法把问题分解成了领袖选举(leader election)、日志复制(log replication)、安全性(safety)和成员关系变化(membership changes)这几个子问题。
  4. Raft算法的基本操作只需2种RPC即可完成:
     • RequestVote RPC是在选举过程中由候选人(Candidate)发起的,用于向其他节点请求投票;
     • AppendEntries RPC是由领导人(Leader)发起的,目的是向其他节点复制日志条目和发送心跳(heartbeat)。

目标

  • 根据raft协议,实现高可用分布式强一致的kv存储

子目标(Day 3)

  • 继续完善raft状态机之Follower状态的处理逻辑
  • 继续完善raft状态机之Candidate状态的处理逻辑

设计

  • tFollowerState:
    • 监视Leader心跳是否超时
    • 如果Leader心跳超时,则切换到Candidate状态,竞选新leader
    • 添加RequestVote和AppendEntries两个RPC接口的响应
  • tCandidateState:
    • 进入此状态,立即向其他节点发起竞选请求
    • 如竞选超时,则重新发起竞选
    • 如收到新Leader心跳,则切换回Follower
    • 如收到N/2+1张票,则切换到Leader,并广播之

# tFollowerState.go

继续完善raft状态机之Follower状态的处理逻辑

```go
package lsm

import ( “learning/gooop/etcd/raft/config” “learning/gooop/etcd/raft/roles” “learning/gooop/etcd/raft/rpc” “learning/gooop/etcd/raft/timeout” “sync” “time” )

type tFollowerState struct { tRaftStateBase

  1. mInitOnce sync.Once
  2. mStartOnce sync.Once
  3. mVotedLeaderID string
  4. mLeaderHeartbeatClock int64
  5. mStateChangedHandler StateChangedHandleFunc
  6. mEventMap map[tFollowerEvent][]tFollowerEventHandler

}

// JobFunc is a parameterless unit of work.
// NOTE(review): not referenced anywhere in the visible code.
type JobFunc func()

// tFollowerEvent enumerates the internal events a follower reacts to.
type tFollowerEvent int

const (
	// evFollowerStart is raised once by Start.
	evFollowerStart tFollowerEvent = iota
	// evFollowerLeaderHeartbeatTimeout is raised when no leader contact was
	// seen for a full heartbeat timeout.
	evFollowerLeaderHeartbeatTimeout
)

// tFollowerEventHandler is a callback registered for a tFollowerEvent.
type tFollowerEventHandler func(e tFollowerEvent, args ...interface{})

func newFollowerState(term int, cfg config.IRaftConfig, handler StateChangedHandleFunc) IRaftState { it := new(tFollowerState) it.init(term, cfg, handler)

  1. return it

}

func (me tFollowerState) init(term int, cfg config.IRaftConfig, handler StateChangedHandleFunc) { me.mInitOnce.Do(func() { me.tRaftStateBase = newRaftStateBase(term, cfg) me.role = roles.Follower me.mStateChangedHandler = handler

  1. // init event map
  2. me.mEventMap = make(map[tFollowerEvent][]tFollowerEventHandler)
  3. me.registerEventHandlers()
  4. })

}

func (me *tFollowerState) raise(e tFollowerEvent, args …interface{}) { if handlers, ok := me.mEventMap[e]; ok { for _, it := range handlers { it(e, args…) } } }

func (me *tFollowerState) registerEventHandlers() { me.mEventMap[evFollowerStart] = []tFollowerEventHandler{ me.whenStartThenBeginWatchLeaderTimeout, } me.mEventMap[evFollowerLeaderHeartbeatTimeout] = []tFollowerEventHandler { me.whenLeaderHeartbeatTimeoutThenSwitchToCandidateState, } }

// Start raises evFollowerStart; only the first call has any effect.
func (me *tFollowerState) Start() {
	once := &me.mStartOnce
	once.Do(func() {
		me.raise(evFollowerStart)
	})
}

func (me *tFollowerState) whenStartThenBeginWatchLeaderTimeout(e tFollowerEvent, args… interface{}) { go func() { iCheckingTimeoutInterval := timeout.HeartbeatTimeout / 3 iHeartbeatTimeoutNanos := int64(timeout.HeartbeatTimeout / time.Nanosecond) for range time.Tick(iCheckingTimeoutInterval) { now := time.Now().UnixNano() if now - me.mLeaderHeartbeatClock >= iHeartbeatTimeoutNanos { me.raise(evFollowerLeaderHeartbeatTimeout) return } } }() }

func (me *tFollowerState) whenLeaderHeartbeatTimeoutThenSwitchToCandidateState(_ tFollowerEvent, args… interface{}) { fn := me.mStateChangedHandler if fn == nil { return }

  1. state := newCandidateState(me.cfg, me.term, me.mStateChangedHandler)
  2. fn(state)

}

// Role identifies this state as a Raft follower.
func (me *tFollowerState) Role() roles.RaftRole {
	return roles.Follower
}

func (me tFollowerState) RequestVote(cmd rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error { if cmd.Term <= me.term { ret.Term = me.term ret.VoteGranted = false return nil }

  1. if me.mVotedLeaderID != "" && me.mVotedLeaderID != cmd.CandidateID {
  2. ret.Term = me.term
  3. ret.VoteGranted = false
  4. return nil
  5. }
  6. me.mVotedLeaderID = cmd.CandidateID
  7. ret.Term = cmd.Term
  8. ret.VoteGranted = true
  9. return nil

}

func (me tFollowerState) AppendEntries(cmd rpc.AppendEntriesCmd, ret *rpc.AppendEntriesRet) error { if cmd.Term < me.term { ret.Term = me.term ret.Success = false return nil }

  1. me.term = cmd.Term
  2. me.leaderID = cmd.LeaderID
  3. me.mLeaderHeartbeatClock = time.Now().UnixNano()
  4. if len(cmd.Entries) <= 0 {
  5. // just heartbeat package
  6. ret.Term = cmd.Term
  7. ret.Success = true
  8. return nil
  9. }
  10. // todo: append logs
  11. return nil

}

// StateChangedHandler replaces the callback that receives replacement states.
func (me *tFollowerState) StateChangedHandler(handler StateChangedHandleFunc) {
	me.mStateChangedHandler = handler
}

```

<a name="cmAzS"></a>
# tCandidateState.go

继续完善raft状态机之Candidate状态的处理逻辑

```go
  5. package lsm
  6. import (
  7. "errors"
  8. "learning/gooop/etcd/raft/config"
  9. "learning/gooop/etcd/raft/rpc"
  10. "sync"
  11. )
  12. type tCandidateState struct {
  13. tRaftStateBase
  14. mInitOnce sync.Once
  15. mStartOnce sync.Once
  16. mStateChangedHandler StateChangedHandleFunc
  17. mEventMap map[tCandidateEvent][]tCandidateEventHandler
  18. }
  19. func (me *tCandidateState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
  20. return gErrorCandidateWontReplyRequestVote
  21. }
  22. func (me *tCandidateState) AppendEntries(cmd *rpc.AppendEntriesCmd, ret *rpc.AppendEntriesRet) error {
  23. return gErrorCandidateWontReplyAppendEntries
  24. }
  25. func (me *tCandidateState) StateChangedHandler(handler StateChangedHandleFunc) {
  26. me.mStateChangedHandler = handler
  27. }
  // tCandidateEvent enumerates the internal events a candidate reacts to.
  type tCandidateEvent int

  const (
  	// evCandidateStart is raised once by Start.
  	evCandidateStart tCandidateEvent = iota
  	// evCandidateElectionTimeout is raised when an election round times out.
  	evCandidateElectionTimeout
  	// evCandidateGotEnoughVotes is raised when the vote count suffices to lead.
  	evCandidateGotEnoughVotes
  )

  // tCandidateEventHandler is a callback registered for a tCandidateEvent.
  type tCandidateEventHandler func(e tCandidateEvent, args ...interface{})
  35. func newCandidateState(cfg config.IRaftConfig, term int, handler StateChangedHandleFunc) IRaftState {
  36. it := new(tCandidateState)
  37. it.init(cfg, term, handler)
  38. return it
  39. }
  40. func (me *tCandidateState) init(cfg config.IRaftConfig, term int, handler StateChangedHandleFunc) {
  41. me.mInitOnce.Do(func() {
  42. me.cfg = cfg
  43. me.term = term
  44. me.mStateChangedHandler = handler
  45. // init event map
  46. me.mEventMap = make(map[tCandidateEvent][]tCandidateEventHandler)
  47. me.registerEventHandlers()
  48. })
  49. }
  50. func (me *tCandidateState) registerEventHandlers() {
  51. me.mEventMap[evCandidateStart] = []tCandidateEventHandler{
  52. me.whenStartThenRequestVote,
  53. me.whenStartThenWatchElectionTimeout,
  54. }
  55. me.mEventMap[evCandidateElectionTimeout] = []tCandidateEventHandler{
  56. me.whenElectionTimeoutThenRequestVoteAgain,
  57. }
  58. me.mEventMap[evCandidateGotEnoughVotes] = []tCandidateEventHandler{
  59. me.whenGotEnoughVotesThenSwitchToLeader,
  60. }
  61. }
  62. func (me *tCandidateState) raise(e tCandidateEvent, args ...interface{}) {
  63. if handlers, ok := me.mEventMap[e]; ok {
  64. for _, it := range handlers {
  65. it(e, args...)
  66. }
  67. }
  68. }
  69. func (me *tCandidateState) Start() {
  70. me.mStartOnce.Do(func() {
  71. me.raise(evCandidateStart)
  72. })
  73. }
  74. func (me *tCandidateState) whenStartThenRequestVote(_ tCandidateEvent, _... interface{}) {
  75. // todo: fixme
  76. panic("implements me")
  77. }
  78. func (me *tCandidateState) whenStartThenWatchElectionTimeout(_ tCandidateEvent, _... interface{}) {
  79. // todo: fixme
  80. panic("implements me")
  81. }
  82. func (me *tCandidateState) whenElectionTimeoutThenRequestVoteAgain(_ tCandidateEvent, _... interface{}) {
  83. // todo: fixme
  84. panic("implements me")
  85. }
  86. func (me *tCandidateState) whenGotEnoughVotesThenSwitchToLeader(_ tCandidateEvent, _... interface{}) {
  87. // todo: fixme
  88. panic("implements me")
  89. }
  var (
  	// gErrorCandidateWontReplyRequestVote is returned by tCandidateState.RequestVote.
  	gErrorCandidateWontReplyRequestVote = errors.New("candidate won't reply RequestVote RPC")
  	// gErrorCandidateWontReplyAppendEntries is returned by tCandidateState.AppendEntries.
  	gErrorCandidateWontReplyAppendEntries = errors.New("candidate won't reply AppendEntries RPC")
  )

```

(未完待续)