背景

用 golang 写的数据库不多,boltdb 是一个非常优秀的开源项目,目前虽然是 achieve 不维护了,但是依然有很多项目都在使用,比如在云原生领域最火的 etcd。笔者一直想学习 etcd,借此先把它的底层存储 boltdb 搞定。

boltdb 是一个B+树实现的嵌入式数据库,里面实现了 ACID 事务,对于理解数据库实现,理解 golang 的指针操作有非常大的帮助。 关于 B+树的概念本篇文章不做赘述,可以去网上搜下具体的原理和使用场景。

架构

boltdb的核心代码在 5K 行左右,我这里 fork 的是 v1.3.1 版本。
image.png

image.png

一个 db 一个文件,db 下可以有多个 bucket,bucket 类似于表,里面可以嵌套 bucket,也可以直接存放 kv 数据。一个 bucket 就是一颗 B+树,B+树的节点是 node,一个 node 可以存放多个 kv 数据,一个 kv 数据在内部是用 inode 来表示的,inode 映射到 page 的结构里,page 是直接和磁盘打交道的,也就是说在内存里的指针,在磁盘里就表现为 offset。

在 bucket 找数据,需要 cursor。 而数据的读和写需要事务,事务也是和 bucket 绑定的。接下来我们来一个个分析。

核心概念

page

首先先看下 page 的结构。

  1. type pgid uint64
  2. type page struct {
  3. id pgid
  4. flags uint16
  5. count uint16
  6. overflow uint32
  7. ptr uintptr
  8. }
  9. // typ returns a human readable page type string used for debugging.
  10. func (p *page) typ() string {
  11. if (p.flags & branchPageFlag) != 0 {
  12. return "branch"
  13. } else if (p.flags & leafPageFlag) != 0 {
  14. return "leaf"
  15. } else if (p.flags & metaPageFlag) != 0 {
  16. return "meta"
  17. } else if (p.flags & freelistPageFlag) != 0 {
  18. return "freelist"
  19. }
  20. return fmt.Sprintf("unknown<%02x>", p.flags)
  21. }

page 可以存放很多结构类型的数据,在 boltdb 里,page 可以放 branch 枝干,leaf 叶子节点,可以放元信息,可以放特殊的 freelist 分配器。每个 page 都有唯一的 pageid。 ptr 指向的是 page 的数据内容,比如是kv 数据的话就代表 kv 数据的开始位置。

node

node 是 page 的内存结构,page 是非常底层的结构了,node 会更像 B+ 树的 node,可读性更好。一个 node 对应多个 page,持有 pageid 引用。

  1. // node represents an in-memory, deserialized page.
  2. type node struct {
  3. bucket *Bucket
  4. isLeaf bool
  5. unbalanced bool
  6. spilled bool
  7. key []byte
  8. pgid pgid // 持有对应的 page id
  9. parent *node
  10. children nodes
  11. inodes inodes
  12. }

node 里面实际存放数据的是 inode。

  1. // inode represents an internal node inside of a node.
  2. // It can be used to point to elements in a page or point
  3. // to an element which hasn't been added to a page yet.
  4. // inode 其实 node 的内部结构,才是持有 page 的 id
  5. type inode struct {
  6. flags uint32 // 会存放好几种 flag,有 bucket 的 flag
  7. pgid pgid
  8. key []byte // 存放了真实的数据
  9. value []byte
  10. }
  11. type inodes []inode

对于 node 的详细细节我这里不会描述的太多,很多时候我们只需要知道这个函数是做什么的就行了。
boltdb 深度剖析,从原理到源码 - 图3

bucket

bucket 是表,一个 bucket 可以存放多个 kv 数据。

  1. // Bucket represents a collection of key/value pairs inside the database.
  2. type Bucket struct {
  3. *bucket
  4. tx *Tx // the associated transaction
  5. buckets map[string]*Bucket // subbucket cache
  6. page *page // inline page reference
  7. rootNode *node // materialized node for the root page.
  8. nodes map[pgid]*node // node cache
  9. // Sets the threshold for filling nodes when they split. By default,
  10. // the bucket will fill to 50% but it can be useful to increase this
  11. // amount if you know that your write workloads are mostly append-only.
  12. //
  13. // This is non-persisted across transactions so it must be set in every Tx.
  14. FillPercent float64
  15. }
  16. // bucket represents the on-file representation of a bucket.
  17. // This is stored as the "value" of a bucket key. If the bucket is small enough,
  18. // then its root page can be stored inline in the "value", after the bucket
  19. // header. In the case of inline buckets, the "root" will be 0.
  20. type bucket struct {
  21. root pgid // page id of the bucket's root-level page
  22. sequence uint64 // monotonically incrementing, used by NextSequence()
  23. }

