Tracker
图 1: dataUpdateTracker 核心结构
dataUpdateTracker 用于跟踪 Minio 系统的文件、目录变更情况。通过 dataUpdateFilter 中的 idx 来区分版本,通过 current 方法可获取当前版本
func (d *dataUpdateTracker) current() uint64 {
d.mu.Lock()
defer d.mu.Unlock()
return d.Current.idx
}
历史版本查找如下,遍历全部 History 元素,找到与 idx 相同的记录并返回。
func (d dataUpdateTrackerHistory) find(idx uint64) *dataUpdateFilter {
for _, f := range d {
if f.idx == idx {
return &f
}
}
return nil
}
Serialize
Minio 元数据中,设置了单独的文件记录 .minio.sys/buckets/.tracker.bin 文件保存 tracker 内容。当服务启动时,会根据文件保存驱动器位置来加载操作记录。最新的操作记录会定时触发保存操作或强制保存,接下来我们通过代码来看一下保存时的文件格式。
func (d *dataUpdateTracker) serialize(dst io.Writer) (err error) {
ctx := GlobalContext
var tmp [8]byte
o := bufio.NewWriter(dst)
defer func() {
if err == nil {
err = o.Flush()
}
}()
// Version
if err := o.WriteByte(dataUpdateTrackerVersion); err != nil {
if d.debug {
logger.LogIf(ctx, err)
}
return err
}
// Timestamp.
binary.LittleEndian.PutUint64(tmp[:], uint64(d.Saved.Unix()))
if _, err := o.Write(tmp[:]); err != nil {
if d.debug {
logger.LogIf(ctx, err)
}
return err
}
// Current
binary.LittleEndian.PutUint64(tmp[:], d.Current.idx)
if _, err := o.Write(tmp[:]); err != nil {
if d.debug {
logger.LogIf(ctx, err)
}
return err
}
if _, err := d.Current.bf.WriteTo(o); err != nil {
if d.debug {
logger.LogIf(ctx, err)
}
return err
}
// History
binary.LittleEndian.PutUint64(tmp[:], uint64(len(d.History)))
if _, err := o.Write(tmp[:]); err != nil {
if d.debug {
logger.LogIf(ctx, err)
}
return err
}
for _, bf := range d.History {
// Current
binary.LittleEndian.PutUint64(tmp[:], bf.idx)
if _, err := o.Write(tmp[:]); err != nil {
if d.debug {
logger.LogIf(ctx, err)
}
return err
}
if _, err := bf.bf.WriteTo(o); err != nil {
if d.debug {
logger.LogIf(ctx, err)
}
return err
}
}
return nil
}
通过分析可知格式如下所示
图 2:.tracker.bin 文件格式
加载过程参照文件格式进行操作即可,只需要注意对于版本号要求必须为 6,并且再次强调,每个文件驱动都有自己的 tracker 文件。同时需要注意,由于 tracker 是定时存储的,因此极端情况下,可能会发生掉电丢失 tracker 内容的情况发生。
Saver Go Routine
Minio 服务启动时,会同步开启 tracker 的保存日志协程。该协程在定时器触发或接收到立即保存指令时,对 tracker 内容执行落盘操作。L12 - L17 执行等待操作,接收到操作、退出信号后,执行后续操作;L18 - L26 处理没有内容变化发生的情况;L27 - L32 对操作记录进行序列号操作;L45 - L55 将序列化后的内容,写入每个驱动的 tracker 文件。
func (d *dataUpdateTracker) startSaver(ctx context.Context, interval time.Duration, drives []string) {
saveNow := d.save
exited := make(chan struct{})
d.saveExited = exited
d.mu.Unlock()
t := time.NewTicker(interval)
defer t.Stop()
defer close(exited)
var buf bytes.Buffer
for {
var exit bool
select {
case <-ctx.Done():
exit = true
case <-t.C:
case <-saveNow:
}
buf.Reset()
d.mu.Lock()
if !d.dirty {
d.mu.Unlock()
if exit {
return
}
continue
}
d.Saved = UTCNow()
err := d.serialize(&buf)
if d.debug {
console.Debugf(color.Green("dataUpdateTracker:")+" Saving: %v bytes, Current idx: %v\n", buf.Len(), d.Current.idx)
}
d.dirty = false
d.mu.Unlock()
if err != nil {
logger.LogIf(ctx, err, "Error serializing usage tracker data")
if exit {
return
}
continue
}
if buf.Len() == 0 {
logger.LogIf(ctx, errors.New("zero sized output, skipping save"))
continue
}
for _, drive := range drives {
cacheFormatPath := pathJoin(drive, dataUpdateTrackerFilename)
err := ioutil.WriteFile(cacheFormatPath, buf.Bytes(), os.ModePerm)
if err != nil {
if osIsNotExist(err) {
continue
}
logger.LogIf(ctx, err)
continue
}
}
if exit {
return
}
}
}
当文件系统内容发生变化时,调用 markDirty 可以标记 dirty 域,L15 - L17 将全部子目录都添加至 Bloom Filter 中。
func (d *dataUpdateTracker) markDirty(bucket, prefix string) {
dateUpdateTrackerLogPrefix := color.Green("dataUpdateTracker:")
if bucket == "" && d.debug {
console.Debugf(dateUpdateTrackerLogPrefix + " no bucket specified\n")
return
}
if isReservedOrInvalidBucket(bucket, false) && d.debug {
return
}
split := splitPathDeterministic(pathJoin(bucket, prefix))
// Add all paths until done.
d.mu.Lock()
for i := range split {
d.Current.bf.AddString(hashPath(path.Join(split[:i+1]...)).String())
}
d.dirty = d.dirty || len(split) > 0
d.mu.Unlock()
}
BloomFilter
Minio 中的 Bloom Filter 直接使用开源项目 github.com/bits-and-blooms/bloom,仅做了简单封装,因此在此处简要介绍下,定义如下
type BloomFilter struct {
m uint
k uint
b *bitset.BitSet
}
核心结构体为 BitSet,定义如下,注意 set 类型为无符号 64 位整数切片,这意味着每个域可存储 64 位,计算时需要考虑。
type BitSet struct {
length uint
set []uint64
}
设置过程非常简单,定位到 set 对应索引,然后在定位在位中的位置。
func (b *BitSet) Set(i uint) *BitSet {
b.extendSetMaybe(i)
b.set[i>>log2WordSize] |= 1 << (i & (wordSize - 1))
return b
}
常量定义如下,wordSize 代表一个 set[n] 的比特数量,用于定位某位在一个无符号 64 位整数内部的偏移量,log2WordSize 用于计算索引值。
const wordSize = uint(64)
// log2WordSize is lg(wordSize)
const log2WordSize = uint(6)
清除操作如下,注意除了要清理的位,其他位内容都为 1 即可。
func (b *BitSet) Clear(i uint) *BitSet {
if i >= b.length {
return b
}
b.set[i>>log2WordSize] &^= 1 << (i & (wordSize - 1))
return b
}