Tools
PubSub
Figure 1: PubSub diagram
PubSub works on a very simple principle; only a few small techniques are worth pointing out. First, Subscribe schedules the unsubscribe itself: it starts a goroutine that blocks on doneCh and removes the subscriber once that channel is closed. Second, each Sub carries an event filter. Third, the number of subscribers is tracked with an atomic counter.
```go
func (ps *PubSub) Subscribe(subCh chan interface{}, doneCh <-chan struct{}, filter func(entry interface{}) bool) {
	ps.Lock()
	defer ps.Unlock()

	sub := &Sub{subCh, filter}
	ps.subs = append(ps.subs, sub)
	atomic.AddInt32(&ps.numSubscribers, 1)

	go func() {
		<-doneCh

		ps.Lock()
		defer ps.Unlock()

		for i, s := range ps.subs {
			if s == sub {
				ps.subs = append(ps.subs[:i], ps.subs[i+1:]...)
			}
		}
		atomic.AddInt32(&ps.numSubscribers, -1)
	}()
}

func (ps *PubSub) Publish(item interface{}) {
	ps.RLock()
	defer ps.RUnlock()

	for _, sub := range ps.subs {
		if sub.filter == nil || sub.filter(item) {
			select {
			case sub.ch <- item:
			default:
			}
		}
	}
}

func (ps *PubSub) NumSubscribers() int32 {
	return atomic.LoadInt32(&ps.numSubscribers)
}
```
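To see the three techniques working together, here is a minimal, self-contained sketch. The `Sub` and `PubSub` struct definitions are assumptions inferred from how the methods use them, the `break` in the removal loop and the `demo` helper are additions for illustration, and none of this is copied from MinIO.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// Sub pairs a delivery channel with an optional filter (assumed shape).
type Sub struct {
	ch     chan interface{}
	filter func(entry interface{}) bool
}

// PubSub holds the subscriber list (assumed shape).
type PubSub struct {
	sync.RWMutex
	subs           []*Sub
	numSubscribers int32
}

// Subscribe registers subCh and removes it again once doneCh is closed.
func (ps *PubSub) Subscribe(subCh chan interface{}, doneCh <-chan struct{}, filter func(entry interface{}) bool) {
	ps.Lock()
	defer ps.Unlock()

	sub := &Sub{subCh, filter}
	ps.subs = append(ps.subs, sub)
	atomic.AddInt32(&ps.numSubscribers, 1)

	go func() {
		<-doneCh // block until the caller signals it is done

		ps.Lock()
		defer ps.Unlock()
		for i, s := range ps.subs {
			if s == sub {
				ps.subs = append(ps.subs[:i], ps.subs[i+1:]...)
				break // each sub is registered once; an addition in this sketch
			}
		}
		atomic.AddInt32(&ps.numSubscribers, -1)
	}()
}

// Publish offers item to every matching subscriber without blocking.
func (ps *PubSub) Publish(item interface{}) {
	ps.RLock()
	defer ps.RUnlock()
	for _, sub := range ps.subs {
		if sub.filter == nil || sub.filter(item) {
			select {
			case sub.ch <- item:
			default: // subscriber is not ready: drop the item
			}
		}
	}
}

// NumSubscribers reads the counter without taking the lock.
func (ps *PubSub) NumSubscribers() int32 {
	return atomic.LoadInt32(&ps.numSubscribers)
}

// demo subscribes with a strings-only filter, publishes three items and unsubscribes.
func demo() ([]interface{}, int32, int32) {
	ps := &PubSub{}
	subCh := make(chan interface{}, 4) // buffered so Publish does not drop items
	doneCh := make(chan struct{})

	ps.Subscribe(subCh, doneCh, func(entry interface{}) bool {
		_, ok := entry.(string)
		return ok
	})
	before := ps.NumSubscribers()

	ps.Publish("hello")
	ps.Publish(42) // rejected by the filter
	ps.Publish("world")

	close(doneCh) // triggers the unsubscribe goroutine
	for ps.NumSubscribers() != 0 {
		time.Sleep(time.Millisecond)
	}
	after := ps.NumSubscribers()

	close(subCh) // safe here: the subscriber entry has already been removed
	var got []interface{}
	for item := range subCh {
		got = append(got, item)
	}
	return got, before, after
}

func main() {
	got, before, after := demo()
	fmt.Println(got, before, after)
}
```

Because `Publish` uses a `select` with a `default` branch, a slow subscriber never blocks the publisher; the trade-off is that items are silently dropped when the subscriber's channel is full.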
FS Object Layer
During MinIO server startup, once the endpoints and the HTTP server have been created, the object layer that stores objects is created, as shown below. When EndpointServerPools contains exactly one endpoint, an FSObjects instance is created and objects are stored directly on the file system.
```go
newObject, err := newObjectLayer(GlobalContext, globalEndpoints)
```
The creation code is shown below; it passes the file system path of the single endpoint as the argument:
```go
func newObjectLayer(ctx context.Context, endpointServerPools EndpointServerPools) (newObject ObjectLayer, err error) {
	// For FS only, directly use the disk.
	if endpointServerPools.NEndpoints() == 1 {
		// Initialize new FS object layer.
		return NewFSObjectLayer(endpointServerPools[0].Endpoints[0].Path)
	}

	return newErasureServerPools(ctx, endpointServerPools)
}
```
NewFSObjectLayer
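It first validates the path: whether it exists, whether it is a directory, and so on. An empty path makes the function return immediately; if the path does not exist, the directory is created; if the path exists but is not a directory, an error is returned.
```go
if fsPath, err = getValidPath(fsPath); err != nil {
	if err == errMinDiskSize {
		return nil, config.ErrUnableToWriteInBackend(err).Hint(err.Error())
	}
	// ...
}
```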
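The three path checks described above can be sketched with the standard library alone. This is not MinIO's `getValidPath` (which, as the `errMinDiskSize` branch suggests, also performs disk checks); `ensureDir`, `errEmptyPath`, `errNotDir` and `demo` are hypothetical names introduced for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
)

// Hypothetical error values for this sketch.
var (
	errEmptyPath = errors.New("path is empty")
	errNotDir    = errors.New("path exists but is not a directory")
)

// ensureDir rejects an empty path, creates a missing directory,
// and fails when the path exists but is not a directory.
func ensureDir(path string) (string, error) {
	if path == "" {
		return "", errEmptyPath
	}
	abs, err := filepath.Abs(path)
	if err != nil {
		return "", err
	}
	fi, err := os.Stat(abs)
	switch {
	case errors.Is(err, os.ErrNotExist):
		// Missing: create the directory and any missing parents.
		if err := os.MkdirAll(abs, 0o777); err != nil {
			return "", err
		}
	case err != nil:
		return "", err
	case !fi.IsDir():
		return "", errNotDir
	}
	return abs, nil
}

// demo runs the three cases inside a temporary directory.
func demo() (error, error, error) {
	root, err := os.MkdirTemp("", "fsroot")
	if err != nil {
		return err, err, err
	}
	defer os.RemoveAll(root)

	_, emptyErr := ensureDir("")
	_, createErr := ensureDir(filepath.Join(root, "data", "nested"))

	file := filepath.Join(root, "plain.txt")
	if err := os.WriteFile(file, []byte("x"), 0o600); err != nil {
		return err, err, err
	}
	_, fileErr := ensureDir(file)
	return emptyErr, createErr, fileErr
}

func main() {
	emptyErr, createErr, fileErr := demo()
	fmt.Println("empty path:", emptyErr)
	fmt.Println("missing path:", createErr)
	fmt.Println("regular file:", fileErr)
}
```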
Next, a UUID is generated and the system directories are initialized. initMetaVolumeFS creates the following directories:
- FSRoot/.minio.sys
- FSRoot/.minio.sys/tmp/UUID
- FSRoot/.minio.sys/buckets
- FSRoot/.minio.sys/multipart
```go
fsUUID := mustGetUUID()

// Initialize meta volume, if volume already exists ignores it.
if err = initMetaVolumeFS(fsPath, fsUUID); err != nil {
	return nil, err
}
```
Format.JSON
Once the MinIO system directories exist, the file _FSRoot_/**.minio.sys/format.json** is created, and a shared read-locked handle for accessing that file is returned.
```go
rlk, err := initFormatFS(ctx, fsPath)
if err != nil {
	return nil, err
}
```
initFormatFS
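Figure 2: initFormatFS execution flow
In Figure 2, blue marks the first loop iteration, where the file does not exist and is created; purple marks the second iteration, where the version needs to be upgraded; red marks the third iteration, where the deployment ID is read and recorded.
initFormatFS is relatively involved, so we walk through it in detail. It first builds the path of format.json. Assume this is the very first start of the first MinIO instance: the file does not exist yet, so the program tries to create the initial format file.
```go
if osIsNotExist(err) || isEmpty {
	if err == nil {
		rlk.Close()
	}
	// Fresh disk - create format.json
	err = createFormatFS(fsFormatPath)
	if err == lock.ErrAlreadyLocked {
		// Lock already present, sleep and attempt again.
		// Can happen in a rare situation when a parallel minio process
		// holds the lock and creates format.json
		time.Sleep(100 * time.Millisecond)
		continue
	}
	if err != nil {
		return nil, err
	}
	// After successfully creating format.json try to hold a read-lock on
	// the file.
	continue
}
```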
The code that creates the initial format file is shown below. Note that lock.TryLockedOpenFile opens the file read-write with the os.O_CREATE flag, so a missing file is created at this point.
```go
func createFormatFS(fsFormatPath string) error {
	// Attempt a write lock on formatConfigFile `format.json`
	// file stored in minioMetaBucket(.minio.sys) directory.
	lk, err := lock.TryLockedOpenFile(fsFormatPath, os.O_RDWR|os.O_CREATE, 0600)
	if err != nil {
		return err
	}
	// Close the locked file upon return.
	defer lk.Close()

	fi, err := lk.Stat()
	if err != nil {
		return err
	}
	if fi.Size() != 0 {
		// format.json already got created because of another minio process's createFormatFS()
		return nil
	}

	return jsonSave(lk.File, newFormatFSV1())
}
```
The V1 content is then written:
```go
func newFormatFSV1() (format *formatFSV1) {
	f := &formatFSV1{}
	f.Version = formatMetaVersionV1
	f.Format = formatBackendFS
	f.ID = mustGetUUID()
	f.FS.Version = formatFSVersionV1
	return f
}
```
After the write, the loop reopens the format file read-only under a read lock and checks whether it is empty. Since the write has just succeeded in this scenario, the file cannot be empty here.
```go
rlk, err := lock.RLockedOpenFile(fsFormatPath)
if err == nil {
	// format.json can be empty in a rare condition when another
	// minio process just created the file but could not hold lock
	// and write to it.
	var fi os.FileInfo
	fi, err = rlk.Stat()
	if err != nil {
		return nil, err
	}
	isEmpty = fi.Size() == 0
}
```
Once the content can be read, the code checks whether the FS format version is V2 and migrates if it is not. Note that before migrating, the read-locked file is closed and the file is reopened read-write under a write lock; after the migration the write-locked file is closed and the loop moves on to its next iteration.
```go
version, err := formatFSGetVersion(rlk)
if err != nil {
	return nil, err
}
if version != formatFSVersionV2 {
	// Format needs migration
	rlk.Close()
	// Hold write lock during migration so that we do not disturb any
	// minio processes running in parallel.
	var wlk *lock.LockedFile
	wlk, err = lock.TryLockedOpenFile(fsFormatPath, os.O_RDWR, 0)
	if err == lock.ErrAlreadyLocked {
		// Lock already present, sleep and attempt again.
		time.Sleep(100 * time.Millisecond)
		continue
	}
	if err != nil {
		return nil, err
	}
	err = formatFSMigrate(ctx, wlk, fsPath)
	wlk.Close()
	if err != nil {
		// Migration failed, bail out so that the user can observe what happened.
		return nil, err
	}
	// Successfully migrated, now try to hold a read-lock on format.json
	continue
}
```
The migration code is shown below; the same pattern is worth considering in similar situations.
```go
func formatFSMigrate(ctx context.Context, wlk *lock.LockedFile, fsPath string) error {
	// Add any migration code here in case we bump format.FS.Version
	version, err := formatFSGetVersion(wlk)
	if err != nil {
		return err
	}

	switch version {
	case formatFSVersionV1:
		if err = formatFSMigrateV1ToV2(ctx, wlk, fsPath); err != nil {
			return err
		}
		fallthrough
	case formatFSVersionV2:
		// We are at the latest version.
	}

	// Make sure that the version is what we expect after the migration.
	version, err = formatFSGetVersion(wlk)
	if err != nil {
		return err
	}
	if version != formatFSVersionV2 {
		return config.ErrUnexpectedBackendVersion(fmt.Errorf(`%s file: expected FS version: %s, found FS version: %s`, formatConfigFile, formatFSVersionV2, version))
	}
	return nil
}
```
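The value of the `switch` plus `fallthrough` layout becomes clearer once there is more than one upgrade step. The sketch below extends the idea to a hypothetical third version; the `format` type, the version strings and both migration functions are invented for illustration and do not exist in MinIO.

```go
package main

import "fmt"

// latestVersion is a hypothetical "current" version for this sketch.
const latestVersion = "3"

// format stands in for the on-disk document being migrated.
type format struct {
	Version string
	Steps   []string // records which migrations ran, for illustration only
}

func migrateV1ToV2(f *format) error {
	f.Steps = append(f.Steps, "v1->v2")
	f.Version = "2"
	return nil
}

func migrateV2ToV3(f *format) error {
	f.Steps = append(f.Steps, "v2->v3")
	f.Version = "3"
	return nil
}

// migrate enters the switch at the stored version and falls through
// every later step, so each upgrade is written exactly once.
func migrate(f *format) error {
	switch f.Version {
	case "1":
		if err := migrateV1ToV2(f); err != nil {
			return err
		}
		fallthrough
	case "2":
		if err := migrateV2ToV3(f); err != nil {
			return err
		}
		fallthrough
	case latestVersion:
		// Already at the latest version.
	}

	// Verify the result, which also catches versions no case matched.
	if f.Version != latestVersion {
		return fmt.Errorf("expected version %s, found version %s", latestVersion, f.Version)
	}
	return nil
}

func main() {
	for _, v := range []string{"1", "2", "3", "9"} {
		f := &format{Version: v}
		err := migrate(f)
		fmt.Println("start:", v, "end:", f.Version, "steps:", f.Steps, "err:", err)
	}
}
```

Adding a fourth version then means one new migration function and one new `case`; the older steps stay untouched.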
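In the third iteration the file exists and its version is already V2, so all that remains is to read the deployment ID and store it.
```go
var id string
if id, err = formatFSGetDeploymentID(rlk); err != nil {
	rlk.Close()
	return nil, err
}
globalDeploymentID = id
```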
Getting the deployment ID simply reads back the UUID that was written into the format file.
```go
func formatFSGetDeploymentID(rlk *lock.RLockedFile) (id string, err error) {
	format := &formatFS{}
	if err := jsonLoad(rlk, format); err != nil {
		return "", err
	}
	return format.ID, nil
}
```
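Stripped of file locking and error handling, the three iterations reduce to a loop that changes one piece of state and then starts over. The following sketch models only that control flow; `formatFile` is an in-memory stand-in for format.json, and the version strings and ID value are assumptions.

```go
package main

import "fmt"

// formatFile is an in-memory stand-in for format.json (assumption).
type formatFile struct {
	exists  bool
	version string
	id      string
}

// initFormat returns the deployment ID and how many loop passes it took.
func initFormat(f *formatFile) (string, int) {
	passes := 0
	for {
		passes++
		if !f.exists {
			// Pass 1: fresh disk, write the initial V1 content, then start over.
			f.exists, f.version, f.id = true, "1", "hypothetical-uuid"
			continue
		}
		if f.version != "2" {
			// Pass 2: upgrade in place, then start over.
			f.version = "2"
			continue
		}
		// Pass 3: the file exists at the latest version, read the ID.
		return f.id, passes
	}
}

func main() {
	f := &formatFile{}
	id, passes := initFormat(f)
	fmt.Println(id, passes)

	// A second start finds everything in place and needs a single pass.
	id, passes = initFormat(f)
	fmt.Println(id, passes)
}
```

Restarting from the top after every change keeps each branch simple: it never has to assume what state another process may have left behind in the meantime.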
With the format file initialized, an FSObjects instance is created, the shared read-locked handle is stored in fs.fsFormatRlk, two goroutines are started, and the FSObjects instance is returned, which completes creation.
```go
// Initialize fs objects.
fs := &FSObjects{
	fsPath:       fsPath,
	metaJSONFile: fsMetaJSONFile,
	fsUUID:       fsUUID,
	rwPool: &fsIOPool{
		readersMap: make(map[string]*lock.RLockedFile),
	},
	nsMutex:       newNSLock(false),
	listPool:      NewTreeWalkPool(globalLookupTimeout),
	appendFileMap: make(map[string]*fsAppendFile),
	diskMount:     mountinfo.IsLikelyMountPoint(fsPath),
}

// Once the filesystem has initialized hold the read lock for
// the life time of the server. This is done to ensure that under
// shared backend mode for FS, remote servers do not migrate
// or cause changes on backend format.
fs.fsFormatRlk = rlk

go fs.cleanupStaleUploads(ctx, GlobalStaleUploadsCleanupInterval, GlobalStaleUploadsExpiry)
go intDataUpdateTracker.start(ctx, fsPath)
```
FSObjects
cleanupStaleUploads
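A goroutine of FSObjects that by default runs every 6 hours and removes multipart upload directories older than 24 hours. It first reads the FSRoot/.minio.sys/multipart directory to get all of its subdirectories:
```go
now := time.Now()
entries, err := readDir(pathJoin(fs.fsPath, minioMetaMultipartBucket))
if err != nil {
	continue
}
```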
It then iterates over every subdirectory:
```go
for _, entry := range entries {
```
It reads all directories inside each subdirectory and strips the trailing "/" from each name:
```go
uploadIDs, err := readDir(pathJoin(fs.fsPath, minioMetaMultipartBucket, entry))
if err != nil {
	continue
}
for i := range uploadIDs {
	uploadIDs[i] = strings.TrimSuffix(uploadIDs[i], SlashSeparator)
}
```
Finally, it reads each directory's last-modified time. If that time lies more than 24 hours in the past, the directory is removed and the related entry in the in-memory structures is cleaned up.
```go
for _, uploadID := range uploadIDs {
	fi, err := fsStatDir(ctx, pathJoin(fs.fsPath, minioMetaMultipartBucket, entry, uploadID))
	if err != nil {
		continue
	}
	if now.Sub(fi.ModTime()) > expiry {
		fsRemoveAll(ctx, pathJoin(fs.fsPath, minioMetaMultipartBucket, entry, uploadID))
		// It is safe to ignore any directory not empty error (in case there were multiple uploadIDs on the same object)
		fsRemoveDir(ctx, pathJoin(fs.fsPath, minioMetaMultipartBucket, entry))

		// Remove uploadID from the append file map and its corresponding temporary file
		fs.appendFileMapMu.Lock()
		bgAppend, ok := fs.appendFileMap[uploadID]
		if ok {
			_ = fsRemoveFile(ctx, bgAppend.filePath)
			delete(fs.appendFileMap, uploadID)
		}
		fs.appendFileMapMu.Unlock()
	}
}
```
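The expiry test at the heart of this loop can be tried in isolation with the standard library. The sketch below is simplified to a single directory level (the real layout nests upload IDs one level deeper) and uses `os` calls directly instead of MinIO's `readDir`, `fsStatDir` and `fsRemoveAll` helpers; `removeStale` and `demo` are hypothetical names.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"time"
)

// removeStale deletes every subdirectory of root whose modification time
// is older than expiry relative to now, and returns the removed names.
func removeStale(root string, expiry time.Duration, now time.Time) ([]string, error) {
	entries, err := os.ReadDir(root)
	if err != nil {
		return nil, err
	}
	var removed []string
	for _, e := range entries {
		if !e.IsDir() {
			continue
		}
		info, err := e.Info()
		if err != nil {
			continue // entry vanished in the meantime: skip it, as the original does
		}
		if now.Sub(info.ModTime()) > expiry {
			if err := os.RemoveAll(filepath.Join(root, e.Name())); err != nil {
				return removed, err
			}
			removed = append(removed, e.Name())
		}
	}
	return removed, nil
}

// demo creates one backdated and one fresh directory, then runs the cleanup.
func demo() ([]string, bool, error) {
	root, err := os.MkdirTemp("", "multipart")
	if err != nil {
		return nil, false, err
	}
	defer os.RemoveAll(root)

	for _, name := range []string{"old", "fresh"} {
		if err := os.Mkdir(filepath.Join(root, name), 0o755); err != nil {
			return nil, false, err
		}
	}
	// Backdate "old" by 48 hours so it exceeds the 24 hour expiry.
	past := time.Now().Add(-48 * time.Hour)
	if err := os.Chtimes(filepath.Join(root, "old"), past, past); err != nil {
		return nil, false, err
	}

	removed, err := removeStale(root, 24*time.Hour, time.Now())
	if err != nil {
		return nil, false, err
	}
	_, statErr := os.Stat(filepath.Join(root, "fresh"))
	return removed, statErr == nil, nil
}

func main() {
	removed, freshKept, err := demo()
	fmt.Println(removed, freshKept, err)
}
```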
Make Bucket
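MakeBucketWithLocation tries to create a new bucket and returns if the bucket already exists. Let us look at the creation process in detail. The function is declared as follows; bucket is the name of the new bucket and opts (of type BucketOptions) carries the options.
```go
func (fs *FSObjects) MakeBucketWithLocation(ctx context.Context, bucket string, opts BucketOptions)
```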
First, FSObjects does not support the LockEnabled and VersioningEnabled options:
```go
if opts.LockEnabled || opts.VersioningEnabled {
	return NotImplemented{}
}
```
Next, the bucket name is validated:
```go
if s3utils.CheckValidBucketNameStrict(bucket) != nil {
	return BucketNameInvalid{Bucket: bucket}
}
```
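A deferred call that reports the namespace change is registered, and the activeIOCount counter is incremented (a second deferred function decrements it again on return):
```go
defer NSUpdated(bucket, slashSeparator)

atomic.AddInt64(&fs.activeIOCount, 1)
defer func() {
	atomic.AddInt64(&fs.activeIOCount, -1)
}()
```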
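The increment-then-deferred-decrement idiom for activeIOCount is easy to reuse. A minimal sketch, assuming a hypothetical `store` type with a `withIO` wrapper (neither exists in MinIO), shows that the counter is at least 1 while an operation runs and returns to 0 afterwards:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// store is a hypothetical type that tracks in-flight operations.
type store struct {
	activeIOCount int64
}

// withIO runs op while it is counted as active I/O. The deferred
// decrement runs on every return path, including a panic in op.
func (s *store) withIO(op func()) {
	atomic.AddInt64(&s.activeIOCount, 1)
	defer func() {
		atomic.AddInt64(&s.activeIOCount, -1)
	}()
	op()
}

// activeIO reports the number of operations currently in flight.
func (s *store) activeIO() int64 {
	return atomic.LoadInt64(&s.activeIOCount)
}

// demo runs eight concurrent operations and records the counter seen by each.
func demo() (bool, int64) {
	s := &store{}
	seen := make([]int64, 8)

	var wg sync.WaitGroup
	for i := range seen {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s.withIO(func() { seen[i] = s.activeIO() })
		}(i)
	}
	wg.Wait()

	allCounted := true
	for _, n := range seen {
		if n < 1 {
			allCounted = false
		}
	}
	return allCounted, s.activeIO()
}

func main() {
	allCounted, final := demo()
	fmt.Println(allCounted, final)
}
```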
The path of the new bucket is resolved and the directory is created:
```go
bucketDir, err := fs.getBucketDir(ctx, bucket)
if err != nil {
	return toObjectErr(err, bucket)
}

if err = fsMkdir(ctx, bucketDir); err != nil {
	return toObjectErr(err, bucket)
}
```
The bucket metadata is created and saved:
```go
meta := newBucketMetadata(bucket)
if err := meta.Save(ctx, fs); err != nil {
	return toObjectErr(err, bucket)
}
```
The metadata is also stored in the global bucket metadata system:
```go
globalBucketMetadataSys.Set(bucket, meta)

// Set
func (sys *BucketMetadataSys) Set(bucket string, meta BucketMetadata) {
	if globalIsGateway {
		return
	}

	if bucket != minioMetaBucket {
		sys.Lock()
		sys.metadataMap[bucket] = meta // <------Map
		sys.Unlock()
	}
}
```
Metadata
When a bucket is created, the metadata associated with it is created as well. The constructor is straightforward; it sets the following fields:
```go
func newBucketMetadata(name string) BucketMetadata {
	return BucketMetadata{
		Name:    name,
		Created: UTCNow(),
		notificationConfig: &event.Config{
			XMLNS: "http://s3.amazonaws.com/doc/2006-03-01/",
		},
		quotaConfig: &madmin.BucketQuota{},
		versioningConfig: &versioning.Versioning{
			XMLNS: "http://s3.amazonaws.com/doc/2006-03-01/",
		},
		bucketTargetConfig:     &madmin.BucketTargets{},
		bucketTargetConfigMeta: make(map[string]string),
	}
}
```
Next the metadata is saved. parseAllConfigs processes the metadata internally; the two binary.LittleEndian.PutUint16 calls write the fixed header; MarshalMsg encodes the metadata; path.Join builds the name of the file the metadata is saved to: buckets/bucket-name/.metadata.bin
```go
func (b *BucketMetadata) Save(ctx context.Context, api ObjectLayer) error {
	if err := b.parseAllConfigs(ctx, api); err != nil {
		return err
	}

	data := make([]byte, 4, b.Msgsize()+4)

	// Initialize the header.
	binary.LittleEndian.PutUint16(data[0:2], bucketMetadataFormat)
	binary.LittleEndian.PutUint16(data[2:4], bucketMetadataVersion)

	// Marshal the bucket metadata
	data, err := b.MarshalMsg(data)
	if err != nil {
		return err
	}

	configFile := path.Join(bucketConfigPrefix, b.Name, bucketMetadataFile)
	return saveConfig(ctx, api, configFile, data)
}
```
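The 4-byte header written by Save is what lets a reader check format and version before decoding the payload. The sketch below shows the round trip with a raw byte payload in place of the MessagePack encoding; the constant values and the `encode`, `decode` names are assumptions for illustration, and the matching read path in MinIO is not shown in this article.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// Assumed header values for this sketch.
const (
	metaFormat  uint16 = 1
	metaVersion uint16 = 1
)

// encode prepends a 4-byte little-endian header (format, version) to payload.
func encode(payload []byte) []byte {
	// Length 4 reserves the header; the capacity avoids a reallocation on append.
	data := make([]byte, 4, len(payload)+4)
	binary.LittleEndian.PutUint16(data[0:2], metaFormat)
	binary.LittleEndian.PutUint16(data[2:4], metaVersion)
	return append(data, payload...)
}

// decode validates the header and returns the payload that follows it.
func decode(data []byte) ([]byte, error) {
	if len(data) < 4 {
		return nil, errors.New("data too short for header")
	}
	if f := binary.LittleEndian.Uint16(data[0:2]); f != metaFormat {
		return nil, fmt.Errorf("unknown format %d", f)
	}
	if v := binary.LittleEndian.Uint16(data[2:4]); v != metaVersion {
		return nil, fmt.Errorf("unknown version %d", v)
	}
	return data[4:], nil
}

func main() {
	blob := encode([]byte("bucket-metadata"))
	fmt.Println(blob[:4])

	payload, err := decode(blob)
	fmt.Println(string(payload), err)
}
```

Allocating the slice with length 4 and a larger capacity mirrors the original: the header is written in place, and the encoder appends the body behind it.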
saveConfig stores the encoded metadata in the metadata bucket through the PutObject method of FSObjects.
```go
func saveConfig(ctx context.Context, objAPI ObjectLayer, configFile string, data []byte) error {
	hashReader, err := hash.NewReader(bytes.NewReader(data), int64(len(data)), "", getSHA256Hash(data), int64(len(data)))
	if err != nil {
		return err
	}

	_, err = objAPI.PutObject(ctx, minioMetaBucket, configFile, NewPutObjReader(hashReader), ObjectOptions{MaxParity: true})
	return err
}
```
Put Object
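Figure 3: Lock diagram
PutObject stores an object in a bucket. Roughly, it validates the input parameters, acquires a namespace lock and, once locked, stores the object content. Acquiring the namespace lock is shown in Figure 3 and is not covered further here. After the lock is held, the object is saved with the following code:
```go
atomic.AddInt64(&fs.activeIOCount, 1)
defer func() {
	atomic.AddInt64(&fs.activeIOCount, -1)
}()

return fs.putObject(ctx, bucket, object, r, opts)
```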
putObject
In putObject, when the target bucket is not .minio.sys, the object's metadata must be written to .minio.sys/buckets/bucket-name/object-name/fs.json.
```go
meta := cloneMSS(opts.UserDefined)
fsMeta := newFSMetaV1()
fsMeta.Meta = meta
```
The metadata file is created with the following code:
```go
func (fsi *fsIOPool) Create(path string) (wlk *lock.LockedFile, err error) {
	if err = checkPathLength(path); err != nil {
		return nil, err
	}

	// Creates parent if missing.
	if err = mkdirAll(pathutil.Dir(path), 0777); err != nil {
		return nil, err
	}

	// Attempt to create the file.
	wlk, err = lock.LockedOpenFile(path, os.O_RDWR|os.O_CREATE, 0666)
	if err != nil {
		switch {
		case osIsPermission(err):
			return nil, errFileAccessDenied
		case isSysErrIsDir(err):
			return nil, errIsNotRegular
		case isSysErrPathNotFound(err):
			return nil, errFileAccessDenied
		default:
			return nil, err
		}
	}

	// Success.
	return wlk, nil
}
```
The metadata content is written as follows:
```go
if bucket != minioMetaBucket {
	// Write FS metadata after a successful namespace operation.
	if _, err = fsMeta.WriteTo(wlk); err != nil {
		return ObjectInfo{}, toObjectErr(err, bucket, object)
	}
}
```
Finally, the file is written by jsonSave. Note how its first parameter is declared as an anonymous interface: this guarantees that every method jsonSave calls on f exists, which is one of Go's interface idioms. The f passed in here is a LockedFile instance.
```go
func jsonSave(f interface {
	io.WriteSeeker
	Truncate(int64) error
}, data interface{}) error {
	b, err := json.Marshal(data)
	if err != nil {
		return err
	}
	if err = f.Truncate(0); err != nil {
		return err
	}
	if _, err = f.Seek(0, io.SeekStart); err != nil {
		return err
	}
	_, err = f.Write(b)
	if err != nil {
		return err
	}
	return nil
}
```
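The anonymous-interface parameter accepts any value with the right method set, so the same idiom works with a plain `*os.File`. The sketch below uses a hypothetical `saveJSON` with the same parameter shape and a temporary file; writing a long document and then a short one shows why the `Truncate` and `Seek` calls are both needed.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
)

// saveJSON declares exactly the methods it needs: Write and Seek
// (via io.WriteSeeker) plus Truncate. *os.File satisfies this implicitly.
func saveJSON(f interface {
	io.WriteSeeker
	Truncate(int64) error
}, data interface{}) error {
	b, err := json.Marshal(data)
	if err != nil {
		return err
	}
	// Drop old content so a shorter document leaves no trailing bytes.
	if err = f.Truncate(0); err != nil {
		return err
	}
	// Truncate does not move the file offset, so rewind explicitly.
	if _, err = f.Seek(0, io.SeekStart); err != nil {
		return err
	}
	_, err = f.Write(b)
	return err
}

// demo saves a long document, then a short one, and returns the file content.
func demo() (string, error) {
	f, err := os.CreateTemp("", "format-*.json")
	if err != nil {
		return "", err
	}
	defer os.Remove(f.Name())
	defer f.Close()

	if err := saveJSON(f, map[string]string{"name": "a-much-longer-value"}); err != nil {
		return "", err
	}
	if err := saveJSON(f, map[string]string{"name": "b"}); err != nil {
		return "", err
	}
	content, err := os.ReadFile(f.Name())
	return string(content), err
}

func main() {
	content, err := demo()
	fmt.Println(content, err)
}
```

A type that lacks one of the methods, for example `*bytes.Buffer`, which has no `Seek`, is rejected at compile time, which is the guarantee this idiom provides.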
