缘起
最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军 , 2019.1)
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop
raft分布式一致性算法
分布式存储系统通常会通过维护多个副本来进行容错,以提高系统的可用性。这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?Raft算法把问题分解成了领袖选举(leader election)、日志复制(log replication)、安全性(safety)和成员关系变化(membership changes)这几个子问题。Raft算法的基本操作只需2种RPC即可完成。RequestVote RPC是在选举过程中由候选人(Candidate)触发的,AppendEntries RPC是领导人(Leader)触发的,目的是向其他节点复制日志条目和发送心跳(heartbeat)。
目标
- 根据raft协议,实现高可用分布式强一致的kv存储
子目标(Day 4)
- 使用boltdb存储操作日志和kv键值数据
- unstable存储桶:已收到未提交的日志,重启后清空
- committed存储桶:已提交的日志
- data存储桶:kv键值数据
- meta存储桶:记录末次提交的index和term
设计
- model/LogEntry: 日志条目
- ICmd:操作指令接口
- ICmdFactory:操作指令工厂
- ILogStore:日志存储接口
- tCmdBase:指令基类
- PutCmd:put指令
- DelCmd:del指令
- tBoltDBStore:基于boltdb实现日志暂存,提交和应用
LogEntry.go
日志条目
package model

import "encoding/json"

// LogEntry is one record of the replicated log.
//
// Tag identifies the kind of command carried in Command, and Command holds
// that command's serialized bytes. Term and Index locate this entry in the
// log; PrevTerm and PrevIndex presumably describe the entry immediately
// before it (NOTE(review): confirm against the replication code).
type LogEntry struct {
	Tag       int
	Term      int64
	Index     int64
	PrevTerm  int64
	PrevIndex int64
	Command   []byte
}

// Marshal encodes the entry as JSON.
//
// The error is returned first and the encoded bytes second; on failure the
// byte slice is nil.
func (me *LogEntry) Marshal() (error, []byte) {
	encoded, err := json.Marshal(me)
	if err != nil {
		return err, nil
	}
	return nil, encoded
}

// Unmarshal decodes JSON data into the receiver and returns any decoding
// error reported by encoding/json.
func (me *LogEntry) Unmarshal(data []byte) error {
	err := json.Unmarshal(data, me)
	return err
}
ICmd.go
操作指令接口
package storeimport "github.com/boltdb/bolt"type ICmd interface {Marshal() []byteUnmarshal(data []byte)Apply(tx *bolt.Tx) error}
ICmdFactory.go
操作指令工厂
package store

import "fmt"

// ICmdFactory maps between numeric command tags and ICmd implementations.
type ICmdFactory interface {
	// OfTag returns a fresh, empty command for the given tag.
	OfTag(tag int) ICmd

	// Tag returns the numeric tag of the given command.
	Tag(cmd ICmd) int
}

// tDefaultCmdFactory is the built-in factory covering PutCmd and DelCmd.
type tDefaultCmdFactory int

// Tags of the commands known to tDefaultCmdFactory.
const (
	gPutCmdTag = 1
	gDelCmdTag = 2
)

// OfTag creates an empty command for tag. It panics when the tag is not one
// of gPutCmdTag or gDelCmdTag.
func (me *tDefaultCmdFactory) OfTag(tag int) ICmd {
	switch tag {
	case gPutCmdTag:
		return &PutCmd{}
	case gDelCmdTag:
		return &DelCmd{}
	default:
		panic(fmt.Sprintf("unknown tag: %d", tag))
	}
}

// Tag returns the tag matching the dynamic type of cmd. It panics when cmd
// is neither a *PutCmd nor a *DelCmd.
func (me *tDefaultCmdFactory) Tag(cmd ICmd) int {
	switch cmd.(type) {
	case *PutCmd:
		return gPutCmdTag
	case *DelCmd:
		return gDelCmdTag
	}
	panic(fmt.Sprintf("unknown cmd: %v", cmd))
}

// gCmdFactory is the package-wide factory instance.
var gCmdFactory = new(tDefaultCmdFactory)
ILogStore.go
日志存储接口
package storeimport "learning/gooop/etcd/raft/model"type ILogStore interface {Term() int64Index() int64Append(entry *model.LogEntry) errorCommit(index int64) error}
tCmdBase.go
指令基类
package store

import "encoding/json"

// tCmdBase is a field-less helper struct providing JSON-based Marshal and
// Unmarshal methods.
//
// NOTE(review): both methods hand the *tCmdBase receiver to encoding/json.
// When tCmdBase is embedded in another struct and these methods are reached
// through promotion, the receiver is still the embedded tCmdBase, so the
// fields of the outer struct are neither encoded nor decoded. Types that
// carry data need their own Marshal/Unmarshal.
type tCmdBase struct {
}

// Marshal returns the JSON encoding of the receiver, or nil if encoding
// fails.
func (me *tCmdBase) Marshal() []byte {
	if encoded, err := json.Marshal(me); err == nil {
		return encoded
	}
	return nil
}