从代码可以看出,bucket 支持嵌套 bucket。一个 bucket 有一个 tx 事务绑定。bucket 的常用操作有如下。

  1. // Cursor creates a cursor associated with the bucket.
  2. // The cursor is only valid as long as the transaction is open.
  3. // Do not use a cursor after the transaction is closed.
  4. func (b *Bucket) Cursor() *Cursor {
  5. // Update transaction statistics.
  6. b.tx.stats.CursorCount++
  7. // Allocate and return a cursor.
  8. return &Cursor{
  9. bucket: b,
  10. stack: make([]elemRef, 0),
  11. }
  12. }

遍历bucket,cursor 是每次需要的时候临时创建的,也就是说生命周期是跟一次遍历相同。

  1. // Get retrieves the value for a key in the bucket.
  2. // Returns a nil value if the key does not exist or if the key is a nested bucket.
  3. // The returned value is only valid for the life of the transaction.
  4. func (b *Bucket) Get(key []byte) []byte {
  5. k, v, flags := b.Cursor().seek(key)
  6. // Return nil if this is a bucket.
  7. if (flags & bucketLeafFlag) != 0 {
  8. return nil
  9. }
  10. // If our target node isn't the same key as what's passed in then return nil.
  11. if !bytes.Equal(key, k) {
  12. return nil
  13. }
  14. return v
  15. }

从 bucket 里获取 KV 数据。可以看到获取是通过 cursor 游标来遍历的,这个就是 B+树的查找过程。

  1. // Put sets the value for a key in the bucket.
  2. // If the key exist then its previous value will be overwritten.
  3. // Supplied value must remain valid for the life of the transaction.
  4. // Returns an error if the bucket was created from a read-only transaction, if the key is blank, if the key is too large, or if the value is too large.
  5. func (b *Bucket) Put(key []byte, value []byte) error {
  6. if b.tx.db == nil {
  7. return ErrTxClosed
  8. } else if !b.Writable() {
  9. return ErrTxNotWritable
  10. } else if len(key) == 0 {
  11. return ErrKeyRequired
  12. } else if len(key) > MaxKeySize {
  13. return ErrKeyTooLarge
  14. } else if int64(len(value)) > MaxValueSize {
  15. return ErrValueTooLarge
  16. }
  17. // Move cursor to correct position.
  18. // 找到合适的位置
  19. c := b.Cursor()
  20. k, _, flags := c.seek(key)
  21. // Return an error if there is an existing key with a bucket value.
  22. if bytes.Equal(key, k) && (flags&bucketLeafFlag) != 0 {
  23. return ErrIncompatibleValue
  24. }
  25. // Insert into node.
  26. // 插入到合适的 node 里
  27. key = cloneBytes(key)
  28. c.node().put(key, key, value, 0, 0)
  29. return nil
  30. }

插入操作也很简单,通过 cursor 找到 B+树合适的位置,返回 c,当前 c 持有的 node 就是合适的位置,把 kv 塞进去即可。

  1. // Delete removes a key from the bucket.
  2. // If the key does not exist then nothing is done and a nil error is returned.
  3. // Returns an error if the bucket was created from a read-only transaction.
  4. func (b *Bucket) Delete(key []byte) error {
  5. if b.tx.db == nil {
  6. return ErrTxClosed
  7. } else if !b.Writable() {
  8. return ErrTxNotWritable
  9. }
  10. // Move cursor to correct position.
  11. c := b.Cursor()
  12. _, _, flags := c.seek(key)
  13. // Return an error if there is already existing bucket value.
  14. if (flags & bucketLeafFlag) != 0 {
  15. return ErrIncompatibleValue
  16. }
  17. // Delete the node if we have a matching key.
  18. c.node().del(key)
  19. return nil
  20. }

删除也很简单,通过 cursor 定位到数据,删掉即可。

  1. // ForEach executes a function for each key/value pair in a bucket.
  2. // If the provided function returns an error then the iteration is stopped and
  3. // the error is returned to the caller. The provided function must not modify
  4. // the bucket; this will result in undefined behavior.
  5. func (b *Bucket) ForEach(fn func(k, v []byte) error) error {
  6. if b.tx.db == nil {
  7. return ErrTxClosed
  8. }
  9. c := b.Cursor()
  10. for k, v := c.First(); k != nil; k, v = c.Next() {
  11. if err := fn(k, v); err != nil {
  12. return err
  13. }
  14. }
  15. return nil
  16. }

遍历依然是通过 cursor,从第一个到最后一个,可以传入一个回调函数。

cursor

image.png
cursor 是操作 B+ 树的封装。 boltdb 的 B+树有一点不一样,叶子节点没有通过链表连起来,所以cursor 里有个 stack 结构,保存遍历的路径。

