缘起

最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军 , 2019.1)
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop

raft分布式一致性算法

  1. 分布式存储系统通常会通过维护多个副本来进行容错,
  2. 以提高系统的可用性。
  3. 这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?
  4. Raft算法把问题分解成了四个子问题:
  5. 1. 领袖选举(leader election)、
  6. 2. 日志复制(log replication)、
  7. 3. 安全性(safety)、
  8. 4. 成员关系变化(membership changes)
  9. 这几个子问题。

目标

  • 根据raft协议,实现高可用分布式强一致的kv存储

子目标(Day 9)

  • Leader状态下的逻辑处理

设计

  • tLeaderState: 实现Leader状态的raft状态机处理。事件驱动的逻辑编排,读写分离的字段管理。

tLeaderState.go

(未完成)实现Leader状态的raft状态机处理。事件驱动的逻辑编排,读写分离的字段管理。

  1. package lsm
  2. import (
  3. "learning/gooop/etcd/raft/roles"
  4. "learning/gooop/etcd/raft/rpc"
  5. "learning/gooop/etcd/raft/timeout"
  6. "sync"
  7. "time"
  8. )
  9. // tLeaderState presents a leader node
  10. type tLeaderState struct {
  11. tEventDrivenModel
  12. context iRaftStateContext
  13. mInitOnce sync.Once
  14. mStartOnce sync.Once
  15. // update: leInit / leLeaderHeartbeat
  16. mTerm int64
  17. // update: leInit / leDisposing
  18. mDisposedFlag bool
  19. // update: leVoteToCandidate
  20. mVotedTerm int64
  21. mVotedCandidateID string
  22. mVotedTimestamp int64
  23. }
  // Event names raised and hooked by tLeaderState.
  const (
  	// leInit is raised by init().
  	// args: empty
  	leInit = "leader.init"

  	// leStart is raised by Start().
  	// args: empty
  	leStart = "leader.Start"

  	// leDisposing is raised by whenNewLeaderAnnouncedThenSwitchToFollower
  	// right before the node is handed over to a follower state.
  	// args: empty
  	leDisposing = "leader.Disposing"

  	// leDiposing is the original, misspelled name of leDisposing. It is kept
  	// as an alias so that existing references keep compiling.
  	//
  	// Deprecated: use leDisposing.
  	leDiposing = leDisposing

  	// leNewLeaderAnnounced is raised by Heartbeat() / AppendLog() when the
  	// incoming command carries a term greater than the leader's own term.
  	// args: term int64
  	leNewLeaderAnnounced = "leader.NewLeaderAnnounced"

  	// leBeforeRequestVote is raised at the start of RequestVote().
  	// args: *rpc.RequestVoteCmd
  	leBeforeRequestVote = "leader.BeforeRequestVote"

  	// leVoteToCandidate is raised by RequestVote() when a vote is granted
  	// to a candidate of a newer term.
  	// args: *rpc.RequestVoteCmd
  	leVoteToCandidate = "leader.VoteToCandidate"
  )
  42. func newLeaderState(ctx iRaftStateContext, term int64) IRaftState {
  43. it := new(tLeaderState)
  44. it.init(ctx, term)
  45. return it
  46. }
  47. func (me *tLeaderState) init(ctx iRaftStateContext, term int64) {
  48. me.mInitOnce.Do(func() {
  49. me.context = ctx
  50. me.mTerm = term
  51. me.initEventHandlers()
  52. me.raise(leInit)
  53. })
  54. }
  55. func (me *tLeaderState) initEventHandlers() {
  56. // write only logic
  57. me.hookEventsForDisposedFlag()
  58. me.hookEventsForVotedTerm()
  59. // read only logic
  60. me.hook(leStart,
  61. me.whenStartThenBeginHeartbeatToOthers)
  62. me.hook(leNewLeaderAnnounced,
  63. me.whenNewLeaderAnnouncedThenSwitchToFollower)
  64. }
  65. func (me *tLeaderState) hookEventsForDisposedFlag() {
  66. me.hook(leInit, func(e string, args ...interface{}) {
  67. me.mDisposedFlag = false
  68. })
  69. me.hook(leDiposing, func(e string, args ...interface{}) {
  70. me.mDisposedFlag = true
  71. })
  72. }
  73. func (me *tLeaderState) hookEventsForVotedTerm() {
  74. me.hook(leBeforeRequestVote, func(e string, args ...interface{}) {
  75. // check last vote timeout
  76. if me.mVotedTerm == 0 {
  77. return
  78. }
  79. if time.Duration(time.Now().UnixNano() - me.mVotedTimestamp)*time.Nanosecond >= timeout.ElectionTimeout {
  80. me.mVotedTerm = 0
  81. me.mVotedTimestamp = 0
  82. me.mVotedCandidateID = ""
  83. }
  84. })
  85. me.hook(leVoteToCandidate, func(e string, args ...interface{}) {
  86. // after vote to candidate
  87. cmd := args[0].(*rpc.RequestVoteCmd)
  88. me.mVotedTerm = cmd.Term
  89. me.mVotedCandidateID = cmd.CandidateID
  90. me.mVotedTimestamp = time.Now().UnixNano()
  91. })
  92. }
  93. func (me *tLeaderState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
  94. // check term
  95. if cmd.Term <= me.mTerm {
  96. ret.Code = rpc.HBTermMismatch
  97. return nil
  98. }
  99. // new leader
  100. me.raise(leNewLeaderAnnounced, cmd.Term)
  101. // return ok
  102. ret.Code = rpc.HBOk
  103. return nil
  104. }
  105. func (me *tLeaderState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
  106. // check term
  107. if cmd.Term <= me.mTerm {
  108. ret.Code = rpc.ALTermMismatch
  109. return nil
  110. }
  111. // new leader
  112. me.raise(leNewLeaderAnnounced, cmd.Term)
  113. // return ok
  114. ret.Code = rpc.ALInternalError
  115. return nil
  116. }
  117. func (me *tLeaderState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
  118. // just ignore
  119. ret.Code = rpc.CLInternalError
  120. return nil
  121. }
  122. func (me *tLeaderState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
  123. me.raise(leBeforeRequestVote, cmd)
  124. // check voted term
  125. if cmd.Term < me.mVotedTerm {
  126. ret.Code = rpc.RVTermMismatch
  127. return nil
  128. }
  129. if cmd.Term == me.mVotedTerm {
  130. if me.mVotedCandidateID != "" && me.mVotedCandidateID != cmd.CandidateID {
  131. // already vote another
  132. ret.Code = rpc.RVVotedAnother
  133. return nil
  134. } else {
  135. // already voted
  136. ret.Code = rpc.RVOk
  137. return nil
  138. }
  139. }
  140. if cmd.Term > me.mVotedTerm {
  141. // new term, check log
  142. if cmd.LastLogIndex >= me.context.Store().LastCommittedIndex() {
  143. // good log
  144. me.raise(leVoteToCandidate, cmd)
  145. ret.Code = rpc.RVOk
  146. } else {
  147. // bad log
  148. ret.Code = rpc.RVLogMismatch
  149. }
  150. return nil
  151. }
  152. // should not reach here
  153. ret.Code = rpc.RVTermMismatch
  154. return nil
  155. }
  156. func (me *tLeaderState) Role() roles.RaftRole {
  157. return roles.Leader
  158. }
  159. func (me *tLeaderState) Start() {
  160. me.mStartOnce.Do(func() {
  161. me.raise(leStart)
  162. })
  163. }
  164. func (me *tLeaderState) whenStartThenBeginHeartbeatToOthers(_ string, _ ...interface{}) {
  165. // todo: fixme
  166. panic("implements me")
  167. }
  168. func (me *tLeaderState) whenNewLeaderAnnouncedThenSwitchToFollower(_ string, args ...interface{}) {
  169. me.raise(leDiposing)
  170. term := args[0].(int64)
  171. me.context.HandleStateChanged(newFollowerState(me.context, term))
  172. }

(未完待续)