LevelDB 是 Google 开源的,具有非常高效的KV单机数据库。相比随机读,其随机写,顺序读/写性能非常好,以太坊里大量使用 LevelDB 作为数据持久化存储。
这里推荐一款 Golang 实现的 LevelDB。

LevelDB key/value database in Go. https://github.com/syndtr/goleveldb

在我们的日常工作中常常会给KV类的数据库/缓存/配置等增加 schema,这样使用上更加方便。以下代码参考以太坊 Swarm 中的实现,并进行一些改动,实现了一套基于 LevelDB 的 Schema。
Golang 之 LevelDB - 图1

具体代码如下:

Schema Prefix

  1. /*
  2. DB Key Prefix
  3. */
  4. var (
  5. keySchema = []byte{0} // schema
  6. keyPreFields = []byte{1} // field
  7. keyStartPreGroups byte = 8 // group start from
  8. )

Schema = Field + Group

  1. type attrSchema int
  2. const (
  3. asFields attrSchema = iota
  4. asGroups
  5. )
  6. type schema struct {
  7. Fields map[string]field `json:"fields"`
  8. Groups map[byte]group `json:"groups"`
  9. }
  10. type field struct {
  11. Desc string `json:"desc"`
  12. Type string `json:"type"` // string, uint64...
  13. }
  14. type group struct {
  15. Desc string `json:"desc"`
  16. Name string `json:"name"`
  17. Type string `json:"type"` // string, uint64...
  18. }

Schema Operations

  1. func (db *DB) schemaField(name, fType, desc string) ([]byte, error) {
  2. if len(name) == 0 || len(fType) == 0 {
  3. return nil, ErrParasBlank
  4. }
  5. db.mu.Lock()
  6. defer db.mu.Unlock()
  7. s, err := db.fetchSchema()
  8. if err != nil {
  9. return nil, err
  10. }
  11. var found bool
  12. for k, v := range s.Fields {
  13. if k == name {
  14. if fType != v.Type {
  15. // type mismatch
  16. return nil, fmt.Errorf(
  17. "field %q of type %q stored as %q in db", name, fType, v.Type)
  18. }
  19. found = true
  20. }
  21. }
  22. if !found {
  23. s.Fields[name] = field{
  24. Type: fType,
  25. Desc: desc,
  26. }
  27. if err = db.storeSchema(s); err != nil {
  28. return nil, err
  29. }
  30. }
  31. return append(keyPreFields, []byte(name)...), nil
  32. }
  33. func (db *DB) schemaFieldDelete(name string) error {
  34. if len(name) == 0 {
  35. return ErrParasBlank
  36. }
  37. db.mu.Lock()
  38. defer db.mu.Unlock()
  39. s, err := db.fetchSchema()
  40. if err != nil {
  41. return err
  42. }
  43. if _, ok := s.Fields[name]; ok {
  44. delete(s.Fields, name)
  45. err = db.storeSchema(s)
  46. }
  47. return err
  48. }
  49. func (db *DB) schemaGroup(name, fType, desc string) (byte, error) {
  50. if len(name) == 0 || len(fType) == 0 {
  51. return 0, ErrParasBlank
  52. }
  53. db.mu.Lock()
  54. defer db.mu.Unlock()
  55. s, err := db.fetchSchema()
  56. if err != nil {
  57. return 0, err
  58. }
  59. nextPrefix := keyStartPreGroups
  60. for prefix, v := range s.Groups {
  61. if prefix > nextPrefix {
  62. nextPrefix = prefix + 1
  63. }
  64. if v.Name == name {
  65. if fType != v.Type {
  66. // type mismatch
  67. return 0, fmt.Errorf(
  68. "group %q of type %q stored as %q in db", name, fType, v.Type)
  69. }
  70. return prefix, nil
  71. }
  72. }
  73. s.Groups[nextPrefix] = group{
  74. Name: name,
  75. Type: fType,
  76. Desc: desc,
  77. }
  78. return nextPrefix, db.storeSchema(s)
  79. }
  80. func (db *DB) schemaGroupDelete(name string) error {
  81. if len(name) == 0 {
  82. return ErrParasBlank
  83. }
  84. db.mu.Lock()
  85. defer db.mu.Unlock()
  86. s, err := db.fetchSchema()
  87. if err != nil {
  88. return err
  89. }
  90. for prefix, v := range s.Groups {
  91. if v.Name == name {
  92. delete(s.Groups, prefix)
  93. err = db.storeSchema(s)
  94. break
  95. }
  96. }
  97. return err
  98. }
  99. func (db *DB) fetchSchema() (schema, error) {
  100. var s schema
  101. b, err := db.Get(keySchema)
  102. if err != nil {
  103. return s, err
  104. }
  105. err = json.Unmarshal(b, &s)
  106. return s, err
  107. }
  108. func (db *DB) storeSchema(s schema) error {
  109. b, err := json.Marshal(s)
  110. if err != nil {
  111. return err
  112. }
  113. return db.Put(keySchema, b)
  114. }

