Degisn
存储系统的基础逻辑关系如图 1 所示。Backend 接口表示存储组件,核心存储功能最终由 BoltDB 提供,ReadTx 表示读取事务,BatchTx 则表示批处理事务,Snapshot 表示存储快照。
图 1:Backend 接口关联图
Backend 接口定义如下,其中第 6 行可以返回一个并发读事务实例,第 20 行可以获取全部读事务数量,Size 与 SizeInUse 则用于获取存储空间信息,其他方法如 Hash、Defrag 虽然通过名字可以猜测功能大致情况,但还不能准确描述,延后解决。
type Backend interface {
// ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523.
ReadTx() ReadTx
BatchTx() BatchTx
// ConcurrentReadTx returns a non-blocking read transaction.
ConcurrentReadTx() ReadTx
Snapshot() Snapshot
Hash(ignores func(bucketName, keyName []byte) bool) (uint32, error)
// Size returns the current size of the backend physically allocated.
// The backend can hold DB space that is not utilized at the moment,
// since it can conduct pre-allocation or spare unused space for recycling.
// Use SizeInUse() instead for the actual DB size.
Size() int64
// SizeInUse returns the current size of the backend logically in use.
// Since the backend can manage free space in a non-byte unit such as
// number of pages, the returned value can be not exactly accurate in bytes.
SizeInUse() int64
// OpenReadTxN returns the number of currently open read transactions in the backend.
OpenReadTxN() int64
Defrag() error
ForceCommit()
Close() error
// SetTxPostLockInsideApplyHook sets a txPostLockInsideApplyHook.
SetTxPostLockInsideApplyHook(func())
}
代码 1 - 1:Backend 接口定义
ReadTx
ReadTx 接口定义中主要定义了锁方法及访问、遍历方法。从接口定义中,可以看到有两种不同锁方式。根据命名方式我们猜测 4 ~ 5 行与只读锁相关,2 ~ 3 行与写锁使用相关,那么显然有一个问题:为什么需要两种锁存在呢?
type ReadTx interface {
Lock()
Unlock()
RLock()
RUnlock()
UnsafeRange(bucket Bucket, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error
}
代码 1 - 2:ReadTx 接口定义
BatchTx
BatchTx 包含一个 ReadTx 接口,因此之前的写锁应该是与批处理相关的,当然在 BatchTx 接口中又定义了两个所相关方法:LockInsideApply、LockOutsideApply,这两个作用还要根据对该接口实现机制进一步分析,其他方法都容易理解。
type BatchTx interface {
ReadTx
UnsafeCreateBucket(bucket Bucket)
UnsafeDeleteBucket(bucket Bucket)
UnsafePut(bucket Bucket, key []byte, value []byte)
UnsafeSeqPut(bucket Bucket, key []byte, value []byte)
UnsafeDelete(bucket Bucket, key []byte)
// Commit commits a previous tx and begins a new writable one.
Commit()
// CommitAndStop commits the previous tx and does not create a new one.
CommitAndStop()
LockInsideApply()
LockOutsideApply()
}
代码 1 - 3:BatchTx 接口定义
Snapshot
Snapshot 接口定义非常简单,关键方法应该是 WriteTo 方法,由这个方法可以猜测存储快照有一定可能性是用于网络传输的。
type Snapshot interface {
// Size gets the size of the snapshot.
Size() int64
// WriteTo writes the snapshot into the given writer.
WriteTo(w io.Writer) (n int64, err error)
// Close closes the snapshot.
Close() error
}
代码 1 - 4:Snapshot 接口定义
Backend Instance
Backend 实例创建过程见代码 2 - 1。首先,创建 BoltDB 对象,因为 BoltDB 是类似于 sqlite 的文件格式,因此需要传入一个主目录(用户可配置);然后创建读事务、批处理事务;最后启动一个协程执行定时提交批处理事务功能。第 54 行启动的协程用于执行批处理事务定时提交操作,详见代码 2 - 2。
func newBackend(bcfg BackendConfig) *backend {
bopts := &bolt.Options{}
if boltOpenOptions != nil {
*bopts = *boltOpenOptions
}
bopts.InitialMmapSize = bcfg.mmapSize()
bopts.FreelistType = bcfg.BackendFreelistType
bopts.NoSync = bcfg.UnsafeNoFsync
bopts.NoGrowSync = bcfg.UnsafeNoFsync
bopts.Mlock = bcfg.Mlock
db, err := bolt.Open(bcfg.Path, 0600, bopts)
if err != nil {
bcfg.Logger.Panic("failed to open database", zap.String("path", bcfg.Path), zap.Error(err))
}
// In future, may want to make buffering optional for low-concurrency systems
// or dynamically swap between buffered/non-buffered depending on workload.
b := &backend{
bopts: bopts,
db: db,
batchInterval: bcfg.BatchInterval,
batchLimit: bcfg.BatchLimit,
mlock: bcfg.Mlock,
readTx: &readTx{
baseReadTx: baseReadTx{
buf: txReadBuffer{
txBuffer: txBuffer{make(map[BucketID]*bucketBuffer)},
bufVersion: 0,
},
buckets: make(map[BucketID]*bolt.Bucket),
txWg: new(sync.WaitGroup),
txMu: new(sync.RWMutex),
},
},
txReadBufferCache: txReadBufferCache{
mu: sync.Mutex{},
bufVersion: 0,
buf: nil,
},
stopc: make(chan struct{}),
donec: make(chan struct{}),
lg: bcfg.Logger,
}
b.batchTx = newBatchTxBuffered(b)
// We set it after newBatchTxBuffered to skip the 'empty' commit.
b.hooks = bcfg.Hooks
go b.run()
return b
}
代码 2 - 1:Backend 实例创建
后台定时执行批处理事务提交,注意第 9 行与低 13 行的两种提交方式,第 9 行的提交是由停止信号控制的,提交后不会再开启新的只读事务和批处理事务;第 13 行的提交完成后会开启新的只读事务和批处理事务。
func (b *backend) run() {
defer close(b.donec)
t := time.NewTimer(b.batchInterval)
defer t.Stop()
for {
select {
case <-t.C:
case <-b.stopc:
b.batchTx.CommitAndStop()
return
}
if b.batchTx.safePending() != 0 {
b.batchTx.Commit()
}
t.Reset(b.batchInterval)
}
}
代码 2 - 2:后台定时任务
Read Transaction
图 2:baseReadTx 概览
Read Transaction 大致可以分为数据库访问部分和缓冲部分。设计的关键是通过 BucketID 对数据和缓冲进行关联。
type BucketID int
type Bucket interface {
// ID returns a unique identifier of a bucket.
// The id must NOT be persisted and can be used as lightweight identificator
// in the in-memory maps.
ID() BucketID
Name() []byte
// String implements Stringer (human readable name).
String() string
// IsSafeRangeBucket is a hack to avoid inadvertently reading duplicate keys;
// overwrites on a bucket should only fetch with limit=1, but safeRangeBucket
// is known to never overwrite any key so range is safe.
IsSafeRangeBucket() bool
}
代码 2 - 3:BucketID 与 Bucket
Bucket 接口通过 bucket 机构来实现。同时,全局定义了 Key、Meta、Lease、Alarm、Cluster、Members、MembersRemoved、Auth、AuthUsers、AuthRoles 和 Test 桶。
var (
Key = backend.Bucket(bucket{id: 1, name: keyBucketName, safeRangeBucket: true})
Meta = backend.Bucket(bucket{id: 2, name: metaBucketName, safeRangeBucket: false})
Lease = backend.Bucket(bucket{id: 3, name: leaseBucketName, safeRangeBucket: false})
Alarm = backend.Bucket(bucket{id: 4, name: alarmBucketName, safeRangeBucket: false})
Cluster = backend.Bucket(bucket{id: 5, name: clusterBucketName, safeRangeBucket: false})
Members = backend.Bucket(bucket{id: 10, name: membersBucketName, safeRangeBucket: false})
MembersRemoved = backend.Bucket(bucket{id: 11, name: membersRemovedBucketName, safeRangeBucket: false})
Auth = backend.Bucket(bucket{id: 20, name: authBucketName, safeRangeBucket: false})
AuthUsers = backend.Bucket(bucket{id: 21, name: authUsersBucketName, safeRangeBucket: false})
AuthRoles = backend.Bucket(bucket{id: 22, name: authRolesBucketName, safeRangeBucket: false})
Test = backend.Bucket(bucket{id: 100, name: testBucketName, safeRangeBucket: false})
)
type bucket struct {
id backend.BucketID
name []byte
safeRangeBucket bool
}
func (b bucket) ID() backend.BucketID { return b.id }
func (b bucket) Name() []byte { return b.name }
func (b bucket) String() string { return string(b.Name()) }
func (b bucket) IsSafeRangeBucket() bool { return b.safeRangeBucket }
代码 2 - 4:Bucket 接口实例和全局桶
缓存中最终为 kv 结构,存储一个 key - value 对,如代码 2 - 5 所示,bucketBuffer 可通过 BucketID 来查找,其 add 方法如见代码 2 - 6,当缓存满时,重新分配 1.5 倍空间。
type kv struct {
key []byte
val []byte
}
代码 2 - 5:kv 结构
func (bb *bucketBuffer) add(k, v []byte) {
bb.buf[bb.used].key, bb.buf[bb.used].val = k, v
bb.used++
if bb.used == len(bb.buf) {
buf := make([]kv, (3*len(bb.buf))/2)
copy(buf, bb.buf)
bb.buf = buf
}
}
代码 2 - 6:添加 kv 进缓存
Batch Transaction
图 3:batchTxBuffered 概览
对 Backend 的批处理操作通过图 3 中 batchTxBuffered 结构完成,这个结构是一个带缓存的批处理结构,最终事务提交操作由 bolt.Tx 完成。向 txWriteBuffer 中放入一个 key - value 对操作如代码 2 - 7 所示,注意第 2 行设置 bucket2seq 中对应 BucketID 值是 false。
func (txw *txWriteBuffer) put(bucket Bucket, k, v []byte) {
txw.bucket2seq[bucket.ID()] = false
txw.putInternal(bucket, k, v)
}
func (txw *txWriteBuffer) putInternal(bucket Bucket, k, v []byte) {
b, ok := txw.buckets[bucket.ID()]
if !ok {
b = newBucketBuffer()
txw.buckets[bucket.ID()] = b
}
b.add(k, v)
}
代码 2 - 7:txWriteBuffer 写入操作
Commit
创建 batchTxBuffered 结构时,会执行一个 Commit 操作,如代码 2 - 8 第 9 行所示,一个全新的事务为什么要执行一次提交操作呢?在代码 2 - 9 中,可以发现第 3 行中的 commit(false) 操作,接下来我们将详细分析 Commit 都做了什么。
func newBatchTxBuffered(backend *backend) *batchTxBuffered {
tx := &batchTxBuffered{
batchTx: batchTx{backend: backend},
buf: txWriteBuffer{
txBuffer: txBuffer{make(map[BucketID]*bucketBuffer)},
bucket2seq: make(map[BucketID]bool),
},
}
tx.Commit()
return tx
}
代码 2 - 8:batchTxBuffered 创建
func (t *batchTxBuffered) Commit() {
t.lock()
t.commit(false)
t.Unlock()
}
代码 2 - 9:batchTxBuffered.Commit
执行 commit 时(代码 2 - 10),需要传入一个 stop 参数,这个参数通过名称大致可以猜测用于控制事务终结。首先根据 backend 中是否设置了 Hook 方法,如果设置了,执行其 OnPreCommitUnsafe 方法;接下来,锁定全部读事务,并执行提交操作,操作完成解锁读事务。回忆一下,读事务中包含了只读锁和读写锁,在这里锁定的是读写锁。
func (t *batchTxBuffered) commit(stop bool) {
if t.backend.hooks != nil {
t.backend.hooks.OnPreCommitUnsafe(t)
}
// all read txs must be closed to acquire boltdb commit rwlock
t.backend.readTx.Lock()
t.unsafeCommit(stop)
t.backend.readTx.Unlock()
}
代码 2 - 10:batchTxBuffered.commit
执行 unsafeCommit 时,首先根据 backend 中是否存在真正的 bolt.Tx 事务实例,如果存在,则等待该事务执行 Rollback 操作,并重置 readTx(代码 2 - 12),可以看到主要清理的是缓存内容,并重置事务和等待组实例;如果 stop 为 false,则重建一个只读事务(代码 2 - 11 第 17 行)。
func (t *batchTxBuffered) unsafeCommit(stop bool) {
if t.backend.readTx.tx != nil {
// wait all store read transactions using the current boltdb tx to finish,
// then close the boltdb tx
go func(tx *bolt.Tx, wg *sync.WaitGroup) {
wg.Wait()
if err := tx.Rollback(); err != nil {
t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
}
}(t.backend.readTx.tx, t.backend.readTx.txWg)
t.backend.readTx.reset()
}
t.batchTx.commit(stop)
if !stop {
t.backend.readTx.tx = t.backend.begin(false)
}
}
代码 2 - 11:batchTxBuffered.unsafeCommit
func (rt *readTx) reset() {
rt.buf.reset()
rt.buckets = make(map[BucketID]*bolt.Bucket)
rt.tx = nil
rt.txWg = new(sync.WaitGroup)
}
代码 2 - 12:readTx.reset
最终,执行 batchTx 的 commit 方法,如果此时有 bolt.Tx 实例,执行其 Commit 操作。如果 stop 为 false,重建一个 bolt.Tx 读写事务(代码 2 - 13 第 26 行)。
func (t *batchTx) commit(stop bool) {
// commit the last tx
if t.tx != nil {
if t.pending == 0 && !stop {
return
}
start := time.Now()
// gofail: var beforeCommit struct{}
err := t.tx.Commit()
// gofail: var afterCommit struct{}
rebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds())
spillSec.Observe(t.tx.Stats().SpillTime.Seconds())
writeSec.Observe(t.tx.Stats().WriteTime.Seconds())
commitSec.Observe(time.Since(start).Seconds())
atomic.AddInt64(&t.backend.commits, 1)
t.pending = 0
if err != nil {
t.backend.lg.Fatal("failed to commit tx", zap.Error(err))
}
}
if !stop {
t.tx = t.backend.begin(true)
}
}
代码 2 - 13:batchTx.commit
在提交操作中使用的 backend.begin 操作如代码 2 - 14 所示,根据 write 参数的取值来创建读写、只读事务。
func (b *backend) begin(write bool) *bolt.Tx {
b.mu.RLock()
tx := b.unsafeBegin(write)
b.mu.RUnlock()
size := tx.Size()
db := tx.DB()
stats := db.Stats()
atomic.StoreInt64(&b.size, size)
atomic.StoreInt64(&b.sizeInUse, size-(int64(stats.FreePageN)*int64(db.Info().PageSize)))
atomic.StoreInt64(&b.openReadTxN, int64(stats.OpenTxN))
return tx
}
func (b *backend) unsafeBegin(write bool) *bolt.Tx {
tx, err := b.db.Begin(write)
if err != nil {
b.lg.Fatal("failed to begin tx", zap.Error(err))
}
return tx
}
代码 2 - 14:backend.begin
Hooks
图 4:Hooks 概览
图 4 展示了 Backend 的 Hooks 关键结构,其中 consistentIndex 主要用于保存与 Raft 相关的数据结构,最终还是要通过 backend 的批处理事务进行写入操作。代码 2 - 15 展示了写入 Index 与 Term 的具体细节,这两个值存储在 Meta 桶的 consistent_key 和 term 关键字下,使用大端表示法存储。
func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
if index == 0 {
// Never save 0 as it means that we didn't load the real index yet.
return
}
bs1 := make([]byte, 8)
binary.BigEndian.PutUint64(bs1, index)
// put the index into the underlying backend
// tx has been locked in TxnBegin, so there is no need to lock it again
tx.UnsafePut(Meta, MetaConsistentIndexKeyName, bs1)
if term > 0 {
bs2 := make([]byte, 8)
binary.BigEndian.PutUint64(bs2, term)
tx.UnsafePut(Meta, MetaTermKeyName, bs2)
}
}
代码 2 - 15:保存 Index 与 Term
至此,与 Backend 相关的关键结构就基本分析完毕,具体功能细节在后续源码分析中会根据场景再展开讲解。