Degisn

存储系统的基础逻辑关系如图 1 所示。Backend 接口表示存储组件,核心存储功能最终由 BoltDB 提供,ReadTx 表示读取事务,BatchTx 则表示批处理事务,Snapshot 表示存储快照。
backend-interface.svg
图 1:Backend 接口关联图

Backend 接口定义如下,其中第 6 行可以返回一个并发读事务实例,第 20 行可以获取全部读事务数量,Size 与 SizeInUse 则用于获取存储空间信息,其他方法如 Hash、Defrag 虽然通过名字可以猜测功能大致情况,但还不能准确描述,延后解决。

  1. type Backend interface {
  2. // ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523.
  3. ReadTx() ReadTx
  4. BatchTx() BatchTx
  5. // ConcurrentReadTx returns a non-blocking read transaction.
  6. ConcurrentReadTx() ReadTx
  7. Snapshot() Snapshot
  8. Hash(ignores func(bucketName, keyName []byte) bool) (uint32, error)
  9. // Size returns the current size of the backend physically allocated.
  10. // The backend can hold DB space that is not utilized at the moment,
  11. // since it can conduct pre-allocation or spare unused space for recycling.
  12. // Use SizeInUse() instead for the actual DB size.
  13. Size() int64
  14. // SizeInUse returns the current size of the backend logically in use.
  15. // Since the backend can manage free space in a non-byte unit such as
  16. // number of pages, the returned value can be not exactly accurate in bytes.
  17. SizeInUse() int64
  18. // OpenReadTxN returns the number of currently open read transactions in the backend.
  19. OpenReadTxN() int64
  20. Defrag() error
  21. ForceCommit()
  22. Close() error
  23. // SetTxPostLockInsideApplyHook sets a txPostLockInsideApplyHook.
  24. SetTxPostLockInsideApplyHook(func())
  25. }

代码 1 - 1:Backend 接口定义

ReadTx