Schema Debug

  1. /*
  2. debugging
  3. * show schema
  4. * show fields
  5. * show groups
  6. */
  7. func (db *DB) ShowSchema() {
  8. s, err := db.fetchSchema()
  9. if err != nil {
  10. fmt.Printf("Failed to fetch schema data. %v\n", err)
  11. return
  12. }
  13. // fields
  14. dataFields := [][]string{}
  15. for k, v := range s.Fields {
  16. field := []string{
  17. fmt.Sprintf("%X+%X", keyPreFields, k),
  18. k,
  19. v.Type,
  20. v.Desc,
  21. }
  22. dataFields = append(dataFields, field)
  23. }
  24. displayFields := common.NewDisplay()
  25. displayFields.SetHeader([]string{"Key", "Name", "Type", "Desc"})
  26. displayFields.Print(dataFields)
  27. // groups
  28. dataGroups := [][]string{}
  29. for k, v := range s.Groups {
  30. group := []string{
  31. fmt.Sprintf("%X", k),
  32. v.Name,
  33. v.Type,
  34. v.Desc,
  35. }
  36. dataGroups = append(dataGroups, group)
  37. }
  38. displayGroups := common.NewDisplay()
  39. displayGroups.SetHeader([]string{"Key", "Name", "Type", "Desc"})
  40. displayGroups.Print(dataGroups)
  41. }
  42. func (db *DB) ShowGroup(name string) {
  43. grp, err := db.NewDataGroup(name, "")
  44. if err != nil {
  45. fmt.Printf("Failed to retrieve group (%s) info: %v", name, err)
  46. return
  47. }
  48. fmt.Printf("Group Name: %s\n", name)
  49. fmt.Printf("Group KeyPrefix: %0x\n", grp.key)
  50. it := db.NewIterator()
  51. defer it.Release()
  52. data := [][]string{}
  53. it.Next() // skipping the schema
  54. for it.Next() {
  55. key := it.Key()
  56. if !bytes.HasPrefix(key, grp.key) {
  57. continue
  58. }
  59. one := []string{
  60. fmt.Sprintf("%0x", key),
  61. string(key[1:]),
  62. }
  63. data = append(data, one)
  64. }
  65. if err := it.Error(); err != nil {
  66. panic(Fmt("Panicked on db showing: %v", err))
  67. }
  68. display := common.NewDisplay()
  69. display.SetHeader([]string{"Key/Byte", "Key/String"})
  70. display.Print(data)
  71. }

