Tracker

general-data-update-tracker.svg
图 1: dataUpdateTracker 核心结构

dataUpdateTracker 用于跟踪 Minio 系统的文件、目录变更情况。通过 dataUpdateFilter 中的 idx 来区分版本,通过 current 方法可获取当前版本

  1. func (d *dataUpdateTracker) current() uint64 {
  2. d.mu.Lock()
  3. defer d.mu.Unlock()
  4. return d.Current.idx
  5. }

历史版本查找如下,遍历全部 History 元素,找到与 idx 相同的记录并返回。

  1. func (d dataUpdateTrackerHistory) find(idx uint64) *dataUpdateFilter {
  2. for _, f := range d {
  3. if f.idx == idx {
  4. return &f
  5. }
  6. }
  7. return nil
  8. }

Serialize

Minio 元数据中,设置了单独的文件记录 .minio.sys/buckets/.tracker.bin 文件保存 tracker 内容。当服务启动时,会根据文件保存驱动器位置来加载操作记录。最新的操作记录会定时触发保存操作或强制保存,接下来我们通过代码来看一下保存时的文件格式。

  1. func (d *dataUpdateTracker) serialize(dst io.Writer) (err error) {
  2. ctx := GlobalContext
  3. var tmp [8]byte
  4. o := bufio.NewWriter(dst)
  5. defer func() {
  6. if err == nil {
  7. err = o.Flush()
  8. }
  9. }()
  10. // Version
  11. if err := o.WriteByte(dataUpdateTrackerVersion); err != nil {
  12. if d.debug {
  13. logger.LogIf(ctx, err)
  14. }
  15. return err
  16. }
  17. // Timestamp.
  18. binary.LittleEndian.PutUint64(tmp[:], uint64(d.Saved.Unix()))
  19. if _, err := o.Write(tmp[:]); err != nil {
  20. if d.debug {
  21. logger.LogIf(ctx, err)
  22. }
  23. return err
  24. }
  25. // Current
  26. binary.LittleEndian.PutUint64(tmp[:], d.Current.idx)
  27. if _, err := o.Write(tmp[:]); err != nil {
  28. if d.debug {
  29. logger.LogIf(ctx, err)
  30. }
  31. return err
  32. }
  33. if _, err := d.Current.bf.WriteTo(o); err != nil {
  34. if d.debug {
  35. logger.LogIf(ctx, err)
  36. }
  37. return err
  38. }
  39. // History
  40. binary.LittleEndian.PutUint64(tmp[:], uint64(len(d.History)))
  41. if _, err := o.Write(tmp[:]); err != nil {
  42. if d.debug {
  43. logger.LogIf(ctx, err)
  44. }
  45. return err
  46. }
  47. for _, bf := range d.History {
  48. // Current
  49. binary.LittleEndian.PutUint64(tmp[:], bf.idx)
  50. if _, err := o.Write(tmp[:]); err != nil {
  51. if d.debug {
  52. logger.LogIf(ctx, err)
  53. }
  54. return err
  55. }
  56. if _, err := bf.bf.WriteTo(o); err != nil {
  57. if d.debug {
  58. logger.LogIf(ctx, err)
  59. }
  60. return err
  61. }
  62. }
  63. return nil
  64. }

通过分析可知格式如下所示
general-data-updater-serializer-format.svg
图 2:.tracker.bin 文件格式

加载过程参照文件格式进行操作即可,只需要注意对于版本号要求必须为 6,并且再次强调,每个文件驱动都有自己的 tracker 文件。同时需要注意,由于 tracker 是定时存储的,因此极端情况下,可能会发生掉电丢失 tracker 内容的情况发生。

Saver Go Routine

