缘起

最近阅读《云原生分布式存储基石:etcd深入解析》(杜军, 2019.1)
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop

raft分布式一致性算法

  1. 分布式存储系统通常会通过维护多个副本来进行容错,
  2. 以提高系统的可用性。
  3. 这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?
  4. Raft算法把问题分解成了四个子问题:
  5. 1. 领袖选举(leader election)、
  6. 2. 日志复制(log replication)、
  7. 3. 安全性(safety)、
  8. 4. 成员关系变化(membership changes)
  9. 这几个子问题。

目标

  • 根据raft协议,实现高可用分布式强一致的kv存储

子目标(Day 6)

  • 大幅重构,提升代码的可理解/可管理性:

    • 基于事件驱动的逻辑编排,重构Follower和Candidate状态下的实现
    • 将字段状态的管理实行读写分离。没看错,代码也是可以“读写分离”的 ^_^

设计

  • random:为各种超时时间添加随机性

  • tFollowerState:基于事件驱动重构Follower状态的逻辑编排,各字段实施读写分离管理
  • tCandidateState:基于事件驱动重构Candidate状态的逻辑编排,各字段实施读写分离管理

random.go

为各种超时时间添加随机性

  1. package lsm
  import (
	"math/rand"
	"sync"
	"time"
)
  6. // fnRandomizeInt64 returns int64 value from v to v*1.3
  7. func fnRandomizeInt64(v int64) int64 {
  8. return v + v * gRand.Int63n(30) / 100
  9. }
  10. // fnRandomizeDuration returns duration value from v to v*1.3
  11. func fnRandomizeDuration(v time.Duration) time.Duration {
  12. i := int64(v)
  13. return time.Duration(fnRandomizeInt64(i))
  14. }
  // gRand supplies the jitter drawn by fnRandomizeInt64. It is seeded from the
// current time (presumably so nodes started at different moments draw
// different timeouts).
//
// fnRandomizeInt64 is reached from several goroutines: the follower's
// leader-timeout watcher goroutine and the RequestVote hook path. A
// *rand.Rand built on a plain rand.NewSource is not safe for concurrent use,
// so its source is guarded by a mutex.
var gRand = rand.New(fnNewLockedRandSource(time.Now().UnixNano()))

// tLockedRandSource is a rand.Source that may be used from multiple
// goroutines. (*rand.Rand).Int63n only draws from its source, so guarding
// the source is enough to make gRand.Int63n safe to call concurrently.
type tLockedRandSource struct {
	mu  sync.Mutex
	src rand.Source
}

// fnNewLockedRandSource returns a tLockedRandSource seeded with seed. It
// yields the same Int63 sequence as rand.NewSource(seed).
func fnNewLockedRandSource(seed int64) *tLockedRandSource {
	return &tLockedRandSource{src: rand.NewSource(seed)}
}

// Int63 implements rand.Source.
func (s *tLockedRandSource) Int63() int64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.src.Int63()
}

// Seed implements rand.Source.
func (s *tLockedRandSource) Seed(seed int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.src.Seed(seed)
}

tFollowerState.go

基于事件驱动重构Follower状态的逻辑编排,各字段实施读写分离管理

  1. package lsm
  2. import (
  3. "learning/gooop/etcd/raft/roles"
  4. "learning/gooop/etcd/raft/rpc"
  5. "learning/gooop/etcd/raft/timeout"
  6. "sync"
  7. "time"
  8. )
  9. // tFollowerState presents a follower node
  10. type tFollowerState struct {
  11. tEventDrivenModel
  12. context IRaftLSM
  13. mInitOnce sync.Once
  14. mStartOnce sync.Once
  15. // update: feInit / feLeaderHeartbeat
  16. mTerm int64
  17. // update: feInit / feLeaderHeartbeat
  18. mLeaderHeartbeatTimestamp int64
  19. // update: feLeaderHeartbeat
  20. mLeaderID string
  21. // update: feCandidateRequestVote / feVoteToCandidate
  22. mLastVotedTerm int64
  23. // update: feCandidateRequestVote / feVoteToCandidate
  24. mLastVotedCandidateID string
  25. // update: feCandidateRequestVote / feVoteToCandidate
  26. mLastVotedTimestamp int64
  27. // update: feInit / feDisposing
  28. mDiseposedFlag bool
  29. }
  // Follower events, used as event names for hook/raise. "trigger" names the
