Golang 中并发 IO 的现状

对于 Go 这种本身便是为并发而生的语言来说,使用 io_uring 这种系统级异步接口也不是那么的迫切
比如对于普通文件的读写以及 socket 的操作都会通过 netpoll 来进行优化,当文件/套接字可读可写时 netpoll 便会唤醒相应 goroutine 来对文件进行读写
而对于可能会阻塞的系统调用,在 syscall 底层调用 syscall.Syscall/Syscall6 时配合 runtime 判断是否将P 和 G 绑定的 M 解绑,然后将 P 交给其他 M 来使用,通过这种机制可以减少系统调用从用户态切换到内核态对整个程序带来的损耗
Go runtime 实际上已经实现了用户态的并发 IO,现在 Linux 内核提供了新的异步 IO 接口,那又该如何去利用这种新的技术呢
我们首先先看一下当前 Go 是如何做到异步 IO 的

IO 与 netpoll

文件 IO 与 netpoll

  1. // src/os/file.go
  2. func OpenFile(name string, flag int, perm FileMode) (*File, error) {
  3. f, err := openFileNolog(name, flag, perm)
  4. }
  5. // src/os/file_unix.go
  6. func openFileNolog(name string, flag int, perm FileMode) (*File, error) {
  7. // ...
  8. r, e = syscall.Open(name, flag|syscall.O_CLOEXEC, syscallMode(perm))
  9. // ...
  10. return newFile(uintptr(r), name, kindOpenFile), nil
  11. }
  12. // src/os/file_unix.go
  13. func newFile(fd uintptr, name string, kind newFileKind) *File {
  14. fdi := int(fd)
  15. f := &File{&file{
  16. pfd: poll.FD{
  17. Sysfd: fdi,
  18. IsStream: true,
  19. ZeroReadIsEOF: true,
  20. },
  21. name: name,
  22. stdoutOrErr: fdi == 1 || fdi == 2,
  23. }}
  24. pollable := kind == kindOpenFile || kind == kindPipe || kind == kindNonBlock
  25. // ...
  26. if err := f.pfd.Init("file", pollable); err != nil {
  27. // ...
  28. } else if pollable {
  29. if err := syscall.SetNonblock(fdi, true); err == nil {
  30. f.nonblock = true
  31. }
  32. return f
  33. }
  34. }

os.OpennewFile,可以看到文件的 文件描述符 被放到 poll.FD 进行初始化了, poll.FD.Init 便是将文件描述符注册到 netpoll(epoll)

需要注意当文件被注册到 netpoll(epoll) 后,会将它置为非阻塞模式(SetNonblock),因为 netpoll(epoll) 采用的是边缘触发模式 比如说非阻塞文件描述符中有可读事件时,epoll 只会通知一次(除非有新的数据被写入文件会再次通知),也就说需要所有数据读出来直到返回 -EAGAIN,对于阻塞模式的socket文件,当从socket中读取数据时就可能会阻塞等待,这样也就失去了 epoll 的意义

我们可以再看一下 poll.FD 是如何利用 netpoll 进行读取的

  1. // src/internal/poll/fd_unix.go
  2. func (fd *FD) Read(p []byte) (int, error) {
  3. // ...
  4. for {
  5. n, err := ignoringEINTR(syscall.Read, fd.Sysfd, p)
  6. if err != nil {
  7. n = 0
  8. if err == syscall.EAGAIN && fd.pd.pollable() {
  9. continue
  10. }
  11. }
  12. err = fd.eofError(n, err)
  13. return n, err
  14. }
  15. }

可以看到 ignoringEINTR 中调用 syscall.Read 读取文件,如果出现 syscall.EAGAIN,那么就调用 fd.pd.waitRead 来等待数据可读

  1. // src/internal/poll/fd_unix.go
  2. type FD struct {
  3. // ...
  4. Sysfd int
  5. pd pollDesc
  6. }

pollDesc Go 对 netpoll 的抽象

  1. // src/internal/poll/fd_poll_runtime.go
  2. func runtime_pollServerInit()
  3. func runtime_pollOpen(fd uintptr)(uintptr, int)
  4. func runtime_pollClose(ctx uintptr)
  5. // ...
  6. type pollDesc struct {
  7. runtimeCtx uintptr
  8. }
  9. func (pd *pollDesc) init(fd *FD) error {
  10. serverInit.Do(runtime_pollServerInit)
  11. ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
  12. // ...
  13. pd.runtimeCtx = ctx
  14. return nil
  15. }

runtime_poll* 这些函数才是真正的 netpoll,而这些函数是在src/runtime/netpoll.go 中实现,并通过 go:linkname 来链接到 internal/poll

  1. // src/runtime/netpoll.go
  2. // go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
  3. func poll_runtime_pollServerInit() {
  4. // ...
  5. }

