Golang 中并发 IO 的现状
对于 Go 这种本身便是为并发而生的语言来说,使用 io_uring
这种系统级异步接口也不是那么的迫切
比如对于普通文件的读写以及 socket 的操作都会通过 netpoll
来进行优化,当文件/套接字可读可写时 netpoll 便会唤醒相应 goroutine 来对文件进行读写
而对于可能会阻塞的系统调用,在 syscall 底层调用 syscall.Syscall/Syscall6
时配合 runtime 判断是否将P 和 G 绑定的 M 解绑,然后将 P 交给其他 M 来使用,通过这种机制可以减少系统调用从用户态切换到内核态对整个程序带来的损耗
Go runtime 实际上已经实现了用户态的并发 IO,现在 Linux 内核提供了新的异步 IO 接口,那又该如何去利用这种新的技术呢
我们首先先看一下当前 Go 是如何做到异步 IO 的
IO 与 netpoll
文件 IO 与 netpoll
// src/os/file.go
func OpenFile(name string, flag int, perm FileMode) (*File, error) {
f, err := openFileNolog(name, flag, perm)
}
// src/os/file_unix.go
func openFileNolog(name string, flag int, perm FileMode) (*File, error) {
// ...
r, e = syscall.Open(name, flag|syscall.O_CLOEXEC, syscallMode(perm))
// ...
return newFile(uintptr(r), name, kindOpenFile), nil
}
// src/os/file_unix.go
func newFile(fd uintptr, name string, kind newFileKind) *File {
fdi := int(fd)
f := &File{&file{
pfd: poll.FD{
Sysfd: fdi,
IsStream: true,
ZeroReadIsEOF: true,
},
name: name,
stdoutOrErr: fdi == 1 || fdi == 2,
}}
pollable := kind == kindOpenFile || kind == kindPipe || kind == kindNonBlock
// ...
if err := f.pfd.Init("file", pollable); err != nil {
// ...
} else if pollable {
if err := syscall.SetNonblock(fdi, true); err == nil {
f.nonblock = true
}
return f
}
}
从 os.Open
到 newFile
,可以看到文件的 文件描述符
被放到 poll.FD
进行初始化了, poll.FD.Init
便是将文件描述符
注册到 netpoll(epoll)
中
需要注意当文件被注册到
netpoll(epoll)
后,会将它置为非阻塞模式(SetNonblock
),因为netpoll(epoll)
采用的是边缘触发模式 比如说非阻塞文件描述符中有可读事件时,epoll
只会通知一次(除非有新的数据被写入文件会再次通知),也就说需要所有数据读出来直到返回-EAGAIN
,对于阻塞模式的socket文件,当从socket中读取数据时就可能会阻塞等待,这样也就失去了 epoll 的意义
我们可以再看一下 poll.FD
是如何利用 netpoll
进行读取的
// src/internal/poll/fd_unix.go
func (fd *FD) Read(p []byte) (int, error) {
// ...
for {
n, err := ignoringEINTR(syscall.Read, fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
continue
}
}
err = fd.eofError(n, err)
return n, err
}
}
可以看到 ignoringEINTR
中调用 syscall.Read
读取文件,如果出现 syscall.EAGAIN
,那么就调用 fd.pd.waitRead
来等待数据可读
// src/internal/poll/fd_unix.go
type FD struct {
// ...
Sysfd int
pd pollDesc
}
pollDesc
Go 对 netpoll
的抽象
// src/internal/poll/fd_poll_runtime.go
func runtime_pollServerInit()
func runtime_pollOpen(fd uintptr)(uintptr, int)
func runtime_pollClose(ctx uintptr)
// ...
type pollDesc struct {
runtimeCtx uintptr
}
func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
// ...
pd.runtimeCtx = ctx
return nil
}
runtime_poll*
这些函数才是真正的 netpoll,而这些函数是在src/runtime/netpoll.go 中实现,并通过 go:linkname
来链接到 internal/poll 中
// src/runtime/netpoll.go
// go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
// ...
}
根据具体的平台来实现 poller,对于 Linux,便是使用 epoll
// src/runtime/netpoll_epoll.go
// 注册文件到 netpoll 中
func netpolllopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
// ...
return -epollctr(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
添加新的文件描述符时,可以发现fd
是以 边缘触发
的方式注册到 netpoll(epoll)
中
socket IO 与 netpoll
从 netpoll
这个名字上就可以看出,netpoll
是 Go 为了高性能的异步网络而实现的
看一下创建 TCPListener socket 的流程
// src/net/tcpsock.go
type TCPListener struct {
fd *netFD
// ...
}
// src/net/fd_posix.go
type netFD struct {
pfd poll.FD
// ...
}
// 1.
// src/net/tcpsock.go
func ListenTCP(network string, laddr *TCPAddr) (*TCPListener, error) {
sl := &sysListener{network: network, address: laddr.String()}
ln, err := sl.listenTCP(context.Background(), laddr)
// ...
}
// 2.
// src/net/tcpsock_posix.go
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
// ...
return &TCPListener{fd: fd, lc, sl.ListenConfig}, nil
}
// 3.
// src/net/ipsock_posix.go
func internelSocket(ctx context.Context, ...) (fd *netFD, err error) {
// ...
return socket(ctx, net, family, sotype, proto, ipv6only, laddr, radddr, ctrlFn)
}
// 4.
// src/sock_posix.go
func socket(...) (fd *netFD, err error) {
s, err := sysSocket(family, sotype, proto)
// ...
fd, err = newFD(s, family, sotype, net)
}
// 5.
// src/fd_unix.go
func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
ret := &netFD{
pfd: poll.FD{
Sysfd: sysfd,
IsStream: sotype == syscall.SOCK_STREAM,
// ...
},
// ...
}
return ret, nil
}
创建 TCPListener 链路还是挺长的,不过在第四步 socket
函数中可以看到调用 newFD
来返回 netFD
实例,而 netFD.pfd
便是 poll.FD
, 而对 netFD
的读写和文件IO一样便都会通过 poll.FD
来利用 netpoll
。
netpoll 唤醒 goroutine
挂起 goroutine
通过 poll.pollDesc
将文件描述符加入到 netpoll
后,当对文件描述符进行读写时,如果 syscall.Read
返回 syscall.EAGAIN
的话就需要调用 pollDesc.waitRead/waitWrite
来等待可读可写
// src/internal/poll/fd_poll_runtime.go
func (pd *pollDesc) waitRead(isFile bool) error{
return pd.wait('r', isFile)
}
func (pd *pollDesc) waitWrite(isFile bool) error{
return pd.wait('w', isFile)
}
func (pd *pollDesc) wait(mode int, isFile bool) error{
// ...
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}
// src/runtime/netpoll.go
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
// ...
for !netpollblock(pd, int32(mode), false) {
// ...
}
// ...
}
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
// ...
// 状态检查
if waitio || netpollcheckerr(pd, mode) == 0 {
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// ...
}
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
// ...
}
等待文件可读写,最终会调用 netpollblock
函数,并不会直接调用 epoll wait 的系统调用,而是挂起当前 goroutine, 并等待唤醒
唤醒 goroutine
// src/runtime/netpoll_epoll.go
func netpoll(delay int64) gList {
// ...
var waitms int32
// 计算 waitms,大概规则:
// delay < 0, waitms = -1,阻塞等待
// delay == 0, waitms = 0, 不阻塞
// delay > 0, delay 以纳秒为单位作为 waitms
var events [128]epollevent
retry:
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
// ...
}
var toRun gList
for i := int32(0); i < n; i++ {
ev := &events[i]
// ...
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.everr = false
if ev.events == _EPOLLERR {
pd.everr = true
}
netpollready(&toRun, pd, mode)
}
}
return toRun
}
netpoll
会调用 epollWait
来获取epoll事件,而在 runtime 中很多地方都会调用 netpoll
函数
监控函数 sysmon
// src/runtime/proc.go
func sysmon() {
// ....
for {
// ...
list := netpoll(0)
if !list.empty() {
//...
injectglist(&list) // 将 goroutine 放到 runable 队列中
}
}
}
查找可运行的 goroutine
// src/runtime/proc.go
func findrunable() (gp *g, inheritTIme bool) {
top:
// ...
if list := netpoll(0); !list.empty() {
gp := list.pop()
injectglist(&list)
// ...
return gp, false
}
// ....
stop:
// ...
list := netpoll(delta) // block until new work is available
// ...
}
GC 时调用 startTheWorld
// src/runtime/proc.go
func startTheWorld() {
systemstack(func() {startTheWorldWithSema(false)})
// ...
}
func startTheWorldWithSema(emitTraceEvent bool) int64 {
// ...
list := netpoll(0)
injectglist(&list)
// ...
}
通常获取可用的 goroutine 时都可能有机会去调用 netpoll
,然后再调用 injectglist(&list)
将可运行 goroutine 加入到runq
队列中
系统级异步接口 —— io_uring
本节不会详细介绍 io_uring 的具体操作,关于 io_uring 的使用,可以查看 Lord of the io_uring
Linux kernel 5.1 新增了异步接口 io_uring
,它是 Jens Axboe 以高效,可扩展,易用为目的设计的一种全新异步接口,为什么是全新呢,因为 Linux 已经提供了异步 IO 接口 —— AIO,不过就连 Linus 都对它一阵吐槽
Re: [PATCH 09/13] aio: add support for async openat()
So I think this is ridiculously ugly. AIO is a horrible ad-hoc design, with the main excuse being “other, less gifted people, made that design, and we are implementing it for compatibility because database people - who seldom have any shred of taste - actually use it”. But AIO was always really really ugly.
io_uring 提供的异步接口,不仅仅可以使用 文件 IO,套接字 IO,甚至未来可以扩展加入其它系统调用
而且 io_uring
采用应用程序和内核共享内存的方式,来提交请求和获取完成事件
使用共享内存的方式可能是内核对接口优化的一种趋势
iouring 名称的意思便是 _io use ring,而 ring 便是指和内核内存共享的 提交队列
和完成队列
两个环形缓冲区
SubmissionQueueEntry
我们来看一下 io_uring 用来提交请求的结构
struct io_uring_sqe {
__u8 opcode; /* 请求的操作类型 */
__u8 flags; /* IOSQE_ flags */
__u16 ioprio; /* ioprio for the request */
__s32 fd; /* 用于 IO 的文件描述符 */
union {
__u64 off; /* offset into file */
__u64 addr2;
};
union {
__u64 addr; /* pointer to buffer or iovecs */
__u64 splice_off_in;
};
__u32 len; /* buffer size or number of iovecs */
/*
* 用于特定操作的字段
*/
union {
__kernel_rwf_t rw_flags;
__u32 fsync_flags;
__u16 poll_events; /* compatibility */
__u32 poll32_events; /* word-reversed for BE */
__u32 sync_range_flags;
__u32 msg_flags;
__u32 timeout_flags;
__u32 accept_flags;
__u32 cancel_flags;
__u32 open_flags;
__u32 statx_flags;
__u32 fadvise_advice;
__u32 splice_flags;
};
__u64 user_data; /* 用来关联请求的完成事件 */
union {
struct {
/* pack this to avoid bogus arm OABI complaints */
union {
/* index into fixed buffers, if used */
__u16 buf_index;
/* for grouped buffer selection */
__u16 buf_group;
} __attribute__((packed));
/* personality to use, if used */
__u16 personality;
__s32 splice_fd_in;
};
__u64 __pad2[3];
};
};
io_uring_sqe
一个非常复杂的结构,最核心的三个字段 opcode
,fd
,user_data
opcode
指定具体是什么操作,比如IORING_OP_READV
,IORING_OP_ACCEPT
,IORING_OP_OPENAT
,现在支持 35 种操作fd
表示用于 IO 操作的文件描述符user_data
当请求操作完成后,io_uring 会生成一个完成事件放到完成队列(CompletionQueue)
中,而user_data
便是用来和完成队列
中的事件进行绑定的,他会原封不动的复制到完成队列的事件(cqe)
中flags
字段用来实现链式请求之类的功能可以发现
opcode
是uint8,也就是现在来看最多支持 256 个系统调用,但是整个io_uring_sqe
还为未来预留了一些空间__pad2
通过 opcode
结合结合其他 union
的字段,便实现了扩展性极强的 io_uring
接口
CompletionQueueEvent
完成队列事件(CompletionQueueEvent)
的结构就比较简单了,主要是表示提交的异步操作执行的结果
struct io_uring_cqe {
__u64 user_data; /* 直接复制 sqe->data */
__s32 res; /* 异步操作的结果 */
__u32 flags;
};
user_data
便是从 sqe->data
直接复制过来了,可以通过 user_data
绑定到对应的 sqe
res
便是异步操作执行的结果,如果 res < 0,通常说明操作执行错误flags
暂时没有使用
io_uring_setup
io_uring 使用共享内存的方式,来提交请求和获取执行结果,减少内存拷贝带来的损耗io_uring_setup
接口会接受一个指定提交队列大小的 uint32
类型参数和一个 io_uring_params
对象
#include <linux/io_uring.h>
int io_uring_setup(u32 entries, struct io_uring_params *p);
io_uring_params
struct io_uring_params {
__u32 sq_entries;
__u32 cq_entries;
__u32 flags;
__u32 sq_thread_cpu;
__u32 sq_thread_idle;
__u32 features;
__u32 wq_fd;
__u32 resv[3];
struct io_sqring_offsets sq_off;
struct io_cqring_offsets cq_off;
};
io_uring_params
不只用来配置 io_uring
实例,内核也会填充 io_uring_params
中关于 io_uring
实例的信息,比如用来映射共享内存的请求队列和完成队列字段的偏移量 - io_sqring_offsets
和 io_cqring_offsets
配置 io_uring
flags
以位掩码的方式,结合相应 sq_thread_cpu
,sq_thread_idle
,wq_fd
,cq_entries
字段来配置 io_uring 实例
/*
* io_uring_setup() flags
*/
#define IORING_SETUP_IOPOLL (1U << 0) /* io_context is polled */
#define IORING_SETUP_SQPOLL (1U << 1) /* SQ poll thread */
#define IORING_SETUP_SQ_AFF (1U << 2) /* sq_thread_cpu is valid */
#define IORING_SETUP_CQSIZE (1U << 3) /* app defines CQ size */
#define IORING_SETUP_CLAMP (1U << 4) /* clamp SQ/CQ ring sizes */
#define IORING_SETUP_ATTACH_WQ (1U << 5) /* attach to existing wq */
#define IORING_SETUP_R_DISABLED (1U << 6) /* start with ring disabled */
通常 cq_entries 为 sq_entries 的两倍,通过 flags
指定 IORING_SETUP_CQSIZE
,然后设置 cq_entries
字段为指定大小
cq_entries 不能小于 sq_entries
iouring-go 提供了初始化 io_uring 对象时的配置函数,可以看一下这些函数的具体实现
type IOURingOption func(*IOURing)
func New(entries uint, opts ...IOURingOption) (iour *IOURing, err error)
func WithParams(params *iouring_syscall.IOURingParams) IOURingOption
func WithAsync() IOURingOption
func WithDisableRing() IOURingOption
func WithCQSize(size uint32) IOURingOption
func WithSQPoll() IOURingOption
func WithSQPollThreadCPU(cpu uint32) IOURingOption
func WithSQPollThreadIdle(idle time.Duration) IOURingOption
内核填充信息
内核会向 io_uring_params
填充跟 io_uring 实例相关的信息sq_entries
请求队列的大小,io_uring_setup
会传递请求队列的大小 entries
,io_uring 会根据 entries
设置 sq_entries
为 2 的次方大小cq_entries
完成队列的大小,通常为 sq_entries
的两倍,即使通过 IORING_SETUP_CQSIZE
flag 设置了 cq_enries
,内核依然会以 2 的次方重新计算出 cq_entries
的大小features
记录了当前内核版本支持的一些功能
/*
* io_uring_params->features flags
*/
#define IORING_FEAT_SINGLE_MMAP (1U << 0)
#define IORING_FEAT_NODROP (1U << 1)
#define IORING_FEAT_SUBMIT_STABLE (1U << 2)
#define IORING_FEAT_RW_CUR_POS (1U << 3)
#define IORING_FEAT_CUR_PERSONALITY (1U << 4)
#define IORING_FEAT_FAST_POLL (1U << 5)
#define IORING_FEAT_POLL_32BITS (1U << 6)
#define IORING_FEAT_SQPOLL_NONFIXED (1U << 7)
io_sqring_offsets
和 io_cqring_offsets
便是SQ
和CQ
在共享内存中的偏移量
struct io_sqring_offsets {
__u32 head;
__u32 tail;
__u32 ring_mask;
__u32 ring_entries;
__u32 flags;
__u32 dropped;
__u32 array;
__u32 resv1;
__u64 resv2;
};
struct io_cqring_offsets {
__u32 head;
__u32 tail;
__u32 ring_mask;
__u32 ring_entries;
__u32 overflow;
__u32 cqes;
__u32 flags;
__u32 resv1;
__u64 resv2;
};
根据这些偏移量便可以调用 mmap 来映射 SQ 和 CQ
ptr = mmap(0, sq_off.array + sq_entries * sizeof(__u32),
PROT_READ|PROT_WRITE, MAP_SHARED|MAP_POPULATE,
ring_fd, IORING_OFF_SQ_RING);
可以参考 iouring-go 对 IOURing 对象的初始化
// iouring-go/iouring.go
func New(entries uint, opts ...IOURingOption) (*IOURing, error) {
iour := &IOURing{
params: &iouring_syscall.IOURingParams{},
userDatas: make(map[uint64]*UserData),
cqeSign: make(chan struct{}, 1),
closer: make(chan struct{}),
closed: make(chan struct{}),
}
for _, opt := range opts {
opt(iour)
}
var err error
iour.fd, err = iouring_syscall.IOURingSetup(entries, iour.params)
if err != nil {
return nil, err
}
if err := mmapIOURing(iour); err != nil {
munmapIOURing(iour)
return nil, err
}
// ...
}
mmapIOURing
中实现了对请求队列以及完成队列的内存映射
// iouring-go/mmap.go
func mmapIOURing(iour *IOURing) (err error) {
defer func() {
if err != nil {
munmapIOURing(iour)
}
}()
iour.sq = new(SubmissionQueue)
iour.cq = new(CompletionQueue)
if err = mmapSQ(iour); err != nil {
return err
}
if (iour.params.Features & iouring_syscall.IORING_FEAT_SINGLE_MMAP) != 0 {
iour.cq.ptr = iour.sq.ptr
}
if err = mmapCQ(iour); err != nil {
return err
}
if err = mmapSQEs(iour); err != nil {
return err
}
return nil
}
这里不再详细介绍 io_uring
的使用 ,想要了解更多可以查看文末的推荐阅读
io_uring 的功能
这里简单介绍一下 io_uring
提供的一些功能,以及在 Go 中如何去使用
顺序执行
设置 sqe->flags
的 IOSQE_IO_DRAIN
标记,这样只有当该 sqe
之前所有的 sqes
都完成后,才会执行该 sqe
,而后续的 sqe
也会在该 sqe
完成后才会执行
在 iouring-go 中可以在构建 IOURing
对象时使用 WithDrain
来全局设置请求顺序执行
iour := iouring.New(8, WithDrain())
针对单一请求设置 WithDrain
,保证请求会在之前所有的请求都完成才会执行,而后续的请求也都会在该请求完成之后才会开始执行
request, err := iour.SubmitRequest(iouring.Read(fd, buf).WithDrain(), nil)
链式请求
io_uring
提供了一组请求的链式/顺序执行的方法,可以让链中的请求会在上一个请求执行完成后才会执行,而且不影响链外其他请求的并发执行
设置 sqe->flags
的 IOSQE_IO_LINK
标记后,下一个 sqe
和当前 sqe
自动组成新链或者当前 sqe
的链中,链中没有设置 IOSQWE_IO_LINK
的 sqe
便是链尾
如果链中的有请求执行失败了,那么链中后续的 sqe
都会被取消( cqe.res
为 -ECANCELED
)io_uring
还提供了以外一种设置链式请求的方式,设置 sqe->flags
为 IOSQE_IO_HARDLINK
flag,这种方式会让链中的请求忽略之前请求的结果,也就是说即使链中之前的请求执行失败了,也不会取消链中后边的请求
iouring-go 中可以使用 SubmitLinkRequests
或者 SubmitHardLinkRequests
方法来设置链式请求
preps := []iouring.PrepRequest{ iouring.Read(fd1, buf), iouring.Write(fd2, buf) }
requests, err := iour.SubmitLinkRequest(preps, nil)
请求取消
当请求提交后,还可以提交取消请求的请求,这样如果请求还没有执行或者请求的操作可以被中断(比如 socket IO),那么就可以被异步的取消,而对于已经启动的磁盘IO请求则无法取消
在 iouring-go 中,提交请求后会返回一个 iouring.Request
对象,通过request.Cancel
方法就可以取消请求
request, err := iour.SubmitRequest(iouring.Timeout(1 * time.Second), nil)
cancelRequest, err := request.Cancel()
Cancel
方法会返回一个 cancelRequest 对象,表示提交的取消请求
可以监听 request
的执行是否失败,并且失败原因是否为 iouring.ErrRequestCanceled
<- request.Done()
if err := request.Err(); if err != nil {
if err == iouring.ErrRequestCanceled {
fmt.Println("request is canceled")
}
}
也可去监听 cancelRequest 的执行结果,如果cancelRequest.Err
方法返回 nil
,便是可能成功取消了,注意是可能取消了,因为一些操作是无法被取消的
<- cancelRequest.Done()
if err := cancelRequest.Err(); if err != nil{
if err == iouring.ErrRequestNotFound(){
fmt.Println("canceled request is not found")
}
// do something
}
定时和请求完成计数
io_uring
提供了 IORING_OP_TIMEOUT
请求,可以用来提交超时请求
超时请求可以分为三种:
- 相对时间超时
- 绝对时间超时
- 对请求完成计数,到达指定的完成事件数量后,超时请求就会完成
iouring-go 对这三种情况封装了三个函数 iouring.Timeout
,iouring.TimeoutWithTime
,iouring.CountCompletionEvent
来分别代表三种超时请求
now := time.Now()
request, err := iouring.SubmitRequest(iouring.Timeout(2 * time.Second), nil)
if err != nil {
panic(err)
}
<- request.Done()
fmt.Println(time.Now().Sub(now))
根据 io_uring
提供的超时请求,可以实现系统级的异步定时器
请求超时
io_uring
通过 IOSQE_IO_LINK
将一个请求和 IORING_OP_LINK_TIMEOUT
请求链接在一起,那么就可以做到请求的超时控制
iouring-go 同样提供了简便方法 WithTimeout
preps := iouring.Read(fd, buf).WithTimeout()
WithTimeout
方法会返回两个 PrepRequest
对象,所以需要使用 SubmitRequests
来提交
iouring-go 中请求超时的一些操作使用起来感觉还不是特别友好,有待优化
注册文件
io_uring
的一些 IO 操作需要提供文件描述符,而频繁的将文件和内核之间进行映射也会导致一定的性能损耗,所以可以使用 io_uring
的 io_uring_register
接口来提前注册文件描述符
详细的概念可以参考 io_uring_register
iouring-go 也提供了文件描述符的注册功能,而且对于已经注册的文件描述符会自动使用
func (iour *IOURing) RegisterFile(file *os.File) error
func (iour *IOURing) RegisterFiles(files []*os.File) error
func (iour *IOURing) UnregisterFile(file *os.File) error
func (iour *IOURing) UnregisterFiles(files []*os.File) error
当 io_uring
的文件描述符被关闭后,这些注册的文件会自动注销
需要注意,调用 io_uring_register
来注册文件描述符时,如果有其他的正在进行的请求的话,会等到这些请求都完成才会注册
注册文件描述符在 Go 中带来的并发问题
type fileRegister struct {
lock sync.Mutex
iouringFd int
fds []int32
sparseindexs map[int]int
registered bool
indexs sync.Map
}
需要注意由于存在对索引 fileRegister.indexs
的并发读写,所以使用 sync.Map
,也就意味着,使用注册文件描述符,会带来一定的并发问题,经过简单的测试,sync.Map
带来的性能损耗导致注册文件描述符带来的优势并没有那么大的
在 Go 中使用
io_uring
的最大问题便是对io_uring
实例的竞争问题,而通过 Go 暴露给外部使用的并发机制,并不能让io_uring
带来的异步 IO 发挥最大的性能 将io_uring
融入 runtime 中,才是最终的解决方案
注册缓冲区
和注册文件描述符类似, io_uring
为了减少 IO 请求中缓冲区的映射,同样可以使用 io_uring_register
来注册缓冲区
如果要在请求中使用缓冲区的话,需要使用 IORING_OP_READ_FIXED
或者 IORING_OP_WRITE_FIXED
请求
具体可以参考 io_uring_register
内核侧请求队列轮询
将请求放到SQ
的环形缓冲区后,需要调用 io_uring_enter
来通知内核有请求需要处理
io_uring 为了进一步减少系统调用,可以在 io_uring_setup
是设置 io_uring_params->flags
的 IORING_SETUP_SQPOLL
flags,内核就会创建一个轮询请求队列的线程
可以通过 ps
命令查看用来轮询的内核线程
ps --ppid 2 | grep io_uring-sq
需要注意在 5.10 之前的版本,需要使用特权用户来执行,而 5.10 以后只需 CAP_SYS_NICE
权限即可
并且 5.10 之前,SQPoll 需要配合注册的文件描述符一起使用,而 5.10 以后则不需要,可以通过查看内核填充的 io_uring_params->features
是否设置了 IORING_FEAT_SQPOLL_NONFIXED
// iouring-go/iouring.go
func (iour *IOURing) doRequest(sqe *iouring_syscall.SubmissionQueueEntry, request PrepRequest, ch chan<- Result) (*UserData, error) {
// ...
if sqe.Fd() >= 0 {
if index, ok := iour.fileRegister.GetFileIndex(int32(sqe.Fd())); ok {
sqe.SetFdIndex(int32(index))
} else if iour.Flags&iouring_syscall.IORING_SETUP_SQPOLL != 0 &&
iour.Features&iouring_syscall.IORING_FEAT_SQPOLL_NONFIXED == 0 {
return nil, ErrUnregisteredFile
}
}
// ...
}
iouring-go 同样提供了开启 SQPoll 的 WithSQPoll
以及设置与 SQPoll 内核线程的相关配置 WithSQPollThreadCpu
和 WithSQPollThreadIdle
iour, err := iouring.New(8, iouring.WithSQPoll())
但是在 Go 简单的设置 io_uring_params
并不能正常的工作,可能是由于 Go 的 GMP 模型导致的一些问题。暂时还在思考解决方案
注册 eventfd,利用 epoll
通过 io_uring_register
可以将 eventfd
注册到 io_uring 实例中,然后将 eventfd
加入到 epoll
中,如果当 io_uring 中有完成事件时,便会通知 eventfd
在 iouring-go 中,对于完成事件的监听便是使用了 eventfd
和 epoll
type IOURing struct {
eventfd int
cqeSign chan struct{}
// ...
}
func New() (*IOURing, error) {
// ....
if err := iour.registerEventfd(); err != nil {
return nil, err
}
if err := registerIOURing(iour); err != nil {
return nil, err
}
// ...
}
func (iour *IOURing) registerEventfd() error {
eventfd, err := unix.Eventfd(0, unix.EFD_NONBLOCK|unix.FD_CLOEXEC)
if err != nil {
return os.NewSyscallError("eventfd", err)
}
iour.eventfd = eventfd
return iouring_syscall.IOURingRegister(
iour.fd,
iouring_syscall.IOURING_REGISTER_EVENTFD,
unsafe.Pointer(&iour.eventfd), 1,
)
}
func registerIOURing(iour *IOURing) error {
if err := initpoller(); err != nil {
return err
}
if err := unix.EpollCtl(poller.fd, unix.EPOLL_CTL_ADD, iour.eventfd,
&unix.EpollEvent{Fd: int32(iour.eventfd), Events: unix.EPOLLIN | unix.EPOLLET},
); err != nil {
return os.NewSyscallError("epoll_ctl_add", err)
}
poller.Lock()
poller.iours[iour.eventfd] = iour
poller.Unlock()
return nil
}
poller
会调用 EpollWait 等待完成队列
中有完成事件,并通知相应的 IOURing 对象
// iouring-go/iouring.go
func (iour *IOURing) getCQEvent(wait bool) (cqe *iouring_syscall.CompletionQueueEvent, err error) {
var tryPeeks int
for {
if cqe = iour.cq.peek(); cqe != nil {
iour.cq.advance(1)
return
}
if !wait && !iour.sq.cqOverflow() {
err = syscall.EAGAIN
return
}
if iour.sq.cqOverflow() {
_, err = iouring_syscall.IOURingEnter(iour.fd, 0, 0, iouring_syscall.IORING_ENTER_FLAGS_GETEVENTS, nil)
if err != nil {
return
}
continue
}
if tryPeeks++; tryPeeks < 3 {
runtime.Gosched()
continue
}
select {
case <-iour.cqeSign:
case <-iour.closer:
return nil, ErrIOURingClosed
}
}
}
// iouring-go/poller.go
func (poller *iourPoller) run() {
for {
n, err := unix.EpollWait(poller.fd, poller.events, -1)
if err != nil {
continue
}
for i := 0; i < n; i++ {
fd := int(poller.events[i].Fd)
poller.Lock()
iour, ok := poller.iours[fd]
poller.Unlock()
if !ok {
continue
}
select {
case iour.cqeSign <- struct{}{}:
default:
}
}
poller.adjust()
}
}
保证数据不丢失
默认情况下, CQ 环的大小是 SQ 环的 两倍,为什么 SQ 环的大小会小于 CQ 环,是因为 SQ 环中的 sqe 一旦被内核发现,便会被内核消耗掉,也就意味着 sqe 的生命周期很短,而请求的完成事件都会放到 CQ 环中
我们也可以通过 IORING_SETUP_CQSIZE
或者 iouring-go
的 WithCQSize
Option 里设置 CQ 环的大小
但是依然会存在 CQ 环溢出的情况,而内核会在内部存储溢出的时间,直到 CQ 环有空间容纳更多事件。
可以通过 io_uring_params->features 是否设置 IORING_FEAT_NODROP 来判断当前内核是否支持该功能
如果 CQ 环溢出,那么提交请求时可能会以 -EBUSY
错误失败,需要重新提交
并且当 CQ 环中数据被消耗后,需要调用 io_uring_enter
来通知内核 CQ 环中有空余空间
func (iour *IOURing) getCQEvent(wait bool) (cqe *iouring_syscall.CompletionQueueEvent, err error) {
// ...
if iour.sq.cqOverflow() {
_, err := iour.syscall.IOURingEnter(iour.fd, 0, 0, iouring_syscall.IORING_ENTER_FLAGS_GETEVENTS, nil)
if err != nil{
return
}
continue
}
// ...
}
io_uring 与 Go —— iouring-go
竞争问题
在实现 iouring-go 中遇到的问题,一个是并发导致对 io_uring
的竞争问题
对于 CQ 环的竞争是使用单一的 CQ 环消费 goroutine IOURing.run()
来完成 cqe
的消费
func New(entries int, opts ...IOURingOption) (iour *IOURing, err error) {
iour := &IOURing{...}
// ...
go iour.run()
return
}
func (iour *IOURing) run() {
for {
cqe, err := iour.getCQEvent(true)
// ...
}
}
SQ 环的解决方案有两种
- 使用单独的提交 goroutine,将需要提交的请求通过内部 channel 发送给提交 goroutine,这样保证了 SQ 环的单一生产者
- 使用锁的方式,对于提交请求的函数加锁,保证同一时间只有一个 goroutine 在提交请求
第一种方式听起来使用 channel 更优雅一些,但是 channel 内部依然使用锁的方式以及额外的内存复制
另外最大的弊端就是将 IOURIng
的提交函数
(将请求发送给提交channel)和真正将请求提交给内核(调用 io_uring_enter
通知内核有新的请求)分开
当多个提交函数
向 channel 发送的请求的顺序无法保证,这样链式请求就无法实现(除非对于链式请求再次加锁)
第二种方式,采用加锁的方式,保证了同一时间只有一个提交函数在处理 SQ 环,并且可以立即是否真正提交成功(调用 IOURing.submit
方法通知内核有新的请求)
iouring-go 采用了第二种方式
真正去解决这个问题的方式,估计可能只有 runtime 才能给出答案,为每一个 P 创建一个 io_uring 实例在 runtime 内部解决竞争问题,内部使用 eventfd 注册到 netpoll
中来获取完成队列
通知
io_uring 与 channel
对于 iouring-go 设计比较好的地方,我感觉便是对 channel 的利用,异步 IO 加上 channel,可以将异步在并发的程序中发挥出最大的作用
当然,如果只是简单的使用 channel 的话又会引入其他一些问题,后续会进行说明
func (iour *IOURing) SubmitRequest(request PrepRequest, ch chan<- Result) (Request, error)
SubmitRequest
方法接收一个 channel,当请求完成后,会将结果发送到 channel 中,这样通过多个请求复用同一个 channel,程序便可以监听一组请求的完成情况
func (iour *IOURing) run() {
for {
cqe, err := iour.getCQEvent(true)
// ...
userData := iour.userData[cqe.UserData]
// ...
userData.request.complate(cqe)
if userData.resulter != nil {
userData.resulter <- userData.request
}
}
}
而 SubmitRequest
方法同样会返回一个 Request 接口对象,通过 Request 我们同样可以去查看请求的是否完成已经它的完成结果
type Request interface {
Result
Cancel() (Request, error)
Done() <-chan struct{}
GetRes() (int, error)
// Can Only be used in ResultResolver
SetResult(r0, r1 interface{}, err error) error
}
type Result interface {
Fd() int
Opcode() uint8
GetRequestBuffer() (b0, b1 []byte)
GetRequestBuffers() [][]byte
GetRequestInfo() interface{}
FreeRequestBuffer()
Err() error
ReturnValue0() interface{}
ReturnValue1() interface{}
ReturnFd() (int, error)
ReturnInt() (int, error)
}
利用 channel 便可以完成对异步 IO 的异步监听和同步监听
channel 带来的问题
当然使用 channel 又会带来其他的问题,比如 channel 满了以后,对 io_uring 完成队列的消费便会阻塞在向 channel 发送数据,阻塞时间过长也会导致 CQ 环溢出
比较好的解决方案是,在 channel 上抽象出一层 Resulter
,Resulter
会对完成事件进行自动缓冲,当然这也会带来一定的代码复杂度,所以 iouring-go 便将 channel 阻塞的问题交给使用者,要求 channel 的消费端尽快消费掉数据
思考 io_uring 在 Go 中的发展
netpoll
在 Linux 平台下使用了 epoll
,而且 epoll
在使用上并没有竞争问题,当然如果要使用 io_uring
来替代 epoll
来实现 netpoll
的话并不是不可能,只是这样对于工作很好的 epoll 来说并没有什么必要,而且是否能够带来可观的性能收益也都是不确定的
在高并发的情况下,有限的 SQ 环和 CQ 环,对于请求数量大于完成事件的消费速度的情况,CQ 环的大量溢出带来对内核的压力以及新的请求提交带来的错误处理,都会提高真正利用 io_uring
的难度
对于 SQ 环和 CQ 环的大小限制,也许需要通过 Pool 的方式来解决,初始化多个 io_uring 实例,当一个实例的 SQ 环满,那么就使用另外的实例来提交请求
而使用 Pool 又会增加一定的复杂度io_uring
的功能实际可以覆盖了 epoll
的,比如提交的阻塞 IO 请求便相当于 epoll
+ syscall
,另外 io_uring
还提供了超时设置和请求的超时控制,相当于实现了系统级的定时器以及 netpoll
的 deadline
但是 epoll 自身的优势,比如没有竞争问题,没有监听文件描述符的数量限制,都让 epoll 在实际的使用中更加好用,而这些问题对于 io_uring
在本身设计上就会导致的问题
比如竞争问题,使用环形缓冲区可以协调应用和内核对请求队列的访问,但是应用中多个线程或者 goroutine 就会引发对环形缓冲区的竞争问题
而请求数量的限制,那么就需要考虑到请求完成事件的溢出问题,内核不能无限制的去保存溢出的完成事件,当然这个问题通过应用中在 io_uring
实例上抽象出 io_uring 池
的方式来解决
使用 io_uring
来实现异步网络框架,对已有的网络模型会是非常大的冲击,怎么去使用 io_uring
来发挥最大的能力依然处于探索阶段,毕竟 io_uring
是一个出现才 1 年的技术
而对于普通的磁盘 IO 来说,io_uring
还是有很大的发挥空间的,利用 Go 中已有的并发机制,结合具体的性能评估,对于文件服务器来说,也许会带来极大的提升
另外一个问题便是,对于 5.1 引入,5.6 开始功能变得丰富成熟的 io_uring
来说,现在大量的环境处于 3.X,4.X,甚至 2.X , io_uring
仍然需要等待时机才能去发挥它真正的作用,而这段时间便是留给我们去探讨怎么让 io_uring
更好用