缘起

最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军 , 2019.1)
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop

raft分布式一致性算法

  1. 分布式存储系统通常会通过维护多个副本来进行容错,
  2. 以提高系统的可用性。
  3. 这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?
  4. Raft算法把问题分解成了领袖选举(leader election)、
  5. 日志复制(log replication)、安全性(safety)
  6. 和成员关系变化(membership changes)这几个子问题。
  7. Raft算法的基本操作只需2种RPC即可完成。
  8. RequestVote RPC是在选举过程中由候选人(Candidate)触发的,
  9. AppendEntries RPC是领导人触发的,目的是向其他节点复制日志条目和发送心跳(heartbeat)。

目标

  • 根据raft协议,实现高可用分布式强一致的kv存储

子目标(Day 5)

  • 重新设计RPC接口,将原有浓缩的两个接口分解为更易于理解和实现的四个接口( 尽信书则不如无书 -_-|| )
  • 根据新RPC接口重写Follower状态的实现

设计

  • IRaftRPC: 将原有浓缩的两个接口分解为更易于理解和实现的四个接口
  • IRaftLSM: 添加部分包内支持接口
  • iEventDrivenModel:抽取并实现事件驱动型的逻辑编排
  • ILogStore:改造适配新分解的RPC接口
  • tBoltDBStore:基于boltdb实现日志暂存,提交和应用
  • tFollowerState:根据新分解的RPC接口,重写Follower状态的实现(未完成)

IRaftRPC.go

将原有浓缩的两个接口分解为更易于理解和实现的四个接口。尽信书则不如无书-_-||

  1. package rpc
  2. import "learning/gooop/etcd/raft/model"
  3. type IRaftRPC interface {
  4. // leader to follower
  5. Heartbeat(cmd *HeartbeatCmd, ret *HeartbeatRet) error
  6. // leader to follower
  7. AppendLog(cmd *AppendLogCmd, ret *AppendLogRet) error
  8. // leader to follower
  9. CommitLog(cmd *CommitLogCmd, ret *CommitLogRet) error
  10. // candidate to follower
  11. RequestVote(cmd *RequestVoteCmd, ret *RequestVoteRet) error
  12. }
  type (
  	// HeartbeatCmd is the request of IRaftRPC.Heartbeat.
  	HeartbeatCmd struct {
  		LeaderID string
  		Term     int64
  	}

  	// HeartbeatRet is the reply of IRaftRPC.Heartbeat.
  	HeartbeatRet struct {
  		Code HBCode
  		Term int64
  	}

  	// HBCode is the result code carried by HeartbeatRet.
  	HBCode int
  )

  // Result codes of a heartbeat call.
  const (
  	HBOk HBCode = iota
  	HBTermMismatch
  )
  type (
  	// RequestVoteCmd is the request of IRaftRPC.RequestVote.
  	RequestVoteCmd struct {
  		CandidateID  string
  		Term         int64
  		LastLogIndex int64
  		LastLogTerm  int64
  	}

  	// RequestVoteRet is the reply of IRaftRPC.RequestVote.
  	RequestVoteRet struct {
  		Code RVCode
  		Term int64
  	}

  	// RVCode is the result code carried by RequestVoteRet.
  	RVCode int
  )

  // Result codes of a vote request.
  const (
  	RVOk RVCode = iota
  	RVLogMismatch
  	RVTermMismatch
  	RVVotedAnother
  )
  43. type AppendLogCmd struct {
  44. LeaderID string
  45. Term int64
  46. Entry *model.LogEntry
  47. }
  48. type AppendLogRet struct {
  49. Code ALCode
  50. Term int64
  51. PrevLogIndex int64
  52. PrevLogTerm int64
  53. }
  54. type ALCode int
  55. const (
  56. ALOk ALCode = iota
  57. ALTermMismatch ALCode = iota
  58. ALIndexMismatch ALCode = iota
  59. ALInternalError ALCode = iota
  60. )
  type (
  	// CommitLogCmd is the request of IRaftRPC.CommitLog; Index identifies the
  	// log entry to commit.
  	CommitLogCmd struct {
  		LeaderID string
  		Term     int64
  		Index    int64
  	}

  	// CommitLogRet is the reply of IRaftRPC.CommitLog.
  	CommitLogRet struct {
  		Code CLCode
  	}

  	// CLCode is the result code carried by CommitLogRet.
  	CLCode int
  )

  // Result codes of a commit-log call.
  const (
  	CLOk CLCode = iota
  	CLLogNotFound
  	CLInternalError
  )