根据具体的平台来实现 poller,对于 Linux,便是使用 epoll

  1. // src/runtime/netpoll_epoll.go
  2. // 注册文件到 netpoll 中
  3. func netpolllopen(fd uintptr, pd *pollDesc) int32 {
  4. var ev epollevent
  5. ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
  6. // ...
  7. return -epollctr(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
  8. }

添加新的文件描述符时,可以发现fd是以 边缘触发 的方式注册到 netpoll(epoll)

socket IO 与 netpoll

netpoll 这个名字上就可以看出,netpoll 是 Go 为了高性能的异步网络而实现的
看一下创建 TCPListener socket 的流程

  1. // src/net/tcpsock.go
  2. type TCPListener struct {
  3. fd *netFD
  4. // ...
  5. }
  6. // src/net/fd_posix.go
  7. type netFD struct {
  8. pfd poll.FD
  9. // ...
  10. }
  11. // 1.
  12. // src/net/tcpsock.go
  13. func ListenTCP(network string, laddr *TCPAddr) (*TCPListener, error) {
  14. sl := &sysListener{network: network, address: laddr.String()}
  15. ln, err := sl.listenTCP(context.Background(), laddr)
  16. // ...
  17. }
  18. // 2.
  19. // src/net/tcpsock_posix.go
  20. func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
  21. fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
  22. // ...
  23. return &TCPListener{fd: fd, lc, sl.ListenConfig}, nil
  24. }
  25. // 3.
  26. // src/net/ipsock_posix.go
  27. func internelSocket(ctx context.Context, ...) (fd *netFD, err error) {
  28. // ...
  29. return socket(ctx, net, family, sotype, proto, ipv6only, laddr, radddr, ctrlFn)
  30. }
  31. // 4.
  32. // src/sock_posix.go
  33. func socket(...) (fd *netFD, err error) {
  34. s, err := sysSocket(family, sotype, proto)
  35. // ...
  36. fd, err = newFD(s, family, sotype, net)
  37. }
  38. // 5.
  39. // src/fd_unix.go
  40. func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
  41. ret := &netFD{
  42. pfd: poll.FD{
  43. Sysfd: sysfd,
  44. IsStream: sotype == syscall.SOCK_STREAM,
  45. // ...
  46. },
  47. // ...
  48. }
  49. return ret, nil
  50. }

创建 TCPListener 链路还是挺长的,不过在第四步 socket 函数中可以看到调用 newFD 来返回 netFD 实例,而 netFD.pfd 便是 poll.FD, 而对 netFD 的读写和文件IO一样便都会通过 poll.FD 来利用 netpoll

netpoll 唤醒 goroutine

挂起 goroutine

通过 poll.pollDesc 将文件描述符加入到 netpoll 后,当对文件描述符进行读写时,如果 syscall.Read 返回 syscall.EAGAIN 的话就需要调用 pollDesc.waitRead/waitWrite 来等待可读可写

  1. // src/internal/poll/fd_poll_runtime.go
  2. func (pd *pollDesc) waitRead(isFile bool) error{
  3. return pd.wait('r', isFile)
  4. }
  5. func (pd *pollDesc) waitWrite(isFile bool) error{
  6. return pd.wait('w', isFile)
  7. }
  8. func (pd *pollDesc) wait(mode int, isFile bool) error{
  9. // ...
  10. res := runtime_pollWait(pd.runtimeCtx, mode)
  11. return convertErr(res, isFile)
  12. }
  13. // src/runtime/netpoll.go
  14. //go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
  15. func poll_runtime_pollWait(pd *pollDesc, mode int) int {
  16. // ...
  17. for !netpollblock(pd, int32(mode), false) {
  18. // ...
  19. }
  20. // ...
  21. }
  22. func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
  23. gpp := &pd.rg
  24. // ...
  25. // 状态检查
  26. if waitio || netpollcheckerr(pd, mode) == 0 {
  27. gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
  28. }
  29. // ...
  30. }
  31. func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
  32. r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
  33. // ...
  34. }

等待文件可读写,最终会调用 netpollblock 函数,并不会直接调用 epoll wait 的系统调用,而是挂起当前 goroutine, 并等待唤醒

唤醒 goroutine

  1. // src/runtime/netpoll_epoll.go
  2. func netpoll(delay int64) gList {
  3. // ...
  4. var waitms int32
  5. // 计算 waitms,大概规则:
  6. // delay < 0, waitms = -1,阻塞等待
  7. // delay == 0, waitms = 0, 不阻塞
  8. // delay > 0, delay 以纳秒为单位作为 waitms
  9. var events [128]epollevent
  10. retry:
  11. n := epollwait(epfd, &events[0], int32(len(events)), waitms)
  12. if n < 0 {
  13. // ...
  14. }
  15. var toRun gList
  16. for i := int32(0); i < n; i++ {
  17. ev := &events[i]
  18. // ...
  19. var mode int32
  20. if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
  21. mode += 'r'
  22. }
  23. if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
  24. mode += 'w'
  25. }
  26. if mode != 0 {
  27. pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
  28. pd.everr = false
  29. if ev.events == _EPOLLERR {
  30. pd.everr = true
  31. }
  32. netpollready(&toRun, pd, mode)
  33. }
  34. }
  35. return toRun
  36. }

netpoll 会调用 epollWait 来获取epoll事件,而在 runtime 中很多地方都会调用 netpoll 函数
监控函数 sysmon

  1. // src/runtime/proc.go
  2. func sysmon() {
  3. // ....
  4. for {
  5. // ...
  6. list := netpoll(0)
  7. if !list.empty() {
  8. //...
  9. injectglist(&list) // 将 goroutine 放到 runable 队列中
  10. }
  11. }
  12. }

查找可运行的 goroutine

  1. // src/runtime/proc.go
  2. func findrunable() (gp *g, inheritTIme bool) {
  3. top:
  4. // ...
  5. if list := netpoll(0); !list.empty() {
  6. gp := list.pop()
  7. injectglist(&list)
  8. // ...
  9. return gp, false
  10. }
  11. // ....
  12. stop:
  13. // ...
  14. list := netpoll(delta) // block until new work is available
  15. // ...
  16. }

