Tools

PubSub

fs-object-layer-pubsub.svg
图1: PubSub 示意图

PubSub 原理非常简单,有一些简单的小技巧,说明一下即可。第一,在 Subscribe 时,通过 defer 执行 Unsubscribe 操作;第二,Sub 有一个事件过滤器;第三,通过原子计数来统计 Subscriber 数量。

  1. func (ps *PubSub) Subscribe(subCh chan interface{}, doneCh <-chan struct{}, filter func(entry interface{}) bool) {
  2. ps.Lock()
  3. defer ps.Unlock()
  4. sub := &Sub{subCh, filter}
  5. ps.subs = append(ps.subs, sub)
  6. atomic.AddInt32(&ps.numSubscribers, 1)
  7. go func() {
  8. <-doneCh
  9. ps.Lock()
  10. defer ps.Unlock()
  11. for i, s := range ps.subs {
  12. if s == sub {
  13. ps.subs = append(ps.subs[:i], ps.subs[i+1:]...)
  14. }
  15. }
  16. atomic.AddInt32(&ps.numSubscribers, -1)
  17. }()
  18. }
  19. func (ps *PubSub) Publish(item interface{}) {
  20. ps.RLock()
  21. defer ps.RUnlock()
  22. for _, sub := range ps.subs {
  23. if sub.filter == nil || sub.filter(item) {
  24. select {
  25. case sub.ch <- item:
  26. default:
  27. }
  28. }
  29. }
  30. }
  31. func (ps *PubSub) NumSubscribers() int32 {
  32. return atomic.LoadInt32(&ps.numSubscribers)
  33. }

FS Object Layer

在 minio 服务启动过程中,成功创建 Endpoint 节点及 HTTP 服务器后,就要创建文件系统以存储对象,如下所示。如果 EndpointServerPools 里仅有一个节点时,会创建 FSObjects 对象,使用文件系统来存储对象。

  1. newObject, err := newObjectLayer(GlobalContext, globalEndpoints)

创建代码如下,使用唯一节点的文件系统路径作为参数

  1. func newObjectLayer(ctx context.Context, endpointServerPools EndpointServerPools) (newObject ObjectLayer, err error) {
  2. // For FS only, directly use the disk.
  3. if endpointServerPools.NEndpoints() == 1 {
  4. // Initialize new FS object layer.
  5. return NewFSObjectLayer(endpointServerPools[0].Endpoints[0].Path)
  6. }
  7. return newErasureServerPools(ctx, endpointServerPools)
  8. }

NewFSObjectLayer

首先检查文件路径是否存在、是否文件夹等。如果文件名为空字符串,直接退出;如果文件路径不存在,则创建目录;如果文件路径存在,但不是目录,返回错误。

  1. if fsPath, err = getValidPath(fsPath); err != nil {
  2. if err == errMinDiskSize {
  3. return nil, config.ErrUnableToWriteInBackend(err).Hint(err.Error())
  4. }
  5. // ...
  6. }

接下来,获取一个 UUID,并初始化系统文件目录,initMetaVolumeFS 创建如下目录:

  • FSRoot/.minio.sys
  • FSRoot/.minio.sys/tmp/UUID
  • FSRoot/.minio.sys/buckets
  • FSRoot/.minio.sys/multipart ```go fsUUID := mustGetUUID()

// Initialize meta volume, if volume already exists ignores it. if err = initMetaVolumeFS(fsPath, fsUUID); err != nil { return nil, err }

  1. <a name="gIsRb"></a>
  2. #### Format.JSON
  3. Minio 系统目录创建完毕后,开始创建 _FSRoot_/**.minio.sys/format.json** 文件,并返回一个读共享实例,用于访问该文件。
  4. ```go
  5. rlk, err := initFormatFS(ctx, fsPath)
  6. if err != nil {
  7. return nil, err
  8. }

initFormatFS

server-fs-init-format.svg
图 2: initFormatFS 执行流程图

图 2 中,蓝色代表首次循环,文件不存在,创建文件;紫色代表二次循环,版本需要更新;红色代表第三次循环,创建 Deployment ID。