IRaftLSM.go

添加部分包内支持接口

  1. package lsm
  2. import (
  3. "learning/gooop/etcd/raft/config"
  4. "learning/gooop/etcd/raft/rpc"
  5. "learning/gooop/etcd/raft/store"
  6. )
  7. // IRaftLSM raft有限状态自动机
  8. type IRaftLSM interface {
  9. rpc.IRaftRPC
  10. State() IRaftState
  11. config() config.IRaftConfig
  12. store() store.ILogStore
  13. handleStateChanged(state IRaftState)
  14. }

iEventDrivenModel.go

抽取并实现事件驱动型的逻辑编排

  1. package lsm
  // tEventHandleFunc is a callback invoked with the event name e and the
  // arguments that were passed to raise.
  type tEventHandleFunc func(e string, args ...interface{})

  // iEventDrivenModel is a minimal publish/subscribe facility: hook registers
  // a handler for an event name, raise invokes the handlers of that name.
  type iEventDrivenModel interface {
  	hook(e string, handleFunc tEventHandleFunc)
  	raise(e string, args ...interface{})
  }

  // tEventDrivenModel implements iEventDrivenModel. The zero value is ready to
  // use, so it can be embedded in other structs without explicit setup.
  type tEventDrivenModel struct {
  	// items maps an event name to its handlers in registration order.
  	items map[string][]tEventHandleFunc
  }

  // hook appends handler to the handlers registered for event e.
  func (me *tEventDrivenModel) hook(e string, handler tEventHandleFunc) {
  	// Allocate lazily: writing to a nil map panics, and embedders never
  	// initialize items themselves.
  	if me.items == nil {
  		me.items = make(map[string][]tEventHandleFunc)
  	}
  	me.items[e] = append(me.items[e], handler)
  }

  // raise synchronously calls every handler registered for event e, in
  // registration order, passing e and args through. Events without handlers
  // are ignored.
  func (me *tEventDrivenModel) raise(e string, args ...interface{}) {
  	// Indexing a nil map yields a nil slice, so no existence check is needed.
  	for _, it := range me.items[e] {
  		it(e, args...)
  	}
  }

ILogStore.go

改造适配新分解的RPC接口

  1. package store
  2. import "learning/gooop/etcd/raft/model"
  3. type ILogStore interface {
  4. LastAppendedTerm() int64
  5. LastAppendedIndex() int64
  6. LastCommittedTerm() int64
  7. LastCommittedIndex() int64
  8. Append(entry *model.LogEntry) error
  9. Commit(index int64) error
  10. GetLog(index int64) (error, *model.LogEntry)
  11. }

tBoltDBStore.go

