kubelet与containerd交互概览

下图展示了kubelet如何与containerd进行交互,containred通过cri插件实现了CRI的接口。

  1. kubelet通过grpc调用containerd的RunSanbox方法,创建了Pod运行的基本环境,包括pause容器和containerd-shim。
  2. kubelet通过grpc调用containerd的CreateContainer和StartContainer创建Pod中的容器,包括initContainer和normalContainer。

image.png

创建pod

创建一个使用busybox镜像且cmd设置sleep 3000000的pod。下图是使用bcc工具exec-snoop抓取了containerd创建Pod的具体流程,其中省略了部分runc调用的细节。
1、 调用container-shim start 启动用于创建runc的containerd-shim,第一个containerd-shim(pid 20792)退出后,第二个containerd-shim(pid 20799)的父进程就变成了1,这样containerd-shim就与containerd脱离了关系,重启containerd也不会影响containerd-shim进程。
2、通过ttrpc调用第二个containerd-shim(pid 20799)的Newtask方法,会调用runc create。
3、再通过ttrpc调用第二个containerd-shim(pid 20799)的Start方法,会调用runc start启动pause容器。
4、以同样的方式启动Pod中定义的container,注意这次的containerd-shim只有start调用,原因是1中创建的第二个containerd-shim(pid 20799)是可以与runc交互的进程,这里不用重复创建

注意:containerd-shim 20802 和 20803 是线程

  1. systemd(1)─┬
  2. ├─containerd(1694)─┬─{containerd}(1696)
  3. ├─containerd-shim(20799)─┬─pause(20821)
  4. ├─sleep(20852)
  5. ├─{containerd-shim}(20802)
  6. ├─{containerd-shim}(20803)
  7. containerd-shim 20792 1694 0 /usr/bin/containerd-shim-runc-v2 -namespace k8s.io -address /run/containerd/containerd.sock -publish-binary /usr/bin/containerd -id 193727d35be73014e4adfe4ba93aa20b3834654a65990b9de9fba123592044aa -debug start
  8. containerd-shim 20799 20792 0 /usr/bin/containerd-shim-runc-v2 -namespace k8s.io -id 193727d35be73014e4adfe4ba93aa20b3834654a65990b9de9fba123592044aa -address /run/containerd/containerd.sock
  9. runc 20811 20799 0 /usr/bin/runc --root /run/containerd/runc/k8s.io --log /run/containerd/io.containerd.runtime.v2.task/k8s.io/193727d35be73014e4adfe4ba93aa20b3834654a65990b9de9fba123592044aa/log.json --log-format json --systemd-cgroup create --bundle /run/containerd/io.containerd.runtime.v2.task/k8s.io/193727d35be73014e4adfe4ba93aa20b3834654a65990b9de9fba123592044aa --pid-file /run/containerd/io.containerd.runtime.v2.task/k8s.io/193727d35be73014e4adfe4ba93aa20b3834654a65990b9de9fba123592044aa/init.pid 193727d35be73014e4adfe4ba93aa20b3834654a65990b9de9fba123592044aa/run/containerd/io.containerd.runtime.v2.task/k8s.io/193727d35be73014e4adfe4ba93aa20b3834654a65990b9de9fba123592044aa --pid-file /run/containerd/io.containerd.runtime.v2.task/k8s.io/193727d35be73014e4adfe4ba93aa20b3834654a65990b9de9fba123592044aa/init.pid 193727d35be73014e4adfe4ba93aa20b3834654a65990b9de9fba123592044aa
  10. runc 20827 20799 0 /usr/bin/runc --root /run/containerd/runc/k8s.io --log /run/containerd/io.containerd.runtime.v2.task/k8s.io/193727d35be73014e4adfe4ba93aa20b3834654a65990b9de9fba123592044aa/log.json --log-format json --systemd-cgroup start 193727d35be73014e4adfe4ba93aa20b3834654a65990b9de9fba123592044aa
  11. pause 20821 20799 0 /pause
  12. containerd-shim 20834 1694 0 /usr/bin/containerd-shim-runc-v2 -namespace k8s.io -address /run/containerd/containerd.sock -publish-binary /usr/bin/containerd -id f2ba685582ea81fb6568da6cec0ee3086dd8e15db11214ddfed19f4ded41725b -debug start
  13. runc 20841 20799 0 /usr/bin/runc --root /run/containerd/runc/k8s.io --log /run/containerd/io.containerd.runtime.v2.task/k8s.io/f2ba685582ea81fb6568da6cec0ee3086dd8e15db11214ddfed19f4ded41725b/log.json --log-format json --systemd-cgroup create --bundle /run/containerd/io.containerd.runtime.v2.task/k8s.io/f2ba685582ea81fb6568da6cec0ee3086dd8e15db11214ddfed19f4ded41725b --pid-file /run/containerd/io.containerd.runtime.v2.task/k8s.io/f2ba685582ea81fb6568da6cec0ee3086dd8e15db11214ddfed19f4ded41725b/init.pid f2ba685582ea81fb6568da6cec0ee3086dd8e15db11214ddfed19f4ded41725b
  14. runc 20859 20799 0 /usr/bin/runc --root /run/containerd/runc/k8s.io --log /run/containerd/io.containerd.runtime.v2.task/k8s.io/f2ba685582ea81fb6568da6cec0ee3086dd8e15db11214ddfed19f4ded41725b/log.json --log-format json --systemd-cgroup start f2ba685582ea81fb6568da6cec0ee3086dd8e15db11214ddfed19f4ded41725b
  15. sleep 20852 20799 0 /bin/sleep 30000000