// code that raises the event and "args" what it passes to the handlers.
// All names live in the "follower." namespace.
const (
	// trigger: init()
	// args: empty
	feInit = "follower.init"
	// trigger: Start()
	// args: empty
	feStart = "follower.Start"
	// trigger: Heartbeat()
	// args: *rpc.HeartbeatCmd
	feLeaderHeartbeat = "follower.LeaderHeartbeat"
	// trigger: whenStartThenBeginWatchLeaderTimeout()
	// args: empty
	feLeaderHeartbeatTimeout = "follower.LeaderHeartbeatTimeout"
	// trigger: RequestVote(), before the request is evaluated
	// args: *rpc.RequestVoteCmd
	feCandidateRequestVote = "follower.CandidateRequestVote"
	// trigger: RequestVote(), after the vote has been granted
	// args: *rpc.RequestVoteCmd
	feVoteToCandidate = "follower.VoteToCandidate"
	// trigger: whenLeaderHeartbeatTimeoutThenSwitchToCandidateState()
	// args: empty
	feDisposing = "follower.Disposing"
)
  50. func newFollowerState(ctx IRaftLSM) IRaftState {
  51. it := new(tFollowerState)
  52. it.init(ctx)
  53. return it
  54. }
  55. func (me *tFollowerState) init(ctx IRaftLSM) {
  56. me.mInitOnce.Do(func() {
  57. me.context = ctx
  58. me.initEventHandlers()
  59. })
  60. }
  61. func (me *tFollowerState) initEventHandlers() {
  62. // write only logic
  63. me.hookEventsForTerm()
  64. me.hookEventsForLeaderHeartbeatTimestamp()
  65. me.hookEventsForLeaderID()
  66. me.hookEventsForLastVotedTerm()
  67. me.hookEventsForLastVotedCandicateID()
  68. me.hookEventsForLastVotedTimestamp()
  69. me.hookEventsForDisposedFlag()
  70. // read only logic
  71. me.hook(feStart,
  72. me.whenStartThenBeginWatchLeaderTimeout)
  73. me.hook(feLeaderHeartbeatTimeout,
  74. me.whenLeaderHeartbeatTimeoutThenSwitchToCandidateState)
  75. }
  76. // hookEventsForTerm maintains field: mTerm
  77. // update : feInit / feLeaderHeartbeat
  78. func (me *tFollowerState) hookEventsForTerm() {
  79. me.hook(feInit, func(e string, args ...interface{}) {
  80. me.mTerm = me.context.store().LastCommittedTerm()
  81. })
  82. me.hook(feLeaderHeartbeat, func(e string, args ...interface{}) {
  83. cmd := args[0].(*rpc.HeartbeatCmd)
  84. me.mTerm = cmd.Term
  85. })
  86. }
  87. // hookEventsForLeaderHeartbeatClock maintains field: mLeaderHeartbeatClock
  88. // update : feLeaderHeartbeat / feLeaderHeartbeatTimeout
  89. func (me *tFollowerState) hookEventsForLeaderHeartbeatTimestamp() {
  90. me.hook(feInit, func(e string, args ...interface{}) {
  91. me.mLeaderHeartbeatTimestamp = time.Now().UnixNano()
  92. })
  93. me.hook(feLeaderHeartbeat, func(e string, args ...interface{}) {
  94. me.mLeaderHeartbeatTimestamp = time.Now().UnixNano()
  95. })
  96. me.hook(feLeaderHeartbeatTimeout, func(e string, args ...interface{}) {
  97. me.mLeaderHeartbeatTimestamp = 0
  98. })
  99. }
  100. // hookEventsForLeaderID maintains field: mLeaderID
  101. // update : feLeaderHeartbeat / feLeaderHeartbeatTimeout
  102. func (me *tFollowerState) hookEventsForLeaderID() {
  103. me.hook(feLeaderHeartbeat, func(e string, args ...interface{}) {
  104. cmd := args[0].(*rpc.HeartbeatCmd)
  105. me.mLeaderID = cmd.LeaderID
  106. })
  107. me.hook(feLeaderHeartbeatTimeout, func(e string, args ...interface{}) {
  108. me.mLeaderID = ""
  109. })
  110. }
  111. // hookEventsForLastVotedTerm maintains field: mLastVotedTerm
  112. // update : feCandidateRequestVote / feVoteToCandidate
  113. func (me *tFollowerState) hookEventsForLastVotedTerm() {
  114. me.hook(feCandidateRequestVote, func(e string, args ...interface{}) {
  115. // before voting, check whether last vote timeout
  116. now := time.Now().UnixNano()
  117. if time.Duration(now - me.mLastVotedTimestamp) * time.Nanosecond >= fnRandomizeDuration(timeout.ElectionTimeout) {
  118. // timeout, reset to empty
  119. me.mLastVotedTerm = 0
  120. me.mLastVotedCandidateID = ""
  121. me.mLastVotedTimestamp = 0
  122. }
  123. })
  124. me.hook(feVoteToCandidate, func(e string, args ...interface{}) {
  125. cmd := args[0].(*rpc.RequestVoteCmd)
  126. me.mLastVotedTerm = cmd.Term
  127. })
  128. }
  129. // hookEventsForLastVotedCandicateID maintains field: mLastVotedCandidateID
  130. // update : feCandidateRequestVote / feVoteToCandidate
  131. func (me *tFollowerState) hookEventsForLastVotedCandicateID() {
  132. me.hook(feCandidateRequestVote, func(e string, args ...interface{}) {
  133. // before voting, check whether last vote timeout
  134. now := time.Now().UnixNano()
  135. if time.Duration(now - me.mLastVotedTimestamp) * time.Nanosecond >= fnRandomizeDuration(timeout.ElectionTimeout) {
  136. // timeout, reset to empty
  137. me.mLastVotedTerm = 0
  138. me.mLastVotedCandidateID = ""
  139. me.mLastVotedTimestamp = 0
  140. }
  141. })
  142. me.hook(feVoteToCandidate, func(e string, args ...interface{}) {
  143. cmd := args[0].(*rpc.RequestVoteCmd)
  144. me.mLastVotedCandidateID = cmd.CandidateID
  145. })
  146. }
  147. // hookEventsForLastVotedTimestamp maintains field: mLastVotedTimestamp
  148. // update : feCandidateRequestVote / feVoteToCandidate
  149. func (me *tFollowerState) hookEventsForLastVotedTimestamp() {
  150. me.hook(feCandidateRequestVote, func(e string, args ...interface{}) {
  151. // before voting, check whether last vote timeout
  152. now := time.Now().UnixNano()
  153. if time.Duration(now - me.mLastVotedTimestamp) * time.Nanosecond >= fnRandomizeDuration(timeout.ElectionTimeout) {
  154. // timeout, reset to empty
  155. me.mLastVotedTerm = 0
  156. me.mLastVotedCandidateID = ""
  157. me.mLastVotedTimestamp = 0
  158. }
  159. })
  160. me.hook(feVoteToCandidate, func(e string, args ...interface{}) {
  161. me.mLastVotedTimestamp = time.Now().UnixNano()
  162. })
  163. }
  164. // hookEventsForDisposedFlag maintains field: mDisposedFlag
  165. // update: feInit / feDisposing
  166. func (me *tFollowerState) hookEventsForDisposedFlag() {
  167. me.hook(feInit, func(e string, args ...interface{}) {
  168. me.mDiseposedFlag = false
  169. })
  170. me.hook(feDisposing, func(e string, args ...interface{}) {
  171. me.mDiseposedFlag = true
  172. })
  173. }
  174. func (me *tFollowerState) Start() {
  175. me.mStartOnce.Do(func() {
  176. me.raise(feStart)
  177. })
  178. }
  179. func (me *tFollowerState) whenStartThenBeginWatchLeaderTimeout(e string, args ...interface{}) {
  180. go func() {
  181. iCheckingTimeoutInterval := fnRandomizeDuration(timeout.HeartbeatTimeout / 3)
  182. for range time.Tick(iCheckingTimeoutInterval) {
  183. if me.mDiseposedFlag {
  184. return
  185. }
  186. now := time.Now().UnixNano()
  187. iHeartbeatTimeoutNanos := fnRandomizeInt64(int64(timeout.HeartbeatTimeout / time.Nanosecond))
  188. if now - me.mLeaderHeartbeatTimestamp >= iHeartbeatTimeoutNanos {
  189. me.raise(feLeaderHeartbeatTimeout)
  190. return
  191. }
  192. }
  193. }()
  194. }
  195. func (me *tFollowerState) whenLeaderHeartbeatTimeoutThenSwitchToCandidateState(_ string, args ...interface{}) {
  196. me.raise(feDisposing)
  197. me.context.handleStateChanged(newCandidateState(me.context, me.mTerm + 1))
  198. }
  199. func (me *tFollowerState) Role() roles.RaftRole {
  200. return roles.Follower
  201. }
  202. // Heartbeat leader to follower
  203. func (me *tFollowerState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
  204. // check term
  205. if cmd.Term < me.mTerm {
  206. // invalid leader
  207. ret.Code = rpc.HBTermMismatch
  208. ret.Term = me.mTerm
  209. return nil
  210. }
  211. // raise LeaderHeartbeat
  212. me.raise(feLeaderHeartbeat, cmd)
  213. // return
  214. ret.Code = rpc.HBOk
  215. return nil
  216. }
  217. // AppendLog leader to follower
  218. func (me *tFollowerState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
  219. ret.Term = me.mTerm
  220. if cmd.Term < me.mTerm {
  221. // invalid leader
  222. ret.Code = rpc.ALTermMismatch
  223. return nil
  224. }
  225. store := me.context.store()
  226. entry := cmd.Entry
  227. // check log: expecting appending action follows previous committing action
  228. if entry.PrevIndex != store.LastCommittedIndex() || entry.PrevTerm != store.LastCommittedTerm() {
  229. // check log
  230. e, log := store.GetLog(entry.Index)
  231. if e != nil {
  232. ret.Code = rpc.ALInternalError
  233. return nil
  234. }
  235. if log == nil || log.PrevIndex != entry.PrevIndex || log.PrevTerm != entry.PrevTerm {
  236. // bad log
  237. ret.Code = rpc.ALIndexMismatch
  238. ret.PrevLogIndex = store.LastCommittedIndex()
  239. ret.PrevLogTerm = store.LastCommittedTerm()
  240. return nil
  241. }
  242. // good log, but old, just ignore it
  243. ret.Code = rpc.ALOk
  244. return nil
  245. }
  246. // good log
  247. e := store.Append(entry)
  248. if e != nil {
  249. ret.Code = rpc.ALInternalError
  250. return nil
  251. } else {
  252. ret.Code = rpc.ALOk
  253. return nil
  254. }
  255. }
  256. // CommitLog leader to follower
  257. func (me *tFollowerState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
  258. store := me.context.store()
  259. if cmd.Index != store.LastAppendedIndex() || cmd.Term != store.LastAppendedTerm() {
  260. // bad index
  261. ret.Code = rpc.CLLogNotFound
  262. return nil
  263. }
  264. e := store.Commit(cmd.Index)
  265. if e != nil {
  266. ret.Code = rpc.CLInternalError
  267. return nil
  268. }
  269. ret.Code = rpc.CLOk
  270. return nil
  271. }
  272. // RequestVote candidate to follower
  273. func (me *tFollowerState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
  274. // before voting
  275. me.raise(feCandidateRequestVote, cmd)
  276. // check term
  277. if cmd.Term <= me.mTerm {
  278. ret.Term = me.mTerm
  279. ret.Code = rpc.RVTermMismatch
  280. return nil
  281. }
  282. // check if already voted another
  283. if me.mLastVotedTerm >= cmd.Term && me.mLastVotedCandidateID != "" && me.mLastVotedCandidateID != cmd.CandidateID {
  284. ret.Code = rpc.RVVotedAnother
  285. return nil
  286. }
  287. // check log index
  288. if cmd.LastLogIndex < me.context.store().LastCommittedIndex() {
  289. ret.Code = rpc.RVLogMismatch
  290. return nil
  291. }
  292. // vote ok
  293. me.raise(feVoteToCandidate, cmd)
  294. ret.Term = cmd.Term
  295. ret.Code = rpc.RVOk
  296. return nil
  297. }

