Relationships

通过对 ImageService 的初步分析,我们大概的了解一下 Docker 镜像管理的初步内容。接下来,对于镜像管理要获得更进一步的知识,必须找到一个好的切入点,从镜像生命周期看,如何下载远程镜像到本地似乎是 Docker 必须执行的第一步操作,那么对镜像下载管理机制的深入研究肯定可以让我们更清晰的理解镜像管理机制。
downloader-manager-image-service-in-detail.svg
图 1:LayerDownloadManager 在 ImageService 中关联

通过图 1 的关系,我们大致可以将镜像下载管理分为三层,首先是下载管理器,它直接交互的是第二层功能(layer.Store),这里会发现其实 layer.Store 就是 graph driver,这里选择的是 overlay2 驱动;第三层就是 image.Store 了,通过这个接口,可以将镜像底层进行屏蔽,为镜像使用提供操作便利。同时,由于 containerd 中自带镜像管理功能,还需要通过 lease.Manager 与 containerd 服务进行交互。通过下面的代码,可以看到 image.Store 主要负责的是 ID 和镜像间关系,基本屏蔽了镜像层存储机制。

  1. type Store interface {
  2. Create(config []byte) (ID, error)
  3. Get(id ID) (*Image, error)
  4. Delete(id ID) ([]layer.Metadata, error)
  5. Search(partialID string) (ID, error)
  6. SetParent(id ID, parent ID) error
  7. GetParent(id ID) (ID, error)
  8. SetLastUpdated(id ID) error
  9. GetLastUpdated(id ID) (time.Time, error)
  10. Children(id ID) []ID
  11. Map() map[ID]*Image
  12. Heads() map[ID]*Image
  13. Len() int
  14. }

Graph Driver 还需要通过镜像层链接 ID 获取、释放镜像层,这个接口是 LayerGetReleaser 其定义如下,通过这个接口可以获得镜像层信息。

  1. type LayerGetReleaser interface {
  2. Get(layer.ChainID) (layer.Layer, error)
  3. Release(layer.Layer) ([]layer.Metadata, error)
  4. }

layer.Store 接口负责层管理的具体功能实现,其定义如下,无论读、写层的创建还是清理都由这个接口实例来提供。

  1. type Store interface {
  2. Register(io.Reader, ChainID) (Layer, error)
  3. Get(ChainID) (Layer, error)
  4. Map() map[ChainID]Layer
  5. Release(Layer) ([]Metadata, error)
  6. CreateRWLayer(id string, parent ChainID, opts *CreateRWLayerOpts) (RWLayer, error)
  7. GetRWLayer(id string) (RWLayer, error)
  8. GetMountID(id string) (string, error)
  9. ReleaseRWLayer(RWLayer) ([]Metadata, error)
  10. Cleanup() error
  11. DriverStatus() [][2]string
  12. DriverName() string
  13. }

镜像层的元数据定义为 layer.Metadata,其定义如下,通过这个定义,可以大致看到一个层是如何定义、如何组织的,ChainID 和 DiffID 显然是核心连接点,这两个类型都是 digest.Digest 的类型别名。

  1. type Metadata struct {
  2. // ChainID is the content hash of the layer
  3. ChainID ChainID
  4. // DiffID is the hash of the tar data used to
  5. // create the layer
  6. DiffID DiffID
  7. // Size is the size of the layer and all parents
  8. Size int64
  9. // DiffSize is the size of the top layer
  10. DiffSize int64
  11. }

Pull Procedure

Manifests

downloader-manager-manifest.svg
图 2:Manifest 相关组件

