在执行docker load的时候,首先由Docker Client向Docker Server发送load请求。是由Docker Daemon完成镜像的load。Docker是C/S架构,所以先Docker Client去解析请求,源码实现在./docker/api/cient/command.go#L2289-2316
func (cli *DockerCli) CmdLoad(args ...string) error {
cmd := cli.Subcmd("load", "", "Load an image from a tar archive on STDIN")
通过client包的Subcmd方法返回一个实例化的flagSet对象cmd。
func (cli *DockerCli) Subcmd(name, signature, description string) *flag.FlagSet {
flags := flag.NewFlagSet(name, flag.ContinueOnError)
flags.Usage = func() {
fmt.Fprintf(cli.err, "\nUsage: docker %s %s\n\n%s\n\n", name, signature, description)
flags.PrintDefaults()
os.Exit(2)
}
return flags
}
通过cmd.String生成命令行参数对应的变量,变量为指针类型,通过cmd.Parse(args)对变量进行解析,解析后就会建立绑定关系。由于docker load -i没有不被解析的参数,所以,如果cmd.NArg()不等于零,则异常,返回帮助信息。
infile := cmd.String([]string{"i", "-input"}, "", "Read from a tar archive file, instead of STDIN")
if err := cmd.Parse(args); err != nil {
return err
}
if cmd.NArg() != 0 {
cmd.Usage()
return nil
}
以上其实,代码其实就是对docker load后面的参数进行解析绑定,如果有异常直接返回。详细的解释,可以参考处理命令行参数的flags库。
定义io.Reader类型和error类型的变量,然后打开输入的文件,此时*infile存储的是load的镜像。in_put存储的是镜像文件对象。
var (
input io.Reader = cli.in
err error
)
if *infile != "" {
input, err = os.Open(*infile)
if err != nil {
return err
}
}
准备好参数后,就调用http请求的POST方法,向Docker Server发送请求。
if err := cli.stream("POST", "/images/load", input, cli.out, nil); err != nil {
return err
}
return nil
Docker Server接受Client的发送的请求。Docker Server接收到镜像的load请求后,通过路由分发最后由具体的方法去处理,具体方法在./docker/api/server/server.go#622-626
func postImagesLoad(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
job := eng.Job("load")
job.Stdin.Add(r.Body)
return job.Run()
}
这个方法主要工作就是创建镜像load的Job,并且把请求的body作为Job的标准输入参数,最后触发下载的Job.
Docker Daemon是完成Job执行的主要载体。具体的执行函数是CmdLoad,具体在./docker/graph/load.go#18-94
func (s *TagStore) CmdLoad(job *engine.Job) engine.Status {
// 创建临时目录
tmpImageDir, err := ioutil.TempDir("", "docker-import-")
var (
repoTarFile = path.Join(tmpImageDir, "repo.tar")
repoDir = path.Join(tmpImageDir, "repo")
)
tarFile, err := os.Create(repoTarFile)
if err != nil {
return job.Error(err)
}
if _, err := io.Copy(tarFile, job.Stdin); err != nil {
return job.Error(err)
}
tarFile.Close()
repoFile, err := os.Open(repoTarFile)
if err != nil {
return job.Error(err)
}
if err := os.Mkdir(repoDir, os.ModeDir); err != nil {
return job.Error(err)
}
首先在临时目录tmp中创建目录docker-import-xxxx.
然后定义两个路径变量repoTarFile和repoDir
创建文件repoTarFile,并且返回文件对象tarFile,然后把输入的镜像文件对象复制给打开的文件对象tarFile。打开已经拷贝的镜像文件repoTarFile,创建文件repoDir。
images, err := s.graph.Map()
if err != nil {
return job.Error(err)
}
excludes := make([]string, len(images))
i := 0
for k := range images {
excludes[i] = k
i++
}
if err := archive.Untar(repoFile, repoDir, &archive.TarOptions{Excludes: excludes}); err != nil {
return job.Error(err)
}
然后通过方法s.graph.Map()获取已有的镜像列表,得到镜像的列表excludes,然后通过方法archive.Untar去解压,如果load的镜像已经存在的话,则不会解压镜像的层次,也就不会出现目录,下面的s.recursiveLoad方法就不会调用。
dirs, err := ioutil.ReadDir(repoDir)
if err != nil {
return job.Error(err)
}
for _, d := range dirs {
if d.IsDir() {
if err := s.recursiveLoad(job.Eng, d.Name(), tmpImageDir); err != nil {
return job.Error(err)
}
}
}
通过ioutil.ReadDir方法去读取解压的文件repoDir,然后遍历,如果是文件夹则执行方法s.recursiveLoad
func (s *TagStore) recursiveLoad(eng *engine.Engine, address, tmpImageDir string) error {
if err := eng.Job("image_get", address).Run(); err != nil {
log.Debugf("Loading %s", address)
// core
imageJson, err := ioutil.ReadFile(path.Join(tmpImageDir, "repo", address, "json"))
if err != nil {
log.Debugf("Error reading json", err)
return err
}
// core
layer, err := os.Open(path.Join(tmpImageDir, "repo", address, "layer.tar"))
if err != nil {
log.Debugf("Error reading embedded tar", err)
return err
}
// core
img, err := image.NewImgJSON(imageJson)
if err != nil {
log.Debugf("Error unmarshalling json", err)
return err
}
if img.Parent != "" {
if !s.graph.Exists(img.Parent) {
if err := s.recursiveLoad(eng, img.Parent, tmpImageDir); err != nil {
return err
}
}
}
// core
if err := s.graph.Register(imageJson, layer, img); err != nil {
return err
}
}
log.Debugf("Completed processing %s", address)
return nil
}
首先通过方法eng.Job()判断,加载的镜像的层是否已经存在,如果存在则直接退出,如果不存在,通过方法ioutil.ReadFile()读取解压的json信息。通过方法os.Open(path.Join(tmpImageDir, “repo”, address, “layer.tar”))读取打开的文件。然后实例化一个image的类。
image的类定义和初始化在./docker/image/image.go#322-331
func NewImgJSON(src []byte) (*Image, error) {
ret := &Image{}
log.Debugf("Json string: {%s}", src)
// FIXME: Is there a cleaner way to "purify" the input json?
if err := json.Unmarshal(src, ret); err != nil {
return nil, err
}
return ret, nil
}
然后一个实例化的镜像对象,
然后对镜像的Json文件进行解析,如果存在Parent的layer,则循环执行s.recursiveLoad,在解压后,会遍历对镜像的层进行添加,为啥函数内还需要去判断Parent的layer是否存在去执行循环呢?如果不要内部的判断,从文件中遍历image的layer也是可以的。
需要去Register,把导入的镜像的每个层都注册到graph中。具体代码在./docker/graph/graph.go#161-218.
在grap中注册后,还需要在store中建立容器名字和镜像ID之间的关系。
repositoriesJson, err := ioutil.ReadFile(path.Join(tmpImageDir, "repo", "repositories"))
if err == nil {
repositories := map[string]Repository{}
if err := json.Unmarshal(repositoriesJson, &repositories); err != nil {
return job.Error(err)
}
log.Debugf("repositories: %s", repositories)
for imageName, tagMap := range repositories {
for tag, address := range tagMap {
if err := s.Set(imageName, tag, address, true); err != nil {
return job.Error(err)
}
}
}
} else if !os.IsNotExist(err) {
return job.Error(err)
}
打开文件repositories,然后遍历去执行方法s.Set(),具体的实现是./docker/graph/tags.go#172-205
type TagStore struct {
path string
graph *Graph
Repositories map[string]Repository
sync.Mutex
// FIXME: move push/pull-related fields
// to a helper type
pullingPool map[string]chan struct{}
pushingPool map[string]chan struct{}
}
type Repository map[string]string
类TagStore的属性Repositories,是一个map嵌套的类型。
func (store *TagStore) Set(repoName, tag, imageName string, force bool) error {
img, err := store.LookupImage(imageName)
store.Lock()
defer store.Unlock()
if err != nil {
return err
}
if tag == "" {
tag = DEFAULTTAG
}
if err := validateRepoName(repoName); err != nil {
return err
}
if err := validateTagName(tag); err != nil {
return err
}
if err := store.reload(); err != nil {
return err
}
var repo Repository
if r, exists := store.Repositories[repoName]; exists {
repo = r
} else {
repo = make(map[string]string)
if old, exists := store.Repositories[repoName]; exists && !force {
return fmt.Errorf("Conflict: Tag %s:%s is already set to %s", repoName, tag, old)
}
// core
store.Repositories[repoName] = repo
}
repo[tag] = img.ID
return store.save()
}
方法Set先去graph去查找是否存在对用的镜像,通过方法store.LookupImage(imageName)。最终也是通过graph.get()方法去获取镜像是否存在graph中。
func (store *TagStore) LookupImage(name string) (*image.Image, error) {
// FIXME: standardize on returning nil when the image doesn't exist, and err for everything else
// (so we can pass all errors here)
repos, tag := parsers.ParseRepositoryTag(name)
if tag == "" {
tag = DEFAULTTAG
}
img, err := store.GetImage(repos, tag)
store.Lock()
defer store.Unlock()
if err != nil {
return nil, err
} else if img == nil {
if img, err = store.graph.Get(name); err != nil {
return nil, err
}
}
return img, nil
}
如果存在,那就需要存到store中,建立容器名字和容器ID之间的关系。store.Repositories[repoName] = repo,也就是存储的是{“imageName”:{“tag”:”imageID”}}
而最后的store.save()会进行存盘,具体的文件是./var/lib/docker/repositories-devicemapper。
docker load上传过程结束,代码执行完后,需要调用
defer os.RemoveAll(tmpImageDir)
去删除刚开始创建的临时目录。
docker相关的镜像文件存在/var/lib/docker目录下,镜像的存储结构主要分成两部分,一是镜像ID之间的联系,二是镜像ID与镜像名称之间的关联。前者的结构体是Graph,后者是TagStore。
Graph结构存储了各个镜像的元数据及其之间的关系,但是并没有建立镜像名字和镜像的ID之间的关系。而TagStore结构存储了镜像名字tag(centos:latest)与其镜像ID( d0955f21bf24 )关联起来的数据结构。