etcd使用了boltdb的事务特性,但是对写操作做了优化,实现了batch write。(下文引用的源码会删除部分trace/log代码。)
1、etcd如何封装boltdb
1、第一层封装(backend模块)
此模块向MVCC模块提供了基础的CRUD API,接口如下:
// Backend is the first-layer wrapper around boltdb; it is the API the backend
// module exposes to the MVCC module.
type Backend interface {
// ReadTx returns a read transaction.
ReadTx() ReadTx
// BatchTx returns a write transaction.
BatchTx() BatchTx
// ConcurrentReadTx returns a concurrent read transaction. Its main purpose is to
// relax the locking around a write transaction writing its buf back (explained
// in detail later in these notes).
ConcurrentReadTx() ReadTx
// Snapshot returns a snapshot struct (it reuses the boltdb snapshot API).
Snapshot() Snapshot
// Hash is ignored for now in these notes.
Hash(ignores func(bucketName, keyName []byte) bool) (uint32, error)
// Size returns the space allocated by the storage module. In essence this is the
// boltdb file size (including all reusable free pages).
Size() int64
// SizeInUse returns the space actually used by the storage module. In essence
// this is the set of boltdb pages in use, excluding reusable free pages.
SizeInUse() int64
// OpenReadTxN returns the number of currently open read transactions.
OpenReadTxN() int64
// Defrag reorganizes the on-disk layout of boltdb: data is stored again in
// contiguous pages to reduce the space used.
// Under the hood the current db is read out and written into a new db.
Defrag() error
// ForceCommit forces the transaction to be committed.
ForceCommit()
// Close: the underlying logic stops the goroutine that commits transactions
// asynchronously.
Close() error
// SetTxPostLockInsideApplyHook sets a txPostLockInsideApplyHook.
SetTxPostLockInsideApplyHook(func())
}
// ReadTx is the read-transaction API of the backend module.
type ReadTx interface {
// The exclusive (X) lock is mainly used by the write transaction when it flushes
// changes that are not yet on disk back into the read transaction's buf.
Lock()
Unlock()
// The shared (S) lock is used by read transactions.
RLock()
RUnlock()
// Read API.
// Intended usage:
// tx.RLock()
// tx.UnsafeRange or tx.UnsafeForEach
// tx.RUnlock
UnsafeRange(bucket Bucket, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error
}
// BatchTx is the write-transaction API of the backend module.
type BatchTx interface {
// Only Lock/Unlock are implemented for a BatchTx; RLock/RUnlock are not.
// The read API is essentially the same as ReadTx; underneath it still reuses
// boltdb's read-write transaction.
ReadTx
// Write API.
// Intended usage:
// tx.Lock()
// tx.<write API>
// tx.Unlock
// As a consequence etcd writes are inherently serial, which also follows from
// boltdb writes being serial.
UnsafeCreateBucket(bucket Bucket)
UnsafeDeleteBucket(bucket Bucket)
UnsafePut(bucket Bucket, key []byte, value []byte)
UnsafeSeqPut(bucket Bucket, key []byte, value []byte)
UnsafeDelete(bucket Bucket, key []byte)
// Used by the goroutine that commits transactions asynchronously.
Commit()
CommitAndStop()
// Related to raft apply; ignored for now in these notes.
LockInsideApply()
LockOutsideApply()
}
1.1、ReadTx实现
// baseReadTx holds the state shared by readTx and concurrentReadTx.
type baseReadTx struct {
// mu protects the buf field.
mu sync.RWMutex
// buf keeps the data of write transactions that are not committed yet, so that
// read transactions can observe the new data.
buf txReadBuffer
// txMu protects the boltdb transaction instance and the buckets cache.
txMu *sync.RWMutex
tx *bolt.Tx
buckets map[BucketID]*bolt.Bucket
// txWg is used to wait for concurrent read transactions to finish.
// It is also used when the write transaction commits: the commit resets the
// read transaction without excluding readers, so the WaitGroup guarantees that
// no read transaction is still running when the old read tx is rolled back.
txWg *sync.WaitGroup
}
// UnsafeForEach walks every key/value pair of the given bucket, combining the
// data stored in the boltdb transaction with the entries held in buf.
// Keys present in buf take precedence: the boltdb copy of such a key is skipped
// and the buffered entry is visited afterwards. The first error returned by a
// visitor or by the underlying iteration is returned to the caller.
func (baseReadTx *baseReadTx) UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error {
	// Record which keys the buffer currently holds.
	buffered := make(map[string]struct{})
	collect := func(k, _ []byte) error {
		buffered[string(k)] = struct{}{}
		return nil
	}
	if err := baseReadTx.buf.ForEach(bucket, collect); err != nil {
		return err
	}

	// Visit the data read through the boltdb tx, skipping keys the buffer overrides.
	skipBuffered := func(k, v []byte) error {
		if _, shadowed := buffered[string(k)]; !shadowed {
			return visitor(k, v)
		}
		return nil
	}
	baseReadTx.txMu.Lock()
	err := unsafeForEach(baseReadTx.tx, bucket, skipBuffered)
	baseReadTx.txMu.Unlock()
	if err != nil {
		return err
	}

	// Finally hand the buffered (not yet committed) entries to the visitor.
	return baseReadTx.buf.ForEach(bucket, visitor)
}
// UnsafeRange returns keys and values of bucketType starting at key, merging the
// entries held in buf with the data read through the boltdb tx.
// A nil endKey turns the call into a single-key lookup (limit is forced to 1) and
// a non-positive limit means "no limit". It panics when more than one key is
// requested from a bucket for which IsSafeRangeBucket reports false.
func (baseReadTx *baseReadTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
if endKey == nil {
limit = 1
}
if limit <= 0 {
limit = math.MaxInt64
}
if limit > 1 && !bucketType.IsSafeRangeBucket() {
panic("do not use unsafeRange on non-keys bucket")
}
// If buf alone already yields limit entries, return them without touching boltdb.
keys, vals := baseReadTx.buf.Range(bucketType, key, endKey, limit)
if int64(len(keys)) == limit {
return keys, vals
}
// Otherwise read from boltdb, caching the bucket handle in baseReadTx.buckets.
bn := bucketType.ID()
baseReadTx.txMu.RLock()
bucket, ok := baseReadTx.buckets[bn]
baseReadTx.txMu.RUnlock()
lockHeld := false
if !ok {
// Cache miss: take the write lock and keep holding it (lockHeld) until the
// cursor has been created below.
baseReadTx.txMu.Lock()
lockHeld = true
bucket = baseReadTx.tx.Bucket(bucketType.Name())
baseReadTx.buckets[bn] = bucket
}
// A nil bucket means the boltdb tx has no such bucket; return only what buf produced.
if bucket == nil {
if lockHeld {
baseReadTx.txMu.Unlock()
}
return keys, vals
}
if !lockHeld {
baseReadTx.txMu.Lock()
}
// The cursor is created under txMu; the lock is released before iterating.
c := bucket.Cursor()
baseReadTx.txMu.Unlock()
// Only the remaining quota is read from boltdb; boltdb results are placed in
// front of the buffered ones in the returned slices.
k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
return append(k2, keys...), append(v2, vals...)
}
// readTx is the regular (shared-buffer) read transaction.
// Because a regular read has to take the read lock on mu, it blocks the write
// transaction from flushing its uncommitted changes into the read transaction's buf.
type readTx struct {
baseReadTx
}
// Lock takes the exclusive lock on mu.
func (rt *readTx) Lock() { rt.mu.Lock() }
// Unlock releases the exclusive lock on mu.
func (rt *readTx) Unlock() { rt.mu.Unlock() }
// RLock takes the shared lock on mu.
func (rt *readTx) RLock() { rt.mu.RLock() }
// RUnlock releases the shared lock on mu.
func (rt *readTx) RUnlock() { rt.mu.RUnlock() }
// reset resets the read transaction.
// The read transaction is reset when the write transaction finishes committing.
// It clears buf, replaces the bucket cache and the WaitGroup with fresh
// instances and drops the reference to the boltdb tx.
func (rt *readTx) reset() {
rt.buf.reset()
rt.buckets = make(map[BucketID]*bolt.Bucket)
rt.tx = nil
rt.txWg = new(sync.WaitGroup)
}
// concurrentReadTx is the concurrent read transaction.
// Its Lock, Unlock and RLock methods are no-ops, so it does not block the write
// transaction from flushing uncommitted changes into the read transaction's buf.
// The price is that it works on a copy of the read transaction's buf.
type concurrentReadTx struct {
baseReadTx
}
// Lock is a no-op.
func (rt *concurrentReadTx) Lock() {}
// Unlock is a no-op.
func (rt *concurrentReadTx) Unlock() {}
// RLock is a no-op.
func (rt *concurrentReadTx) RLock() {}
// RUnlock marks this reader as finished on the shared WaitGroup; it balances the
// txWg.Add(1) performed when the concurrentReadTx is created.
func (rt *concurrentReadTx) RUnlock() { rt.txWg.Done() }
1.2、BatchTx实现
// batchTx is the unbuffered write transaction; the embedded Mutex serializes writers.
type batchTx struct {
sync.Mutex
tx *bolt.Tx
backend *backend
pending int // number of write operations not yet committed (incremented by the Unsafe* write methods)
}
// 一般不会使用batchTx的Unlock
// 使用的是带buf的batchTx的Unlock
func (t *batchTx的) Lock() {
ValidateCalledInsideUnittest(t.backend.lg)
t.lock()
}
// Unlock releases the batchTx mutex. Before releasing, it commits the current
// boltdb transaction if the number of pending operations has reached batchLimit.
func (t *batchTx) Unlock() {
if t.pending >= t.backend.batchLimit {
t.commit(false)
}
t.Mutex.Unlock()
}
// LockInsideApply/LockOutsideApply are ignored for now in these notes.
//
// LockInsideApply acquires the batchTx mutex and, when a
// txPostLockInsideApplyHook is installed, validates the call stack and runs the hook.
func (t *batchTx) LockInsideApply() {
t.lock()
if t.backend.txPostLockInsideApplyHook != nil {
// The callers of some methods (i.e., (*RaftCluster).AddMember)
// can be coming from both InsideApply and OutsideApply, but the
// callers from OutsideApply will have a nil txPostLockInsideApplyHook.
// So we should check the txPostLockInsideApplyHook before validating
// the callstack.
ValidateCalledInsideApply(t.backend.lg)
t.backend.txPostLockInsideApplyHook()
}
}
// LockOutsideApply validates the call stack via ValidateCalledOutSideApply and
// then acquires the batchTx mutex.
func (t *batchTx) LockOutsideApply() {
ValidateCalledOutSideApply(t.backend.lg)
t.lock()
}
// lock acquires the embedded mutex without any validation.
func (t *batchTx) lock() {
t.Mutex.Lock()
}
// A write transaction must not call RLock/RUnlock; both panic.
func (t *batchTx) RLock() {
panic("unexpected RLock")
}
func (t *batchTx) RUnlock() {
panic("unexpected RUnlock")
}
// The methods below form the CRUD API; each of them calls the boltdb API.
// pending is incremented whether or not the operation changed anything.
// NOTE(author): open question from the original notes: is it really fine to
// ignore the outcome and bump pending unconditionally?
//
// UnsafeCreateBucket creates the bucket in the current boltdb transaction.
// bolt.ErrBucketExists is tolerated; any other error is reported through lg.Fatal.
func (t *batchTx) UnsafeCreateBucket(bucket Bucket) {
	if _, err := t.tx.CreateBucket(bucket.Name()); err != nil && err != bolt.ErrBucketExists {
		t.backend.lg.Fatal(
			"failed to create a bucket",
			zap.Stringer("bucket-name", bucket),
			zap.Error(err),
		)
	}
	t.pending++
}
// UnsafeDeleteBucket removes the bucket from the current boltdb transaction.
// bolt.ErrBucketNotFound is tolerated; any other error is reported through
// lg.Fatal. pending is incremented in either case.
func (t *batchTx) UnsafeDeleteBucket(bucket Bucket) {
	if err := t.tx.DeleteBucket(bucket.Name()); err != nil && err != bolt.ErrBucketNotFound {
		t.backend.lg.Fatal(
			"failed to delete a bucket",
			zap.Stringer("bucket-name", bucket),
			zap.Error(err),
		)
	}
	t.pending++
}
// UnsafePut writes key/value into bucket through unsafePut with seq=false.
func (t *batchTx) UnsafePut(bucket Bucket, key []byte, value []byte) {
t.unsafePut(bucket, key, value, false)
}
// UnsafeSeqPut writes key/value into bucket through unsafePut with seq=true,
// which raises the bucket's fill percent for append-mostly workloads.
func (t *batchTx) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {
t.unsafePut(bucket, key, value, true)
}
// unsafePut stores key/value in the bucket named by bucketType inside the
// current boltdb transaction and increments pending.
// A missing bucket or a failed Put is reported through lg.Fatal. When seq is
// true the bucket's FillPercent is set to 0.9 before writing.
func (t *batchTx) unsafePut(bucketType Bucket, key []byte, value []byte, seq bool) {
	b := t.tx.Bucket(bucketType.Name())
	if b == nil {
		t.backend.lg.Fatal(
			"failed to find a bucket",
			zap.Stringer("bucket-name", bucketType),
			zap.Stack("stack"),
		)
	}
	if seq {
		// it is useful to increase fill percent when the workloads are mostly append-only.
		// this can delay the page split and reduce space usage.
		b.FillPercent = 0.9
	}
	err := b.Put(key, value)
	if err != nil {
		t.backend.lg.Fatal(
			"failed to write to a bucket",
			zap.Stringer("bucket-name", bucketType),
			zap.Error(err),
		)
	}
	t.pending++
}
// UnsafeRange reads keys and values directly from the write transaction's
// boltdb tx by running unsafeRange over a cursor of the requested bucket.
// A missing bucket is reported through lg.Fatal.
func (t *batchTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
	b := t.tx.Bucket(bucketType.Name())
	if b == nil {
		t.backend.lg.Fatal(
			"failed to find a bucket",
			zap.Stringer("bucket-name", bucketType),
			zap.Stack("stack"),
		)
	}
	cursor := b.Cursor()
	return unsafeRange(cursor, key, endKey, limit)
}
// UnsafeDelete removes key from the bucket named by bucketType inside the
// current boltdb transaction and increments pending.
// A missing bucket or a failed Delete is reported through lg.Fatal.
func (t *batchTx) UnsafeDelete(bucketType Bucket, key []byte) {
	b := t.tx.Bucket(bucketType.Name())
	if b == nil {
		t.backend.lg.Fatal(
			"failed to find a bucket",
			zap.Stringer("bucket-name", bucketType),
			zap.Stack("stack"),
		)
	}
	if err := b.Delete(key); err != nil {
		t.backend.lg.Fatal(
			"failed to delete a key",
			zap.Stringer("bucket-name", bucketType),
			zap.Error(err),
		)
	}
	t.pending++
}
// UnsafeForEach visits every key/value pair of bucket as seen by the write
// transaction's boltdb tx.
func (t *batchTx) UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error {
return unsafeForEach(t.tx, bucket, visitor)
}
// Used by the asynchronous commit path.
//
// Commit locks the transaction, commits it and begins a new boltdb tx.
func (t *batchTx) Commit() {
t.lock()
t.commit(false)
t.Unlock()
}
// CommitAndStop locks the transaction and commits it without beginning a new boltdb tx.
func (t *batchTx) CommitAndStop() {
t.lock()
t.commit(true)
t.Unlock()
}
// safePending returns pending while holding the batchTx mutex.
func (t *batchTx) safePending() int {
t.Mutex.Lock()
defer t.Mutex.Unlock()
return t.pending
}
// commit commits the current boltdb transaction and resets the write transaction.
// When a tx exists and nothing is pending, it returns early unless stop is set.
// After a commit it records timing metrics, bumps the backend commit counter and
// clears pending; a commit error is reported through lg.Fatal.
// Unless stop is set, a new boltdb write tx is begun afterwards.
func (t *batchTx) commit(stop bool) {
if t.tx != nil {
if t.pending == 0 && !stop {
return
}
start := time.Now()
// gofail: var beforeCommit struct{}
err := t.tx.Commit()
// gofail: var afterCommit struct{}
rebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds())
spillSec.Observe(t.tx.Stats().SpillTime.Seconds())
writeSec.Observe(t.tx.Stats().WriteTime.Seconds())
commitSec.Observe(time.Since(start).Seconds())
atomic.AddInt64(&t.backend.commits, 1)
t.pending = 0
if err != nil {
t.backend.lg.Fatal("failed to commit tx", zap.Error(err))
}
}
if !stop {
t.tx = t.backend.begin(true)
}
}
// batchTxBuffered is a batchTx with a write buffer; the buffer is written back
// to the read transaction's buf when the transaction is unlocked.
type batchTxBuffered struct {
batchTx
buf txWriteBuffer
}
// newBatchTxBuffered builds a buffered write transaction bound to backend, with
// an empty write buffer, and immediately calls Commit on it so that a boltdb
// transaction is begun before the value is returned.
func newBatchTxBuffered(backend *backend) *batchTxBuffered {
	writeBuf := txWriteBuffer{
		txBuffer:   txBuffer{make(map[BucketID]*bucketBuffer)},
		bucket2seq: make(map[BucketID]bool),
	}
	tx := &batchTxBuffered{
		batchTx: batchTx{backend: backend},
		buf:     writeBuf,
	}
	tx.Commit()
	return tx
}
// Unlock ends the caller's use of the write transaction and releases the mutex.
func (t *batchTxBuffered) Unlock() {
// As described for the write API, calling Unlock is treated as the end of the
// caller's write transaction (although nothing has been committed yet).
// The current buf is therefore written back to the read transaction under its
// exclusive lock, so that readers can observe the new data.
if t.pending != 0 {
t.backend.readTx.Lock()
t.buf.writeback(&t.backend.readTx.buf)
t.backend.readTx.Unlock()
// Force a commit once the number of pending operations reaches the limit.
if t.pending >= t.backend.batchLimit {
t.commit(false)
}
}
t.batchTx.Unlock()
}
// Commit locks the transaction, commits it (resetting the read transaction as
// part of the buffered commit) and begins new boltdb transactions.
func (t *batchTxBuffered) Commit() {
t.lock()
t.commit(false)
t.Unlock()
}
// CommitAndStop locks the transaction and commits it without beginning new
// boltdb transactions.
func (t *batchTxBuffered) CommitAndStop() {
t.lock()
t.commit(true)
t.Unlock()
}
// commit runs the pre-commit hook, if any, and then performs unsafeCommit while
// holding the read transaction's exclusive lock.
func (t *batchTxBuffered) commit(stop bool) {
if t.backend.hooks != nil {
t.backend.hooks.OnPreCommitUnsafe(t)
}
// The commit resets the read transaction, hence the exclusive lock.
t.backend.readTx.Lock()
t.unsafeCommit(stop)
t.backend.readTx.Unlock()
}
// unsafeCommit commits the write transaction and replaces the read transaction.
// The name suggests the caller is expected to hold the read transaction's
// exclusive lock; both callers visible in these notes do so.
func (t *batchTxBuffered) unsafeCommit(stop bool) {
if t.backend.readTx.tx != nil {
// The old boltdb read tx and its WaitGroup are handed to a goroutine before
// readTx is reset, so the reset does not affect what the goroutine waits on.
go func(tx *bolt.Tx, wg *sync.WaitGroup) {
// Wait for all concurrent read transactions to finish, then roll back the read tx.
wg.Wait()
if err := tx.Rollback(); err != nil {
t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
}
}(t.backend.readTx.tx, t.backend.readTx.txWg)
t.backend.readTx.reset()
}
// Commit the write transaction.
t.batchTx.commit(stop)
if !stop {
// Begin a fresh boltdb read tx for the read transaction.
t.backend.readTx.tx = t.backend.begin(false)
}
}
// Modifications are also appended to buf.
//
// UnsafePut writes through to boltdb and records the entry in the write buffer.
func (t *batchTxBuffered) UnsafePut(bucket Bucket, key []byte, value []byte) {
t.batchTx.UnsafePut(bucket, key, value)
t.buf.put(bucket, key, value)
}
// UnsafeSeqPut writes through to boltdb with the sequential hint and records
// the entry in the write buffer via putSeq.
func (t *batchTxBuffered) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {
t.batchTx.UnsafeSeqPut(bucket, key, value)
t.buf.putSeq(bucket, key, value)
}
1.3、backend模块实现
// backend implements the Backend interface on top of a boltdb database.
type backend struct {
// size, sizeInUse, commits and openReadTxN are read and written with sync/atomic.
size int64
sizeInUse int64
commits int64
openReadTxN int64
// mlock prevents backend database file to be swapped
mlock bool
// mu guards db: begin/Snapshot/Hash take it shared, Close/defrag take it exclusively.
mu sync.RWMutex
bopts *bolt.Options
db *bolt.DB
// batchInterval is the period of the background commit loop; batchLimit is the
// number of pending operations that forces a commit on Unlock.
batchInterval time.Duration
batchLimit int
batchTx *batchTxBuffered
readTx *readTx
// txReadBufferCache caches a copy of the read buffer so that ConcurrentReadTx
// does not have to copy it on every call.
txReadBufferCache txReadBufferCache
// stopc is closed by Close to stop the background loop; donec is closed by
// the loop when it exits.
stopc chan struct{}
donec chan struct{}
hooks Hooks
txPostLockInsideApplyHook func()
lg *zap.Logger
}
// New creates a Backend from the given configuration.
func New(cfg BackendConfig) Backend {
return newBackend(cfg)
}
// NewDefaultBackend creates a Backend from DefaultBackendConfig(lg) with its
// Path set to path.
func NewDefaultBackend(lg *zap.Logger, path string) Backend {
cfg := DefaultBackendConfig(lg)
cfg.Path = path
return newBackend(cfg)
}
// newBackend opens the boltdb file at cfg.Path, builds the backend with empty
// read buffers, creates the buffered write transaction and starts the
// background commit loop. Failure to open the database is reported through
// cfg.Logger.Panic.
func newBackend(cfg BackendConfig) *backend {
// Start from the package-level boltOpenOptions, if set, then apply cfg overrides.
opts := &bolt.Options{}
if boltOpenOptions != nil {
*opts = *boltOpenOptions
}
opts.InitialMmapSize = cfg.mmapSize()
opts.FreelistType = cfg.BackendFreelistType
opts.NoSync = cfg.UnsafeNoFsync
opts.NoGrowSync = cfg.UnsafeNoFsync
opts.Mlock = cfg.Mlock
db, err := bolt.Open(cfg.Path, 0600, opts)
if err != nil {
cfg.Logger.Panic("failed to open database", zap.String("path", cfg.Path), zap.Error(err))
}
b := &backend{
bopts: opts,
db: db,
batchInterval: cfg.BatchInterval,
batchLimit: cfg.BatchLimit,
mlock: cfg.Mlock,
readTx: &readTx{
baseReadTx: baseReadTx{
buf: txReadBuffer{
txBuffer: txBuffer{make(map[BucketID]*bucketBuffer)},
bufVersion: 0,
},
buckets: make(map[BucketID]*bolt.Bucket),
txWg: new(sync.WaitGroup),
txMu: new(sync.RWMutex),
},
},
txReadBufferCache: txReadBufferCache{
mu: sync.Mutex{},
bufVersion: 0,
buf: nil,
},
stopc: make(chan struct{}),
donec: make(chan struct{}),
lg: cfg.Logger,
}
// newBatchTxBuffered commits once, which begins the initial boltdb transactions.
b.batchTx = newBatchTxBuffered(b)
// The hooks are installed only after that first commit.
b.hooks = cfg.Hooks
// Goroutine that commits periodically.
go b.run()
return b
}
// ReadTx returns the backend's shared read transaction.
func (b *backend) ReadTx() ReadTx {
return b.readTx
}
// BatchTx returns the backend's buffered write transaction.
func (b *backend) BatchTx() BatchTx {
return b.batchTx
}
// ConcurrentReadTx creates and returns a new ReadTx, which:
// A) creates and keeps a copy of backend.readTx.txReadBuffer,
// B) references the boltdb read Tx (and its bucket cache) of current batch interval.
//
// The returned transaction's RUnlock must be called exactly once: it performs
// the txWg.Done that balances the txWg.Add(1) below.
func (b *backend) ConcurrentReadTx() ReadTx {
b.readTx.RLock()
defer b.readTx.RUnlock()
// prevent boltdb read Tx from been rolled back until store read Tx is done. Needs to be called when holding readTx.RLock().
b.readTx.txWg.Add(1)
// TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval.
// inspect/update cache recency iff there's no ongoing update to the cache
// this falls through if there's no cache update
// by this line, "ConcurrentReadTx" code path is already protected against concurrent "writeback" operations
// which requires write lock to update "readTx.baseReadTx.buf".
// Which means setting "buf *txReadBuffer" with "readTx.buf.unsafeCopy()" is guaranteed to be up-to-date,
// whereas "txReadBufferCache.buf" may be stale from concurrent "writeback" operations.
// We only update "txReadBufferCache.buf" if we know "buf *txReadBuffer" is up-to-date.
// The update to "txReadBufferCache.buf" will benefit the following "ConcurrentReadTx" creation
// by avoiding copying "readTx.baseReadTx.buf".
b.txReadBufferCache.mu.Lock()
curCache := b.txReadBufferCache.buf
curCacheVer := b.txReadBufferCache.bufVersion
curBufVer := b.readTx.buf.bufVersion
isEmptyCache := curCache == nil
isStaleCache := curCacheVer != curBufVer
var buf *txReadBuffer
switch {
case isEmptyCache:
// perform safe copy of buffer while holding "b.txReadBufferCache.mu.Lock"
// this is only supposed to run once so there won't be much overhead
curBuf := b.readTx.buf.unsafeCopy()
buf = &curBuf
case isStaleCache:
// to maximize the concurrency, try unsafe copy of buffer
// release the lock while copying buffer -- cache may become stale again and
// get overwritten by someone else.
// therefore, we need to check the readTx buffer version again
b.txReadBufferCache.mu.Unlock()
curBuf := b.readTx.buf.unsafeCopy()
b.txReadBufferCache.mu.Lock()
buf = &curBuf
default:
// neither empty nor stale cache, just use the current buffer
buf = curCache
}
// txReadBufferCache.bufVersion can be modified when we doing an unsafeCopy()
// as a result, curCacheVer could be no longer the same as
// txReadBufferCache.bufVersion
// if !isEmptyCache && curCacheVer != b.txReadBufferCache.bufVersion
// then the cache became stale while copying "readTx.baseReadTx.buf".
// It is safe to not update "txReadBufferCache.buf", because the next following
// "ConcurrentReadTx" creation will trigger a new "readTx.baseReadTx.buf" copy
// and "buf" is still used for the current "concurrentReadTx.baseReadTx.buf".
if isEmptyCache || curCacheVer == b.txReadBufferCache.bufVersion {
// continue if the cache is never set or no one has modified the cache
b.txReadBufferCache.buf = buf
b.txReadBufferCache.bufVersion = curBufVer
}
b.txReadBufferCache.mu.Unlock()
// concurrentReadTx is not supposed to write to its txReadBuffer
// The boltdb tx, its mutex, the bucket cache and the WaitGroup are shared with
// b.readTx; only the buffer is a private value copy.
return &concurrentReadTx{
baseReadTx: baseReadTx{
buf: *buf,
txMu: b.readTx.txMu,
tx: b.readTx.tx,
buckets: b.readTx.buckets,
txWg: b.readTx.txWg,
},
}
}
// SetTxPostLockInsideApplyHook installs hook as the backend's
// txPostLockInsideApplyHook while holding the batchTx lock.
func (b *backend) SetTxPostLockInsideApplyHook(hook func()) {
// It needs to lock the batchTx, because the periodic commit
// may be accessing the txPostLockInsideApplyHook at the moment.
b.batchTx.lock()
defer b.batchTx.Unlock()
b.txPostLockInsideApplyHook = hook
}
// ForceCommit commits the current batch transaction immediately.
func (b *backend) ForceCommit() {
b.batchTx.Commit()
}
// Snapshot commits the current batch, opens a dedicated boltdb read tx and
// returns a snapshot built from it. A helper goroutine logs a warning each time
// the expected transfer time elapses, until stopc is signalled. Failure to
// begin the tx is reported through lg.Fatal.
func (b *backend) Snapshot() Snapshot {
b.batchTx.Commit()
b.mu.RLock()
defer b.mu.RUnlock()
tx, err := b.db.Begin(false)
if err != nil {
b.lg.Fatal("failed to begin tx", zap.Error(err))
}
stopc, donec := make(chan struct{}), make(chan struct{})
dbBytes := tx.Size()
go func() {
defer close(donec)
// sendRateBytes is based on transferring snapshot data over a 1 gigabit/s connection
// assuming a min tcp throughput of 100MB/s.
var sendRateBytes int64 = 100 * 1024 * 1024
warningTimeout := time.Duration(int64((float64(dbBytes) / float64(sendRateBytes)) * float64(time.Second)))
// Never warn more often than minSnapshotWarningTimeout.
if warningTimeout < minSnapshotWarningTimeout {
warningTimeout = minSnapshotWarningTimeout
}
start := time.Now()
ticker := time.NewTicker(warningTimeout)
defer ticker.Stop()
for {
select {
case <-ticker.C:
b.lg.Warn(
"snapshotting taking too long to transfer",
zap.Duration("taking", time.Since(start)),
zap.Int64("bytes", dbBytes),
zap.String("size", humanize.Bytes(uint64(dbBytes))),
)
case <-stopc:
// Record how long the snapshot was in use, then exit.
snapshotTransferSec.Observe(time.Since(start).Seconds())
return
}
}
}()
return &snapshot{tx, stopc, donec}
}
// Hash computes a CRC32 (Castagnoli) checksum over the database inside a boltdb
// View transaction. Every top-level bucket name is written to the hash; a
// key/value pair is written only when ignores is non-nil and ignores(bucket, key)
// returns false.
// NOTE(review): with a nil ignores no key/value pair is hashed at all, only the
// bucket names; confirm this is intended before relying on Hash(nil).
func (b *backend) Hash(ignores func(bucketName, keyName []byte) bool) (uint32, error) {
h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
b.mu.RLock()
defer b.mu.RUnlock()
err := b.db.View(func(tx *bolt.Tx) error {
c := tx.Cursor()
for next, _ := c.First(); next != nil; next, _ = c.Next() {
// This b shadows the backend receiver for the rest of the loop body.
b := tx.Bucket(next)
if b == nil {
return fmt.Errorf("cannot get hash of bucket %s", string(next))
}
h.Write(next)
// The error returned by ForEach is discarded; the visitor itself always returns nil.
b.ForEach(func(k, v []byte) error {
if ignores != nil && !ignores(next, k) {
h.Write(k)
h.Write(v)
}
return nil
})
}
return nil
})
if err != nil {
return 0, err
}
return h.Sum32(), nil
}
// Size returns the last recorded database size (loaded atomically).
func (b *backend) Size() int64 {
return atomic.LoadInt64(&b.size)
}
// SizeInUse returns the last recorded in-use size (loaded atomically).
func (b *backend) SizeInUse() int64 {
return atomic.LoadInt64(&b.sizeInUse)
}
// Close signals the background commit loop to stop, waits for it to exit and
// then closes the boltdb database under the exclusive backend lock.
// stopc is closed unconditionally, so calling Close a second time panics on the
// double close.
func (b *backend) Close() error {
close(b.stopc)
<-b.donec
b.mu.Lock()
defer b.mu.Unlock()
return b.db.Close()
}
// Commits returns the number of commits performed so far (loaded atomically).
func (b *backend) Commits() int64 {
return atomic.LoadInt64(&b.commits)
}
// OpenReadTxN returns the last recorded number of open transactions reported by
// the boltdb stats (loaded atomically).
func (b *backend) OpenReadTxN() int64 {
return atomic.LoadInt64(&b.openReadTxN)
}
// Defrag defragments the database; see defrag.
func (b *backend) Defrag() error {
return b.defrag()
}
// defrag rewrites the database into a fresh temporary file in the same
// directory and renames that file over the original path, then reopens the
// database and begins new write and read transactions.
//
// The batch transaction, the backend and the read transaction stay locked for
// the whole operation, so all readers and writers are blocked meanwhile.
//
// The running transactions are committed and stopped only after the temporary
// database has been opened, and they are re-established if copying the data
// fails. Every error return therefore leaves the backend with usable
// transactions. The temporary file is closed and removed when the temporary
// database cannot be opened.
//
// It returns the error from creating or opening the temporary database or from
// copying the data. Failures after the copy (closing, renaming, reopening) are
// reported through lg.Fatal.
func (b *backend) defrag() error {
	now := time.Now()
	isDefragActive.Set(1)
	defer isDefragActive.Set(0)

	// TODO: make this non-blocking?
	// lock batchTx to ensure nobody is using previous tx, and then
	// close previous ongoing tx.
	b.batchTx.LockOutsideApply()
	defer b.batchTx.Unlock()

	// lock database after lock tx to avoid deadlock.
	b.mu.Lock()
	defer b.mu.Unlock()

	// block concurrent read requests while resetting tx
	b.readTx.Lock()
	defer b.readTx.Unlock()

	// Create a temporary file to ensure we start with a clean slate.
	// Snapshotter.cleanupSnapdir cleans up any of these that are found during startup.
	dir := filepath.Dir(b.db.Path())
	temp, err := os.CreateTemp(dir, "db.tmp.*")
	if err != nil {
		return err
	}

	options := bolt.Options{}
	if boltOpenOptions != nil {
		options = *boltOpenOptions
	}
	options.OpenFile = func(_ string, _ int, _ os.FileMode) (file *os.File, err error) {
		return temp, nil
	}
	// Don't load tmp db into memory regardless of opening options
	options.Mlock = false
	tdbp := temp.Name()
	tmpdb, err := bolt.Open(tdbp, 0600, &options)
	if err != nil {
		// Best-effort cleanup so a failed attempt does not leak the descriptor
		// or leave the temporary file behind. The Close error is ignored on
		// purpose: the file may already have been closed by bolt.
		_ = temp.Close()
		if rmErr := os.Remove(tdbp); rmErr != nil && b.lg != nil {
			b.lg.Error(
				"failed to remove temporary file",
				zap.String("path", tdbp),
				zap.Error(rmErr),
			)
		}
		return err
	}

	dbp := b.db.Path()
	size1, sizeInUse1 := b.Size(), b.SizeInUse()
	if b.lg != nil {
		b.lg.Info(
			"defragmenting",
			zap.String("path", dbp),
			zap.Int64("current-db-size-bytes", size1),
			zap.String("current-db-size", humanize.Bytes(uint64(size1))),
			zap.Int64("current-db-size-in-use-bytes", sizeInUse1),
			zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse1))),
		)
	}

	// Commit and stop the current transactions (this also resets the read
	// transaction). This is done as late as possible so that the error returns
	// above leave the running transactions untouched.
	b.batchTx.unsafeCommit(true)
	b.batchTx.tx = nil

	// gofail: var defragBeforeCopy struct{}
	err = defragdb(b.db, tmpdb, defragLimit)
	if err != nil {
		tmpdb.Close()
		if rmErr := os.RemoveAll(tmpdb.Path()); rmErr != nil {
			b.lg.Error("failed to remove db.tmp after defragmentation completed", zap.Error(rmErr))
		}

		// Restore the boltdb transactions on the still-open original database so
		// the backend remains usable after a failed defragmentation.
		b.batchTx.tx = b.unsafeBegin(true)
		b.readTx.tx = b.unsafeBegin(false)

		return err
	}

	err = b.db.Close()
	if err != nil {
		b.lg.Fatal("failed to close database", zap.Error(err))
	}
	err = tmpdb.Close()
	if err != nil {
		b.lg.Fatal("failed to close tmp database", zap.Error(err))
	}
	// gofail: var defragBeforeRename struct{}
	err = os.Rename(tdbp, dbp)
	if err != nil {
		b.lg.Fatal("failed to rename tmp database", zap.Error(err))
	}

	b.db, err = bolt.Open(dbp, 0600, b.bopts)
	if err != nil {
		b.lg.Fatal("failed to open database", zap.String("path", dbp), zap.Error(err))
	}
	b.batchTx.tx = b.unsafeBegin(true)

	b.readTx.reset()
	b.readTx.tx = b.unsafeBegin(false)

	// Refresh the size statistics from the reopened database.
	size := b.readTx.tx.Size()
	db := b.readTx.tx.DB()
	atomic.StoreInt64(&b.size, size)
	atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))

	took := time.Since(now)
	defragSec.Observe(took.Seconds())

	size2, sizeInUse2 := b.Size(), b.SizeInUse()
	if b.lg != nil {
		b.lg.Info(
			"finished defragmenting directory",
			zap.String("path", dbp),
			zap.Int64("current-db-size-bytes-diff", size2-size1),
			zap.Int64("current-db-size-bytes", size2),
			zap.String("current-db-size", humanize.Bytes(uint64(size2))),
			zap.Int64("current-db-size-in-use-bytes-diff", sizeInUse2-sizeInUse1),
			zap.Int64("current-db-size-in-use-bytes", sizeInUse2),
			zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse2))),
			zap.Duration("took", took),
		)
	}
	return nil
}
// run is the background commit loop. Every batchInterval it commits the batch
// transaction if anything is pending. When stopc is closed it performs a final
// CommitAndStop and exits; donec is closed on exit.
func (b *backend) run() {
defer close(b.donec)
t := time.NewTimer(b.batchInterval)
defer t.Stop()
for {
select {
case <-t.C:
case <-b.stopc:
b.batchTx.CommitAndStop()
return
}
if b.batchTx.safePending() != 0 {
b.batchTx.Commit()
}
// Re-arm the timer only after the commit, so the interval is measured from
// the end of one round to the start of the next.
t.Reset(b.batchInterval)
}
}
// begin starts a boltdb transaction (a write tx when write is true) under the
// shared backend lock and refreshes the size, sizeInUse and openReadTxN
// statistics from it.
func (b *backend) begin(write bool) *bolt.Tx {
b.mu.RLock()
tx := b.unsafeBegin(write)
b.mu.RUnlock()
size := tx.Size()
db := tx.DB()
stats := db.Stats()
atomic.StoreInt64(&b.size, size)
// In-use size = total size minus the bytes occupied by free pages.
atomic.StoreInt64(&b.sizeInUse, size-(int64(stats.FreePageN)*int64(db.Info().PageSize)))
atomic.StoreInt64(&b.openReadTxN, int64(stats.OpenTxN))
return tx
}
// unsafeBegin starts a boltdb transaction without taking b.mu; a failure is
// reported through lg.Fatal.
func (b *backend) unsafeBegin(write bool) *bolt.Tx {
tx, err := b.db.Begin(write)
if err != nil {
b.lg.Fatal("failed to begin tx", zap.Error(err))
}
return tx
}
PS:helper函数
// unsafeRange collects keys and values from the boltdb cursor c, starting at the
// first key >= key.
// With a non-empty endKey it returns the keys strictly smaller than endKey, up
// to limit entries (a non-positive limit means no limit). With an empty endKey
// it is a point lookup: at most one entry is returned, and only if its key
// equals key.
func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
	if limit <= 0 {
		limit = math.MaxInt64
	}
	pointLookup := len(endKey) == 0
	if pointLookup {
		limit = 1
	}
	inRange := func(k []byte) bool {
		if pointLookup {
			return bytes.Equal(k, key)
		}
		return bytes.Compare(k, endKey) < 0
	}
	for ck, cv := c.Seek(key); ck != nil && inRange(ck); ck, cv = c.Next() {
		vs = append(vs, cv)
		keys = append(keys, ck)
		if int64(len(keys)) == limit {
			break
		}
	}
	return keys, vs
}
// unsafeForEach runs visitor over every key/value pair of the named bucket in
// tx using boltdb's ForEach. A bucket that does not exist is treated as empty
// and yields a nil error.
func unsafeForEach(tx *bolt.Tx, bucket Bucket, visitor func(k, v []byte) error) error {
	b := tx.Bucket(bucket.Name())
	if b == nil {
		return nil
	}
	return b.ForEach(visitor)
}
2、第二层封装(kv_store模块)
2.1、kv_store接口介绍
// RangeOptions carries the parameters of a range request.
type RangeOptions struct {
// Limit caps the number of keys returned; values <= 0 mean no limit.
Limit int64
// Rev is the revision to read at; values <= 0 select the current revision.
Rev int64
// Count requests only the number of matching keys, without the key-values.
Count bool
}
// RangeResult is the outcome of a range request.
type RangeResult struct {
// KVs holds the returned key-values (nil for count-only or failed requests).
KVs []mvccpb.KeyValue
// Rev is the store revision reported with the result.
Rev int64
// Count is the total number of matching keys; rangeKeys sets it to -1 on error.
Count int
}
// ReadView is the read-only view of the KV store.
type ReadView interface {
// FirstRev returns the first KV revision at the time of opening the txn.
// After a compaction, the first revision increases to the compaction
// revision.
FirstRev() int64
// Rev returns the revision of the KV at the time of opening the txn.
Rev() int64
// Range gets the keys in the range at rangeRev.
// The returned rev is the current revision of the KV when the operation is executed.
// If rangeRev <=0, range gets the keys at currentRev.
// If `end` is nil, the request returns the key.
// If `end` is not nil and not empty, it gets the keys in range [key, range_end).
// If `end` is not nil and empty, it gets the keys greater than or equal to key.
// Limit limits the number of keys returned.
// If the required rev is compacted, ErrCompacted will be returned.
Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error)
}
// TxnRead represents a read-only transaction with operations that will not
// block other read transactions.
type TxnRead interface {
ReadView
// End marks the transaction is complete and ready to commit.
// It must be called exactly once; the implementations in these notes release
// their locks in End.
End()
}
// WriteView is the mutating view of the KV store.
type WriteView interface {
// DeleteRange deletes the given range from the store.
// A deleteRange increases the rev of the store if any key in the range exists.
// The number of key deleted will be returned.
// The returned rev is the current revision of the KV when the operation is executed.
// It also generates one event for each key delete in the event history.
// if the `end` is nil, deleteRange deletes the key.
// if the `end` is not nil, deleteRange deletes the keys in range [key, range_end).
DeleteRange(key, end []byte) (n, rev int64)
// Put puts the given key, value into the store. Put also takes additional argument lease to
// attach a lease to a key-value pair as meta-data. KV implementation does not validate the lease
// id.
// A put also increases the rev of the store, and generates one event in the event history.
// The returned rev is the current revision of the KV when the operation is executed.
Put(key, value []byte, lease lease.LeaseID) (rev int64)
}
// TxnWrite represents a transaction that can modify the store.
type TxnWrite interface {
TxnRead
WriteView
// Changes gets the changes made since opening the write txn.
Changes() []mvccpb.KeyValue
}
// txnReadWrite coerces a read txn to a write, panicking on any write operation.
type txnReadWrite struct{ TxnRead }
// DeleteRange always panics: the wrapped transaction is read-only.
func (trw *txnReadWrite) DeleteRange(key, end []byte) (n, rev int64) { panic("unexpected DeleteRange") }
// Put always panics: the wrapped transaction is read-only.
func (trw *txnReadWrite) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
panic("unexpected Put")
}
// Changes returns nil: a read-only transaction never has changes.
func (trw *txnReadWrite) Changes() []mvccpb.KeyValue { return nil }
// NewReadOnlyTxnWrite wraps txn so that it satisfies TxnWrite.
func NewReadOnlyTxnWrite(txn TxnRead) TxnWrite { return &txnReadWrite{txn} }
// ReadTxMode selects which backend read transaction a store read uses.
type ReadTxMode uint32
const (
// Use ConcurrentReadTx and the txReadBuffer is copied
ConcurrentReadTxMode = ReadTxMode(1)
// Use backend ReadTx and txReadBuffer is not copied
SharedBufReadTxMode = ReadTxMode(2)
)
// KV is the second-layer (kv_store) API: it combines the read and write views
// with transaction creation, compaction, commit, restore and close.
type KV interface {
ReadView
WriteView
// Read creates a read transaction.
Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead
// Write creates a write transaction.
Write(trace *traceutil.Trace) TxnWrite
// HashStorage returns HashStorage interface for KV storage.
HashStorage() HashStorage
// Compact frees all superseded keys with revisions less than rev.
Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error)
// Commit commits outstanding txns into the underlying backend.
Commit()
// Restore restores the KV store from a backend.
Restore(b backend.Backend) error
// Close closes the KV store.
Close() error
}
// WatchableKV is a KV that can be watched.
// It is the union of the KV and Watchable interfaces.
type WatchableKV interface {
KV
Watchable
}
// Watchable is the interface that wraps the NewWatchStream function.
type Watchable interface {
// NewWatchStream returns a WatchStream that can be used to
// watch events happened or happening on the KV.
NewWatchStream() WatchStream
}
2.2、kv_store接口实现
2.2.1、read/write view实现
可以看到read/write view本质上还是封装了读写事务,并且读操作默认都是并发读。同时也可以看到,使用读写事务的流程为:
- Read/Write
- 相关读写操作
- End
// readView implements ReadView by opening a short-lived concurrent read
// transaction for every call and ending it before returning.
type readView struct{ kv KV }
// FirstRev returns the first revision seen by a fresh read transaction.
func (rv *readView) FirstRev() int64 {
tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO())
defer tr.End()
return tr.FirstRev()
}
// Rev returns the revision seen by a fresh read transaction.
func (rv *readView) Rev() int64 {
tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO())
defer tr.End()
return tr.Rev()
}
// Range runs the range request inside a fresh read transaction.
func (rv *readView) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO())
defer tr.End()
return tr.Range(ctx, key, end, ro)
}
// writeView implements WriteView by opening a short-lived write transaction for
// every call and ending it before returning.
type writeView struct{ kv KV }
// DeleteRange deletes the range inside a fresh write transaction.
func (wv *writeView) DeleteRange(key, end []byte) (n, rev int64) {
tw := wv.kv.Write(traceutil.TODO())
defer tw.End()
return tw.DeleteRange(key, end)
}
// Put stores the key-value inside a fresh write transaction.
func (wv *writeView) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
tw := wv.kv.Write(traceutil.TODO())
defer tw.End()
return tw.Put(key, value, lease)
}
2.2.2、read事务封装
// storeTxnRead is the store-level read transaction. It pairs a backend read
// transaction with the revisions captured when the transaction was opened.
type storeTxnRead struct {
	// s is the store the transaction belongs to.
	s *store
	// tx is the backend read transaction used to fetch key-values.
	tx backend.ReadTx

	// firstRev and rev are the compaction revision and the current revision of
	// the store at the time the transaction was opened.
	firstRev int64
	rev      int64

	// trace records the steps of the request. It is the fifth value of the
	// positional literals that construct a storeTxnRead and is used through
	// tr.trace.Step / tw.trace.Step.
	trace *traceutil.Trace
}
// Read opens a store read transaction. It takes s.mu shared (released by End)
// and picks the backend read transaction according to mode:
// ConcurrentReadTxMode uses ConcurrentReadTx, any other mode the shared ReadTx.
func (s *store) Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead {
s.mu.RLock()
s.revMu.RLock()
var tx backend.ReadTx
if mode == ConcurrentReadTxMode {
tx = s.b.ConcurrentReadTx()
} else {
tx = s.b.ReadTx()
}
// revMu.RLock must be held while a read transaction is being opened:
// read transactions can run concurrently while currentRev is modified by the
// write transaction, so the revisions are captured under the read lock.
tx.RLock()
firstRev, rev := s.compactMainRev, s.currentRev
s.revMu.RUnlock()
return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev, trace})
}
// FirstRev returns the compaction revision captured when the txn was opened.
func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev }
// Rev returns the store revision captured when the txn was opened.
func (tr *storeTxnRead) Rev() int64 { return tr.rev }
// Range runs the range request against the revision captured at open time.
func (tr *storeTxnRead) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
return tr.rangeKeys(ctx, key, end, tr.Rev(), ro)
}
// End releases the backend read transaction and then the shared store lock
// taken in Read.
func (tr *storeTxnRead) End() {
tr.tx.RUnlock()
tr.s.mu.RUnlock()
}
// rangeKeys serves a range request with curRev as the current revision.
// It returns ErrFutureRev when ro.Rev is greater than curRev and ErrCompacted
// when the effective revision is older than the store's compactMainRev; in both
// cases the result carries Count -1. With ro.Count set, only the number of
// matching revisions is returned. Otherwise the matching revisions are looked up
// in the in-memory index and each key-value is loaded through the backend read
// transaction. A cancelled ctx aborts the loop with a wrapped ctx.Err().
func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
rev := ro.Rev
if rev > curRev {
return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev
}
// A non-positive requested revision means "read at the current revision".
if rev <= 0 {
rev = curRev
}
if rev < tr.s.compactMainRev {
return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
}
if ro.Count {
total := tr.s.kvindex.CountRevisions(key, end, rev)
tr.trace.Step("count revisions from in-memory index tree")
return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
}
revpairs, total := tr.s.kvindex.Revisions(key, end, rev, int(ro.Limit))
if len(revpairs) == 0 {
return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
}
// Clamp the number of key-values to load to the number of revisions found.
limit := int(ro.Limit)
if limit <= 0 || limit > len(revpairs) {
limit = len(revpairs)
}
kvs := make([]mvccpb.KeyValue, limit)
revBytes := newRevBytes()
for i, revpair := range revpairs[:len(kvs)] {
// Non-blocking cancellation check before each backend lookup.
select {
case <-ctx.Done():
return nil, fmt.Errorf("rangeKeys: context cancelled: %w", ctx.Err())
default:
}
revToBytes(revpair, revBytes)
// A nil endKey makes this a single-key lookup of the encoded revision.
_, vs := tr.tx.UnsafeRange(schema.Key, revBytes, nil, 0)
if len(vs) != 1 {
tr.s.lg.Fatal(
"range failed to find revision pair",
zap.Int64("revision-main", revpair.main),
zap.Int64("revision-sub", revpair.sub),
)
}
if err := kvs[i].Unmarshal(vs[0]); err != nil {
tr.s.lg.Fatal(
"failed to unmarshal mvccpb.KeyValue",
zap.Error(err),
)
}
}
tr.trace.Step("range keys from bolt db")
return &RangeResult{KVs: kvs, Count: total, Rev: curRev}, nil
}
2.2.3、write事务封装
// storeTxnWrite is the store-level write transaction. It embeds storeTxnRead for
// the read path and keeps the backend batch transaction for writes.
type storeTxnWrite struct {
storeTxnRead
// tx shadows storeTxnRead.tx with the BatchTx view of the same transaction.
tx backend.BatchTx
// beginRev is the revision where the txn begins; it will write to the next revision.
beginRev int64
// changes accumulates the key-values written or deleted by this txn.
changes []mvccpb.KeyValue
}
// Write opens a store write transaction. It takes s.mu shared (released by End)
// and locks the backend batch transaction via LockInsideApply.
func (s *store) Write(trace *traceutil.Trace) TxnWrite {
s.mu.RLock()
tx := s.b.BatchTx()
tx.LockInsideApply()
// Only one write transaction can be open at a time, so reading currentRev here
// does not need the revMu lock.
tw := &storeTxnWrite{
storeTxnRead: storeTxnRead{s, tx, 0, 0, trace},
tx: tx,
beginRev: s.currentRev,
changes: make([]mvccpb.KeyValue, 0, 4),
}
return newMetricsTxnWrite(tw)
}
// Rev returns the revision at which the write transaction began.
func (tw *storeTxnWrite) Rev() int64 { return tw.beginRev }
// Range reads within the write transaction. Once the transaction has made
// changes, it reads at beginRev+1 so that its own writes are visible.
func (tw *storeTxnWrite) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
rev := tw.beginRev
if len(tw.changes) > 0 {
rev++
}
return tw.rangeKeys(ctx, key, end, rev, ro)
}
// DeleteRange deletes the keys in the range and returns how many were deleted
// together with the revision: beginRev+1 if this transaction has made any
// change (including this delete), otherwise beginRev.
func (tw *storeTxnWrite) DeleteRange(key, end []byte) (int64, int64) {
if n := tw.deleteRange(key, end); n != 0 || len(tw.changes) > 0 {
return n, tw.beginRev + 1
}
return 0, tw.beginRev
}
// Put stores the key-value with the given lease and returns beginRev+1, the
// revision the transaction writes at.
func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 {
tw.put(key, value, lease)
return tw.beginRev + 1
}
// End finishes the write transaction: it bumps the store revision if anything
// changed, unlocks the backend batch transaction and releases the shared store
// lock taken in Write.
func (tw *storeTxnWrite) End() {
// only update index if the txn modifies the mvcc state.
if len(tw.changes) != 0 {
// hold revMu lock to prevent new read txns from opening until writeback.
tw.s.revMu.Lock()
tw.s.currentRev++
}
// revMu (if taken) stays held across this Unlock and is released only afterwards.
tw.tx.Unlock()
if len(tw.changes) != 0 {
tw.s.revMu.Unlock()
}
tw.s.mu.RUnlock()
}
// Changes returns the key-values written or deleted by this transaction so far.
func (tw *storeTxnWrite) Changes() []mvccpb.KeyValue { return tw.changes }
// put writes key/value at revision beginRev+1 (sub-revision = number of changes
// made so far), updates the in-memory index, records the change and then
// reconciles the lease attachment of the key: the old lease is detached and the
// new one attached when they differ.
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
rev := tw.beginRev + 1
// c is the create revision; it defaults to rev for a key with no index entry.
c := rev
oldLease := lease.NoLease
// if the key exists before, use its previous created and
// get its previous leaseID
_, created, ver, err := tw.s.kvindex.Get(key, rev)
if err == nil {
c = created.main
oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
tw.trace.Step("get key's previous created_revision and leaseID")
}
ibytes := newRevBytes()
idxRev := revision{main: rev, sub: int64(len(tw.changes))}
revToBytes(idxRev, ibytes)
ver = ver + 1
kv := mvccpb.KeyValue{
Key: key,
Value: value,
CreateRevision: c,
ModRevision: rev,
Version: ver,
Lease: int64(leaseID),
}
d, err := kv.Marshal()
if err != nil {
tw.storeTxnRead.s.lg.Fatal(
"failed to marshal mvccpb.KeyValue",
zap.Error(err),
)
}
tw.trace.Step("marshal mvccpb.KeyValue")
// Persist under the encoded revision, then update the index and the change list.
tw.tx.UnsafeSeqPut(schema.Key, ibytes, d)
tw.s.kvindex.Put(key, idxRev)
tw.changes = append(tw.changes, kv)
tw.trace.Step("store kv pair into bolt db")
// Nothing to reconcile when the lease is unchanged.
if oldLease == leaseID {
tw.trace.Step("attach lease to kv pair")
return
}
if oldLease != lease.NoLease {
if tw.s.le == nil {
panic("no lessor to detach lease")
}
err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
// A detach failure is only logged; the put itself still succeeds.
if err != nil {
tw.storeTxnRead.s.lg.Error(
"failed to detach old lease from a key",
zap.Error(err),
)
}
}
if leaseID != lease.NoLease {
if tw.s.le == nil {
panic("no lessor to attach lease")
}
err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
if err != nil {
panic("unexpected error from lease Attach")
}
}
tw.trace.Step("attach lease to kv pair")
}
// deleteRange deletes every key of [key, end) that the in-memory index reports
// at the transaction's read revision and returns how many keys were deleted.
// The read revision is beginRev, or beginRev+1 once the transaction has already
// made changes.
func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 {
	readRev := tw.beginRev
	if len(tw.changes) > 0 {
		readRev = tw.beginRev + 1
	}
	matched, _ := tw.s.kvindex.Range(key, end, readRev)
	for _, k := range matched {
		tw.delete(k)
	}
	return int64(len(matched))
}
// delete records a tombstone for key at revision beginRev+1 (sub-revision =
// number of changes made so far): it writes a tombstone-marked entry to the
// backend, tombstones the key in the in-memory index, records the change and
// detaches any lease attached to the key.
func (tw *storeTxnWrite) delete(key []byte) {
ibytes := newRevBytes()
idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
revToBytes(idxRev, ibytes)
ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)
// The stored value carries only the key; there is no value for a tombstone.
kv := mvccpb.KeyValue{Key: key}
d, err := kv.Marshal()
if err != nil {
tw.storeTxnRead.s.lg.Fatal(
"failed to marshal mvccpb.KeyValue",
zap.Error(err),
)
}
tw.tx.UnsafeSeqPut(schema.Key, ibytes, d)
err = tw.s.kvindex.Tombstone(key, idxRev)
if err != nil {
tw.storeTxnRead.s.lg.Fatal(
"failed to tombstone an existing key",
zap.String("key", string(key)),
zap.Error(err),
)
}
tw.changes = append(tw.changes, kv)
item := lease.LeaseItem{Key: string(key)}
leaseID := tw.s.le.GetLease(item)
if leaseID != lease.NoLease {
err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item})
// A detach failure is only logged; the delete itself still succeeds.
if err != nil {
tw.storeTxnRead.s.lg.Error(
"failed to detach old lease from a key",
zap.Error(err),
)
}
}
}