获取远程镜像关键结构是 distribution.v2Puller,创建这个实例时会创建一个 client.repository 实例,这个实例负责到远程节点的 HTTP 通信细节,到远程节点的认证请求过程都在这个实例中完成。在图 2 中可以看到 distribution.manifestStore 中有本地存储和远程存储两部分,本地通过 content.Store 接口实例与主机上的 containerd 进行通信;远程则通过 HTTP 与远端节点进行 Manifest 文件交互,接口定义如下所示,负责查询要镜象是否存在,获取,上传及删除操作。

  1. type ManifestService interface {
  2. // Exists returns true if the manifest exists.
  3. Exists(ctx context.Context, dgst digest.Digest) (bool, error)
  4. // Get retrieves the manifest specified by the given digest
  5. Get(ctx context.Context, dgst digest.Digest, options ...ManifestServiceOption) (Manifest, error)
  6. // Put creates or updates the given manifest returning the manifest digest
  7. Put(ctx context.Context, manifest Manifest, options ...ManifestServiceOption) (digest.Digest, error)
  8. // Delete removes the manifest specified by the given digest. Deleting
  9. // a manifest that doesn't exist will return ErrManifestNotFound
  10. Delete(ctx context.Context, dgst digest.Digest) error
  11. }

Types

从远端下载的 Manifest 文件有四种格式,每一种不同的文件对应的拉取镜像方法都不一样,接下来我们以 schema2.DeserializedManifest 格式展开。

  1. switch v := manifest.(type) {
  2. case *schema1.SignedManifest:
  3. if p.config.RequireSchema2 {
  4. return false, fmt.Errorf("invalid manifest: not schema2")
  5. }
  6. // give registries time to upgrade to schema2 and only warn if we know a registry has been upgraded long time ago
  7. // TODO: condition to be removed
  8. if reference.Domain(ref) == "docker.io" {
  9. msg := fmt.Sprintf("Image %s uses outdated schema1 manifest format. Please upgrade to a schema2 image for better future compatibility. More information at https://docs.docker.com/registry/spec/deprecated-schema-v1/", ref)
  10. logrus.Warn(msg)
  11. progress.Message(p.config.ProgressOutput, "", msg)
  12. }
  13. id, manifestDigest, err = p.pullSchema1(ctx, ref, v, platform)
  14. if err != nil {
  15. return false, err
  16. }
  17. case *schema2.DeserializedManifest:
  18. id, manifestDigest, err = p.pullSchema2(ctx, ref, v, platform)
  19. if err != nil {
  20. return false, err
  21. }
  22. case *ocischema.DeserializedManifest:
  23. id, manifestDigest, err = p.pullOCI(ctx, ref, v, platform)
  24. if err != nil {
  25. return false, err
  26. }
  27. case *manifestlist.DeserializedManifestList:
  28. id, manifestDigest, err = p.pullManifestList(ctx, ref, v, platform)
  29. if err != nil {
  30. return false, err
  31. }
  32. default:
  33. return false, invalidManifestFormatError{}
  34. }

关键定义如下所示,可以看到第一个有效域是版本信息,接下来是配置及分层,使用的都是统一的描述符。

  1. type DeserializedManifest struct {
  2. Manifest
  3. // canonical is the canonical byte representation of the Manifest.
  4. canonical []byte
  5. }
  6. type Manifest struct {
  7. manifest.Versioned
  8. // Config references the image configuration as a blob.
  9. Config distribution.Descriptor `json:"config"`
  10. // Layers lists descriptors for the layers referenced by the
  11. // configuration.
  12. Layers []distribution.Descriptor `json:"layers"`
  13. }

描述符里包含了媒体类型、大小、唯一 ID、及平台信息,如下所示

  1. type Descriptor struct {
  2. // MediaType describe the type of the content. All text based formats are
  3. // encoded as utf-8.
  4. MediaType string `json:"mediaType,omitempty"`
  5. // Size in bytes of content.
  6. Size int64 `json:"size,omitempty"`
  7. // Digest uniquely identifies the content. A byte stream can be verified
  8. // against this digest.
  9. Digest digest.Digest `json:"digest,omitempty"`
  10. // URLs contains the source URLs of this content.
  11. URLs []string `json:"urls,omitempty"`
  12. // Annotations contains arbitrary metadata relating to the targeted content.
  13. Annotations map[string]string `json:"annotations,omitempty"`
  14. // Platform describes the platform which the image in the manifest runs on.
  15. // This should only be used when referring to a manifest.
  16. Platform *v1.Platform `json:"platform,omitempty"`
  17. // NOTE: Before adding a field here, please ensure that all
  18. // other options have been exhausted. Much of the type relationships
  19. // depend on the simplicity of this type.
  20. }

