Backend 是 etcd 中与 blotdb 相接的位置,在了解 Backend 之前,应该熟悉 blotdb 支持哪些功能,实际上 Blotdb 也是支持事务的,但是同时只能支持一个写事务,可以支持多个读事务。

Backend 可以说是对 Blotdb 进行了一些封装,将事务通过实例的形式返回出来,另外还提供了批量事务,还有缓存的功能。

  1. type Backend interface {
  2. // ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523.
  3. ReadTx() ReadTx
  4. BatchTx() BatchTx
  5. // ConcurrentReadTx returns a non-blocking read transaction.
  6. ConcurrentReadTx() ReadTx
  7. Snapshot() Snapshot
  8. Hash(ignores func(bucketName, keyName []byte) bool) (uint32, error)
  9. // Size returns the current size of the backend physically allocated.
  10. // The backend can hold DB space that is not utilized at the moment,
  11. // since it can conduct pre-allocation or spare unused space for recycling.
  12. // Use SizeInUse() instead for the actual DB size.
  13. Size() int64
  14. // SizeInUse returns the current size of the backend logically in use.
  15. // Since the backend can manage free space in a non-byte unit such as
  16. // number of pages, the returned value can be not exactly accurate in bytes.
  17. SizeInUse() int64
  18. // OpenReadTxN returns the number of currently open read transactions in the backend.
  19. OpenReadTxN() int64
  20. Defrag() error
  21. ForceCommit()
  22. Close() error
  23. }

读事务比较简单,除了加锁之外只提供了两个函数。但是这两个函数都有一个共同的参数 Bucket,所以先来介绍 Bucket。

  1. type ReadTx interface {
  2. Lock()
  3. Unlock()
  4. RLock()
  5. RUnlock()
  6. UnsafeRange(bucket Bucket, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
  7. UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error
  8. }

创建 Bucket 与写事务有关,但是这里没有提供写事务,除了 ReadTx 之外,只提供了 BatchTx。可以看到它直接继承了 ReadTx,并且提供了一些其他函数,包括创建删除桶,写入删除键值对,还有提交操作。

  1. type BatchTx interface {
  2. ReadTx
  3. UnsafeCreateBucket(bucket Bucket)
  4. UnsafeDeleteBucket(bucket Bucket)
  5. UnsafePut(bucket Bucket, key []byte, value []byte)
  6. UnsafeSeqPut(bucket Bucket, key []byte, value []byte)
  7. UnsafeDelete(bucket Bucket, key []byte)
  8. // Commit commits a previous tx and begins a new writable one.
  9. Commit()
  10. // CommitAndStop commits the previous tx and does not create a new one.
  11. CommitAndStop()
  12. }

Bucket 类型同样是一个接口,实现的方法非常简单,就是获取 bucket 的字段。简单浏览下创建删除桶的方法,发现就是直接调用 blot.Tx.CreateBucket/blot.Tx.DeleteBucket。接口介绍完毕,下面开始具体实现部分。

  1. type Bucket interface {
  2. // ID returns a unique identifier of a bucket.
  3. // The id must NOT be persisted and can be used as lightweight identificator
  4. // in the in-memory maps.
  5. ID() BucketID
  6. Name() []byte
  7. // String implements Stringer (human readable name).
  8. String() string
  9. // IsSafeRangeBucket is a hack to avoid inadvertently reading duplicate keys;
  10. // overwrites on a bucket should only fetch with limit=1, but safeRangeBucket
  11. // is known to never overwrite any key so range is safe.
  12. IsSafeRangeBucket() bool
  13. }

Read

ForEach

foreach 的逻辑如图 1 所示,这里将 Foreach 分为两部分,红色的是通过 Tx 从 bolt 中读取,蓝色的是优先从缓存中读取,如果缓存中有将其标记为访问过,在 bolt 中不会再读取。
etcd-backend-第 4 页.drawio.svg
图 1: foreach

Read 的复杂性主要来自于缓存机制,ForEach 不会创建缓存,只会使用缓存。

Range

range 的逻辑如图 2 所示,同样也没有看到存储将查询到的值存储到 Buffer 的逻辑,那么 Buffer 究竟是如何生成的呢?
etcd-backend-第 6 页.drawio.svg
图 2: range

Write

前面已经说过写操作是通过 BatchTx 实现的,同样这里的批量事务也是有缓冲的,各接口与结构体之间的关系如下。

etcd-backend-第 5 页.drawio.svg
图 3:BatchTx

BatchTx 在每次调用 Unlock 方法的时候会将 writeBuffer 的内容写入到 readBuffer 中,这样就让 readTx 中读取到的数据与是最新的。需要注意这里并不满足事务的隔离性,如果要按照隔离级别来说的话,应该是属于读未提交级别。

  1. func (t *batchTxBuffered) Unlock() {
  2. if t.pending != 0 {
  3. t.backend.readTx.Lock() // blocks txReadBuffer for writing.
  4. t.buf.writeback(&t.backend.readTx.buf)
  5. t.backend.readTx.Unlock()
  6. if t.pending >= t.backend.batchLimit {
  7. t.commit(false)
  8. }
  9. }
  10. t.batchTx.Unlock()
  11. }

在 Backend 创建时,会运行单独的 Goroutine,每隔一段时间自动 Commit 当前的 BatchTx,BatchTx 的 Commit 分为两种,分别是 CommitCommitAndStop,这两者的区别是是否会创建新的 tx。

  1. func (b *backend) run() {
  2. defer close(b.donec)
  3. t := time.NewTimer(b.batchInterval)
  4. defer t.Stop()
  5. for {
  6. select {
  7. case <-t.C:
  8. case <-b.stopc:
  9. b.batchTx.CommitAndStop()
  10. return
  11. }
  12. if b.batchTx.safePending() != 0 {
  13. b.batchTx.Commit()
  14. }
  15. t.Reset(b.batchInterval)
  16. }
  17. }

在 Commit 的同时,BatchTx 还会清理 ReadTx,保证其能够读取到最新的状态。

  1. func (t *batchTxBuffered) unsafeCommit(stop bool) {
  2. if t.backend.readTx.tx != nil {
  3. // wait all store read transactions using the current boltdb tx to finish,
  4. // then close the boltdb tx
  5. go func(tx *bolt.Tx, wg *sync.WaitGroup) {
  6. wg.Wait()
  7. if err := tx.Rollback(); err != nil {
  8. t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
  9. }
  10. }(t.backend.readTx.tx, t.backend.readTx.txWg)
  11. t.backend.readTx.reset()
  12. }
  13. t.batchTx.commit(stop)
  14. if !stop {
  15. t.backend.readTx.tx = t.backend.begin(false)
  16. }
  17. }