Overview

image-service-overview.svg
Figure 1: Main components of ImageService and their relationships

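ImageService manages images. Figure 1 shows its core components and how they relate. By function, they fall into image download management, image storage management, image building, and so on.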

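StoreBackend manages how images are created, walked, looked up, and deleted on the storage device. Its interface is defined below. Note that digest.Digest is the central type for identifying images; it is not expanded here and is described in detail later.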

type StoreBackend interface {
    Walk(f DigestWalkFunc) error
    Get(id digest.Digest) ([]byte, error)
    Set(data []byte) (digest.Digest, error)
    Delete(id digest.Digest) error
    SetMetadata(id digest.Digest, key string, data []byte) error
    GetMetadata(id digest.Digest, key string) ([]byte, error)
    DeleteMetadata(id digest.Digest, key string) error
}

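image.Store creates and manages images. Its definition follows. Note the SetParent and GetParent methods, which show that images can be related to one another through parent links.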

type Store interface {
    Create(config []byte) (ID, error)
    Get(id ID) (*Image, error)
    Delete(id ID) ([]layer.Metadata, error)
    Search(partialID string) (ID, error)
    SetParent(id ID, parent ID) error
    GetParent(id ID) (ID, error)
    SetLastUpdated(id ID) error
    GetLastUpdated(id ID) (time.Time, error)
    Children(id ID) []ID
    Map() map[ID]*Image
    Heads() map[ID]*Image
    Len() int
}
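The parent links exposed by SetParent, GetParent, and Children can be pictured with a small in-memory index. This is only a sketch under stated assumptions: imageIndex and its methods are hypothetical names, IDs are plain strings, and "heads" is taken to mean images that have no children; it is not the store's actual implementation.

```go
package main

import (
	"fmt"
	"sort"
)

// imageIndex is a hypothetical in-memory model of parent/child links.
type imageIndex struct {
	parent   map[string]string
	children map[string]map[string]struct{}
}

func newImageIndex() *imageIndex {
	return &imageIndex{
		parent:   map[string]string{},
		children: map[string]map[string]struct{}{},
	}
}

// setParent records parent as the parent of id, replacing any previous link.
func (x *imageIndex) setParent(id, parent string) {
	if old, ok := x.parent[id]; ok {
		delete(x.children[old], id)
	}
	x.parent[id] = parent
	if x.children[parent] == nil {
		x.children[parent] = map[string]struct{}{}
	}
	x.children[parent][id] = struct{}{}
}

// childrenOf returns the direct children of id in sorted order.
func (x *imageIndex) childrenOf(id string) []string {
	out := make([]string, 0, len(x.children[id]))
	for c := range x.children[id] {
		out = append(out, c)
	}
	sort.Strings(out)
	return out
}

// heads returns the images from all that have no children (assumed meaning).
func (x *imageIndex) heads(all []string) []string {
	var out []string
	for _, id := range all {
		if len(x.children[id]) == 0 {
			out = append(out, id)
		}
	}
	sort.Strings(out)
	return out
}

func main() {
	idx := newImageIndex()
	idx.setParent("b", "a")
	idx.setParent("c", "a")
	fmt.Println(idx.childrenOf("a"))
	fmt.Println(idx.heads([]string{"a", "b", "c"}))
}
```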

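layer.Store manages the layers that make up an image and the relationships between them; Figure 2 shows how images and layers relate. In short, each layer records its parent and stores only the differences from that parent, so the layers form a chain. Because of this design, multiple containers created from the same image each get their own writable layer, which saves space and keeps one container's changes from affecting the others.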

type Store interface {
    Register(io.Reader, ChainID) (Layer, error)
    Get(ChainID) (Layer, error)
    Map() map[ChainID]Layer
    Release(Layer) ([]Metadata, error)
    CreateRWLayer(id string, parent ChainID, opts *CreateRWLayerOpts) (RWLayer, error)
    GetRWLayer(id string) (RWLayer, error)
    GetMountID(id string) (string, error)
    ReleaseRWLayer(RWLayer) ([]Metadata, error)
    Cleanup() error
    DriverStatus() [][2]string
    DriverName() string
}

image.png
Figure 2: Image and Layer

The Layer interface is defined as follows:

type Layer interface {
    TarStreamer

    // TarStreamFrom returns a tar archive stream for all the layer chain with
    // arbitrary depth.
    TarStreamFrom(ChainID) (io.ReadCloser, error)

    // ChainID returns the content hash of the entire layer chain. The hash
    // chain is made up of DiffID of top layer and all of its parents.
    ChainID() ChainID

    // DiffID returns the content hash of the layer
    // tar stream used to create this layer.
    DiffID() DiffID

    // Parent returns the next layer in the layer chain.
    Parent() Layer

    // Size returns the size of the entire layer chain. The size
    // is calculated from the total size of all files in the layers.
    Size() int64

    // DiffSize returns the size difference of the top layer
    // from parent layer.
    DiffSize() int64

    // Metadata returns the low level storage metadata associated
    // with layer.
    Metadata() (map[string]string, error)
}
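The comment on ChainID says the hash covers the top layer's DiffID and all of its parents. A minimal sketch of how such a chain hash can be folded together is shown below. It assumes the rule from the OCI image specification (the bottom layer's ChainID equals its DiffID, and each further ChainID is the SHA-256 of the parent ChainID, a space, and the layer's DiffID); the function name chainID and the placeholder DiffID strings are illustrative only.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// chainID folds DiffIDs (bottom layer first) into a single chain identifier.
// Assumed rule (OCI image spec):
//   ChainID(L0)     = DiffID(L0)
//   ChainID(L0..Ln) = "sha256:" + hex(SHA256(ChainID(L0..Ln-1) + " " + DiffID(Ln)))
func chainID(diffIDs []string) string {
	if len(diffIDs) == 0 {
		return ""
	}
	id := diffIDs[0]
	for _, d := range diffIDs[1:] {
		sum := sha256.Sum256([]byte(id + " " + d))
		id = fmt.Sprintf("sha256:%x", sum)
	}
	return id
}

func main() {
	// Placeholder DiffIDs, not real layer hashes.
	layers := []string{"sha256:aaaa", "sha256:bbbb", "sha256:cccc"}
	for i := range layers {
		fmt.Println(chainID(layers[:i+1]))
	}
}
```

Because each step hashes the parent's ChainID, two layers with identical content but different parents end up with different ChainIDs, which is what lets a store key layers by their position in a chain.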

Utility

Digest

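Digest is a type defined on string (a distinct named type rather than a true alias) that holds a value of the form algorithm:hex. It uniquely identifies an object and allows the object's content to be verified.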

type Digest string

func (d Digest) Algorithm() Algorithm {
    return Algorithm(d[:d.sepIndex()])
}

// Encoded returns the encoded portion of the digest. This will panic if the
// underlying digest is not in a valid format.
func (d Digest) Encoded() string {
    return string(d[d.sepIndex()+1:])
}

// Hex is deprecated. Please use Digest.Encoded.
func (d Digest) Hex() string {
    return d.Encoded()
}

func (d Digest) String() string {
    return string(d)
}

func (d Digest) sepIndex() int {
    i := strings.Index(string(d), ":")
    if i < 0 {
        panic(fmt.Sprintf("no ':' separator in digest %q", d))
    }
    return i
}
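To make the algorithm:hex form concrete, here is a small standard-library sketch that builds such a string for a byte slice, splits it at the separator, and uses it to verify content. The helper names fromBytes, split, and verify are hypothetical and only SHA-256 is handled; the real digest package offers more than this.

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"strings"
)

// fromBytes returns the digest of data in "sha256:<hex>" form.
func fromBytes(data []byte) string {
	return fmt.Sprintf("sha256:%x", sha256.Sum256(data))
}

// split separates a digest string into its algorithm and encoded parts.
// ok is false when the ':' separator is missing.
func split(d string) (algorithm, encoded string, ok bool) {
	return strings.Cut(d, ":")
}

// verify reports whether data hashes to the given digest.
func verify(data []byte, d string) bool {
	return fromBytes(data) == d
}

func main() {
	d := fromBytes([]byte("hello"))
	alg, enc, _ := split(d)
	fmt.Println(alg, len(enc))
	fmt.Println(verify([]byte("hello"), d), verify([]byte("hullo"), d))
}
```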

Algorithm

Algorithm is likewise a type defined on string. It wraps the commonly used hash algorithms; it is included here mainly to illustrate a Go programming technique.

type Algorithm string

const (
    SHA256 Algorithm = "sha256" // sha256 with hex encoding (lower case only)
    SHA384 Algorithm = "sha384" // sha384 with hex encoding (lower case only)
    SHA512 Algorithm = "sha512" // sha512 with hex encoding (lower case only)

    // Canonical is the primary digest algorithm used with the distribution
    // project. Other digests may be used but this one is the primary storage
    // digest.
    Canonical = SHA256
)

var (
    // TODO(stevvooe): Follow the pattern of the standard crypto package for
    // registration of digests. Effectively, we are a registerable set and
    // common symbol access.

    // algorithms maps values to hash.Hash implementations. Other algorithms
    // may be available but they cannot be calculated by the digest package.
    algorithms = map[Algorithm]crypto.Hash{
        SHA256: crypto.SHA256,
        SHA384: crypto.SHA384,
        SHA512: crypto.SHA512,
    }

    // anchoredEncodedRegexps contains anchored regular expressions for hex-encoded digests.
    // Note that /A-F/ disallowed.
    anchoredEncodedRegexps = map[Algorithm]*regexp.Regexp{
        SHA256: regexp.MustCompile(`^[a-f0-9]{64}$`),
        SHA384: regexp.MustCompile(`^[a-f0-9]{96}$`),
        SHA512: regexp.MustCompile(`^[a-f0-9]{128}$`),
    }
)

func (a Algorithm) Available() bool {
    h, ok := algorithms[a]
    if !ok {
        return false
    }
    // check availability of the hash, as well
    return h.Available()
}

func (a Algorithm) Hash() hash.Hash {
    if !a.Available() {
        // Empty algorithm string is invalid
        if a == "" {
            panic(fmt.Sprintf("empty digest algorithm, validate before calling Algorithm.Hash()"))
        }

        // NOTE(stevvooe): A missing hash is usually a programming error that
        // must be resolved at compile time. We don't import in the digest
        // package to allow users to choose their hash implementation (such as
        // when using stevvooe/resumable or a hardware accelerated package).
        //
        // Applications that may want to resolve the hash at runtime should
        // call Algorithm.Available before call Algorithm.Hash().
        panic(fmt.Sprintf("%v not available (make sure it is imported)", a))
    }
    return algorithms[a].New()
}
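The technique worth noticing is that the map stores crypto.Hash identifiers rather than constructors, so a hash is usable only if its implementation package has been linked in. The sketch below shows the same pattern with the standard library alone; the string-keyed map and the helper hashFor are hypothetical simplifications, not the digest package's API.

```go
package main

import (
	"crypto"
	_ "crypto/sha256" // side-effect import: registers SHA-256 with package crypto
	"fmt"
	"hash"
)

// algorithms maps names to crypto.Hash identifiers. Listing an identifier
// does not link the implementation; the blank import above does that.
var algorithms = map[string]crypto.Hash{
	"sha256": crypto.SHA256,
	"sha512": crypto.SHA512,
}

// hashFor returns a new hash for name, or false when the name is unknown
// or its implementation has not been linked into the binary.
func hashFor(name string) (hash.Hash, bool) {
	h, ok := algorithms[name]
	if !ok || !h.Available() {
		return nil, false
	}
	return h.New(), true
}

func main() {
	for _, name := range []string{"sha256", "sha512", "md5"} {
		_, ok := hashFor(name)
		fmt.Printf("%s available: %v\n", name, ok)
	}
}
```

Whether "sha512" reports as available depends on whether some other imported package happens to link crypto/sha512, which is exactly why the original code tells callers to check Available before calling Hash.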

Storage Backend

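StoreBackend persists images to disk. The Docker daemon uses the fs implementation, which is backed by the file system. Its definition follows; the two constants name the content directory and the metadata directory.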

type fs struct {
    sync.RWMutex
    root string
}

const (
    contentDirName  = "content"
    metadataDirName = "metadata"
)

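Walk is the directory traversal method. It lists the entries under content/sha256, builds a Digest from each file name, and checks it with Validate; every Digest that passes is then handed to the supplied DigestWalkFunc.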

func (s *fs) Walk(f DigestWalkFunc) error {
    // Only Canonical digest (sha256) is currently supported
    s.RLock()
    dir, err := os.ReadDir(filepath.Join(s.root, contentDirName, string(digest.Canonical)))
    s.RUnlock()
    if err != nil {
        return err
    }
    for _, v := range dir {
        dgst := digest.NewDigestFromHex(string(digest.Canonical), v.Name())
        if err := dgst.Validate(); err != nil {
            logrus.Debugf("skipping invalid digest %s: %s", dgst, err)
            continue
        }
        if err := f(dgst); err != nil {
            return err
        }
    }
    return nil
}

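Set writes the given data under the content directory and computes its digest with the default algorithm, SHA256. Since Walk reads content/sha256, the directory written by Set and the directory traversed by Walk are the same.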

func (s *fs) Set(data []byte) (digest.Digest, error) {
    s.Lock()
    defer s.Unlock()

    if len(data) == 0 {
        return "", fmt.Errorf("invalid empty data")
    }

    dgst := digest.FromBytes(data)
    if err := ioutils.AtomicWriteFile(s.contentFile(dgst), data, 0600); err != nil {
        return "", errors.Wrap(err, "failed to write digest data")
    }

    return dgst, nil
}

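SetMetadata creates a file named key in the metadata directory of the given Digest and writes the data to it. Note that deleting a Digest removes both its content file and its metadata files.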

func (s *fs) SetMetadata(dgst digest.Digest, key string, data []byte) error {
    s.Lock()
    defer s.Unlock()
    if _, err := s.get(dgst); err != nil {
        return err
    }

    baseDir := filepath.Join(s.metadataDir(dgst))
    if err := os.MkdirAll(baseDir, 0700); err != nil {
        return err
    }
    return ioutils.AtomicWriteFile(filepath.Join(s.metadataDir(dgst), key), data, 0600)
}

Command Lines

On macOS, running docker info in a terminal prints basic information about the Docker daemon, as shown below:

Client:
 Context: default
 Debug Mode: false
 Plugins:
  buildx: Docker Buildx (Docker Inc., v0.7.1)
  compose: Docker Compose (Docker Inc., v2.2.3)
  scan: Docker Scan (Docker Inc., v0.17.0)
Server:
 Containers: 11
  Running: 11
  Paused: 0
  Stopped: 0
 Images: 16
 Server Version: 20.10.12
 Storage Driver: overlay2
  Backing Filesystem: extfs
  Supports d_type: true
  Native Overlay Diff: true
  userxattr: false
 Logging Driver: json-file
 Cgroup Driver: cgroupfs
 Cgroup Version: 2
 Plugins:
  Volume: local
  Network: bridge host ipvlan macvlan null overlay
  Log: awslogs fluentd gcplogs gelf journald json-file local logentries splunk syslog
 Swarm: inactive
 Runtimes: runc io.containerd.runc.v2 io.containerd.runtime.v1.linux
 Default Runtime: runc
 Init Binary: docker-init
 containerd version: 7b11cfaabd73bb80907dd23182b9347b4245eb5d
 runc version: v1.0.2-0-g52b36a2
 init version: de40ad0
 Security Options:
  seccomp
   Profile: default
  cgroupns
 Kernel Version: 5.10.76-linuxkit
 Operating System: Docker Desktop
 OSType: linux
 Architecture: x86_64
 CPUs: 2
 Total Memory: 1.94GiB
 Name: docker-desktop
 ID: X3GD:QCUE:WZJM:DZQY:KBJ3:2ZM7:NDBG:OF6U:UT3Q:MFR5:I6MF:6DWS
 Docker Root Dir: /var/lib/docker
 Debug Mode: true
  File Descriptors: 147
  Goroutines: 114
  System Time: 2022-03-20T09:09:38.484312531Z
  EventsListeners: 4
 HTTP Proxy: http.docker.internal:3128
 HTTPS Proxy: http.docker.internal:3128
 No Proxy: hubproxy.docker.internal
 Registry: https://index.docker.io/v1/
 Labels:
 Experimental: false
 Insecure Registries:
  hubproxy.docker.internal:5000
  127.0.0.0/8
 Registry Mirrors:
  https://dockerhub.azk8s.cn/
  https://reg-mirror.qiniu.com/
 Live Restore Enabled: false

On macOS, Docker runs inside a virtual machine managed by HyperKit. The following command attaches to the virtual machine; press Ctrl-a followed by k to end the session.

screen ~/Library/Containers/com.docker.docker/Data/vms/0/tty

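With some versions of Docker the command above does not work because of a bug. The following command can be used instead; after entering the shell this way, simply type exit to leave.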

docker run -it --privileged --pid=host debian nsenter -t 1 -m -u -n -i sh

By default Docker keeps its data under /var/lib/docker. For example, the following command confirms that the default StoreBackend directory is image/overlay2/imagedb:

/var/lib/docker # ls -la image/overlay2/imagedb/
total 16
drwx------    4 root     root          4096 Apr 17  2019 .
drwx------    5 root     root          4096 Mar 20 09:21 ..
drwx------    3 root     root          4096 Apr 17  2019 content
drwx------    3 root     root          4096 Apr 17  2019 metadata

Image Actions

Interfaces

The main interface types related to image names are listed below. An image reference has the form host/repository/name:tags.

type Reference interface {
    // String returns the full reference
    String() string
}

type Named interface {
    Reference
    Name() string
}

type Canonical interface {
    Named
    Digest() digest.Digest
}

Pull Image

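When the Docker client hits the /images/create route, the PullImage method of ImageService handles the request. The method first processes the parameters of the image to pull (including the tag) and converts them into a Named instance, then performs the pull. If a platform was specified, it finally calls GetImage (near the end of the listing) to check that the downloaded image is present locally and matches that platform.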

func (i *ImageService) PullImage(ctx context.Context, image, tag string, platform *specs.Platform, metaHeaders map[string][]string, authConfig *types.AuthConfig, outStream io.Writer) error {
    start := time.Now()
    // Special case: "pull -a" may send an image name with a
    // trailing :. This is ugly, but let's not break API
    // compatibility.
    image = strings.TrimSuffix(image, ":")

    ref, err := reference.ParseNormalizedNamed(image)
    if err != nil {
        return errdefs.InvalidParameter(err)
    }

    if tag != "" {
        // The "tag" could actually be a digest.
        var dgst digest.Digest
        dgst, err = digest.Parse(tag)
        if err == nil {
            ref, err = reference.WithDigest(reference.TrimNamed(ref), dgst)
        } else {
            ref, err = reference.WithTag(ref, tag)
        }
        if err != nil {
            return errdefs.InvalidParameter(err)
        }
    }

    err = i.pullImageWithReference(ctx, ref, platform, metaHeaders, authConfig, outStream)
    imageActions.WithValues("pull").UpdateSince(start)
    if err != nil {
        return err
    }

    if platform != nil {
        // If --platform was specified, check that the image we pulled matches
        // the expected platform. This check is for situations where the image
        // is a single-arch image, in which case (for backward compatibility),
        // we allow the image to have a non-matching architecture. The code
        // below checks for this situation, and returns a warning to the client,
        // as well as logging it to the daemon logs.
        img, err := i.GetImage(image, platform)

        // Note that this is a special case where GetImage returns both an image
        // and an error: https://github.com/docker/docker/blob/v20.10.7/daemon/images/image.go#L175-L183
        if errdefs.IsNotFound(err) && img != nil {
            po := streamformatter.NewJSONProgressOutput(outStream, false)
            progress.Messagef(po, "", `WARNING: %s`, err.Error())
            logrus.WithError(err).WithField("image", image).Warn("ignoring platform mismatch on single-arch image")
        }
    }

    return nil
}

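The core of the pull is pullImageWithReference. It first creates the control structures (the progress channel, the writes-done channel, and the cancel function), then starts a goroutine that reports pull progress (the go func block near the top of the listing). After that it builds the central ImagePullConfig and performs the pull.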

func (i *ImageService) pullImageWithReference(ctx context.Context, ref reference.Named, platform *specs.Platform, metaHeaders map[string][]string, authConfig *types.AuthConfig, outStream io.Writer) error {
    // Include a buffer so that slow client connections don't affect
    // transfer performance.
    progressChan := make(chan progress.Progress, 100)

    writesDone := make(chan struct{})

    ctx, cancelFunc := context.WithCancel(ctx)

    go func() {
        progressutils.WriteDistributionProgress(cancelFunc, outStream, progressChan)
        close(writesDone)
    }()

    ctx = namespaces.WithNamespace(ctx, i.contentNamespace)
    // Take out a temporary lease for everything that gets persisted to the content store.
    // Before the lease is cancelled, any content we want to keep should have it's own lease applied.
    ctx, done, err := tempLease(ctx, i.leases)
    if err != nil {
        return err
    }
    defer done(ctx)

    cs := &contentStoreForPull{
        ContentStore: i.content,
        leases:       i.leases,
    }
    imageStore := &imageStoreForPull{
        ImageConfigStore: distribution.NewImageConfigStoreFromStore(i.imageStore),
        ingested:         cs,
        leases:           i.leases,
    }

    imagePullConfig := &distribution.ImagePullConfig{
        Config: distribution.Config{
            MetaHeaders:      metaHeaders,
            AuthConfig:       authConfig,
            ProgressOutput:   progress.ChanOutput(progressChan),
            RegistryService:  i.registryService,
            ImageEventLogger: i.LogImageEvent,
            MetadataStore:    i.distributionMetadataStore,
            ImageStore:       imageStore,
            ReferenceStore:   i.referenceStore,
        },
        DownloadManager: i.downloadManager,
        Schema2Types:    distribution.ImageTypes,
        Platform:        platform,
    }

    err = distribution.Pull(ctx, ref, imagePullConfig, cs)
    close(progressChan)
    <-writesDone
    return err
}

After the method above runs, the key instances are related as shown in Figure 3.
image-service-image-pull-config.svg
Figure 3: Structs involved in an image pull

distribution.ContentStore stores manifest files.

type ContentStore interface {
    content.Ingester
    content.Provider
    Info(ctx context.Context, dgst digest.Digest) (content.Info, error)
    Abort(ctx context.Context, ref string) error
}

distribution.ImageConfigStore, in turn, persists image configuration files.

type ImageConfigStore interface {
    Put(context.Context, []byte) (digest.Digest, error)
    Get(context.Context, digest.Digest) ([]byte, error)
    RootFSFromConfig([]byte) (*image.RootFS, error)
    PlatformFromConfig([]byte) (*specs.Platform, error)
}

reference.Store manages image tags, names, digests, and related information.

type Store interface {
    References(id digest.Digest) []reference.Named
    ReferencesByName(ref reference.Named) []Association
    AddTag(ref reference.Named, id digest.Digest, force bool) error
    AddDigest(ref reference.Canonical, id digest.Digest, force bool) error
    Delete(ref reference.Named) (bool, error)
    Get(ref reference.Named) (digest.Digest, error)
}

Repository Authentication

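Pulling a Docker image requires naming the target registry, and registries generally require authentication. The design in Figure 4 handles this cleanly. First, the accessible endpoints are obtained for the registry, and a ping confirms that an endpoint is reachable; the response to the ping also reveals the authentication type. Finally, a new transport.transport instance is created with the appropriate RequestModifier instances attached, and the repository is accessed through it.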

image-service-repository-request-design.svg
Figure 4: Transport design

transport.RequestModifier modifies the http.Request passed to it. Behaviors can be added as needed, which keeps the design flexible.

type RequestModifier interface {
    ModifyRequest(*http.Request) error
}

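Take headerModifier as an example. The type is defined with http.Header as its underlying type, and the new behavior is added as a method on it. The implementation is very simple, and it is a good illustration of how convenient Go's type definitions are when used properly.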

type headerModifier http.Header

// NewHeaderRequestModifier returns a new RequestModifier which will
// add the given headers to a request.
func NewHeaderRequestModifier(header http.Header) RequestModifier {
    return headerModifier(header)
}

func (h headerModifier) ModifyRequest(req *http.Request) error {
    for k, s := range http.Header(h) {
        req.Header[k] = append(req.Header[k], s...)
    }

    return nil
}

Finally, the challenge.Manager obtained from the authentication request manages the authentication information for the different endpoints of a repository.

type Manager interface {
    // GetChallenges returns the challenges for the given
    // endpoint URL.
    GetChallenges(endpoint url.URL) ([]Challenge, error)

    // AddResponse adds the response to the challenge
    // manager. The challenges will be parsed out of
    // the WWW-Authenicate headers and added to the
    // URL which was produced the response. If the
    // response was authorized, any challenges for the
    // endpoint will be cleared.
    AddResponse(resp *http.Response) error
}

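The full implementation is shown below. The first part, up to the PingV2Registry call and its error handling, obtains the challenge.Manager through a transport.transport instance whose request modifiers have little to do with authentication. The second part, from the authConfig.RegistryToken check to the second transport.NewTransport call, creates a new transport.transport instance that now includes the real authorization request modifier; the instance finally returned can access the repository.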

func NewV2Repository(
    ctx context.Context, repoInfo *registry.RepositoryInfo, endpoint registry.APIEndpoint,
    metaHeaders http.Header, authConfig *types.AuthConfig, actions ...string,
) (repo distribution.Repository, err error) {
    repoName := repoInfo.Name.Name()
    // If endpoint does not support CanonicalName, use the RemoteName instead
    if endpoint.TrimHostname {
        repoName = reference.Path(repoInfo.Name)
    }

    direct := &net.Dialer{
        Timeout:   30 * time.Second,
        KeepAlive: 30 * time.Second,
    }

    // TODO(dmcgowan): Call close idle connections when complete, use keep alive
    base := &http.Transport{
        Proxy:               http.ProxyFromEnvironment,
        DialContext:         direct.DialContext,
        TLSHandshakeTimeout: 10 * time.Second,
        TLSClientConfig:     endpoint.TLSConfig,
        // TODO(dmcgowan): Call close idle connections when complete and use keep alive
        DisableKeepAlives: true,
    }

    modifiers := registry.Headers(dockerversion.DockerUserAgent(ctx), metaHeaders)
    authTransport := transport.NewTransport(base, modifiers...)

    challengeManager, err := registry.PingV2Registry(endpoint.URL, authTransport)
    if err != nil {
        transportOK := false
        if responseErr, ok := err.(registry.PingResponseError); ok {
            transportOK = true
            err = responseErr.Err
        }
        return nil, fallbackError{
            err:         err,
            transportOK: transportOK,
        }
    }

    if authConfig.RegistryToken != "" {
        passThruTokenHandler := &existingTokenHandler{token: authConfig.RegistryToken}
        modifiers = append(modifiers, auth.NewAuthorizer(challengeManager, passThruTokenHandler))
    } else {
        scope := auth.RepositoryScope{
            Repository: repoName,
            Actions:    actions,
            Class:      repoInfo.Class,
        }

        creds := registry.NewStaticCredentialStore(authConfig)
        tokenHandlerOptions := auth.TokenHandlerOptions{
            Transport:   authTransport,
            Credentials: creds,
            Scopes:      []auth.Scope{scope},
            ClientID:    registry.AuthClientID,
        }
        tokenHandler := auth.NewTokenHandlerWithOptions(tokenHandlerOptions)
        basicHandler := auth.NewBasicHandler(creds)
        modifiers = append(modifiers, auth.NewAuthorizer(challengeManager, tokenHandler, basicHandler))
    }
    tr := transport.NewTransport(base, modifiers...)

    repoNameRef, err := reference.WithName(repoName)
    if err != nil {
        return nil, fallbackError{
            err:         err,
            transportOK: true,
        }
    }

    repo, err = client.NewRepository(repoNameRef, endpoint.URL.String(), tr)
    if err != nil {
        err = fallbackError{
            err:         err,
            transportOK: true,
        }
    }
    return
}

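distribution.Repository manages operations on image tags and content. Through this interface callers obtain the service instances that access and manage image content, and these instances share one real http.Client with the Repository.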

type Repository interface {
    // Named returns the name of the repository.
    Named() reference.Named

    // Manifests returns a reference to this repository's manifest service.
    // with the supplied options applied.
    Manifests(ctx context.Context, options ...ManifestServiceOption) (ManifestService, error)

    // Blobs returns a reference to this repository's blob service.
    Blobs(ctx context.Context) BlobStore

    // TODO(stevvooe): The above BlobStore return can probably be relaxed to
    // be a BlobService for use with clients. This will allow such
    // implementations to avoid implementing ServeBlob.

    // Tags returns a reference to this repositories tag service
    Tags(ctx context.Context) TagService
}

Figure 5 shows the instances that implement Repository. The design of the HTTP request handling here is well done and worth studying in depth alongside the code.
image-service-repository.svg
Figure 5: Repository interface implementation

Download Manager

image-service-download-manager-download.svg
Figure 6: Download flow

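When LayerDownloadManager starts a download task, it builds a download function from a DownloadDescriptor instance. DownloadDescriptor contains all the core methods; in fact, the descriptor itself carries out the download.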

type DownloadDescriptor interface {
    // Key returns the key used to deduplicate downloads.
    Key() string
    // ID returns the ID for display purposes.
    ID() string
    // DiffID should return the DiffID for this layer, or an error
    // if it is unknown (for example, if it has not been downloaded
    // before).
    DiffID() (layer.DiffID, error)
    // Download is called to perform the download.
    Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error)
    // Close is called when the download manager is finished with this
    // descriptor and will not call Download again or read from the reader
    // that Download returned.
    Close()
}

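The only type that implements DownloadDescriptor here is v2LayerDescriptor. Its repo field, of type distribution.Repository, is the key: it holds the real http.Client for the registry, and that client performs the actual download.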

type v2LayerDescriptor struct {
    digest            digest.Digest
    diffID            layer.DiffID
    repoInfo          *registry.RepositoryInfo
    repo              distribution.Repository
    V2MetadataService metadata.V2MetadataService
    tmpFile           *os.File
    verifier          digest.Verifier
    src               distribution.Descriptor
}

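The Download method of v2LayerDescriptor receives the context.Context provided by a transfer instance. This Context controls the download in progress and is mainly used to cancel it. Within the generated download function, Download runs in its own goroutine.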

type transfer interface {
    watch(progressOutput progress.Output) *watcher
    release(*watcher)
    context() context.Context
    close()
    done() <-chan struct{}
    released() <-chan struct{}
    broadcast(mainProgressChan <-chan progress.Progress)
}

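That completes the core framework for image download. The details are best studied alongside the source code; for example, layer.Store is used to check whether a layer already exists and to register the layer once its download finishes.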