Tools
PubSub
图1: PubSub 示意图
PubSub 原理非常简单,有一些简单的小技巧,说明一下即可。第一,在 Subscribe 时,通过 defer 执行 Unsubscribe 操作;第二,Sub 有一个事件过滤器;第三,通过原子计数来统计 Subscriber 数量。
func (ps *PubSub) Subscribe(subCh chan interface{}, doneCh <-chan struct{}, filter func(entry interface{}) bool) {
ps.Lock()
defer ps.Unlock()
sub := &Sub{subCh, filter}
ps.subs = append(ps.subs, sub)
atomic.AddInt32(&ps.numSubscribers, 1)
go func() {
<-doneCh
ps.Lock()
defer ps.Unlock()
for i, s := range ps.subs {
if s == sub {
ps.subs = append(ps.subs[:i], ps.subs[i+1:]...)
}
}
atomic.AddInt32(&ps.numSubscribers, -1)
}()
}
func (ps *PubSub) Publish(item interface{}) {
ps.RLock()
defer ps.RUnlock()
for _, sub := range ps.subs {
if sub.filter == nil || sub.filter(item) {
select {
case sub.ch <- item:
default:
}
}
}
}
func (ps *PubSub) NumSubscribers() int32 {
return atomic.LoadInt32(&ps.numSubscribers)
}
FS Object Layer
在 minio 服务启动过程中,成功创建 Endpoint 节点及 HTTP 服务器后,就要创建文件系统以存储对象,如下所示。如果 EndpointServerPools 里仅有一个节点时,会创建 FSObjects 对象,使用文件系统来存储对象。
newObject, err := newObjectLayer(GlobalContext, globalEndpoints)
创建代码如下,使用唯一节点的文件系统路径作为参数
func newObjectLayer(ctx context.Context, endpointServerPools EndpointServerPools) (newObject ObjectLayer, err error) {
// For FS only, directly use the disk.
if endpointServerPools.NEndpoints() == 1 {
// Initialize new FS object layer.
return NewFSObjectLayer(endpointServerPools[0].Endpoints[0].Path)
}
return newErasureServerPools(ctx, endpointServerPools)
}
NewFSObjectLayer
首先检查文件路径是否存在、是否文件夹等。如果文件名为空字符串,直接退出;如果文件路径不存在,则创建目录;如果文件路径存在,但不是目录,返回错误。
if fsPath, err = getValidPath(fsPath); err != nil {
if err == errMinDiskSize {
return nil, config.ErrUnableToWriteInBackend(err).Hint(err.Error())
}
// ...
}
接下来,获取一个 UUID,并初始化系统文件目录,initMetaVolumeFS 创建如下目录:
- FSRoot/.minio.sys
- FSRoot/.minio.sys/tmp/UUID
- FSRoot/.minio.sys/buckets
- FSRoot/.minio.sys/multipart ```go fsUUID := mustGetUUID()
// Initialize meta volume, if volume already exists ignores it. if err = initMetaVolumeFS(fsPath, fsUUID); err != nil { return nil, err }
<a name="gIsRb"></a>
#### Format.JSON
Minio 系统目录创建完毕后,开始创建 _FSRoot_/**.minio.sys/format.json** 文件,并返回一个读共享实例,用于访问该文件。
```go
rlk, err := initFormatFS(ctx, fsPath)
if err != nil {
return nil, err
}
initFormatFS
图 2: initFormatFS 执行流程图
图 2 中,蓝色代表首次循环,文件不存在,创建文件;紫色代表二次循环,版本需要更新;红色代表第三次循环,创建 Deployment ID。
initFormatFS 执行过程相对复杂,结下来我们详细解释。首先拼接好 format.json 文件路径。假设当前是第一个 minio 实例的首次启动,此时,因为文件并不存在,程序尝试创建初始文件格式。
if osIsNotExist(err) || isEmpty {
if err == nil {
rlk.Close()
}
// Fresh disk - create format.json
err = createFormatFS(fsFormatPath)
if err == lock.ErrAlreadyLocked {
// Lock already present, sleep and attempt again.
// Can happen in a rare situation when a parallel minio process
// holds the lock and creates format.json
time.Sleep(100 * time.Millisecond)
continue
}
if err != nil {
return nil, err
}
// After successfully creating format.json try to hold a read-lock on
// the file.
continue
}
创建初始配置文件的代码如下,注意 L4 打开方式是读写方式开启,且有创建标识,因此,如果文件不存在,会在此处创建
func createFormatFS(fsFormatPath string) error {
// Attempt a write lock on formatConfigFile `format.json`
// file stored in minioMetaBucket(.minio.sys) directory.
lk, err := lock.TryLockedOpenFile(fsFormatPath, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return err
}
// Close the locked file upon return.
defer lk.Close()
fi, err := lk.Stat()
if err != nil {
return err
}
if fi.Size() != 0 {
// format.json already got created because of another minio process's createFormatFS()
return nil
}
return jsonSave(lk.File, newFormatFSV1())
}
随后写入 V1 版本内容
func newFormatFSV1() (format *formatFSV1) {
f := &formatFSV1{}
f.Version = formatMetaVersionV1
f.Format = formatBackendFS
f.ID = mustGetUUID()
f.FS.Version = formatFSVersionV1
return f
}
写入内容完成后,尝试重新以只读方式打开配置文件,并判断内容是否为空,因为写入已经成功,此处内容必然不会为空
rlk, err := lock.RLockedOpenFile(fsFormatPath)
if err == nil {
// format.json can be empty in a rare condition when another
// minio process just created the file but could not hold lock
// and write to it.
var fi os.FileInfo
fi, err = rlk.Stat()
if err != nil {
return nil, err
}
isEmpty = fi.Size() == 0
}
获取配置内容后,判断版本号是否为 V2,如果不是,则进行版本迁移操作,这里需要注意,执行版本迁移操作前,要关闭当前文件,并以读写方式重新打开文件,迁移完成后,关闭读写方式打开的文件,随后进入下次循环。
version, err := formatFSGetVersion(rlk)
if err != nil {
return nil, err
}
if version != formatFSVersionV2 {
// Format needs migration
rlk.Close()
// Hold write lock during migration so that we do not disturb any
// minio processes running in parallel.
var wlk *lock.LockedFile
wlk, err = lock.TryLockedOpenFile(fsFormatPath, os.O_RDWR, 0)
if err == lock.ErrAlreadyLocked {
// Lock already present, sleep and attempt again.
time.Sleep(100 * time.Millisecond)
continue
}
if err != nil {
return nil, err
}
err = formatFSMigrate(ctx, wlk, fsPath)
wlk.Close()
if err != nil {
// Migration failed, bail out so that the user can observe what happened.
return nil, err
}
// Successfully migrated, now try to hold a read-lock on format.json
continue
}
版本迁移代码如下所示,碰到类似场景时,可考虑使用
func formatFSMigrate(ctx context.Context, wlk *lock.LockedFile, fsPath string) error {
// Add any migration code here in case we bump format.FS.Version
version, err := formatFSGetVersion(wlk)
if err != nil {
return err
}
switch version {
case formatFSVersionV1:
if err = formatFSMigrateV1ToV2(ctx, wlk, fsPath); err != nil {
return err
}
fallthrough
case formatFSVersionV2:
// We are at the latest version.
}
// Make sure that the version is what we expect after the migration.
version, err = formatFSGetVersion(wlk)
if err != nil {
return err
}
if version != formatFSVersionV2 {
return config.ErrUnexpectedBackendVersion(fmt.Errorf(`%s file: expected FS version: %s, found FS version: %s`, formatConfigFile, formatFSVersionV2, version))
}
return nil
}
第三次循环,文件已创建,且版本号已经为 V2,获取 deployment ID 并保存即可。
var id string
if id, err = formatFSGetDeploymentID(rlk); err != nil {
rlk.Close()
return nil, err
}
globalDeploymentID = id
获取 deployment ID 的方法只是把写入配置文件的 UUID 读取出来。
func formatFSGetDeploymentID(rlk *lock.RLockedFile) (id string, err error) {
format := &formatFS{}
if err := jsonLoad(rlk, format); err != nil {
return "", err
}
return format.ID, nil
}
执行完毕初始化配置文件后,创建一个 FSObjects 实例,并保存共享读实例(L19),然后启动两个协程,最后返回这个 FSObjects 实例后,创建完成。
// Initialize fs objects.
fs := &FSObjects{
fsPath: fsPath,
metaJSONFile: fsMetaJSONFile,
fsUUID: fsUUID,
rwPool: &fsIOPool{
readersMap: make(map[string]*lock.RLockedFile),
},
nsMutex: newNSLock(false),
listPool: NewTreeWalkPool(globalLookupTimeout),
appendFileMap: make(map[string]*fsAppendFile),
diskMount: mountinfo.IsLikelyMountPoint(fsPath),
}
// Once the filesystem has initialized hold the read lock for
// the life time of the server. This is done to ensure that under
// shared backend mode for FS, remote servers do not migrate
// or cause changes on backend format.
fs.fsFormatRlk = rlk
go fs.cleanupStaleUploads(ctx, GlobalStaleUploadsCleanupInterval, GlobalStaleUploadsExpiry)
go intDataUpdateTracker.start(ctx, fsPath)
FSObjects
cleanupStaleUploads
FSObjects 类型的一个协程,默认每 6 小时执行一次,清理超过 24 小时的 multipart 上传目录。具体过程为,首先遍历 FSRoot/.minio.sys/multipart 目录,获取全部子目录
now := time.Now()
entries, err := readDir(pathJoin(fs.fsPath, minioMetaMultipartBucket))
if err != nil {
continue
}
然后遍历全部子目录
for _, entry := range entries {
读取每个子目录内全部目录,并将目录名最后的 “/“去掉
uploadIDs, err := readDir(pathJoin(fs.fsPath, minioMetaMultipartBucket, entry))
if err != nil {
continue
}
for i := range uploadIDs {
uploadIDs[i] = strings.TrimSuffix(uploadIDs[i], SlashSeparator)
}
最后,获取每个目录的最后修改时间,如果最后修改时间据目前时间已超过 24 小时,则删除目录,并清理内部结构中关联项目。
for _, uploadID := range uploadIDs {
fi, err := fsStatDir(ctx, pathJoin(fs.fsPath, minioMetaMultipartBucket, entry, uploadID))
if err != nil {
continue
}
if now.Sub(fi.ModTime()) > expiry {
fsRemoveAll(ctx, pathJoin(fs.fsPath, minioMetaMultipartBucket, entry, uploadID))
// It is safe to ignore any directory not empty error (in case there were multiple uploadIDs on the same object)
fsRemoveDir(ctx, pathJoin(fs.fsPath, minioMetaMultipartBucket, entry))
// Remove uploadID from the append file map and its corresponding temporary file
fs.appendFileMapMu.Lock()
bgAppend, ok := fs.appendFileMap[uploadID]
if ok {
_ = fsRemoveFile(ctx, bgAppend.filePath)
delete(fs.appendFileMap, uploadID)
}
fs.appendFileMapMu.Unlock()
}
}
Make Bucket
MakeBucketWithLocation 尝试创建新的 Bucket,如果 Bucket 已经存在,返回。接下来我们具体来看一下创建过程,函数定义如下。其中参数 bucket 为新创建的 Bucket 名称,BucketOptions 为选项。
func (fs *FSObjects) MakeBucketWithLocation(ctx context.Context, bucket string, opts BucketOptions)
首先,对于 FSObjects 来说,不支持 LockEnabled 及 VersioningEnabled 选项
if opts.LockEnabled || opts.VersioningEnabled {
return NotImplemented{}
}
接下来判断 bucket 是否满足条件
if s3utils.CheckValidBucketNameStrict(bucket) != nil {
return BucketNameInvalid{Bucket: bucket}
}
注册命名空间变化延迟方法,并增加 activeIOCount 计数
defer NSUpdated(bucket, slashSeparator)
atomic.AddInt64(&fs.activeIOCount, 1)
defer func() {
atomic.AddInt64(&fs.activeIOCount, -1)
}()
获取新 Bucket 路径,并创建目录
bucketDir, err := fs.getBucketDir(ctx, bucket)
if err != nil {
return toObjectErr(err, bucket)
}
if err = fsMkdir(ctx, bucketDir); err != nil {
return toObjectErr(err, bucket)
}
创建 Bucket 的元数据,并保存
meta := newBucketMetadata(bucket)
if err := meta.Save(ctx, fs); err != nil {
return toObjectErr(err, bucket)
}
并在全局的 Bucket 元数据中保存元数据
globalBucketMetadataSys.Set(bucket, meta)
// Set
func (sys *BucketMetadataSys) Set(bucket string, meta BucketMetadata) {
if globalIsGateway {
return
}
if bucket != minioMetaBucket {
sys.Lock()
sys.metadataMap[bucket] = meta // <------Map
sys.Unlock()
}
}
Metadata
创建 Bucket 时,会同时创建与当前 Bucket 相关的元数据。创建方法比较简单,设置的元数据如下所示:
func newBucketMetadata(name string) BucketMetadata {
return BucketMetadata{
Name: name,
Created: UTCNow(),
notificationConfig: &event.Config{
XMLNS: "http://s3.amazonaws.com/doc/2006-03-01/",
},
quotaConfig: &madmin.BucketQuota{},
versioningConfig: &versioning.Versioning{
XMLNS: "http://s3.amazonaws.com/doc/2006-03-01/",
},
bucketTargetConfig: &madmin.BucketTargets{},
bucketTargetConfigMeta: make(map[string]string),
}
}
接下来将 metadata 进行保存,L2 将元数据进行内部处理;L9 - L10 写入固定头部内容;L13 将元数据编码;L18 获取元数据保存文件:buckets/bucket-name/.metadata.bin
func (b *BucketMetadata) Save(ctx context.Context, api ObjectLayer) error {
if err := b.parseAllConfigs(ctx, api); err != nil {
return err
}
data := make([]byte, 4, b.Msgsize()+4)
// Initialize the header.
binary.LittleEndian.PutUint16(data[0:2], bucketMetadataFormat)
binary.LittleEndian.PutUint16(data[2:4], bucketMetadataVersion)
// Marshal the bucket metadata
data, err := b.MarshalMsg(data)
if err != nil {
return err
}
configFile := path.Join(bucketConfigPrefix, b.Name, bucketMetadataFile)
return saveConfig(ctx, api, configFile, data)
}
saveConfig 将编码后的元数据,通过 FSObjects 的 PutObject 方法,存入元数据 Bucket。
func saveConfig(ctx context.Context, objAPI ObjectLayer, configFile string, data []byte) error {
hashReader, err := hash.NewReader(bytes.NewReader(data), int64(len(data)), "", getSHA256Hash(data), int64(len(data)))
if err != nil {
return err
}
_, err = objAPI.PutObject(ctx, minioMetaBucket, configFile, NewPutObjReader(hashReader), ObjectOptions{MaxParity: true})
return err
}
Put Object
图 3:Lock 示意图
PutObject 将一个对象存储到一个桶里。大致过程为,检测输入参数,获取一个命名空间锁,加锁后,存储对象内容。获取命名空间锁的过程如图 3 所示,在此不再展开。加锁完毕后,执行保存对象操作,代码如下
atomic.AddInt64(&fs.activeIOCount, 1)
defer func() {
atomic.AddInt64(&fs.activeIOCount, -1)
}()
return fs.putObject(ctx, bucket, object, r, opts)
putObject
putObject 方法中,当目标桶不是 .minio.sys 时,需要在 .minio.sys/buckets/bucket-name/object-name/fs.json 中写入 object 的元数据信息。
meta := cloneMSS(opts.UserDefined)
fsMeta := newFSMetaV1()
fsMeta.Meta = meta
元数据文件创建代码如下
func (fsi *fsIOPool) Create(path string) (wlk *lock.LockedFile, err error) {
if err = checkPathLength(path); err != nil {
return nil, err
}
// Creates parent if missing.
if err = mkdirAll(pathutil.Dir(path), 0777); err != nil {
return nil, err
}
// Attempt to create the file.
wlk, err = lock.LockedOpenFile(path, os.O_RDWR|os.O_CREATE, 0666)
if err != nil {
switch {
case osIsPermission(err):
return nil, errFileAccessDenied
case isSysErrIsDir(err):
return nil, errIsNotRegular
case isSysErrPathNotFound(err):
return nil, errFileAccessDenied
default:
return nil, err
}
}
// Success.
return wlk, nil
}
写入配置文件内容如下
if bucket != minioMetaBucket {
// Write FS metadata after a successful namespace operation.
if _, err = fsMeta.WriteTo(wlk); err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
}
最终,通过 jsonSave 方法写入文件,这个方法注意参数中的 interface 使用方法,确保方法中调用方法存在,Go 使用接口技巧之一,这里传入的 f 是 LockedFile 实例。
func jsonSave(f interface {
io.WriteSeeker
Truncate(int64) error
}, data interface{}) error {
b, err := json.Marshal(data)
if err != nil {
return err
}
if err = f.Truncate(0); err != nil {
return err
}
if _, err = f.Seek(0, io.SeekStart); err != nil {
return err
}
_, err = f.Write(b)
if err != nil {
return err
}
return nil
}