基于boltdb实现日志暂存,提交和应用

  1. package store
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "errors"
  6. "github.com/boltdb/bolt"
  7. "learning/gooop/etcd/raft/model"
  8. )
  9. type tBoltDBStore struct {
  10. file string
  11. lastAppendedTerm int64
  12. lastAppendedIndex int64
  13. lastCommittedTerm int64
  14. lastCommittedIndex int64
  15. db bolt.DB
  16. }
  17. func NewBoltStore(file string) (error, ILogStore) {
  18. db, err := bolt.Open(file, 0600, nil)
  19. if err != nil {
  20. return err, nil
  21. }
  22. store := new(tBoltDBStore)
  23. err = db.Update(func(tx *bolt.Tx) error {
  24. b, e := tx.CreateBucketIfNotExists(gMetaBucket)
  25. if e != nil {
  26. return e
  27. }
  28. v := b.Get(gKeyCommittedTerm)
  29. if v == nil {
  30. e = b.Put(gKeyCommittedTerm, int64ToBytes(gDefaultTerm))
  31. if e != nil {
  32. return e
  33. }
  34. store.lastCommittedTerm = gDefaultTerm
  35. } else {
  36. store.lastCommittedTerm = bytesToInt64(v)
  37. }
  38. v = b.Get(gKeyCommittedIndex)
  39. if v == nil {
  40. e = b.Put(gKeyCommittedIndex, int64ToBytes(gDefaultIndex))
  41. if e != nil {
  42. return e
  43. }
  44. store.lastCommittedIndex = gDefaultIndex
  45. } else {
  46. store.lastCommittedIndex = bytesToInt64(v)
  47. }
  48. b, e = tx.CreateBucketIfNotExists(gDataBucket)
  49. if e != nil {
  50. return e
  51. }
  52. e = tx.DeleteBucket(gUnstableBucket)
  53. if e != nil {
  54. return e
  55. }
  56. _, e = tx.CreateBucket(gUnstableBucket)
  57. if e != nil {
  58. return e
  59. }
  60. _, e = tx.CreateBucketIfNotExists(gCommittedBucket)
  61. if e != nil {
  62. return e
  63. }
  64. return nil
  65. })
  66. if err != nil {
  67. return err, nil
  68. }
  69. return nil, store
  70. }
  // int64ToBytes encodes i as exactly 8 big-endian bytes, the format read back
  // by bytesToInt64.
  func int64ToBytes(i int64) []byte {
  	// Write into a fixed 8-byte slice. Seeding a bytes.Buffer with
  	// make([]byte, 8) would prepend 8 zero bytes to the encoded value.
  	buf := make([]byte, 8)
  	binary.BigEndian.PutUint64(buf, uint64(i))
  	return buf
  }
  // bytesToInt64 decodes the first 8 bytes of data as a big-endian int64.
  // It returns 0 when data holds fewer than 8 bytes.
  func bytesToInt64(data []byte) int64 {
  	var value int64
  	// A read error (short input) is deliberately ignored: value then keeps
  	// its zero value.
  	_ = binary.Read(bytes.NewReader(data), binary.BigEndian, &value)
  	return value
  }
  82. func (me *tBoltDBStore) LastCommittedTerm() int64 {
  83. return me.lastCommittedTerm
  84. }
  85. func (me *tBoltDBStore) LastCommittedIndex() int64 {
  86. return me.lastCommittedIndex
  87. }
  88. func (me *tBoltDBStore) LastAppendedTerm() int64 {
  89. return me.lastAppendedTerm
  90. }
  91. func (me *tBoltDBStore) LastAppendedIndex() int64 {
  92. return me.lastAppendedIndex
  93. }
  94. func (me *tBoltDBStore) Append(entry *model.LogEntry) error {
  95. cmd := gCmdFactory.OfTag(entry.Tag)
  96. cmd.Unmarshal(entry.Command)
  97. e, entryData := entry.Marshal()
  98. if e != nil {
  99. return e
  100. }
  101. return me.db.Update(func(tx *bolt.Tx) error {
  102. // save log to unstable
  103. b := tx.Bucket(gUnstableBucket)
  104. e = b.Put(int64ToBytes(entry.Index), entryData)
  105. if e != nil {
  106. return e
  107. }
  108. return nil
  109. })
  110. }
  111. func (me *tBoltDBStore) Commit(index int64) error {
  112. return me.db.Update(func(tx *bolt.Tx) error {
  113. // read unstable log
  114. ub := tx.Bucket(gUnstableBucket)
  115. k := int64ToBytes(index)
  116. data := ub.Get(k)
  117. if data == nil {
  118. return gErrorCommitLogNotFound
  119. }
  120. entry := new(model.LogEntry)
  121. e := entry.Unmarshal(data)
  122. if e != nil {
  123. return e
  124. }
  125. // apply cmd
  126. cmd := gCmdFactory.OfTag(entry.Tag)
  127. cmd.Unmarshal(entry.Command)
  128. e = cmd.Apply(tx)
  129. if e != nil {
  130. return e
  131. }
  132. // save to committed log
  133. cb := tx.Bucket(gCommittedBucket)
  134. e = cb.Put(k, data)
  135. if e != nil {
  136. return e
  137. }
  138. // update committed.index, committed.term
  139. mb := tx.Bucket(gMetaBucket)
  140. e = mb.Put(gKeyCommittedIndex, int64ToBytes(index))
  141. if e != nil {
  142. return e
  143. }
  144. e = mb.Put(gKeyCommittedTerm, int64ToBytes(entry.Term))
  145. if e != nil {
  146. return e
  147. }
  148. // del unstable.index
  149. e = ub.Delete(k)
  150. if e != nil {
  151. return e
  152. }
  153. me.lastCommittedIndex = entry.Index
  154. me.lastCommittedTerm = entry.Term
  155. return nil
  156. })
  157. }
  158. func (me *tBoltDBStore) GetLog(index int64) (error, *model.LogEntry) {
  159. ret := []*model.LogEntry{ nil }
  160. e := me.db.View(func(tx *bolt.Tx) error {
  161. k := int64ToBytes(index)
  162. v := tx.Bucket(gCommittedBucket).Get(k)
  163. if v == nil {
  164. return nil
  165. }
  166. entry := new(model.LogEntry)
  167. e := entry.Unmarshal(v)
  168. if e != nil {
  169. return e
  170. }
  171. ret[0] = entry
  172. return nil
  173. })
  174. return e, ret[0]
  175. }
  var (
  	// bucket names
  	gMetaBucket      = []byte("meta")
  	gUnstableBucket  = []byte("unstable")
  	gCommittedBucket = []byte("committed")
  	gDataBucket      = []byte("data")

  	// keys inside the meta bucket
  	gKeyCommittedIndex = []byte("committed.index")
  	gKeyCommittedTerm  = []byte("committed.term")

  	// values written to the meta bucket when the keys are absent
  	gDefaultTerm  int64 = 0
  	gDefaultIndex int64 = 0

  	// gErrorCommitLogNotFound is returned by Commit when the entry to commit
  	// is not in the unstable bucket.
  	gErrorCommitLogNotFound = errors.New("committing log not found")
  )