GC 时调用 startTheWorld

  1. // src/runtime/proc.go
  2. func startTheWorld() {
  3. systemstack(func() {startTheWorldWithSema(false)})
  4. // ...
  5. }
  6. func startTheWorldWithSema(emitTraceEvent bool) int64 {
  7. // ...
  8. list := netpoll(0)
  9. injectglist(&list)
  10. // ...
  11. }

通常获取可用的 goroutine 时都可能有机会去调用 netpoll,然后再调用 injectglist(&list) 将可运行 goroutine 加入到runq队列中

系统级异步接口 —— io_uring

本节不会详细介绍 io_uring 的具体操作,关于 io_uring 的使用,可以查看 Lord of the io_uring

Linux kernel 5.1 新增了异步接口 io_uring,它是 Jens Axboe 以高效,可扩展,易用为目的设计的一种全新异步接口,为什么是全新呢,因为 Linux 已经提供了异步 IO 接口 —— AIO,不过就连 Linus 都对它一阵吐槽
Re: [PATCH 09/13] aio: add support for async openat()

So I think this is ridiculously ugly. AIO is a horrible ad-hoc design, with the main excuse being “other, less gifted people, made that design, and we are implementing it for compatibility because database people - who seldom have any shred of taste - actually use it”. But AIO was always really really ugly.

io_uring 提供的异步接口,不仅仅可以使用 文件 IO,套接字 IO,甚至未来可以扩展加入其它系统调用
而且 io_uring 采用应用程序和内核共享内存的方式,来提交请求和获取完成事件

使用共享内存的方式可能是内核对接口优化的一种趋势

iouring 名称的意思便是 _io use ring,而 ring 便是指和内核内存共享的 提交队列完成队列两个环形缓冲区

SubmissionQueueEntry

我们来看一下 io_uring 用来提交请求的结构

  1. struct io_uring_sqe {
  2. __u8 opcode; /* 请求的操作类型 */
  3. __u8 flags; /* IOSQE_ flags */
  4. __u16 ioprio; /* ioprio for the request */
  5. __s32 fd; /* 用于 IO 的文件描述符 */
  6. union {
  7. __u64 off; /* offset into file */
  8. __u64 addr2;
  9. };
  10. union {
  11. __u64 addr; /* pointer to buffer or iovecs */
  12. __u64 splice_off_in;
  13. };
  14. __u32 len; /* buffer size or number of iovecs */
  15. /*
  16. * 用于特定操作的字段
  17. */
  18. union {
  19. __kernel_rwf_t rw_flags;
  20. __u32 fsync_flags;
  21. __u16 poll_events; /* compatibility */
  22. __u32 poll32_events; /* word-reversed for BE */
  23. __u32 sync_range_flags;
  24. __u32 msg_flags;
  25. __u32 timeout_flags;
  26. __u32 accept_flags;
  27. __u32 cancel_flags;
  28. __u32 open_flags;
  29. __u32 statx_flags;
  30. __u32 fadvise_advice;
  31. __u32 splice_flags;
  32. };
  33. __u64 user_data; /* 用来关联请求的完成事件 */
  34. union {
  35. struct {
  36. /* pack this to avoid bogus arm OABI complaints */
  37. union {
  38. /* index into fixed buffers, if used */
  39. __u16 buf_index;
  40. /* for grouped buffer selection */
  41. __u16 buf_group;
  42. } __attribute__((packed));
  43. /* personality to use, if used */
  44. __u16 personality;
  45. __s32 splice_fd_in;
  46. };
  47. __u64 __pad2[3];
  48. };
  49. };

io_uring_sqe 一个非常复杂的结构,最核心的三个字段 opcodefduser_data

  • opcode 指定具体是什么操作,比如 IORING_OP_READVIORING_OP_ACCEPTIORING_OP_OPENAT,现在支持 35 种操作
  • fd 表示用于 IO 操作的文件描述符
  • user_data 当请求操作完成后,io_uring 会生成一个完成事件放到 完成队列(CompletionQueue) 中,而 user_data 便是用来和完成队列中的事件进行绑定的,他会原封不动的复制到完成队列的事件(cqe)
  • flags 字段用来实现链式请求之类的功能

    可以发现 opcode 是uint8,也就是现在来看最多支持 256 个系统调用,但是整个 io_uring_sqe 还为未来预留了一些空间 __pad2

通过 opcode 结合结合其他 union 的字段,便实现了扩展性极强的 io_uring 接口

CompletionQueueEvent

完成队列事件(CompletionQueueEvent) 的结构就比较简单了,主要是表示提交的异步操作执行的结果

  1. struct io_uring_cqe {
  2. __u64 user_data; /* 直接复制 sqe->data */
  3. __s32 res; /* 异步操作的结果 */
  4. __u32 flags;
  5. };

user_data 便是从 sqe->data 直接复制过来了,可以通过 user_data 绑定到对应的 sqe
res 便是异步操作执行的结果,如果 res < 0,通常说明操作执行错误
flags 暂时没有使用

io_uring_setup

io_uring 使用共享内存的方式,来提交请求和获取执行结果,减少内存拷贝带来的损耗
io_uring_setup 接口会接受一个指定提交队列大小的 uint32 类型参数和一个 io_uring_params 对象

  1. #include <linux/io_uring.h>
  2. int io_uring_setup(u32 entries, struct io_uring_params *p);