initFormatFS 执行过程相对复杂,结下来我们详细解释。首先拼接好 format.json 文件路径。假设当前是第一个 minio 实例的首次启动,此时,因为文件并不存在,程序尝试创建初始文件格式。

  1. if osIsNotExist(err) || isEmpty {
  2. if err == nil {
  3. rlk.Close()
  4. }
  5. // Fresh disk - create format.json
  6. err = createFormatFS(fsFormatPath)
  7. if err == lock.ErrAlreadyLocked {
  8. // Lock already present, sleep and attempt again.
  9. // Can happen in a rare situation when a parallel minio process
  10. // holds the lock and creates format.json
  11. time.Sleep(100 * time.Millisecond)
  12. continue
  13. }
  14. if err != nil {
  15. return nil, err
  16. }
  17. // After successfully creating format.json try to hold a read-lock on
  18. // the file.
  19. continue
  20. }

创建初始配置文件的代码如下,注意 L4 打开方式是读写方式开启,且有创建标识,因此,如果文件不存在,会在此处创建

  1. func createFormatFS(fsFormatPath string) error {
  2. // Attempt a write lock on formatConfigFile `format.json`
  3. // file stored in minioMetaBucket(.minio.sys) directory.
  4. lk, err := lock.TryLockedOpenFile(fsFormatPath, os.O_RDWR|os.O_CREATE, 0600)
  5. if err != nil {
  6. return err
  7. }
  8. // Close the locked file upon return.
  9. defer lk.Close()
  10. fi, err := lk.Stat()
  11. if err != nil {
  12. return err
  13. }
  14. if fi.Size() != 0 {
  15. // format.json already got created because of another minio process's createFormatFS()
  16. return nil
  17. }
  18. return jsonSave(lk.File, newFormatFSV1())
  19. }

随后写入 V1 版本内容

  1. func newFormatFSV1() (format *formatFSV1) {
  2. f := &formatFSV1{}
  3. f.Version = formatMetaVersionV1
  4. f.Format = formatBackendFS
  5. f.ID = mustGetUUID()
  6. f.FS.Version = formatFSVersionV1
  7. return f
  8. }

写入内容完成后,尝试重新以只读方式打开配置文件,并判断内容是否为空,因为写入已经成功,此处内容必然不会为空

  1. rlk, err := lock.RLockedOpenFile(fsFormatPath)
  2. if err == nil {
  3. // format.json can be empty in a rare condition when another
  4. // minio process just created the file but could not hold lock
  5. // and write to it.
  6. var fi os.FileInfo
  7. fi, err = rlk.Stat()
  8. if err != nil {
  9. return nil, err
  10. }
  11. isEmpty = fi.Size() == 0
  12. }

获取配置内容后,判断版本号是否为 V2,如果不是,则进行版本迁移操作,这里需要注意,执行版本迁移操作前,要关闭当前文件,并以读写方式重新打开文件,迁移完成后,关闭读写方式打开的文件,随后进入下次循环。

  1. version, err := formatFSGetVersion(rlk)
  2. if err != nil {
  3. return nil, err
  4. }
  5. if version != formatFSVersionV2 {
  6. // Format needs migration
  7. rlk.Close()
  8. // Hold write lock during migration so that we do not disturb any
  9. // minio processes running in parallel.
  10. var wlk *lock.LockedFile
  11. wlk, err = lock.TryLockedOpenFile(fsFormatPath, os.O_RDWR, 0)
  12. if err == lock.ErrAlreadyLocked {
  13. // Lock already present, sleep and attempt again.
  14. time.Sleep(100 * time.Millisecond)
  15. continue
  16. }
  17. if err != nil {
  18. return nil, err
  19. }
  20. err = formatFSMigrate(ctx, wlk, fsPath)
  21. wlk.Close()
  22. if err != nil {
  23. // Migration failed, bail out so that the user can observe what happened.
  24. return nil, err
  25. }
  26. // Successfully migrated, now try to hold a read-lock on format.json
  27. continue
  28. }

