Background

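I have recently been reading [云原生分布式存储基石: etcd深入解析 (Cloud-Native Distributed Storage Cornerstone: In-Depth Analysis of etcd)] (Du Jun, January 2019).
This series of notes practices its ideas in golang.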
gitee: https://gitee.com/ioly/learning.gooop

The Raft distributed consensus algorithm

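  Distributed storage systems usually tolerate faults by maintaining multiple replicas, which improves the availability of the system. This raises the core problem of distributed storage: how do you keep multiple replicas consistent? Raft breaks this problem down into four subproblems:
  1. leader election
  2. log replication
  3. safety
  4. membership changes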
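Leader election ultimately comes down to a small set of legal role changes between Follower, Candidate and Leader. The sketch below encodes them as a lookup table. The `Role` type, its constants and `CanTransition` are hypothetical names for illustration only; they are not the series' `roles` package.

```go
package main

import "fmt"

// Role is a hypothetical enum for the three Raft roles.
type Role int

const (
	Follower Role = iota
	Candidate
	Leader
)

// allowed lists the role changes that leader election permits.
var allowed = map[Role][]Role{
	// election timeout elapsed without hearing from a leader
	Follower: {Candidate},
	// timed out again / won a majority / saw a legitimate leader or a higher term
	Candidate: {Candidate, Leader, Follower},
	// saw a higher term
	Leader: {Follower},
}

// CanTransition reports whether moving from -> to is a legal role change.
func CanTransition(from, to Role) bool {
	for _, r := range allowed[from] {
		if r == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(CanTransition(Follower, Candidate)) // true
	fmt.Println(CanTransition(Follower, Leader))    // false
	fmt.Println(CanTransition(Candidate, Leader))   // true
}
```

A follower can never jump straight to leader: it must first become a candidate and win a vote.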

Goal

  • Implement a highly available, distributed, strongly consistent KV store following the Raft protocol

Sub-goals (Day 8)

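  • Simplify the implementation of tRaftClientService, the rpc connection manager
  • Move the internal support methods of IRaftLSM out into a separate iRaftStateContext interface
  • Complete the processing logic of the Candidate state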

Design

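  • IRaftLSM: the raft finite state machine interface
  • iRaftStateContext: provides context support for the states in the state pattern
  • tCandidateState: implementation of the Candidate state. Its logic is orchestrated through events, and its fields are managed with read/write separation (field updates live in dedicated hookEventsFor* handlers, while the when*Then* handlers only read fields).
  • tRaftClient: manages the rpc connection to one specific raft node
  • tRaftClientService: manages the rpc connections from the current node to the other raft nodes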

IRaftLSM.go

The raft finite state machine interface.

```go
package lsm

import (
	"learning/gooop/etcd/raft/rpc"
)

// IRaftLSM is the raft finite state machine
type IRaftLSM interface {
	rpc.IRaftRPC
	iRaftStateContext

	State() IRaftState
}
```

iRaftStateContext.go

Provides context support for the states in the state pattern.

```go
package lsm

import (
	"learning/gooop/etcd/raft/config"
	"learning/gooop/etcd/raft/rpc/client"
	"learning/gooop/etcd/raft/store"
)

// iRaftStateContext is what a state object sees of the state machine:
// configuration, log store, rpc connections to peers, and a way to switch states.
type iRaftStateContext interface {
	Config() config.IRaftConfig
	Store() store.ILogStore
	HandleStateChanged(state IRaftState)
	RaftClientService() client.IRaftClientService
}
```
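The point of iRaftStateContext is that a state only sees a narrow context and asks it to swap in the next state through HandleStateChanged. A stripped-down, self-contained sketch of that state-pattern wiring follows; `State`, `StateContext`, `machine`, `follower` and `candidate` are hypothetical stand-ins, not the series' real types.

```go
package main

import "fmt"

// State is a hypothetical, trimmed-down stand-in for IRaftState.
type State interface {
	Role() string
	Start()
}

// StateContext plays the role of iRaftStateContext: states only see
// the narrow interface they need, not the whole state machine.
type StateContext interface {
	NodeID() string
	HandleStateChanged(next State)
}

// machine is a hypothetical stand-in for the object implementing IRaftLSM.
type machine struct {
	id    string
	state State
}

func (m *machine) NodeID() string { return m.id }

// HandleStateChanged swaps the current state and starts the new one.
func (m *machine) HandleStateChanged(next State) {
	m.state = next
	next.Start()
}

type follower struct{ ctx StateContext }

func (f *follower) Role() string { return "follower" }
func (f *follower) Start()       {}

type candidate struct{ ctx StateContext }

func (c *candidate) Role() string { return "candidate" }

// Start pretends a legitimate leader was found at once and hands
// control back to a follower through the context.
func (c *candidate) Start() {
	c.ctx.HandleStateChanged(&follower{ctx: c.ctx})
}

func main() {
	m := &machine{id: "n1"}
	m.HandleStateChanged(&candidate{ctx: m})
	fmt.Println(m.state.Role()) // follower
}
```

Because states depend only on the context interface, a candidate never needs to know how the machine stores its current state.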

tCandidateState.go

Implementation of the Candidate state. Its logic is orchestrated through events, and its fields are managed with read/write separation.
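The post does not show tEventDrivenModel, which supplies the hook/raise pair used throughout tCandidateState. A minimal sketch of what such a model might look like follows, assuming that handlers for one event run synchronously in registration order (initEventHandlers registers the field-writing hooks before the field-reading ones, which suggests this). `EventDrivenModel` and `EventHandler` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// EventHandler matches the handler signature used by the post's hook calls.
type EventHandler func(e string, args ...interface{})

// EventDrivenModel is a hypothetical minimal take on tEventDrivenModel.
type EventDrivenModel struct {
	mu       sync.RWMutex
	handlers map[string][]EventHandler
}

// hook registers a handler; handlers for one event run in registration order.
func (m *EventDrivenModel) hook(e string, h EventHandler) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.handlers == nil {
		m.handlers = make(map[string][]EventHandler)
	}
	m.handlers[e] = append(m.handlers[e], h)
}

// raise calls every handler hooked to e, synchronously. The handler list is
// copied first so a handler may itself call raise without deadlocking.
func (m *EventDrivenModel) raise(e string, args ...interface{}) {
	m.mu.RLock()
	hs := append([]EventHandler(nil), m.handlers[e]...)
	m.mu.RUnlock()
	for _, h := range hs {
		h(e, args...)
	}
}

func main() {
	var m EventDrivenModel
	term := int64(0)
	// "write" hook: the only place that mutates term
	m.hook("asking", func(string, ...interface{}) { term++ })
	// "read" hook: registered later, so it sees the updated value
	m.hook("asking", func(string, ...interface{}) { fmt.Println("term =", term) })
	m.raise("asking") // term = 1
}
```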

```go
package lsm

import (
	"learning/gooop/etcd/raft/roles"
	"learning/gooop/etcd/raft/rpc"
	"learning/gooop/etcd/raft/timeout"
	"sync"
	"time"
)

// tCandidateState represents a candidate node
type tCandidateState struct {
	tEventDrivenModel

	context    iRaftStateContext
	mInitOnce  sync.Once
	mStartOnce sync.Once

	// update: init() / ceAskingForVote
	mTerm int64
	// update: ceInit / ceAskingForVote / ceVoteToCandidate
	mVotedTerm int64
	// update: ceInit / ceAskingForVote / ceVoteToCandidate
	mVotedCandidateID string
	// update: ceInit / ceAskingForVote / ceVoteToCandidate
	mVotedTimestamp int64
	// update: ceInit / ceAskingForVote / ceReceiveTicket / ceDisposing
	mTicketCount map[string]bool
	mTicketMutex *sync.Mutex
	// update: ceInit / ceDisposing
	mDisposedFlag bool
}

// trigger: init()
// args: empty
const ceInit = "candidate.init"

// trigger: Start()
// args: empty
const ceStart = "candidate.Start"

// trigger: whenAskingForVoteThenWatchElectionTimeout()
// args: empty
const ceElectionTimeout = "candidate.ElectionTimeout"

// trigger: Heartbeat() / AppendLog()
// args: empty
const ceLeaderAnnounced = "candidate.LeaderAnnounced"

// trigger: RequestVote()
// args: *rpc.RequestVoteCmd
const ceVoteToCandidate = "candidate.VoteToCandidate"

// trigger: whenLeaderAnnouncedThenSwitchToFollower() / whenWinningTheVoteThenSwitchToLeader()
// args: empty
const ceDisposing = "candidate.Disposing"

// trigger: beginAskForVote()
// args: empty
const ceAskingForVote = "candidate.AskingForVote"

// trigger: handleRequestVoteOK()
// args: peerID string
const ceReceiveTicket = "candidate.ReceiveTicket"

// trigger: whenReceiveTicketThenCheckTicketCount()
// args: empty
const ceWinningTheVote = "candidate.WinningTheVote"

func newCandidateState(ctx iRaftStateContext, term int64) IRaftState {
	it := new(tCandidateState)
	it.init(ctx, term)
	return it
}

func (me *tCandidateState) init(ctx iRaftStateContext, term int64) {
	me.mInitOnce.Do(func() {
		me.context = ctx
		me.mTerm = term
		me.initEventHandlers()
		me.raise(ceInit)
	})
}

func (me *tCandidateState) initEventHandlers() {
	// write-only logic
	me.hookEventsForTerm()
	me.hookEventsForVotedTerm()
	me.hookEventsForVotedCandidateID()
	me.hookEventsForVotedTimestamp()
	me.hookEventsForTicketCount()
	me.hookEventsForDisposedFlag()

	// read-only logic
	me.hook(ceStart, me.whenStartThenAskForVote)
	me.hook(ceAskingForVote, me.whenAskingForVoteThenWatchElectionTimeout)
	me.hook(ceReceiveTicket, me.whenReceiveTicketThenCheckTicketCount)
	me.hook(ceElectionTimeout, me.whenElectionTimeoutThenAskForVoteAgain)
	me.hook(ceWinningTheVote, me.whenWinningTheVoteThenSwitchToLeader)
	me.hook(ceLeaderAnnounced, me.whenLeaderAnnouncedThenSwitchToFollower)
}

// hookEventsForTerm maintains field: mTerm
// update: ceAskingForVote (every election round starts a new term)
func (me *tCandidateState) hookEventsForTerm() {
	me.hook(ceAskingForVote, func(e string, args ...interface{}) {
		me.mTerm++
	})
}

// hookEventsForVotedTerm maintains field: mVotedTerm
// update: ceInit / ceAskingForVote / ceVoteToCandidate
func (me *tCandidateState) hookEventsForVotedTerm() {
	me.hook(ceInit, func(e string, args ...interface{}) {
		// initially, vote for itself
		me.mVotedTerm = me.mTerm
	})
	me.hook(ceAskingForVote, func(e string, args ...interface{}) {
		// each new round, vote for itself again
		me.mVotedTerm = me.mTerm
	})
	me.hook(ceVoteToCandidate, func(e string, args ...interface{}) {
		// after voting for another candidate
		cmd := args[0].(*rpc.RequestVoteCmd)
		me.mVotedTerm = cmd.Term
	})
}

// hookEventsForVotedCandidateID maintains field: mVotedCandidateID
// update: ceInit / ceAskingForVote / ceVoteToCandidate
func (me *tCandidateState) hookEventsForVotedCandidateID() {
	me.hook(ceInit, func(e string, args ...interface{}) {
		// initially, vote for itself
		me.mVotedCandidateID = me.context.Config().ID()
	})
	me.hook(ceAskingForVote, func(e string, args ...interface{}) {
		me.mVotedCandidateID = me.context.Config().ID()
	})
	me.hook(ceVoteToCandidate, func(e string, args ...interface{}) {
		// after voting for another candidate
		cmd := args[0].(*rpc.RequestVoteCmd)
		me.mVotedCandidateID = cmd.CandidateID
	})
}

// hookEventsForVotedTimestamp maintains field: mVotedTimestamp
// update: ceInit / ceAskingForVote / ceVoteToCandidate
func (me *tCandidateState) hookEventsForVotedTimestamp() {
	me.hook(ceInit, func(e string, args ...interface{}) {
		// initially, vote for itself
		me.mVotedTimestamp = time.Now().UnixNano()
	})
	me.hook(ceAskingForVote, func(e string, args ...interface{}) {
		me.mVotedTimestamp = time.Now().UnixNano()
	})
	me.hook(ceVoteToCandidate, func(e string, args ...interface{}) {
		// after voting for another candidate
		me.mVotedTimestamp = time.Now().UnixNano()
	})
}

// hookEventsForTicketCount maintains field: mTicketCount (guarded by mTicketMutex)
// update: ceInit / ceAskingForVote / ceReceiveTicket / ceDisposing
func (me *tCandidateState) hookEventsForTicketCount() {
	me.hook(ceInit, func(e string, args ...interface{}) {
		me.mTicketMutex = new(sync.Mutex)
		me.mTicketCount = make(map[string]bool)
		me.mTicketCount[me.context.Config().ID()] = true
	})
	me.hook(ceAskingForVote, func(e string, args ...interface{}) {
		me.mTicketMutex.Lock()
		defer me.mTicketMutex.Unlock()
		me.mTicketCount = make(map[string]bool)
		me.mTicketCount[me.context.Config().ID()] = true
	})
	me.hook(ceReceiveTicket, func(e string, args ...interface{}) {
		peerID := args[0].(string)
		me.mTicketMutex.Lock()
		defer me.mTicketMutex.Unlock()
		me.mTicketCount[peerID] = true
	})
	me.hook(ceDisposing, func(e string, args ...interface{}) {
		me.mTicketMutex.Lock()
		defer me.mTicketMutex.Unlock()
		me.mTicketCount = make(map[string]bool)
	})
}

// hookEventsForDisposedFlag maintains field: mDisposedFlag
// update: ceInit / ceDisposing
func (me *tCandidateState) hookEventsForDisposedFlag() {
	me.hook(ceInit, func(e string, args ...interface{}) {
		me.mDisposedFlag = false
	})
	me.hook(ceDisposing, func(e string, args ...interface{}) {
		me.mDisposedFlag = true
	})
}

func (me *tCandidateState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
	// a leader from an older term is stale
	if cmd.Term < me.mTerm {
		ret.Code = rpc.HBTermMismatch
		return nil
	}

	// a leader of the same or a newer term is legitimate: step down
	me.raise(ceLeaderAnnounced)
	ret.Code = rpc.HBOk
	return nil
}

func (me *tCandidateState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
	// a leader from an older term is stale
	if cmd.Term < me.mTerm {
		ret.Code = rpc.ALTermMismatch
		return nil
	}

	// a legitimate leader: step down, ignore the entries and return
	me.raise(ceLeaderAnnounced)
	ret.Code = rpc.ALInternalError
	return nil
}

func (me *tCandidateState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
	// a candidate does not commit: ignore and return
	ret.Code = rpc.CLInternalError
	return nil
}

func (me *tCandidateState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
	switch {
	case cmd.Term < me.mVotedTerm:
		ret.Code = rpc.RVTermMismatch

	case cmd.Term == me.mVotedTerm:
		if me.mVotedCandidateID != "" && me.mVotedCandidateID != cmd.CandidateID {
			// already voted for another candidate in this term
			ret.Code = rpc.RVVotedAnother
		} else {
			// already voted for this candidate
			ret.Code = rpc.RVOk
		}

	default:
		// new term: grant only if the candidate's log is at least as up to date,
		// comparing the last term first and the index second (Raft §5.4.1)
		store := me.context.Store()
		lastTerm := store.LastCommittedTerm()
		if cmd.LastLogTerm > lastTerm ||
			(cmd.LastLogTerm == lastTerm && cmd.LastLogIndex >= store.LastCommittedIndex()) {
			me.raise(ceVoteToCandidate, cmd)
			ret.Code = rpc.RVOk
		} else {
			ret.Code = rpc.RVLogMismatch
		}
	}
	return nil
}

func (me *tCandidateState) Role() roles.RaftRole {
	return roles.Candidate
}

func (me *tCandidateState) Start() {
	me.mStartOnce.Do(func() {
		me.raise(ceStart)
	})
}

func (me *tCandidateState) whenLeaderAnnouncedThenSwitchToFollower(_ string, _ ...interface{}) {
	me.raise(ceDisposing)
	me.context.HandleStateChanged(newFollowerState(me.context))
}

func (me *tCandidateState) whenElectionTimeoutThenAskForVoteAgain(_ string, _ ...interface{}) {
	me.beginAskForVote()
}

func (me *tCandidateState) whenStartThenAskForVote(_ string, _ ...interface{}) {
	me.beginAskForVote()
}

func (me *tCandidateState) beginAskForVote() {
	// new round: bumps mTerm, re-votes for itself, resets tickets, arms the election timer
	me.raise(ceAskingForVote)

	// for each peer, call node.RequestVote
	cmd := new(rpc.RequestVoteCmd)
	cmd.CandidateID = me.context.Config().ID()
	cmd.Term = me.mTerm
	store := me.context.Store()
	cmd.LastLogIndex = store.LastCommittedIndex()
	cmd.LastLogTerm = store.LastCommittedTerm()

	term := me.mTerm
	for _, node := range me.context.Config().Nodes() {
		if node.ID() == me.context.Config().ID() {
			continue
		}

		peerID := node.ID()
		go func() {
			ret := new(rpc.RequestVoteRet)
			err := me.context.RaftClientService().Using(peerID, func(client rpc.IRaftRPC) error {
				return client.RequestVote(cmd, ret)
			})
			if err == nil && ret.Code == rpc.RVOk {
				me.handleRequestVoteOK(peerID, term)
			}
		}()
	}
}

func (me *tCandidateState) whenAskingForVoteThenWatchElectionTimeout(_ string, _ ...interface{}) {
	term := me.mTerm
	go func() {
		time.Sleep(timeout.RandElectionTimeout())
		// stale timer: the state was disposed or a newer round has started
		if me.mDisposedFlag || me.mTerm != term {
			return
		}
		if me.getTicketCount() < len(me.context.Config().Nodes())/2+1 {
			me.raise(ceElectionTimeout)
		}
	}()
}

func (me *tCandidateState) handleRequestVoteOK(peerID string, term int64) {
	// ignore late replies from a disposed state or an earlier round
	if me.mDisposedFlag || me.mTerm != term {
		return
	}
	me.raise(ceReceiveTicket, peerID)
}

func (me *tCandidateState) whenReceiveTicketThenCheckTicketCount(_ string, _ ...interface{}) {
	if me.getTicketCount() >= len(me.context.Config().Nodes())/2+1 {
		// won the vote
		me.raise(ceWinningTheVote)
	}
}

func (me *tCandidateState) getTicketCount() int {
	me.mTicketMutex.Lock()
	defer me.mTicketMutex.Unlock()
	return len(me.mTicketCount)
}

func (me *tCandidateState) whenWinningTheVoteThenSwitchToLeader(_ string, _ ...interface{}) {
	me.raise(ceDisposing)
	me.context.HandleStateChanged(newLeaderState(me.context, me.mTerm))
}
```
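The ticket logic above counts votes as a set of peer IDs (so a repeated RVOk from the same peer counts once) and wins at `len(nodes)/2 + 1`. Since vote replies arrive on separate goroutines, two replies could both observe a majority if raise is not serialized. A hypothetical sketch that makes the "just reached a majority" decision under the same lock that records the vote follows; `tally`, `newTally` and `quorum` are illustrative names, not part of the series.

```go
package main

import (
	"fmt"
	"sync"
)

// tally is a hypothetical vote counter in the spirit of mTicketCount:
// a set of voter IDs, so repeated grants from one peer count once.
type tally struct {
	mu     sync.Mutex
	voters map[string]bool
}

// newTally starts an election round in which the candidate votes for itself.
func newTally(selfID string) *tally {
	return &tally{voters: map[string]bool{selfID: true}}
}

// quorum is the smallest strict majority: 2 of 3, 3 of 4, 3 of 5.
func quorum(clusterSize int) int {
	return clusterSize/2 + 1
}

// add records a granted vote and returns true only for the call that
// first reaches a majority, so the caller switches to leader exactly once.
func (t *tally) add(peerID string, clusterSize int) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	q := quorum(clusterSize)
	before := len(t.voters) >= q
	t.voters[peerID] = true
	return !before && len(t.voters) >= q
}

func main() {
	t := newTally("n1")
	fmt.Println(t.add("n2", 5)) // false: 2 of 5
	fmt.Println(t.add("n2", 5)) // false: duplicate grant ignored
	fmt.Println(t.add("n3", 5)) // true: 3 of 5 reached
	fmt.Println(t.add("n4", 5)) // false: already won
}
```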

tRaftClient.go

Manages the rpc connection to one specific raft node.

```go
package client

import (
	"learning/gooop/etcd/raft/config"
	rrpc "learning/gooop/etcd/raft/rpc"
	"net/rpc"
)

type tRaftClient struct {
	cfg   config.IRaftNodeConfig
	conn  *rpc.Client
	state iClientState
}

func newRaftClient(cfg config.IRaftNodeConfig, conn *rpc.Client) IRaftClient {
	it := new(tRaftClient)
	it.init(cfg, conn)
	return it
}

func (me *tRaftClient) init(cfg config.IRaftNodeConfig, conn *rpc.Client) {
	me.cfg = cfg
	me.conn = conn
	if conn == nil {
		me.state = newBrokenState(me)
	} else {
		me.state = newConnectedState(me)
	}
	me.state.Start()
}

func (me *tRaftClient) Config() config.IRaftNodeConfig {
	return me.cfg
}

func (me *tRaftClient) GetConn() *rpc.Client {
	return me.conn
}

func (me *tRaftClient) SetConn(conn *rpc.Client) {
	me.conn = conn
}

func (me *tRaftClient) HandleStateChanged(state iClientState) {
	me.state = state
	state.Start()
}

func (me *tRaftClient) Heartbeat(cmd *rrpc.HeartbeatCmd, ret *rrpc.HeartbeatRet) error {
	return me.state.Heartbeat(cmd, ret)
}

func (me *tRaftClient) AppendLog(cmd *rrpc.AppendLogCmd, ret *rrpc.AppendLogRet) error {
	return me.state.AppendLog(cmd, ret)
}

func (me *tRaftClient) CommitLog(cmd *rrpc.CommitLogCmd, ret *rrpc.CommitLogRet) error {
	return me.state.CommitLog(cmd, ret)
}

func (me *tRaftClient) RequestVote(cmd *rrpc.RequestVoteCmd, ret *rrpc.RequestVoteRet) error {
	return me.state.RequestVote(cmd, ret)
}

func (me *tRaftClient) Ping(cmd *PingCmd, ret *PingRet) error {
	return me.state.Ping(cmd, ret)
}
```

tRaftClientService.go

Manages the rpc connections from the current node to the other raft nodes.

```go
package client

import (
	"errors"
	"learning/gooop/etcd/raft/config"
	"learning/gooop/etcd/raft/rpc"
	netrpc "net/rpc"
)

type tRaftClientService struct {
	cfg     config.IRaftConfig
	clients map[string]IRaftClient
}

func NewRaftClientService(cfg config.IRaftConfig) IRaftClientService {
	it := new(tRaftClientService)
	it.init(cfg)
	return it
}

func (me *tRaftClientService) init(cfg config.IRaftConfig) {
	me.cfg = cfg
	me.clients = make(map[string]IRaftClient)
	for _, nc := range me.cfg.Nodes() {
		me.clients[nc.ID()] = me.createRaftClient(nc)
	}
}

// createRaftClient dials the peer; if the dial fails, the client starts in the broken state
func (me *tRaftClientService) createRaftClient(nodeCfg config.IRaftNodeConfig) IRaftClient {
	conn, err := netrpc.Dial("tcp", nodeCfg.Endpoint())
	if err != nil {
		return newRaftClient(nodeCfg, nil)
	}
	return newRaftClient(nodeCfg, conn)
}

// Using runs action against the client for peerID, or returns gErrorUnknownRaftPeer
func (me *tRaftClientService) Using(peerID string, action func(client rpc.IRaftRPC) error) error {
	it, ok := me.clients[peerID]
	if !ok {
		return gErrorUnknownRaftPeer
	}
	return action(it)
}

var gErrorUnknownRaftPeer = errors.New("unknown raft peer")
```
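tRaftClientService fills its map once in init and only reads it afterwards, so concurrent Using calls from the vote goroutines are safe. Supporting membership changes (the fourth Raft subproblem) would mean mutating that map at runtime. A hypothetical sketch of that variant guarded by `sync.RWMutex` follows; `registry`, `Peer`, `peer`, `Put` and `Remove` are illustrative names, not the series' API.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errUnknownPeer = errors.New("unknown raft peer")

// Peer stands in for rpc.IRaftRPC; only an ID is needed for this sketch.
type Peer interface{ ID() string }

type peer string

func (p peer) ID() string { return string(p) }

// registry is a hypothetical variant of tRaftClientService whose peer set
// can change at runtime, so the map is guarded by a RWMutex.
type registry struct {
	mu    sync.RWMutex
	peers map[string]Peer
}

func newRegistry() *registry { return &registry{peers: make(map[string]Peer)} }

// Put adds or replaces a peer, e.g. after a membership change.
func (r *registry) Put(p Peer) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.peers[p.ID()] = p
}

// Remove drops a peer from the cluster view.
func (r *registry) Remove(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.peers, id)
}

// Using looks the peer up under a read lock, then runs action outside
// the lock so a slow RPC does not block Put/Remove.
func (r *registry) Using(id string, action func(Peer) error) error {
	r.mu.RLock()
	p, ok := r.peers[id]
	r.mu.RUnlock()
	if !ok {
		return errUnknownPeer
	}
	return action(p)
}

func main() {
	r := newRegistry()
	r.Put(peer("n2"))
	err := r.Using("n2", func(p Peer) error {
		fmt.Println("calling", p.ID())
		return nil
	})
	fmt.Println(err)                                            // <nil>
	fmt.Println(r.Using("n9", func(Peer) error { return nil })) // unknown raft peer
}
```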

(To be continued)