tCandidateState.go

基于事件驱动重构Candidate状态的逻辑编排,各字段实施读写分离管理

  1. package lsm
  2. import (
  3. "learning/gooop/etcd/raft/roles"
  4. "learning/gooop/etcd/raft/rpc"
  5. "sync"
  6. "time"
  7. )
  8. // tCandidateState presents a candidate node
  9. type tCandidateState struct {
  10. tEventDrivenModel
  11. context IRaftLSM
  12. mInitOnce sync.Once
  13. mStartOnce sync.Once
  14. // update: init / ceElectionTimeout
  15. mTerm int64
  16. // update: ceInit / ceElectionTimeout / ceVoteToCandidate
  17. mVotedTerm int64
  18. // update: ceInit / ceElectionTimeout / ceVoteToCandidate
  19. mVotedCandidateID string
  20. // update: ceInit / ceElectionTimeout / ceVoteToCandidate
  21. mVotedTimestamp int64
  22. }
  // Candidate events, used as event names for hook/raise. "trigger" names the
// code that raises the event and "args" what it passes to the handlers.
const (
	// trigger: init()
	// args: empty
	ceInit = "candidate.init"
	// trigger: Start()
	// args: empty
	ceStart = "candidate.Start"
	// trigger: an election-timeout watcher (whenStartThenWatchElectionTimeout;
	// not present in this file yet)
	// args: empty
	ceElectionTimeout = "candidate.ElectionTimeout"
	// trigger: Heartbeat() / AppendLog(), once the sender passes the term check
	// args: empty
	ceLeaderAnnounced = "candidate.LeaderAnnounced"
	// trigger: RequestVote() (intended; the current RequestVote never grants a vote)
	// args: *rpc.RequestVoteCmd
	ceVoteToCandidate = "candidate.VoteToCandidate"
	// trigger: whenLeaderAnnouncedThenSwitchToFollower()
	// args: empty
	ceDisposing = "candidate.Disposing"
)
  41. func newCandidateState(ctx IRaftLSM, term int64) IRaftState {
  42. it := new(tCandidateState)
  43. it.init(ctx, term)
  44. return it
  45. }
  46. func (me *tCandidateState) init(ctx IRaftLSM, term int64) {
  47. me.mInitOnce.Do(func() {
  48. me.context = ctx
  49. me.mTerm = term
  50. me.initEventHandlers()
  51. me.raise(ceInit)
  52. })
  53. }
  54. func (me *tCandidateState) initEventHandlers() {
  55. // write only logic
  56. me.hookEventsForTerm()
  57. me.hookEventsForVotedTerm()
  58. me.hookEventsForVotedCandidateID()
  59. me.hookEventsForVotedTimestamp()
  60. // read only logic
  61. me.hook(ceLeaderAnnounced,
  62. me.whenLeaderAnnouncedThenSwitchToFollower)
  63. me.hook(ceElectionTimeout,
  64. me.whenElectionTimeoutThenRequestVoteAgain)
  65. }
  66. // hookEventsForTerm maintains field: mTerm
  67. // update: ceElectionTimeout
  68. func (me *tCandidateState) hookEventsForTerm() {
  69. me.hook(ceElectionTimeout, func(e string, args ...interface{}) {
  70. // when election timeout, term++ and request vote again
  71. me.mTerm++
  72. })
  73. }
  74. // hookEventsForVotedTerm maintains field: mVotedTerm
  75. // update: ceInit / ceElectionTimeout / ceVoteToCandidate
  76. func (me *tCandidateState) hookEventsForVotedTerm() {
  77. me.hook(ceInit, func(e string, args ...interface{}) {
  78. // initially, vote to itself
  79. me.mVotedTerm = me.mTerm
  80. })
  81. me.hook(ceElectionTimeout, func(e string, args ...interface{}) {
  82. // when timeout, reset to itself
  83. me.mVotedTerm = me.mTerm
  84. })
  85. me.hook(ceVoteToCandidate, func(e string, args ...interface{}) {
  86. // after vote to candidate
  87. cmd := args[0].(*rpc.RequestVoteCmd)
  88. me.mVotedTerm = cmd.Term
  89. })
  90. }
  91. // hookEventsForVotedCandidateID maintains field: mVotedCandidateID
  92. // update: ceInit / ceElectionTimeout / ceVoteToCandidate
  93. func (me *tCandidateState) hookEventsForVotedCandidateID() {
  94. me.hook(ceInit, func(e string, args ...interface{}) {
  95. // initially, vote to itself
  96. me.mVotedCandidateID = me.context.config().ID()
  97. })
  98. me.hook(ceElectionTimeout, func(e string, args ...interface{}) {
  99. // when timeout, reset to itself
  100. me.mVotedCandidateID = me.context.config().ID()
  101. })
  102. me.hook(ceVoteToCandidate, func(e string, args ...interface{}) {
  103. // after vote to candidate
  104. cmd := args[0].(*rpc.RequestVoteCmd)
  105. me.mVotedCandidateID = cmd.CandidateID
  106. })
  107. }
  108. func (me *tCandidateState) hookEventsForVotedTimestamp() {
  109. me.hook(ceInit, func(e string, args ...interface{}) {
  110. // initially, vote to itself
  111. me.mVotedTimestamp = time.Now().UnixNano()
  112. })
  113. me.hook(ceElectionTimeout, func(e string, args ...interface{}) {
  114. // when timeout, reset to itself
  115. me.mVotedTimestamp = time.Now().UnixNano()
  116. })
  117. me.hook(ceVoteToCandidate, func(e string, args ...interface{}) {
  118. // after vote to candidate
  119. me.mVotedTimestamp = time.Now().UnixNano()
  120. })
  121. }
  122. func (me *tCandidateState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
  123. // check term
  124. if cmd.Term <= me.mTerm {
  125. // bad leader
  126. ret.Code = rpc.HBTermMismatch
  127. return nil
  128. }
  129. // new leader
  130. me.raise(ceLeaderAnnounced)
  131. // return ok
  132. ret.Code = rpc.HBOk
  133. return nil
  134. }
  135. func (me *tCandidateState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
  136. // check term
  137. if cmd.Term <= me.mTerm {
  138. // bad leader
  139. ret.Code = rpc.ALTermMismatch
  140. return nil
  141. }
  142. // new leader
  143. me.raise(ceLeaderAnnounced)
  144. // ignore and return
  145. ret.Code = rpc.ALInternalError
  146. return nil
  147. }
  148. func (me *tCandidateState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
  149. // ignore and return
  150. ret.Code = rpc.CLInternalError
  151. return nil
  152. }
  153. func (me *tCandidateState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
  154. // todo: fixme
  155. panic("implements me")
  156. }
  157. func (me *tCandidateState) Role() roles.RaftRole {
  158. return roles.Candidate
  159. }
  160. func (me *tCandidateState) Start() {
  161. me.mStartOnce.Do(func() {
  162. me.raise(feStart)
  163. })
  164. }
  165. func (me *tCandidateState) whenLeaderAnnouncedThenSwitchToFollower(_ string, _ ...interface{}) {
  166. me.raise(ceDisposing)
  167. me.context.handleStateChanged(newFollowerState(me.context))
  168. }
  169. func (me *tCandidateState) whenElectionTimeoutThenRequestVoteAgain(_ string, _ ...interface{}) {
  170. // todo: fixme
  171. panic("implements me")
  172. }

(未完待续)