etcd builds on boltdb's transaction support, but it optimizes writes by implementing batch write: many logical writes share a single boltdb transaction. The source code quoted below has some trace/log code removed.
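At a high level the batching works roughly as sketched below (simplified pseudocode with illustrative names, not the actual etcd source): every write lands in the currently open boltdb write transaction plus an in-memory buffer, readers merge that buffer with boltdb, and the transaction is only committed when enough changes are pending or the batch interval expires.

    // Simplified sketch of the batch-write idea (illustrative names, not etcd source).
    func (t *writeTxSketch) Unlock() {
        if t.pending > 0 {
            t.writebackToReadBuffer() // make uncommitted writes visible to readers
            if t.pending >= t.backend.batchLimit {
                t.commit() // too many pending changes: commit early
            }
        }
        t.mu.Unlock()
    }

    // A background goroutine commits whatever is still pending every batchInterval.
    func (b *backendSketch) run() {
        for range time.Tick(b.batchInterval) {
            if b.writeTx.pendingCount() != 0 {
                b.writeTx.Commit()
            }
        }
    }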

1. How etcd wraps boltdb

1. First layer of wrapping (the backend module)

This module provides the basic CRUD API to the MVCC module. The interfaces are as follows:

    type Backend interface {
        // Returns a read transaction.
        ReadTx() ReadTx
        // Returns a (batched) write transaction.
        BatchTx() BatchTx
        // Returns a concurrent read transaction (this mainly optimizes the locking
        // around write transactions writing back their buffer; explained in detail later).
        ConcurrentReadTx() ReadTx
        // Returns a snapshot struct (reuses boltdb's snapshot API).
        Snapshot() Snapshot
        // Ignored for now.
        Hash(ignores func(bucketName, keyName []byte) bool) (uint32, error)
        // Returns the space allocated by the storage layer, i.e. the size of the
        // boltdb file (including all reusable free pages).
        Size() int64
        // Returns the space actually in use, i.e. the pages boltdb is currently
        // using, excluding free pages.
        SizeInUse() int64
        // Returns the number of currently open read transactions.
        OpenReadTxN() int64
        // Defragments the boltdb layout: data is rewritten into contiguous pages,
        // reducing the space in use. Internally it reads everything from the
        // current db and writes it into a new db.
        Defrag() error
        // Forces a transaction commit.
        ForceCommit()
        // Under the hood this stops the goroutine that commits transactions asynchronously.
        Close() error
        // SetTxPostLockInsideApplyHook sets a txPostLockInsideApplyHook.
        SetTxPostLockInsideApplyHook(func())
    }

    type ReadTx interface {
        // The exclusive lock is mainly used by write transactions to flush
        // not-yet-committed changes back into the read transaction's buffer.
        Lock()
        Unlock()
        // The shared lock is used by read transactions.
        RLock()
        RUnlock()
        // Read APIs. The expected usage is:
        //   tx.RLock()
        //   tx.UnsafeRange or tx.UnsafeForEach
        //   tx.RUnlock()
        UnsafeRange(bucket Bucket, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
        UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error
    }

    type BatchTx interface {
        // Only Lock/Unlock are really implemented here; RLock/RUnlock panic.
        // The read APIs are essentially the same as ReadTx and still reuse
        // boltdb's read/write transaction under the hood.
        ReadTx
        // Write APIs. The expected usage is:
        //   tx.Lock()
        //   tx.Unsafe... write API
        //   tx.Unlock()
        // Writes in etcd are therefore essentially serial, which matches boltdb,
        // where writes are serial as well.
        UnsafeCreateBucket(bucket Bucket)
        UnsafeDeleteBucket(bucket Bucket)
        UnsafePut(bucket Bucket, key []byte, value []byte)
        UnsafeSeqPut(bucket Bucket, key []byte, value []byte)
        UnsafeDelete(bucket Bucket, key []byte)
        // Used by the asynchronous commit goroutine.
        Commit()
        CommitAndStop()
        // Related to the raft apply path; ignored for now.
        LockInsideApply()
        LockOutsideApply()
    }
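To make the locking discipline described in the comments concrete, here is a minimal usage sketch (not taken from the etcd source; `be` is an assumed `Backend` instance and `testBucket` an assumed bucket descriptor):

    // Write path: take the exclusive lock, mutate, release.
    // (etcd itself mostly goes through LockInsideApply/LockOutsideApply;
    // plain Lock is meant for unit tests.)
    wtx := be.BatchTx()
    wtx.Lock()
    wtx.UnsafeCreateBucket(testBucket)
    wtx.UnsafePut(testBucket, []byte("foo"), []byte("bar"))
    wtx.Unlock()

    // Read path: take the shared lock, call the Unsafe* read API, release.
    rtx := be.ReadTx()
    rtx.RLock()
    keys, vals := rtx.UnsafeRange(testBucket, []byte("foo"), nil, 0) // endKey == nil -> at most one key
    rtx.RUnlock()
    _, _ = keys, vals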

1.1 ReadTx implementation

    type baseReadTx struct {
        // mu protects the buf field.
        mu sync.RWMutex
        // buf holds data from not-yet-committed write transactions, so that read
        // transactions can see the latest data.
        buf txReadBuffer

        // txMu protects the boltdb transaction instance and the bucket cache.
        txMu    *sync.RWMutex
        tx      *bolt.Tx
        buckets map[BucketID]*bolt.Bucket
        // txWg waits for outstanding concurrent read transactions to finish.
        // It is also used when a write transaction commits: the commit resets the
        // read transaction, but concurrent reads do not take the lock, so the
        // WaitGroup guarantees that no concurrent read is still in flight when the
        // read transaction is reset.
        txWg *sync.WaitGroup
    }

    // UnsafeForEach iterates over all data of the given bucket.
    func (baseReadTx *baseReadTx) UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error {
        dups := make(map[string]struct{})
        getDups := func(k, v []byte) error {
            dups[string(k)] = struct{}{}
            return nil
        }
        visitNoDup := func(k, v []byte) error {
            if _, ok := dups[string(k)]; ok {
                return nil
            }
            return visitor(k, v)
        }
        // First collect the keys present in the buffer (the uncommitted, newest data).
        if err := baseReadTx.buf.ForEach(bucket, getDups); err != nil {
            return err
        }
        // Then visit the committed data in boltdb, skipping keys the buffer overrides.
        baseReadTx.txMu.Lock()
        err := unsafeForEach(baseReadTx.tx, bucket, visitNoDup)
        baseReadTx.txMu.Unlock()
        if err != nil {
            return err
        }
        // Finally visit the uncommitted data in the buffer.
        return baseReadTx.buf.ForEach(bucket, visitor)
    }

    func (baseReadTx *baseReadTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
        if endKey == nil {
            limit = 1
        }
        if limit <= 0 {
            limit = math.MaxInt64
        }
        if limit > 1 && !bucketType.IsSafeRangeBucket() {
            panic("do not use unsafeRange on non-keys bucket")
        }
        // If the buffer already holds everything we need, return directly.
        keys, vals := baseReadTx.buf.Range(bucketType, key, endKey, limit)
        if int64(len(keys)) == limit {
            return keys, vals
        }
        // Otherwise read the rest from boltdb and cache the bucket handle.
        bn := bucketType.ID()
        baseReadTx.txMu.RLock()
        bucket, ok := baseReadTx.buckets[bn]
        baseReadTx.txMu.RUnlock()
        lockHeld := false
        if !ok {
            baseReadTx.txMu.Lock()
            lockHeld = true
            bucket = baseReadTx.tx.Bucket(bucketType.Name())
            baseReadTx.buckets[bn] = bucket
        }
        if bucket == nil {
            if lockHeld {
                baseReadTx.txMu.Unlock()
            }
            return keys, vals
        }
        if !lockHeld {
            baseReadTx.txMu.Lock()
        }
        c := bucket.Cursor()
        baseReadTx.txMu.Unlock()

        k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
        return append(k2, keys...), append(v2, vals...)
    }

    // readTx is the plain read transaction.
    // Because it takes the shared lock, it blocks a write transaction that wants
    // to flush its uncommitted changes into the read buffer.
    type readTx struct {
        baseReadTx
    }

    func (rt *readTx) Lock()    { rt.mu.Lock() }
    func (rt *readTx) Unlock()  { rt.mu.Unlock() }
    func (rt *readTx) RLock()   { rt.mu.RLock() }
    func (rt *readTx) RUnlock() { rt.mu.RUnlock() }

    // reset resets the read transaction.
    // It is called once a write transaction has finished committing.
    func (rt *readTx) reset() {
        rt.buf.reset()
        rt.buckets = make(map[BucketID]*bolt.Bucket)
        rt.tx = nil
        rt.txWg = new(sync.WaitGroup)
    }

    // concurrentReadTx is the concurrent read transaction.
    // All of its lock methods are no-ops, so it never blocks a write transaction
    // flushing uncommitted changes into the read buffer; instead it works on a
    // copy of the read transaction's buffer.
    type concurrentReadTx struct {
        baseReadTx
    }

    func (rt *concurrentReadTx) Lock()    {}
    func (rt *concurrentReadTx) Unlock()  {}
    func (rt *concurrentReadTx) RLock()   {}
    func (rt *concurrentReadTx) RUnlock() { rt.txWg.Done() }

1.2 BatchTx implementation

    type batchTx struct {
        sync.Mutex
        tx      *bolt.Tx
        backend *backend

        pending int // number of pending (not yet committed) changes
    }

    // The plain batchTx's Unlock is normally not what gets called; etcd uses the
    // buffered batchTx (batchTxBuffered), which overrides Unlock.
    func (t *batchTx) Lock() {
        ValidateCalledInsideUnittest(t.backend.lg)
        t.lock()
    }

    func (t *batchTx) Unlock() {
        if t.pending >= t.backend.batchLimit {
            t.commit(false)
        }
        t.Mutex.Unlock()
    }

    // LockInsideApply/LockOutsideApply relate to the raft apply path; ignore them for now.
    func (t *batchTx) LockInsideApply() {
        t.lock()
        if t.backend.txPostLockInsideApplyHook != nil {
            // The callers of some methods (i.e., (*RaftCluster).AddMember)
            // can be coming from both InsideApply and OutsideApply, but the
            // callers from OutsideApply will have a nil txPostLockInsideApplyHook.
            // So we should check the txPostLockInsideApplyHook before validating
            // the callstack.
            ValidateCalledInsideApply(t.backend.lg)
            t.backend.txPostLockInsideApplyHook()
        }
    }

    func (t *batchTx) LockOutsideApply() {
        ValidateCalledOutSideApply(t.backend.lg)
        t.lock()
    }

    func (t *batchTx) lock() {
        t.Mutex.Lock()
    }

    // A write transaction must not call RLock/RUnlock.
    func (t *batchTx) RLock() {
        panic("unexpected RLock")
    }

    func (t *batchTx) RUnlock() {
        panic("unexpected RUnlock")
    }

    // The CRUD APIs below all delegate to the boltdb API.
    // pending++ is executed unconditionally, but any error is handled with
    // lg.Fatal (which terminates the process), so in practice only successful
    // operations are counted.
    func (t *batchTx) UnsafeCreateBucket(bucket Bucket) {
        _, err := t.tx.CreateBucket(bucket.Name())
        if err != nil && err != bolt.ErrBucketExists {
            t.backend.lg.Fatal(
                "failed to create a bucket",
                zap.Stringer("bucket-name", bucket),
                zap.Error(err),
            )
        }
        t.pending++
    }

    func (t *batchTx) UnsafeDeleteBucket(bucket Bucket) {
        err := t.tx.DeleteBucket(bucket.Name())
        if err != nil && err != bolt.ErrBucketNotFound {
            t.backend.lg.Fatal(
                "failed to delete a bucket",
                zap.Stringer("bucket-name", bucket),
                zap.Error(err),
            )
        }
        t.pending++
    }

    func (t *batchTx) UnsafePut(bucket Bucket, key []byte, value []byte) {
        t.unsafePut(bucket, key, value, false)
    }

    func (t *batchTx) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {
        t.unsafePut(bucket, key, value, true)
    }

    func (t *batchTx) unsafePut(bucketType Bucket, key []byte, value []byte, seq bool) {
        bucket := t.tx.Bucket(bucketType.Name())
        if bucket == nil {
            t.backend.lg.Fatal(
                "failed to find a bucket",
                zap.Stringer("bucket-name", bucketType),
                zap.Stack("stack"),
            )
        }
        if seq {
            // it is useful to increase fill percent when the workloads are mostly append-only.
            // this can delay the page split and reduce space usage.
            bucket.FillPercent = 0.9
        }
        if err := bucket.Put(key, value); err != nil {
            t.backend.lg.Fatal(
                "failed to write to a bucket",
                zap.Stringer("bucket-name", bucketType),
                zap.Error(err),
            )
        }
        t.pending++
    }

    func (t *batchTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
        bucket := t.tx.Bucket(bucketType.Name())
        if bucket == nil {
            t.backend.lg.Fatal(
                "failed to find a bucket",
                zap.Stringer("bucket-name", bucketType),
                zap.Stack("stack"),
            )
        }
        return unsafeRange(bucket.Cursor(), key, endKey, limit)
    }

    func (t *batchTx) UnsafeDelete(bucketType Bucket, key []byte) {
        bucket := t.tx.Bucket(bucketType.Name())
        if bucket == nil {
            t.backend.lg.Fatal(
                "failed to find a bucket",
                zap.Stringer("bucket-name", bucketType),
                zap.Stack("stack"),
            )
        }
        err := bucket.Delete(key)
        if err != nil {
            t.backend.lg.Fatal(
                "failed to delete a key",
                zap.Stringer("bucket-name", bucketType),
                zap.Error(err),
            )
        }
        t.pending++
    }

    func (t *batchTx) UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error {
        return unsafeForEach(t.tx, bucket, visitor)
    }

    // Commit/CommitAndStop are used by the asynchronous commit goroutine.
    func (t *batchTx) Commit() {
        t.lock()
        t.commit(false)
        t.Unlock()
    }

    func (t *batchTx) CommitAndStop() {
        t.lock()
        t.commit(true)
        t.Unlock()
    }

    func (t *batchTx) safePending() int {
        t.Mutex.Lock()
        defer t.Mutex.Unlock()
        return t.pending
    }

    // commit commits the boltdb transaction and opens a new write transaction.
    func (t *batchTx) commit(stop bool) {
        if t.tx != nil {
            if t.pending == 0 && !stop {
                return
            }
            start := time.Now()
            // gofail: var beforeCommit struct{}
            err := t.tx.Commit()
            // gofail: var afterCommit struct{}
            rebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds())
            spillSec.Observe(t.tx.Stats().SpillTime.Seconds())
            writeSec.Observe(t.tx.Stats().WriteTime.Seconds())
            commitSec.Observe(time.Since(start).Seconds())
            atomic.AddInt64(&t.backend.commits, 1)
            t.pending = 0
            if err != nil {
                t.backend.lg.Fatal("failed to commit tx", zap.Error(err))
            }
        }
        if !stop {
            t.tx = t.backend.begin(true)
        }
    }

    // batchTxBuffered is a batchTx with a write buffer.
    type batchTxBuffered struct {
        batchTx
        buf txWriteBuffer
    }

    func newBatchTxBuffered(backend *backend) *batchTxBuffered {
        tx := &batchTxBuffered{
            batchTx: batchTx{backend: backend},
            buf: txWriteBuffer{
                txBuffer:   txBuffer{make(map[BucketID]*bucketBuffer)},
                bucket2seq: make(map[BucketID]bool),
            },
        }
        tx.Commit()
        return tx
    }

    func (t *batchTxBuffered) Unlock() {
        // Recall the write-transaction usage pattern described earlier: when Unlock
        // is called, the write transaction is considered finished (even though
        // nothing has actually been committed yet), so the write buffer has to be
        // flushed back into the read transaction's buffer to make the new data
        // visible to readers.
        if t.pending != 0 {
            t.backend.readTx.Lock()
            t.buf.writeback(&t.backend.readTx.buf)
            t.backend.readTx.Unlock()
            // If the number of pending changes exceeds the batch limit, force a commit.
            if t.pending >= t.backend.batchLimit {
                t.commit(false)
            }
        }
        t.batchTx.Unlock()
    }

    func (t *batchTxBuffered) Commit() {
        t.lock()
        t.commit(false)
        t.Unlock()
    }

    func (t *batchTxBuffered) CommitAndStop() {
        t.lock()
        t.commit(true)
        t.Unlock()
    }

    func (t *batchTxBuffered) commit(stop bool) {
        if t.backend.hooks != nil {
            t.backend.hooks.OnPreCommitUnsafe(t)
        }
        // The commit resets the read transaction, so the read tx lock must be held.
        t.backend.readTx.Lock()
        t.unsafeCommit(stop)
        t.backend.readTx.Unlock()
    }

    func (t *batchTxBuffered) unsafeCommit(stop bool) {
        if t.backend.readTx.tx != nil {
            go func(tx *bolt.Tx, wg *sync.WaitGroup) {
                // Wait for all concurrent read transactions to finish, then roll
                // back the old boltdb read transaction.
                wg.Wait()
                if err := tx.Rollback(); err != nil {
                    t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
                }
            }(t.backend.readTx.tx, t.backend.readTx.txWg)
            t.backend.readTx.reset()
        }
        // Commit the write transaction.
        t.batchTx.commit(stop)
        if !stop {
            // Reopen the boltdb read transaction.
            t.backend.readTx.tx = t.backend.begin(false)
        }
    }

    // Mutations are also appended to the write buffer.
    func (t *batchTxBuffered) UnsafePut(bucket Bucket, key []byte, value []byte) {
        t.batchTx.UnsafePut(bucket, key, value)
        t.buf.put(bucket, key, value)
    }

    func (t *batchTxBuffered) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {
        t.batchTx.UnsafeSeqPut(bucket, key, value)
        t.buf.putSeq(bucket, key, value)
    }
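The effect of the writeback in `Unlock` can be seen in a small sketch (hypothetical; `be` is an assumed backend instance and `testBucket` an already-created bucket): the value is visible to readers right after `Unlock`, before any boltdb commit has happened.

    wtx := be.BatchTx()
    wtx.LockOutsideApply()
    wtx.UnsafePut(testBucket, []byte("k1"), []byte("v1"))
    wtx.Unlock() // writeback: the pending write is copied into readTx.buf; no boltdb commit yet

    rtx := be.ReadTx()
    rtx.RLock()
    _, vals := rtx.UnsafeRange(testBucket, []byte("k1"), nil, 0)
    rtx.RUnlock()
    // vals[0] == "v1" already, even though the actual boltdb commit only happens
    // later, on the periodic commit goroutine or once pending >= batchLimit.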

1.3 backend implementation

    type backend struct {
        size        int64
        sizeInUse   int64
        commits     int64
        openReadTxN int64
        // mlock prevents backend database file to be swapped
        mlock bool

        mu    sync.RWMutex
        bopts *bolt.Options
        db    *bolt.DB

        batchInterval time.Duration
        batchLimit    int
        batchTx       *batchTxBuffered

        readTx *readTx
        // txReadBufferCache caches a copy of the read buffer for ConcurrentReadTx,
        // so that not every concurrent read transaction has to copy it again.
        txReadBufferCache txReadBufferCache

        stopc chan struct{}
        donec chan struct{}

        hooks Hooks

        txPostLockInsideApplyHook func()

        lg *zap.Logger
    }

    func New(cfg BackendConfig) Backend {
        return newBackend(cfg)
    }

    func NewDefaultBackend(lg *zap.Logger, path string) Backend {
        cfg := DefaultBackendConfig(lg)
        cfg.Path = path
        return newBackend(cfg)
    }

    func newBackend(cfg BackendConfig) *backend {
        opts := &bolt.Options{}
        if boltOpenOptions != nil {
            *opts = *boltOpenOptions
        }
        opts.InitialMmapSize = cfg.mmapSize()
        opts.FreelistType = cfg.BackendFreelistType
        opts.NoSync = cfg.UnsafeNoFsync
        opts.NoGrowSync = cfg.UnsafeNoFsync
        opts.Mlock = cfg.Mlock

        db, err := bolt.Open(cfg.Path, 0600, opts)
        if err != nil {
            cfg.Logger.Panic("failed to open database", zap.String("path", cfg.Path), zap.Error(err))
        }

        b := &backend{
            bopts: opts,
            db:    db,

            batchInterval: cfg.BatchInterval,
            batchLimit:    cfg.BatchLimit,
            mlock:         cfg.Mlock,

            readTx: &readTx{
                baseReadTx: baseReadTx{
                    buf: txReadBuffer{
                        txBuffer:   txBuffer{make(map[BucketID]*bucketBuffer)},
                        bufVersion: 0,
                    },
                    buckets: make(map[BucketID]*bolt.Bucket),
                    txWg:    new(sync.WaitGroup),
                    txMu:    new(sync.RWMutex),
                },
            },
            txReadBufferCache: txReadBufferCache{
                mu:         sync.Mutex{},
                bufVersion: 0,
                buf:        nil,
            },

            stopc: make(chan struct{}),
            donec: make(chan struct{}),

            lg: cfg.Logger,
        }

        b.batchTx = newBatchTxBuffered(b)
        b.hooks = cfg.Hooks

        // Start the goroutine that commits periodically.
        go b.run()
        return b
    }
    func (b *backend) ReadTx() ReadTx {
        return b.readTx
    }

    func (b *backend) BatchTx() BatchTx {
        return b.batchTx
    }

    // ConcurrentReadTx creates and returns a new ReadTx, which:
    // A) creates and keeps a copy of backend.readTx.txReadBuffer,
    // B) references the boltdb read Tx (and its bucket cache) of current batch interval.
    func (b *backend) ConcurrentReadTx() ReadTx {
        b.readTx.RLock()
        defer b.readTx.RUnlock()
        // prevent boltdb read Tx from been rolled back until store read Tx is done. Needs to be called when holding readTx.RLock().
        b.readTx.txWg.Add(1)

        // TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval.

        // inspect/update cache recency iff there's no ongoing update to the cache
        // this falls through if there's no cache update

        // by this line, "ConcurrentReadTx" code path is already protected against concurrent "writeback" operations
        // which requires write lock to update "readTx.baseReadTx.buf".
        // Which means setting "buf *txReadBuffer" with "readTx.buf.unsafeCopy()" is guaranteed to be up-to-date,
        // whereas "txReadBufferCache.buf" may be stale from concurrent "writeback" operations.
        // We only update "txReadBufferCache.buf" if we know "buf *txReadBuffer" is up-to-date.
        // The update to "txReadBufferCache.buf" will benefit the following "ConcurrentReadTx" creation
        // by avoiding copying "readTx.baseReadTx.buf".
        b.txReadBufferCache.mu.Lock()

        curCache := b.txReadBufferCache.buf
        curCacheVer := b.txReadBufferCache.bufVersion
        curBufVer := b.readTx.buf.bufVersion

        isEmptyCache := curCache == nil
        isStaleCache := curCacheVer != curBufVer

        var buf *txReadBuffer
        switch {
        case isEmptyCache:
            // perform safe copy of buffer while holding "b.txReadBufferCache.mu.Lock"
            // this is only supposed to run once so there won't be much overhead
            curBuf := b.readTx.buf.unsafeCopy()
            buf = &curBuf
        case isStaleCache:
            // to maximize the concurrency, try unsafe copy of buffer
            // release the lock while copying buffer -- cache may become stale again and
            // get overwritten by someone else.
            // therefore, we need to check the readTx buffer version again
            b.txReadBufferCache.mu.Unlock()
            curBuf := b.readTx.buf.unsafeCopy()
            b.txReadBufferCache.mu.Lock()
            buf = &curBuf
        default:
            // neither empty nor stale cache, just use the current buffer
            buf = curCache
        }
        // txReadBufferCache.bufVersion can be modified when we doing an unsafeCopy()
        // as a result, curCacheVer could be no longer the same as
        // txReadBufferCache.bufVersion
        // if !isEmptyCache && curCacheVer != b.txReadBufferCache.bufVersion
        // then the cache became stale while copying "readTx.baseReadTx.buf".
        // It is safe to not update "txReadBufferCache.buf", because the next following
        // "ConcurrentReadTx" creation will trigger a new "readTx.baseReadTx.buf" copy
        // and "buf" is still used for the current "concurrentReadTx.baseReadTx.buf".
        if isEmptyCache || curCacheVer == b.txReadBufferCache.bufVersion {
            // continue if the cache is never set or no one has modified the cache
            b.txReadBufferCache.buf = buf
            b.txReadBufferCache.bufVersion = curBufVer
        }

        b.txReadBufferCache.mu.Unlock()

        // concurrentReadTx is not supposed to write to its txReadBuffer
        return &concurrentReadTx{
            baseReadTx: baseReadTx{
                buf:     *buf,
                txMu:    b.readTx.txMu,
                tx:      b.readTx.tx,
                buckets: b.readTx.buckets,
                txWg:    b.readTx.txWg,
            },
        }
    }
    func (b *backend) SetTxPostLockInsideApplyHook(hook func()) {
        // It needs to lock the batchTx, because the periodic commit
        // may be accessing the txPostLockInsideApplyHook at the moment.
        b.batchTx.lock()
        defer b.batchTx.Unlock()
        b.txPostLockInsideApplyHook = hook
    }

    func (b *backend) ForceCommit() {
        b.batchTx.Commit()
    }

    func (b *backend) Snapshot() Snapshot {
        b.batchTx.Commit()

        b.mu.RLock()
        defer b.mu.RUnlock()
        tx, err := b.db.Begin(false)
        if err != nil {
            b.lg.Fatal("failed to begin tx", zap.Error(err))
        }

        stopc, donec := make(chan struct{}), make(chan struct{})
        dbBytes := tx.Size()
        go func() {
            defer close(donec)
            // sendRateBytes is based on transferring snapshot data over a 1 gigabit/s connection
            // assuming a min tcp throughput of 100MB/s.
            var sendRateBytes int64 = 100 * 1024 * 1024
            warningTimeout := time.Duration(int64((float64(dbBytes) / float64(sendRateBytes)) * float64(time.Second)))
            if warningTimeout < minSnapshotWarningTimeout {
                warningTimeout = minSnapshotWarningTimeout
            }
            start := time.Now()
            ticker := time.NewTicker(warningTimeout)
            defer ticker.Stop()
            for {
                select {
                case <-ticker.C:
                    b.lg.Warn(
                        "snapshotting taking too long to transfer",
                        zap.Duration("taking", time.Since(start)),
                        zap.Int64("bytes", dbBytes),
                        zap.String("size", humanize.Bytes(uint64(dbBytes))),
                    )
                case <-stopc:
                    snapshotTransferSec.Observe(time.Since(start).Seconds())
                    return
                }
            }
        }()

        return &snapshot{tx, stopc, donec}
    }

    func (b *backend) Hash(ignores func(bucketName, keyName []byte) bool) (uint32, error) {
        h := crc32.New(crc32.MakeTable(crc32.Castagnoli))

        b.mu.RLock()
        defer b.mu.RUnlock()
        err := b.db.View(func(tx *bolt.Tx) error {
            c := tx.Cursor()
            for next, _ := c.First(); next != nil; next, _ = c.Next() {
                b := tx.Bucket(next)
                if b == nil {
                    return fmt.Errorf("cannot get hash of bucket %s", string(next))
                }
                h.Write(next)
                b.ForEach(func(k, v []byte) error {
                    if ignores != nil && !ignores(next, k) {
                        h.Write(k)
                        h.Write(v)
                    }
                    return nil
                })
            }
            return nil
        })
        if err != nil {
            return 0, err
        }

        return h.Sum32(), nil
    }

    func (b *backend) Size() int64 {
        return atomic.LoadInt64(&b.size)
    }

    func (b *backend) SizeInUse() int64 {
        return atomic.LoadInt64(&b.sizeInUse)
    }

    func (b *backend) Close() error {
        close(b.stopc)
        <-b.donec
        b.mu.Lock()
        defer b.mu.Unlock()
        return b.db.Close()
    }

    func (b *backend) Commits() int64 {
        return atomic.LoadInt64(&b.commits)
    }

    func (b *backend) OpenReadTxN() int64 {
        return atomic.LoadInt64(&b.openReadTxN)
    }
    func (b *backend) Defrag() error {
        return b.defrag()
    }

    func (b *backend) defrag() error {
        now := time.Now()
        isDefragActive.Set(1)
        defer isDefragActive.Set(0)

        // TODO: make this non-blocking?
        // lock batchTx to ensure nobody is using previous tx, and then
        // close previous ongoing tx.
        b.batchTx.LockOutsideApply()
        defer b.batchTx.Unlock()

        // lock database after lock tx to avoid deadlock.
        b.mu.Lock()
        defer b.mu.Unlock()

        // block concurrent read requests while resetting tx
        b.readTx.Lock()
        defer b.readTx.Unlock()

        b.batchTx.unsafeCommit(true)

        b.batchTx.tx = nil

        // Create a temporary file to ensure we start with a clean slate.
        // Snapshotter.cleanupSnapdir cleans up any of these that are found during startup.
        dir := filepath.Dir(b.db.Path())
        temp, err := os.CreateTemp(dir, "db.tmp.*")
        if err != nil {
            return err
        }
        options := bolt.Options{}
        if boltOpenOptions != nil {
            options = *boltOpenOptions
        }
        options.OpenFile = func(_ string, _ int, _ os.FileMode) (file *os.File, err error) {
            return temp, nil
        }
        // Don't load tmp db into memory regardless of opening options
        options.Mlock = false
        tdbp := temp.Name()
        tmpdb, err := bolt.Open(tdbp, 0600, &options)
        if err != nil {
            return err
        }

        dbp := b.db.Path()
        size1, sizeInUse1 := b.Size(), b.SizeInUse()
        if b.lg != nil {
            b.lg.Info(
                "defragmenting",
                zap.String("path", dbp),
                zap.Int64("current-db-size-bytes", size1),
                zap.String("current-db-size", humanize.Bytes(uint64(size1))),
                zap.Int64("current-db-size-in-use-bytes", sizeInUse1),
                zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse1))),
            )
        }
        // gofail: var defragBeforeCopy struct{}
        err = defragdb(b.db, tmpdb, defragLimit)
        if err != nil {
            tmpdb.Close()
            if rmErr := os.RemoveAll(tmpdb.Path()); rmErr != nil {
                b.lg.Error("failed to remove db.tmp after defragmentation completed", zap.Error(rmErr))
            }
            return err
        }

        err = b.db.Close()
        if err != nil {
            b.lg.Fatal("failed to close database", zap.Error(err))
        }
        err = tmpdb.Close()
        if err != nil {
            b.lg.Fatal("failed to close tmp database", zap.Error(err))
        }
        // gofail: var defragBeforeRename struct{}
        err = os.Rename(tdbp, dbp)
        if err != nil {
            b.lg.Fatal("failed to rename tmp database", zap.Error(err))
        }

        b.db, err = bolt.Open(dbp, 0600, b.bopts)
        if err != nil {
            b.lg.Fatal("failed to open database", zap.String("path", dbp), zap.Error(err))
        }
        b.batchTx.tx = b.unsafeBegin(true)

        b.readTx.reset()
        b.readTx.tx = b.unsafeBegin(false)

        size := b.readTx.tx.Size()
        db := b.readTx.tx.DB()
        atomic.StoreInt64(&b.size, size)
        atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))

        took := time.Since(now)
        defragSec.Observe(took.Seconds())

        size2, sizeInUse2 := b.Size(), b.SizeInUse()
        if b.lg != nil {
            b.lg.Info(
                "finished defragmenting directory",
                zap.String("path", dbp),
                zap.Int64("current-db-size-bytes-diff", size2-size1),
                zap.Int64("current-db-size-bytes", size2),
                zap.String("current-db-size", humanize.Bytes(uint64(size2))),
                zap.Int64("current-db-size-in-use-bytes-diff", sizeInUse2-sizeInUse1),
                zap.Int64("current-db-size-in-use-bytes", sizeInUse2),
                zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse2))),
                zap.Duration("took", took),
            )
        }
        return nil
    }
    func (b *backend) run() {
        defer close(b.donec)
        t := time.NewTimer(b.batchInterval)
        defer t.Stop()
        for {
            select {
            case <-t.C:
            case <-b.stopc:
                b.batchTx.CommitAndStop()
                return
            }
            if b.batchTx.safePending() != 0 {
                b.batchTx.Commit()
            }
            t.Reset(b.batchInterval)
        }
    }

    // begin opens a transaction and updates the size/usage stats.
    func (b *backend) begin(write bool) *bolt.Tx {
        b.mu.RLock()
        tx := b.unsafeBegin(write)
        b.mu.RUnlock()

        size := tx.Size()
        db := tx.DB()
        stats := db.Stats()
        atomic.StoreInt64(&b.size, size)
        atomic.StoreInt64(&b.sizeInUse, size-(int64(stats.FreePageN)*int64(db.Info().PageSize)))
        atomic.StoreInt64(&b.openReadTxN, int64(stats.OpenTxN))

        return tx
    }

    func (b *backend) unsafeBegin(write bool) *bolt.Tx {
        tx, err := b.db.Begin(write)
        if err != nil {
            b.lg.Fatal("failed to begin tx", zap.Error(err))
        }
        return tx
    }

P.S. Helper functions

    // unsafeRange wraps boltdb's cursor iteration.
    // Whether endKey is empty decides how much data is traversed.
    func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
        if limit <= 0 {
            limit = math.MaxInt64
        }
        var isMatch func(b []byte) bool
        if len(endKey) > 0 {
            isMatch = func(b []byte) bool { return bytes.Compare(b, endKey) < 0 }
        } else {
            isMatch = func(b []byte) bool { return bytes.Equal(b, key) }
            limit = 1
        }

        for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {
            vs = append(vs, cv)
            keys = append(keys, ck)
            if limit == int64(len(keys)) {
                break
            }
        }
        return keys, vs
    }

    // unsafeForEach wraps boltdb's bucket ForEach.
    func unsafeForEach(tx *bolt.Tx, bucket Bucket, visitor func(k, v []byte) error) error {
        if b := tx.Bucket(bucket.Name()); b != nil {
            return b.ForEach(visitor)
        }
        return nil
    }

2. Second layer of wrapping (the kv_store module)

2.1 kv_store interfaces

    type RangeOptions struct {
        Limit int64
        Rev   int64
        Count bool
    }

    type RangeResult struct {
        KVs   []mvccpb.KeyValue
        Rev   int64
        Count int
    }

    type ReadView interface {
        // FirstRev returns the first KV revision at the time of opening the txn.
        // After a compaction, the first revision increases to the compaction
        // revision.
        FirstRev() int64

        // Rev returns the revision of the KV at the time of opening the txn.
        Rev() int64

        // Range gets the keys in the range at rangeRev.
        // The returned rev is the current revision of the KV when the operation is executed.
        // If rangeRev <=0, range gets the keys at currentRev.
        // If `end` is nil, the request returns the key.
        // If `end` is not nil and not empty, it gets the keys in range [key, range_end).
        // If `end` is not nil and empty, it gets the keys greater than or equal to key.
        // Limit limits the number of keys returned.
        // If the required rev is compacted, ErrCompacted will be returned.
        Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error)
    }

    // TxnRead represents a read-only transaction with operations that will not
    // block other read transactions.
    type TxnRead interface {
        ReadView
        // End marks the transaction is complete and ready to commit.
        End()
    }

    type WriteView interface {
        // DeleteRange deletes the given range from the store.
        // A deleteRange increases the rev of the store if any key in the range exists.
        // The number of key deleted will be returned.
        // The returned rev is the current revision of the KV when the operation is executed.
        // It also generates one event for each key delete in the event history.
        // if the `end` is nil, deleteRange deletes the key.
        // if the `end` is not nil, deleteRange deletes the keys in range [key, range_end).
        DeleteRange(key, end []byte) (n, rev int64)

        // Put puts the given key, value into the store. Put also takes additional argument lease to
        // attach a lease to a key-value pair as meta-data. KV implementation does not validate the lease
        // id.
        // A put also increases the rev of the store, and generates one event in the event history.
        // The returned rev is the current revision of the KV when the operation is executed.
        Put(key, value []byte, lease lease.LeaseID) (rev int64)
    }

    // TxnWrite represents a transaction that can modify the store.
    type TxnWrite interface {
        TxnRead
        WriteView
        // Changes gets the changes made since opening the write txn.
        Changes() []mvccpb.KeyValue
    }

    // txnReadWrite coerces a read txn to a write, panicking on any write operation.
    type txnReadWrite struct{ TxnRead }

    func (trw *txnReadWrite) DeleteRange(key, end []byte) (n, rev int64) { panic("unexpected DeleteRange") }
    func (trw *txnReadWrite) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
        panic("unexpected Put")
    }
    func (trw *txnReadWrite) Changes() []mvccpb.KeyValue { return nil }

    func NewReadOnlyTxnWrite(txn TxnRead) TxnWrite { return &txnReadWrite{txn} }

    type ReadTxMode uint32

    const (
        // Use ConcurrentReadTx and the txReadBuffer is copied
        ConcurrentReadTxMode = ReadTxMode(1)
        // Use backend ReadTx and txReadBuffer is not copied
        SharedBufReadTxMode = ReadTxMode(2)
    )

    type KV interface {
        ReadView
        WriteView

        // Read creates a read transaction.
        Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead

        // Write creates a write transaction.
        Write(trace *traceutil.Trace) TxnWrite

        // HashStorage returns HashStorage interface for KV storage.
        HashStorage() HashStorage

        // Compact frees all superseded keys with revisions less than rev.
        Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error)

        // Commit commits outstanding txns into the underlying backend.
        Commit()

        // Restore restores the KV store from a backend.
        Restore(b backend.Backend) error
        Close() error
    }

    // WatchableKV is a KV that can be watched.
    type WatchableKV interface {
        KV
        Watchable
    }

    // Watchable is the interface that wraps the NewWatchStream function.
    type Watchable interface {
        // NewWatchStream returns a WatchStream that can be used to
        // watch events happened or happening on the KV.
        NewWatchStream() WatchStream
    }

2.2 kv_store interface implementation

2.2.1 read/write view implementation

As the code shows, the read/write views are essentially thin wrappers around read/write transactions, and reads go through a concurrent read transaction by default. It also shows the standard flow for using a read or write transaction (a usage sketch follows the list below):

  1. Read / Write: open the transaction
  2. perform the read/write operations on it
  3. End: close the transaction
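For illustration, a minimal sketch of that flow against the KV interface (hypothetical; `kv` is an assumed KV instance):

    // Read: open a read txn, range over it, then End it.
    rtx := kv.Read(ConcurrentReadTxMode, traceutil.TODO())
    res, err := rtx.Range(context.TODO(), []byte("foo"), nil, RangeOptions{})
    rtx.End()
    if err == nil {
        _ = res.KVs
    }

    // Write: open a write txn, mutate, then End it (End bumps currentRev and
    // releases the batch tx lock, which writes the change back to the read buffer).
    wtx := kv.Write(traceutil.TODO())
    rev := wtx.Put([]byte("foo"), []byte("bar"), lease.NoLease)
    wtx.End()
    _ = rev

The readView/writeView wrappers below follow exactly this pattern: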
    type readView struct{ kv KV }

    func (rv *readView) FirstRev() int64 {
        tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO())
        defer tr.End()
        return tr.FirstRev()
    }

    func (rv *readView) Rev() int64 {
        tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO())
        defer tr.End()
        return tr.Rev()
    }

    func (rv *readView) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
        tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO())
        defer tr.End()
        return tr.Range(ctx, key, end, ro)
    }

    type writeView struct{ kv KV }

    func (wv *writeView) DeleteRange(key, end []byte) (n, rev int64) {
        tw := wv.kv.Write(traceutil.TODO())
        defer tw.End()
        return tw.DeleteRange(key, end)
    }

    func (wv *writeView) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
        tw := wv.kv.Write(traceutil.TODO())
        defer tw.End()
        return tw.Put(key, value, lease)
    }

2.2.2 Read transaction wrapper

    type storeTxnRead struct {
        s  *store
        tx backend.ReadTx

        firstRev int64
        rev      int64

        trace *traceutil.Trace
    }

    func (s *store) Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead {
        s.mu.RLock()
        s.revMu.RLock()
        var tx backend.ReadTx
        if mode == ConcurrentReadTxMode {
            tx = s.b.ConcurrentReadTx()
        } else {
            tx = s.b.ReadTx()
        }

        // Opening a read transaction requires revMu.RLock(): read transactions can
        // run concurrently, and currentRev is only modified by write transactions,
        // so a shared lock is sufficient here.
        tx.RLock()
        firstRev, rev := s.compactMainRev, s.currentRev
        s.revMu.RUnlock()
        return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev, trace})
    }

    func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev }
    func (tr *storeTxnRead) Rev() int64      { return tr.rev }

    func (tr *storeTxnRead) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
        return tr.rangeKeys(ctx, key, end, tr.Rev(), ro)
    }

    func (tr *storeTxnRead) End() {
        tr.tx.RUnlock()
        tr.s.mu.RUnlock()
    }

    func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
        rev := ro.Rev
        if rev > curRev {
            return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev
        }
        if rev <= 0 {
            rev = curRev
        }
        if rev < tr.s.compactMainRev {
            return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
        }
        if ro.Count {
            total := tr.s.kvindex.CountRevisions(key, end, rev)
            tr.trace.Step("count revisions from in-memory index tree")
            return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
        }
        revpairs, total := tr.s.kvindex.Revisions(key, end, rev, int(ro.Limit))
        if len(revpairs) == 0 {
            return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
        }

        limit := int(ro.Limit)
        if limit <= 0 || limit > len(revpairs) {
            limit = len(revpairs)
        }

        kvs := make([]mvccpb.KeyValue, limit)
        revBytes := newRevBytes()
        for i, revpair := range revpairs[:len(kvs)] {
            select {
            case <-ctx.Done():
                return nil, fmt.Errorf("rangeKeys: context cancelled: %w", ctx.Err())
            default:
            }
            revToBytes(revpair, revBytes)
            _, vs := tr.tx.UnsafeRange(schema.Key, revBytes, nil, 0)
            if len(vs) != 1 {
                tr.s.lg.Fatal(
                    "range failed to find revision pair",
                    zap.Int64("revision-main", revpair.main),
                    zap.Int64("revision-sub", revpair.sub),
                )
            }
            if err := kvs[i].Unmarshal(vs[0]); err != nil {
                tr.s.lg.Fatal(
                    "failed to unmarshal mvccpb.KeyValue",
                    zap.Error(err),
                )
            }
        }
        tr.trace.Step("range keys from bolt db")
        return &RangeResult{KVs: kvs, Count: total, Rev: curRev}, nil
    }

2.2.3 Write transaction wrapper

    type storeTxnWrite struct {
        storeTxnRead
        tx backend.BatchTx
        // beginRev is the revision where the txn begins; it will write to the next revision.
        beginRev int64
        changes  []mvccpb.KeyValue
    }

    func (s *store) Write(trace *traceutil.Trace) TxnWrite {
        s.mu.RLock()
        tx := s.b.BatchTx()
        tx.LockInsideApply()
        // Only one write transaction can be open at a time, so reading currentRev
        // here does not require holding revMu.
        tw := &storeTxnWrite{
            storeTxnRead: storeTxnRead{s, tx, 0, 0, trace},
            tx:           tx,
            beginRev:     s.currentRev,
            changes:      make([]mvccpb.KeyValue, 0, 4),
        }
        return newMetricsTxnWrite(tw)
    }

    func (tw *storeTxnWrite) Rev() int64 { return tw.beginRev }

    func (tw *storeTxnWrite) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
        rev := tw.beginRev
        if len(tw.changes) > 0 {
            rev++
        }
        return tw.rangeKeys(ctx, key, end, rev, ro)
    }

    func (tw *storeTxnWrite) DeleteRange(key, end []byte) (int64, int64) {
        if n := tw.deleteRange(key, end); n != 0 || len(tw.changes) > 0 {
            return n, tw.beginRev + 1
        }
        return 0, tw.beginRev
    }

    func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 {
        tw.put(key, value, lease)
        return tw.beginRev + 1
    }

    func (tw *storeTxnWrite) End() {
        // only update index if the txn modifies the mvcc state.
        if len(tw.changes) != 0 {
            // hold revMu lock to prevent new read txns from opening until writeback.
            tw.s.revMu.Lock()
            tw.s.currentRev++
        }
        tw.tx.Unlock()
        if len(tw.changes) != 0 {
            tw.s.revMu.Unlock()
        }
        tw.s.mu.RUnlock()
    }

    func (tw *storeTxnWrite) Changes() []mvccpb.KeyValue { return tw.changes }

    func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
        rev := tw.beginRev + 1
        c := rev
        oldLease := lease.NoLease

        // if the key exists before, use its previous created and
        // get its previous leaseID
        _, created, ver, err := tw.s.kvindex.Get(key, rev)
        if err == nil {
            c = created.main
            oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
            tw.trace.Step("get key's previous created_revision and leaseID")
        }
        ibytes := newRevBytes()
        idxRev := revision{main: rev, sub: int64(len(tw.changes))}
        revToBytes(idxRev, ibytes)

        ver = ver + 1
        kv := mvccpb.KeyValue{
            Key:            key,
            Value:          value,
            CreateRevision: c,
            ModRevision:    rev,
            Version:        ver,
            Lease:          int64(leaseID),
        }

        d, err := kv.Marshal()
        if err != nil {
            tw.storeTxnRead.s.lg.Fatal(
                "failed to marshal mvccpb.KeyValue",
                zap.Error(err),
            )
        }

        tw.trace.Step("marshal mvccpb.KeyValue")
        tw.tx.UnsafeSeqPut(schema.Key, ibytes, d)
        tw.s.kvindex.Put(key, idxRev)
        tw.changes = append(tw.changes, kv)
        tw.trace.Step("store kv pair into bolt db")

        if oldLease == leaseID {
            tw.trace.Step("attach lease to kv pair")
            return
        }

        if oldLease != lease.NoLease {
            if tw.s.le == nil {
                panic("no lessor to detach lease")
            }
            err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
            if err != nil {
                tw.storeTxnRead.s.lg.Error(
                    "failed to detach old lease from a key",
                    zap.Error(err),
                )
            }
        }
        if leaseID != lease.NoLease {
            if tw.s.le == nil {
                panic("no lessor to attach lease")
            }
            err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
            if err != nil {
                panic("unexpected error from lease Attach")
            }
        }
        tw.trace.Step("attach lease to kv pair")
    }

    func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 {
        rrev := tw.beginRev
        if len(tw.changes) > 0 {
            rrev++
        }
        keys, _ := tw.s.kvindex.Range(key, end, rrev)
        if len(keys) == 0 {
            return 0
        }
        for _, key := range keys {
            tw.delete(key)
        }
        return int64(len(keys))
    }

    func (tw *storeTxnWrite) delete(key []byte) {
        ibytes := newRevBytes()
        idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
        revToBytes(idxRev, ibytes)

        ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)

        kv := mvccpb.KeyValue{Key: key}

        d, err := kv.Marshal()
        if err != nil {
            tw.storeTxnRead.s.lg.Fatal(
                "failed to marshal mvccpb.KeyValue",
                zap.Error(err),
            )
        }

        tw.tx.UnsafeSeqPut(schema.Key, ibytes, d)
        err = tw.s.kvindex.Tombstone(key, idxRev)
        if err != nil {
            tw.storeTxnRead.s.lg.Fatal(
                "failed to tombstone an existing key",
                zap.String("key", string(key)),
                zap.Error(err),
            )
        }
        tw.changes = append(tw.changes, kv)

        item := lease.LeaseItem{Key: string(key)}
        leaseID := tw.s.le.GetLease(item)

        if leaseID != lease.NoLease {
            err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item})
            if err != nil {
                tw.storeTxnRead.s.lg.Error(
                    "failed to detach old lease from a key",
                    zap.Error(err),
                )
            }
        }
    }
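To help read the put/delete code above: each change is written into the Key bucket under a key derived from the revision rather than the user key; the user key only appears inside the marshaled mvccpb.KeyValue and in the in-memory index. A rough sketch of that encoding follows (based on etcd's newRevBytes/revToBytes/appendMarkTombstone; treat the exact layout as an approximation that may differ across versions):

    import "encoding/binary"

    // revisionKey sketches the assumed boltdb key layout in the Key bucket:
    // 8-byte big-endian main revision, '_', 8-byte big-endian sub revision;
    // deletions append a 't' tombstone marker.
    func revisionKey(main, sub int64, tombstone bool) []byte {
        b := make([]byte, 0, 18)
        b = binary.BigEndian.AppendUint64(b, uint64(main))
        b = append(b, '_')
        b = binary.BigEndian.AppendUint64(b, uint64(sub))
        if tombstone {
            b = append(b, 't')
        }
        return b
    }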

2.3 How kv_store watch works