containerd的CRI插件

containerd的CRI的实现部分通过插件的形式集成到了containerd中,这种插件化的方式使得扩展containerd变得相对容易,并且核心代码几乎不需要变动。CRI注册了自己特有的grpc方法到containerd中,这样外部就可以通过调用统一的containerd的grpc方法进而调用到CRI的grpc方法。


image.png

containerd-shim

containerd启动containerd-shim,containerd-shim再启动一个containerd-shim,此时先前containerd-shim退出。新的container-shim进程的父进程就变成了1,此containerd-shim就是用户init程序的父进程。

这里讨论的其实都是 containerd-shim-runc-v2。这是containerd现在用的shim。containerd代码库中还有一个containerd-shim,这个是dockerd在用的 https://stackoverflow.com/questions/67585798/what-process-is-the-parent-process-of-containerd-shim

image.png

  1. func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, retErr error) {
  2. bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec.Value)
  3. // 启动containerd-shim进程
  4. shim, err := m.startShim(ctx, bundle, id, opts)
  5. // 向第二个containerd-shim发送create指令
  6. t, err := shim.Create(ctx, opts)
  7. if err := m.tasks.Add(ctx, t); err != nil
  8. return t, nil
  9. }
  10. func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shim, err error) {
  11. args := []string{"-id", b.bundle.ID}
  12. args = append(args, "start")
  13. cmd, err := client.Command(
  14. ctx,
  15. b.runtime,
  16. b.containerdAddress,
  17. b.containerdTTRPCAddress,
  18. b.bundle.Path,
  19. opts,
  20. args...,
  21. )
  22. out, err := cmd.CombinedOutput()
  23. // 返回一个unix socket
  24. address := strings.TrimSpace(string(out))
  25. conn, err := client.Connect(address, client.AnonDialer)
  26. // 新建与unix socket的连接
  27. client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog))
  28. return &shim{
  29. bundle: b.bundle,
  30. client: client,
  31. task: task.NewTaskClient(client),
  32. events: b.events,
  33. rtTasks: b.rtTasks,
  34. }
  35. }
  36. // 第二个containerd-shim启动grpc服务
  37. func (s *Client) Serve() {
  38. shimapi.RegisterTaskService(server, s.service)
  39. }

containerd-shim 向 containerd转发事件


cri服务处理事件,进程退出,oom等,并更新container或者sandbox的状态。之后kubelet就可以调用CRI接口获取container的状态信息,包括oom状态码等。
image.png

  1. // containerd-shim
  2. func run(id string, initFunc Init, config Config) error {
  3. ttrpcAddress := os.Getenv(ttrpcAddressEnv)
  4. // 建立到containerd的连接
  5. publisher, err := NewPublisher(ttrpcAddress)
  6. }
  7. // 开始处理shim的事件并转发到containerd
  8. func (l *RemoteEventsPublisher) processQueue() {
  9. forwardRequest()
  10. }
  11. func (l *RemoteEventsPublisher) forwardRequest(ctx context.Context, req *v1.ForwardRequest) error {
  12. // 到containerd的events的grpc接口
  13. service, err := l.client.EventsService()
  14. service.Forward(fCtx, req)
  15. }
  16. // containerd :
  17. // 发布消息
  18. func (e *Exchange) Publish(ctx context.Context, topic string, event events.Event) (err error) {
  19. validateTopic(topic)
  20. return e.broadcaster.Write(&envelope)
  21. }
  22. // 向订阅者发布消息
  23. func (b *Broadcaster) run() {
  24. for {
  25. select {
  26. case event := <-b.events:
  27. for _, sink := range b.sinks {
  28. sink.Write(event)
  29. }
  30. }
  31. }
  32. }
  33. // cri订阅事件
  34. func (em *eventMonitor) subscribe(subscriber events.Subscriber) {
  35. // note: filters are any match, if you want any match but not in namespace foo
  36. // then you have to manually filter namespace foo
  37. filters := []string{
  38. `topic=="/tasks/oom"`,
  39. `topic~="/images/"`,
  40. }
  41. em.ch, em.errCh = subscriber.Subscribe(em.ctx, filters...)
  42. }
  43. // cri处理容器事件,并更新容器状态,
  44. func (em *eventMonitor) start() <-chan error {
  45. go func() {
  46. for {
  47. select {
  48. case e := <-em.ch:
  49. id, evt, err := convertEvent(e.Event)
  50. em.handleEvent(evt)
  51. }
  52. }
  53. }()
  54. }