- runC config.json文件示例
- 1) container.go#container.NewTask
- 1.1) api/services/tasks/v1/tasks.pb.go#tasksClient.Create
- 1.1.1) services/tasks/local.go#local.Create
- 1.1.1.1) runtime/v1/linux/runtime.go#Runtime.Create
- —1.1.1.1.1-创建bundle和config.json) runtime/v1/linux/bundle.go#newBundle
- —1.1.1.1.2) runtime/v1/linux/bundle.go#bundle.NewShimClient
- —1.1.1.1.3) runtime/v1/shim/local.go#local.Create
- ——1.1.1.1.3.1) runtime/v1/shim/service.go#Service.Create
- ———1.1.1.1.3.1.1) runtime/v1/shim/service.go#newInit
- ———1.1.1.1.3.1.2) runtime/v1/linux/proc/init.go#Init.Create
- 1.1.1) services/tasks/local.go#local.Create
- 1.1) api/services/tasks/v1/tasks.pb.go#tasksClient.Create
- 2) task.go#task.Start
runC config.json文件示例
/run/containerd/io.containerd.runtime.v1.linux/moby/790b6f02b02464bd9301125fb83eee5c9527f411433346dd194b74eb5f096e44/config.json
.
├── config.json
├── init.pid
├── log.json
└── rootfs
简化
{
"ociVersion": "1.0.1-dev",
"process": {
"user": {
"uid": 0,
"gid": 0,
"additionalGids": [
10
]
},
"args": [
"sleep",
"24h"
],
"env": [
"PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
"HOSTNAME=790b6f02b024"
],
"cwd": "/",
"oomScoreAdj": 0
},
"root": {
"path": "/var/lib/docker/aufs/mnt/071238d45a177890efadf9dbd1f66c505808e77e4c3bba6355323081ac269351"
},
"hostname": "790b6f02b024",
"mounts": [
{
"destination": "/proc",
"type": "proc",
"source": "proc",
"options": [
"nosuid",
"noexec",
"nodev"
]
},
{
"destination": "/dev",
"type": "tmpfs",
"source": "tmpfs",
"options": [
"nosuid",
"strictatime",
"mode=755",
"size=65536k"
]
},
{
"destination": "/dev/pts",
"type": "devpts",
"source": "devpts",
"options": [
"nosuid",
"noexec",
"newinstance",
"ptmxmode=0666",
"mode=0620",
"gid=5"
]
},
{
"destination": "/sys",
"type": "sysfs",
"source": "sysfs",
"options": [
"nosuid",
"noexec",
"nodev",
"ro"
]
},
{
"destination": "/sys/fs/cgroup",
"type": "cgroup",
"source": "cgroup",
"options": [
"ro",
"nosuid",
"noexec",
"nodev"
]
},
{
"destination": "/dev/mqueue",
"type": "mqueue",
"source": "mqueue",
"options": [
"nosuid",
"noexec",
"nodev"
]
},
{
"destination": "/etc/resolv.conf",
"type": "bind",
"source": "/var/lib/docker/containers/790b6f02b02464bd9301125fb83eee5c9527f411433346dd194b74eb5f096e44/resolv.conf",
"options": [
"rbind",
"rprivate"
]
},
{
"destination": "/etc/hostname",
"type": "bind",
"source": "/var/lib/docker/containers/790b6f02b02464bd9301125fb83eee5c9527f411433346dd194b74eb5f096e44/hostname",
"options": [
"rbind",
"rprivate"
]
},
{
"destination": "/etc/hosts",
"type": "bind",
"source": "/var/lib/docker/containers/790b6f02b02464bd9301125fb83eee5c9527f411433346dd194b74eb5f096e44/hosts",
"options": [
"rbind",
"rprivate"
]
},
{
"destination": "/dev/shm",
"type": "bind",
"source": "/var/lib/docker/containers/790b6f02b02464bd9301125fb83eee5c9527f411433346dd194b74eb5f096e44/mounts/shm",
"options": [
"rbind",
"rprivate"
]
}
],
"hooks": {
"prestart": [
{
"path": "/proc/2325/exe",
"args": [
"libnetwork-setkey",
"790b6f02b02464bd9301125fb83eee5c9527f411433346dd194b74eb5f096e44",
"c2a68d82d6cfefcc3404f3a2ce3a5bba4204f518d27da191ae7f7359619cd6c3"
]
}
]
},
"linux": {
"resources": {
"devices": [
{
"allow": false,
"access": "rwm"
},
{
"allow": true,
"type": "c",
"major": 1,
"minor": 5,
"access": "rwm"
},
{
"allow": true,
"type": "c",
"major": 1,
"minor": 3,
"access": "rwm"
},
{
"allow": true,
"type": "c",
"major": 1,
"minor": 9,
"access": "rwm"
},
{
"allow": true,
"type": "c",
"major": 1,
"minor": 8,
"access": "rwm"
},
{
"allow": true,
"type": "c",
"major": 5,
"minor": 0,
"access": "rwm"
},
{
"allow": true,
"type": "c",
"major": 5,
"minor": 1,
"access": "rwm"
},
{
"allow": false,
"type": "c",
"major": 10,
"minor": 229,
"access": "rwm"
}
],
"memory": {
"disableOOMKiller": false
},
"cpu": {
"shares": 0
},
"pids": {
"limit": 0
},
"blockIO": {
"weight": 0
}
},
"cgroupsPath": "/docker/790b6f02b02464bd9301125fb83eee5c9527f411433346dd194b74eb5f096e44",
"namespaces": [
{
"type": "mount"
},
{
"type": "network"
},
{
"type": "uts"
},
{
"type": "pid"
},
{
"type": "ipc"
}
],
"maskedPaths": [
"/proc/asound",
"/proc/acpi",
"/proc/kcore",
"/proc/keys",
"/proc/latency_stats",
"/proc/timer_list",
"/proc/timer_stats",
"/proc/sched_debug",
"/proc/scsi",
"/sys/firmware"
],
"readonlyPaths": [
"/proc/bus",
"/proc/fs",
"/proc/irq",
"/proc/sys",
"/proc/sysrq-trigger"
]
}
}
1) container.go#container.NewTask
func (c *container) NewTask(ctx context.Context, ioCreate cio.Creator, opts ...NewTaskOpts) (_ Task, err error) {
i, err := ioCreate(c.id)
if err != nil {
return nil, err
}
defer func() {
if err != nil && i != nil {
i.Cancel()
i.Close()
}
}()
cfg := i.Config()
request := &tasks.CreateTaskRequest{
ContainerID: c.id,
Terminal: cfg.Terminal,
Stdin: cfg.Stdin,
Stdout: cfg.Stdout,
Stderr: cfg.Stderr,
}
r, err := c.get(ctx)
if err != nil {
return nil, err
}
if r.SnapshotKey != "" {
if r.Snapshotter == "" {
return nil, errors.Wrapf(errdefs.ErrInvalidArgument, "unable to resolve rootfs mounts without snapshotter on container")
}
// get the rootfs from the snapshotter and add it to the request
mounts, err := c.client.SnapshotService(r.Snapshotter).Mounts(ctx, r.SnapshotKey)
if err != nil {
return nil, err
}
for _, m := range mounts {
request.Rootfs = append(request.Rootfs, &types.Mount{
Type: m.Type,
Source: m.Source,
Options: m.Options,
})
}
}
var info TaskInfo
for _, o := range opts {
if err := o(ctx, c.client, &info); err != nil {
return nil, err
}
}
if info.RootFS != nil {
for _, m := range info.RootFS {
request.Rootfs = append(request.Rootfs, &types.Mount{
Type: m.Type,
Source: m.Source,
Options: m.Options,
})
}
}
if info.Options != nil {
any, err := typeurl.MarshalAny(info.Options)
if err != nil {
return nil, err
}
request.Options = any
}
t := &task{
client: c.client,
io: i,
id: c.id,
}
if info.Checkpoint != nil {
request.Checkpoint = info.Checkpoint
}
// ********************************** NOTICE ********************************** //
response, err := c.client.TaskService().Create(ctx, request)
// ********************************** NOTICE ********************************** //
if err != nil {
return nil, errdefs.FromGRPC(err)
}
t.pid = response.Pid
return t, nil
}
1.1) api/services/tasks/v1/tasks.pb.go#tasksClient.Create
func (c *tasksClient) Create(ctx context.Context, in *CreateTaskRequest, opts ...grpc.CallOption) (*CreateTaskResponse, error) {
out := new(CreateTaskResponse)
err := grpc.Invoke(ctx, "/containerd.services.tasks.v1.Tasks/Create", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
1.1.1) services/tasks/local.go#local.Create
func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.CallOption) (*api.CreateTaskResponse, error) {
container, err := l.getContainer(ctx, r.ContainerID)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
checkpointPath, err := getRestorePath(container.Runtime.Name, r.Options)
if err != nil {
return nil, err
}
// jump get checkpointPath from checkpoint image
if checkpointPath != "" && r.Checkpoint != nil {
checkpointPath, err = ioutil.TempDir(os.Getenv("XDG_RUNTIME_DIR"), "ctrd-checkpoint")
if err != nil {
return nil, err
}
if r.Checkpoint.MediaType != images.MediaTypeContainerd1Checkpoint {
return nil, fmt.Errorf("unsupported checkpoint type %q", r.Checkpoint.MediaType)
}
reader, err := l.store.ReaderAt(ctx, ocispec.Descriptor{
MediaType: r.Checkpoint.MediaType,
Digest: r.Checkpoint.Digest,
Size: r.Checkpoint.Size_,
})
if err != nil {
return nil, err
}
_, err = archive.Apply(ctx, checkpointPath, content.NewReader(reader))
reader.Close()
if err != nil {
return nil, err
}
}
opts := runtime.CreateOpts{
Spec: container.Spec,
IO: runtime.IO{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Terminal: r.Terminal,
},
Checkpoint: checkpointPath,
Runtime: container.Runtime.Name,
RuntimeOptions: container.Runtime.Options,
TaskOptions: r.Options,
}
for _, m := range r.Rootfs {
opts.Rootfs = append(opts.Rootfs, mount.Mount{
Type: m.Type,
Source: m.Source,
Options: m.Options,
})
}
// linux
// ********************************** NOTICE ********************************** //
rtime, err := l.getRuntime(container.Runtime.Name)
// ********************************** NOTICE ********************************** //
if err != nil {
return nil, err
}
if _, err := rtime.Get(ctx, r.ContainerID); err != runtime.ErrTaskNotExists {
return nil, errdefs.ToGRPC(fmt.Errorf("task %s already exists", r.ContainerID))
}
// ********************************** NOTICE ********************************** //
c, err := rtime.Create(ctx, r.ContainerID, opts)
// ********************************** NOTICE ********************************** //
if err != nil {
return nil, errdefs.ToGRPC(err)
}
// TODO: fast path for getting pid on create
if err := l.monitor.Monitor(c); err != nil {
return nil, errors.Wrap(err, "monitor task")
}
state, err := c.State(ctx)
if err != nil {
log.G(ctx).Error(err)
}
return &api.CreateTaskResponse{
ContainerID: r.ContainerID,
Pid: state.Pid,
}, nil
}
1.1.1.1) runtime/v1/linux/runtime.go#Runtime.Create
// Create a new task
func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, err error) {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
if err := identifiers.Validate(id); err != nil {
return nil, errors.Wrapf(err, "invalid task id")
}
ropts, err := r.getRuncOptions(ctx, id)
if err != nil {
return nil, err
}
// ********************************** NOTICE ********************************** //
bundle, err := newBundle(id,
filepath.Join(r.state, namespace),
filepath.Join(r.root, namespace),
opts.Spec.Value)
// ********************************** NOTICE ********************************** //
if err != nil {
return nil, err
}
defer func() {
if err != nil {
bundle.Delete()
}
}()
shimopt := ShimLocal(r.config, r.events)
if !r.config.NoShim {
var cgroup string
if opts.TaskOptions != nil {
v, err := typeurl.UnmarshalAny(opts.TaskOptions)
if err != nil {
return nil, err
}
cgroup = v.(*runctypes.CreateOptions).ShimCgroup
}
exitHandler := func() {
log.G(ctx).WithField("id", id).Info("shim reaped")
t, err := r.tasks.Get(ctx, id)
if err != nil {
// Task was never started or was already successfully deleted
return
}
lc := t.(*Task)
log.G(ctx).WithFields(logrus.Fields{
"id": id,
"namespace": namespace,
}).Warn("cleaning up after killed shim")
if err = r.cleanupAfterDeadShim(context.Background(), bundle, namespace, id, lc.pid); err != nil {
log.G(ctx).WithError(err).WithFields(logrus.Fields{
"id": id,
"namespace": namespace,
}).Warn("failed to clean up after killed shim")
}
}
shimopt = ShimRemote(r.config, r.address, cgroup, exitHandler)
}
// ********************************** NOTICE ********************************** //
s, err := bundle.NewShimClient(ctx, namespace, shimopt, ropts)
// ********************************** NOTICE ********************************** //
if err != nil {
return nil, err
}
defer func() {
if err != nil {
if kerr := s.KillShim(ctx); kerr != nil {
log.G(ctx).WithError(err).Error("failed to kill shim")
}
}
}()
rt := r.config.Runtime
if ropts != nil && ropts.Runtime != "" {
rt = ropts.Runtime
}
sopts := &shim.CreateTaskRequest{
ID: id,
Bundle: bundle.path,
Runtime: rt,
Stdin: opts.IO.Stdin,
Stdout: opts.IO.Stdout,
Stderr: opts.IO.Stderr,
Terminal: opts.IO.Terminal,
Checkpoint: opts.Checkpoint,
Options: opts.TaskOptions,
}
for _, m := range opts.Rootfs {
sopts.Rootfs = append(sopts.Rootfs, &types.Mount{
Type: m.Type,
Source: m.Source,
Options: m.Options,
})
}
// ********************************** NOTICE ********************************** //
cr, err := s.Create(ctx, sopts)
// ********************************** NOTICE ********************************** //
if err != nil {
return nil, errdefs.FromGRPC(err)
}
// ********************************** NOTICE ********************************** //
t, err := newTask(id, namespace, int(cr.Pid), s, r.events, r.tasks, bundle)
// ********************************** NOTICE ********************************** //
if err != nil {
return nil, err
}
if err := r.tasks.Add(ctx, t); err != nil {
return nil, err
}
r.events.Publish(ctx, runtime.TaskCreateEventTopic, &eventstypes.TaskCreate{
ContainerID: sopts.ID,
Bundle: sopts.Bundle,
Rootfs: sopts.Rootfs,
IO: &eventstypes.TaskIO{
Stdin: sopts.Stdin,
Stdout: sopts.Stdout,
Stderr: sopts.Stderr,
Terminal: sopts.Terminal,
},
Checkpoint: sopts.Checkpoint,
Pid: uint32(t.pid),
})
return t, nil
}
—1.1.1.1.1-创建bundle和config.json) runtime/v1/linux/bundle.go#newBundle
// newBundle creates a new bundle on disk at the provided path for the given id
func newBundle(id, path, workDir string, spec []byte) (b *bundle, err error) {
if err := os.MkdirAll(path, 0711); err != nil {
return nil, err
}
path = filepath.Join(path, id)
if err := os.Mkdir(path, 0711); err != nil {
return nil, err
}
defer func() {
if err != nil {
os.RemoveAll(path)
}
}()
workDir = filepath.Join(workDir, id)
if err := os.MkdirAll(workDir, 0711); err != nil {
return nil, err
}
defer func() {
if err != nil {
os.RemoveAll(workDir)
}
}()
if err := os.Mkdir(filepath.Join(path, "rootfs"), 0711); err != nil {
return nil, err
}
err = ioutil.WriteFile(filepath.Join(path, configFilename), spec, 0666)
return &bundle{
id: id,
path: path,
workDir: workDir,
}, err
}
—1.1.1.1.2) runtime/v1/linux/bundle.go#bundle.NewShimClient
// NewShimClient connects to the shim managing the bundle and tasks creating it if needed
func (b *bundle) NewShimClient(ctx context.Context, namespace string, getClientOpts ShimOpt, runcOpts *runctypes.RuncOptions) (*client.Client, error) {
cfg, opt := getClientOpts(b, namespace, runcOpts)
return client.New(ctx, cfg, opt)
}
// getClientOpts
// ShimLocal is a ShimOpt for using an in process shim implementation
func ShimLocal(c *Config, exchange *exchange.Exchange) ShimOpt {
return func(b *bundle, ns string, ropts *runctypes.RuncOptions) (shim.Config, client.Opt) {
return b.shimConfig(ns, c, ropts), client.WithLocal(exchange)
}
}
func (b *bundle) shimConfig(namespace string, c *Config, runcOptions *runctypes.RuncOptions) shim.Config {
var (
criuPath string
runtimeRoot = c.RuntimeRoot
systemdCgroup bool
)
if runcOptions != nil {
criuPath = runcOptions.CriuPath
systemdCgroup = runcOptions.SystemdCgroup
if runcOptions.RuntimeRoot != "" {
runtimeRoot = runcOptions.RuntimeRoot
}
}
return shim.Config{
Path: b.path,
WorkDir: b.workDir,
Namespace: namespace,
Criu: criuPath,
RuntimeRoot: runtimeRoot,
SystemdCgroup: systemdCgroup,
}
}
// WithLocal uses an in process shim
func WithLocal(publisher events.Publisher) func(context.Context, shim.Config) (shimapi.ShimService, io.Closer, error) {
return func(ctx context.Context, config shim.Config) (shimapi.ShimService, io.Closer, error) {
service, err := shim.NewService(config, publisher)
if err != nil {
return nil, nil, err
}
return shim.NewLocal(service), nil, nil
}
}
// NewService returns a new shim service that can be used via GRPC
func NewService(config Config, publisher events.Publisher) (*Service, error) {
if config.Namespace == "" {
return nil, fmt.Errorf("shim namespace cannot be empty")
}
ctx := namespaces.WithNamespace(context.Background(), config.Namespace)
ctx = log.WithLogger(ctx, logrus.WithFields(logrus.Fields{
"namespace": config.Namespace,
"path": config.Path,
"pid": os.Getpid(),
}))
s := &Service{
config: config,
context: ctx,
processes: make(map[string]rproc.Process),
events: make(chan interface{}, 128),
ec: Default.Subscribe(),
}
go s.processExits()
if err := s.initPlatform(); err != nil {
return nil, errors.Wrap(err, "failed to initialized platform behavior")
}
go s.forward(publisher)
return s, nil
}
—1.1.1.1.3) runtime/v1/shim/local.go#local.Create
type ShimService interface {
State(ctx context.Context, req *StateRequest) (*StateResponse, error)
Create(ctx context.Context, req *CreateTaskRequest) (*CreateTaskResponse, error)
Start(ctx context.Context, req *StartRequest) (*StartResponse, error)
Delete(ctx context.Context, req *google_protobuf1.Empty) (*DeleteResponse, error)
DeleteProcess(ctx context.Context, req *DeleteProcessRequest) (*DeleteResponse, error)
ListPids(ctx context.Context, req *ListPidsRequest) (*ListPidsResponse, error)
Pause(ctx context.Context, req *google_protobuf1.Empty) (*google_protobuf1.Empty, error)
Resume(ctx context.Context, req *google_protobuf1.Empty) (*google_protobuf1.Empty, error)
Checkpoint(ctx context.Context, req *CheckpointTaskRequest) (*google_protobuf1.Empty, error)
Kill(ctx context.Context, req *KillRequest) (*google_protobuf1.Empty, error)
Exec(ctx context.Context, req *ExecProcessRequest) (*google_protobuf1.Empty, error)
ResizePty(ctx context.Context, req *ResizePtyRequest) (*google_protobuf1.Empty, error)
CloseIO(ctx context.Context, req *CloseIORequest) (*google_protobuf1.Empty, error)
ShimInfo(ctx context.Context, req *google_protobuf1.Empty) (*ShimInfoResponse, error)
Update(ctx context.Context, req *UpdateTaskRequest) (*google_protobuf1.Empty, error)
Wait(ctx context.Context, req *WaitRequest) (*WaitResponse, error)
}
func (c *local) Create(ctx context.Context, in *shimapi.CreateTaskRequest) (*shimapi.CreateTaskResponse, error) {
return c.s.Create(ctx, in)
}
——1.1.1.1.3.1) runtime/v1/shim/service.go#Service.Create
// Create a new initial process and container with the underlying OCI runtime
func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (_ *shimapi.CreateTaskResponse, err error) {
var mounts []proc.Mount
for _, m := range r.Rootfs {
mounts = append(mounts, proc.Mount{
Type: m.Type,
Source: m.Source,
Target: m.Target,
Options: m.Options,
})
}
config := &proc.CreateConfig{
ID: r.ID,
Bundle: r.Bundle,
Runtime: r.Runtime,
Rootfs: mounts,
Terminal: r.Terminal,
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Checkpoint: r.Checkpoint,
ParentCheckpoint: r.ParentCheckpoint,
Options: r.Options,
}
rootfs := filepath.Join(r.Bundle, "rootfs")
defer func() {
if err != nil {
if err2 := mount.UnmountAll(rootfs, 0); err2 != nil {
log.G(ctx).WithError(err2).Warn("Failed to cleanup rootfs mount")
}
}
}()
for _, rm := range mounts {
m := &mount.Mount{
Type: rm.Type,
Source: rm.Source,
Options: rm.Options,
}
if err := m.Mount(rootfs); err != nil {
return nil, errors.Wrapf(err, "failed to mount rootfs component %v", m)
}
}
s.mu.Lock()
defer s.mu.Unlock()
// ********************************** NOTICE ********************************** //
process, err := newInit(
ctx,
s.config.Path,
s.config.WorkDir,
s.config.RuntimeRoot,
s.config.Namespace,
s.config.Criu,
s.config.SystemdCgroup,
s.platform,
config,
)
// ********************************** NOTICE ********************************** //
if err != nil {
return nil, errdefs.ToGRPC(err)
}
// ********************************** NOTICE ********************************** //
if err := process.Create(ctx, config); err != nil {
// ********************************** NOTICE ********************************** //
return nil, errdefs.ToGRPC(err)
}
// save the main task id and bundle to the shim for additional requests
s.id = r.ID
s.bundle = r.Bundle
pid := process.Pid()
s.processes[r.ID] = process
return &shimapi.CreateTaskResponse{
Pid: uint32(pid),
}, nil
}
———1.1.1.1.3.1.1) runtime/v1/shim/service.go#newInit
func newInit(ctx context.Context, path, workDir, runtimeRoot, namespace, criu string, systemdCgroup bool, platform rproc.Platform, r *proc.CreateConfig) (*proc.Init, error) {
var options runctypes.CreateOptions
if r.Options != nil {
v, err := typeurl.UnmarshalAny(r.Options)
if err != nil {
return nil, err
}
options = *v.(*runctypes.CreateOptions)
}
rootfs := filepath.Join(path, "rootfs")
// ********************************** NOTICE ********************************** //
runtime := proc.NewRunc(runtimeRoot, path, namespace, r.Runtime, criu, systemdCgroup)
// ********************************** NOTICE ********************************** //
p := proc.New(r.ID, runtime, rproc.Stdio{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Terminal: r.Terminal,
})
// ********************************** NOTICE ********************************** //
p.Bundle = r.Bundle
p.Platform = platform
p.Rootfs = rootfs
p.WorkDir = workDir
p.IoUID = int(options.IoUid)
p.IoGID = int(options.IoGid)
p.NoPivotRoot = options.NoPivotRoot
p.NoNewKeyring = options.NoNewKeyring
p.CriuWorkPath = options.CriuWorkPath
if p.CriuWorkPath == "" {
// if criu work path not set, use container WorkDir
p.CriuWorkPath = p.WorkDir
}
return p, nil
}
// NewRunc returns a new runc instance for a process
func NewRunc(root, path, namespace, runtime, criu string, systemd bool) *runc.Runc {
if root == "" {
root = RuncRoot
}
return &runc.Runc{
Command: runtime,
Log: filepath.Join(path, "log.json"),
LogFormat: runc.JSON,
PdeathSignal: syscall.SIGKILL,
Root: filepath.Join(root, namespace),
Criu: criu,
SystemdCgroup: systemd,
}
}
// New returns a new process
func New(id string, runtime *runc.Runc, stdio proc.Stdio) *Init {
p := &Init{
id: id,
runtime: runtime,
stdio: stdio,
status: 0,
waitBlock: make(chan struct{}),
}
p.initState = &createdState{p: p}
return p
}
———1.1.1.1.3.1.2) runtime/v1/linux/proc/init.go#Init.Create
// Create the process with the provided config
func (p *Init) Create(ctx context.Context, r *CreateConfig) error {
var (
err error
socket *runc.Socket
)
if r.Terminal {
if socket, err = runc.NewTempConsoleSocket(); err != nil {
return errors.Wrap(err, "failed to create OCI runtime console socket")
}
defer socket.Close()
} else if hasNoIO(r) {
if p.io, err = runc.NewNullIO(); err != nil {
return errors.Wrap(err, "creating new NULL IO")
}
} else {
if p.io, err = runc.NewPipeIO(p.IoUID, p.IoGID, withConditionalIO(p.stdio)); err != nil {
return errors.Wrap(err, "failed to create OCI runtime io pipes")
}
}
pidFile := filepath.Join(p.Bundle, InitPidFile)
if r.Checkpoint != "" {
opts := &runc.RestoreOpts{
CheckpointOpts: runc.CheckpointOpts{
ImagePath: r.Checkpoint,
WorkDir: p.CriuWorkPath,
ParentPath: r.ParentCheckpoint,
},
PidFile: pidFile,
IO: p.io,
NoPivot: p.NoPivotRoot,
Detach: true,
NoSubreaper: true,
}
p.initState = &createdCheckpointState{
p: p,
opts: opts,
}
return nil
}
opts := &runc.CreateOpts{
PidFile: pidFile,
IO: p.io,
NoPivot: p.NoPivotRoot,
NoNewKeyring: p.NoNewKeyring,
}
if socket != nil {
opts.ConsoleSocket = socket
}
// ********************************** NOTICE ********************************** //
if err := p.runtime.Create(ctx, r.ID, r.Bundle, opts); err != nil {
// ********************************** NOTICE ********************************** //
return p.runtimeError(err, "OCI runtime create failed")
}
if r.Stdin != "" {
sc, err := fifo.OpenFifo(context.Background(), r.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0)
if err != nil {
return errors.Wrapf(err, "failed to open stdin fifo %s", r.Stdin)
}
p.stdin = sc
p.closers = append(p.closers, sc)
}
var copyWaitGroup sync.WaitGroup
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
if socket != nil {
console, err := socket.ReceiveMaster()
if err != nil {
return errors.Wrap(err, "failed to retrieve console master")
}
console, err = p.Platform.CopyConsole(ctx, console, r.Stdin, r.Stdout, r.Stderr, &p.wg, ©WaitGroup)
if err != nil {
return errors.Wrap(err, "failed to start console copy")
}
p.console = console
} else if !hasNoIO(r) {
if err := copyPipes(ctx, p.io, r.Stdin, r.Stdout, r.Stderr, &p.wg, ©WaitGroup); err != nil {
return errors.Wrap(err, "failed to start io pipe copy")
}
}
copyWaitGroup.Wait()
pid, err := runc.ReadPidFile(pidFile)
if err != nil {
return errors.Wrap(err, "failed to retrieve OCI runtime container pid")
}
p.pid = pid
return nil
}
// Create creates a new container and returns its pid if it was created successfully
func (r *Runc) Create(context context.Context, id, bundle string, opts *CreateOpts) error {
// ********************************** NOTICE ********************************** //
args := []string{"create", "--bundle", bundle}
// ********************************** NOTICE ********************************** //
if opts != nil {
oargs, err := opts.args()
if err != nil {
return err
}
args = append(args, oargs...)
}
cmd := r.command(context, append(args, id)...)
if opts != nil && opts.IO != nil {
opts.Set(cmd)
}
cmd.ExtraFiles = opts.ExtraFiles
if cmd.Stdout == nil && cmd.Stderr == nil {
data, err := cmdOutput(cmd, true)
if err != nil {
return fmt.Errorf("%s: %s", err, data)
}
return nil
}
ec, err := Monitor.Start(cmd)
if err != nil {
return err
}
if opts != nil && opts.IO != nil {
if c, ok := opts.IO.(StartCloser); ok {
if err := c.CloseAfterStart(); err != nil {
return err
}
}
}
status, err := Monitor.Wait(cmd, ec)
if err == nil && status != 0 {
err = fmt.Errorf("%s did not terminate sucessfully", cmd.Args[0])
}
return err
}
2) task.go#task.Start
func (t *task) Start(ctx context.Context) error {
r, err := t.client.TaskService().Start(ctx, &tasks.StartRequest{
ContainerID: t.id,
})
if err != nil {
if t.io != nil {
t.io.Cancel()
t.io.Close()
}
return errdefs.FromGRPC(err)
}
t.pid = r.Pid
return nil
}
2.1) api/services/tasks/v1/tasks.pb.go#tasksClient.Start
func (c *tasksClient) Start(ctx context.Context, in *StartRequest, opts ...grpc.CallOption) (*StartResponse, error) {
out := new(StartResponse)
err := grpc.Invoke(ctx, "/containerd.services.tasks.v1.Tasks/Start", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
2.1.1) services/tasks/local.go#local.Start
func (l *local) Start(ctx context.Context, r *api.StartRequest, _ ...grpc.CallOption) (*api.StartResponse, error) {
t, err := l.getTask(ctx, r.ContainerID)
if err != nil {
return nil, err
}
p := runtime.Process(t)
// 这里为空
if r.ExecID != "" {
if p, err = t.Process(ctx, r.ExecID); err != nil {
return nil, errdefs.ToGRPC(err)
}
}
// ********************************** NOTICE ********************************** //
if err := p.Start(ctx); err != nil {
// ********************************** NOTICE ********************************** //
return nil, errdefs.ToGRPC(err)
}
state, err := p.State(ctx)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return &api.StartResponse{
Pid: state.Pid,
}, nil
}
2.1.1.1) runtime/v1/shim/local.go#local.Start
func (c *local) Start(ctx context.Context, in *shimapi.StartRequest) (*shimapi.StartResponse, error) {
return c.s.Start(ctx, in)
}
—2.1.1.1.1) runtime/v1/shim/service.go#Service.Start
// Start a process
func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi.StartResponse, error) {
p, err := s.getExecProcess(r.ID)
if err != nil {
return nil, err
}
// ********************************** NOTICE ********************************** //
if err := p.Start(ctx); err != nil {
// ********************************** NOTICE ********************************** //
return nil, err
}
return &shimapi.StartResponse{
ID: p.ID(),
Pid: uint32(p.Pid()),
}, nil
}
// Start the init process
func (p *Init) Start(ctx context.Context) error {
p.mu.Lock()
defer p.mu.Unlock()
// ********************************** NOTICE ********************************** //
return p.initState.Start(ctx)
// ********************************** NOTICE ********************************** //
}
func (s *createdState) Start(ctx context.Context) error {
// ********************************** NOTICE ********************************** //
if err := s.p.start(ctx); err != nil {
// ********************************** NOTICE ********************************** //
return err
}
return s.transition("running")
}
func (p *Init) start(ctx context.Context) error {
err := p.runtime.Start(ctx, p.id)
return p.runtimeError(err, "OCI runtime start failed")
}
// Start will start an already created container
func (r *Runc) Start(context context.Context, id string) error {
return r.runOrError(r.command(context, "start", id))
}
func (r *Runc) command(context context.Context, args ...string) *exec.Cmd {
command := r.Command
if command == "" {
command = DefaultCommand
}
cmd := exec.CommandContext(context, command, append(r.args(), args...)...)
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: r.Setpgid,
}
cmd.Env = os.Environ()
if r.PdeathSignal != 0 {
cmd.SysProcAttr.Pdeathsig = r.PdeathSignal
}
return cmd
}
// runOrError will run the provided command. If an error is
// encountered and neither Stdout or Stderr was set the error and the
// stderr of the command will be returned in the format of <error>:
// <stderr>
func (r *Runc) runOrError(cmd *exec.Cmd) error {
if cmd.Stdout != nil || cmd.Stderr != nil {
ec, err := Monitor.Start(cmd)
if err != nil {
return err
}
status, err := Monitor.Wait(cmd, ec)
if err == nil && status != 0 {
err = fmt.Errorf("%s did not terminate sucessfully", cmd.Args[0])
}
return err
}
data, err := cmdOutput(cmd, true)
if err != nil {
return fmt.Errorf("%s: %s", err, data)
}
return nil
}