ReadTx 接口定义中主要定义了锁方法及访问、遍历方法。从接口定义中,可以看到有两种不同锁方式。根据命名方式我们猜测 4 ~ 5 行与只读锁相关,2 ~ 3 行与写锁使用相关,那么显然有一个问题:为什么需要两种锁存在呢?

  1. type ReadTx interface {
  2. Lock()
  3. Unlock()
  4. RLock()
  5. RUnlock()
  6. UnsafeRange(bucket Bucket, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
  7. UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error
  8. }

代码 1 - 2:ReadTx 接口定义

BatchTx

BatchTx 包含一个 ReadTx 接口,因此之前的写锁应该是与批处理相关的,当然在 BatchTx 接口中又定义了两个所相关方法:LockInsideApply、LockOutsideApply,这两个作用还要根据对该接口实现机制进一步分析,其他方法都容易理解。

  1. type BatchTx interface {
  2. ReadTx
  3. UnsafeCreateBucket(bucket Bucket)
  4. UnsafeDeleteBucket(bucket Bucket)
  5. UnsafePut(bucket Bucket, key []byte, value []byte)
  6. UnsafeSeqPut(bucket Bucket, key []byte, value []byte)
  7. UnsafeDelete(bucket Bucket, key []byte)
  8. // Commit commits a previous tx and begins a new writable one.
  9. Commit()
  10. // CommitAndStop commits the previous tx and does not create a new one.
  11. CommitAndStop()
  12. LockInsideApply()
  13. LockOutsideApply()
  14. }

代码 1 - 3:BatchTx 接口定义

Snapshot

Snapshot 接口定义非常简单,关键方法应该是 WriteTo 方法,由这个方法可以猜测存储快照有一定可能性是用于网络传输的。

  1. type Snapshot interface {
  2. // Size gets the size of the snapshot.
  3. Size() int64
  4. // WriteTo writes the snapshot into the given writer.
  5. WriteTo(w io.Writer) (n int64, err error)
  6. // Close closes the snapshot.
  7. Close() error
  8. }

代码 1 - 4:Snapshot 接口定义

Backend Instance

Backend 实例创建过程见代码 2 - 1。首先,创建 BoltDB 对象,因为 BoltDB 是类似于 sqlite 的文件格式,因此需要传入一个主目录(用户可配置);然后创建读事务、批处理事务;最后启动一个协程执行定时提交批处理事务功能。第 54 行启动的协程用于执行批处理事务定时提交操作,详见代码 2 - 2。

  1. func newBackend(bcfg BackendConfig) *backend {
  2. bopts := &bolt.Options{}
  3. if boltOpenOptions != nil {
  4. *bopts = *boltOpenOptions
  5. }
  6. bopts.InitialMmapSize = bcfg.mmapSize()
  7. bopts.FreelistType = bcfg.BackendFreelistType
  8. bopts.NoSync = bcfg.UnsafeNoFsync
  9. bopts.NoGrowSync = bcfg.UnsafeNoFsync
  10. bopts.Mlock = bcfg.Mlock
  11. db, err := bolt.Open(bcfg.Path, 0600, bopts)
  12. if err != nil {
  13. bcfg.Logger.Panic("failed to open database", zap.String("path", bcfg.Path), zap.Error(err))
  14. }
  15. // In future, may want to make buffering optional for low-concurrency systems
  16. // or dynamically swap between buffered/non-buffered depending on workload.
  17. b := &backend{
  18. bopts: bopts,
  19. db: db,
  20. batchInterval: bcfg.BatchInterval,
  21. batchLimit: bcfg.BatchLimit,
  22. mlock: bcfg.Mlock,
  23. readTx: &readTx{
  24. baseReadTx: baseReadTx{
  25. buf: txReadBuffer{
  26. txBuffer: txBuffer{make(map[BucketID]*bucketBuffer)},
  27. bufVersion: 0,
  28. },
  29. buckets: make(map[BucketID]*bolt.Bucket),
  30. txWg: new(sync.WaitGroup),
  31. txMu: new(sync.RWMutex),
  32. },
  33. },
  34. txReadBufferCache: txReadBufferCache{
  35. mu: sync.Mutex{},
  36. bufVersion: 0,
  37. buf: nil,
  38. },
  39. stopc: make(chan struct{}),
  40. donec: make(chan struct{}),
  41. lg: bcfg.Logger,
  42. }
  43. b.batchTx = newBatchTxBuffered(b)
  44. // We set it after newBatchTxBuffered to skip the 'empty' commit.
  45. b.hooks = bcfg.Hooks
  46. go b.run()
  47. return b
  48. }

代码 2 - 1:Backend 实例创建

后台定时执行批处理事务提交,注意第 9 行与低 13 行的两种提交方式,第 9 行的提交是由停止信号控制的,提交后不会再开启新的只读事务和批处理事务;第 13 行的提交完成后会开启新的只读事务和批处理事务。

  1. func (b *backend) run() {
  2. defer close(b.donec)
  3. t := time.NewTimer(b.batchInterval)
  4. defer t.Stop()
  5. for {
  6. select {
  7. case <-t.C:
  8. case <-b.stopc:
  9. b.batchTx.CommitAndStop()
  10. return
  11. }
  12. if b.batchTx.safePending() != 0 {
  13. b.batchTx.Commit()
  14. }
  15. t.Reset(b.batchInterval)
  16. }
  17. }

代码 2 - 2:后台定时任务

Read Transaction

backend-readtx.svg
图 2:baseReadTx 概览

Read Transaction 大致可以分为数据库访问部分和缓冲部分。设计的关键是通过 BucketID 对数据和缓冲进行关联。

  1. type BucketID int
  2. type Bucket interface {
  3. // ID returns a unique identifier of a bucket.
  4. // The id must NOT be persisted and can be used as lightweight identificator
  5. // in the in-memory maps.
  6. ID() BucketID
  7. Name() []byte
  8. // String implements Stringer (human readable name).
  9. String() string
  10. // IsSafeRangeBucket is a hack to avoid inadvertently reading duplicate keys;
  11. // overwrites on a bucket should only fetch with limit=1, but safeRangeBucket
  12. // is known to never overwrite any key so range is safe.
  13. IsSafeRangeBucket() bool
  14. }

代码 2 - 3:BucketID 与 Bucket

Bucket 接口通过 bucket 机构来实现。同时,全局定义了 Key、Meta、Lease、Alarm、Cluster、Members、MembersRemoved、Auth、AuthUsers、AuthRoles 和 Test 桶。

  1. var (
  2. Key = backend.Bucket(bucket{id: 1, name: keyBucketName, safeRangeBucket: true})
  3. Meta = backend.Bucket(bucket{id: 2, name: metaBucketName, safeRangeBucket: false})
  4. Lease = backend.Bucket(bucket{id: 3, name: leaseBucketName, safeRangeBucket: false})
  5. Alarm = backend.Bucket(bucket{id: 4, name: alarmBucketName, safeRangeBucket: false})
  6. Cluster = backend.Bucket(bucket{id: 5, name: clusterBucketName, safeRangeBucket: false})
  7. Members = backend.Bucket(bucket{id: 10, name: membersBucketName, safeRangeBucket: false})
  8. MembersRemoved = backend.Bucket(bucket{id: 11, name: membersRemovedBucketName, safeRangeBucket: false})
  9. Auth = backend.Bucket(bucket{id: 20, name: authBucketName, safeRangeBucket: false})
  10. AuthUsers = backend.Bucket(bucket{id: 21, name: authUsersBucketName, safeRangeBucket: false})
  11. AuthRoles = backend.Bucket(bucket{id: 22, name: authRolesBucketName, safeRangeBucket: false})
  12. Test = backend.Bucket(bucket{id: 100, name: testBucketName, safeRangeBucket: false})
  13. )
  14. type bucket struct {
  15. id backend.BucketID
  16. name []byte
  17. safeRangeBucket bool
  18. }
  19. func (b bucket) ID() backend.BucketID { return b.id }
  20. func (b bucket) Name() []byte { return b.name }
  21. func (b bucket) String() string { return string(b.Name()) }
  22. func (b bucket) IsSafeRangeBucket() bool { return b.safeRangeBucket }

代码 2 - 4:Bucket 接口实例和全局桶

缓存中最终为 kv 结构,存储一个 key - value 对,如代码 2 - 5 所示,bucketBuffer 可通过 BucketID 来查找,其 add 方法如见代码 2 - 6,当缓存满时,重新分配 1.5 倍空间。

  1. type kv struct {
  2. key []byte
  3. val []byte
  4. }

代码 2 - 5:kv 结构

  1. func (bb *bucketBuffer) add(k, v []byte) {
  2. bb.buf[bb.used].key, bb.buf[bb.used].val = k, v
  3. bb.used++
  4. if bb.used == len(bb.buf) {
  5. buf := make([]kv, (3*len(bb.buf))/2)
  6. copy(buf, bb.buf)
  7. bb.buf = buf
  8. }
  9. }

代码 2 - 6:添加 kv 进缓存

Batch Transaction

backend-batchtx.svg
图 3:batchTxBuffered 概览

对 Backend 的批处理操作通过图 3 中 batchTxBuffered 结构完成,这个结构是一个带缓存的批处理结构,最终事务提交操作由 bolt.Tx 完成。向 txWriteBuffer 中放入一个 key - value 对操作如代码 2 - 7 所示,注意第 2 行设置 bucket2seq 中对应 BucketID 值是 false。

  1. func (txw *txWriteBuffer) put(bucket Bucket, k, v []byte) {
  2. txw.bucket2seq[bucket.ID()] = false
  3. txw.putInternal(bucket, k, v)
  4. }
  5. func (txw *txWriteBuffer) putInternal(bucket Bucket, k, v []byte) {
  6. b, ok := txw.buckets[bucket.ID()]
  7. if !ok {
  8. b = newBucketBuffer()
  9. txw.buckets[bucket.ID()] = b
  10. }
  11. b.add(k, v)
  12. }

代码 2 - 7:txWriteBuffer 写入操作

Commit

创建 batchTxBuffered 结构时,会执行一个 Commit 操作,如代码 2 - 8 第 9 行所示,一个全新的事务为什么要执行一次提交操作呢?在代码 2 - 9 中,可以发现第 3 行中的 commit(false) 操作,接下来我们将详细分析 Commit 都做了什么。

  1. func newBatchTxBuffered(backend *backend) *batchTxBuffered {
  2. tx := &batchTxBuffered{
  3. batchTx: batchTx{backend: backend},
  4. buf: txWriteBuffer{
  5. txBuffer: txBuffer{make(map[BucketID]*bucketBuffer)},
  6. bucket2seq: make(map[BucketID]bool),
  7. },
  8. }
  9. tx.Commit()
  10. return tx
  11. }

代码 2 - 8:batchTxBuffered 创建

  1. func (t *batchTxBuffered) Commit() {
  2. t.lock()
  3. t.commit(false)
  4. t.Unlock()
  5. }

代码 2 - 9:batchTxBuffered.Commit

执行 commit 时(代码 2 - 10),需要传入一个 stop 参数,这个参数通过名称大致可以猜测用于控制事务终结。首先根据 backend 中是否设置了 Hook 方法,如果设置了,执行其 OnPreCommitUnsafe 方法;接下来,锁定全部读事务,并执行提交操作,操作完成解锁读事务。回忆一下,读事务中包含了只读锁和读写锁,在这里锁定的是读写锁。

  1. func (t *batchTxBuffered) commit(stop bool) {
  2. if t.backend.hooks != nil {
  3. t.backend.hooks.OnPreCommitUnsafe(t)
  4. }
  5. // all read txs must be closed to acquire boltdb commit rwlock
  6. t.backend.readTx.Lock()
  7. t.unsafeCommit(stop)
  8. t.backend.readTx.Unlock()
  9. }

代码 2 - 10:batchTxBuffered.commit

执行 unsafeCommit 时,首先根据 backend 中是否存在真正的 bolt.Tx 事务实例,如果存在,则等待该事务执行 Rollback 操作,并重置 readTx(代码 2 - 12),可以看到主要清理的是缓存内容,并重置事务和等待组实例;如果 stop 为 false,则重建一个只读事务(代码 2 - 11 第 17 行)。

  1. func (t *batchTxBuffered) unsafeCommit(stop bool) {
  2. if t.backend.readTx.tx != nil {
  3. // wait all store read transactions using the current boltdb tx to finish,
  4. // then close the boltdb tx
  5. go func(tx *bolt.Tx, wg *sync.WaitGroup) {
  6. wg.Wait()
  7. if err := tx.Rollback(); err != nil {
  8. t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
  9. }
  10. }(t.backend.readTx.tx, t.backend.readTx.txWg)
  11. t.backend.readTx.reset()
  12. }
  13. t.batchTx.commit(stop)
  14. if !stop {
  15. t.backend.readTx.tx = t.backend.begin(false)
  16. }
  17. }

代码 2 - 11:batchTxBuffered.unsafeCommit

  1. func (rt *readTx) reset() {
  2. rt.buf.reset()
  3. rt.buckets = make(map[BucketID]*bolt.Bucket)
  4. rt.tx = nil
  5. rt.txWg = new(sync.WaitGroup)
  6. }

代码 2 - 12:readTx.reset

最终,执行 batchTx 的 commit 方法,如果此时有 bolt.Tx 实例,执行其 Commit 操作。如果 stop 为 false,重建一个 bolt.Tx 读写事务(代码 2 - 13 第 26 行)。

  1. func (t *batchTx) commit(stop bool) {
  2. // commit the last tx
  3. if t.tx != nil {
  4. if t.pending == 0 && !stop {
  5. return
  6. }
  7. start := time.Now()
  8. // gofail: var beforeCommit struct{}
  9. err := t.tx.Commit()
  10. // gofail: var afterCommit struct{}
  11. rebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds())
  12. spillSec.Observe(t.tx.Stats().SpillTime.Seconds())
  13. writeSec.Observe(t.tx.Stats().WriteTime.Seconds())
  14. commitSec.Observe(time.Since(start).Seconds())
  15. atomic.AddInt64(&t.backend.commits, 1)
  16. t.pending = 0
  17. if err != nil {
  18. t.backend.lg.Fatal("failed to commit tx", zap.Error(err))
  19. }
  20. }
  21. if !stop {
  22. t.tx = t.backend.begin(true)
  23. }
  24. }