Minio 服务启动时,会同步开启 tracker 的保存日志协程。该协程在定时器触发或接收到立即保存指令时,对 tracker 内容执行落盘操作。L12 - L17 执行等待操作,接收到操作、退出信号后,执行后续操作;L18 - L26 处理没有内容变化发生的情况;L27 - L32 对操作记录进行序列号操作;L45 - L55 将序列化后的内容,写入每个驱动的 tracker 文件。

  1. func (d *dataUpdateTracker) startSaver(ctx context.Context, interval time.Duration, drives []string) {
  2. saveNow := d.save
  3. exited := make(chan struct{})
  4. d.saveExited = exited
  5. d.mu.Unlock()
  6. t := time.NewTicker(interval)
  7. defer t.Stop()
  8. defer close(exited)
  9. var buf bytes.Buffer
  10. for {
  11. var exit bool
  12. select {
  13. case <-ctx.Done():
  14. exit = true
  15. case <-t.C:
  16. case <-saveNow:
  17. }
  18. buf.Reset()
  19. d.mu.Lock()
  20. if !d.dirty {
  21. d.mu.Unlock()
  22. if exit {
  23. return
  24. }
  25. continue
  26. }
  27. d.Saved = UTCNow()
  28. err := d.serialize(&buf)
  29. if d.debug {
  30. console.Debugf(color.Green("dataUpdateTracker:")+" Saving: %v bytes, Current idx: %v\n", buf.Len(), d.Current.idx)
  31. }
  32. d.dirty = false
  33. d.mu.Unlock()
  34. if err != nil {
  35. logger.LogIf(ctx, err, "Error serializing usage tracker data")
  36. if exit {
  37. return
  38. }
  39. continue
  40. }
  41. if buf.Len() == 0 {
  42. logger.LogIf(ctx, errors.New("zero sized output, skipping save"))
  43. continue
  44. }
  45. for _, drive := range drives {
  46. cacheFormatPath := pathJoin(drive, dataUpdateTrackerFilename)
  47. err := ioutil.WriteFile(cacheFormatPath, buf.Bytes(), os.ModePerm)
  48. if err != nil {
  49. if osIsNotExist(err) {
  50. continue
  51. }
  52. logger.LogIf(ctx, err)
  53. continue
  54. }
  55. }
  56. if exit {
  57. return
  58. }
  59. }
  60. }

当文件系统内容发生变化时,调用 markDirty 可以标记 dirty 域,L15 - L17 将全部子目录都添加至 Bloom Filter 中。

  1. func (d *dataUpdateTracker) markDirty(bucket, prefix string) {
  2. dateUpdateTrackerLogPrefix := color.Green("dataUpdateTracker:")
  3. if bucket == "" && d.debug {
  4. console.Debugf(dateUpdateTrackerLogPrefix + " no bucket specified\n")
  5. return
  6. }
  7. if isReservedOrInvalidBucket(bucket, false) && d.debug {
  8. return
  9. }
  10. split := splitPathDeterministic(pathJoin(bucket, prefix))
  11. // Add all paths until done.
  12. d.mu.Lock()
  13. for i := range split {
  14. d.Current.bf.AddString(hashPath(path.Join(split[:i+1]...)).String())
  15. }
  16. d.dirty = d.dirty || len(split) > 0
  17. d.mu.Unlock()
  18. }

BloomFilter

Minio 中的 Bloom Filter 直接使用开源项目 github.com/bits-and-blooms/bloom,仅做了简单封装,因此在此处简要介绍下,定义如下

  1. type BloomFilter struct {
  2. m uint
  3. k uint
  4. b *bitset.BitSet
  5. }

核心结构体为 BitSet,定义如下,注意 set 类型为无符号 64 位整数切片,这意味着每个域可存储 64 位,计算时需要考虑。

  1. type BitSet struct {
  2. length uint
  3. set []uint64
  4. }

设置过程非常简单,定位到 set 对应索引,然后在定位在位中的位置。

  1. func (b *BitSet) Set(i uint) *BitSet {
  2. b.extendSetMaybe(i)
  3. b.set[i>>log2WordSize] |= 1 << (i & (wordSize - 1))
  4. return b
  5. }

常量定义如下,wordSize 代表一个 set[n] 的比特数量,用于定位某位在一个无符号 64 位整数内部的偏移量,log2WordSize 用于计算索引值。

  1. const wordSize = uint(64)
  2. // log2WordSize is lg(wordSize)
  3. const log2WordSize = uint(6)

清除操作如下,注意除了要清理的位,其他位内容都为 1 即可。

  1. func (b *BitSet) Clear(i uint) *BitSet {
  2. if i >= b.length {
  3. return b
  4. }
  5. b.set[i>>log2WordSize] &^= 1 << (i & (wordSize - 1))
  6. return b
  7. }