平台信息则包含了 CPU 类型、OS 类型、OS 版本、特征及其他信息,通过这些,我们可以猜测引入 Manifest 文件是为了让镜像适配不同的 CPU 及操作系统的。

type Platform struct {
    // Architecture field specifies the CPU architecture, for example
    // `amd64` or `ppc64`.
    Architecture string `json:"architecture"`

    // OS specifies the operating system, for example `linux` or `windows`.
    OS string `json:"os"`

    // OSVersion is an optional field specifying the operating system
    // version, for example on Windows `10.0.14393.1066`.
    OSVersion string `json:"os.version,omitempty"`

    // OSFeatures is an optional field specifying an array of strings,
    // each listing a required OS feature (for example on Windows `win32k`).
    OSFeatures []string `json:"os.features,omitempty"`

    // Variant is an optional field specifying a variant of the CPU, for
    // example `v7` to specify ARMv7 when architecture is `arm`.
    Variant string `json:"variant,omitempty"`
}

Download in Details

downloader-manager-download-in-detail.svg
图 3:Layer 下载详细过程

下载一个镜像时,根据配置文件中的分层信息,对每一层构建一个 v2LayerDescriptor 信息,这个结构中包含了到远端仓库需要的连接信息,真正的下载动作由它关联的 Repository 执行。然后,生成一个负责下载的方法,再将这个下载方法做为参数传递给 transferManager。在 transferManager 执行 transfer 方法时,创建下载开始、结束的控制 channel,这两个 channel 通过 close 操作发送信号给监听协程。下面的代码对可以并发执行的下载数量进行控制,如果没有设置上限或当前下载数量没有超过预定上限,则立即执行本次下载。

if tm.concurrencyLimit == 0 || tm.activeTransfers < tm.concurrencyLimit {
    close(start)
    tm.activeTransfers++
} else {
    tm.waitingTransfers = append(tm.waitingTransfers, start)
}

传入的下载方法,会返回一个 downloadTransfer 实例,并启动一个下载协程,在这个下载协程中会监听下载开始信号,并在结束时发送下载完成信号(关闭 inactive)。

transferManager 中,还会启动两个协程,分别监听下载进度及下载完成信号,这样就可以将下载信息最终展示给控制台,并在下载结束时,启动下一个下载协程。

进度控制协程由 xfer.broadcast 方法实现,当然,细节还要看 xfer 结构实现的整体代码,广播部分代码如下所示。

func (t *xfer) broadcast(mainProgressChan <-chan progress.Progress) {
    for {
        var (
            p  progress.Progress
            ok bool
        )
        select {
        case p, ok = <-mainProgressChan:
        default:
            // We've depleted the channel, so now we can handle
            // reads on broadcastSyncChan to let detaching watchers
            // know we're caught up.
            select {
            case <-t.broadcastSyncChan:
                continue
            case p, ok = <-mainProgressChan:
            }
        }

        t.mu.Lock()
        if ok {
            t.lastProgress = p
            t.hasLastProgress = true
            for _, w := range t.watchers {
                select {
                case w.signalChan <- struct{}{}:
                default:
                }
            }
        } else {
            t.broadcastDone = true
        }
        t.mu.Unlock()
        if !ok {
            close(t.running)
            return
        }
    }
}

下载取消或完成监控代码则如下所示,tm.inactivates 中会启动下一个未完成的下载协程。

go func() {
    for {
        select {
        case <-inactive:
            tm.mu.Lock()
            tm.inactivate(start)
            tm.mu.Unlock()
            inactive = nil
        case <-xfer.done():
            tm.mu.Lock()
            if inactive != nil {
                tm.inactivate(start)
            }
            delete(tm.transfers, key)
            tm.mu.Unlock()
            xfer.close()
            return
        }
    }
}()

downloader-manager.xml