db

boltdb 的数据库操作封装为 db 结构,这个结构是文件的操作封装,在这里把文件操作变为内存操作。很有意思,我们先来看下。

首先是指定文件路径,然后通过mmap 把文件映射为内存结构。

  1. // mmap opens the underlying memory-mapped file and initializes the meta references.
  2. // minsz is the minimum size that the new mmap can be.
  3. // 文件映射为内存结构
  4. func (db *DB) mmap(minsz int) error {
  5. db.mmaplock.Lock()
  6. defer db.mmaplock.Unlock()
  7. info, err := db.file.Stat()
  8. if err != nil {
  9. return fmt.Errorf("mmap stat error: %s", err)
  10. } else if int(info.Size()) < db.pageSize*2 {
  11. return fmt.Errorf("file size too small")
  12. }
  13. // Ensure the size is at least the minimum size.
  14. var size = int(info.Size())
  15. if size < minsz {
  16. size = minsz
  17. }
  18. size, err = db.mmapSize(size)
  19. if err != nil {
  20. return err
  21. }
  22. // Dereference all mmap references before unmapping.
  23. if db.rwtx != nil {
  24. db.rwtx.root.dereference()
  25. }
  26. // Unmap existing data before continuing.
  27. if err := db.munmap(); err != nil {
  28. return err
  29. }
  30. // Memory-map the data file as a byte slice.
  31. if err := mmap(db, size); err != nil {
  32. return err
  33. }
  34. // Save references to the meta pages.
  35. db.meta0 = db.page(0).meta()
  36. db.meta1 = db.page(1).meta()
  37. // Validate the meta pages. We only return an error if both meta pages fail
  38. // validation, since meta0 failing validation means that it wasn't saved
  39. // properly -- but we can recover using meta1. And vice-versa.
  40. err0 := db.meta0.validate()
  41. err1 := db.meta1.validate()
  42. if err0 != nil && err1 != nil {
  43. return err0
  44. }
  45. return nil
  46. }

默认文件大小是 16KB,boltdb 会扩容为 32KB,如果还是太小,会指数扩容到 1G,之后是 1G 的增幅扩大。
mmap 是通过系统调用,把文件的操作映射为内存的操作。读文件变成了读内存,大大提升了读效率。

  1. // mmap memory maps a DB's data file.
  2. // DB 数据映射为内存结构
  3. func mmap(db *DB, sz int) error {
  4. // Map the data file to memory.
  5. b, err := syscall.Mmap(int(db.file.Fd()), 0, sz, syscall.PROT_READ, syscall.MAP_SHARED|db.MmapFlags)
  6. if err != nil {
  7. return err
  8. }
  9. // Advise the kernel that the mmap is accessed randomly.
  10. if err := madvise(b, syscall.MADV_RANDOM); err != nil {
  11. return fmt.Errorf("madvise: %s", err)
  12. }
  13. // Save the original byte slice and convert to a byte array pointer.
  14. db.dataref = b
  15. db.data = (*[maxMapSize]byte)(unsafe.Pointer(&b[0]))
  16. db.datasz = sz
  17. return nil
  18. }

读变成了读内存,所以性能提升,写是通过写文件直接写入,不是通过内存操作完成的。所以,如果 boltdb 有大量的写会导致大量的 IO 操作,并且会涉及到大量的 B+树平衡性调整,导致性能下降。

  1. type DB struct {
  2. ...
  3. ops struct {
  4. writeAt func(b []byte, off int64) (n int, err error) // hook 了文件的写入
  5. }
  6. }
  7. ---
  8. // Default values for test hooks
  9. db.ops.writeAt = db.file.WriteAt
  10. ---
  11. // Write the buffer to our data file.
  12. // 真正落地到磁盘文件,还是在这里
  13. if _, err := db.ops.writeAt(buf, 0); err != nil {
  14. return err
  15. }

对外操作的函数。

Begin() 返回事务操作封装 TX,自己独立去操作事务
Update() 对写事务操作的封装
View() 对读事务的封装
Batch() 对批量操作的封装,可以减少 IO 的操作

事务

boltdb 的事务实现了 ACID,支持读读并发和一个读写,2 个可以并存。因为读操作是直接读的内存,而写操作直接写的是磁盘。