Simple Wrapper for LevelDB

  1. package leveldb
  2. import (
  3. "sync"
  4. "github.com/syndtr/goleveldb/leveldb"
  5. "github.com/syndtr/goleveldb/leveldb/errors"
  6. "github.com/syndtr/goleveldb/leveldb/iterator"
  7. "github.com/syndtr/goleveldb/leveldb/opt"
  8. "github.com/syndtr/goleveldb/leveldb/storage"
  9. )
  10. type Config struct {
  11. Path string
  12. FileCacheCapacity int
  13. }
  14. type DB struct {
  15. lvl *leveldb.DB
  16. mu sync.Mutex // synchronises access to the schema
  17. exitC chan struct{} // TODO
  18. }
  19. func NewDB(config *Config) (*DB, error) {
  20. if len(config.Path) == 0 {
  21. // TODO: newMemoryDB()
  22. return nil, nil
  23. }
  24. return newPersistentDB(config.Path,
  25. &opt.Options{
  26. OpenFilesCacheCapacity: config.FileCacheCapacity,
  27. })
  28. }
  29. func newMemoryDB() (*DB, error) {
  30. lvl, err := leveldb.Open(storage.NewMemStorage(), nil)
  31. if err != nil {
  32. return nil, err
  33. }
  34. return &DB{lvl: lvl, exitC: make(chan struct{})}, nil
  35. }
  36. func newPersistentDB(path string, opts *opt.Options) (*DB, error) {
  37. lvl, err := leveldb.OpenFile(path, opts)
  38. if err != nil {
  39. if _, isCorrupted := err.(*errors.ErrCorrupted); isCorrupted {
  40. lvl, err = leveldb.RecoverFile(path, nil)
  41. }
  42. }
  43. if err != nil {
  44. return nil, err
  45. }
  46. db := &DB{lvl: lvl, exitC: make(chan struct{})}
  47. if _, err := db.fetchSchema(); err != nil {
  48. if err == leveldb.ErrNotFound {
  49. if err = db.storeSchema(schema{
  50. Fields: make(map[string]field),
  51. Groups: make(map[byte]group),
  52. }); err != nil {
  53. return nil, err
  54. }
  55. } else {
  56. return nil, err
  57. }
  58. }
  59. return db, nil
  60. }
  61. func (db *DB) Close() {
  62. close(db.exitC)
  63. db.lvl.Close()
  64. }
  65. func (db *DB) Get(key []byte) ([]byte, error) {
  66. return db.lvl.Get(key, nil)
  67. }
  68. func (db *DB) Put(key, value []byte) error {
  69. return db.lvl.Put(key, value, nil)
  70. }
  71. func (db *DB) Has(key []byte) (bool, error) {
  72. return db.lvl.Has(key, nil)
  73. }
  74. func (db *DB) Delete(key []byte) error {
  75. return db.lvl.Delete(key, nil)
  76. }
  77. // iterator reads
  78. func (db *DB) NewIterator() iterator.Iterator {
  79. return db.lvl.NewIterator(nil, nil)
  80. }
  81. // batch writes
  82. func NewBatch() *leveldb.Batch {
  83. return new(leveldb.Batch)
  84. }
  85. func (db *DB) WriteBatch(batch *leveldb.Batch) error {
  86. return db.lvl.Write(batch, nil)
  87. }

Errors

  1. package leveldb
  2. import (
  3. "errors"
  4. "github.com/syndtr/goleveldb/leveldb"
  5. )
  6. var (
  7. ErrNotFound = leveldb.ErrNotFound
  8. ErrParasBlank = errors.New("field name or type cannot be blank")
  9. ErrBinaryOverflow = errors.New("value larger than 64 bits (overflow)")
  10. )