调用成功会返回一个文件描述符,用于后续的操作

io_uring_params

  1. struct io_uring_params {
  2. __u32 sq_entries;
  3. __u32 cq_entries;
  4. __u32 flags;
  5. __u32 sq_thread_cpu;
  6. __u32 sq_thread_idle;
  7. __u32 features;
  8. __u32 wq_fd;
  9. __u32 resv[3];
  10. struct io_sqring_offsets sq_off;
  11. struct io_cqring_offsets cq_off;
  12. };

io_uring_params 不只用来配置 io_uring 实例,内核也会填充 io_uring_params 中关于 io_uring 实例的信息,比如用来映射共享内存的请求队列和完成队列字段的偏移量 - io_sqring_offsetsio_cqring_offsets

配置 io_uring

flags 以位掩码的方式,结合相应 sq_thread_cpusq_thread_idlewq_fdcq_entries 字段来配置 io_uring 实例

  1. /*
  2. * io_uring_setup() flags
  3. */
  4. #define IORING_SETUP_IOPOLL (1U << 0) /* io_context is polled */
  5. #define IORING_SETUP_SQPOLL (1U << 1) /* SQ poll thread */
  6. #define IORING_SETUP_SQ_AFF (1U << 2) /* sq_thread_cpu is valid */
  7. #define IORING_SETUP_CQSIZE (1U << 3) /* app defines CQ size */
  8. #define IORING_SETUP_CLAMP (1U << 4) /* clamp SQ/CQ ring sizes */
  9. #define IORING_SETUP_ATTACH_WQ (1U << 5) /* attach to existing wq */
  10. #define IORING_SETUP_R_DISABLED (1U << 6) /* start with ring disabled */

通常 cq_entries 为 sq_entries 的两倍,通过 flags 指定 IORING_SETUP_CQSIZE ,然后设置 cq_entries 字段为指定大小

cq_entries 不能小于 sq_entries

iouring-go 提供了初始化 io_uring 对象时的配置函数,可以看一下这些函数的具体实现

  1. type IOURingOption func(*IOURing)
  2. func New(entries uint, opts ...IOURingOption) (iour *IOURing, err error)
  3. func WithParams(params *iouring_syscall.IOURingParams) IOURingOption
  4. func WithAsync() IOURingOption
  5. func WithDisableRing() IOURingOption
  6. func WithCQSize(size uint32) IOURingOption
  7. func WithSQPoll() IOURingOption
  8. func WithSQPollThreadCPU(cpu uint32) IOURingOption
  9. func WithSQPollThreadIdle(idle time.Duration) IOURingOption

内核填充信息

内核会向 io_uring_params 填充跟 io_uring 实例相关的信息
sq_entries 请求队列的大小,io_uring_setup 会传递请求队列的大小 entries,io_uring 会根据 entries 设置 sq_entries 为 2 的次方大小
cq_entries 完成队列的大小,通常为 sq_entries 的两倍,即使通过 IORING_SETUP_CQSIZE flag 设置了 cq_enries ,内核依然会以 2 的次方重新计算出 cq_entries 的大小
features 记录了当前内核版本支持的一些功能

  1. /*
  2. * io_uring_params->features flags
  3. */
  4. #define IORING_FEAT_SINGLE_MMAP (1U << 0)
  5. #define IORING_FEAT_NODROP (1U << 1)
  6. #define IORING_FEAT_SUBMIT_STABLE (1U << 2)
  7. #define IORING_FEAT_RW_CUR_POS (1U << 3)
  8. #define IORING_FEAT_CUR_PERSONALITY (1U << 4)
  9. #define IORING_FEAT_FAST_POLL (1U << 5)
  10. #define IORING_FEAT_POLL_32BITS (1U << 6)
  11. #define IORING_FEAT_SQPOLL_NONFIXED (1U << 7)

io_sqring_offsetsio_cqring_offsets 便是SQCQ在共享内存中的偏移量

  1. struct io_sqring_offsets {
  2. __u32 head;
  3. __u32 tail;
  4. __u32 ring_mask;
  5. __u32 ring_entries;
  6. __u32 flags;
  7. __u32 dropped;
  8. __u32 array;
  9. __u32 resv1;
  10. __u64 resv2;
  11. };
  12. struct io_cqring_offsets {
  13. __u32 head;
  14. __u32 tail;
  15. __u32 ring_mask;
  16. __u32 ring_entries;
  17. __u32 overflow;
  18. __u32 cqes;
  19. __u32 flags;
  20. __u32 resv1;
  21. __u64 resv2;
  22. };

根据这些偏移量便可以调用 mmap 来映射 SQ 和 CQ

  1. ptr = mmap(0, sq_off.array + sq_entries * sizeof(__u32),
  2. PROT_READ|PROT_WRITE, MAP_SHARED|MAP_POPULATE,
  3. ring_fd, IORING_OFF_SQ_RING);

可以参考 iouring-go 对 IOURing 对象的初始化

  1. // iouring-go/iouring.go
  2. func New(entries uint, opts ...IOURingOption) (*IOURing, error) {
  3. iour := &IOURing{
  4. params: &iouring_syscall.IOURingParams{},
  5. userDatas: make(map[uint64]*UserData),
  6. cqeSign: make(chan struct{}, 1),
  7. closer: make(chan struct{}),
  8. closed: make(chan struct{}),
  9. }
  10. for _, opt := range opts {
  11. opt(iour)
  12. }
  13. var err error
  14. iour.fd, err = iouring_syscall.IOURingSetup(entries, iour.params)
  15. if err != nil {
  16. return nil, err
  17. }
  18. if err := mmapIOURing(iour); err != nil {
  19. munmapIOURing(iour)
  20. return nil, err
  21. }
  22. // ...
  23. }

