LevelDB 是 Google 开源的,具有非常高效的KV单机数据库。相比随机读,其随机写,顺序读/写性能非常好,以太坊里大量使用 LevelDB 作为数据持久化存储。
这里推荐一款 Golang 实现的 LevelDB。
LevelDB key/value database in Go. https://github.com/syndtr/goleveldb
在我们的日常工作中常常会给KV类的数据库/缓存/配置等增加 schema,这样使用上更加方便。以下代码参考以太坊 Swarm 中的实现,并进行一些改动,实现了一套基于 LevelDB 的 Schema。
Schema Prefix
/*DB Key Prefix*/var (keySchema = []byte{0} // schemakeyPreFields = []byte{1} // fieldkeyStartPreGroups byte = 8 // group start from)
Schema = Field + Group
type attrSchema intconst (asFields attrSchema = iotaasGroups)type schema struct {Fields map[string]field `json:"fields"`Groups map[byte]group `json:"groups"`}type field struct {Desc string `json:"desc"`Type string `json:"type"` // string, uint64...}type group struct {Desc string `json:"desc"`Name string `json:"name"`Type string `json:"type"` // string, uint64...}
Schema Operations
func (db *DB) schemaField(name, fType, desc string) ([]byte, error) {if len(name) == 0 || len(fType) == 0 {return nil, ErrParasBlank}db.mu.Lock()defer db.mu.Unlock()s, err := db.fetchSchema()if err != nil {return nil, err}var found boolfor k, v := range s.Fields {if k == name {if fType != v.Type {// type mismatchreturn nil, fmt.Errorf("field %q of type %q stored as %q in db", name, fType, v.Type)}found = true}}if !found {s.Fields[name] = field{Type: fType,Desc: desc,}if err = db.storeSchema(s); err != nil {return nil, err}}return append(keyPreFields, []byte(name)...), nil}func (db *DB) schemaFieldDelete(name string) error {if len(name) == 0 {return ErrParasBlank}db.mu.Lock()defer db.mu.Unlock()s, err := db.fetchSchema()if err != nil {return err}if _, ok := s.Fields[name]; ok {delete(s.Fields, name)err = db.storeSchema(s)}return err}func (db *DB) schemaGroup(name, fType, desc string) (byte, error) {if len(name) == 0 || len(fType) == 0 {return 0, ErrParasBlank}db.mu.Lock()defer db.mu.Unlock()s, err := db.fetchSchema()if err != nil {return 0, err}nextPrefix := keyStartPreGroupsfor prefix, v := range s.Groups {if prefix > nextPrefix {nextPrefix = prefix + 1}if v.Name == name {if fType != v.Type {// type mismatchreturn 0, fmt.Errorf("group %q of type %q stored as %q in db", name, fType, v.Type)}return prefix, nil}}s.Groups[nextPrefix] = group{Name: name,Type: fType,Desc: desc,}return nextPrefix, db.storeSchema(s)}func (db *DB) schemaGroupDelete(name string) error {if len(name) == 0 {return ErrParasBlank}db.mu.Lock()defer db.mu.Unlock()s, err := db.fetchSchema()if err != nil {return err}for prefix, v := range s.Groups {if v.Name == name {delete(s.Groups, prefix)err = db.storeSchema(s)break}}return err}func (db *DB) fetchSchema() (schema, error) {var s schemab, err := db.Get(keySchema)if err != nil {return s, err}err = json.Unmarshal(b, &s)return s, err}func (db *DB) storeSchema(s schema) error {b, err := json.Marshal(s)if err != nil {return err}return db.Put(keySchema, b)}
Schema Debug
/*debugging* show schema* show fields* show groups*/func (db *DB) ShowSchema() {s, err := db.fetchSchema()if err != nil {fmt.Printf("Failed to fetch schema data. %v\n", err)return}// fieldsdataFields := [][]string{}for k, v := range s.Fields {field := []string{fmt.Sprintf("%X+%X", keyPreFields, k),k,v.Type,v.Desc,}dataFields = append(dataFields, field)}displayFields := common.NewDisplay()displayFields.SetHeader([]string{"Key", "Name", "Type", "Desc"})displayFields.Print(dataFields)// groupsdataGroups := [][]string{}for k, v := range s.Groups {group := []string{fmt.Sprintf("%X", k),v.Name,v.Type,v.Desc,}dataGroups = append(dataGroups, group)}displayGroups := common.NewDisplay()displayGroups.SetHeader([]string{"Key", "Name", "Type", "Desc"})displayGroups.Print(dataGroups)}func (db *DB) ShowGroup(name string) {grp, err := db.NewDataGroup(name, "")if err != nil {fmt.Printf("Failed to retrieve group (%s) info: %v", name, err)return}fmt.Printf("Group Name: %s\n", name)fmt.Printf("Group KeyPrefix: %0x\n", grp.key)it := db.NewIterator()defer it.Release()data := [][]string{}it.Next() // skipping the schemafor it.Next() {key := it.Key()if !bytes.HasPrefix(key, grp.key) {continue}one := []string{fmt.Sprintf("%0x", key),string(key[1:]),}data = append(data, one)}if err := it.Error(); err != nil {panic(Fmt("Panicked on db showing: %v", err))}display := common.NewDisplay()display.SetHeader([]string{"Key/Byte", "Key/String"})display.Print(data)}
Simple Wrapper for LevelDB
package leveldbimport ("sync""github.com/syndtr/goleveldb/leveldb""github.com/syndtr/goleveldb/leveldb/errors""github.com/syndtr/goleveldb/leveldb/iterator""github.com/syndtr/goleveldb/leveldb/opt""github.com/syndtr/goleveldb/leveldb/storage")type Config struct {Path stringFileCacheCapacity int}type DB struct {lvl *leveldb.DBmu sync.Mutex // synchronises access to the schemaexitC chan struct{} // TODO}func NewDB(config *Config) (*DB, error) {if len(config.Path) == 0 {// TODO: newMemoryDB()return nil, nil}return newPersistentDB(config.Path,&opt.Options{OpenFilesCacheCapacity: config.FileCacheCapacity,})}func newMemoryDB() (*DB, error) {lvl, err := leveldb.Open(storage.NewMemStorage(), nil)if err != nil {return nil, err}return &DB{lvl: lvl, exitC: make(chan struct{})}, nil}func newPersistentDB(path string, opts *opt.Options) (*DB, error) {lvl, err := leveldb.OpenFile(path, opts)if err != nil {if _, isCorrupted := err.(*errors.ErrCorrupted); isCorrupted {lvl, err = leveldb.RecoverFile(path, nil)}}if err != nil {return nil, err}db := &DB{lvl: lvl, exitC: make(chan struct{})}if _, err := db.fetchSchema(); err != nil {if err == leveldb.ErrNotFound {if err = db.storeSchema(schema{Fields: make(map[string]field),Groups: make(map[byte]group),}); err != nil {return nil, err}} else {return nil, err}}return db, nil}func (db *DB) Close() {close(db.exitC)db.lvl.Close()}func (db *DB) Get(key []byte) ([]byte, error) {return db.lvl.Get(key, nil)}func (db *DB) Put(key, value []byte) error {return db.lvl.Put(key, value, nil)}func (db *DB) Has(key []byte) (bool, error) {return db.lvl.Has(key, nil)}func (db *DB) Delete(key []byte) error {return db.lvl.Delete(key, nil)}// iterator readsfunc (db *DB) NewIterator() iterator.Iterator {return db.lvl.NewIterator(nil, nil)}// batch writesfunc NewBatch() *leveldb.Batch {return new(leveldb.Batch)}func (db *DB) WriteBatch(batch *leveldb.Batch) error {return db.lvl.Write(batch, nil)}
Errors
package leveldbimport ("errors""github.com/syndtr/goleveldb/leveldb")var (ErrNotFound = leveldb.ErrNotFoundErrParasBlank = errors.New("field name or type cannot be blank")ErrBinaryOverflow = errors.New("value larger than 64 bits (overflow)"))
Wrapper Type to int64/uint64/string/float64
package leveldbimport ("encoding/binary""math""github.com/syndtr/goleveldb/leveldb")/*Data Types*/type TypeKeyer interface {TypeKey() string}// Type basetype dataTypeBase struct {db *DBname stringkey []byteattr attrSchema}func (d dataTypeBase) TypeKey() string {return "base"}func (d dataTypeBase) Delete() error {switch d.attr {case asFields:if err := d.db.schemaFieldDelete(d.name); err != nil {return err}return d.db.Delete(d.key)case asGroups:return d.db.schemaGroupDelete(d.name)}return nil}// Type stringtype dataTypeString struct {dataTypeBase}func (d dataTypeString) TypeKey() string {return "string"}// If the value is not found, error (ErrNotFound) is returnedfunc (d dataTypeString) Get() (string, error) {b, err := d.db.Get(d.key)if err != nil {return "", err}return string(b), nil}func (d dataTypeString) Put(v string) error {return d.db.Put(d.key, []byte(v))}func (d dataTypeString) PutInBatch(batch *leveldb.Batch, v string) {batch.Put(d.key, []byte(v))}// Type int64type dataTypeInt64 struct {dataTypeBase}func (d dataTypeInt64) TypeKey() string {return "int64"}//If the value is not found, error (ErrNotFound) is returnedfunc (d dataTypeInt64) Get() (int64, error) {b, err := d.db.Get(d.key)if err != nil {return 0, err}return ByteToInt64(b)}func (d dataTypeInt64) Put(v int64) error {return d.db.Put(d.key, Int64ToByte(v))}func (d dataTypeInt64) PutInBatch(batch *leveldb.Batch, v int64) {batch.Put(d.key, Int64ToByte(v))}// Type uint64type dataTypeUint64 struct {dataTypeBase}func (d dataTypeUint64) TypeKey() string {return "uint64"}//If the value is not found, error (ErrNotFound) is returnedfunc (d dataTypeUint64) Get() (uint64, error) {b, err := d.db.Get(d.key)if err != nil {return 0, err}return ByteToUint64(b)}func (d dataTypeUint64) Put(v uint64) error {return d.db.Put(d.key, Uint64ToByte(v))}func (d dataTypeUint64) PutInBatch(batch *leveldb.Batch, v uint64) {batch.Put(d.key, Uint64ToByte(v))}// Type float64type dataTypeFloat64 struct {dataTypeBase}func (d dataTypeFloat64) TypeKey() string {return "float64"}//If the value is not found, error (ErrNotFound) is returnedfunc (d dataTypeFloat64) Get() (float64, error) {b, err := d.db.Get(d.key)if err != nil {return 0, err}return ByteToFloat64(b)}func (d dataTypeFloat64) Put(v float64) error {return d.db.Put(d.key, Float64ToByte(v))}func (d dataTypeFloat64) PutInBatch(batch *leveldb.Batch, v float64) {batch.Put(d.key, Float64ToByte(v))}/*utils*/func Int64ToByte(x int64) []byte {b := make([]byte, binary.MaxVarintLen64)return b[:binary.PutVarint(b, x)]}func Uint64ToByte(x uint64) []byte {b := make([]byte, binary.MaxVarintLen64)return b[:binary.PutUvarint(b, x)]}func ByteToInt64(b []byte) (int64, error) {x, n := binary.Varint(b)if n < 0 {return 0, ErrBinaryOverflow}return x, nil}func ByteToUint64(b []byte) (uint64, error) {x, n := binary.Uvarint(b)if n < 0 {return 0, ErrBinaryOverflow}return x, nil}func Float64ToByte(x float64) []byte {return Uint64ToByte(math.Float64bits(x))}func ByteToFloat64(b []byte) (float64, error) {n, err := ByteToUint64(b)if err != nil {return 0, err}return math.Float64frombits(n), nil}func makeSliceByteWithLen(n int, ss ...[]byte) []byte {bs := make([]byte, 0, n)for _, s := range ss {if s != nil {bs = append(bs, s...)}}return bs}func makeSliceByte(ss ...[]byte) []byte {n := 0for i := range ss {n += len(ss[i])}return makeSliceByteWithLen(n, ss...)}
Fields
func (db *DB) NewDataUint64Field(name, desc string) (dataTypeUint64, error) {dts := dataTypeUint64{}key, err := db.schemaField(name, dts.TypeKey(), desc)if err != nil {return dts, err}dts.db = dbdts.name = namedts.attr = asFieldsdts.key = keyreturn dts, nil}func (db *DB) NewDataInt64Field(name, desc string) (dataTypeInt64, error) {dts := dataTypeInt64{}key, err := db.schemaField(name, dts.TypeKey(), desc)if err != nil {return dts, err}dts.db = dbdts.name = namedts.attr = asFieldsdts.key = keyreturn dts, nil}func (db *DB) NewDataFloat64Field(name, desc string) (dataTypeFloat64, error) {dts := dataTypeFloat64{}key, err := db.schemaField(name, dts.TypeKey(), desc)if err != nil {return dts, err}dts.db = dbdts.name = namedts.attr = asFieldsdts.key = keyreturn dts, nil}func (db *DB) NewDataStringField(name, desc string) (dataTypeString, error) {dts := dataTypeString{}key, err := db.schemaField(name, dts.TypeKey(), desc)if err != nil {return dts, err}dts.db = dbdts.name = namedts.attr = asFieldsdts.key = keyreturn dts, nil}
Sample: Field Operations
func addFields(db *DB) error {version, err := db.NewDataStringField("version", "Database Version")if err != nil {return err}version.Put("0.4.0")name, err := db.NewDataStringField("name", "Database Name")if err != nil {return err}name.Put("My First Database")age, err := db.NewDataInt64Field("age", "Database Age")if err != nil {return err}age.Put(36)return nil}func addBatchFields(db *DB) error {batch := NewBatch()version, _ := db.NewDataStringField("version", "")version.PutInBatch(batch, "9.9.0")name, _ := db.NewDataStringField("name", "")name.PutInBatch(batch, "New Name Change")build, _ := db.NewDataFloat64Field("build", "Build Number")build.PutInBatch(batch, 66.66)db.WriteBatch(batch)return nil}

Groups
package leveldbimport ("bytes""fmt""github.com/syndtr/goleveldb/leveldb""github.com/syndtr/goleveldb/leveldb/iterator")// Type structtype DataTypeStructer interface {EncodeKey() ([]byte, error)DecodeKey([]byte) errorEncodeValue() ([]byte, error)DecodeValue([]byte) error}type dataTypeStruct struct {dataTypeBase}func (d dataTypeStruct) TypeKey() string {return "struct"}func (d dataTypeStruct) Get(x DataTypeStructer) error {key, err := x.EncodeKey()if err != nil {return err}b, err := d.db.Get(makeSliceByte(d.key, key))if err != nil {return err}return x.DecodeValue(b)}func (d dataTypeStruct) Put(x DataTypeStructer) error {key, err := x.EncodeKey()if err != nil {return err}value, err := x.EncodeValue()if err != nil {return err}return d.db.Put(makeSliceByte(d.key, key), value)}func (d dataTypeStruct) Delete(x DataTypeStructer) error {key, err := x.EncodeKey()if err != nil {return err}return d.db.Delete(makeSliceByte(d.key, key))}func (d dataTypeStruct) Has(x DataTypeStructer) (bool, error) {key, err := x.EncodeKey()if err != nil {return false, err}return d.db.Has(makeSliceByte(d.key, key))}func (d dataTypeStruct) GetMulti(xs ...DataTypeStructer) error {snapshot, err := d.db.lvl.GetSnapshot()if err != nil {return err}defer snapshot.Release()for i := range xs {key, err := xs[i].EncodeKey()if err != nil {return err}b, err := snapshot.Get(makeSliceByte(d.key, key), nil)if err != nil {return err}if err = xs[i].DecodeValue(b); err != nil {return err}}return nil}func (d dataTypeStruct) HasMulti(xs ...DataTypeStructer) ([]bool, error) {have := make([]bool, len(xs))snapshot, err := d.db.lvl.GetSnapshot()if err != nil {return nil, err}defer snapshot.Release()for i := range xs {key, err := xs[i].EncodeKey()if err != nil {return nil, err}have[i], err = snapshot.Has(makeSliceByte(d.key, key), nil)if err != nil {return nil, err}}return have, nil}func (d dataTypeStruct) PutInBatch(batch *leveldb.Batch, x DataTypeStructer) {key, err := x.EncodeKey()if err != nil {panic(fmt.Errorf("encode error: %v", err))}value, err := x.EncodeValue()if err != nil {panic(fmt.Errorf("decode error: %v", err))}batch.Put(makeSliceByte(d.key, key), value)}func (d dataTypeStruct) DeleteInBatch(batch *leveldb.Batch, x DataTypeStructer) {key, err := x.EncodeKey()if err != nil {panic(fmt.Errorf("encode error: %v", err))}batch.Delete(makeSliceByte(d.key, key))}/*Seek Doc:https://godoc.org/github.com/syndtr/goleveldb/leveldb/iterator#IteratorSeekerSeek moves the iterator to the first key/value pair whose key is greaterthan or equal to the given key. It returns whether such pair exist.First() Prev() Seek() Next() Last()| | | | |-------------------------------------------------*/// Count returns the number of items in group.func (d dataTypeStruct) Count() (int, error) {it := d.db.NewIterator()defer it.Release()count := 0for ok := it.Seek(d.key); ok; ok = it.Next() {key := it.Key()if key[0] != d.key[0] {break}count++}return count, it.Error()}// CountFrom returns the number of items in group// starting from the item.func (d dataTypeStruct) CountFrom(start DataTypeStructer) (int, error) {it := d.db.NewIterator()defer it.Release()startKey, err := start.EncodeKey()if err != nil {return 0, err}count := 0for ok := it.Seek(startKey); ok; ok = it.Next() {key := it.Key()if key[0] != d.key[0] {break}count++}return count, it.Error()}func (d dataTypeStruct) getFromIterator(it iterator.Iterator,prefix []byte, x DataTypeStructer) error {// check keykey := it.Key()if !bytes.HasPrefix(key, prefix) {return ErrNotFound}// decode keyif err := x.DecodeKey(key); err != nil {return err}if err := x.DecodeValue(it.Value()); err != nil {return err}return it.Error()}// First returns the first item in group which encoded key starts with a prefix.func (d dataTypeStruct) First(prefix []byte, x DataTypeStructer) error {it := d.db.NewIterator()defer it.Release()totalPrefix := makeSliceByte(d.key, prefix)it.Seek(totalPrefix)return d.getFromIterator(it, totalPrefix, x)}// Last returns the last item in group which encoded key starts with a prefix.func (d dataTypeStruct) Last(prefix []byte, x DataTypeStructer) error {it := d.db.NewIterator()defer it.Release()totalPrefix := makeSliceByte(d.key, prefix)// get the next prefix in line// since leveldb iterator Seek seeks to the// next key if the key that it seeks to is not found// and by getting the previous key, the last one for the// actual prefix is foundincPrefix := incByteSlice(totalPrefix)if !it.Seek(incPrefix) {it.Last()} else {it.Prev()}return d.getFromIterator(it, totalPrefix, x)}func (d dataTypeStruct) Destroy() error {it := d.db.NewIterator()defer it.Release()batch := NewBatch()for ok := it.Seek(d.key); ok; ok = it.Next() {key := it.Key()if key[0] != d.key[0] {break}batch.Delete(key)}if err := it.Error(); err != nil {return err}if err := d.db.WriteBatch(batch); err != nil {return err}return d.dataTypeBase.Delete()}//// Iterate functions iterates over keys of the Index.//// IterateOptions defines optional parameters for Iterate function.type IterateOptions struct {// StartFrom is the Item to start the iteration from.StartFrom DataTypeStructer// If SkipStart is true, StartFrom item will not be iterated on.SkipStart bool// Iterate over items which keys have a common prefix.Prefix []byte// Callback on every DataTypeStructer// By returning a true for stop variable, iteration will// stop, and by returning the error, that error will be// propagated to the called iterator method.Func func(x DataTypeStructer) (stop bool, err error)}type groupIterator struct {it iterator.Iteratorok booldg *dataTypeStructopts *IterateOptionsprefix []byteerr error}func (d dataTypeStruct) NewIterator(opts *IterateOptions) (*groupIterator, error) {if opts == nil {opts = &IterateOptions{}}prefix := makeSliceByte(d.key, opts.Prefix)startKey := prefixif opts.StartFrom != nil {key, err := opts.StartFrom.EncodeKey()if err != nil {return nil, err}startKey = makeSliceByte(d.key, key)}it := d.db.NewIterator()ok := it.Seek(startKey)if !ok {// stop iterator if seek has failedreturn nil, it.Error()}if opts.SkipStart && bytes.Equal(startKey, it.Key()) {// skip the start from Item if it is the first key// and it is explicitly configured to skip itok = it.Next()}return &groupIterator{dg: &d,prefix: prefix,it: it,ok: ok,opts: opts,}, nil}func (g *groupIterator) Release() {g.it.Release()}func (g *groupIterator) Next(x DataTypeStructer) bool {if err := g.dg.getFromIterator(g.it, g.prefix, x); err != nil {if err != ErrNotFound {g.err = err}return false}if g.opts.Func != nil {stop, err := g.opts.Func(x)if err != nil {g.err = errreturn false}if stop {return false}}g.it.Next()return true}func (g *groupIterator) Error() error {return g.err}func (db *DB) NewDataGroup(name, desc string) (dataTypeStruct, error) {dg := dataTypeStruct{}key, err := db.schemaGroup(name, dg.TypeKey(), desc)if err != nil {return dg, err}dg.db = dbdg.name = namedg.attr = asGroupsdg.key = []byte{key}return dg, nil}func incByteSlice(b []byte) []byte {next := append(b[:0:0], b...)for i := len(b) - 1; i >= 0; i-- {if b[i] == 255 {next[i] = 0} else {next[i] += 1return next}}return nil}
Sample: Group Operations
var (Dump = common.DumpFmt = common.Fmt)type Node struct {id stringhostname string}func (n *Node) EncodeKey() ([]byte, error) {return []byte(n.hostname), nil}func (n *Node) DecodeKey(b []byte) error {n.hostname = string(b)return nil}func (n *Node) EncodeValue() ([]byte, error) {return []byte(fmt.Sprintf("%v:%v", n.id, n.hostname)), nil}func (n *Node) DecodeValue(b []byte) error {ss := strings.Split(string(b), ":")n.id = ss[0]n.hostname = ss[1]return nil}func addGroups(db *DB) error {node, err := db.NewDataGroup("node", "Database Node")if err != nil {return err}node.Put(&Node{id: "123456",hostname: "haidian",})node.Put(&Node{id: "563455",hostname: "chaoyang",})node.Put(&Node{id: "9123456",hostname: "daxiang",})node.Put(&Node{id: "7891011",hostname: "xicheng3",})node.Put(&Node{id: "9087766",hostname: "001TaiShan",})db.ShowGroup("node")x := &Node{}node.First(nil, x)y := &Node{}node.Last(nil, x)Dump(x)Dump(y)return nil}func ShowNodeGroup(db *DB) {db.ShowGroup("node")nodeGrp, err := db.NewDataGroup("node", "")if err != nil {fmt.Printf("Failed to retrieve group info: %v", err)return}if total, err := nodeGrp.Count(); err != nil {fmt.Printf("Failed to retrieve group total number: %v", err)return} else {fmt.Printf("Group total number: %d\n\n", total)}opts := &IterateOptions{Prefix: []byte("xicheng"),}it, err := nodeGrp.NewIterator(opts)if err != nil {fmt.Printf("Failed to create group iterator: %v", err)return}defer it.Release()for x := (&Node{}); it.Next(x); x = (&Node{}) {Dump(x)}if err := it.Error(); err != nil {fmt.Printf("Group iterator error: %v\n", err)return}}