代码 2 - 13:batchTx.commit

在提交操作中使用的 backend.begin 操作如代码 2 - 14 所示,根据 write 参数的取值来创建读写、只读事务。

  1. func (b *backend) begin(write bool) *bolt.Tx {
  2. b.mu.RLock()
  3. tx := b.unsafeBegin(write)
  4. b.mu.RUnlock()
  5. size := tx.Size()
  6. db := tx.DB()
  7. stats := db.Stats()
  8. atomic.StoreInt64(&b.size, size)
  9. atomic.StoreInt64(&b.sizeInUse, size-(int64(stats.FreePageN)*int64(db.Info().PageSize)))
  10. atomic.StoreInt64(&b.openReadTxN, int64(stats.OpenTxN))
  11. return tx
  12. }
  13. func (b *backend) unsafeBegin(write bool) *bolt.Tx {
  14. tx, err := b.db.Begin(write)
  15. if err != nil {
  16. b.lg.Fatal("failed to begin tx", zap.Error(err))
  17. }
  18. return tx
  19. }

代码 2 - 14:backend.begin

Hooks

backend-hooks.svg
图 4:Hooks 概览

图 4 展示了 Backend 的 Hooks 关键结构,其中 consistentIndex 主要用于保存与 Raft 相关的数据结构,最终还是要通过 backend 的批处理事务进行写入操作。代码 2 - 15 展示了写入 Index 与 Term 的具体细节,这两个值存储在 Meta 桶的 consistent_keyterm 关键字下,使用大端表示法存储。

  1. func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
  2. if index == 0 {
  3. // Never save 0 as it means that we didn't load the real index yet.
  4. return
  5. }
  6. bs1 := make([]byte, 8)
  7. binary.BigEndian.PutUint64(bs1, index)
  8. // put the index into the underlying backend
  9. // tx has been locked in TxnBegin, so there is no need to lock it again
  10. tx.UnsafePut(Meta, MetaConsistentIndexKeyName, bs1)
  11. if term > 0 {
  12. bs2 := make([]byte, 8)
  13. binary.BigEndian.PutUint64(bs2, term)
  14. tx.UnsafePut(Meta, MetaTermKeyName, bs2)
  15. }
  16. }

代码 2 - 15:保存 Index 与 Term

至此,与 Backend 相关的关键结构就基本分析完毕,具体功能细节在后续源码分析中会根据场景再展开讲解。

backend.xml