版本迁移代码如下所示,碰到类似场景时,可考虑使用

  1. func formatFSMigrate(ctx context.Context, wlk *lock.LockedFile, fsPath string) error {
  2. // Add any migration code here in case we bump format.FS.Version
  3. version, err := formatFSGetVersion(wlk)
  4. if err != nil {
  5. return err
  6. }
  7. switch version {
  8. case formatFSVersionV1:
  9. if err = formatFSMigrateV1ToV2(ctx, wlk, fsPath); err != nil {
  10. return err
  11. }
  12. fallthrough
  13. case formatFSVersionV2:
  14. // We are at the latest version.
  15. }
  16. // Make sure that the version is what we expect after the migration.
  17. version, err = formatFSGetVersion(wlk)
  18. if err != nil {
  19. return err
  20. }
  21. if version != formatFSVersionV2 {
  22. return config.ErrUnexpectedBackendVersion(fmt.Errorf(`%s file: expected FS version: %s, found FS version: %s`, formatConfigFile, formatFSVersionV2, version))
  23. }
  24. return nil
  25. }

第三次循环,文件已创建,且版本号已经为 V2,获取 deployment ID 并保存即可。

  1. var id string
  2. if id, err = formatFSGetDeploymentID(rlk); err != nil {
  3. rlk.Close()
  4. return nil, err
  5. }
  6. globalDeploymentID = id

获取 deployment ID 的方法只是把写入配置文件的 UUID 读取出来。

  1. func formatFSGetDeploymentID(rlk *lock.RLockedFile) (id string, err error) {
  2. format := &formatFS{}
  3. if err := jsonLoad(rlk, format); err != nil {
  4. return "", err
  5. }
  6. return format.ID, nil
  7. }

执行完毕初始化配置文件后,创建一个 FSObjects 实例,并保存共享读实例(L19),然后启动两个协程,最后返回这个 FSObjects 实例后,创建完成。

  1. // Initialize fs objects.
  2. fs := &FSObjects{
  3. fsPath: fsPath,
  4. metaJSONFile: fsMetaJSONFile,
  5. fsUUID: fsUUID,
  6. rwPool: &fsIOPool{
  7. readersMap: make(map[string]*lock.RLockedFile),
  8. },
  9. nsMutex: newNSLock(false),
  10. listPool: NewTreeWalkPool(globalLookupTimeout),
  11. appendFileMap: make(map[string]*fsAppendFile),
  12. diskMount: mountinfo.IsLikelyMountPoint(fsPath),
  13. }
  14. // Once the filesystem has initialized hold the read lock for
  15. // the life time of the server. This is done to ensure that under
  16. // shared backend mode for FS, remote servers do not migrate
  17. // or cause changes on backend format.
  18. fs.fsFormatRlk = rlk
  19. go fs.cleanupStaleUploads(ctx, GlobalStaleUploadsCleanupInterval, GlobalStaleUploadsExpiry)
  20. go intDataUpdateTracker.start(ctx, fsPath)

FSObjects

cleanupStaleUploads

FSObjects 类型的一个协程,默认每 6 小时执行一次,清理超过 24 小时的 multipart 上传目录。具体过程为,首先遍历 FSRoot/.minio.sys/multipart 目录,获取全部子目录

  1. now := time.Now()
  2. entries, err := readDir(pathJoin(fs.fsPath, minioMetaMultipartBucket))
  3. if err != nil {
  4. continue
  5. }