// Unmarshal decodes JSON data into the receiver. Decoding errors are
// discarded.
func (me *tCmdBase) Unmarshal(data []byte) {
	_ = json.Unmarshal(data, me)
}
PutCmd.go
put指令
package store

import (
	"encoding/json"

	"github.com/boltdb/bolt"
)

// PutCmd writes Value under Key in the data bucket.
type PutCmd struct {
	tCmdBase
	Key   string
	Value []byte
}

// Marshal returns the JSON encoding of the command, including Key and Value,
// or nil if encoding fails.
//
// It is defined on *PutCmd on purpose: the method promoted from tCmdBase
// receives only the embedded, field-less base struct and therefore always
// produces "{}".
func (me *PutCmd) Marshal() []byte {
	data, err := json.Marshal(me)
	if err != nil {
		return nil
	}
	return data
}

// Unmarshal decodes JSON data into the command, populating Key and Value.
//
// It shadows the method promoted from tCmdBase, which would decode into the
// embedded base struct and leave Key and Value untouched.
func (me *PutCmd) Unmarshal(data []byte) {
	// ICmd.Unmarshal has no error return, so a decoding failure cannot be
	// reported to the caller.
	_ = json.Unmarshal(data, me)
}

// Apply stores Value under Key in the data bucket within tx.
// It returns bolt.ErrBucketNotFound when the data bucket does not exist, and
// otherwise whatever error the bucket's Put reports.
func (me *PutCmd) Apply(tx *bolt.Tx) error {
	b := tx.Bucket(gDataBucket)
	if b == nil {
		// tx.Bucket returns nil for a missing bucket; calling Put on it
		// would dereference a nil pointer.
		return bolt.ErrBucketNotFound
	}
	return b.Put([]byte(me.Key), me.Value)
}
DelCmd.go
del指令
package store

import (
	"encoding/json"

	"github.com/boltdb/bolt"
)

// DelCmd removes Key from the data bucket.
type DelCmd struct {
	tCmdBase
	Key string
}

// Marshal returns the JSON encoding of the command, including Key, or nil if
// encoding fails.
//
// It is defined on *DelCmd on purpose: the method promoted from tCmdBase
// receives only the embedded, field-less base struct and therefore always
// produces "{}".
func (me *DelCmd) Marshal() []byte {
	data, err := json.Marshal(me)
	if err != nil {
		return nil
	}
	return data
}

// Unmarshal decodes JSON data into the command, populating Key.
//
// It shadows the method promoted from tCmdBase, which would decode into the
// embedded base struct and leave Key untouched.
func (me *DelCmd) Unmarshal(data []byte) {
	// ICmd.Unmarshal has no error return, so a decoding failure cannot be
	// reported to the caller.
	_ = json.Unmarshal(data, me)
}

// Apply deletes Key from the data bucket within tx.
// It returns bolt.ErrBucketNotFound when the data bucket does not exist, and
// otherwise whatever error the bucket's Delete reports.
func (me *DelCmd) Apply(tx *bolt.Tx) error {
	b := tx.Bucket(gDataBucket)
	if b == nil {
		// tx.Bucket returns nil for a missing bucket; calling Delete on it
		// would dereference a nil pointer.
		return bolt.ErrBucketNotFound
	}
	return b.Delete([]byte(me.Key))
}
tBoltDBStore.go
基于boltdb实现日志暂存,提交和应用
package store

import (
	"encoding/binary"
	"errors"

	"github.com/boltdb/bolt"

	"learning/gooop/etcd/raft/model"
)

// tBoltDBStore is an ILogStore backed by a single boltdb file.
//
// It uses four buckets: unstable (appended, not yet committed entries),
// committed (committed entries), data (the key/value state that commands are
// applied to) and meta (the last committed index and term).
type tBoltDBStore struct {
	file  string
	term  int64
	index int64
	// db is a pointer: the store must operate on the handle returned by
	// bolt.Open, not on a zero-value copy.
	db *bolt.DB
}

// NewBoltStore opens (or creates) the boltdb file and prepares its buckets.
//
// The meta bucket is created on first use and seeded with gDefaultTerm and
// gDefaultIndex; otherwise the stored term and index are loaded. The
// unstable bucket is dropped and recreated on every open, discarding any
// uncommitted entries. The error is returned first; on failure the store is
// nil and the database handle is closed.
func NewBoltStore(file string) (error, ILogStore) {
	db, err := bolt.Open(file, 0600, nil)
	if err != nil {
		return err, nil
	}

	store := &tBoltDBStore{file: file, db: db}
	err = db.Update(func(tx *bolt.Tx) error {
		meta, e := tx.CreateBucketIfNotExists(gMetaBucket)
		if e != nil {
			return e
		}
		if store.term, e = loadOrInitInt64(meta, gKeyCommittedTerm, gDefaultTerm); e != nil {
			return e
		}
		if store.index, e = loadOrInitInt64(meta, gKeyCommittedIndex, gDefaultIndex); e != nil {
			return e
		}

		if _, e = tx.CreateBucketIfNotExists(gDataBucket); e != nil {
			return e
		}

		// Reset the unstable bucket. On a brand-new database the bucket does
		// not exist yet, which must not abort initialisation.
		e = tx.DeleteBucket(gUnstableBucket)
		if e != nil && !errors.Is(e, bolt.ErrBucketNotFound) {
			return e
		}
		if _, e = tx.CreateBucket(gUnstableBucket); e != nil {
			return e
		}

		_, e = tx.CreateBucketIfNotExists(gCommittedBucket)
		return e
	})
	if err != nil {
		// Do not leak the file handle/lock when initialisation fails.
		_ = db.Close()
		return err, nil
	}

	return nil, store
}

