- client.go#Client.NewContainer
- 1) client.go#Client.ContainerService
- 2) containerstore.go#remoteContainers.Create
client.go#Client.NewContainer
// NewContainer creates a new container record in containerd under the given
// id, which must be unique within the namespace.
//
// The record starts out with only the id and the client's default runtime
// name; every NewContainerOpts is then applied in order and may mutate the
// record before it is sent to the container store. The first opt that fails
// aborts the creation and its error is returned unchanged.
func (c *Client) NewContainer(ctx context.Context, id string, opts ...NewContainerOpts) (Container, error) {
	// The rest of the call runs with the context returned by WithLease; done
	// is deferred with that context (presumably it releases the lease — TODO confirm).
	ctx, done, err := c.WithLease(ctx)
	if err != nil {
		return nil, err
	}
	defer done(ctx)

	record := containers.Container{
		ID:      id,
		Runtime: containers.RuntimeInfo{Name: c.runtime},
	}

	// In the call path traced by these notes two opts are passed: one sets
	// the container's Spec, the other sets its Runtime.
	for _, apply := range opts {
		if err := apply(ctx, c, &record); err != nil {
			return nil, err
		}
	}

	// NOTICE: the populated record is handed to the container store here.
	created, err := c.ContainerService().Create(ctx, record)
	if err != nil {
		return nil, err
	}

	// NOTICE: the stored record is wrapped into the client-side Container.
	return containerFromRecord(c, created), nil
}
1) client.go#Client.ContainerService
ContainersClient是生成出来的,它一定有两个实现,一个是local(即server),一个是client。
// ContainerService returns the container Store used by this client.
//
// A store already set on the client (c.containerStore) takes precedence;
// otherwise a new remote store is built on top of the client's gRPC
// connection while holding connMu.
func (c *Client) ContainerService() containers.Store {
	if store := c.containerStore; store != nil {
		return store
	}

	c.connMu.Lock()
	defer c.connMu.Unlock()

	grpcClient := containersapi.NewContainersClient(c.conn)
	return NewRemoteContainerStore(grpcClient)
}
// api/services/containers/v1/containers.pb.go
// ContainersClient is the client-side API of the Containers gRPC service.
// Each method takes a request message plus optional grpc.CallOptions and
// returns the matching response message; ListStream returns a stream client
// instead of a single response.
type ContainersClient interface {
Get(ctx context.Context, in *GetContainerRequest, opts ...grpc.CallOption) (*GetContainerResponse, error)
List(ctx context.Context, in *ListContainersRequest, opts ...grpc.CallOption) (*ListContainersResponse, error)
ListStream(ctx context.Context, in *ListContainersRequest, opts ...grpc.CallOption) (Containers_ListStreamClient, error)
// Create is the method followed by the NewContainer call chain in these notes.
Create(ctx context.Context, in *CreateContainerRequest, opts ...grpc.CallOption) (*CreateContainerResponse, error)
Update(ctx context.Context, in *UpdateContainerRequest, opts ...grpc.CallOption) (*UpdateContainerResponse, error)
Delete(ctx context.Context, in *DeleteContainerRequest, opts ...grpc.CallOption) (*google_protobuf2.Empty, error)
}
// containersClient is the ContainersClient implementation returned by
// NewContainersClient; it issues its calls over the wrapped connection.
type containersClient struct {
// cc is the gRPC client connection the RPCs are invoked on.
cc *grpc.ClientConn
}
// NewContainersClient returns a ContainersClient that issues its RPCs over
// the given gRPC client connection.
func NewContainersClient(cc *grpc.ClientConn) ContainersClient {
	return &containersClient{cc: cc}
}
// containerstore.go
// NewRemoteContainerStore returns the container Store connected with the provided client
func NewRemoteContainerStore(client containersapi.ContainersClient) containers.Store {
return &remoteContainers{
client: client,
}
// remoteContainers is the containers.Store implementation constructed by
// NewRemoteContainerStore; its Create method forwards to the wrapped client.
type remoteContainers struct {
// client is the ContainersClient the store delegates to.
client containersapi.ContainersClient
}
2) containerstore.go#remoteContainers.Create
// Store interacts with the underlying container storage
type Store interface {
// Get a container using the id.
Get(ctx context.Context, id string) (Container, error)
// List returns containers that match one or more of the provided filters.
List(ctx context.Context, filters ...string) ([]Container, error)
// Create a container in the store from the provided container.
Create(ctx context.Context, container Container) (Container, error)
// Update the container with the provided container object. ID must be set.
//
// If one or more fieldpaths are provided, only the field corresponding to
// the fieldpaths will be mutated.
Update(ctx context.Context, container Container, fieldpaths ...string) (Container, error)
// Delete a container using the id.
//
// nil will be returned on success. If the container is not known to the
// store, ErrNotFound will be returned.
Delete(ctx context.Context, id string) error
}
// Create sends the container to the remote Containers service and returns
// the record the service reports back.
//
// The container is converted to its protobuf form for the request; a gRPC
// error is translated with errdefs.FromGRPC and returned together with a
// zero-value Container.
func (r *remoteContainers) Create(ctx context.Context, container containers.Container) (containers.Container, error) {
	req := &containersapi.CreateContainerRequest{
		Container: containerToProto(&container),
	}

	resp, err := r.client.Create(ctx, req)
	if err != nil {
		return containers.Container{}, errdefs.FromGRPC(err)
	}

	return containerFromProto(&resp.Container), nil
}
2.1) containerstore.go#containerToProto
// containerToProto copies a containers.Container into its protobuf message
// form. Runtime is always emitted as a non-nil message carrying the runtime
// name and options; the remaining listed fields are copied across as-is.
func containerToProto(container *containers.Container) containersapi.Container {
	rt := &containersapi.Container_Runtime{
		Name:    container.Runtime.Name,
		Options: container.Runtime.Options,
	}

	return containersapi.Container{
		ID:          container.ID,
		Labels:      container.Labels,
		Image:       container.Image,
		Runtime:     rt,
		Spec:        container.Spec,
		Snapshotter: container.Snapshotter,
		SnapshotKey: container.SnapshotKey,
		Extensions:  container.Extensions,
	}
}
2.2) api/services/containers/v1/containers.pb.go#containersClient.Create
// Create performs the unary Containers/Create RPC over the client's
// connection and returns the decoded response, or nil and the error when the
// invocation fails.
func (c *containersClient) Create(ctx context.Context, in *CreateContainerRequest, opts ...grpc.CallOption) (*CreateContainerResponse, error) {
	resp := new(CreateContainerResponse)

	// NOTICE: this is where the request leaves the client process.
	if err := grpc.Invoke(ctx, "/containerd.services.containers.v1.Containers/Create", in, resp, c.cc, opts...); err != nil {
		return nil, err
	}
	return resp, nil
}
2.3) api/services/containers/v1/containers.pb.go#_Containers_Create_Handler
// _Containers_serviceDesc describes the Containers gRPC service: its
// fully-qualified service name, the handler for each unary method, and the
// single server-streaming method ListStream.
var _Containers_serviceDesc = grpc.ServiceDesc{
ServiceName: "containerd.services.containers.v1.Containers",
HandlerType: (*ContainersServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Get",
Handler: _Containers_Get_Handler,
},
{
MethodName: "List",
Handler: _Containers_List_Handler,
},
{
// The Create method is mapped to _Containers_Create_Handler.
MethodName: "Create",
Handler: _Containers_Create_Handler,
},
{
MethodName: "Update",
Handler: _Containers_Update_Handler,
},
{
MethodName: "Delete",
Handler: _Containers_Delete_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "ListStream",
Handler: _Containers_ListStream_Handler,
ServerStreams: true,
},
},
Metadata: "github.com/containerd/containerd/api/services/containers/v1/containers.proto",
}
// _Containers_Create_Handler is the server-side entry point for the
// Containers/Create RPC.
//
// It decodes the request with dec and calls Create on srv (asserted to
// ContainersServer). When an interceptor is configured the call is routed
// through it, with the real Create wrapped as the handler.
func _Containers_Create_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	request := new(CreateContainerRequest)
	if err := dec(request); err != nil {
		return nil, err
	}

	if interceptor != nil {
		info := &grpc.UnaryServerInfo{
			Server:     srv,
			FullMethod: "/containerd.services.containers.v1.Containers/Create",
		}
		handler := func(ctx context.Context, msg interface{}) (interface{}, error) {
			// NOTICE: the interceptor eventually calls back into the real Create.
			return srv.(ContainersServer).Create(ctx, msg.(*CreateContainerRequest))
		}
		return interceptor(ctx, request, info, handler)
	}

	// No interceptor: invoke the service implementation directly.
	return srv.(ContainersServer).Create(ctx, request)
}
2.3.1) services/containers/service.go#service.Create
// ContainersServer is the server-side API of the Containers gRPC service.
// _Containers_Create_Handler type-asserts the registered server to this
// interface before calling Create.
type ContainersServer interface {
Get(context.Context, *GetContainerRequest) (*GetContainerResponse, error)
List(context.Context, *ListContainersRequest) (*ListContainersResponse, error)
// ListStream sends its results over the supplied stream instead of returning a response.
ListStream(*ListContainersRequest, Containers_ListStreamServer) error
Create(context.Context, *CreateContainerRequest) (*CreateContainerResponse, error)
Update(context.Context, *UpdateContainerRequest) (*UpdateContainerResponse, error)
Delete(context.Context, *DeleteContainerRequest) (*google_protobuf2.Empty, error)
}
// Create handles the Containers/Create RPC on the server by delegating the
// request, unchanged, to s.local.
func (s *service) Create(ctx context.Context, req *api.CreateContainerRequest) (*api.CreateContainerResponse, error) {
	return s.local.Create(ctx, req)
}
2.3.1.1) services/containers/local.go#local.Create
// Create stores the container carried by req and then publishes a
// "/containers/create" event describing it.
//
// The store write runs inside withStoreUpdate; a failure there is converted
// with errdefs.ToGRPC. A publish failure is returned as-is. In every case a
// pointer to the (possibly partially filled) response is returned alongside
// the error. Call options are ignored.
func (l *local) Create(ctx context.Context, req *api.CreateContainerRequest, _ ...grpc.CallOption) (*api.CreateContainerResponse, error) {
	var resp api.CreateContainerResponse

	// NOTICE: the write below executes inside a metadata store update.
	persist := func(ctx context.Context, store containers.Store) error {
		// NOTICE: protobuf request -> containers.Container -> store.Create.
		created, err := store.Create(ctx, containerFromProto(&req.Container))
		if err != nil {
			return err
		}
		// NOTICE: the stored record is converted back for the response.
		resp.Container = containerToProto(&created)
		return nil
	}
	if err := l.withStoreUpdate(ctx, persist); err != nil {
		return &resp, errdefs.ToGRPC(err)
	}

	// NOTICE: announce the new container once the update has succeeded.
	event := &eventstypes.ContainerCreate{
		ID:    resp.Container.ID,
		Image: resp.Container.Image,
		Runtime: &eventstypes.ContainerCreate_Runtime{
			Name:    resp.Container.Runtime.Name,
			Options: resp.Container.Runtime.Options,
		},
	}
	if err := l.publisher.Publish(ctx, "/containers/create", event); err != nil {
		return &resp, err
	}

	return &resp, nil
}
—2.3.1.1.1) services/containers/local.go#local.withStoreUpdate
// withStoreUpdate runs fn against a container store inside a writable
// metadata transaction (l.db.Update) and returns that update's error.
func (l *local) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) error {
	txFn := l.withStore(ctx, fn)
	return l.db.Update(txFn)
}
// withStore adapts fn to a bolt transaction callback: when invoked with a
// transaction, it builds a container store on that transaction and calls fn
// with the captured ctx and that store.
func (l *local) withStore(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) func(tx *bolt.Tx) error {
	return func(tx *bolt.Tx) error {
		store := metadata.NewContainerStore(tx)
		return fn(ctx, store)
	}
}
// NewContainerStore returns a containers.Store that operates on the given
// bolt transaction.
func NewContainerStore(tx *bolt.Tx) containers.Store {
	return &containerStore{tx: tx}
}
——2.3.1.1.1.1) metadata/db.go#DB.Update
// Update runs fn in a writable transaction on the metadata store.
//
// The read side of wlock is held for the whole call. If the underlying bolt
// update fails, its error is returned and no callbacks run. On success every
// registered mutation callback is invoked, under dirtyL, with a flag that is
// true when dirtyCS is set or dirtySS is non-empty.
func (m *DB) Update(fn func(*bolt.Tx) error) error {
	m.wlock.RLock()
	defer m.wlock.RUnlock()

	if err := m.db.Update(fn); err != nil {
		return err
	}

	// dirtyL is released explicitly (not deferred) right after the callbacks
	// have run, before the deferred wlock release.
	m.dirtyL.Lock()
	dirty := m.dirtyCS || len(m.dirtySS) > 0
	for _, notify := range m.mutationCallbacks {
		notify(dirty)
	}
	m.dirtyL.Unlock()

	return nil
}
———2.3.1.1.1.1.1) bbolt/db.go#DB.Update
// Update executes a function within the context of a read-write managed transaction.
// If no error is returned from the function then the transaction is committed.
// If an error is returned then the entire transaction is rolled back.
// Any error that is returned from the function or returned from the commit is
// returned from the Update() method.
//
// Attempting to manually commit or rollback within the function will cause a panic.
func (db *DB) Update(fn func(*Tx) error) error {
// Begin(true) requests a writable transaction.
t, err := db.Begin(true)
if err != nil {
return err
}
// Make sure the transaction rolls back in the event of a panic.
// NOTE(review): t.db != nil presumably means the transaction has not been
// closed by Commit/Rollback yet — confirm against Tx.
defer func() {
if t.db != nil {
t.rollback()
}
}()
// Mark as a managed tx so that the inner function cannot manually commit.
t.managed = true
// If an error is returned from the function then rollback and return error.
err = fn(t)
// The managed flag is cleared before Update itself rolls back or commits.
t.managed = false
if err != nil {
// The rollback error is deliberately discarded; fn's error is what the caller sees.
_ = t.Rollback()
return err
}
return t.Commit()
}
————2.3.1.1.1.1.1.1) metadata/containers.go#containerStore.Create
// Create writes a new container record inside the store's transaction.
//
// The namespace is taken from ctx and is required. The container is
// validated, then a bucket keyed by the container ID is created under the
// namespace's containers bucket; an already existing bucket is reported as
// errdefs.ErrAlreadyExists. CreatedAt and UpdatedAt are both set to the same
// current UTC time before the record is written. On any failure a zero-value
// Container is returned with the error.
func (s *containerStore) Create(ctx context.Context, container containers.Container) (containers.Container, error) {
	namespace, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return containers.Container{}, err
	}

	if err := validateContainer(&container); err != nil {
		return containers.Container{}, errors.Wrap(err, "create container failed validation")
	}

	nsBucket, err := createContainersBucket(s.tx, namespace)
	if err != nil {
		return containers.Container{}, err
	}

	containerBucket, err := nsBucket.CreateBucket([]byte(container.ID))
	switch {
	case err == bolt.ErrBucketExists:
		// A bucket with this ID already exists, i.e. the container is a duplicate.
		return containers.Container{}, errors.Wrapf(errdefs.ErrAlreadyExists, "container %q", container.ID)
	case err != nil:
		return containers.Container{}, err
	}

	now := time.Now().UTC()
	container.CreatedAt = now
	container.UpdatedAt = now

	if err := writeContainer(containerBucket, &container); err != nil {
		return containers.Container{}, errors.Wrapf(err, "failed to write container %q", container.ID)
	}

	return container, nil
}