boltdb 的事务是通过 golang 的锁来实现的。写操作互斥用的是 rwlock 来实现的。

  1. func (db *DB) beginRWTx() (*Tx, error) {
  2. // If the database was opened with Options.ReadOnly, return an error.
  3. if db.readOnly {
  4. return nil, ErrDatabaseReadOnly
  5. }
  6. // Obtain writer lock. This is released by the transaction when it closes.
  7. // This enforces only one writer transaction at a time.
  8. db.rwlock.Lock()
  9. // Once we have the writer lock then we can lock the meta pages so that
  10. // we can set up the transaction.
  11. db.metalock.Lock()
  12. defer db.metalock.Unlock()
  13. // Exit if the database is not open yet.
  14. if !db.opened {
  15. db.rwlock.Unlock()
  16. return nil, ErrDatabaseNotOpen
  17. }
  18. // Create a transaction associated with the database.
  19. t := &Tx{writable: true}
  20. t.init(db)
  21. db.rwtx = t
  22. // Free any pages associated with closed read-only transactions.
  23. var minid txid = 0xFFFFFFFFFFFFFFFF
  24. for _, t := range db.txs {
  25. if t.meta.txid < minid {
  26. minid = t.meta.txid
  27. }
  28. }
  29. if minid > 0 {
  30. db.freelist.release(minid - 1)
  31. }
  32. return t, nil
  33. }

写事务会加锁并且更新 meta 里的 txid。 在 db 的结构里获取到锁之后,会在 tx.go 里结束事务之后释放,或者是 rollback回滚之后释放。

  1. func (tx *Tx) close() {
  2. if tx.db == nil {
  3. return
  4. }
  5. if tx.writable {
  6. // Remove transaction ref & writer lock.
  7. tx.db.rwtx = nil
  8. tx.db.rwlock.Unlock()
  9. ...
  10. } else {
  11. tx.db.removeTx(tx)
  12. }
  13. // Clear all references.
  14. tx.db = nil
  15. tx.meta = nil
  16. tx.root = Bucket{tx: tx}
  17. tx.pages = nil
  18. }

只读事务。

  1. func (db *DB) beginTx() (*Tx, error) {
  2. // Lock the meta pages while we initialize the transaction. We obtain
  3. // the meta lock before the mmap lock because that's the order that the
  4. // write transaction will obtain them.
  5. db.metalock.Lock()
  6. // Obtain a read-only lock on the mmap. When the mmap is remapped it will
  7. // obtain a write lock so all transactions must finish before it can be
  8. // remapped.
  9. db.mmaplock.RLock()
  10. // Exit if the database is not open yet.
  11. if !db.opened {
  12. db.mmaplock.RUnlock()
  13. db.metalock.Unlock()
  14. return nil, ErrDatabaseNotOpen
  15. }
  16. // Create a transaction associated with the database.
  17. t := &Tx{}
  18. t.init(db)
  19. // Keep track of transaction until it closes.
  20. db.txs = append(db.txs, t)
  21. n := len(db.txs)
  22. // Unlock the meta pages.
  23. db.metalock.Unlock()
  24. // Update the transaction stats.
  25. db.statlock.Lock()
  26. db.stats.TxN++
  27. db.stats.OpenTxN = n
  28. db.statlock.Unlock()
  29. return t, nil
  30. }

先获取meta 的 lock,避免这个时候写事务进入写了 meta。之后锁住 mmap 内存结构,避免 mmap 重新映射。所有的只读事务都在 txs 里。

事务的提交或回滚。

  1. // Rollback closes the transaction and ignores all previous updates. Read-only
  2. // transactions must be rolled back and not committed.
  3. func (tx *Tx) Rollback() error {
  4. _assert(!tx.managed, "managed tx rollback not allowed")
  5. if tx.db == nil {
  6. return ErrTxClosed
  7. }
  8. tx.rollback()
  9. return nil
  10. }
  11. func (tx *Tx) rollback() {
  12. if tx.db == nil {
  13. return
  14. }
  15. if tx.writable {
  16. tx.db.freelist.rollback(tx.meta.txid)
  17. tx.db.freelist.reload(tx.db.page(tx.db.meta().freelist))
  18. }
  19. tx.close()
  20. }

如果是只读事务直接 close-remove 就好了,如果是读写事务,还需要更新 freelist 里的 page。总的来说,只读事务只是记录了 Tx 的结构体而已,多了一把 S 锁,而写事务是排它锁,串行的。

总结

boltDB是一个完全由go语言编写的基于B+树的kv数据库,其完全支持事务的ACID特性,整个数据库只有一个文件,且有较高的读性能与加载时间。etcd的后端存储使用的便是基于boltdb优化的kv存储 etcd-io/bbolt

至此基本上把 boltdb 的核心功能梳理完成了,一个小小的开源项目把数据库的方方面面都涉及到了,非常值得多看几遍,不管是工程还是算法,都值得借鉴。

参考资料

https://github.com/etcd-io/bbolt
https://github.com/boltdb/bolt/tree/v1.3.1
https://www.qtmuniao.com/2021/04/02/bolt-transaction/
https://mrcroxx.github.io/posts/code-reading/boltdb-made-simple/0-introduction/