client.go#Client.NewContainer

  1. // NewContainer will create a new container in container with the provided id
  2. // the id must be unique within the namespace
  3. func (c *Client) NewContainer(ctx context.Context, id string, opts ...NewContainerOpts) (Container, error) {
  4. ctx, done, err := c.WithLease(ctx)
  5. if err != nil {
  6. return nil, err
  7. }
  8. defer done(ctx)
  9. container := containers.Container{
  10. ID: id,
  11. Runtime: containers.RuntimeInfo{
  12. Name: c.runtime,
  13. },
  14. }
  15. // 这里有两个o,一个是为c赋值spec,另一个是为c赋值Runtime
  16. for _, o := range opts {
  17. if err := o(ctx, c, &container); err != nil {
  18. return nil, err
  19. }
  20. }
  21. // ********************************** NOTICE ********************************** //
  22. r, err := c.ContainerService().Create(ctx, container)
  23. // ********************************** NOTICE ********************************** //
  24. if err != nil {
  25. return nil, err
  26. }
  27. // ********************************** NOTICE ********************************** //
  28. return containerFromRecord(c, r), nil
  29. // ********************************** NOTICE ********************************** //
  30. }

1) client.go#Client.ContainerService

image.png
ContainersClient是生成出来的,它一定有两个实现,一个是local(即server),一个是client。

  1. // ContainerService returns the underlying container Store
  2. func (c *Client) ContainerService() containers.Store {
  3. if c.containerStore != nil {
  4. return c.containerStore
  5. }
  6. c.connMu.Lock()
  7. defer c.connMu.Unlock()
  8. return NewRemoteContainerStore(containersapi.NewContainersClient(c.conn))
  9. }
  10. // api/services/containers/v1/containers.pb.go
  11. type ContainersClient interface {
  12. Get(ctx context.Context, in *GetContainerRequest, opts ...grpc.CallOption) (*GetContainerResponse, error)
  13. List(ctx context.Context, in *ListContainersRequest, opts ...grpc.CallOption) (*ListContainersResponse, error)
  14. ListStream(ctx context.Context, in *ListContainersRequest, opts ...grpc.CallOption) (Containers_ListStreamClient, error)
  15. Create(ctx context.Context, in *CreateContainerRequest, opts ...grpc.CallOption) (*CreateContainerResponse, error)
  16. Update(ctx context.Context, in *UpdateContainerRequest, opts ...grpc.CallOption) (*UpdateContainerResponse, error)
  17. Delete(ctx context.Context, in *DeleteContainerRequest, opts ...grpc.CallOption) (*google_protobuf2.Empty, error)
  18. }
  19. type containersClient struct {
  20. cc *grpc.ClientConn
  21. }
  22. func NewContainersClient(cc *grpc.ClientConn) ContainersClient {
  23. return &containersClient{cc}
  24. }
  25. // containerstore.go
  26. // NewRemoteContainerStore returns the container Store connected with the provided client
  27. func NewRemoteContainerStore(client containersapi.ContainersClient) containers.Store {
  28. return &remoteContainers{
  29. client: client,
  30. }
  31. type remoteContainers struct {
  32. client containersapi.ContainersClient
  33. }

2) containerstore.go#remoteContainers.Create

  1. // Store interacts with the underlying container storage
  2. type Store interface {
  3. Get(ctx context.Context, id string) (Container, error)
  4. // List returns containers that match one or more of the provided filters.
  5. List(ctx context.Context, filters ...string) ([]Container, error)
  6. // Create a container in the store from the provided container.
  7. Create(ctx context.Context, container Container) (Container, error)
  8. // Update the container with the provided container object. ID must be set.
  9. //
  10. // If one or more fieldpaths are provided, only the field corresponding to
  11. // the fieldpaths will be mutated.
  12. Update(ctx context.Context, container Container, fieldpaths ...string) (Container, error)
  13. // Delete a container using the id.
  14. //
  15. // nil will be returned on success. If the container is not known to the
  16. // store, ErrNotFound will be returned.
  17. Delete(ctx context.Context, id string) error
  18. }
  19. func (r *remoteContainers) Create(ctx context.Context, container containers.Container) (containers.Container, error) {
  20. created, err := r.client.Create(ctx, &containersapi.CreateContainerRequest{
  21. Container: containerToProto(&container),
  22. })
  23. if err != nil {
  24. return containers.Container{}, errdefs.FromGRPC(err)
  25. }
  26. return containerFromProto(&created.Container), nil
  27. }

