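Design
Figure 1 shows the basic relationships within the storage layer. The Backend interface represents the storage component, and the actual storage is ultimately provided by BoltDB. ReadTx is a read transaction, BatchTx is a batch transaction, and Snapshot is a storage snapshot.
Figure 1: Relationships around the Backend interface
The Backend interface is defined below. ConcurrentReadTx returns a non-blocking (concurrent) read transaction, OpenReadTxN returns the number of currently open read transactions, and Size and SizeInUse report the physically allocated size and the logically used size of the storage. The names of the other methods, such as Hash and Defrag, hint at what they do, but they cannot be described accurately yet, so they are left for later.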
    type Backend interface {
        // ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523.
        ReadTx() ReadTx
        BatchTx() BatchTx
        // ConcurrentReadTx returns a non-blocking read transaction.
        ConcurrentReadTx() ReadTx

        Snapshot() Snapshot
        Hash(ignores func(bucketName, keyName []byte) bool) (uint32, error)
        // Size returns the current size of the backend physically allocated.
        // The backend can hold DB space that is not utilized at the moment,
        // since it can conduct pre-allocation or spare unused space for recycling.
        // Use SizeInUse() instead for the actual DB size.
        Size() int64
        // SizeInUse returns the current size of the backend logically in use.
        // Since the backend can manage free space in a non-byte unit such as
        // number of pages, the returned value can be not exactly accurate in bytes.
        SizeInUse() int64
        // OpenReadTxN returns the number of currently open read transactions in the backend.
        OpenReadTxN() int64
        Defrag() error
        ForceCommit()
        Close() error

        // SetTxPostLockInsideApplyHook sets a txPostLockInsideApplyHook.
        SetTxPostLockInsideApplyHook(func())
    }
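Listing 1-1: Backend interface definition
ReadTx
The ReadTx interface mainly defines locking methods plus access and traversal methods. The interface exposes two kinds of locks. Judging by the names, RLock and RUnlock relate to a read-only (shared) lock, while Lock and Unlock relate to a write (exclusive) lock. This raises an obvious question: why are two kinds of locks needed?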
    type ReadTx interface {
        Lock()
        Unlock()
        RLock()
        RUnlock()

        UnsafeRange(bucket Bucket, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
        UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error
    }
Listing 1-2: ReadTx interface definition
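One way to read the two pairs of lock methods is as the exclusive and shared halves of a reader-writer lock. The sketch below assumes that reading and wraps sync.RWMutex in a hypothetical guardedMap type. It is not etcd's implementation, only an illustration of why a shared lock lets reads run in parallel while an exclusive lock serializes changes.

```go
package main

import (
	"fmt"
	"sync"
)

// guardedMap is a hypothetical type for illustration; it is not part of etcd.
type guardedMap struct {
	mu   sync.RWMutex
	data map[string]string
}

// get takes the shared lock, so any number of readers may run at once.
func (g *guardedMap) get(k string) (string, bool) {
	g.mu.RLock()
	defer g.mu.RUnlock()
	v, ok := g.data[k]
	return v, ok
}

// set takes the exclusive lock, which waits for all readers to leave.
func (g *guardedMap) set(k, v string) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.data[k] = v
}

func main() {
	g := &guardedMap{data: map[string]string{}}
	g.set("foo", "bar")

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			g.get("foo") // concurrent readers only share the lock
		}()
	}
	wg.Wait()

	v, ok := g.get("foo")
	fmt.Println(v, ok)
}
```

If reads vastly outnumber writes, a single exclusive lock would make every read wait for every other read, which is the usual motivation for offering both kinds.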
BatchTx
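BatchTx embeds the ReadTx interface, so the write lock seen earlier is presumably related to batch processing. BatchTx also defines two more lock-related methods, LockInsideApply and LockOutsideApply; what they do has to wait for a closer look at how this interface is implemented. The remaining methods are easy to understand.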
    type BatchTx interface {
        ReadTx
        UnsafeCreateBucket(bucket Bucket)
        UnsafeDeleteBucket(bucket Bucket)
        UnsafePut(bucket Bucket, key []byte, value []byte)
        UnsafeSeqPut(bucket Bucket, key []byte, value []byte)
        UnsafeDelete(bucket Bucket, key []byte)
        // Commit commits a previous tx and begins a new writable one.
        Commit()
        // CommitAndStop commits the previous tx and does not create a new one.
        CommitAndStop()
        LockInsideApply()
        LockOutsideApply()
    }
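Listing 1-3: BatchTx interface definition
Snapshot
The Snapshot interface is very simple. The key method is presumably WriteTo, which suggests that a storage snapshot may well be intended for transfer over the network.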
    type Snapshot interface {
        // Size gets the size of the snapshot.
        Size() int64
        // WriteTo writes the snapshot into the given writer.
        WriteTo(w io.Writer) (n int64, err error)
        // Close closes the snapshot.
        Close() error
    }
Listing 1-4: Snapshot interface definition
Backend Instance
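Listing 2-1 shows how a Backend instance is created. First, the BoltDB object is created. Because BoltDB, like SQLite, keeps its data in a single file, the path to that file (user configurable) has to be passed in. Next, the read transaction and the batch transaction are created. Finally, the go b.run() call starts a goroutine that commits the batch transaction periodically; see Listing 2-2.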
    func newBackend(bcfg BackendConfig) *backend {
        bopts := &bolt.Options{}
        if boltOpenOptions != nil {
            *bopts = *boltOpenOptions
        }
        bopts.InitialMmapSize = bcfg.mmapSize()
        bopts.FreelistType = bcfg.BackendFreelistType
        bopts.NoSync = bcfg.UnsafeNoFsync
        bopts.NoGrowSync = bcfg.UnsafeNoFsync
        bopts.Mlock = bcfg.Mlock

        db, err := bolt.Open(bcfg.Path, 0600, bopts)
        if err != nil {
            bcfg.Logger.Panic("failed to open database", zap.String("path", bcfg.Path), zap.Error(err))
        }

        // In future, may want to make buffering optional for low-concurrency systems
        // or dynamically swap between buffered/non-buffered depending on workload.
        b := &backend{
            bopts: bopts,
            db:    db,

            batchInterval: bcfg.BatchInterval,
            batchLimit:    bcfg.BatchLimit,
            mlock:         bcfg.Mlock,

            readTx: &readTx{
                baseReadTx: baseReadTx{
                    buf: txReadBuffer{
                        txBuffer:   txBuffer{make(map[BucketID]*bucketBuffer)},
                        bufVersion: 0,
                    },
                    buckets: make(map[BucketID]*bolt.Bucket),
                    txWg:    new(sync.WaitGroup),
                    txMu:    new(sync.RWMutex),
                },
            },
            txReadBufferCache: txReadBufferCache{
                mu:         sync.Mutex{},
                bufVersion: 0,
                buf:        nil,
            },

            stopc: make(chan struct{}),
            donec: make(chan struct{}),

            lg: bcfg.Logger,
        }

        b.batchTx = newBatchTxBuffered(b)
        // We set it after newBatchTxBuffered to skip the 'empty' commit.
        b.hooks = bcfg.Hooks

        go b.run()
        return b
    }
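Listing 2-1: Creating a Backend instance
The background goroutine commits the batch transaction periodically. Note the two kinds of commit. CommitAndStop is triggered by the stop signal, and after it no new read-only or batch transaction is opened. Commit runs when the timer fires and there are pending writes, and once it finishes it opens a new read-only transaction and a new batch transaction.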
    func (b *backend) run() {
        defer close(b.donec)
        t := time.NewTimer(b.batchInterval)
        defer t.Stop()
        for {
            select {
            case <-t.C:
            case <-b.stopc:
                b.batchTx.CommitAndStop()
                return
            }
            if b.batchTx.safePending() != 0 {
                b.batchTx.Commit()
            }
            t.Reset(b.batchInterval)
        }
    }
Listing 2-2: Background periodic task
Read Transaction
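Figure 2: Overview of baseReadTx
A read transaction can roughly be divided into a database access part and a buffer part. The key to the design is that BucketID ties the data and the buffer together.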
    type BucketID int

    type Bucket interface {
        // ID returns a unique identifier of a bucket.
        // The id must NOT be persisted and can be used as lightweight identificator
        // in the in-memory maps.
        ID() BucketID
        Name() []byte
        // String implements Stringer (human readable name).
        String() string

        // IsSafeRangeBucket is a hack to avoid inadvertently reading duplicate keys;
        // overwrites on a bucket should only fetch with limit=1, but safeRangeBucket
        // is known to never overwrite any key so range is safe.
        IsSafeRangeBucket() bool
    }
Listing 2-3: BucketID and Bucket
The Bucket interface is implemented by the bucket struct. In addition, the following buckets are defined globally: Key, Meta, Lease, Alarm, Cluster, Members, MembersRemoved, Auth, AuthUsers, AuthRoles, and Test.
    var (
        Key     = backend.Bucket(bucket{id: 1, name: keyBucketName, safeRangeBucket: true})
        Meta    = backend.Bucket(bucket{id: 2, name: metaBucketName, safeRangeBucket: false})
        Lease   = backend.Bucket(bucket{id: 3, name: leaseBucketName, safeRangeBucket: false})
        Alarm   = backend.Bucket(bucket{id: 4, name: alarmBucketName, safeRangeBucket: false})
        Cluster = backend.Bucket(bucket{id: 5, name: clusterBucketName, safeRangeBucket: false})

        Members        = backend.Bucket(bucket{id: 10, name: membersBucketName, safeRangeBucket: false})
        MembersRemoved = backend.Bucket(bucket{id: 11, name: membersRemovedBucketName, safeRangeBucket: false})

        Auth      = backend.Bucket(bucket{id: 20, name: authBucketName, safeRangeBucket: false})
        AuthUsers = backend.Bucket(bucket{id: 21, name: authUsersBucketName, safeRangeBucket: false})
        AuthRoles = backend.Bucket(bucket{id: 22, name: authRolesBucketName, safeRangeBucket: false})

        Test = backend.Bucket(bucket{id: 100, name: testBucketName, safeRangeBucket: false})
    )

    type bucket struct {
        id              backend.BucketID
        name            []byte
        safeRangeBucket bool
    }

    func (b bucket) ID() backend.BucketID    { return b.id }
    func (b bucket) Name() []byte            { return b.name }
    func (b bucket) String() string          { return string(b.Name()) }
    func (b bucket) IsSafeRangeBucket() bool { return b.safeRangeBucket }
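Listing 2-4: Bucket implementation and global buckets
The buffer ultimately holds kv structures, each storing one key-value pair, as shown in Listing 2-5. A bucketBuffer is looked up by BucketID, and its add method is shown in Listing 2-6. When the buffer becomes full, it is reallocated at 1.5 times its current size.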
    type kv struct {
        key []byte
        val []byte
    }
Listing 2-5: The kv struct
    func (bb *bucketBuffer) add(k, v []byte) {
        bb.buf[bb.used].key, bb.buf[bb.used].val = k, v
        bb.used++
        if bb.used == len(bb.buf) {
            buf := make([]kv, (3*len(bb.buf))/2)
            copy(buf, bb.buf)
            bb.buf = buf
        }
    }
Listing 2-6: Adding a kv to the buffer
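To see the 1.5x growth in action, the sketch below copies the add logic into a standalone program. The growBuffer type and the initial size of 4 are assumptions made for this example; the listings above do not show the initial size that bucketBuffer actually uses.

```go
package main

import "fmt"

type kv struct {
	key []byte
	val []byte
}

// growBuffer is a simplified, hypothetical version of bucketBuffer.
type growBuffer struct {
	buf  []kv
	used int
}

// newGrowBuffer allocates the backing slice up front. initial must be at
// least 2, otherwise (3*len)/2 would not add any room.
func newGrowBuffer(initial int) *growBuffer {
	return &growBuffer{buf: make([]kv, initial)}
}

// add stores the pair in the next free slot and grows the slice by 1.5x
// as soon as the last slot has been used.
func (b *growBuffer) add(k, v []byte) {
	b.buf[b.used].key, b.buf[b.used].val = k, v
	b.used++
	if b.used == len(b.buf) {
		grown := make([]kv, (3*len(b.buf))/2)
		copy(grown, b.buf)
		b.buf = grown
	}
}

func main() {
	b := newGrowBuffer(4)
	for i := 0; i < 6; i++ {
		b.add([]byte{byte('a' + i)}, []byte("v"))
		fmt.Println("used:", b.used, "capacity:", len(b.buf))
	}
}
```

With an initial size of 4, the slice grows to 6 after the fourth add and to 9 after the sixth. Growing eagerly, right after the last slot is filled, means the next add never has to check for space first.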
Batch Transaction
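Figure 3: Overview of batchTxBuffered
Batch operations on the Backend go through the batchTxBuffered struct in Figure 3, a batch transaction with a write buffer; the final commit is performed by bolt.Tx. Listing 2-7 shows how a key-value pair is put into txWriteBuffer. Note that put sets the bucket2seq entry for the bucket's BucketID to false.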
    func (txw *txWriteBuffer) put(bucket Bucket, k, v []byte) {
        txw.bucket2seq[bucket.ID()] = false
        txw.putInternal(bucket, k, v)
    }

    func (txw *txWriteBuffer) putInternal(bucket Bucket, k, v []byte) {
        b, ok := txw.buckets[bucket.ID()]
        if !ok {
            b = newBucketBuffer()
            txw.buckets[bucket.ID()] = b
        }
        b.add(k, v)
    }
Listing 2-7: Writing to txWriteBuffer
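Listing 2-7 does not show how the bucket2seq flag is consumed. One plausible use, assumed here rather than taken from the listings, is to remember whether a bucket received only sequential (already ordered) writes, so that the buffered keys need sorting only when an unordered put has happened. The sketch below illustrates that idea with hypothetical names (seqBuffer, putSeq, flush).

```go
package main

import (
	"fmt"
	"sort"
)

// seqBuffer is a hypothetical buffer that tracks whether its keys arrived in order.
type seqBuffer struct {
	keys []string
	seq  bool // true while every write arrived via putSeq
}

func newSeqBuffer() *seqBuffer { return &seqBuffer{seq: true} }

// put accepts keys in any order, so the buffer can no longer be assumed sorted.
func (b *seqBuffer) put(k string) {
	b.seq = false
	b.keys = append(b.keys, k)
}

// putSeq trusts the caller to supply keys in ascending order.
func (b *seqBuffer) putSeq(k string) {
	b.keys = append(b.keys, k)
}

// flush returns the keys in ascending order and reports whether a sort was
// needed to get there.
func (b *seqBuffer) flush() (keys []string, didSort bool) {
	if !b.seq {
		sort.Strings(b.keys)
		didSort = true
	}
	return b.keys, didSort
}

func main() {
	random := newSeqBuffer()
	random.put("b")
	random.put("a")
	fmt.Println(random.flush())

	ordered := newSeqBuffer()
	ordered.putSeq("a")
	ordered.putSeq("b")
	fmt.Println(ordered.flush())
}
```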
Commit
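When a batchTxBuffered is created, a Commit is executed, as the tx.Commit() call in Listing 2-8 shows. Why would a brand-new transaction need a commit? Listing 2-9 shows that Commit calls commit(false). The rest of this section looks at what Commit does in detail.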
    func newBatchTxBuffered(backend *backend) *batchTxBuffered {
        tx := &batchTxBuffered{
            batchTx: batchTx{backend: backend},
            buf: txWriteBuffer{
                txBuffer:   txBuffer{make(map[BucketID]*bucketBuffer)},
                bucket2seq: make(map[BucketID]bool),
            },
        }
        tx.Commit()
        return tx
    }
Listing 2-8: Creating batchTxBuffered
    func (t *batchTxBuffered) Commit() {
        t.lock()
        t.commit(false)
        t.Unlock()
    }
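Listing 2-9: batchTxBuffered.Commit
commit (Listing 2-10) takes a stop parameter which, judging by its name, controls whether the transaction is ended for good. It first checks whether hooks are set on the backend and, if so, calls their OnPreCommitUnsafe method. It then locks the backend's read transaction, performs the commit, and unlocks the read transaction when the commit is done. Recall that the read transaction exposes both a read-only lock and a read-write lock; the one taken here, through Lock, is the read-write (exclusive) lock.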
    func (t *batchTxBuffered) commit(stop bool) {
        if t.backend.hooks != nil {
            t.backend.hooks.OnPreCommitUnsafe(t)
        }

        // all read txs must be closed to acquire boltdb commit rwlock
        t.backend.readTx.Lock()
        t.unsafeCommit(stop)
        t.backend.readTx.Unlock()
    }
Listing 2-10: batchTxBuffered.commit
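The order of steps in Listing 2-10 (optional hook first, then the exclusive lock around the commit) can be reduced to a few lines. The sketch below uses a hypothetical store type with a plain function as its hook and a slice that records what ran; none of these names come from etcd.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical type that mimics the commit sequence only.
type store struct {
	readMu sync.RWMutex // readers would hold RLock; commit takes Lock
	hook   func()       // optional pre-commit hook, may be nil
	events []string     // records the order in which steps ran
}

// commit runs the hook if one is set, then performs the commit while
// holding the exclusive lock so that no reader can observe it halfway.
func (s *store) commit() {
	if s.hook != nil {
		s.hook()
	}
	s.readMu.Lock() // blocks until no reader holds the shared lock
	s.events = append(s.events, "commit")
	s.readMu.Unlock()
}

func main() {
	s := &store{}
	s.hook = func() { s.events = append(s.events, "hook") }
	s.commit()
	fmt.Println(s.events)
}
```

Running the hook before the commit gives it a chance to add its own writes to the batch that is about to be committed.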
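unsafeCommit first checks whether the backend's readTx currently holds a real bolt.Tx. If it does, a goroutine is started that waits for all readers still using that transaction to finish and then rolls it back, and readTx is reset (Listing 2-12). The reset mainly clears the buffer and replaces the transaction and the wait group. Once the batch transaction has been committed, a new read-only transaction is begun if stop is false (the t.backend.begin(false) call in Listing 2-11).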
    func (t *batchTxBuffered) unsafeCommit(stop bool) {
        if t.backend.readTx.tx != nil {
            // wait all store read transactions using the current boltdb tx to finish,
            // then close the boltdb tx
            go func(tx *bolt.Tx, wg *sync.WaitGroup) {
                wg.Wait()
                if err := tx.Rollback(); err != nil {
                    t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
                }
            }(t.backend.readTx.tx, t.backend.readTx.txWg)
            t.backend.readTx.reset()
        }

        t.batchTx.commit(stop)

        if !stop {
            t.backend.readTx.tx = t.backend.begin(false)
        }
    }
Listing 2-11: batchTxBuffered.unsafeCommit
    func (rt *readTx) reset() {
        rt.buf.reset()
        rt.buckets = make(map[BucketID]*bolt.Bucket)
        rt.tx = nil
        rt.txWg = new(sync.WaitGroup)
    }
Listing 2-12: readTx.reset
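Finally, commit on batchTx is executed. If a bolt.Tx exists, its Commit is called, except when there are no pending writes and stop is false; in that case the method returns early and keeps the current transaction. If stop is false, a new read-write bolt.Tx is then begun (the t.backend.begin(true) call in Listing 2-13).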
    func (t *batchTx) commit(stop bool) {
        // commit the last tx
        if t.tx != nil {
            if t.pending == 0 && !stop {
                return
            }

            start := time.Now()

            // gofail: var beforeCommit struct{}
            err := t.tx.Commit()
            // gofail: var afterCommit struct{}

            rebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds())
            spillSec.Observe(t.tx.Stats().SpillTime.Seconds())
            writeSec.Observe(t.tx.Stats().WriteTime.Seconds())
            commitSec.Observe(time.Since(start).Seconds())
            atomic.AddInt64(&t.backend.commits, 1)

            t.pending = 0
            if err != nil {
                t.backend.lg.Fatal("failed to commit tx", zap.Error(err))
            }
        }
        if !stop {
            t.tx = t.backend.begin(true)
        }
    }
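Listing 2-13: batchTx.commit
The backend.begin function used during commit is shown in Listing 2-14. Depending on the write argument, it creates a read-write or a read-only transaction, and it also refreshes the backend's size, sizeInUse, and openReadTxN statistics.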
    func (b *backend) begin(write bool) *bolt.Tx {
        b.mu.RLock()
        tx := b.unsafeBegin(write)
        b.mu.RUnlock()

        size := tx.Size()
        db := tx.DB()
        stats := db.Stats()
        atomic.StoreInt64(&b.size, size)
        atomic.StoreInt64(&b.sizeInUse, size-(int64(stats.FreePageN)*int64(db.Info().PageSize)))
        atomic.StoreInt64(&b.openReadTxN, int64(stats.OpenTxN))

        return tx
    }

    func (b *backend) unsafeBegin(write bool) *bolt.Tx {
        tx, err := b.db.Begin(write)
        if err != nil {
            b.lg.Fatal("failed to begin tx", zap.Error(err))
        }
        return tx
    }
Listing 2-14: backend.begin
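The sizeInUse value stored in begin is the total size minus the space taken by free pages. The sketch below repeats that arithmetic as a standalone helper; the function name and the example numbers (a 1 MiB file with 16 free pages of 4 KiB each) are chosen for illustration only.

```go
package main

import "fmt"

// sizeInUse mirrors the arithmetic in backend.begin: the total size minus
// the bytes occupied by free pages.
func sizeInUse(size int64, freePages int, pageSize int) int64 {
	return size - int64(freePages)*int64(pageSize)
}

func main() {
	// 1048576 - 16*4096 = 1048576 - 65536 = 983040
	fmt.Println(sizeInUse(1<<20, 16, 4096))
}
```

This also explains the caveat in the SizeInUse comment of the Backend interface: free space is counted in whole pages, so the result is only as precise as the page size.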
Hooks
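Figure 4: Overview of Hooks
Figure 4 shows the key structures of the Backend hooks. consistentIndex mainly holds Raft-related state (the consistent index and the term), which is ultimately written through the backend's batch transaction. Listing 2-15 shows how Index and Term are written: both values are stored in the Meta bucket, under the consistent_index and term keys (MetaConsistentIndexKeyName and MetaTermKeyName), as 8-byte big-endian integers.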
    func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
        if index == 0 {
            // Never save 0 as it means that we didn't load the real index yet.
            return
        }

        bs1 := make([]byte, 8)
        binary.BigEndian.PutUint64(bs1, index)
        // put the index into the underlying backend
        // tx has been locked in TxnBegin, so there is no need to lock it again
        tx.UnsafePut(Meta, MetaConsistentIndexKeyName, bs1)
        if term > 0 {
            bs2 := make([]byte, 8)
            binary.BigEndian.PutUint64(bs2, term)
            tx.UnsafePut(Meta, MetaTermKeyName, bs2)
        }
    }
Listing 2-15: Saving Index and Term
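The 8-byte big-endian layout used for Index and Term is easy to check by hand. The sketch below encodes and decodes a value with encoding/binary; the helper names encodeUint64 and decodeUint64 are made up for this example and are not etcd functions.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeUint64 writes v as 8 bytes, most significant byte first.
func encodeUint64(v uint64) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, v)
	return b
}

// decodeUint64 reads a value written by encodeUint64. It reports false
// when the input does not have exactly 8 bytes.
func decodeUint64(b []byte) (uint64, bool) {
	if len(b) != 8 {
		return 0, false
	}
	return binary.BigEndian.Uint64(b), true
}

func main() {
	b := encodeUint64(258) // 258 = 0x0102
	fmt.Println(b)         // [0 0 0 0 0 0 1 2]

	v, ok := decodeUint64(b)
	fmt.Println(v, ok) // 258 true
}
```

A fixed-width big-endian encoding has the convenient property that byte-wise comparison of the encoded values matches numeric comparison.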
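This concludes the analysis of the key structures around Backend. Functional details will be covered scenario by scenario in later parts of the source code walkthrough.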
