Backend 是 etcd 中与 blotdb 相接的位置,在了解 Backend 之前,应该熟悉 blotdb 支持哪些功能,实际上 Blotdb 也是支持事务的,但是同时只能支持一个写事务,可以支持多个读事务。
Backend 可以说是对 Blotdb 进行了一些封装,将事务通过实例的形式返回出来,另外还提供了批量事务,还有缓存的功能。
type Backend interface {
// ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523.
ReadTx() ReadTx
BatchTx() BatchTx
// ConcurrentReadTx returns a non-blocking read transaction.
ConcurrentReadTx() ReadTx
Snapshot() Snapshot
Hash(ignores func(bucketName, keyName []byte) bool) (uint32, error)
// Size returns the current size of the backend physically allocated.
// The backend can hold DB space that is not utilized at the moment,
// since it can conduct pre-allocation or spare unused space for recycling.
// Use SizeInUse() instead for the actual DB size.
Size() int64
// SizeInUse returns the current size of the backend logically in use.
// Since the backend can manage free space in a non-byte unit such as
// number of pages, the returned value can be not exactly accurate in bytes.
SizeInUse() int64
// OpenReadTxN returns the number of currently open read transactions in the backend.
OpenReadTxN() int64
Defrag() error
ForceCommit()
Close() error
}
读事务比较简单,除了加锁之外只提供了两个函数。但是这两个函数都有一个共同的参数 Bucket,所以先来介绍 Bucket。
type ReadTx interface {
Lock()
Unlock()
RLock()
RUnlock()
UnsafeRange(bucket Bucket, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error
}
创建 Bucket 与写事务有关,但是这里没有提供写事务,除了 ReadTx 之外,只提供了 BatchTx。可以看到它直接继承了 ReadTx,并且提供了一些其他函数,包括创建删除桶,写入删除键值对,还有提交操作。
type BatchTx interface {
ReadTx
UnsafeCreateBucket(bucket Bucket)
UnsafeDeleteBucket(bucket Bucket)
UnsafePut(bucket Bucket, key []byte, value []byte)
UnsafeSeqPut(bucket Bucket, key []byte, value []byte)
UnsafeDelete(bucket Bucket, key []byte)
// Commit commits a previous tx and begins a new writable one.
Commit()
// CommitAndStop commits the previous tx and does not create a new one.
CommitAndStop()
}
Bucket 类型同样是一个接口,实现的方法非常简单,就是获取 bucket 的字段。简单浏览下创建删除桶的方法,发现就是直接调用 blot.Tx.CreateBucket/blot.Tx.DeleteBucket
。接口介绍完毕,下面开始具体实现部分。
type Bucket interface {
// ID returns a unique identifier of a bucket.
// The id must NOT be persisted and can be used as lightweight identificator
// in the in-memory maps.
ID() BucketID
Name() []byte
// String implements Stringer (human readable name).
String() string
// IsSafeRangeBucket is a hack to avoid inadvertently reading duplicate keys;
// overwrites on a bucket should only fetch with limit=1, but safeRangeBucket
// is known to never overwrite any key so range is safe.
IsSafeRangeBucket() bool
}
Read
ForEach
foreach 的逻辑如图 1 所示,这里将 Foreach 分为两部分,红色的是通过 Tx 从 bolt 中读取,蓝色的是优先从缓存中读取,如果缓存中有将其标记为访问过,在 bolt 中不会再读取。
图 1: foreach
Read 的复杂性主要来自于缓存机制,ForEach 不会创建缓存,只会使用缓存。
Range
range 的逻辑如图 2 所示,同样也没有看到存储将查询到的值存储到 Buffer 的逻辑,那么 Buffer 究竟是如何生成的呢?
图 2: range
Write
前面已经说过写操作是通过 BatchTx 实现的,同样这里的批量事务也是有缓冲的,各接口与结构体之间的关系如下。
图 3:BatchTx
BatchTx 在每次调用 Unlock 方法的时候会将 writeBuffer 的内容写入到 readBuffer 中,这样就让 readTx 中读取到的数据与是最新的。需要注意这里并不满足事务的隔离性,如果要按照隔离级别来说的话,应该是属于读未提交级别。
func (t *batchTxBuffered) Unlock() {
if t.pending != 0 {
t.backend.readTx.Lock() // blocks txReadBuffer for writing.
t.buf.writeback(&t.backend.readTx.buf)
t.backend.readTx.Unlock()
if t.pending >= t.backend.batchLimit {
t.commit(false)
}
}
t.batchTx.Unlock()
}
在 Backend 创建时,会运行单独的 Goroutine,每隔一段时间自动 Commit 当前的 BatchTx,BatchTx 的 Commit 分为两种,分别是 Commit
和 CommitAndStop
,这两者的区别是是否会创建新的 tx。
func (b *backend) run() {
defer close(b.donec)
t := time.NewTimer(b.batchInterval)
defer t.Stop()
for {
select {
case <-t.C:
case <-b.stopc:
b.batchTx.CommitAndStop()
return
}
if b.batchTx.safePending() != 0 {
b.batchTx.Commit()
}
t.Reset(b.batchInterval)
}
}
在 Commit 的同时,BatchTx 还会清理 ReadTx,保证其能够读取到最新的状态。
func (t *batchTxBuffered) unsafeCommit(stop bool) {
if t.backend.readTx.tx != nil {
// wait all store read transactions using the current boltdb tx to finish,
// then close the boltdb tx
go func(tx *bolt.Tx, wg *sync.WaitGroup) {
wg.Wait()
if err := tx.Rollback(); err != nil {
t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
}
}(t.backend.readTx.tx, t.backend.readTx.txWg)
t.backend.readTx.reset()
}
t.batchTx.commit(stop)
if !stop {
t.backend.readTx.tx = t.backend.begin(false)
}
}