背景
用 golang 写的数据库不多,boltdb 是一个非常优秀的开源项目,目前虽然是 achieve 不维护了,但是依然有很多项目都在使用,比如在云原生领域最火的 etcd。笔者一直想学习 etcd,借此先把它的底层存储 boltdb 搞定。
boltdb 是一个B+树实现的嵌入式数据库,里面实现了 ACID 事务,对于理解数据库实现,理解 golang 的指针操作有非常大的帮助。 关于 B+树的概念本篇文章不做赘述,可以去网上搜下具体的原理和使用场景。
架构
boltdb的核心代码在 5K 行左右,我这里 fork 的是 v1.3.1 版本。
一个 db 一个文件,db 下可以有多个 bucket,bucket 类似于表,里面可以嵌套 bucket,也可以直接存放 kv 数据。一个 bucket 就是一颗 B+树,B+树的节点是 node,一个 node 可以存放多个 kv 数据,一个 kv 数据在内部是用 inode 来表示的,inode 映射到 page 的结构里,page 是直接和磁盘打交道的,也就是说在内存里的指针,在磁盘里就表现为 offset。
在 bucket 找数据,需要 cursor。 而数据的读和写需要事务,事务也是和 bucket 绑定的。接下来我们来一个个分析。
核心概念
page
首先先看下 page 的结构。
type pgid uint64
type page struct {
id pgid
flags uint16
count uint16
overflow uint32
ptr uintptr
}
// typ returns a human readable page type string used for debugging.
func (p *page) typ() string {
if (p.flags & branchPageFlag) != 0 {
return "branch"
} else if (p.flags & leafPageFlag) != 0 {
return "leaf"
} else if (p.flags & metaPageFlag) != 0 {
return "meta"
} else if (p.flags & freelistPageFlag) != 0 {
return "freelist"
}
return fmt.Sprintf("unknown<%02x>", p.flags)
}
page 可以存放很多结构类型的数据,在 boltdb 里,page 可以放 branch 枝干,leaf 叶子节点,可以放元信息,可以放特殊的 freelist 分配器。每个 page 都有唯一的 pageid。 ptr 指向的是 page 的数据内容,比如是kv 数据的话就代表 kv 数据的开始位置。
node
node 是 page 的内存结构,page 是非常底层的结构了,node 会更像 B+ 树的 node,可读性更好。一个 node 对应多个 page,持有 pageid 引用。
// node represents an in-memory, deserialized page.
type node struct {
bucket *Bucket
isLeaf bool
unbalanced bool
spilled bool
key []byte
pgid pgid // 持有对应的 page 的 id
parent *node
children nodes
inodes inodes
}
node 里面实际存放数据的是 inode。
// inode represents an internal node inside of a node.
// It can be used to point to elements in a page or point
// to an element which hasn't been added to a page yet.
// inode 其实 node 的内部结构,才是持有 page 的 id
type inode struct {
flags uint32 // 会存放好几种 flag,有 bucket 的 flag
pgid pgid
key []byte // 存放了真实的数据
value []byte
}
type inodes []inode
对于 node 的详细细节我这里不会描述的太多,很多时候我们只需要知道这个函数是做什么的就行了。
bucket
bucket 是表,一个 bucket 可以存放多个 kv 数据。
// Bucket represents a collection of key/value pairs inside the database.
type Bucket struct {
*bucket
tx *Tx // the associated transaction
buckets map[string]*Bucket // subbucket cache
page *page // inline page reference
rootNode *node // materialized node for the root page.
nodes map[pgid]*node // node cache
// Sets the threshold for filling nodes when they split. By default,
// the bucket will fill to 50% but it can be useful to increase this
// amount if you know that your write workloads are mostly append-only.
//
// This is non-persisted across transactions so it must be set in every Tx.
FillPercent float64
}
// bucket represents the on-file representation of a bucket.
// This is stored as the "value" of a bucket key. If the bucket is small enough,
// then its root page can be stored inline in the "value", after the bucket
// header. In the case of inline buckets, the "root" will be 0.
type bucket struct {
root pgid // page id of the bucket's root-level page
sequence uint64 // monotonically incrementing, used by NextSequence()
}
从代码可以看出,bucket 支持嵌套 bucket。一个 bucket 有一个 tx 事务绑定。bucket 的常用操作有如下。
// Cursor creates a cursor associated with the bucket.
// The cursor is only valid as long as the transaction is open.
// Do not use a cursor after the transaction is closed.
func (b *Bucket) Cursor() *Cursor {
// Update transaction statistics.
b.tx.stats.CursorCount++
// Allocate and return a cursor.
return &Cursor{
bucket: b,
stack: make([]elemRef, 0),
}
}
遍历bucket,cursor 是每次需要的时候临时创建的,也就是说生命周期是跟一次遍历相同。
// Get retrieves the value for a key in the bucket.
// Returns a nil value if the key does not exist or if the key is a nested bucket.
// The returned value is only valid for the life of the transaction.
func (b *Bucket) Get(key []byte) []byte {
k, v, flags := b.Cursor().seek(key)
// Return nil if this is a bucket.
if (flags & bucketLeafFlag) != 0 {
return nil
}
// If our target node isn't the same key as what's passed in then return nil.
if !bytes.Equal(key, k) {
return nil
}
return v
}
从 bucket 里获取 KV 数据。可以看到获取是通过 cursor 游标来遍历的,这个就是 B+树的查找过程。
// Put sets the value for a key in the bucket.
// If the key exist then its previous value will be overwritten.
// Supplied value must remain valid for the life of the transaction.
// Returns an error if the bucket was created from a read-only transaction, if the key is blank, if the key is too large, or if the value is too large.
func (b *Bucket) Put(key []byte, value []byte) error {
if b.tx.db == nil {
return ErrTxClosed
} else if !b.Writable() {
return ErrTxNotWritable
} else if len(key) == 0 {
return ErrKeyRequired
} else if len(key) > MaxKeySize {
return ErrKeyTooLarge
} else if int64(len(value)) > MaxValueSize {
return ErrValueTooLarge
}
// Move cursor to correct position.
// 找到合适的位置
c := b.Cursor()
k, _, flags := c.seek(key)
// Return an error if there is an existing key with a bucket value.
if bytes.Equal(key, k) && (flags&bucketLeafFlag) != 0 {
return ErrIncompatibleValue
}
// Insert into node.
// 插入到合适的 node 里
key = cloneBytes(key)
c.node().put(key, key, value, 0, 0)
return nil
}
插入操作也很简单,通过 cursor 找到 B+树合适的位置,返回 c,当前 c 持有的 node 就是合适的位置,把 kv 塞进去即可。
// Delete removes a key from the bucket.
// If the key does not exist then nothing is done and a nil error is returned.
// Returns an error if the bucket was created from a read-only transaction.
func (b *Bucket) Delete(key []byte) error {
if b.tx.db == nil {
return ErrTxClosed
} else if !b.Writable() {
return ErrTxNotWritable
}
// Move cursor to correct position.
c := b.Cursor()
_, _, flags := c.seek(key)
// Return an error if there is already existing bucket value.
if (flags & bucketLeafFlag) != 0 {
return ErrIncompatibleValue
}
// Delete the node if we have a matching key.
c.node().del(key)
return nil
}
删除也很简单,通过 cursor 定位到数据,删掉即可。
// ForEach executes a function for each key/value pair in a bucket.
// If the provided function returns an error then the iteration is stopped and
// the error is returned to the caller. The provided function must not modify
// the bucket; this will result in undefined behavior.
func (b *Bucket) ForEach(fn func(k, v []byte) error) error {
if b.tx.db == nil {
return ErrTxClosed
}
c := b.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
if err := fn(k, v); err != nil {
return err
}
}
return nil
}
遍历依然是通过 cursor,从第一个到最后一个,可以传入一个回调函数。
cursor
cursor 是操作 B+ 树的封装。 boltdb 的 B+树有一点不一样,叶子节点没有通过链表连起来,所以cursor 里有个 stack 结构,保存遍历的路径。
db
boltdb 的数据库操作封装为 db 结构,这个结构是文件的操作封装,在这里把文件操作变为内存操作。很有意思,我们先来看下。
首先是指定文件路径,然后通过mmap 把文件映射为内存结构。
// mmap opens the underlying memory-mapped file and initializes the meta references.
// minsz is the minimum size that the new mmap can be.
// 文件映射为内存结构
func (db *DB) mmap(minsz int) error {
db.mmaplock.Lock()
defer db.mmaplock.Unlock()
info, err := db.file.Stat()
if err != nil {
return fmt.Errorf("mmap stat error: %s", err)
} else if int(info.Size()) < db.pageSize*2 {
return fmt.Errorf("file size too small")
}
// Ensure the size is at least the minimum size.
var size = int(info.Size())
if size < minsz {
size = minsz
}
size, err = db.mmapSize(size)
if err != nil {
return err
}
// Dereference all mmap references before unmapping.
if db.rwtx != nil {
db.rwtx.root.dereference()
}
// Unmap existing data before continuing.
if err := db.munmap(); err != nil {
return err
}
// Memory-map the data file as a byte slice.
if err := mmap(db, size); err != nil {
return err
}
// Save references to the meta pages.
db.meta0 = db.page(0).meta()
db.meta1 = db.page(1).meta()
// Validate the meta pages. We only return an error if both meta pages fail
// validation, since meta0 failing validation means that it wasn't saved
// properly -- but we can recover using meta1. And vice-versa.
err0 := db.meta0.validate()
err1 := db.meta1.validate()
if err0 != nil && err1 != nil {
return err0
}
return nil
}
默认文件大小是 16KB,boltdb 会扩容为 32KB,如果还是太小,会指数扩容到 1G,之后是 1G 的增幅扩大。
mmap 是通过系统调用,把文件的操作映射为内存的操作。读文件变成了读内存,大大提升了读效率。
// mmap memory maps a DB's data file.
// DB 数据映射为内存结构
func mmap(db *DB, sz int) error {
// Map the data file to memory.
b, err := syscall.Mmap(int(db.file.Fd()), 0, sz, syscall.PROT_READ, syscall.MAP_SHARED|db.MmapFlags)
if err != nil {
return err
}
// Advise the kernel that the mmap is accessed randomly.
if err := madvise(b, syscall.MADV_RANDOM); err != nil {
return fmt.Errorf("madvise: %s", err)
}
// Save the original byte slice and convert to a byte array pointer.
db.dataref = b
db.data = (*[maxMapSize]byte)(unsafe.Pointer(&b[0]))
db.datasz = sz
return nil
}
读变成了读内存,所以性能提升,写是通过写文件直接写入,不是通过内存操作完成的。所以,如果 boltdb 有大量的写会导致大量的 IO 操作,并且会涉及到大量的 B+树平衡性调整,导致性能下降。
type DB struct {
...
ops struct {
writeAt func(b []byte, off int64) (n int, err error) // hook 了文件的写入
}
}
---
// Default values for test hooks
db.ops.writeAt = db.file.WriteAt
---
// Write the buffer to our data file.
// 真正落地到磁盘文件,还是在这里
if _, err := db.ops.writeAt(buf, 0); err != nil {
return err
}
对外操作的函数。
Begin() 返回事务操作封装 TX,自己独立去操作事务 | ||
---|---|---|
Update() 对写事务操作的封装 | ||
View() 对读事务的封装 | ||
Batch() 对批量操作的封装,可以减少 IO 的操作 |
事务
boltdb 的事务实现了 ACID,支持读读并发和一个读写,2 个可以并存。因为读操作是直接读的内存,而写操作直接写的是磁盘。
boltdb 的事务是通过 golang 的锁来实现的。写操作互斥用的是 rwlock 来实现的。
func (db *DB) beginRWTx() (*Tx, error) {
// If the database was opened with Options.ReadOnly, return an error.
if db.readOnly {
return nil, ErrDatabaseReadOnly
}
// Obtain writer lock. This is released by the transaction when it closes.
// This enforces only one writer transaction at a time.
db.rwlock.Lock()
// Once we have the writer lock then we can lock the meta pages so that
// we can set up the transaction.
db.metalock.Lock()
defer db.metalock.Unlock()
// Exit if the database is not open yet.
if !db.opened {
db.rwlock.Unlock()
return nil, ErrDatabaseNotOpen
}
// Create a transaction associated with the database.
t := &Tx{writable: true}
t.init(db)
db.rwtx = t
// Free any pages associated with closed read-only transactions.
var minid txid = 0xFFFFFFFFFFFFFFFF
for _, t := range db.txs {
if t.meta.txid < minid {
minid = t.meta.txid
}
}
if minid > 0 {
db.freelist.release(minid - 1)
}
return t, nil
}
写事务会加锁并且更新 meta 里的 txid。 在 db 的结构里获取到锁之后,会在 tx.go 里结束事务之后释放,或者是 rollback回滚之后释放。
func (tx *Tx) close() {
if tx.db == nil {
return
}
if tx.writable {
// Remove transaction ref & writer lock.
tx.db.rwtx = nil
tx.db.rwlock.Unlock()
...
} else {
tx.db.removeTx(tx)
}
// Clear all references.
tx.db = nil
tx.meta = nil
tx.root = Bucket{tx: tx}
tx.pages = nil
}
只读事务。
func (db *DB) beginTx() (*Tx, error) {
// Lock the meta pages while we initialize the transaction. We obtain
// the meta lock before the mmap lock because that's the order that the
// write transaction will obtain them.
db.metalock.Lock()
// Obtain a read-only lock on the mmap. When the mmap is remapped it will
// obtain a write lock so all transactions must finish before it can be
// remapped.
db.mmaplock.RLock()
// Exit if the database is not open yet.
if !db.opened {
db.mmaplock.RUnlock()
db.metalock.Unlock()
return nil, ErrDatabaseNotOpen
}
// Create a transaction associated with the database.
t := &Tx{}
t.init(db)
// Keep track of transaction until it closes.
db.txs = append(db.txs, t)
n := len(db.txs)
// Unlock the meta pages.
db.metalock.Unlock()
// Update the transaction stats.
db.statlock.Lock()
db.stats.TxN++
db.stats.OpenTxN = n
db.statlock.Unlock()
return t, nil
}
先获取meta 的 lock,避免这个时候写事务进入写了 meta。之后锁住 mmap 内存结构,避免 mmap 重新映射。所有的只读事务都在 txs 里。
事务的提交或回滚。
// Rollback closes the transaction and ignores all previous updates. Read-only
// transactions must be rolled back and not committed.
func (tx *Tx) Rollback() error {
_assert(!tx.managed, "managed tx rollback not allowed")
if tx.db == nil {
return ErrTxClosed
}
tx.rollback()
return nil
}
func (tx *Tx) rollback() {
if tx.db == nil {
return
}
if tx.writable {
tx.db.freelist.rollback(tx.meta.txid)
tx.db.freelist.reload(tx.db.page(tx.db.meta().freelist))
}
tx.close()
}
如果是只读事务直接 close-remove 就好了,如果是读写事务,还需要更新 freelist 里的 page。总的来说,只读事务只是记录了 Tx 的结构体而已,多了一把 S 锁,而写事务是排它锁,串行的。
总结
boltDB是一个完全由go语言编写的基于B+树的kv数据库,其完全支持事务的ACID特性,整个数据库只有一个文件,且有较高的读性能与加载时间。etcd的后端存储使用的便是基于boltdb优化的kv存储 etcd-io/bbolt。
至此基本上把 boltdb 的核心功能梳理完成了,一个小小的开源项目把数据库的方方面面都涉及到了,非常值得多看几遍,不管是工程还是算法,都值得借鉴。
参考资料
https://github.com/etcd-io/bbolt
https://github.com/boltdb/bolt/tree/v1.3.1
https://www.qtmuniao.com/2021/04/02/bolt-transaction/
https://mrcroxx.github.io/posts/code-reading/boltdb-made-simple/0-introduction/