mmapIOURing 中实现了对请求队列以及完成队列的内存映射

  1. // iouring-go/mmap.go
  2. func mmapIOURing(iour *IOURing) (err error) {
  3. defer func() {
  4. if err != nil {
  5. munmapIOURing(iour)
  6. }
  7. }()
  8. iour.sq = new(SubmissionQueue)
  9. iour.cq = new(CompletionQueue)
  10. if err = mmapSQ(iour); err != nil {
  11. return err
  12. }
  13. if (iour.params.Features & iouring_syscall.IORING_FEAT_SINGLE_MMAP) != 0 {
  14. iour.cq.ptr = iour.sq.ptr
  15. }
  16. if err = mmapCQ(iour); err != nil {
  17. return err
  18. }
  19. if err = mmapSQEs(iour); err != nil {
  20. return err
  21. }
  22. return nil
  23. }

这里不再详细介绍 io_uring 的使用 ,想要了解更多可以查看文末的推荐阅读

io_uring 的功能

这里简单介绍一下 io_uring 提供的一些功能,以及在 Go 中如何去使用

顺序执行

设置 sqe->flagsIOSQE_IO_DRAIN 标记,这样只有当该 sqe 之前所有的 sqes 都完成后,才会执行该 sqe,而后续的 sqe 也会在该 sqe 完成后才会执行
iouring-go 中可以在构建 IOURing 对象时使用 WithDrain 来全局设置请求顺序执行

  1. iour := iouring.New(8, WithDrain())

针对单一请求设置 WithDrain,保证请求会在之前所有的请求都完成才会执行,而后续的请求也都会在该请求完成之后才会开始执行

  1. request, err := iour.SubmitRequest(iouring.Read(fd, buf).WithDrain(), nil)

链式请求

io_uring 提供了一组请求的链式/顺序执行的方法,可以让链中的请求会在上一个请求执行完成后才会执行,而且不影响链外其他请求的并发执行
设置 sqe->flagsIOSQE_IO_LINK 标记后,下一个 sqe 和当前 sqe 自动组成新链或者当前 sqe 的链中,链中没有设置 IOSQWE_IO_LINKsqe 便是链尾
如果链中的有请求执行失败了,那么链中后续的 sqe 都会被取消( cqe.res-ECANCELED)
io_uring 还提供了以外一种设置链式请求的方式,设置 sqe->flagsIOSQE_IO_HARDLINK flag,这种方式会让链中的请求忽略之前请求的结果,也就是说即使链中之前的请求执行失败了,也不会取消链中后边的请求
iouring-go 中可以使用 SubmitLinkRequests 或者 SubmitHardLinkRequests 方法来设置链式请求

  1. preps := []iouring.PrepRequest{ iouring.Read(fd1, buf), iouring.Write(fd2, buf) }
  2. requests, err := iour.SubmitLinkRequest(preps, nil)

请求取消

当请求提交后,还可以提交取消请求的请求,这样如果请求还没有执行或者请求的操作可以被中断(比如 socket IO),那么就可以被异步的取消,而对于已经启动的磁盘IO请求则无法取消
iouring-go 中,提交请求后会返回一个 iouring.Request 对象,通过request.Cancel 方法就可以取消请求

  1. request, err := iour.SubmitRequest(iouring.Timeout(1 * time.Second), nil)
  2. cancelRequest, err := request.Cancel()

Cancel 方法会返回一个 cancelRequest 对象,表示提交的取消请求
可以监听 request 的执行是否失败,并且失败原因是否为 iouring.ErrRequestCanceled

  1. <- request.Done()
  2. if err := request.Err(); if err != nil {
  3. if err == iouring.ErrRequestCanceled {
  4. fmt.Println("request is canceled")
  5. }
  6. }

也可去监听 cancelRequest 的执行结果,如果cancelRequest.Err 方法返回 nil,便是可能成功取消了,注意是可能取消了,因为一些操作是无法被取消的

  1. <- cancelRequest.Done()
  2. if err := cancelRequest.Err(); if err != nil{
  3. if err == iouring.ErrRequestNotFound(){
  4. fmt.Println("canceled request is not found")
  5. }
  6. // do something
  7. }

定时和请求完成计数

io_uring 提供了 IORING_OP_TIMEOUT 请求,可以用来提交超时请求
超时请求可以分为三种:

  • 相对时间超时
  • 绝对时间超时
  • 对请求完成计数,到达指定的完成事件数量后,超时请求就会完成

iouring-go 对这三种情况封装了三个函数 iouring.Timeoutiouring.TimeoutWithTimeiouring.CountCompletionEvent 来分别代表三种超时请求

  1. now := time.Now()
  2. request, err := iouring.SubmitRequest(iouring.Timeout(2 * time.Second), nil)
  3. if err != nil {
  4. panic(err)
  5. }
  6. <- request.Done()
  7. fmt.Println(time.Now().Sub(now))

根据 io_uring 提供的超时请求,可以实现系统级的异步定时器

请求超时

