缘起

最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军 , 2019.1)
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop

raft分布式一致性算法

  1. 分布式存储系统通常会通过维护多个副本来进行容错,
  2. 以提高系统的可用性。
  3. 这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?
  4. Raft算法把问题分解成了领袖选举(leader election)、
  5. 日志复制(log replication)、安全性(safety)
  6. 和成员关系变化(membership changes)这几个子问题。
  7. Raft算法的基本操作只需2种RPC即可完成。
  8. RequestVote RPC是在选举过程中由候选人(Candidate)触发的,
  9. AppendEntries RPC是领导人触发的,目的是向其他节点复制日志条目和发送心跳(heartbeat)。

目标

  • 根据raft协议,实现高可用分布式强一致的kv存储

子目标(Day 4)

  • 使用boltdb存储操作日志和kv键值数据
    • unstable存储桶:已收到未提交的日志,重启后清空
    • committed存储桶:已提交的日志
    • data存储桶:kv键值数据
    • meta存储桶:记录末次提交的index和term

设计

  • model/LogEntry: 日志条目
  • ICmd:操作指令接口
  • ICmdFactory:操作指令工厂
  • ILogStore:日志存储接口
  • tCmdBase:指令基类
  • PutCmd:put指令
  • DelCmd:del指令
  • tBoltDBStore:基于boltdb实现日志暂存,提交和应用

LogEntry.go

日志条目

  1. package model
  2. import "encoding/json"
  // LogEntry is one record of the replicated log. It is stored as JSON.
  type LogEntry struct {
      // Tag selects the concrete command type; the store passes it to the
      // command factory's OfTag.
      Tag int
      // Term is the term of this entry.
      Term int64
      // Index is the position of this entry; the store uses it as the bucket key.
      Index int64
      // PrevTerm and PrevIndex are not read anywhere in this code yet.
      // NOTE(review): presumably the term/index of the preceding entry, used
      // for the Raft consistency check - confirm once replication is added.
      PrevTerm  int64
      PrevIndex int64
      // Command is the serialized command (the payload given to ICmd.Unmarshal).
      Command []byte
  }

  // Marshal encodes the entry as JSON.
  //
  // Note the result order: the error comes first, then the encoded bytes.
  func (me *LogEntry) Marshal() (error, []byte) {
      data, err := json.Marshal(me)
      if err == nil {
          return nil, data
      }
      return err, nil
  }

  // Unmarshal decodes JSON data into the receiver and returns the decode error,
  // if any.
  func (me *LogEntry) Unmarshal(data []byte) error {
      err := json.Unmarshal(data, me)
      return err
  }

ICmd.go

操作指令接口

  1. package store
  2. import "github.com/boltdb/bolt"
  3. type ICmd interface {
  4. Marshal() []byte
  5. Unmarshal(data []byte)
  6. Apply(tx *bolt.Tx) error
  7. }

ICmdFactory.go

操作指令工厂

  1. package store
  2. import "fmt"
  3. type ICmdFactory interface {
  4. OfTag(tag int) ICmd
  5. Tag(cmd ICmd) int
  6. }
  7. type tDefaultCmdFactory int
  8. const gPutCmdTag = 1
  9. const gDelCmdTag = 2
  10. func (me *tDefaultCmdFactory) OfTag(tag int) ICmd {
  11. switch tag {
  12. case gPutCmdTag:
  13. return new(PutCmd)
  14. case gDelCmdTag:
  15. return new(DelCmd)
  16. }
  17. panic(fmt.Sprintf("unknown tag: %d", tag))
  18. }
  19. func (me *tDefaultCmdFactory) Tag(cmd ICmd) int {
  20. if _, ok := cmd.(*PutCmd); ok {
  21. return gPutCmdTag
  22. }
  23. if _, ok := cmd.(*DelCmd); ok {
  24. return gDelCmdTag
  25. }
  26. panic(fmt.Sprintf("unknown cmd: %v", cmd))
  27. }
  28. var gCmdFactory = new(tDefaultCmdFactory)

ILogStore.go

