Backend 是 etcd 中与 blotdb 相接的位置,在了解 Backend 之前,应该熟悉 blotdb 支持哪些功能,实际上 Blotdb 也是支持事务的,但是同时只能支持一个写事务,可以支持多个读事务。
Backend 可以说是对 Blotdb 进行了一些封装,将事务通过实例的形式返回出来,另外还提供了批量事务,还有缓存的功能。
type Backend interface {// ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523.ReadTx() ReadTxBatchTx() BatchTx// ConcurrentReadTx returns a non-blocking read transaction.ConcurrentReadTx() ReadTxSnapshot() SnapshotHash(ignores func(bucketName, keyName []byte) bool) (uint32, error)// Size returns the current size of the backend physically allocated.// The backend can hold DB space that is not utilized at the moment,// since it can conduct pre-allocation or spare unused space for recycling.// Use SizeInUse() instead for the actual DB size.Size() int64// SizeInUse returns the current size of the backend logically in use.// Since the backend can manage free space in a non-byte unit such as// number of pages, the returned value can be not exactly accurate in bytes.SizeInUse() int64// OpenReadTxN returns the number of currently open read transactions in the backend.OpenReadTxN() int64Defrag() errorForceCommit()Close() error}
读事务比较简单,除了加锁之外只提供了两个函数。但是这两个函数都有一个共同的参数 Bucket,所以先来介绍 Bucket。
type ReadTx interface {Lock()Unlock()RLock()RUnlock()UnsafeRange(bucket Bucket, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error}
创建 Bucket 与写事务有关,但是这里没有提供写事务,除了 ReadTx 之外,只提供了 BatchTx。可以看到它直接继承了 ReadTx,并且提供了一些其他函数,包括创建删除桶,写入删除键值对,还有提交操作。
type BatchTx interface {ReadTxUnsafeCreateBucket(bucket Bucket)UnsafeDeleteBucket(bucket Bucket)UnsafePut(bucket Bucket, key []byte, value []byte)UnsafeSeqPut(bucket Bucket, key []byte, value []byte)UnsafeDelete(bucket Bucket, key []byte)// Commit commits a previous tx and begins a new writable one.Commit()// CommitAndStop commits the previous tx and does not create a new one.CommitAndStop()}
Bucket 类型同样是一个接口,实现的方法非常简单,就是获取 bucket 的字段。简单浏览下创建删除桶的方法,发现就是直接调用 blot.Tx.CreateBucket/blot.Tx.DeleteBucket。接口介绍完毕,下面开始具体实现部分。
type Bucket interface {// ID returns a unique identifier of a bucket.// The id must NOT be persisted and can be used as lightweight identificator// in the in-memory maps.ID() BucketIDName() []byte// String implements Stringer (human readable name).String() string// IsSafeRangeBucket is a hack to avoid inadvertently reading duplicate keys;// overwrites on a bucket should only fetch with limit=1, but safeRangeBucket// is known to never overwrite any key so range is safe.IsSafeRangeBucket() bool}
Read
ForEach
foreach 的逻辑如图 1 所示,这里将 Foreach 分为两部分,红色的是通过 Tx 从 bolt 中读取,蓝色的是优先从缓存中读取,如果缓存中有将其标记为访问过,在 bolt 中不会再读取。
图 1: foreach
Read 的复杂性主要来自于缓存机制,ForEach 不会创建缓存,只会使用缓存。
Range
range 的逻辑如图 2 所示,同样也没有看到存储将查询到的值存储到 Buffer 的逻辑,那么 Buffer 究竟是如何生成的呢?
图 2: range
Write
前面已经说过写操作是通过 BatchTx 实现的,同样这里的批量事务也是有缓冲的,各接口与结构体之间的关系如下。
图 3:BatchTx
BatchTx 在每次调用 Unlock 方法的时候会将 writeBuffer 的内容写入到 readBuffer 中,这样就让 readTx 中读取到的数据与是最新的。需要注意这里并不满足事务的隔离性,如果要按照隔离级别来说的话,应该是属于读未提交级别。
func (t *batchTxBuffered) Unlock() {if t.pending != 0 {t.backend.readTx.Lock() // blocks txReadBuffer for writing.t.buf.writeback(&t.backend.readTx.buf)t.backend.readTx.Unlock()if t.pending >= t.backend.batchLimit {t.commit(false)}}t.batchTx.Unlock()}
在 Backend 创建时,会运行单独的 Goroutine,每隔一段时间自动 Commit 当前的 BatchTx,BatchTx 的 Commit 分为两种,分别是 Commit 和 CommitAndStop,这两者的区别是是否会创建新的 tx。
func (b *backend) run() {defer close(b.donec)t := time.NewTimer(b.batchInterval)defer t.Stop()for {select {case <-t.C:case <-b.stopc:b.batchTx.CommitAndStop()return}if b.batchTx.safePending() != 0 {b.batchTx.Commit()}t.Reset(b.batchInterval)}}
在 Commit 的同时,BatchTx 还会清理 ReadTx,保证其能够读取到最新的状态。
func (t *batchTxBuffered) unsafeCommit(stop bool) {if t.backend.readTx.tx != nil {// wait all store read transactions using the current boltdb tx to finish,// then close the boltdb txgo func(tx *bolt.Tx, wg *sync.WaitGroup) {wg.Wait()if err := tx.Rollback(); err != nil {t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))}}(t.backend.readTx.tx, t.backend.readTx.txWg)t.backend.readTx.reset()}t.batchTx.commit(stop)if !stop {t.backend.readTx.tx = t.backend.begin(false)}}