io_uring 通过 IOSQE_IO_LINK 将一个请求和 IORING_OP_LINK_TIMEOUT 请求链接在一起,那么就可以做到请求的超时控制
iouring-go 同样提供了简便方法 WithTimeout

  1. preps := iouring.Read(fd, buf).WithTimeout()

WithTimeout 方法会返回两个 PrepRequest 对象,所以需要使用 SubmitRequests 来提交

iouring-go 中请求超时的一些操作使用起来感觉还不是特别友好,有待优化

注册文件

io_uring 的一些 IO 操作需要提供文件描述符,而频繁的将文件和内核之间进行映射也会导致一定的性能损耗,所以可以使用 io_uringio_uring_register 接口来提前注册文件描述符
详细的概念可以参考 io_uring_register
iouring-go 也提供了文件描述符的注册功能,而且对于已经注册的文件描述符会自动使用

  1. func (iour *IOURing) RegisterFile(file *os.File) error
  2. func (iour *IOURing) RegisterFiles(files []*os.File) error
  3. func (iour *IOURing) UnregisterFile(file *os.File) error
  4. func (iour *IOURing) UnregisterFiles(files []*os.File) error

io_uring 的文件描述符被关闭后,这些注册的文件会自动注销
需要注意,调用 io_uring_register 来注册文件描述符时,如果有其他的正在进行的请求的话,会等到这些请求都完成才会注册
注册文件描述符在 Go 中带来的并发问题

  1. type fileRegister struct {
  2. lock sync.Mutex
  3. iouringFd int
  4. fds []int32
  5. sparseindexs map[int]int
  6. registered bool
  7. indexs sync.Map
  8. }

需要注意由于存在对索引 fileRegister.indexs 的并发读写,所以使用 sync.Map,也就意味着,使用注册文件描述符,会带来一定的并发问题,经过简单的测试,sync.Map 带来的性能损耗导致注册文件描述符带来的优势并没有那么大的

在 Go 中使用 io_uring 的最大问题便是对 io_uring 实例的竞争问题,而通过 Go 暴露给外部使用的并发机制,并不能让 io_uring 带来的异步 IO 发挥最大的性能 将 io_uring 融入 runtime 中,才是最终的解决方案

注册缓冲区

和注册文件描述符类似, io_uring 为了减少 IO 请求中缓冲区的映射,同样可以使用 io_uring_register 来注册缓冲区
如果要在请求中使用缓冲区的话,需要使用 IORING_OP_READ_FIXED 或者 IORING_OP_WRITE_FIXED 请求
具体可以参考 io_uring_register

内核侧请求队列轮询

将请求放到SQ的环形缓冲区后,需要调用 io_uring_enter 来通知内核有请求需要处理
io_uring 为了进一步减少系统调用,可以在 io_uring_setup 是设置 io_uring_params->flagsIORING_SETUP_SQPOLL flags,内核就会创建一个轮询请求队列的线程
可以通过 ps 命令查看用来轮询的内核线程

  1. ps --ppid 2 | grep io_uring-sq

需要注意在 5.10 之前的版本,需要使用特权用户来执行,而 5.10 以后只需 CAP_SYS_NICE 权限即可
并且 5.10 之前,SQPoll 需要配合注册的文件描述符一起使用,而 5.10 以后则不需要,可以通过查看内核填充的 io_uring_params->features 是否设置了 IORING_FEAT_SQPOLL_NONFIXED

  1. // iouring-go/iouring.go
  2. func (iour *IOURing) doRequest(sqe *iouring_syscall.SubmissionQueueEntry, request PrepRequest, ch chan<- Result) (*UserData, error) {
  3. // ...
  4. if sqe.Fd() >= 0 {
  5. if index, ok := iour.fileRegister.GetFileIndex(int32(sqe.Fd())); ok {
  6. sqe.SetFdIndex(int32(index))
  7. } else if iour.Flags&iouring_syscall.IORING_SETUP_SQPOLL != 0 &&
  8. iour.Features&iouring_syscall.IORING_FEAT_SQPOLL_NONFIXED == 0 {
  9. return nil, ErrUnregisteredFile
  10. }
  11. }
  12. // ...
  13. }

iouring-go 同样提供了开启 SQPoll 的 WithSQPoll 以及设置与 SQPoll 内核线程的相关配置 WithSQPollThreadCpuWithSQPollThreadIdle

  1. iour, err := iouring.New(8, iouring.WithSQPoll())

但是在 Go 简单的设置 io_uring_params 并不能正常的工作,可能是由于 Go 的 GMP 模型导致的一些问题。暂时还在思考解决方案

注册 eventfd,利用 epoll

通过 io_uring_register 可以将 eventfd 注册到 io_uring 实例中,然后将 eventfd 加入到 epoll 中,如果当 io_uring 中有完成事件时,便会通知 eventfd
iouring-go 中,对于完成事件的监听便是使用了 eventfdepoll

  1. type IOURing struct {
  2. eventfd int
  3. cqeSign chan struct{}
  4. // ...
  5. }
  6. func New() (*IOURing, error) {
  7. // ....
  8. if err := iour.registerEventfd(); err != nil {
  9. return nil, err
  10. }
  11. if err := registerIOURing(iour); err != nil {
  12. return nil, err
  13. }
  14. // ...
  15. }
  16. func (iour *IOURing) registerEventfd() error {
  17. eventfd, err := unix.Eventfd(0, unix.EFD_NONBLOCK|unix.FD_CLOEXEC)
  18. if err != nil {
  19. return os.NewSyscallError("eventfd", err)
  20. }
  21. iour.eventfd = eventfd
  22. return iouring_syscall.IOURingRegister(
  23. iour.fd,
  24. iouring_syscall.IOURING_REGISTER_EVENTFD,
  25. unsafe.Pointer(&iour.eventfd), 1,
  26. )
  27. }
  28. func registerIOURing(iour *IOURing) error {
  29. if err := initpoller(); err != nil {
  30. return err
  31. }
  32. if err := unix.EpollCtl(poller.fd, unix.EPOLL_CTL_ADD, iour.eventfd,
  33. &unix.EpollEvent{Fd: int32(iour.eventfd), Events: unix.EPOLLIN | unix.EPOLLET},
  34. ); err != nil {
  35. return os.NewSyscallError("epoll_ctl_add", err)
  36. }
  37. poller.Lock()
  38. poller.iours[iour.eventfd] = iour
  39. poller.Unlock()
  40. return nil
  41. }