Wrapper Type to int64/uint64/string/float64

  1. package leveldb
  2. import (
  3. "encoding/binary"
  4. "math"
  5. "github.com/syndtr/goleveldb/leveldb"
  6. )
  7. /*
  8. Data Types
  9. */
  10. type TypeKeyer interface {
  11. TypeKey() string
  12. }
  13. // Type base
  14. type dataTypeBase struct {
  15. db *DB
  16. name string
  17. key []byte
  18. attr attrSchema
  19. }
  20. func (d dataTypeBase) TypeKey() string {
  21. return "base"
  22. }
  23. func (d dataTypeBase) Delete() error {
  24. switch d.attr {
  25. case asFields:
  26. if err := d.db.schemaFieldDelete(d.name); err != nil {
  27. return err
  28. }
  29. return d.db.Delete(d.key)
  30. case asGroups:
  31. return d.db.schemaGroupDelete(d.name)
  32. }
  33. return nil
  34. }
  35. // Type string
  36. type dataTypeString struct {
  37. dataTypeBase
  38. }
  39. func (d dataTypeString) TypeKey() string {
  40. return "string"
  41. }
  42. // If the value is not found, error (ErrNotFound) is returned
  43. func (d dataTypeString) Get() (string, error) {
  44. b, err := d.db.Get(d.key)
  45. if err != nil {
  46. return "", err
  47. }
  48. return string(b), nil
  49. }
  50. func (d dataTypeString) Put(v string) error {
  51. return d.db.Put(d.key, []byte(v))
  52. }
  53. func (d dataTypeString) PutInBatch(batch *leveldb.Batch, v string) {
  54. batch.Put(d.key, []byte(v))
  55. }
  56. // Type int64
  57. type dataTypeInt64 struct {
  58. dataTypeBase
  59. }
  60. func (d dataTypeInt64) TypeKey() string {
  61. return "int64"
  62. }
  63. //If the value is not found, error (ErrNotFound) is returned
  64. func (d dataTypeInt64) Get() (int64, error) {
  65. b, err := d.db.Get(d.key)
  66. if err != nil {
  67. return 0, err
  68. }
  69. return ByteToInt64(b)
  70. }
  71. func (d dataTypeInt64) Put(v int64) error {
  72. return d.db.Put(d.key, Int64ToByte(v))
  73. }
  74. func (d dataTypeInt64) PutInBatch(batch *leveldb.Batch, v int64) {
  75. batch.Put(d.key, Int64ToByte(v))
  76. }
  77. // Type uint64
  78. type dataTypeUint64 struct {
  79. dataTypeBase
  80. }
  81. func (d dataTypeUint64) TypeKey() string {
  82. return "uint64"
  83. }
  84. //If the value is not found, error (ErrNotFound) is returned
  85. func (d dataTypeUint64) Get() (uint64, error) {
  86. b, err := d.db.Get(d.key)
  87. if err != nil {
  88. return 0, err
  89. }
  90. return ByteToUint64(b)
  91. }
  92. func (d dataTypeUint64) Put(v uint64) error {
  93. return d.db.Put(d.key, Uint64ToByte(v))
  94. }
  95. func (d dataTypeUint64) PutInBatch(batch *leveldb.Batch, v uint64) {
  96. batch.Put(d.key, Uint64ToByte(v))
  97. }
  98. // Type float64
  99. type dataTypeFloat64 struct {
  100. dataTypeBase
  101. }
  102. func (d dataTypeFloat64) TypeKey() string {
  103. return "float64"
  104. }
  105. //If the value is not found, error (ErrNotFound) is returned
  106. func (d dataTypeFloat64) Get() (float64, error) {
  107. b, err := d.db.Get(d.key)
  108. if err != nil {
  109. return 0, err
  110. }
  111. return ByteToFloat64(b)
  112. }
  113. func (d dataTypeFloat64) Put(v float64) error {
  114. return d.db.Put(d.key, Float64ToByte(v))
  115. }
  116. func (d dataTypeFloat64) PutInBatch(batch *leveldb.Batch, v float64) {
  117. batch.Put(d.key, Float64ToByte(v))
  118. }
  119. /*
  120. utils
  121. */
  122. func Int64ToByte(x int64) []byte {
  123. b := make([]byte, binary.MaxVarintLen64)
  124. return b[:binary.PutVarint(b, x)]
  125. }
  126. func Uint64ToByte(x uint64) []byte {
  127. b := make([]byte, binary.MaxVarintLen64)
  128. return b[:binary.PutUvarint(b, x)]
  129. }
  130. func ByteToInt64(b []byte) (int64, error) {
  131. x, n := binary.Varint(b)
  132. if n < 0 {
  133. return 0, ErrBinaryOverflow
  134. }
  135. return x, nil
  136. }
  137. func ByteToUint64(b []byte) (uint64, error) {
  138. x, n := binary.Uvarint(b)
  139. if n < 0 {
  140. return 0, ErrBinaryOverflow
  141. }
  142. return x, nil
  143. }
  144. func Float64ToByte(x float64) []byte {
  145. return Uint64ToByte(math.Float64bits(x))
  146. }
  147. func ByteToFloat64(b []byte) (float64, error) {
  148. n, err := ByteToUint64(b)
  149. if err != nil {
  150. return 0, err
  151. }
  152. return math.Float64frombits(n), nil
  153. }
  154. func makeSliceByteWithLen(n int, ss ...[]byte) []byte {
  155. bs := make([]byte, 0, n)
  156. for _, s := range ss {
  157. if s != nil {
  158. bs = append(bs, s...)
  159. }
  160. }
  161. return bs
  162. }
  163. func makeSliceByte(ss ...[]byte) []byte {
  164. n := 0
  165. for i := range ss {
  166. n += len(ss[i])
  167. }
  168. return makeSliceByteWithLen(n, ss...)
  169. }