2.1) containerstore.go#containerToProto

  1. func containerToProto(container *containers.Container) containersapi.Container {
  2. return containersapi.Container{
  3. ID: container.ID,
  4. Labels: container.Labels,
  5. Image: container.Image,
  6. Runtime: &containersapi.Container_Runtime{
  7. Name: container.Runtime.Name,
  8. Options: container.Runtime.Options,
  9. },
  10. Spec: container.Spec,
  11. Snapshotter: container.Snapshotter,
  12. SnapshotKey: container.SnapshotKey,
  13. Extensions: container.Extensions,
  14. }
  15. }

2.2) api/services/containers/v1/containers.pb.go#containersClient.Create

  1. func (c *containersClient) Create(ctx context.Context, in *CreateContainerRequest, opts ...grpc.CallOption) (*CreateContainerResponse, error) {
  2. out := new(CreateContainerResponse)
  3. // ********************************** NOTICE ********************************** //
  4. err := grpc.Invoke(ctx, "/containerd.services.containers.v1.Containers/Create", in, out, c.cc, opts...)
  5. // ********************************** NOTICE ********************************** //
  6. if err != nil {
  7. return nil, err
  8. }
  9. return out, nil
  10. }

2.3) api/services/containers/v1/containers.pb.go#_Containers_Create_Handler

  1. var _Containers_serviceDesc = grpc.ServiceDesc{
  2. ServiceName: "containerd.services.containers.v1.Containers",
  3. HandlerType: (*ContainersServer)(nil),
  4. Methods: []grpc.MethodDesc{
  5. {
  6. MethodName: "Get",
  7. Handler: _Containers_Get_Handler,
  8. },
  9. {
  10. MethodName: "List",
  11. Handler: _Containers_List_Handler,
  12. },
  13. {
  14. MethodName: "Create",
  15. Handler: _Containers_Create_Handler,
  16. },
  17. {
  18. MethodName: "Update",
  19. Handler: _Containers_Update_Handler,
  20. },
  21. {
  22. MethodName: "Delete",
  23. Handler: _Containers_Delete_Handler,
  24. },
  25. },
  26. Streams: []grpc.StreamDesc{
  27. {
  28. StreamName: "ListStream",
  29. Handler: _Containers_ListStream_Handler,
  30. ServerStreams: true,
  31. },
  32. },
  33. Metadata: "github.com/containerd/containerd/api/services/containers/v1/containers.proto",
  34. }
  35. func _Containers_Create_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
  36. in := new(CreateContainerRequest)
  37. if err := dec(in); err != nil {
  38. return nil, err
  39. }
  40. if interceptor == nil {
  41. return srv.(ContainersServer).Create(ctx, in)
  42. }
  43. info := &grpc.UnaryServerInfo{
  44. Server: srv,
  45. FullMethod: "/containerd.services.containers.v1.Containers/Create",
  46. }
  47. handler := func(ctx context.Context, req interface{}) (interface{}, error) {
  48. // ********************************** NOTICE ********************************** //
  49. return srv.(ContainersServer).Create(ctx, req.(*CreateContainerRequest))
  50. // ********************************** NOTICE ********************************** //
  51. }
  52. return interceptor(ctx, in, info, handler)
  53. }

2.3.1) services/containers/service.go#service.Create

  1. type ContainersServer interface {
  2. Get(context.Context, *GetContainerRequest) (*GetContainerResponse, error)
  3. List(context.Context, *ListContainersRequest) (*ListContainersResponse, error)
  4. ListStream(*ListContainersRequest, Containers_ListStreamServer) error
  5. Create(context.Context, *CreateContainerRequest) (*CreateContainerResponse, error)
  6. Update(context.Context, *UpdateContainerRequest) (*UpdateContainerResponse, error)
  7. Delete(context.Context, *DeleteContainerRequest) (*google_protobuf2.Empty, error)
  8. }
  9. func (s *service) Create(ctx context.Context, req *api.CreateContainerRequest) (*api.CreateContainerResponse, error) {
  10. return s.local.Create(ctx, req)
  11. }