日志存储接口

  1. package store
  2. import "learning/gooop/etcd/raft/model"
  3. type ILogStore interface {
  4. Term() int64
  5. Index() int64
  6. Append(entry *model.LogEntry) error
  7. Commit(index int64) error
  8. }

tCmdBase.go

指令基类

  1. package store
  2. import "encoding/json"
  // tCmdBase is the common base embedded by the concrete commands.
  //
  // NOTE(review): both methods below operate on the *tCmdBase receiver, which
  // has no fields. When a command type embeds tCmdBase and relies on the
  // promoted methods, Marshal always yields "{}" and Unmarshal never fills the
  // outer struct's fields. Embedding types need their own Marshal/Unmarshal to
  // serialize their data.
  type tCmdBase struct {
  }

  // Marshal returns the JSON encoding of the (field-less) base struct, or nil
  // when encoding fails.
  func (me *tCmdBase) Marshal() []byte {
      if j, e := json.Marshal(me); e == nil {
          return j
      }
      return nil
  }

  // Unmarshal decodes data into the base struct.
  func (me *tCmdBase) Unmarshal(data []byte) {
      // The method has no error result, so a decode failure is dropped here.
      _ = json.Unmarshal(data, me)
  }

PutCmd.go

put指令

  1. package store
  import (
      "encoding/json"

      "github.com/boltdb/bolt"
  )
  3. type PutCmd struct {
  4. tCmdBase
  5. Key string
  6. Value []byte
  7. }
  8. func (me *PutCmd) Apply(tx *bolt.Tx) error {
  9. b := tx.Bucket(gDataBucket)
  10. return b.Put([]byte(me.Key), me.Value)
  11. }

DelCmd.go

del指令

  1. package store
  import (
      "encoding/json"

      "github.com/boltdb/bolt"
  )
  3. type DelCmd struct {
  4. tCmdBase
  5. Key string
  6. }
  7. func (me *DelCmd) Apply(tx *bolt.Tx) error {
  8. b := tx.Bucket(gDataBucket)
  9. return b.Delete([]byte(me.Key))
  10. }

tBoltDBStore.go