Fields

  1. func (db *DB) NewDataUint64Field(name, desc string) (dataTypeUint64, error) {
  2. dts := dataTypeUint64{}
  3. key, err := db.schemaField(name, dts.TypeKey(), desc)
  4. if err != nil {
  5. return dts, err
  6. }
  7. dts.db = db
  8. dts.name = name
  9. dts.attr = asFields
  10. dts.key = key
  11. return dts, nil
  12. }
  13. func (db *DB) NewDataInt64Field(name, desc string) (dataTypeInt64, error) {
  14. dts := dataTypeInt64{}
  15. key, err := db.schemaField(name, dts.TypeKey(), desc)
  16. if err != nil {
  17. return dts, err
  18. }
  19. dts.db = db
  20. dts.name = name
  21. dts.attr = asFields
  22. dts.key = key
  23. return dts, nil
  24. }
  25. func (db *DB) NewDataFloat64Field(name, desc string) (dataTypeFloat64, error) {
  26. dts := dataTypeFloat64{}
  27. key, err := db.schemaField(name, dts.TypeKey(), desc)
  28. if err != nil {
  29. return dts, err
  30. }
  31. dts.db = db
  32. dts.name = name
  33. dts.attr = asFields
  34. dts.key = key
  35. return dts, nil
  36. }
  37. func (db *DB) NewDataStringField(name, desc string) (dataTypeString, error) {
  38. dts := dataTypeString{}
  39. key, err := db.schemaField(name, dts.TypeKey(), desc)
  40. if err != nil {
  41. return dts, err
  42. }
  43. dts.db = db
  44. dts.name = name
  45. dts.attr = asFields
  46. dts.key = key
  47. return dts, nil
  48. }

Sample: Field Operations

  1. func addFields(db *DB) error {
  2. version, err := db.NewDataStringField("version", "Database Version")
  3. if err != nil {
  4. return err
  5. }
  6. version.Put("0.4.0")
  7. name, err := db.NewDataStringField("name", "Database Name")
  8. if err != nil {
  9. return err
  10. }
  11. name.Put("My First Database")
  12. age, err := db.NewDataInt64Field("age", "Database Age")
  13. if err != nil {
  14. return err
  15. }
  16. age.Put(36)
  17. return nil
  18. }
  19. func addBatchFields(db *DB) error {
  20. batch := NewBatch()
  21. version, _ := db.NewDataStringField("version", "")
  22. version.PutInBatch(batch, "9.9.0")
  23. name, _ := db.NewDataStringField("name", "")
  24. name.PutInBatch(batch, "New Name Change")
  25. build, _ := db.NewDataFloat64Field("build", "Build Number")
  26. build.PutInBatch(batch, 66.66)
  27. db.WriteBatch(batch)
  28. return nil
  29. }

Golang 之 LevelDB - 图2