tFollowerState.go

根据新分解的RPC接口,重写Follower状态的实现(未完成)

  1. package lsm
  2. import (
  3. "learning/gooop/etcd/raft/roles"
  4. "learning/gooop/etcd/raft/rpc"
  5. "learning/gooop/etcd/raft/timeout"
  6. "sync"
  7. "time"
  8. )
  9. // tFollowerState presents a follower node
  10. type tFollowerState struct {
  11. tEventDrivenModel
  12. context IRaftLSM
  13. mInitOnce sync.Once
  14. mStartOnce sync.Once
  15. mDisposeOnce sync.Once
  16. // updated when init, set term == store.lastCommittedTerm
  17. // updated when leader.heartbeat
  18. mTerm int64
  19. // updated when leader.heartbeat
  20. mLeaderHeartbeatClock int64
  21. mVotedLeaderID string
  22. mVotedTimestamp int64
  23. }
  // Names of the events raised by the follower state.
  const (
  	feStart                  string = "follower.Start"
  	feLeaderHeartbeatTimeout string = "follower.LeaderHeartbeatTimeout"
  )
  26. func newFollowerState(ctx IRaftLSM) IRaftState {
  27. it := new(tFollowerState)
  28. it.init(ctx)
  29. return it
  30. }
  31. func (me *tFollowerState) init(ctx IRaftLSM) {
  32. me.mInitOnce.Do(func() {
  33. me.context = ctx
  34. me.mTerm = ctx.store().LastCommittedTerm()
  35. me.mLeaderHeartbeatClock = 0
  36. me.initEventHandlers()
  37. })
  38. }
  39. func (me *tFollowerState) initEventHandlers() {
  40. me.hook(feStart,
  41. me.whenStartThenBeginWatchLeaderTimeout)
  42. me.hook(feLeaderHeartbeatTimeout,
  43. me.whenLeaderHeartbeatTimeoutThenSwitchToCandidateState)
  44. }
  45. func (me *tFollowerState) Start() {
  46. me.mStartOnce.Do(func() {
  47. me.raise(feStart)
  48. })
  49. }
  50. func (me *tFollowerState) whenStartThenBeginWatchLeaderTimeout(e string, args ...interface{}) {
  51. go func() {
  52. iCheckingTimeoutInterval := timeout.HeartbeatTimeout / 3
  53. iHeartbeatTimeoutNanos := int64(timeout.HeartbeatTimeout / time.Nanosecond)
  54. for range time.Tick(iCheckingTimeoutInterval) {
  55. now := time.Now().UnixNano()
  56. if now - me.mLeaderHeartbeatClock >= iHeartbeatTimeoutNanos {
  57. me.raise(feLeaderHeartbeatTimeout)
  58. return
  59. }
  60. }
  61. }()
  62. }
  63. func (me *tFollowerState) whenLeaderHeartbeatTimeoutThenSwitchToCandidateState(_ string, args ...interface{}) {
  64. panic("implements me")
  65. }
  66. func (me *tFollowerState) Role() roles.RaftRole {
  67. return roles.Follower
  68. }
  69. // Heartbeat leader to follower
  70. func (me *tFollowerState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
  71. if cmd.Term < me.mTerm {
  72. // invalid leader
  73. ret.Code = rpc.HBTermMismatch
  74. ret.Term = me.mTerm
  75. return nil
  76. } else if cmd.Term > me.mTerm {
  77. // new leader
  78. me.mTerm = cmd.Term
  79. }
  80. // update heartbeat clock and return
  81. me.mLeaderHeartbeatClock = time.Now().UnixNano()
  82. ret.Code = rpc.HBOk
  83. return nil
  84. }
  85. // AppendLog leader to follower
  86. func (me *tFollowerState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
  87. ret.Term = me.mTerm
  88. if cmd.Term < me.mTerm {
  89. // invalid leader
  90. ret.Code = rpc.ALTermMismatch
  91. return nil
  92. }
  93. store := me.context.store()
  94. entry := cmd.Entry
  95. // check log: expecting appending action follows previous committing action
  96. if entry.PrevIndex != store.LastCommittedIndex() || entry.PrevTerm != store.LastCommittedTerm() {
  97. // check log
  98. e, log := store.GetLog(entry.Index)
  99. if e != nil {
  100. ret.Code = rpc.ALInternalError
  101. return nil
  102. }
  103. if log == nil || log.PrevIndex != entry.PrevIndex || log.PrevTerm != entry.PrevTerm {
  104. // bad log
  105. ret.Code = rpc.ALIndexMismatch
  106. ret.PrevLogIndex = store.LastCommittedIndex()
  107. ret.PrevLogTerm = store.LastCommittedTerm()
  108. return nil
  109. }
  110. // good log, but old, just ignore it
  111. ret.Code = rpc.ALOk
  112. return nil
  113. }
  114. // good log
  115. e := store.Append(entry)
  116. if e != nil {
  117. ret.Code = rpc.ALInternalError
  118. return nil
  119. } else {
  120. ret.Code = rpc.ALOk
  121. return nil
  122. }
  123. }
  124. // CommitLog leader to follower
  125. func (me *tFollowerState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
  126. store := me.context.store()
  127. if cmd.Index != store.LastAppendedIndex() || cmd.Term != store.LastAppendedTerm() {
  128. // bad index
  129. ret.Code = rpc.CLLogNotFound
  130. return nil
  131. }
  132. e := store.Commit(cmd.Index)
  133. if e != nil {
  134. ret.Code = rpc.CLInternalError
  135. return nil
  136. }
  137. ret.Code = rpc.CLOk
  138. return nil
  139. }
  140. // RequestVote candidate to follower
  141. func (me *tFollowerState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
  142. panic("implements me")
  143. }

(未完待续)