poller 会调用 EpollWait 等待完成队列中有完成事件,并通知相应的 IOURing 对象

  1. // iouring-go/iouring.go
  2. func (iour *IOURing) getCQEvent(wait bool) (cqe *iouring_syscall.CompletionQueueEvent, err error) {
  3. var tryPeeks int
  4. for {
  5. if cqe = iour.cq.peek(); cqe != nil {
  6. iour.cq.advance(1)
  7. return
  8. }
  9. if !wait && !iour.sq.cqOverflow() {
  10. err = syscall.EAGAIN
  11. return
  12. }
  13. if iour.sq.cqOverflow() {
  14. _, err = iouring_syscall.IOURingEnter(iour.fd, 0, 0, iouring_syscall.IORING_ENTER_FLAGS_GETEVENTS, nil)
  15. if err != nil {
  16. return
  17. }
  18. continue
  19. }
  20. if tryPeeks++; tryPeeks < 3 {
  21. runtime.Gosched()
  22. continue
  23. }
  24. select {
  25. case <-iour.cqeSign:
  26. case <-iour.closer:
  27. return nil, ErrIOURingClosed
  28. }
  29. }
  30. }
  31. // iouring-go/poller.go
  32. func (poller *iourPoller) run() {
  33. for {
  34. n, err := unix.EpollWait(poller.fd, poller.events, -1)
  35. if err != nil {
  36. continue
  37. }
  38. for i := 0; i < n; i++ {
  39. fd := int(poller.events[i].Fd)
  40. poller.Lock()
  41. iour, ok := poller.iours[fd]
  42. poller.Unlock()
  43. if !ok {
  44. continue
  45. }
  46. select {
  47. case iour.cqeSign <- struct{}{}:
  48. default:
  49. }
  50. }
  51. poller.adjust()
  52. }
  53. }

保证数据不丢失

默认情况下, CQ 环的大小是 SQ 环的 两倍,为什么 SQ 环的大小会小于 CQ 环,是因为 SQ 环中的 sqe 一旦被内核发现,便会被内核消耗掉,也就意味着 sqe 的生命周期很短,而请求的完成事件都会放到 CQ 环中
我们也可以通过 IORING_SETUP_CQSIZE 或者 iouring-goWithCQSize Option 里设置 CQ 环的大小
但是依然会存在 CQ 环溢出的情况,而内核会在内部存储溢出的时间,直到 CQ 环有空间容纳更多事件。

可以通过 io_uring_params->features 是否设置 IORING_FEAT_NODROP 来判断当前内核是否支持该功能

如果 CQ 环溢出,那么提交请求时可能会以 -EBUSY 错误失败,需要重新提交
并且当 CQ 环中数据被消耗后,需要调用 io_uring_enter 来通知内核 CQ 环中有空余空间

  1. func (iour *IOURing) getCQEvent(wait bool) (cqe *iouring_syscall.CompletionQueueEvent, err error) {
  2. // ...
  3. if iour.sq.cqOverflow() {
  4. _, err := iour.syscall.IOURingEnter(iour.fd, 0, 0, iouring_syscall.IORING_ENTER_FLAGS_GETEVENTS, nil)
  5. if err != nil{
  6. return
  7. }
  8. continue
  9. }
  10. // ...
  11. }

io_uring 与 Go —— iouring-go

竞争问题

在实现 iouring-go 中遇到的问题,一个是并发导致对 io_uring 的竞争问题
对于 CQ 环的竞争是使用单一的 CQ 环消费 goroutine IOURing.run() 来完成 cqe 的消费

  1. func New(entries int, opts ...IOURingOption) (iour *IOURing, err error) {
  2. iour := &IOURing{...}
  3. // ...
  4. go iour.run()
  5. return
  6. }
  7. func (iour *IOURing) run() {
  8. for {
  9. cqe, err := iour.getCQEvent(true)
  10. // ...
  11. }
  12. }

SQ 环的解决方案有两种

  1. 使用单独的提交 goroutine,将需要提交的请求通过内部 channel 发送给提交 goroutine,这样保证了 SQ 环的单一生产者
  2. 使用锁的方式,对于提交请求的函数加锁,保证同一时间只有一个 goroutine 在提交请求