Groups

  1. package leveldb
  2. import (
  3. "bytes"
  4. "fmt"
  5. "github.com/syndtr/goleveldb/leveldb"
  6. "github.com/syndtr/goleveldb/leveldb/iterator"
  7. )
  8. // Type struct
  9. type DataTypeStructer interface {
  10. EncodeKey() ([]byte, error)
  11. DecodeKey([]byte) error
  12. EncodeValue() ([]byte, error)
  13. DecodeValue([]byte) error
  14. }
  15. type dataTypeStruct struct {
  16. dataTypeBase
  17. }
  18. func (d dataTypeStruct) TypeKey() string {
  19. return "struct"
  20. }
  21. func (d dataTypeStruct) Get(x DataTypeStructer) error {
  22. key, err := x.EncodeKey()
  23. if err != nil {
  24. return err
  25. }
  26. b, err := d.db.Get(makeSliceByte(d.key, key))
  27. if err != nil {
  28. return err
  29. }
  30. return x.DecodeValue(b)
  31. }
  32. func (d dataTypeStruct) Put(x DataTypeStructer) error {
  33. key, err := x.EncodeKey()
  34. if err != nil {
  35. return err
  36. }
  37. value, err := x.EncodeValue()
  38. if err != nil {
  39. return err
  40. }
  41. return d.db.Put(makeSliceByte(d.key, key), value)
  42. }
  43. func (d dataTypeStruct) Delete(x DataTypeStructer) error {
  44. key, err := x.EncodeKey()
  45. if err != nil {
  46. return err
  47. }
  48. return d.db.Delete(makeSliceByte(d.key, key))
  49. }
  50. func (d dataTypeStruct) Has(x DataTypeStructer) (bool, error) {
  51. key, err := x.EncodeKey()
  52. if err != nil {
  53. return false, err
  54. }
  55. return d.db.Has(makeSliceByte(d.key, key))
  56. }
  57. func (d dataTypeStruct) GetMulti(xs ...DataTypeStructer) error {
  58. snapshot, err := d.db.lvl.GetSnapshot()
  59. if err != nil {
  60. return err
  61. }
  62. defer snapshot.Release()
  63. for i := range xs {
  64. key, err := xs[i].EncodeKey()
  65. if err != nil {
  66. return err
  67. }
  68. b, err := snapshot.Get(
  69. makeSliceByte(d.key, key), nil)
  70. if err != nil {
  71. return err
  72. }
  73. if err = xs[i].DecodeValue(b); err != nil {
  74. return err
  75. }
  76. }
  77. return nil
  78. }
  79. func (d dataTypeStruct) HasMulti(xs ...DataTypeStructer) ([]bool, error) {
  80. have := make([]bool, len(xs))
  81. snapshot, err := d.db.lvl.GetSnapshot()
  82. if err != nil {
  83. return nil, err
  84. }
  85. defer snapshot.Release()
  86. for i := range xs {
  87. key, err := xs[i].EncodeKey()
  88. if err != nil {
  89. return nil, err
  90. }
  91. have[i], err = snapshot.Has(
  92. makeSliceByte(d.key, key), nil)
  93. if err != nil {
  94. return nil, err
  95. }
  96. }
  97. return have, nil
  98. }
  99. func (d dataTypeStruct) PutInBatch(batch *leveldb.Batch, x DataTypeStructer) {
  100. key, err := x.EncodeKey()
  101. if err != nil {
  102. panic(fmt.Errorf("encode error: %v", err))
  103. }
  104. value, err := x.EncodeValue()
  105. if err != nil {
  106. panic(fmt.Errorf("decode error: %v", err))
  107. }
  108. batch.Put(makeSliceByte(d.key, key), value)
  109. }
  110. func (d dataTypeStruct) DeleteInBatch(batch *leveldb.Batch, x DataTypeStructer) {
  111. key, err := x.EncodeKey()
  112. if err != nil {
  113. panic(fmt.Errorf("encode error: %v", err))
  114. }
  115. batch.Delete(makeSliceByte(d.key, key))
  116. }
  117. /*
  118. Seek Doc:
  119. https://godoc.org/github.com/syndtr/goleveldb/leveldb/iterator#IteratorSeeker
  120. Seek moves the iterator to the first key/value pair whose key is greater
  121. than or equal to the given key. It returns whether such pair exist.
  122. First() Prev() Seek() Next() Last()
  123. | | | | |
  124. -------------------------------------------------
  125. */
  126. // Count returns the number of items in group.
  127. func (d dataTypeStruct) Count() (int, error) {
  128. it := d.db.NewIterator()
  129. defer it.Release()
  130. count := 0
  131. for ok := it.Seek(d.key); ok; ok = it.Next() {
  132. key := it.Key()
  133. if key[0] != d.key[0] {
  134. break
  135. }
  136. count++
  137. }
  138. return count, it.Error()
  139. }
  140. // CountFrom returns the number of items in group
  141. // starting from the item.
  142. func (d dataTypeStruct) CountFrom(start DataTypeStructer) (int, error) {
  143. it := d.db.NewIterator()
  144. defer it.Release()
  145. startKey, err := start.EncodeKey()
  146. if err != nil {
  147. return 0, err
  148. }
  149. count := 0
  150. for ok := it.Seek(startKey); ok; ok = it.Next() {
  151. key := it.Key()
  152. if key[0] != d.key[0] {
  153. break
  154. }
  155. count++
  156. }
  157. return count, it.Error()
  158. }
  159. func (d dataTypeStruct) getFromIterator(it iterator.Iterator,
  160. prefix []byte, x DataTypeStructer) error {
  161. // check key
  162. key := it.Key()
  163. if !bytes.HasPrefix(key, prefix) {
  164. return ErrNotFound
  165. }
  166. // decode key
  167. if err := x.DecodeKey(key); err != nil {
  168. return err
  169. }
  170. if err := x.DecodeValue(it.Value()); err != nil {
  171. return err
  172. }
  173. return it.Error()
  174. }
  175. // First returns the first item in group which encoded key starts with a prefix.
  176. func (d dataTypeStruct) First(prefix []byte, x DataTypeStructer) error {
  177. it := d.db.NewIterator()
  178. defer it.Release()
  179. totalPrefix := makeSliceByte(d.key, prefix)
  180. it.Seek(totalPrefix)
  181. return d.getFromIterator(it, totalPrefix, x)
  182. }
  183. // Last returns the last item in group which encoded key starts with a prefix.
  184. func (d dataTypeStruct) Last(prefix []byte, x DataTypeStructer) error {
  185. it := d.db.NewIterator()
  186. defer it.Release()
  187. totalPrefix := makeSliceByte(d.key, prefix)
  188. // get the next prefix in line
  189. // since leveldb iterator Seek seeks to the
  190. // next key if the key that it seeks to is not found
  191. // and by getting the previous key, the last one for the
  192. // actual prefix is found
  193. incPrefix := incByteSlice(totalPrefix)
  194. if !it.Seek(incPrefix) {
  195. it.Last()
  196. } else {
  197. it.Prev()
  198. }
  199. return d.getFromIterator(it, totalPrefix, x)
  200. }
  201. func (d dataTypeStruct) Destroy() error {
  202. it := d.db.NewIterator()
  203. defer it.Release()
  204. batch := NewBatch()
  205. for ok := it.Seek(d.key); ok; ok = it.Next() {
  206. key := it.Key()
  207. if key[0] != d.key[0] {
  208. break
  209. }
  210. batch.Delete(key)
  211. }
  212. if err := it.Error(); err != nil {
  213. return err
  214. }
  215. if err := d.db.WriteBatch(batch); err != nil {
  216. return err
  217. }
  218. return d.dataTypeBase.Delete()
  219. }
  220. //
  221. // Iterate functions iterates over keys of the Index.
  222. //
  223. // IterateOptions defines optional parameters for Iterate function.
  224. type IterateOptions struct {
  225. // StartFrom is the Item to start the iteration from.
  226. StartFrom DataTypeStructer
  227. // If SkipStart is true, StartFrom item will not be iterated on.
  228. SkipStart bool
  229. // Iterate over items which keys have a common prefix.
  230. Prefix []byte
  231. // Callback on every DataTypeStructer
  232. // By returning a true for stop variable, iteration will
  233. // stop, and by returning the error, that error will be
  234. // propagated to the called iterator method.
  235. Func func(x DataTypeStructer) (stop bool, err error)
  236. }
  237. type groupIterator struct {
  238. it iterator.Iterator
  239. ok bool
  240. dg *dataTypeStruct
  241. opts *IterateOptions
  242. prefix []byte
  243. err error
  244. }
  245. func (d dataTypeStruct) NewIterator(opts *IterateOptions) (*groupIterator, error) {
  246. if opts == nil {
  247. opts = &IterateOptions{}
  248. }
  249. prefix := makeSliceByte(d.key, opts.Prefix)
  250. startKey := prefix
  251. if opts.StartFrom != nil {
  252. key, err := opts.StartFrom.EncodeKey()
  253. if err != nil {
  254. return nil, err
  255. }
  256. startKey = makeSliceByte(d.key, key)
  257. }
  258. it := d.db.NewIterator()
  259. ok := it.Seek(startKey)
  260. if !ok {
  261. // stop iterator if seek has failed
  262. return nil, it.Error()
  263. }
  264. if opts.SkipStart && bytes.Equal(startKey, it.Key()) {
  265. // skip the start from Item if it is the first key
  266. // and it is explicitly configured to skip it
  267. ok = it.Next()
  268. }
  269. return &groupIterator{
  270. dg: &d,
  271. prefix: prefix,
  272. it: it,
  273. ok: ok,
  274. opts: opts,
  275. }, nil
  276. }
  277. func (g *groupIterator) Release() {
  278. g.it.Release()
  279. }
  280. func (g *groupIterator) Next(x DataTypeStructer) bool {
  281. if err := g.dg.getFromIterator(g.it, g.prefix, x); err != nil {
  282. if err != ErrNotFound {
  283. g.err = err
  284. }
  285. return false
  286. }
  287. if g.opts.Func != nil {
  288. stop, err := g.opts.Func(x)
  289. if err != nil {
  290. g.err = err
  291. return false
  292. }
  293. if stop {
  294. return false
  295. }
  296. }
  297. g.it.Next()
  298. return true
  299. }
  300. func (g *groupIterator) Error() error {
  301. return g.err
  302. }
  303. func (db *DB) NewDataGroup(name, desc string) (dataTypeStruct, error) {
  304. dg := dataTypeStruct{}
  305. key, err := db.schemaGroup(name, dg.TypeKey(), desc)
  306. if err != nil {
  307. return dg, err
  308. }
  309. dg.db = db
  310. dg.name = name
  311. dg.attr = asGroups
  312. dg.key = []byte{key}
  313. return dg, nil
  314. }
  315. func incByteSlice(b []byte) []byte {
  316. next := append(b[:0:0], b...)
  317. for i := len(b) - 1; i >= 0; i-- {
  318. if b[i] == 255 {
  319. next[i] = 0
  320. } else {
  321. next[i] += 1
  322. return next
  323. }
  324. }
  325. return nil
  326. }

