- client.go#Client.NewContainer
- 1) client.go#Client.ContainerService
- 2) containerstore.go#remoteContainers.Create
client.go#Client.NewContainer
// NewContainer will create a new container in container with the provided id// the id must be unique within the namespacefunc (c *Client) NewContainer(ctx context.Context, id string, opts ...NewContainerOpts) (Container, error) {ctx, done, err := c.WithLease(ctx)if err != nil {return nil, err}defer done(ctx)container := containers.Container{ID: id,Runtime: containers.RuntimeInfo{Name: c.runtime,},}// 这里有两个o,一个是为c赋值spec,另一个是为c赋值Runtimefor _, o := range opts {if err := o(ctx, c, &container); err != nil {return nil, err}}// ********************************** NOTICE ********************************** //r, err := c.ContainerService().Create(ctx, container)// ********************************** NOTICE ********************************** //if err != nil {return nil, err}// ********************************** NOTICE ********************************** //return containerFromRecord(c, r), nil// ********************************** NOTICE ********************************** //}
1) client.go#Client.ContainerService

ContainersClient是生成出来的,它一定有两个实现,一个是local(即server),一个是client。
// ContainerService returns the underlying container Storefunc (c *Client) ContainerService() containers.Store {if c.containerStore != nil {return c.containerStore}c.connMu.Lock()defer c.connMu.Unlock()return NewRemoteContainerStore(containersapi.NewContainersClient(c.conn))}// api/services/containers/v1/containers.pb.gotype ContainersClient interface {Get(ctx context.Context, in *GetContainerRequest, opts ...grpc.CallOption) (*GetContainerResponse, error)List(ctx context.Context, in *ListContainersRequest, opts ...grpc.CallOption) (*ListContainersResponse, error)ListStream(ctx context.Context, in *ListContainersRequest, opts ...grpc.CallOption) (Containers_ListStreamClient, error)Create(ctx context.Context, in *CreateContainerRequest, opts ...grpc.CallOption) (*CreateContainerResponse, error)Update(ctx context.Context, in *UpdateContainerRequest, opts ...grpc.CallOption) (*UpdateContainerResponse, error)Delete(ctx context.Context, in *DeleteContainerRequest, opts ...grpc.CallOption) (*google_protobuf2.Empty, error)}type containersClient struct {cc *grpc.ClientConn}func NewContainersClient(cc *grpc.ClientConn) ContainersClient {return &containersClient{cc}}// containerstore.go// NewRemoteContainerStore returns the container Store connected with the provided clientfunc NewRemoteContainerStore(client containersapi.ContainersClient) containers.Store {return &remoteContainers{client: client,}type remoteContainers struct {client containersapi.ContainersClient}
2) containerstore.go#remoteContainers.Create
// Store interacts with the underlying container storagetype Store interface {Get(ctx context.Context, id string) (Container, error)// List returns containers that match one or more of the provided filters.List(ctx context.Context, filters ...string) ([]Container, error)// Create a container in the store from the provided container.Create(ctx context.Context, container Container) (Container, error)// Update the container with the provided container object. ID must be set.//// If one or more fieldpaths are provided, only the field corresponding to// the fieldpaths will be mutated.Update(ctx context.Context, container Container, fieldpaths ...string) (Container, error)// Delete a container using the id.//// nil will be returned on success. If the container is not known to the// store, ErrNotFound will be returned.Delete(ctx context.Context, id string) error}func (r *remoteContainers) Create(ctx context.Context, container containers.Container) (containers.Container, error) {created, err := r.client.Create(ctx, &containersapi.CreateContainerRequest{Container: containerToProto(&container),})if err != nil {return containers.Container{}, errdefs.FromGRPC(err)}return containerFromProto(&created.Container), nil}
2.1) containerstore.go#containerToProto
// containerToProto copies the fields of a containers.Container into the
// protobuf containersapi.Container message. The runtime name and options are
// placed in a nested Container_Runtime message; every other field is copied
// across as-is.
func containerToProto(container *containers.Container) containersapi.Container {
	rt := &containersapi.Container_Runtime{
		Name:    container.Runtime.Name,
		Options: container.Runtime.Options,
	}

	return containersapi.Container{
		ID:          container.ID,
		Labels:      container.Labels,
		Image:       container.Image,
		Runtime:     rt,
		Spec:        container.Spec,
		Snapshotter: container.Snapshotter,
		SnapshotKey: container.SnapshotKey,
		Extensions:  container.Extensions,
	}
}
2.2) api/services/containers/v1/containers.pb.go#containersClient.Create
func (c *containersClient) Create(ctx context.Context, in *CreateContainerRequest, opts ...grpc.CallOption) (*CreateContainerResponse, error) {out := new(CreateContainerResponse)// ********************************** NOTICE ********************************** //err := grpc.Invoke(ctx, "/containerd.services.containers.v1.Containers/Create", in, out, c.cc, opts...)// ********************************** NOTICE ********************************** //if err != nil {return nil, err}return out, nil}
2.3) api/services/containers/v1/containers.pb.go#_Containers_Create_Handler
var _Containers_serviceDesc = grpc.ServiceDesc{ServiceName: "containerd.services.containers.v1.Containers",HandlerType: (*ContainersServer)(nil),Methods: []grpc.MethodDesc{{MethodName: "Get",Handler: _Containers_Get_Handler,},{MethodName: "List",Handler: _Containers_List_Handler,},{MethodName: "Create",Handler: _Containers_Create_Handler,},{MethodName: "Update",Handler: _Containers_Update_Handler,},{MethodName: "Delete",Handler: _Containers_Delete_Handler,},},Streams: []grpc.StreamDesc{{StreamName: "ListStream",Handler: _Containers_ListStream_Handler,ServerStreams: true,},},Metadata: "github.com/containerd/containerd/api/services/containers/v1/containers.proto",}func _Containers_Create_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {in := new(CreateContainerRequest)if err := dec(in); err != nil {return nil, err}if interceptor == nil {return srv.(ContainersServer).Create(ctx, in)}info := &grpc.UnaryServerInfo{Server: srv,FullMethod: "/containerd.services.containers.v1.Containers/Create",}handler := func(ctx context.Context, req interface{}) (interface{}, error) {// ********************************** NOTICE ********************************** //return srv.(ContainersServer).Create(ctx, req.(*CreateContainerRequest))// ********************************** NOTICE ********************************** //}return interceptor(ctx, in, info, handler)}
2.3.1) services/containers/service.go#service.Create
type ContainersServer interface {Get(context.Context, *GetContainerRequest) (*GetContainerResponse, error)List(context.Context, *ListContainersRequest) (*ListContainersResponse, error)ListStream(*ListContainersRequest, Containers_ListStreamServer) errorCreate(context.Context, *CreateContainerRequest) (*CreateContainerResponse, error)Update(context.Context, *UpdateContainerRequest) (*UpdateContainerResponse, error)Delete(context.Context, *DeleteContainerRequest) (*google_protobuf2.Empty, error)}func (s *service) Create(ctx context.Context, req *api.CreateContainerRequest) (*api.CreateContainerResponse, error) {return s.local.Create(ctx, req)}
2.3.1.1) services/containers/local.go#local.Create
func (l *local) Create(ctx context.Context, req *api.CreateContainerRequest, _ ...grpc.CallOption) (*api.CreateContainerResponse, error) {var resp api.CreateContainerResponse// ********************************** NOTICE ********************************** //if err := l.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error {// ********************************** NOTICE ********************************** //container := containerFromProto(&req.Container)// ********************************** NOTICE ********************************** //created, err := store.Create(ctx, container)// ********************************** NOTICE ********************************** //if err != nil {return err}// ********************************** NOTICE ********************************** //resp.Container = containerToProto(&created)// ********************************** NOTICE ********************************** //return nil}); err != nil {return &resp, errdefs.ToGRPC(err)}// ********************************** NOTICE ********************************** //if err := l.publisher.Publish(ctx, "/containers/create", &eventstypes.ContainerCreate{// ********************************** NOTICE ********************************** //ID: resp.Container.ID,Image: resp.Container.Image,Runtime: &eventstypes.ContainerCreate_Runtime{Name: resp.Container.Runtime.Name,Options: resp.Container.Runtime.Options,},}); err != nil {return &resp, err}return &resp, nil}
—2.3.1.1.1) services/containers/local.go#local.withStoreUpdate
func (l *local) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) error {return l.db.Update(l.withStore(ctx, fn))}func (l *local) withStore(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) func(tx *bolt.Tx) error {return func(tx *bolt.Tx) error { return fn(ctx, metadata.NewContainerStore(tx)) }}// NewContainerStore returns a Store backed by an underlying bolt DBfunc NewContainerStore(tx *bolt.Tx) containers.Store {return &containerStore{tx: tx,}}
——2.3.1.1.1.1) metadata/db.go#DB.Update
// Update runs a writable transaction on the metadata store.func (m *DB) Update(fn func(*bolt.Tx) error) error {m.wlock.RLock()defer m.wlock.RUnlock()err := m.db.Update(fn)if err == nil {m.dirtyL.Lock()dirty := m.dirtyCS || len(m.dirtySS) > 0for _, fn := range m.mutationCallbacks {fn(dirty)}m.dirtyL.Unlock()}return err}
———2.3.1.1.1.1.1) bbolt/db.go#DB.Update
// Update executes a function within the context of a read-write managed transaction.// If no error is returned from the function then the transaction is committed.// If an error is returned then the entire transaction is rolled back.// Any error that is returned from the function or returned from the commit is// returned from the Update() method.//// Attempting to manually commit or rollback within the function will cause a panic.func (db *DB) Update(fn func(*Tx) error) error {t, err := db.Begin(true)if err != nil {return err}// Make sure the transaction rolls back in the event of a panic.defer func() {if t.db != nil {t.rollback()}}()// Mark as a managed tx so that the inner function cannot manually commit.t.managed = true// If an error is returned from the function then rollback and return error.err = fn(t)t.managed = falseif err != nil {_ = t.Rollback()return err}return t.Commit()}
————2.3.1.1.1.1.1.1) metadata/containers.go#containerStore.Create
func (s *containerStore) Create(ctx context.Context, container containers.Container) (containers.Container, error) {namespace, err := namespaces.NamespaceRequired(ctx)if err != nil {return containers.Container{}, err}if err := validateContainer(&container); err != nil {return containers.Container{}, errors.Wrap(err, "create container failed validation")}bkt, err := createContainersBucket(s.tx, namespace)if err != nil {return containers.Container{}, err}cbkt, err := bkt.CreateBucket([]byte(container.ID))if err != nil {if err == bolt.ErrBucketExists {err = errors.Wrapf(errdefs.ErrAlreadyExists, "container %q", container.ID)}return containers.Container{}, err}container.CreatedAt = time.Now().UTC()container.UpdatedAt = container.CreatedAtif err := writeContainer(cbkt, &container); err != nil {return containers.Container{}, errors.Wrapf(err, "failed to write container %q", container.ID)}return container, nil}