第一种方式听起来使用 channel 更优雅一些,但是 channel 内部依然使用锁的方式以及额外的内存复制
另外最大的弊端就是将 IOURIng提交函数将请求发送给提交channel)和真正将请求提交给内核(调用 io_uring_enter通知内核有新的请求)分开
当多个提交函数 向 channel 发送的请求的顺序无法保证,这样链式请求就无法实现(除非对于链式请求再次加锁)
第二种方式,采用加锁的方式,保证了同一时间只有一个提交函数在处理 SQ 环,并且可以立即是否真正提交成功(调用 IOURing.submit 方法通知内核有新的请求
iouring-go 采用了第二种方式
真正去解决这个问题的方式,估计可能只有 runtime 才能给出答案,为每一个 P 创建一个 io_uring 实例在 runtime 内部解决竞争问题,内部使用 eventfd 注册到 netpoll 中来获取完成队列通知

io_uring 与 channel

对于 iouring-go 设计比较好的地方,我感觉便是对 channel 的利用,异步 IO 加上 channel,可以将异步在并发的程序中发挥出最大的作用
当然,如果只是简单的使用 channel 的话又会引入其他一些问题,后续会进行说明

  1. func (iour *IOURing) SubmitRequest(request PrepRequest, ch chan<- Result) (Request, error)

SubmitRequest 方法接收一个 channel,当请求完成后,会将结果发送到 channel 中,这样通过多个请求复用同一个 channel,程序便可以监听一组请求的完成情况

  1. func (iour *IOURing) run() {
  2. for {
  3. cqe, err := iour.getCQEvent(true)
  4. // ...
  5. userData := iour.userData[cqe.UserData]
  6. // ...
  7. userData.request.complate(cqe)
  8. if userData.resulter != nil {
  9. userData.resulter <- userData.request
  10. }
  11. }
  12. }

SubmitRequest 方法同样会返回一个 Request 接口对象,通过 Request 我们同样可以去查看请求的是否完成已经它的完成结果

  1. type Request interface {
  2. Result
  3. Cancel() (Request, error)
  4. Done() <-chan struct{}
  5. GetRes() (int, error)
  6. // Can Only be used in ResultResolver
  7. SetResult(r0, r1 interface{}, err error) error
  8. }
  9. type Result interface {
  10. Fd() int
  11. Opcode() uint8
  12. GetRequestBuffer() (b0, b1 []byte)
  13. GetRequestBuffers() [][]byte
  14. GetRequestInfo() interface{}
  15. FreeRequestBuffer()
  16. Err() error
  17. ReturnValue0() interface{}
  18. ReturnValue1() interface{}
  19. ReturnFd() (int, error)
  20. ReturnInt() (int, error)
  21. }

利用 channel 便可以完成对异步 IO 的异步监听和同步监听

channel 带来的问题

当然使用 channel 又会带来其他的问题,比如 channel 满了以后,对 io_uring 完成队列的消费便会阻塞在向 channel 发送数据,阻塞时间过长也会导致 CQ 环溢出
比较好的解决方案是,在 channel 上抽象出一层 ResulterResulter 会对完成事件进行自动缓冲,当然这也会带来一定的代码复杂度,所以 iouring-go 便将 channel 阻塞的问题交给使用者,要求 channel 的消费端尽快消费掉数据

思考 io_uring 在 Go 中的发展

netpoll 在 Linux 平台下使用了 epoll,而且 epoll 在使用上并没有竞争问题,当然如果要使用 io_uring 来替代 epoll 来实现 netpoll 的话并不是不可能,只是这样对于工作很好的 epoll 来说并没有什么必要,而且是否能够带来可观的性能收益也都是不确定的
在高并发的情况下,有限的 SQ 环和 CQ 环,对于请求数量大于完成事件的消费速度的情况,CQ 环的大量溢出带来对内核的压力以及新的请求提交带来的错误处理,都会提高真正利用 io_uring 的难度
对于 SQ 环和 CQ 环的大小限制,也许需要通过 Pool 的方式来解决,初始化多个 io_uring 实例,当一个实例的 SQ 环满,那么就使用另外的实例来提交请求
而使用 Pool 又会增加一定的复杂度
io_uring 的功能实际可以覆盖了 epoll 的,比如提交的阻塞 IO 请求便相当于 epoll + syscall,另外 io_uring 还提供了超时设置和请求的超时控制,相当于实现了系统级的定时器以及 netpoll 的 deadline
但是 epoll 自身的优势,比如没有竞争问题,没有监听文件描述符的数量限制,都让 epoll 在实际的使用中更加好用,而这些问题对于 io_uring 在本身设计上就会导致的问题
比如竞争问题,使用环形缓冲区可以协调应用和内核对请求队列的访问,但是应用中多个线程或者 goroutine 就会引发对环形缓冲区的竞争问题
而请求数量的限制,那么就需要考虑到请求完成事件的溢出问题,内核不能无限制的去保存溢出的完成事件,当然这个问题通过应用中在 io_uring 实例上抽象出 io_uring 池的方式来解决
使用 io_uring 来实现异步网络框架,对已有的网络模型会是非常大的冲击,怎么去使用 io_uring 来发挥最大的能力依然处于探索阶段,毕竟 io_uring 是一个出现才 1 年的技术
而对于普通的磁盘 IO 来说,io_uring 还是有很大的发挥空间的,利用 Go 中已有的并发机制,结合具体的性能评估,对于文件服务器来说,也许会带来极大的提升
另外一个问题便是,对于 5.1 引入,5.6 开始功能变得丰富成熟的 io_uring 来说,现在大量的环境处于 3.X,4.X,甚至 2.X , io_uring 仍然需要等待时机才能去发挥它真正的作用,而这段时间便是留给我们去探讨怎么让 io_uring 更好用