Tracker
图 1: dataUpdateTracker 核心结构
dataUpdateTracker 用于跟踪 Minio 系统的文件、目录变更情况。通过 dataUpdateFilter 中的 idx 来区分版本,通过 current 方法可获取当前版本
func (d *dataUpdateTracker) current() uint64 {d.mu.Lock()defer d.mu.Unlock()return d.Current.idx}
历史版本查找如下,遍历全部 History 元素,找到与 idx 相同的记录并返回。
func (d dataUpdateTrackerHistory) find(idx uint64) *dataUpdateFilter {for _, f := range d {if f.idx == idx {return &f}}return nil}
Serialize
Minio 元数据中,设置了单独的文件记录 .minio.sys/buckets/.tracker.bin 文件保存 tracker 内容。当服务启动时,会根据文件保存驱动器位置来加载操作记录。最新的操作记录会定时触发保存操作或强制保存,接下来我们通过代码来看一下保存时的文件格式。
func (d *dataUpdateTracker) serialize(dst io.Writer) (err error) {ctx := GlobalContextvar tmp [8]byteo := bufio.NewWriter(dst)defer func() {if err == nil {err = o.Flush()}}()// Versionif err := o.WriteByte(dataUpdateTrackerVersion); err != nil {if d.debug {logger.LogIf(ctx, err)}return err}// Timestamp.binary.LittleEndian.PutUint64(tmp[:], uint64(d.Saved.Unix()))if _, err := o.Write(tmp[:]); err != nil {if d.debug {logger.LogIf(ctx, err)}return err}// Currentbinary.LittleEndian.PutUint64(tmp[:], d.Current.idx)if _, err := o.Write(tmp[:]); err != nil {if d.debug {logger.LogIf(ctx, err)}return err}if _, err := d.Current.bf.WriteTo(o); err != nil {if d.debug {logger.LogIf(ctx, err)}return err}// Historybinary.LittleEndian.PutUint64(tmp[:], uint64(len(d.History)))if _, err := o.Write(tmp[:]); err != nil {if d.debug {logger.LogIf(ctx, err)}return err}for _, bf := range d.History {// Currentbinary.LittleEndian.PutUint64(tmp[:], bf.idx)if _, err := o.Write(tmp[:]); err != nil {if d.debug {logger.LogIf(ctx, err)}return err}if _, err := bf.bf.WriteTo(o); err != nil {if d.debug {logger.LogIf(ctx, err)}return err}}return nil}
通过分析可知格式如下所示
图 2:.tracker.bin 文件格式
加载过程参照文件格式进行操作即可,只需要注意对于版本号要求必须为 6,并且再次强调,每个文件驱动都有自己的 tracker 文件。同时需要注意,由于 tracker 是定时存储的,因此极端情况下,可能会发生掉电丢失 tracker 内容的情况发生。
Saver Go Routine
Minio 服务启动时,会同步开启 tracker 的保存日志协程。该协程在定时器触发或接收到立即保存指令时,对 tracker 内容执行落盘操作。L12 - L17 执行等待操作,接收到操作、退出信号后,执行后续操作;L18 - L26 处理没有内容变化发生的情况;L27 - L32 对操作记录进行序列号操作;L45 - L55 将序列化后的内容,写入每个驱动的 tracker 文件。
func (d *dataUpdateTracker) startSaver(ctx context.Context, interval time.Duration, drives []string) {saveNow := d.saveexited := make(chan struct{})d.saveExited = exitedd.mu.Unlock()t := time.NewTicker(interval)defer t.Stop()defer close(exited)var buf bytes.Bufferfor {var exit boolselect {case <-ctx.Done():exit = truecase <-t.C:case <-saveNow:}buf.Reset()d.mu.Lock()if !d.dirty {d.mu.Unlock()if exit {return}continue}d.Saved = UTCNow()err := d.serialize(&buf)if d.debug {console.Debugf(color.Green("dataUpdateTracker:")+" Saving: %v bytes, Current idx: %v\n", buf.Len(), d.Current.idx)}d.dirty = falsed.mu.Unlock()if err != nil {logger.LogIf(ctx, err, "Error serializing usage tracker data")if exit {return}continue}if buf.Len() == 0 {logger.LogIf(ctx, errors.New("zero sized output, skipping save"))continue}for _, drive := range drives {cacheFormatPath := pathJoin(drive, dataUpdateTrackerFilename)err := ioutil.WriteFile(cacheFormatPath, buf.Bytes(), os.ModePerm)if err != nil {if osIsNotExist(err) {continue}logger.LogIf(ctx, err)continue}}if exit {return}}}
当文件系统内容发生变化时,调用 markDirty 可以标记 dirty 域,L15 - L17 将全部子目录都添加至 Bloom Filter 中。
func (d *dataUpdateTracker) markDirty(bucket, prefix string) {dateUpdateTrackerLogPrefix := color.Green("dataUpdateTracker:")if bucket == "" && d.debug {console.Debugf(dateUpdateTrackerLogPrefix + " no bucket specified\n")return}if isReservedOrInvalidBucket(bucket, false) && d.debug {return}split := splitPathDeterministic(pathJoin(bucket, prefix))// Add all paths until done.d.mu.Lock()for i := range split {d.Current.bf.AddString(hashPath(path.Join(split[:i+1]...)).String())}d.dirty = d.dirty || len(split) > 0d.mu.Unlock()}
BloomFilter
Minio 中的 Bloom Filter 直接使用开源项目 github.com/bits-and-blooms/bloom,仅做了简单封装,因此在此处简要介绍下,定义如下
type BloomFilter struct {m uintk uintb *bitset.BitSet}
核心结构体为 BitSet,定义如下,注意 set 类型为无符号 64 位整数切片,这意味着每个域可存储 64 位,计算时需要考虑。
type BitSet struct {length uintset []uint64}
设置过程非常简单,定位到 set 对应索引,然后在定位在位中的位置。
func (b *BitSet) Set(i uint) *BitSet {b.extendSetMaybe(i)b.set[i>>log2WordSize] |= 1 << (i & (wordSize - 1))return b}
常量定义如下,wordSize 代表一个 set[n] 的比特数量,用于定位某位在一个无符号 64 位整数内部的偏移量,log2WordSize 用于计算索引值。
const wordSize = uint(64)// log2WordSize is lg(wordSize)const log2WordSize = uint(6)
清除操作如下,注意除了要清理的位,其他位内容都为 1 即可。
func (b *BitSet) Clear(i uint) *BitSet {if i >= b.length {return b}b.set[i>>log2WordSize] &^= 1 << (i & (wordSize - 1))return b}