// loadOrInitInt64 returns the int64 stored under key in b. When the key is
// absent it stores def under key and returns def.
func loadOrInitInt64(b *bolt.Bucket, key []byte, def int64) (int64, error) {
	if v := b.Get(key); v != nil {
		return bytesToInt64(v), nil
	}
	if e := b.Put(key, int64ToBytes(def)); e != nil {
		return 0, e
	}
	return def, nil
}

// int64ToBytes encodes i as exactly 8 big-endian bytes.
func int64ToBytes(i int64) []byte {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, uint64(i))
	return buf
}

// bytesToInt64 decodes the first 8 bytes of data as a big-endian int64.
// It returns 0 when data is shorter than 8 bytes.
func bytesToInt64(data []byte) int64 {
	if len(data) < 8 {
		return 0
	}
	return int64(binary.BigEndian.Uint64(data))
}

// Term returns the term tracked in memory: the value loaded from the meta
// bucket at open time, or the term of the most recently appended entry.
func (me *tBoltDBStore) Term() int64 {
	return me.term
}

// Index returns the index tracked in memory: the value loaded from the meta
// bucket at open time, or the index of the most recently appended entry.
func (me *tBoltDBStore) Index() int64 {
	return me.index
}

// Append stores entry in the unstable bucket, keyed by its index, and on
// success records the entry's index and term in memory.
//
// The command is instantiated from entry.Tag before anything is written;
// gCmdFactory.OfTag panics on an unknown tag.
func (me *tBoltDBStore) Append(entry *model.LogEntry) error {
	cmd := gCmdFactory.OfTag(entry.Tag)
	cmd.Unmarshal(entry.Command)

	e, entryData := entry.Marshal()
	if e != nil {
		return e
	}

	e = me.db.Update(func(tx *bolt.Tx) error {
		// save log to unstable
		b := tx.Bucket(gUnstableBucket)
		return b.Put(int64ToBytes(entry.Index), entryData)
	})
	if e != nil {
		return e
	}

	// Only advance the in-memory position once the transaction has really
	// been committed to disk.
	me.index = entry.Index
	me.term = entry.Term
	return nil
}

// Commit moves the entry with the given index from the unstable bucket to
// the committed bucket, applies its command to the data bucket and updates
// the committed index and term in the meta bucket, all in one transaction.
//
// It returns gErrorCommitLogNotFound when no unstable entry exists for
// index.
func (me *tBoltDBStore) Commit(index int64) error {
	return me.db.Update(func(tx *bolt.Tx) error {
		// read unstable log
		ub := tx.Bucket(gUnstableBucket)
		k := int64ToBytes(index)
		data := ub.Get(k)
		if data == nil {
			return gErrorCommitLogNotFound
		}

		entry := new(model.LogEntry)
		if e := entry.Unmarshal(data); e != nil {
			return e
		}

		// apply cmd
		cmd := gCmdFactory.OfTag(entry.Tag)
		cmd.Unmarshal(entry.Command)
		if e := cmd.Apply(tx); e != nil {
			return e
		}

		// save to committed log
		cb := tx.Bucket(gCommittedBucket)
		if e := cb.Put(k, data); e != nil {
			return e
		}

		// update committed.index, committed.term
		mb := tx.Bucket(gMetaBucket)
		if e := mb.Put(gKeyCommittedIndex, int64ToBytes(index)); e != nil {
			return e
		}
		if e := mb.Put(gKeyCommittedTerm, int64ToBytes(entry.Term)); e != nil {
			return e
		}

		// del unstable.index
		return ub.Delete(k)
	})
}

// Bucket names.
var (
	gMetaBucket      = []byte("meta")
	gUnstableBucket  = []byte("unstable")
	gCommittedBucket = []byte("committed")
	gDataBucket      = []byte("data")
)

// Keys inside the meta bucket.
var (
	gKeyCommittedIndex = []byte("committed.index")
	gKeyCommittedTerm  = []byte("committed.term")
)

// Values written to the meta bucket when a database is first created.
var (
	gDefaultTerm  int64 = 0
	gDefaultIndex int64 = 0
)

// gErrorCommitLogNotFound is returned by Commit when the requested index has
// no entry in the unstable bucket.
var gErrorCommitLogNotFound = errors.New("committing log not found")
(未完待续)