然后遍历全部子目录

  1. for _, entry := range entries {

读取每个子目录内全部目录,并将目录名最后的 “/“去掉

  1. uploadIDs, err := readDir(pathJoin(fs.fsPath, minioMetaMultipartBucket, entry))
  2. if err != nil {
  3. continue
  4. }
  5. for i := range uploadIDs {
  6. uploadIDs[i] = strings.TrimSuffix(uploadIDs[i], SlashSeparator)
  7. }

最后,获取每个目录的最后修改时间,如果最后修改时间据目前时间已超过 24 小时,则删除目录,并清理内部结构中关联项目。

  1. for _, uploadID := range uploadIDs {
  2. fi, err := fsStatDir(ctx, pathJoin(fs.fsPath, minioMetaMultipartBucket, entry, uploadID))
  3. if err != nil {
  4. continue
  5. }
  6. if now.Sub(fi.ModTime()) > expiry {
  7. fsRemoveAll(ctx, pathJoin(fs.fsPath, minioMetaMultipartBucket, entry, uploadID))
  8. // It is safe to ignore any directory not empty error (in case there were multiple uploadIDs on the same object)
  9. fsRemoveDir(ctx, pathJoin(fs.fsPath, minioMetaMultipartBucket, entry))
  10. // Remove uploadID from the append file map and its corresponding temporary file
  11. fs.appendFileMapMu.Lock()
  12. bgAppend, ok := fs.appendFileMap[uploadID]
  13. if ok {
  14. _ = fsRemoveFile(ctx, bgAppend.filePath)
  15. delete(fs.appendFileMap, uploadID)
  16. }
  17. fs.appendFileMapMu.Unlock()
  18. }
  19. }

Make Bucket

MakeBucketWithLocation 尝试创建新的 Bucket,如果 Bucket 已经存在,返回。接下来我们具体来看一下创建过程,函数定义如下。其中参数 bucket 为新创建的 Bucket 名称,BucketOptions 为选项。

  1. func (fs *FSObjects) MakeBucketWithLocation(ctx context.Context, bucket string, opts BucketOptions)

首先,对于 FSObjects 来说,不支持 LockEnabled 及 VersioningEnabled 选项

  1. if opts.LockEnabled || opts.VersioningEnabled {
  2. return NotImplemented{}
  3. }

接下来判断 bucket 是否满足条件

  1. if s3utils.CheckValidBucketNameStrict(bucket) != nil {
  2. return BucketNameInvalid{Bucket: bucket}
  3. }

注册命名空间变化延迟方法,并增加 activeIOCount 计数

  1. defer NSUpdated(bucket, slashSeparator)
  2. atomic.AddInt64(&fs.activeIOCount, 1)
  3. defer func() {
  4. atomic.AddInt64(&fs.activeIOCount, -1)
  5. }()

获取新 Bucket 路径,并创建目录

  1. bucketDir, err := fs.getBucketDir(ctx, bucket)
  2. if err != nil {
  3. return toObjectErr(err, bucket)
  4. }
  5. if err = fsMkdir(ctx, bucketDir); err != nil {
  6. return toObjectErr(err, bucket)
  7. }

创建 Bucket 的元数据,并保存

  1. meta := newBucketMetadata(bucket)
  2. if err := meta.Save(ctx, fs); err != nil {
  3. return toObjectErr(err, bucket)
  4. }

并在全局的 Bucket 元数据中保存元数据

  1. globalBucketMetadataSys.Set(bucket, meta)
  2. // Set
  3. func (sys *BucketMetadataSys) Set(bucket string, meta BucketMetadata) {
  4. if globalIsGateway {
  5. return
  6. }
  7. if bucket != minioMetaBucket {
  8. sys.Lock()
  9. sys.metadataMap[bucket] = meta // <------Map
  10. sys.Unlock()
  11. }
  12. }

Metadata

创建 Bucket 时,会同时创建与当前 Bucket 相关的元数据。创建方法比较简单,设置的元数据如下所示:

  1. func newBucketMetadata(name string) BucketMetadata {
  2. return BucketMetadata{
  3. Name: name,
  4. Created: UTCNow(),
  5. notificationConfig: &event.Config{
  6. XMLNS: "http://s3.amazonaws.com/doc/2006-03-01/",
  7. },
  8. quotaConfig: &madmin.BucketQuota{},
  9. versioningConfig: &versioning.Versioning{
  10. XMLNS: "http://s3.amazonaws.com/doc/2006-03-01/",
  11. },
  12. bucketTargetConfig: &madmin.BucketTargets{},
  13. bucketTargetConfigMeta: make(map[string]string),
  14. }
  15. }

接下来将 metadata 进行保存,L2 将元数据进行内部处理;L9 - L10 写入固定头部内容;L13 将元数据编码;L18 获取元数据保存文件:buckets/bucket-name/.metadata.bin

  1. func (b *BucketMetadata) Save(ctx context.Context, api ObjectLayer) error {
  2. if err := b.parseAllConfigs(ctx, api); err != nil {
  3. return err
  4. }
  5. data := make([]byte, 4, b.Msgsize()+4)
  6. // Initialize the header.
  7. binary.LittleEndian.PutUint16(data[0:2], bucketMetadataFormat)
  8. binary.LittleEndian.PutUint16(data[2:4], bucketMetadataVersion)
  9. // Marshal the bucket metadata
  10. data, err := b.MarshalMsg(data)
  11. if err != nil {
  12. return err
  13. }
  14. configFile := path.Join(bucketConfigPrefix, b.Name, bucketMetadataFile)
  15. return saveConfig(ctx, api, configFile, data)
  16. }

saveConfig 将编码后的元数据,通过 FSObjects 的 PutObject 方法,存入元数据 Bucket。

  1. func saveConfig(ctx context.Context, objAPI ObjectLayer, configFile string, data []byte) error {
  2. hashReader, err := hash.NewReader(bytes.NewReader(data), int64(len(data)), "", getSHA256Hash(data), int64(len(data)))
  3. if err != nil {
  4. return err
  5. }
  6. _, err = objAPI.PutObject(ctx, minioMetaBucket, configFile, NewPutObjReader(hashReader), ObjectOptions{MaxParity: true})
  7. return err
  8. }

Put Object

fs-objects-ns-lock.svg
图 3:Lock 示意图

PutObject 将一个对象存储到一个桶里。大致过程为,检测输入参数,获取一个命名空间锁,加锁后,存储对象内容。获取命名空间锁的过程如图 3 所示,在此不再展开。加锁完毕后,执行保存对象操作,代码如下

  1. atomic.AddInt64(&fs.activeIOCount, 1)
  2. defer func() {
  3. atomic.AddInt64(&fs.activeIOCount, -1)
  4. }()
  5. return fs.putObject(ctx, bucket, object, r, opts)

putObject

putObject 方法中,当目标桶不是 .minio.sys 时,需要在 .minio.sys/buckets/bucket-name/object-name/fs.json 中写入 object 的元数据信息。

  1. meta := cloneMSS(opts.UserDefined)
  2. fsMeta := newFSMetaV1()
  3. fsMeta.Meta = meta

元数据文件创建代码如下

  1. func (fsi *fsIOPool) Create(path string) (wlk *lock.LockedFile, err error) {
  2. if err = checkPathLength(path); err != nil {
  3. return nil, err
  4. }
  5. // Creates parent if missing.
  6. if err = mkdirAll(pathutil.Dir(path), 0777); err != nil {
  7. return nil, err
  8. }
  9. // Attempt to create the file.
  10. wlk, err = lock.LockedOpenFile(path, os.O_RDWR|os.O_CREATE, 0666)
  11. if err != nil {
  12. switch {
  13. case osIsPermission(err):
  14. return nil, errFileAccessDenied
  15. case isSysErrIsDir(err):
  16. return nil, errIsNotRegular
  17. case isSysErrPathNotFound(err):
  18. return nil, errFileAccessDenied
  19. default:
  20. return nil, err
  21. }
  22. }
  23. // Success.
  24. return wlk, nil
  25. }

写入配置文件内容如下

  1. if bucket != minioMetaBucket {
  2. // Write FS metadata after a successful namespace operation.
  3. if _, err = fsMeta.WriteTo(wlk); err != nil {
  4. return ObjectInfo{}, toObjectErr(err, bucket, object)
  5. }
  6. }

最终,通过 jsonSave 方法写入文件,这个方法注意参数中的 interface 使用方法,确保方法中调用方法存在,Go 使用接口技巧之一,这里传入的 f 是 LockedFile 实例。

  1. func jsonSave(f interface {
  2. io.WriteSeeker
  3. Truncate(int64) error
  4. }, data interface{}) error {
  5. b, err := json.Marshal(data)
  6. if err != nil {
  7. return err
  8. }
  9. if err = f.Truncate(0); err != nil {
  10. return err
  11. }
  12. if _, err = f.Seek(0, io.SeekStart); err != nil {
  13. return err
  14. }
  15. _, err = f.Write(b)
  16. if err != nil {
  17. return err
  18. }
  19. return nil
  20. }