Sample: Group Operations

  1. var (
  2. Dump = common.Dump
  3. Fmt = common.Fmt
  4. )
  5. type Node struct {
  6. id string
  7. hostname string
  8. }
  9. func (n *Node) EncodeKey() ([]byte, error) {
  10. return []byte(n.hostname), nil
  11. }
  12. func (n *Node) DecodeKey(b []byte) error {
  13. n.hostname = string(b)
  14. return nil
  15. }
  16. func (n *Node) EncodeValue() ([]byte, error) {
  17. return []byte(fmt.Sprintf("%v:%v", n.id, n.hostname)), nil
  18. }
  19. func (n *Node) DecodeValue(b []byte) error {
  20. ss := strings.Split(string(b), ":")
  21. n.id = ss[0]
  22. n.hostname = ss[1]
  23. return nil
  24. }
  25. func addGroups(db *DB) error {
  26. node, err := db.NewDataGroup("node", "Database Node")
  27. if err != nil {
  28. return err
  29. }
  30. node.Put(&Node{
  31. id: "123456",
  32. hostname: "haidian",
  33. })
  34. node.Put(&Node{
  35. id: "563455",
  36. hostname: "chaoyang",
  37. })
  38. node.Put(&Node{
  39. id: "9123456",
  40. hostname: "daxiang",
  41. })
  42. node.Put(&Node{
  43. id: "7891011",
  44. hostname: "xicheng3",
  45. })
  46. node.Put(&Node{
  47. id: "9087766",
  48. hostname: "001TaiShan",
  49. })
  50. db.ShowGroup("node")
  51. x := &Node{}
  52. node.First(nil, x)
  53. y := &Node{}
  54. node.Last(nil, x)
  55. Dump(x)
  56. Dump(y)
  57. return nil
  58. }
  59. func ShowNodeGroup(db *DB) {
  60. db.ShowGroup("node")
  61. nodeGrp, err := db.NewDataGroup("node", "")
  62. if err != nil {
  63. fmt.Printf("Failed to retrieve group info: %v", err)
  64. return
  65. }
  66. if total, err := nodeGrp.Count(); err != nil {
  67. fmt.Printf("Failed to retrieve group total number: %v", err)
  68. return
  69. } else {
  70. fmt.Printf("Group total number: %d\n\n", total)
  71. }
  72. opts := &IterateOptions{
  73. Prefix: []byte("xicheng"),
  74. }
  75. it, err := nodeGrp.NewIterator(opts)
  76. if err != nil {
  77. fmt.Printf("Failed to create group iterator: %v", err)
  78. return
  79. }
  80. defer it.Release()
  81. for x := (&Node{}); it.Next(x); x = (&Node{}) {
  82. Dump(x)
  83. }
  84. if err := it.Error(); err != nil {
  85. fmt.Printf("Group iterator error: %v\n", err)
  86. return
  87. }
  88. }

LevelDBSchema.jpg
LevelDBSchema3.jpg