etcd builds on boltdb's transaction features, but optimizes the write path by batching writes into a single boltdb write transaction (batch write). Note: some trace/log code has been removed from the source excerpts below.
1. How etcd wraps boltdb
1. First layer of wrapping (the backend module)
This module provides the basic CRUD APIs to the MVCC module. The interfaces are as follows:
```go
type Backend interface {
    // ReadTx returns a read transaction.
    ReadTx() ReadTx
    // BatchTx returns a write (batch) transaction.
    BatchTx() BatchTx
    // ConcurrentReadTx returns a non-blocking read transaction. It mainly
    // optimizes the locking around the write transaction writing its buffer
    // back; explained in detail later.
    ConcurrentReadTx() ReadTx
    // Snapshot returns a snapshot (reuses boltdb's snapshot API).
    Snapshot() Snapshot
    // Ignored for now.
    Hash(ignores func(bucketName, keyName []byte) bool) (uint32, error)
    // Size returns the size allocated by the storage layer, i.e. the boltdb
    // file size (including all free pages).
    Size() int64
    // SizeInUse returns the size actually in use, i.e. the pages boltdb is
    // currently using, excluding free pages.
    SizeInUse() int64
    // OpenReadTxN returns the number of currently open read transactions.
    OpenReadTxN() int64
    // Defrag defragments boltdb's on-disk layout: data is rewritten into
    // contiguous pages to reduce space usage. Under the hood it reads the
    // current db and writes the data into a new db file.
    Defrag() error
    // ForceCommit forces a commit of the current batch transaction.
    ForceCommit()
    // Close stops the goroutine that commits batches asynchronously and
    // closes the db.
    Close() error
    // SetTxPostLockInsideApplyHook sets a txPostLockInsideApplyHook.
    SetTxPostLockInsideApplyHook(func())
}

type ReadTx interface {
    // The exclusive lock is mainly used by the write transaction to flush
    // not-yet-committed changes back into the read transaction's buffer.
    Lock()
    Unlock()
    // The shared lock is used by read transactions.
    RLock()
    RUnlock()

    // Read APIs. The expected usage is:
    //   tx.RLock()
    //   tx.UnsafeRange or tx.UnsafeForEach
    //   tx.RUnlock()
    UnsafeRange(bucket Bucket, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
    UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error
}

type BatchTx interface {
    // Only Lock/Unlock are implemented; RLock/RUnlock are not.
    // The read APIs are the same as ReadTx and still reuse boltdb's
    // transaction underneath.
    ReadTx
    // Write APIs. The expected usage is:
    //   tx.Lock()
    //   tx.Unsafe* write APIs
    //   tx.Unlock()
    // As a result etcd writes are essentially serialized, which matches
    // boltdb, whose writes are serialized as well.
    UnsafeCreateBucket(bucket Bucket)
    UnsafeDeleteBucket(bucket Bucket)
    UnsafePut(bucket Bucket, key []byte, value []byte)
    UnsafeSeqPut(bucket Bucket, key []byte, value []byte)
    UnsafeDelete(bucket Bucket, key []byte)
    // Commit/CommitAndStop are used by the background commit goroutine.
    Commit()
    CommitAndStop()
    // Raft-related; ignored for now.
    LockInsideApply()
    LockOutsideApply()
}
```
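To make the locking contract above concrete, here is a minimal usage sketch. It is not etcd code: the import paths assume the etcd v3.6 source layout, the `demo.db` path is made up, and only the `Backend`/`ReadTx`/`BatchTx` methods shown above are used.

```go
package main

import (
    "go.etcd.io/etcd/server/v3/storage/backend" // assumed v3.6 path
    "go.etcd.io/etcd/server/v3/storage/schema"  // assumed v3.6 path
    "go.uber.org/zap"
)

func main() {
    be := backend.NewDefaultBackend(zap.NewExample(), "demo.db")
    defer be.Close()

    // Write path: Lock -> Unsafe* -> Unlock. The change stays in the batch
    // buffer; boltdb commits only via the background goroutine, the batch
    // limit, or ForceCommit.
    btx := be.BatchTx()
    btx.Lock()
    btx.UnsafeCreateBucket(schema.Key)
    btx.UnsafePut(schema.Key, []byte("k"), []byte("v"))
    btx.Unlock()

    // Read path: RLock -> UnsafeRange/UnsafeForEach -> RUnlock. The read
    // buffer makes the uncommitted write above visible immediately.
    rtx := be.ReadTx()
    rtx.RLock()
    keys, vals := rtx.UnsafeRange(schema.Key, []byte("k"), nil, 0)
    rtx.RUnlock()
    _, _ = keys, vals
}
```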
1.1 ReadTx implementation
```go
type baseReadTx struct {
    // mu protects the buf field.
    mu sync.RWMutex
    // buf holds not-yet-committed write-transaction data so that read
    // transactions can see the latest data.
    buf txReadBuffer

    // txMu protects the boltdb transaction instance and the buckets cache.
    txMu    *sync.RWMutex
    tx      *bolt.Tx
    buckets map[BucketID]*bolt.Bucket
    // txWg waits for concurrent read transactions to finish. It is used when
    // the write transaction commits: the read transaction is reset, but
    // concurrent reads do not take the lock, so the WaitGroup guarantees no
    // concurrent read is still in flight when the read transaction is reset.
    txWg *sync.WaitGroup
}

// UnsafeForEach iterates over all data of the given bucket.
func (baseReadTx *baseReadTx) UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error {
    dups := make(map[string]struct{})
    getDups := func(k, v []byte) error {
        dups[string(k)] = struct{}{}
        return nil
    }
    visitNoDup := func(k, v []byte) error {
        if _, ok := dups[string(k)]; ok {
            return nil
        }
        return visitor(k, v)
    }
    // First collect the keys present in the buffer (the newest data).
    if err := baseReadTx.buf.ForEach(bucket, getDups); err != nil {
        return err
    }
    // Then visit the committed data, skipping keys the buffer overrides.
    baseReadTx.txMu.Lock()
    err := unsafeForEach(baseReadTx.tx, bucket, visitNoDup)
    baseReadTx.txMu.Unlock()
    if err != nil {
        return err
    }
    // Finally visit the uncommitted data in the buffer.
    return baseReadTx.buf.ForEach(bucket, visitor)
}

func (baseReadTx *baseReadTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
    if endKey == nil {
        limit = 1
    }
    if limit <= 0 {
        limit = math.MaxInt64
    }
    if limit > 1 && !bucketType.IsSafeRangeBucket() {
        panic("do not use unsafeRange on non-keys bucket")
    }
    // If the buffer already has all the requested data, return it directly.
    keys, vals := baseReadTx.buf.Range(bucketType, key, endKey, limit)
    if int64(len(keys)) == limit {
        return keys, vals
    }

    // Otherwise read from boltdb and cache the bucket handle.
    bn := bucketType.ID()
    baseReadTx.txMu.RLock()
    bucket, ok := baseReadTx.buckets[bn]
    baseReadTx.txMu.RUnlock()
    lockHeld := false
    if !ok {
        baseReadTx.txMu.Lock()
        lockHeld = true
        bucket = baseReadTx.tx.Bucket(bucketType.Name())
        baseReadTx.buckets[bn] = bucket
    }
    if bucket == nil {
        if lockHeld {
            baseReadTx.txMu.Unlock()
        }
        return keys, vals
    }
    if !lockHeld {
        baseReadTx.txMu.Lock()
    }
    c := bucket.Cursor()
    baseReadTx.txMu.Unlock()

    k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
    return append(k2, keys...), append(v2, vals...)
}

// readTx is the plain read transaction. Because it takes the read lock, it
// blocks the write transaction from flushing uncommitted changes into the
// read buffer.
type readTx struct {
    baseReadTx
}

func (rt *readTx) Lock()    { rt.mu.Lock() }
func (rt *readTx) Unlock()  { rt.mu.Unlock() }
func (rt *readTx) RLock()   { rt.mu.RLock() }
func (rt *readTx) RUnlock() { rt.mu.RUnlock() }

// reset resets the read transaction; it is called after a write transaction
// commits.
func (rt *readTx) reset() {
    rt.buf.reset()
    rt.buckets = make(map[BucketID]*bolt.Bucket)
    rt.tx = nil
    rt.txWg = new(sync.WaitGroup)
}

// concurrentReadTx is the concurrent read transaction. All of its lock
// methods are no-ops, so it does not block the write transaction from
// flushing uncommitted changes into the read buffer; instead it works on a
// copy of the read transaction's buffer.
type concurrentReadTx struct {
    baseReadTx
}

func (rt *concurrentReadTx) Lock()    {}
func (rt *concurrentReadTx) Unlock()  {}
func (rt *concurrentReadTx) RLock()   {}
func (rt *concurrentReadTx) RUnlock() { rt.txWg.Done() }
```
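A short sketch of how the concurrent flavor is used in practice (hypothetical helper, same assumed import paths as the sketch above). Since `concurrentReadTx`'s lock methods are no-ops, the only obligation is to call `RUnlock` so that `txWg.Done()` runs and the old boltdb read transaction can eventually be rolled back.

```go
// rangeWithoutBlockingWriters is a hypothetical helper, not etcd code.
func rangeWithoutBlockingWriters(be backend.Backend, key []byte) ([][]byte, [][]byte) {
    tx := be.ConcurrentReadTx() // copies (or reuses a cached copy of) readTx.buf
    tx.RLock()                  // no-op, kept for interface symmetry
    defer tx.RUnlock()          // txWg.Done(): lets the backend roll back the old read tx after a commit
    return tx.UnsafeRange(schema.Key, key, nil, 0)
}
```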
1.2 BatchTx implementation
```go
type batchTx struct {
    sync.Mutex
    tx      *bolt.Tx
    backend *backend

    pending int // number of pending (not yet committed) changes
}

// In practice the plain batchTx's Lock/Unlock are not used directly; etcd
// uses the buffered variant (batchTxBuffered) below.
func (t *batchTx) Lock() {
    ValidateCalledInsideUnittest(t.backend.lg)
    t.lock()
}

func (t *batchTx) Unlock() {
    if t.pending >= t.backend.batchLimit {
        t.commit(false)
    }
    t.Mutex.Unlock()
}

// LockInsideApply/LockOutsideApply are raft-related; ignore them for now.
func (t *batchTx) LockInsideApply() {
    t.lock()
    if t.backend.txPostLockInsideApplyHook != nil {
        // The callers of some methods (i.e., (*RaftCluster).AddMember)
        // can be coming from both InsideApply and OutsideApply, but the
        // callers from OutsideApply will have a nil txPostLockInsideApplyHook.
        // So we should check the txPostLockInsideApplyHook before validating
        // the callstack.
        ValidateCalledInsideApply(t.backend.lg)
        t.backend.txPostLockInsideApplyHook()
    }
}

func (t *batchTx) LockOutsideApply() {
    ValidateCalledOutSideApply(t.backend.lg)
    t.lock()
}

func (t *batchTx) lock() {
    t.Mutex.Lock()
}

// A write transaction must not call RLock/RUnlock.
func (t *batchTx) RLock() {
    panic("unexpected RLock")
}

func (t *batchTx) RUnlock() {
    panic("unexpected RUnlock")
}

// The CRUD APIs below all delegate to the boltdb APIs and increment pending
// unconditionally. (Is it really fine to not check the outcome? Errors here
// are logged with Fatal, which exits the process, so a failed call never
// survives anyway.)
func (t *batchTx) UnsafeCreateBucket(bucket Bucket) {
    _, err := t.tx.CreateBucket(bucket.Name())
    if err != nil && err != bolt.ErrBucketExists {
        t.backend.lg.Fatal(
            "failed to create a bucket",
            zap.Stringer("bucket-name", bucket),
            zap.Error(err),
        )
    }
    t.pending++
}

func (t *batchTx) UnsafeDeleteBucket(bucket Bucket) {
    err := t.tx.DeleteBucket(bucket.Name())
    if err != nil && err != bolt.ErrBucketNotFound {
        t.backend.lg.Fatal(
            "failed to delete a bucket",
            zap.Stringer("bucket-name", bucket),
            zap.Error(err),
        )
    }
    t.pending++
}

func (t *batchTx) UnsafePut(bucket Bucket, key []byte, value []byte) {
    t.unsafePut(bucket, key, value, false)
}

func (t *batchTx) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {
    t.unsafePut(bucket, key, value, true)
}

func (t *batchTx) unsafePut(bucketType Bucket, key []byte, value []byte, seq bool) {
    bucket := t.tx.Bucket(bucketType.Name())
    if bucket == nil {
        t.backend.lg.Fatal(
            "failed to find a bucket",
            zap.Stringer("bucket-name", bucketType),
            zap.Stack("stack"),
        )
    }
    if seq {
        // it is useful to increase fill percent when the workloads are mostly append-only.
        // this can delay the page split and reduce space usage.
        bucket.FillPercent = 0.9
    }
    if err := bucket.Put(key, value); err != nil {
        t.backend.lg.Fatal(
            "failed to write to a bucket",
            zap.Stringer("bucket-name", bucketType),
            zap.Error(err),
        )
    }
    t.pending++
}

func (t *batchTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
    bucket := t.tx.Bucket(bucketType.Name())
    if bucket == nil {
        t.backend.lg.Fatal(
            "failed to find a bucket",
            zap.Stringer("bucket-name", bucketType),
            zap.Stack("stack"),
        )
    }
    return unsafeRange(bucket.Cursor(), key, endKey, limit)
}

func (t *batchTx) UnsafeDelete(bucketType Bucket, key []byte) {
    bucket := t.tx.Bucket(bucketType.Name())
    if bucket == nil {
        t.backend.lg.Fatal(
            "failed to find a bucket",
            zap.Stringer("bucket-name", bucketType),
            zap.Stack("stack"),
        )
    }
    err := bucket.Delete(key)
    if err != nil {
        t.backend.lg.Fatal(
            "failed to delete a key",
            zap.Stringer("bucket-name", bucketType),
            zap.Error(err),
        )
    }
    t.pending++
}

func (t *batchTx) UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error {
    return unsafeForEach(t.tx, bucket, visitor)
}

// Commit/CommitAndStop are used by the background commit goroutine.
func (t *batchTx) Commit() {
    t.lock()
    t.commit(false)
    t.Unlock()
}

func (t *batchTx) CommitAndStop() {
    t.lock()
    t.commit(true)
    t.Unlock()
}

func (t *batchTx) safePending() int {
    t.Mutex.Lock()
    defer t.Mutex.Unlock()
    return t.pending
}

// commit commits the boltdb transaction and, unless stopping, begins a new
// write transaction.
func (t *batchTx) commit(stop bool) {
    if t.tx != nil {
        if t.pending == 0 && !stop {
            return
        }

        start := time.Now()

        // gofail: var beforeCommit struct{}
        err := t.tx.Commit()
        // gofail: var afterCommit struct{}

        rebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds())
        spillSec.Observe(t.tx.Stats().SpillTime.Seconds())
        writeSec.Observe(t.tx.Stats().WriteTime.Seconds())
        commitSec.Observe(time.Since(start).Seconds())
        atomic.AddInt64(&t.backend.commits, 1)

        t.pending = 0
        if err != nil {
            t.backend.lg.Fatal("failed to commit tx", zap.Error(err))
        }
    }
    if !stop {
        t.tx = t.backend.begin(true)
    }
}

// batchTxBuffered is a batchTx with a write buffer.
type batchTxBuffered struct {
    batchTx
    buf txWriteBuffer
}

func newBatchTxBuffered(backend *backend) *batchTxBuffered {
    tx := &batchTxBuffered{
        batchTx: batchTx{backend: backend},
        buf: txWriteBuffer{
            txBuffer:   txBuffer{make(map[BucketID]*bucketBuffer)},
            bucket2seq: make(map[BucketID]bool),
        },
    }
    tx.Commit()
    return tx
}

func (t *batchTxBuffered) Unlock() {
    // Per the usage pattern described earlier, by the time Unlock is called
    // the caller considers the write transaction finished (even though it has
    // not actually been committed yet), so the buffer is flushed back into
    // the read transaction's buffer to make the new data readable.
    if t.pending != 0 {
        t.backend.readTx.Lock()
        t.buf.writeback(&t.backend.readTx.buf)
        t.backend.readTx.Unlock()
        // If the number of pending changes exceeds the batch limit, force a commit.
        if t.pending >= t.backend.batchLimit {
            t.commit(false)
        }
    }
    t.batchTx.Unlock()
}

func (t *batchTxBuffered) Commit() {
    t.lock()
    t.commit(false)
    t.Unlock()
}

func (t *batchTxBuffered) CommitAndStop() {
    t.lock()
    t.commit(true)
    t.Unlock()
}

func (t *batchTxBuffered) commit(stop bool) {
    if t.backend.hooks != nil {
        t.backend.hooks.OnPreCommitUnsafe(t)
    }
    // This commit resets the read transaction, so its lock must be held.
    t.backend.readTx.Lock()
    t.unsafeCommit(stop)
    t.backend.readTx.Unlock()
}

func (t *batchTxBuffered) unsafeCommit(stop bool) {
    if t.backend.readTx.tx != nil {
        // Wait for all concurrent read transactions to finish, then roll back
        // the old boltdb read transaction.
        go func(tx *bolt.Tx, wg *sync.WaitGroup) {
            wg.Wait()
            if err := tx.Rollback(); err != nil {
                t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
            }
        }(t.backend.readTx.tx, t.backend.readTx.txWg)
        t.backend.readTx.reset()
    }

    // Commit the write transaction.
    t.batchTx.commit(stop)

    if !stop {
        // Re-open the read transaction.
        t.backend.readTx.tx = t.backend.begin(false)
    }
}

// Mutations are also appended to the write buffer.
func (t *batchTxBuffered) UnsafePut(bucket Bucket, key []byte, value []byte) {
    t.batchTx.UnsafePut(bucket, key, value)
    t.buf.put(bucket, key, value)
}

func (t *batchTxBuffered) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {
    t.batchTx.UnsafeSeqPut(bucket, key, value)
    t.buf.putSeq(bucket, key, value)
}
```
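The three commit triggers can be summarized with a small configuration sketch. This is not etcd code: the helpers are made up, the import paths are the same assumed v3.6 paths as earlier (plus `time`), and `BatchInterval`/`BatchLimit` are the `BackendConfig` fields used by `newBackend` in the next subsection.

```go
// newSmallBatchBackend is a hypothetical helper, not etcd code.
func newSmallBatchBackend() backend.Backend {
    cfg := backend.DefaultBackendConfig(zap.NewExample())
    cfg.Path = "demo.db"                       // made-up path
    cfg.BatchInterval = 100 * time.Millisecond // trigger 1: run() commits on this timer
    cfg.BatchLimit = 10000                     // trigger 2: commit once this many changes are pending
    return backend.New(cfg)
}

// writeOne is a hypothetical helper showing the write-side view of batching.
func writeOne(be backend.Backend) {
    btx := be.BatchTx()
    btx.Lock()
    btx.UnsafeCreateBucket(schema.Key)                  // idempotent: ErrBucketExists is tolerated
    btx.UnsafePut(schema.Key, []byte("k"), []byte("v")) // pending++
    btx.Unlock()     // writes back to readTx.buf; commits only if pending >= BatchLimit
    be.ForceCommit() // trigger 3: explicit boltdb commit, no waiting for the batch
}
```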
1.3 backend module implementation
```go
type backend struct {
    size        int64
    sizeInUse   int64
    commits     int64
    openReadTxN int64
    // mlock prevents backend database file to be swapped
    mlock bool

    mu    sync.RWMutex
    bopts *bolt.Options
    db    *bolt.DB

    batchInterval time.Duration
    batchLimit    int
    batchTx       *batchTxBuffered

    readTx *readTx
    // txReadBufferCache caches a copy of the read buffer for ConcurrentReadTx
    // so that the buffer does not have to be copied on every call.
    txReadBufferCache txReadBufferCache

    stopc chan struct{}
    donec chan struct{}

    hooks Hooks

    txPostLockInsideApplyHook func()

    lg *zap.Logger
}

func New(cfg BackendConfig) Backend {
    return newBackend(cfg)
}

func NewDefaultBackend(lg *zap.Logger, path string) Backend {
    cfg := DefaultBackendConfig(lg)
    cfg.Path = path
    return newBackend(cfg)
}

func newBackend(cfg BackendConfig) *backend {
    opts := &bolt.Options{}
    if boltOpenOptions != nil {
        *opts = *boltOpenOptions
    }
    opts.InitialMmapSize = cfg.mmapSize()
    opts.FreelistType = cfg.BackendFreelistType
    opts.NoSync = cfg.UnsafeNoFsync
    opts.NoGrowSync = cfg.UnsafeNoFsync
    opts.Mlock = cfg.Mlock

    db, err := bolt.Open(cfg.Path, 0600, opts)
    if err != nil {
        cfg.Logger.Panic("failed to open database", zap.String("path", cfg.Path), zap.Error(err))
    }

    b := &backend{
        bopts: opts,
        db:    db,

        batchInterval: cfg.BatchInterval,
        batchLimit:    cfg.BatchLimit,
        mlock:         cfg.Mlock,

        readTx: &readTx{
            baseReadTx: baseReadTx{
                buf: txReadBuffer{
                    txBuffer:   txBuffer{make(map[BucketID]*bucketBuffer)},
                    bufVersion: 0,
                },
                buckets: make(map[BucketID]*bolt.Bucket),
                txWg:    new(sync.WaitGroup),
                txMu:    new(sync.RWMutex),
            },
        },
        txReadBufferCache: txReadBufferCache{
            mu:         sync.Mutex{},
            bufVersion: 0,
            buf:        nil,
        },

        stopc: make(chan struct{}),
        donec: make(chan struct{}),

        lg: cfg.Logger,
    }

    b.batchTx = newBatchTxBuffered(b)
    b.hooks = cfg.Hooks

    // Background goroutine that commits the batch periodically.
    go b.run()
    return b
}

func (b *backend) ReadTx() ReadTx { return b.readTx }

func (b *backend) BatchTx() BatchTx { return b.batchTx }

// ConcurrentReadTx creates and returns a new ReadTx, which:
// A) creates and keeps a copy of backend.readTx.txReadBuffer,
// B) references the boltdb read Tx (and its bucket cache) of current batch interval.
func (b *backend) ConcurrentReadTx() ReadTx {
    b.readTx.RLock()
    defer b.readTx.RUnlock()
    // prevent boltdb read Tx from been rolled back until store read Tx is done.
    // Needs to be called when holding readTx.RLock().
    b.readTx.txWg.Add(1)

    // TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval.

    // inspect/update cache recency iff there's no ongoing update to the cache
    // this falls through if there's no cache update

    // by this line, "ConcurrentReadTx" code path is already protected against concurrent "writeback" operations
    // which requires write lock to update "readTx.baseReadTx.buf".
    // Which means setting "buf *txReadBuffer" with "readTx.buf.unsafeCopy()" is guaranteed to be up-to-date,
    // whereas "txReadBufferCache.buf" may be stale from concurrent "writeback" operations.
    // We only update "txReadBufferCache.buf" if we know "buf *txReadBuffer" is up-to-date.
    // The update to "txReadBufferCache.buf" will benefit the following "ConcurrentReadTx" creation
    // by avoiding copying "readTx.baseReadTx.buf".
    b.txReadBufferCache.mu.Lock()

    curCache := b.txReadBufferCache.buf
    curCacheVer := b.txReadBufferCache.bufVersion
    curBufVer := b.readTx.buf.bufVersion

    isEmptyCache := curCache == nil
    isStaleCache := curCacheVer != curBufVer

    var buf *txReadBuffer
    switch {
    case isEmptyCache:
        // perform safe copy of buffer while holding "b.txReadBufferCache.mu.Lock"
        // this is only supposed to run once so there won't be much overhead
        curBuf := b.readTx.buf.unsafeCopy()
        buf = &curBuf
    case isStaleCache:
        // to maximize the concurrency, try unsafe copy of buffer
        // release the lock while copying buffer -- cache may become stale again and
        // get overwritten by someone else.
        // therefore, we need to check the readTx buffer version again
        b.txReadBufferCache.mu.Unlock()
        curBuf := b.readTx.buf.unsafeCopy()
        b.txReadBufferCache.mu.Lock()
        buf = &curBuf
    default:
        // neither empty nor stale cache, just use the current buffer
        buf = curCache
    }
    // txReadBufferCache.bufVersion can be modified when we doing an unsafeCopy()
    // as a result, curCacheVer could be no longer the same as
    // txReadBufferCache.bufVersion
    // if !isEmptyCache && curCacheVer != b.txReadBufferCache.bufVersion
    // then the cache became stale while copying "readTx.baseReadTx.buf".
    // It is safe to not update "txReadBufferCache.buf", because the next following
    // "ConcurrentReadTx" creation will trigger a new "readTx.baseReadTx.buf" copy
    // and "buf" is still used for the current "concurrentReadTx.baseReadTx.buf".
    if isEmptyCache || curCacheVer == b.txReadBufferCache.bufVersion {
        // continue if the cache is never set or no one has modified the cache
        b.txReadBufferCache.buf = buf
        b.txReadBufferCache.bufVersion = curBufVer
    }

    b.txReadBufferCache.mu.Unlock()

    // concurrentReadTx is not supposed to write to its txReadBuffer
    return &concurrentReadTx{
        baseReadTx: baseReadTx{
            buf:     *buf,
            txMu:    b.readTx.txMu,
            tx:      b.readTx.tx,
            buckets: b.readTx.buckets,
            txWg:    b.readTx.txWg,
        },
    }
}

func (b *backend) SetTxPostLockInsideApplyHook(hook func()) {
    // It needs to lock the batchTx, because the periodic commit
    // may be accessing the txPostLockInsideApplyHook at the moment.
    b.batchTx.lock()
    defer b.batchTx.Unlock()
    b.txPostLockInsideApplyHook = hook
}

func (b *backend) ForceCommit() {
    b.batchTx.Commit()
}

func (b *backend) Snapshot() Snapshot {
    b.batchTx.Commit()

    b.mu.RLock()
    defer b.mu.RUnlock()
    tx, err := b.db.Begin(false)
    if err != nil {
        b.lg.Fatal("failed to begin tx", zap.Error(err))
    }

    stopc, donec := make(chan struct{}), make(chan struct{})
    dbBytes := tx.Size()
    go func() {
        defer close(donec)
        // sendRateBytes is based on transferring snapshot data over a 1 gigabit/s connection
        // assuming a min tcp throughput of 100MB/s.
        var sendRateBytes int64 = 100 * 1024 * 1024
        warningTimeout := time.Duration(int64((float64(dbBytes) / float64(sendRateBytes)) * float64(time.Second)))
        if warningTimeout < minSnapshotWarningTimeout {
            warningTimeout = minSnapshotWarningTimeout
        }
        start := time.Now()
        ticker := time.NewTicker(warningTimeout)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                b.lg.Warn(
                    "snapshotting taking too long to transfer",
                    zap.Duration("taking", time.Since(start)),
                    zap.Int64("bytes", dbBytes),
                    zap.String("size", humanize.Bytes(uint64(dbBytes))),
                )
            case <-stopc:
                snapshotTransferSec.Observe(time.Since(start).Seconds())
                return
            }
        }
    }()

    return &snapshot{tx, stopc, donec}
}

func (b *backend) Hash(ignores func(bucketName, keyName []byte) bool) (uint32, error) {
    h := crc32.New(crc32.MakeTable(crc32.Castagnoli))

    b.mu.RLock()
    defer b.mu.RUnlock()
    err := b.db.View(func(tx *bolt.Tx) error {
        c := tx.Cursor()
        for next, _ := c.First(); next != nil; next, _ = c.Next() {
            b := tx.Bucket(next)
            if b == nil {
                return fmt.Errorf("cannot get hash of bucket %s", string(next))
            }
            h.Write(next)
            b.ForEach(func(k, v []byte) error {
                if ignores != nil && !ignores(next, k) {
                    h.Write(k)
                    h.Write(v)
                }
                return nil
            })
        }
        return nil
    })
    if err != nil {
        return 0, err
    }

    return h.Sum32(), nil
}

func (b *backend) Size() int64 {
    return atomic.LoadInt64(&b.size)
}

func (b *backend) SizeInUse() int64 {
    return atomic.LoadInt64(&b.sizeInUse)
}

func (b *backend) Close() error {
    close(b.stopc)
    <-b.donec
    b.mu.Lock()
    defer b.mu.Unlock()
    return b.db.Close()
}

func (b *backend) Commits() int64 {
    return atomic.LoadInt64(&b.commits)
}

func (b *backend) OpenReadTxN() int64 {
    return atomic.LoadInt64(&b.openReadTxN)
}

func (b *backend) Defrag() error {
    return b.defrag()
}

func (b *backend) defrag() error {
    now := time.Now()
    isDefragActive.Set(1)
    defer isDefragActive.Set(0)

    // TODO: make this non-blocking?
    // lock batchTx to ensure nobody is using previous tx, and then
    // close previous ongoing tx.
    b.batchTx.LockOutsideApply()
    defer b.batchTx.Unlock()

    // lock database after lock tx to avoid deadlock.
    b.mu.Lock()
    defer b.mu.Unlock()

    // block concurrent read requests while resetting tx
    b.readTx.Lock()
    defer b.readTx.Unlock()

    b.batchTx.unsafeCommit(true)

    b.batchTx.tx = nil

    // Create a temporary file to ensure we start with a clean slate.
    // Snapshotter.cleanupSnapdir cleans up any of these that are found during startup.
    dir := filepath.Dir(b.db.Path())
    temp, err := os.CreateTemp(dir, "db.tmp.*")
    if err != nil {
        return err
    }
    options := bolt.Options{}
    if boltOpenOptions != nil {
        options = *boltOpenOptions
    }
    options.OpenFile = func(_ string, _ int, _ os.FileMode) (file *os.File, err error) {
        return temp, nil
    }
    // Don't load tmp db into memory regardless of opening options
    options.Mlock = false
    tdbp := temp.Name()
    tmpdb, err := bolt.Open(tdbp, 0600, &options)
    if err != nil {
        return err
    }

    dbp := b.db.Path()
    size1, sizeInUse1 := b.Size(), b.SizeInUse()
    if b.lg != nil {
        b.lg.Info(
            "defragmenting",
            zap.String("path", dbp),
            zap.Int64("current-db-size-bytes", size1),
            zap.String("current-db-size", humanize.Bytes(uint64(size1))),
            zap.Int64("current-db-size-in-use-bytes", sizeInUse1),
            zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse1))),
        )
    }
    // gofail: var defragBeforeCopy struct{}
    err = defragdb(b.db, tmpdb, defragLimit)
    if err != nil {
        tmpdb.Close()
        if rmErr := os.RemoveAll(tmpdb.Path()); rmErr != nil {
            b.lg.Error("failed to remove db.tmp after defragmentation completed", zap.Error(rmErr))
        }
        return err
    }

    err = b.db.Close()
    if err != nil {
        b.lg.Fatal("failed to close database", zap.Error(err))
    }
    err = tmpdb.Close()
    if err != nil {
        b.lg.Fatal("failed to close tmp database", zap.Error(err))
    }
    // gofail: var defragBeforeRename struct{}
    err = os.Rename(tdbp, dbp)
    if err != nil {
        b.lg.Fatal("failed to rename tmp database", zap.Error(err))
    }

    b.db, err = bolt.Open(dbp, 0600, b.bopts)
    if err != nil {
        b.lg.Fatal("failed to open database", zap.String("path", dbp), zap.Error(err))
    }
    b.batchTx.tx = b.unsafeBegin(true)

    b.readTx.reset()
    b.readTx.tx = b.unsafeBegin(false)

    size := b.readTx.tx.Size()
    db := b.readTx.tx.DB()
    atomic.StoreInt64(&b.size, size)
    atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))

    took := time.Since(now)
    defragSec.Observe(took.Seconds())

    size2, sizeInUse2 := b.Size(), b.SizeInUse()
    if b.lg != nil {
        b.lg.Info(
            "finished defragmenting directory",
            zap.String("path", dbp),
            zap.Int64("current-db-size-bytes-diff", size2-size1),
            zap.Int64("current-db-size-bytes", size2),
            zap.String("current-db-size", humanize.Bytes(uint64(size2))),
            zap.Int64("current-db-size-in-use-bytes-diff", sizeInUse2-sizeInUse1),
            zap.Int64("current-db-size-in-use-bytes", sizeInUse2),
            zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse2))),
            zap.Duration("took", took),
        )
    }
    return nil
}

func (b *backend) run() {
    defer close(b.donec)
    t := time.NewTimer(b.batchInterval)
    defer t.Stop()
    for {
        select {
        case <-t.C:
        case <-b.stopc:
            b.batchTx.CommitAndStop()
            return
        }
        if b.batchTx.safePending() != 0 {
            b.batchTx.Commit()
        }
        t.Reset(b.batchInterval)
    }
}

// begin opens a transaction and updates the size/stats gauges.
func (b *backend) begin(write bool) *bolt.Tx {
    b.mu.RLock()
    tx := b.unsafeBegin(write)
    b.mu.RUnlock()

    size := tx.Size()
    db := tx.DB()
    stats := db.Stats()
    atomic.StoreInt64(&b.size, size)
    atomic.StoreInt64(&b.sizeInUse, size-(int64(stats.FreePageN)*int64(db.Info().PageSize)))
    atomic.StoreInt64(&b.openReadTxN, int64(stats.OpenTxN))

    return tx
}

func (b *backend) unsafeBegin(write bool) *bolt.Tx {
    tx, err := b.db.Begin(write)
    if err != nil {
        b.lg.Fatal("failed to begin tx", zap.Error(err))
    }
    return tx
}
```
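As a small illustration of the Size/SizeInUse/Defrag trio above (hypothetical helper, not etcd code; same assumed imports as earlier): Size is the boltdb file size including free pages, SizeInUse excludes them, so a large gap is the signal that defragmentation can reclaim space.

```go
// maybeDefrag is a hypothetical helper; the 50% threshold is arbitrary.
func maybeDefrag(be backend.Backend, lg *zap.Logger) error {
    size, inUse := be.Size(), be.SizeInUse()
    lg.Info("backend size",
        zap.Int64("size-bytes", size),
        zap.Int64("size-in-use-bytes", inUse),
        zap.Int64("open-read-txns", be.OpenReadTxN()),
    )
    if size-inUse > size/2 {
        // Blocks writes (and resets the read tx) while the db is rewritten
        // into a compact file and atomically renamed over the old one.
        return be.Defrag()
    }
    return nil
}
```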
PS: helper functions
```go
// unsafeRange iterates with a boltdb cursor starting at key. Whether it
// returns a single key or a range depends on whether endKey is empty.
func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
    if limit <= 0 {
        limit = math.MaxInt64
    }
    var isMatch func(b []byte) bool
    if len(endKey) > 0 {
        isMatch = func(b []byte) bool { return bytes.Compare(b, endKey) < 0 }
    } else {
        isMatch = func(b []byte) bool { return bytes.Equal(b, key) }
        limit = 1
    }

    for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {
        vs = append(vs, cv)
        keys = append(keys, ck)
        if limit == int64(len(keys)) {
            break
        }
    }
    return keys, vs
}

// unsafeForEach wraps boltdb's Bucket.ForEach.
func unsafeForEach(tx *bolt.Tx, bucket Bucket, visitor func(k, v []byte) error) error {
    if b := tx.Bucket(bucket.Name()); b != nil {
        return b.ForEach(visitor)
    }
    return nil
}
```
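The endKey/limit conventions of unsafeRange surface directly in ReadTx.UnsafeRange. A quick sketch (hypothetical helper, same assumed imports as earlier):

```go
// pointAndRangeGet is a hypothetical helper, not etcd code.
func pointAndRangeGet(be backend.Backend) {
    rtx := be.ReadTx()
    rtx.RLock()
    // endKey == nil: exact-match lookup of a single key (limit is forced to 1).
    k1, v1 := rtx.UnsafeRange(schema.Key, []byte("a"), nil, 0)
    // endKey != nil: keys in the half-open interval [a, z), capped at 100.
    // (limit > 1 is only allowed on the safe-range "key" bucket.)
    k2, v2 := rtx.UnsafeRange(schema.Key, []byte("a"), []byte("z"), 100)
    rtx.RUnlock()
    _, _, _, _ = k1, v1, k2, v2
}
```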
2. Second layer of wrapping (the kv_store module)
2.1 The kv_store interfaces
```go
type RangeOptions struct {
    Limit int64
    Rev   int64
    Count bool
}

type RangeResult struct {
    KVs   []mvccpb.KeyValue
    Rev   int64
    Count int
}

type ReadView interface {
    // FirstRev returns the first KV revision at the time of opening the txn.
    // After a compaction, the first revision increases to the compaction
    // revision.
    FirstRev() int64

    // Rev returns the revision of the KV at the time of opening the txn.
    Rev() int64

    // Range gets the keys in the range at rangeRev.
    // The returned rev is the current revision of the KV when the operation is executed.
    // If rangeRev <=0, range gets the keys at currentRev.
    // If `end` is nil, the request returns the key.
    // If `end` is not nil and not empty, it gets the keys in range [key, range_end).
    // If `end` is not nil and empty, it gets the keys greater than or equal to key.
    // Limit limits the number of keys returned.
    // If the required rev is compacted, ErrCompacted will be returned.
    Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error)
}

// TxnRead represents a read-only transaction with operations that will not
// block other read transactions.
type TxnRead interface {
    ReadView
    // End marks the transaction is complete and ready to commit.
    End()
}

type WriteView interface {
    // DeleteRange deletes the given range from the store.
    // A deleteRange increases the rev of the store if any key in the range exists.
    // The number of key deleted will be returned.
    // The returned rev is the current revision of the KV when the operation is executed.
    // It also generates one event for each key delete in the event history.
    // if the `end` is nil, deleteRange deletes the key.
    // if the `end` is not nil, deleteRange deletes the keys in range [key, range_end).
    DeleteRange(key, end []byte) (n, rev int64)

    // Put puts the given key, value into the store. Put also takes additional argument lease to
    // attach a lease to a key-value pair as meta-data. KV implementation does not validate the lease
    // id.
    // A put also increases the rev of the store, and generates one event in the event history.
    // The returned rev is the current revision of the KV when the operation is executed.
    Put(key, value []byte, lease lease.LeaseID) (rev int64)
}

// TxnWrite represents a transaction that can modify the store.
type TxnWrite interface {
    TxnRead
    WriteView
    // Changes gets the changes made since opening the write txn.
    Changes() []mvccpb.KeyValue
}

// txnReadWrite coerces a read txn to a write, panicking on any write operation.
type txnReadWrite struct{ TxnRead }

func (trw *txnReadWrite) DeleteRange(key, end []byte) (n, rev int64) { panic("unexpected DeleteRange") }
func (trw *txnReadWrite) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
    panic("unexpected Put")
}
func (trw *txnReadWrite) Changes() []mvccpb.KeyValue { return nil }

func NewReadOnlyTxnWrite(txn TxnRead) TxnWrite { return &txnReadWrite{txn} }

type ReadTxMode uint32

const (
    // Use ConcurrentReadTx and the txReadBuffer is copied
    ConcurrentReadTxMode = ReadTxMode(1)
    // Use backend ReadTx and txReadBuffer is not copied
    SharedBufReadTxMode = ReadTxMode(2)
)

type KV interface {
    ReadView
    WriteView

    // Read creates a read transaction.
    Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead

    // Write creates a write transaction.
    Write(trace *traceutil.Trace) TxnWrite

    // HashStorage returns HashStorage interface for KV storage.
    HashStorage() HashStorage

    // Compact frees all superseded keys with revisions less than rev.
    Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error)

    // Commit commits outstanding txns into the underlying backend.
    Commit()

    // Restore restores the KV store from a backend.
    Restore(b backend.Backend) error
    Close() error
}

// WatchableKV is a KV that can be watched.
type WatchableKV interface {
    KV
    Watchable
}

// Watchable is the interface that wraps the NewWatchStream function.
type Watchable interface {
    // NewWatchStream returns a WatchStream that can be used to
    // watch events happened or happening on the KV.
    NewWatchStream() WatchStream
}
```
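Before looking at the implementation, a sketch of how a caller drives the KV interface (hypothetical helper, not etcd code; the mvcc/lease/traceutil/context import paths are assumed from the etcd v3.6 layout, and the `kv` value is expected to come from something like mvcc.NewStore):

```go
// putThenRange is a hypothetical helper, not etcd code.
func putThenRange(kv mvcc.KV, key, value []byte) (*mvcc.RangeResult, error) {
    // Write txn: Write -> mutations -> End. End bumps currentRev and releases
    // the backend batch tx.
    wtx := kv.Write(traceutil.TODO())
    rev := wtx.Put(key, value, lease.NoLease)
    wtx.End()

    // Read txn: ConcurrentReadTxMode copies the read buffer so it does not
    // block writers; SharedBufReadTxMode shares it and takes the read lock.
    rtx := kv.Read(mvcc.ConcurrentReadTxMode, traceutil.TODO())
    defer rtx.End()
    return rtx.Range(context.TODO(), key, nil, mvcc.RangeOptions{Rev: rev})
}
```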
2.2 kv_store interface implementation
2.2.1 read/write view implementation
As you can see below, the read/write views are essentially thin wrappers around read/write transactions, and reads default to the concurrent read mode. The flow for using a read or write transaction is:
- Read/Write
- the read/write operations themselves
- End
```go
type readView struct{ kv KV }

func (rv *readView) FirstRev() int64 {
    tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO())
    defer tr.End()
    return tr.FirstRev()
}

func (rv *readView) Rev() int64 {
    tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO())
    defer tr.End()
    return tr.Rev()
}

func (rv *readView) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
    tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO())
    defer tr.End()
    return tr.Range(ctx, key, end, ro)
}

type writeView struct{ kv KV }

func (wv *writeView) DeleteRange(key, end []byte) (n, rev int64) {
    tw := wv.kv.Write(traceutil.TODO())
    defer tw.End()
    return tw.DeleteRange(key, end)
}

func (wv *writeView) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
    tw := wv.kv.Write(traceutil.TODO())
    defer tw.End()
    return tw.Put(key, value, lease)
}
```
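Because KV embeds ReadView and WriteView, a caller that only needs a single operation never touches transactions explicitly; each call below opens a transaction and Ends it internally via the view wrappers above (hypothetical helper, same assumed imports as the previous sketch):

```go
// oneShotOps is a hypothetical helper, not etcd code.
func oneShotOps(kv mvcc.KV) {
    rev := kv.Put([]byte("foo"), []byte("bar"), lease.NoLease)                    // writeView.Put
    res, err := kv.Range(context.TODO(), []byte("foo"), nil, mvcc.RangeOptions{}) // readView.Range
    n, drev := kv.DeleteRange([]byte("foo"), nil)                                 // writeView.DeleteRange
    _, _, _, _, _ = rev, res, err, n, drev
}
```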
2.2.2 Read transaction wrapper
```go
type storeTxnRead struct {
    s  *store
    tx backend.ReadTx

    firstRev int64
    rev      int64

    trace *traceutil.Trace
}

func (s *store) Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead {
    s.mu.RLock()
    s.revMu.RLock()

    var tx backend.ReadTx
    if mode == ConcurrentReadTxMode {
        tx = s.b.ConcurrentReadTx()
    } else {
        tx = s.b.ReadTx()
    }

    // Opening a read txn requires revMu.RLock(): read txns may run
    // concurrently, while currentRev is only modified by the write txn, so a
    // read lock is enough here.
    tx.RLock()
    firstRev, rev := s.compactMainRev, s.currentRev
    s.revMu.RUnlock()
    return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev, trace})
}

func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev }
func (tr *storeTxnRead) Rev() int64      { return tr.rev }

func (tr *storeTxnRead) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
    return tr.rangeKeys(ctx, key, end, tr.Rev(), ro)
}

func (tr *storeTxnRead) End() {
    tr.tx.RUnlock()
    tr.s.mu.RUnlock()
}

func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
    rev := ro.Rev
    if rev > curRev {
        return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev
    }
    if rev <= 0 {
        rev = curRev
    }
    if rev < tr.s.compactMainRev {
        return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
    }
    if ro.Count {
        total := tr.s.kvindex.CountRevisions(key, end, rev)
        tr.trace.Step("count revisions from in-memory index tree")
        return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
    }
    revpairs, total := tr.s.kvindex.Revisions(key, end, rev, int(ro.Limit))
    if len(revpairs) == 0 {
        return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
    }

    limit := int(ro.Limit)
    if limit <= 0 || limit > len(revpairs) {
        limit = len(revpairs)
    }

    kvs := make([]mvccpb.KeyValue, limit)
    revBytes := newRevBytes()
    for i, revpair := range revpairs[:len(kvs)] {
        select {
        case <-ctx.Done():
            return nil, fmt.Errorf("rangeKeys: context cancelled: %w", ctx.Err())
        default:
        }
        revToBytes(revpair, revBytes)
        _, vs := tr.tx.UnsafeRange(schema.Key, revBytes, nil, 0)
        if len(vs) != 1 {
            tr.s.lg.Fatal(
                "range failed to find revision pair",
                zap.Int64("revision-main", revpair.main),
                zap.Int64("revision-sub", revpair.sub),
            )
        }
        if err := kvs[i].Unmarshal(vs[0]); err != nil {
            tr.s.lg.Fatal(
                "failed to unmarshal mvccpb.KeyValue",
                zap.Error(err),
            )
        }
    }
    tr.trace.Step("range keys from bolt db")
    return &RangeResult{KVs: kvs, Count: total, Rev: curRev}, nil
}
```
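A sketch of how RangeOptions maps onto rangeKeys above (hypothetical helper, same assumed imports): Count is answered from the in-memory index alone, Rev pins a historical revision bounded by compaction and the current revision, and Limit pages the result.

```go
// countThenPage is a hypothetical helper, not etcd code.
func countThenPage(kv mvcc.KV, key, end []byte) (*mvcc.RangeResult, error) {
    rtx := kv.Read(mvcc.ConcurrentReadTxMode, traceutil.TODO())
    defer rtx.End()

    // Count only: served by kvindex.CountRevisions, no boltdb access.
    cnt, err := rtx.Range(context.TODO(), key, end, mvcc.RangeOptions{Count: true})
    if err != nil {
        return nil, err
    }

    // Paged read pinned at the revision observed above. Asking for a revision
    // below compactMainRev returns ErrCompacted; above currentRev, ErrFutureRev.
    return rtx.Range(context.TODO(), key, end, mvcc.RangeOptions{Rev: cnt.Rev, Limit: 100})
}
```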
2.2.3 Write transaction wrapper
```go
type storeTxnWrite struct {
    storeTxnRead
    tx backend.BatchTx
    // beginRev is the revision where the txn begins; it will write to the next revision.
    beginRev int64
    changes  []mvccpb.KeyValue
}

func (s *store) Write(trace *traceutil.Trace) TxnWrite {
    s.mu.RLock()
    tx := s.b.BatchTx()
    tx.LockInsideApply()
    // Only one write txn can be open at a time, so reading currentRev here
    // does not need the revMu lock.
    tw := &storeTxnWrite{
        storeTxnRead: storeTxnRead{s, tx, 0, 0, trace},
        tx:           tx,
        beginRev:     s.currentRev,
        changes:      make([]mvccpb.KeyValue, 0, 4),
    }
    return newMetricsTxnWrite(tw)
}

func (tw *storeTxnWrite) Rev() int64 { return tw.beginRev }

func (tw *storeTxnWrite) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
    rev := tw.beginRev
    if len(tw.changes) > 0 {
        rev++
    }
    return tw.rangeKeys(ctx, key, end, rev, ro)
}

func (tw *storeTxnWrite) DeleteRange(key, end []byte) (int64, int64) {
    if n := tw.deleteRange(key, end); n != 0 || len(tw.changes) > 0 {
        return n, tw.beginRev + 1
    }
    return 0, tw.beginRev
}

func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 {
    tw.put(key, value, lease)
    return tw.beginRev + 1
}

func (tw *storeTxnWrite) End() {
    // only update index if the txn modifies the mvcc state.
    if len(tw.changes) != 0 {
        // hold revMu lock to prevent new read txns from opening until writeback.
        tw.s.revMu.Lock()
        tw.s.currentRev++
    }
    tw.tx.Unlock()
    if len(tw.changes) != 0 {
        tw.s.revMu.Unlock()
    }
    tw.s.mu.RUnlock()
}

func (tw *storeTxnWrite) Changes() []mvccpb.KeyValue { return tw.changes }

func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
    rev := tw.beginRev + 1
    c := rev
    oldLease := lease.NoLease

    // if the key exists before, use its previous created and
    // get its previous leaseID
    _, created, ver, err := tw.s.kvindex.Get(key, rev)
    if err == nil {
        c = created.main
        oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
        tw.trace.Step("get key's previous created_revision and leaseID")
    }
    ibytes := newRevBytes()
    idxRev := revision{main: rev, sub: int64(len(tw.changes))}
    revToBytes(idxRev, ibytes)

    ver = ver + 1
    kv := mvccpb.KeyValue{
        Key:            key,
        Value:          value,
        CreateRevision: c,
        ModRevision:    rev,
        Version:        ver,
        Lease:          int64(leaseID),
    }

    d, err := kv.Marshal()
    if err != nil {
        tw.storeTxnRead.s.lg.Fatal(
            "failed to marshal mvccpb.KeyValue",
            zap.Error(err),
        )
    }

    tw.trace.Step("marshal mvccpb.KeyValue")
    tw.tx.UnsafeSeqPut(schema.Key, ibytes, d)
    tw.s.kvindex.Put(key, idxRev)
    tw.changes = append(tw.changes, kv)
    tw.trace.Step("store kv pair into bolt db")

    if oldLease == leaseID {
        tw.trace.Step("attach lease to kv pair")
        return
    }

    if oldLease != lease.NoLease {
        if tw.s.le == nil {
            panic("no lessor to detach lease")
        }
        err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
        if err != nil {
            tw.storeTxnRead.s.lg.Error(
                "failed to detach old lease from a key",
                zap.Error(err),
            )
        }
    }
    if leaseID != lease.NoLease {
        if tw.s.le == nil {
            panic("no lessor to attach lease")
        }
        err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
        if err != nil {
            panic("unexpected error from lease Attach")
        }
    }
    tw.trace.Step("attach lease to kv pair")
}

func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 {
    rrev := tw.beginRev
    if len(tw.changes) > 0 {
        rrev++
    }
    keys, _ := tw.s.kvindex.Range(key, end, rrev)
    if len(keys) == 0 {
        return 0
    }
    for _, key := range keys {
        tw.delete(key)
    }
    return int64(len(keys))
}

func (tw *storeTxnWrite) delete(key []byte) {
    ibytes := newRevBytes()
    idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
    revToBytes(idxRev, ibytes)

    ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)

    kv := mvccpb.KeyValue{Key: key}

    d, err := kv.Marshal()
    if err != nil {
        tw.storeTxnRead.s.lg.Fatal(
            "failed to marshal mvccpb.KeyValue",
            zap.Error(err),
        )
    }

    tw.tx.UnsafeSeqPut(schema.Key, ibytes, d)
    err = tw.s.kvindex.Tombstone(key, idxRev)
    if err != nil {
        tw.storeTxnRead.s.lg.Fatal(
            "failed to tombstone an existing key",
            zap.String("key", string(key)),
            zap.Error(err),
        )
    }
    tw.changes = append(tw.changes, kv)

    item := lease.LeaseItem{Key: string(key)}
    leaseID := tw.s.le.GetLease(item)

    if leaseID != lease.NoLease {
        err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item})
        if err != nil {
            tw.storeTxnRead.s.lg.Error(
                "failed to detach old lease from a key",
                zap.Error(err),
            )
        }
    }
}
```
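To close the section, a sketch of the revision semantics implemented above (hypothetical helper, same assumed imports): every mutation inside one write txn shares main revision beginRev+1 and gets an increasing sub revision, deletes are written as tombstone revisions, and currentRev is only incremented once, in End().

```go
// writeBatch is a hypothetical helper, not etcd code.
func writeBatch(kv mvcc.KV) {
    wtx := kv.Write(traceutil.TODO())
    rev1 := wtx.Put([]byte("a"), []byte("1"), lease.NoLease) // revision {main: beginRev+1, sub: 0}
    rev2 := wtx.Put([]byte("b"), []byte("2"), lease.NoLease) // revision {main: beginRev+1, sub: 1}
    n, _ := wtx.DeleteRange([]byte("a"), nil)                // tombstone at {main: beginRev+1, sub: 2}, n == 1
    wtx.End()                                                // currentRev++ happens here
    _, _, _ = rev1, rev2, n                                  // rev1 == rev2 == beginRev+1
}
```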