2.3.1.1) services/containers/local.go#local.Create

  1. func (l *local) Create(ctx context.Context, req *api.CreateContainerRequest, _ ...grpc.CallOption) (*api.CreateContainerResponse, error) {
  2. var resp api.CreateContainerResponse
  3. // ********************************** NOTICE ********************************** //
  4. if err := l.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error {
  5. // ********************************** NOTICE ********************************** //
  6. container := containerFromProto(&req.Container)
  7. // ********************************** NOTICE ********************************** //
  8. created, err := store.Create(ctx, container)
  9. // ********************************** NOTICE ********************************** //
  10. if err != nil {
  11. return err
  12. }
  13. // ********************************** NOTICE ********************************** //
  14. resp.Container = containerToProto(&created)
  15. // ********************************** NOTICE ********************************** //
  16. return nil
  17. }); err != nil {
  18. return &resp, errdefs.ToGRPC(err)
  19. }
  20. // ********************************** NOTICE ********************************** //
  21. if err := l.publisher.Publish(ctx, "/containers/create", &eventstypes.ContainerCreate{
  22. // ********************************** NOTICE ********************************** //
  23. ID: resp.Container.ID,
  24. Image: resp.Container.Image,
  25. Runtime: &eventstypes.ContainerCreate_Runtime{
  26. Name: resp.Container.Runtime.Name,
  27. Options: resp.Container.Runtime.Options,
  28. },
  29. }); err != nil {
  30. return &resp, err
  31. }
  32. return &resp, nil
  33. }

—2.3.1.1.1) services/containers/local.go#local.withStoreUpdate

  1. func (l *local) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) error {
  2. return l.db.Update(l.withStore(ctx, fn))
  3. }
  4. func (l *local) withStore(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) func(tx *bolt.Tx) error {
  5. return func(tx *bolt.Tx) error { return fn(ctx, metadata.NewContainerStore(tx)) }
  6. }
  7. // NewContainerStore returns a Store backed by an underlying bolt DB
  8. func NewContainerStore(tx *bolt.Tx) containers.Store {
  9. return &containerStore{
  10. tx: tx,
  11. }
  12. }

——2.3.1.1.1.1) metadata/db.go#DB.Update

  1. // Update runs a writable transaction on the metadata store.
  2. func (m *DB) Update(fn func(*bolt.Tx) error) error {
  3. m.wlock.RLock()
  4. defer m.wlock.RUnlock()
  5. err := m.db.Update(fn)
  6. if err == nil {
  7. m.dirtyL.Lock()
  8. dirty := m.dirtyCS || len(m.dirtySS) > 0
  9. for _, fn := range m.mutationCallbacks {
  10. fn(dirty)
  11. }
  12. m.dirtyL.Unlock()
  13. }
  14. return err
  15. }

———2.3.1.1.1.1.1) bbolt/db.go#DB.Update

  1. // Update executes a function within the context of a read-write managed transaction.
  2. // If no error is returned from the function then the transaction is committed.
  3. // If an error is returned then the entire transaction is rolled back.
  4. // Any error that is returned from the function or returned from the commit is
  5. // returned from the Update() method.
  6. //
  7. // Attempting to manually commit or rollback within the function will cause a panic.
  8. func (db *DB) Update(fn func(*Tx) error) error {
  9. t, err := db.Begin(true)
  10. if err != nil {
  11. return err
  12. }
  13. // Make sure the transaction rolls back in the event of a panic.
  14. defer func() {
  15. if t.db != nil {
  16. t.rollback()
  17. }
  18. }()
  19. // Mark as a managed tx so that the inner function cannot manually commit.
  20. t.managed = true
  21. // If an error is returned from the function then rollback and return error.
  22. err = fn(t)
  23. t.managed = false
  24. if err != nil {
  25. _ = t.Rollback()
  26. return err
  27. }
  28. return t.Commit()
  29. }

————2.3.1.1.1.1.1.1) metadata/containers.go#containerStore#Create

  1. func (s *containerStore) Create(ctx context.Context, container containers.Container) (containers.Container, error) {
  2. namespace, err := namespaces.NamespaceRequired(ctx)
  3. if err != nil {
  4. return containers.Container{}, err
  5. }
  6. if err := validateContainer(&container); err != nil {
  7. return containers.Container{}, errors.Wrap(err, "create container failed validation")
  8. }
  9. bkt, err := createContainersBucket(s.tx, namespace)
  10. if err != nil {
  11. return containers.Container{}, err
  12. }
  13. cbkt, err := bkt.CreateBucket([]byte(container.ID))
  14. if err != nil {
  15. if err == bolt.ErrBucketExists {
  16. err = errors.Wrapf(errdefs.ErrAlreadyExists, "container %q", container.ID)
  17. }
  18. return containers.Container{}, err
  19. }
  20. container.CreatedAt = time.Now().UTC()
  21. container.UpdatedAt = container.CreatedAt
  22. if err := writeContainer(cbkt, &container); err != nil {
  23. return containers.Container{}, errors.Wrapf(err, "failed to write container %q", container.ID)
  24. }
  25. return container, nil
  26. }