基于boltdb实现日志暂存,提交和应用

  1. package store
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "errors"
  6. "github.com/boltdb/bolt"
  7. "learning/gooop/etcd/raft/model"
  8. )
  9. type tBoltDBStore struct {
  10. file string
  11. term int64
  12. index int64
  13. db bolt.DB
  14. }
  15. func NewBoltStore(file string) (error, ILogStore) {
  16. db, err := bolt.Open(file, 0600, nil)
  17. if err != nil {
  18. return err, nil
  19. }
  20. store := new(tBoltDBStore)
  21. err = db.Update(func(tx *bolt.Tx) error {
  22. b, e := tx.CreateBucketIfNotExists(gMetaBucket)
  23. if e != nil {
  24. return e
  25. }
  26. v := b.Get(gKeyCommittedTerm)
  27. if v == nil {
  28. e = b.Put(gKeyCommittedTerm, int64ToBytes(gDefaultTerm))
  29. if e != nil {
  30. return e
  31. }
  32. store.term = gDefaultTerm
  33. } else {
  34. store.term = bytesToInt64(v)
  35. }
  36. v = b.Get(gKeyCommittedIndex)
  37. if v == nil {
  38. e = b.Put(gKeyCommittedIndex, int64ToBytes(gDefaultIndex))
  39. if e != nil {
  40. return e
  41. }
  42. store.index = gDefaultIndex
  43. } else {
  44. store.index = bytesToInt64(v)
  45. }
  46. b, e = tx.CreateBucketIfNotExists(gDataBucket)
  47. if e != nil {
  48. return e
  49. }
  50. e = tx.DeleteBucket(gUnstableBucket)
  51. if e != nil {
  52. return e
  53. }
  54. _, e = tx.CreateBucket(gUnstableBucket)
  55. if e != nil {
  56. return e
  57. }
  58. _, e = tx.CreateBucketIfNotExists(gCommittedBucket)
  59. if e != nil {
  60. return e
  61. }
  62. return nil
  63. })
  64. if err != nil {
  65. return err, nil
  66. }
  67. return nil, store
  68. }
  // int64ToBytes encodes i as exactly 8 big-endian bytes.
  //
  // The previous implementation wrote into a buffer that already held 8 zero
  // bytes, producing 16 bytes whose first 8 were always zero, so decoding the
  // result with bytesToInt64 always gave 0. Keys and values written with that
  // older 16-byte layout are not compatible with this encoding.
  func int64ToBytes(i int64) []byte {
      buf := make([]byte, 8)
      binary.BigEndian.PutUint64(buf, uint64(i))
      return buf
  }
  // bytesToInt64 decodes the first 8 bytes of data as a big-endian int64.
  // It returns 0 when data holds fewer than 8 bytes; extra bytes are ignored.
  func bytesToInt64(data []byte) int64 {
      var v int64
      if err := binary.Read(bytes.NewReader(data), binary.BigEndian, &v); err != nil {
          return 0
      }
      return v
  }
  80. func (me *tBoltDBStore) Term() int64 {
  81. return me.term
  82. }
  83. func (me *tBoltDBStore) Index() int64 {
  84. return me.index
  85. }
  86. func (me *tBoltDBStore) Append(entry *model.LogEntry) error {
  87. cmd := gCmdFactory.OfTag(entry.Tag)
  88. cmd.Unmarshal(entry.Command)
  89. e, entryData := entry.Marshal()
  90. if e != nil {
  91. return e
  92. }
  93. return me.db.Update(func(tx *bolt.Tx) error {
  94. // save log to unstable
  95. b := tx.Bucket(gUnstableBucket)
  96. e = b.Put(int64ToBytes(entry.Index), entryData)
  97. if e != nil {
  98. return e
  99. }
  100. me.index = entry.Index
  101. me.term = entry.Term
  102. return nil
  103. })
  104. }
  105. func (me *tBoltDBStore) Commit(index int64) error {
  106. return me.db.Update(func(tx *bolt.Tx) error {
  107. // read unstable log
  108. ub := tx.Bucket(gUnstableBucket)
  109. k := int64ToBytes(index)
  110. data := ub.Get(k)
  111. if data == nil {
  112. return gErrorCommitLogNotFound
  113. }
  114. entry := new(model.LogEntry)
  115. e := entry.Unmarshal(data)
  116. if e != nil {
  117. return e
  118. }
  119. // apply cmd
  120. cmd := gCmdFactory.OfTag(entry.Tag)
  121. cmd.Unmarshal(entry.Command)
  122. e = cmd.Apply(tx)
  123. if e != nil {
  124. return e
  125. }
  126. // save to committed log
  127. cb := tx.Bucket(gCommittedBucket)
  128. e = cb.Put(k, data)
  129. if e != nil {
  130. return e
  131. }
  132. // update committed.index, committed.term
  133. mb := tx.Bucket(gMetaBucket)
  134. e = mb.Put(gKeyCommittedIndex, int64ToBytes(index))
  135. if e != nil {
  136. return e
  137. }
  138. e = mb.Put(gKeyCommittedTerm, int64ToBytes(entry.Term))
  139. if e != nil {
  140. return e
  141. }
  142. // del unstable.index
  143. e = ub.Delete(k)
  144. if e != nil {
  145. return e
  146. }
  147. return nil
  148. })
  149. }
  // Bucket names, meta keys, defaults and errors used by the bolt store.
  var (
      // Bucket names.
      gMetaBucket      = []byte("meta")
      gUnstableBucket  = []byte("unstable")
      gCommittedBucket = []byte("committed")
      gDataBucket      = []byte("data")

      // Keys inside the meta bucket.
      gKeyCommittedIndex = []byte("committed.index")
      gKeyCommittedTerm  = []byte("committed.term")

      // Term and index written to the meta bucket when it holds no value yet.
      gDefaultTerm  = int64(0)
      gDefaultIndex = int64(0)

      // gErrorCommitLogNotFound is returned by Commit when the unstable
      // bucket has no entry for the requested index.
      gErrorCommitLogNotFound = errors.New("committing log not found")
  )

(未完待续)