流的实现在Libuv里占了很大的篇幅,是非常核心的逻辑。流的本质是封装了对文件描述符的操作,例如读、写,连接、监听。我们首先看看数据结构,流在Libuv里用uv_stream_s表示,继承于uv_handle_s。
struct uv_stream_s {// uv_handle_s的字段void* data;// 所属事件循环uv_loop_t* loop;// handle类型uv_handle_type type;// 关闭handle时的回调uv_close_cb close_cb;// 用于插入事件循环的handle队列void* handle_queue[2];union {int fd;void* reserved[4];} u;// 用于插入事件循环的closing阶段uv_handle_t* next_closing;// 各种标记unsigned int flags;// 流拓展的字段/*户写入流的字节大小,流缓存用户的输入,然后等到可写的时候才执行真正的写*/size_t write_queue_size;// 分配内存的函数,内存由用户定义,用来保存读取的数据uv_alloc_cb alloc_cb;// 读回调uv_read_cb read_cb;// 连接请求对应的结构体uv_connect_t *connect_req;/*关闭写端的时候,发送完缓存的数据,执行shutdown_req的回调(shutdown_req在uv_shutdown的时候赋值)*/uv_shutdown_t *shutdown_req;/*流对应的IO观察者*/uv__io_t io_watcher;// 缓存待写的数据,该字段用于插入队列void* write_queue[2];// 已经完成了数据写入的队列,该字段用于插入队列void* write_completed_queue[2];// 有连接到来并且完成三次握手后,执行的回调uv_connection_cb connection_cb;// 操作流时出错码int delayed_error;// accept返回的通信socket对应的文件描述int accepted_fd;// 同上,用于IPC时,缓存多个传递的文件描述符void* queued_fds;}
流的实现中,最核心的字段是IO观察者,其余的字段是和流的性质相关的。IO观察者封装了流对应的文件描述符和文件描述符事件触发时的回调。比如读一个流、写一个流、关闭一个流、连接一个流、监听一个流,在uv_stream_s中都有对应的字段去支持。但是本质上是靠IO观察者去驱动的。
1 读一个流,就是IO观察者中的文件描述符的可读事件触发时,执行用户的读回调。
2 写一个流,先把数据写到流中,等到IO观察者中的文件描述符可写事件触发时,执行真正的写入,并执行用户的写结束回调。
3 关闭一个流,就是IO观察者中的文件描述符可写事件触发时,就会执行关闭流的写端。如果流中还有数据没有写完,则先写完(比如发送)后再执行关闭操作,接着执行用户的回调。
4 连接流,比如作为客户端去连接服务器。就是IO观察者中的文件描述符可读事件触发时(比如建立三次握手成功),执行用户的回调。
5 监听流,就是IO观察者中的文件描述符可读事件触发时(比如有完成三次握手的连接),执行用户的回调。
下面我们看一下流的具体实现
5.1 初始化流
在使用uv_stream_t之前需要首先初始化,我们看一下如何初始化一个流。
void uv__stream_init(uv_loop_t* loop,uv_stream_t* stream,uv_handle_type type) {int err;// 记录handle的类型uv__handle_init(loop, (uv_handle_t*)stream, type);stream->read_cb = NULL;stream->alloc_cb = NULL;stream->close_cb = NULL;stream->connection_cb = NULL;stream->connect_req = NULL;stream->shutdown_req = NULL;stream->accepted_fd = -1;stream->queued_fds = NULL;stream->delayed_error = 0;QUEUE_INIT(&stream->write_queue);QUEUE_INIT(&stream->write_completed_queue);stream->write_queue_size = 0;/*初始化IO观察者,把文件描述符(这里还没有,所以是-1)和回调uv__stream_io记录在io_watcher上,fd的事件触发时,统一由uv__stream_io函数处理,但也会有特殊情况(下面会讲到)*/uv__io_init(&stream->io_watcher, uv__stream_io, -1);}
初始化一个流的逻辑很简单明了,就是初始化相关的字段,需要注意的是初始化IO观察者时,设置的处理函数是uv__stream_io,后面我们会分析这个函数的具体逻辑。
5.2 打开流
int uv__stream_open(uv_stream_t* stream, int fd, int flags) {// 还没有设置fd或者设置的同一个fd则继续,否则返回UV_EBUSYif (!(stream->io_watcher.fd == -1 ||stream->io_watcher.fd == fd))return UV_EBUSY;// 设置流的标记stream->flags |= flags;// 是TCP流则可以设置下面的属性if (stream->type == UV_TCP) {// 关闭nagle算法if ((stream->flags & UV_HANDLE_TCP_NODELAY) &&uv__tcp_nodelay(fd, 1))return UV__ERR(errno);/*开启keepalive机制*/if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&uv__tcp_keepalive(fd, 1, 60)) {return UV__ERR(errno);}}/*保存socket对应的文件描述符到IO观察者中,Libuv会在Poll IO阶段监听该文件描述符*/stream->io_watcher.fd = fd;return 0;}
打开一个流,本质上就是给这个流关联一个文件描述符,后续的操作的时候都是基于这个文件描述符的,另外还有一些属性的设置。
5.3 读流
我们在一个流上执行uv_read_start后,流的数据(如果有的话)就会通过read_cb回调源源不断地流向调用方。
int uv_read_start(uv_stream_t* stream,uv_alloc_cb alloc_cb,uv_read_cb read_cb) {// 流已经关闭,不能读if (stream->flags & UV_HANDLE_CLOSING)return UV_EINVAL;// 流不可读,说明可能是只写流if (!(stream->flags & UV_HANDLE_READABLE))return -ENOTCONN;// 标记正在读stream->flags |= UV_HANDLE_READING;// 记录读回调,有数据的时候会执行这个回调stream->read_cb = read_cb;// 分配内存函数,用于存储读取的数据stream->alloc_cb = alloc_cb;// 注册等待读事件uv__io_start(stream->loop, &stream->io_watcher, POLLIN);// 激活handle,有激活的handle,事件循环不会退出uv__handle_start(stream);return 0;}
执行uv_read_start本质上是给流对应的文件描述符在epoll中注册了一个等待可读事件,并记录相应的上下文,比如读回调函数,分配内存的函数。接着打上正在做读取操作的标记。当可读事件触发的时候,读回调就会被执行,除了读取数据,还有一个读操作就是停止读取。对应的函数是uv_read_stop。
int uv_read_stop(uv_stream_t* stream) {// 是否正在执行读取操作,如果不是,则没有必要停止if (!(stream->flags & UV_HANDLE_READING))return 0;// 清除正在读取的标记stream->flags &= ~UV_HANDLE_READING;// 撤销等待读事件uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);// 对写事件也不感兴趣,停掉handle。允许事件循环退出if (!uv__io_active(&stream->io_watcher, POLLOUT))uv__handle_stop(stream);stream->read_cb = NULL;stream->alloc_cb = NULL;return 0;}
另外还有一个辅助函数,判断流是否设置了可读属性。
int uv_is_readable(const uv_stream_t* stream) {return !!(stream->flags & UV_HANDLE_READABLE);}
上面的函数只是注册和注销读事件,如果可读事件触发的时候,我们还需要自己去读取数据,我们看一下真正的读逻辑
static void uv__read(uv_stream_t* stream) {uv_buf_t buf;ssize_t nread;struct msghdr msg;char cmsg_space[CMSG_SPACE(UV__CMSG_FD_SIZE)];int count;int err;int is_ipc;// 清除读取部分标记stream->flags &= ~UV_STREAM_READ_PARTIAL;count = 32;/*流是Unix域类型并且用于IPC,Unix域不一定用于IPC,用作IPC可以支持传递文件描述符*/is_ipc = stream->type == UV_NAMED_PIPE &&((uv_pipe_t*) stream)->ipc;// 设置了读回调,正在读,count大于0while (stream->read_cb&& (stream->flags & UV_STREAM_READING)&& (count-- > 0)) {buf = uv_buf_init(NULL, 0);// 调用调用方提供的分配内存函数,分配内存承载数据stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);/*不是IPC则直接读取数据到buf,否则用recvmsg读取数据和传递的文件描述符(如果有的话)*/if (!is_ipc) {do {nread = read(uv__stream_fd(stream),buf.base,buf.len);}while (nread < 0 && errno == EINTR);} else {/* ipc uses recvmsg */msg.msg_flags = 0;msg.msg_iov = (struct iovec*) &buf;msg.msg_iovlen = 1;msg.msg_name = NULL;msg.msg_namelen = 0;msg.msg_controllen = sizeof(cmsg_space);msg.msg_control = cmsg_space;do {nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);}while (nread < 0 && errno == EINTR);}// 读失败if (nread < 0) {// 读繁忙if (errno == EAGAIN || errno == EWOULDBLOCK) {// 执行读回调stream->read_cb(stream, 0, &buf);} else {/* Error. User should call uv_close(). */// 读失败stream->read_cb(stream, -errno, &buf);}return;} else if (nread == 0) {// 读到结尾了uv__stream_eof(stream, &buf);return;} else {// 读成功,读取数据的长度ssize_t buflen = buf.len;/*是IPC则解析读取的数据,把文件描述符解析出来,放到stream的accepted_fd和queued_fds字段*/if (is_ipc) {err = uv__stream_recv_cmsg(stream, &msg);if (err != 0) {stream->read_cb(stream, err, &buf);return;}}// 执行读回调stream->read_cb(stream, nread, &buf);}}}
uv_read除了可以读取一般的数据外,还支持读取传递的文件描述符。我们看一下描述符传递的原理。我们知道,父进程fork出子进程的时候,子进程是继承父进程的文件描述符列表的。我们看一下进程和文件描述符的关系。
fork之前如图5-1所示。

我们再看一下fork之后的结构如图5-2所示。

如果父进程或者子进程在fork之后创建了新的文件描述符,那父子进程间就不能共享了,假设父进程要把一个文件描述符传给子进程,那怎么办呢?根据进程和文件描述符的关系。传递文件描述符要做的事情,不仅仅是在子进程中新建一个fd,还要建立起fd->file->inode的关联,不过我们不需要关注这些,因为操作系统都帮我们处理了,我们只需要通过sendmsg把想传递的文件描述符发送给Unix域的另一端。Unix域另一端就可以通过recvmsg把文件描述符从数据中读取出来。接着使用uv__stream_recv_cmsg函数保存数据里解析出来的文件描述符。
static int uv__stream_recv_cmsg(uv_stream_t* stream,struct msghdr* msg) {struct cmsghdr* cmsg;// 遍历msgfor (cmsg = CMSG_FIRSTHDR(msg);cmsg != NULL;cmsg = CMSG_NXTHDR(msg, cmsg)) {char* start;char* end;int err;void* pv;int* pi;unsigned int i;unsigned int count;pv = CMSG_DATA(cmsg);pi = pv;start = (char*) cmsg;end = (char*) cmsg + cmsg->cmsg_len;count = 0;while (start + CMSG_LEN(count * sizeof(*pi)) < end)count++;for (i = 0; i < count; i++) {/*accepted_fd代表当前待处理的文件描述符,如果已经有值则剩余描述符就通过uv__stream_queue_fd排队如果还没有值则先赋值*/if (stream->accepted_fd != -1) {err = uv__stream_queue_fd(stream, pi[i]);} else {stream->accepted_fd = pi[i];}}}return 0;}
uvstream_recv_cmsg会从数据中解析出一个个文件描述符存到stream中,第一个文件描述符保存在accepted_fd,剩下的使用uvstream_queue_fd处理。
struct uv__stream_queued_fds_s {unsigned int size;unsigned int offset;int fds[1];};static int uv__stream_queue_fd(uv_stream_t* stream, int fd) {uv__stream_queued_fds_t* queued_fds;unsigned int queue_size;// 原来的内存queued_fds = stream->queued_fds;// 没有内存,则分配if (queued_fds == NULL) {// 默认8个queue_size = 8;/*一个元数据内存+多个fd的内存(前面加*代表解引用后的值的类型所占的内存大小,减一是因为uv__stream_queued_fds_t结构体本身有一个空间)*/queued_fds = uv__malloc((queue_size - 1) *sizeof(*queued_fds->fds) +sizeof(*queued_fds));if (queued_fds == NULL)return UV_ENOMEM;// 容量queued_fds->size = queue_size;// 已使用个数queued_fds->offset = 0;// 指向可用的内存stream->queued_fds = queued_fds;// 之前的内存用完了,扩容} else if (queued_fds->size == queued_fds->offset) {// 每次加8个queue_size = queued_fds->size + 8;queued_fds = uv__realloc(queued_fds,(queue_size - 1) * sizeof(*queued_fds->fds) + sizeof(*queued_fds));if (queued_fds == NULL)return UV_ENOMEM;// 更新容量大小queued_fds->size = queue_size;// 保存新的内存stream->queued_fds = queued_fds;}/* Put fd in a queue */// 保存fdqueued_fds->fds[queued_fds->offset++] = fd;return 0;}
内存结构如图5-3所示。

最后我们看一下读结束后的处理,
static void uv__stream_eof(uv_stream_t* stream,const uv_buf_t* buf) {// 打上读结束标记stream->flags |= UV_STREAM_READ_EOF;// 注销等待可读事件uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);// 没有注册等待可写事件则停掉handle,否则会影响事件循环的退出if (!uv__io_active(&stream->io_watcher, POLLOUT))uv__handle_stop(stream);uv__stream_osx_interrupt_select(stream);// 执行读回调stream->read_cb(stream, UV_EOF, buf);// 清除正在读标记stream->flags &= ~UV_STREAM_READING;}
我们看到,流结束的时候,首先注销等待可读事件,然后通过回调通知上层。
5.4 写流
我们在流上执行uv_write就可以往流中写入数据。
int uv_write(/*一个写请求,记录了需要写入的数据和信息。数据来自下面的const uv_buf_t bufs[]*/uv_write_t* req,// 往哪个流写uv_stream_t* handle,// 需要写入的数据const uv_buf_t bufs[],// uv_buf_t个数unsigned int nbufs,// 写完后执行的回调uv_write_cb cb) {return uv_write2(req, handle, bufs, nbufs, NULL, cb);}
uv_write是直接调用uv_write2。第四个参数是NULL。代表是一般的写数据,不传递文件描述符。
int uv_write2(uv_write_t* req,uv_stream_t* stream,const uv_buf_t bufs[],unsigned int nbufs,uv_stream_t* send_handle,uv_write_cb cb) {int empty_queue;// 待发送队列是否为空empty_queue = (stream->write_queue_size == 0);// 构造一个写请求uv__req_init(stream->loop, req, UV_WRITE);// 写请求对应的回调req->cb = cb;// 写请求对应的流req->handle = stream;req->error = 0;// 需要发送的文件描述符,也可以是NULL说明不需要发送文件描述符req->send_handle = send_handle;QUEUE_INIT(&req->queue);// bufs指向待写的数据req->bufs = req->bufsml;// 复制调用方的数据过来memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));// buf个数req->nbufs = nbufs;// 当前写成功的buf索引,针对bufs数组req->write_index = 0;// 待写的数据大小 = 之前的大小 + 本次大小stream->write_queue_size += uv__count_bufs(bufs, nbufs);// 插入待写队列QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);// 非空说明正在连接,还不能写,比如TCP流if (stream->connect_req) {/* Still connecting, do nothing. */}else if (empty_queue) { // 当前待写队列为空,直接写uv__write(stream);}else {// 还有数据没有写完,注册等待写事件uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);uv__stream_osx_interrupt_select(stream);}return 0;}
uv_write2的主要逻辑就是封装一个写请求,插入到流的待写队列。然后根据当前流的情况。看是直接写入还是等待会再写入。关系图大致如图5-4所示。

uv_write2只是对写请求进行一些预处理,真正执行写的函数是uv__write
static void uv__write(uv_stream_t* stream) {struct iovec* iov;QUEUE* q;uv_write_t* req;int iovmax;int iovcnt;ssize_t n;int err;start:// 没有数据需要写if (QUEUE_EMPTY(&stream->write_queue))return;q = QUEUE_HEAD(&stream->write_queue);req = QUEUE_DATA(q, uv_write_t, queue);// 从哪里开始写iov = (struct iovec*) &(req->bufs[req->write_index]);// 还有多少没写iovcnt = req->nbufs - req->write_index;// 最多可以写多少iovmax = uv__getiovmax();// 取最小值if (iovcnt > iovmax)iovcnt = iovmax;// 需要传递文件描述符if (req->send_handle) {int fd_to_send;struct msghdr msg;struct cmsghdr *cmsg;union {char data[64];struct cmsghdr alias;} scratch;if (uv__is_closing(req->send_handle)) {err = -EBADF;goto error;}// 待发送的文件描述符fd_to_send = uv__handle_fd((uv_handle_t*) req->send_handle);memset(&scratch, 0, sizeof(scratch));msg.msg_name = NULL;msg.msg_namelen = 0;msg.msg_iov = iov;msg.msg_iovlen = iovcnt;msg.msg_flags = 0;msg.msg_control = &scratch.alias;msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));cmsg = CMSG_FIRSTHDR(&msg);cmsg->cmsg_level = SOL_SOCKET;cmsg->cmsg_type = SCM_RIGHTS;cmsg->cmsg_len = CMSG_LEN(sizeof(fd_to_send));{void* pv = CMSG_DATA(cmsg);int* pi = pv;*pi = fd_to_send;}do {// 使用sendmsg函数发送文件描述符n = sendmsg(uv__stream_fd(stream), &msg, 0);}while (n == -1 && errno == EINTR);} else {do {// 写一个或者写批量写if (iovcnt == 1) {n = write(uv__stream_fd(stream),iov[0].iov_base,iov[0].iov_len);} else {n = writev(uv__stream_fd(stream), iov, iovcnt);}}while (n == -1 && errno == EINTR);}// 写失败if (n < 0) {/*不是写繁忙,则报错,否则如果设置了同步写标记,则继续尝试写*/if (errno != EAGAIN &&errno != EWOULDBLOCK &&errno != ENOBUFS) {err = -errno;goto error;} else if (stream->flags & UV_STREAM_BLOCKING) {/* If this is a blocking stream, try again. */goto start;}} else {// 写成功while (n >= 0) {// 当前buf首地址uv_buf_t* buf = &(req->bufs[req->write_index]);// 当前buf的数据长度size_t len = buf->len;// 小于说明当前buf还没有写完(还没有被消费完)if ((size_t)n < len) {// 更新待写的首地址buf->base += n;// 更新待写的数据长度buf->len -= n;/*更新待写队列的长度,这个队列是待写数据的总长度,等于多个buf的和*/stream->write_queue_size -= n;n = 0;/*还没写完,设置了同步写,则继续尝试写,否则退出,注册待写事件*/if (stream->flags & UV_STREAM_BLOCKING) {goto start;} else {break;}} else {/*当前buf的数据都写完了,则更新待写数据的的首地址,即下一个buf,因为当前buf写完了*/req->write_index++;// 更新n,用于下一个循环的计算n -= len;// 更新待写队列的长度stream->write_queue_size -= len;/*等于最后一个buf了,说明待写队列的数据都写完了*/if (req->write_index == req->nbufs) {/*释放buf对应的内存,并把请求插入写完成队列,然后准备触发写完成回调*/uv__write_req_finish(req);return;}}}}/*写成功,但是还没有写完,注册待写事件,等待可写的时候继续写*/uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);uv__stream_osx_interrupt_select(stream);return;// 写出错error:// 记录错误req->error = err;/*释放内存,丢弃数据,插入写完成队列,把IO观察者插入pending队列,等待pending阶段执行回调*/uv__write_req_finish(req);// 注销待写事件uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);// 如果也没有注册等待可读事件,则把handle关闭if (!uv__io_active(&stream->io_watcher, POLLIN))uv__handle_stop(stream);uv__stream_osx_interrupt_select(stream);}
我们看一下一个写请求结束后(成功或者失败),Libuv如何处理的。逻辑在uv__write_req_finish函数。
static void uv__write_req_finish(uv_write_t* req) {uv_stream_t* stream = req->handle;// 从待写队列中移除QUEUE_REMOVE(&req->queue);// 写成功,并且分配了额外的堆内存,则需要释放,见uv__writeif (req->error == 0) {if (req->bufs != req->bufsml)uv__free(req->bufs);req->bufs = NULL;}// 插入写完成队列QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);/*把IO观察者插入pending队列,Libuv在处理pending阶段时,会触发IO观察者的写事件*/uv__io_feed(stream->loop, &stream->io_watcher);}
uv__write_req_finish的逻辑比较简单
1把节点从待写队列中移除
2 req->bufs != req->bufsml不相等说明分配了堆内存,需要自己释放
3并把请求插入写完成队列,把IO观察者插入pending队列,等待pending阶段执行回调,在pending节点会执行IO观察者的回调(uv__stream_io)。
我们看一下uv__stream_io如何处理的,下面是具体的处理逻辑。
// 可写事件触发if (events & (POLLOUT | POLLERR | POLLHUP)) {// 继续执行写uv__write(stream);// 处理写成功回调uv__write_callbacks(stream);// 待写队列空,注销等待可写事件,即不需要写了if (QUEUE_EMPTY(&stream->write_queue))uv__drain(stream);}
我们只关注uv__write_callbacks。
static void uv__write_callbacks(uv_stream_t* stream) {uv_write_t* req;QUEUE* q;// 写完成队列非空while (!QUEUE_EMPTY(&stream->write_completed_queue)) {q = QUEUE_HEAD(&stream->write_completed_queue);req = QUEUE_DATA(q, uv_write_t, queue);QUEUE_REMOVE(q);uv__req_unregister(stream->loop, req);// bufs的内存还没有被释放if (req->bufs != NULL) {// 更新待写队列的大小,即减去req对应的所有数据的大小stream->write_queue_size -= uv__write_req_size(req);/*bufs默认指向bufsml,超过默认大小时,bufs指向新申请的堆内存,所以需要释放*/if (req->bufs != req->bufsml)uv__free(req->bufs);req->bufs = NULL;}// 执行回调if (req->cb)req->cb(req, req->error);}}
uv__write_callbacks负责更新流的待写队列大小、释放额外申请的堆内存、执行每个写请求的回调。
5.5 关闭流的写端
// 关闭流的写端int uv_shutdown(uv_shutdown_t* req,uv_stream_t* stream,uv_shutdown_cb cb) {// 初始化一个关闭请求,关联的handle是streamuv__req_init(stream->loop, req, UV_SHUTDOWN);req->handle = stream;// 关闭后执行的回调req->cb = cb;stream->shutdown_req = req;// 设置正在关闭的标记stream->flags |= UV_HANDLE_SHUTTING;// 注册等待可写事件uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);return 0;}
关闭流的写端就是相当于给流发送一个关闭请求,把请求挂载到流中,然后注册等待可写事件,在可写事件触发的时候就会执行关闭操作。在分析写流的章节中我们提到,可写事件触发的时候,会执行uvdrain注销等待可写事件,除此之外,uvdrain还做了一个事情,就是关闭流的写端。我们看看具体的逻辑。
static void uv__drain(uv_stream_t* stream) {uv_shutdown_t* req;int err;// 撤销等待可写事件,因为没有数据需要写入了uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);uv__stream_osx_interrupt_select(stream);// 设置了关闭写端,但是还没有关闭,则执行关闭写端if ((stream->flags & UV_HANDLE_SHUTTING) &&!(stream->flags & UV_HANDLE_CLOSING) &&!(stream->flags & UV_HANDLE_SHUT)) {req = stream->shutdown_req;stream->shutdown_req = NULL;// 清除标记stream->flags &= ~UV_HANDLE_SHUTTING;uv__req_unregister(stream->loop, req);err = 0;// 关闭写端if (shutdown(uv__stream_fd(stream), SHUT_WR))err = UV__ERR(errno);// 标记已关闭写端if (err == 0)stream->flags |= UV_HANDLE_SHUT;// 执行回调if (req->cb != NULL)req->cb(req, err);}}
通过调用shutdown关闭流的写端,比如TCP流发送完数据后可以关闭写端。但是仍然可以读。
5.6 关闭流
void uv__stream_close(uv_stream_t* handle) {unsigned int i;uv__stream_queued_fds_t* queued_fds;// 从事件循环中删除IO观察者,移出pending队列uv__io_close(handle->loop, &handle->io_watcher);// 停止读uv_read_stop(handle);// 停掉handleuv__handle_stop(handle);// 不可读、写handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);// 关闭非标准流的文件描述符if (handle->io_watcher.fd != -1) {/*Don't close stdio file descriptors.Nothing good comes from it.*/if (handle->io_watcher.fd > STDERR_FILENO)uv__close(handle->io_watcher.fd);handle->io_watcher.fd = -1;}// 关闭通信socket对应的文件描述符if (handle->accepted_fd != -1) {uv__close(handle->accepted_fd);handle->accepted_fd = -1;}// 同上,这是在排队等待处理的文件描述符if (handle->queued_fds != NULL) {queued_fds = handle->queued_fds;for (i = 0; i < queued_fds->offset; i++)uv__close(queued_fds->fds[i]);uv__free(handle->queued_fds);handle->queued_fds = NULL;}}
关闭流就是把流注册在epoll的事件注销,关闭所持有的文件描述符。
5.7 连接流
连接流是针对TCP和Unix域的,所以我们首先介绍一下一些网络编程相关的内容,首先我们先要有一个socket。我们看Libuv中如何新建一个socket。
int uv__socket(int domain, int type, int protocol) {int sockfd;int err;// 新建一个socket,并设置非阻塞和LOEXEC标记sockfd = socket(domain, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);// 不触发SIGPIPE信号,比如对端已经关闭,本端又执行写#if defined(SO_NOSIGPIPE){int on = 1;setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof(on));}#endifreturn sockfd;}
在Libuv中,socket的模式都是非阻塞的,uv__socket是Libuv中申请socket的函数,不过Libuv不直接调用该函数,而是封装了一下。
/*1 获取一个新的socket fd2 把fd保存到handle里,并根据flag进行相关设置3 绑定到本机随意的地址(如果设置了该标记的话)*/static int new_socket(uv_tcp_t* handle,int domain,unsigned long flags) {struct sockaddr_storage saddr;socklen_t slen;int sockfd;// 获取一个socketsockfd = uv__socket(domain, SOCK_STREAM, 0);// 设置选项和保存socket的文件描述符到IO观察者中uv__stream_open((uv_stream_t*) handle, sockfd, flags);// 设置了需要绑定标记UV_HANDLE_BOUNDif (flags & UV_HANDLE_BOUND) {slen = sizeof(saddr);memset(&saddr, 0, sizeof(saddr));// 获取fd对应的socket信息,比如IP,端口,可能没有getsockname(uv__stream_fd(handle),(struct sockaddr*) &saddr,&slen);// 绑定到socket中,如果没有则绑定到系统随机选择的地址bind(uv__stream_fd(handle),(struct sockaddr*) &saddr, slen);}return 0;}
上面的代码就是在Libuv申请一个socket的逻辑,另外它还支持新建的socket,可以绑定到一个用户设置的,或者操作系统随机选择的地址。不过Libuv并不直接使用这个函数。而是又封装了一层。
// 如果流还没有对应的fd,则申请一个新的,如果有则修改流的配置static int maybe_new_socket(uv_tcp_t* handle,int domain,unsigned long flags) {struct sockaddr_storage saddr;socklen_t slen;// 已经有fd了if (uv__stream_fd(handle) != -1) {// 该流需要绑定到一个地址if (flags & UV_HANDLE_BOUND) {/*流是否已经绑定到一个地址了。handle的flag是在new_socket里设置的,如果有这个标记说明已经执行过绑定了,直接更新flags就行。*/if (handle->flags & UV_HANDLE_BOUND) {handle->flags |= flags;return 0;}// 有fd,但是可能还没绑定到一个地址slen = sizeof(saddr);memset(&saddr, 0, sizeof(saddr));// 获取socket绑定到的地址if (getsockname(uv__stream_fd(handle),(struct sockaddr*) &saddr,&slen))return UV__ERR(errno);// 绑定过了socket地址,则更新flags就行if ((saddr.ss_family == AF_INET6 &&((struct sockaddr_in6*) &saddr)->sin6_port != 0) ||(saddr.ss_family == AF_INET &&((struct sockaddr_in*) &saddr)->sin_port != 0)) {handle->flags |= flags;return 0;}// 没绑定则绑定到随机地址,bind中实现if (bind(uv__stream_fd(handle),(struct sockaddr*) &saddr,slen))return UV__ERR(errno);}handle->flags |= flags;return 0;}// 申请一个新的fd关联到流return new_socket(handle, domain, flags);}
maybe_new_socket函数的逻辑分支很多,主要如下
1 如果流还没有关联到fd,则申请一个新的fd关联到流上
2 如果流已经关联了一个fd。
    如果流设置了绑定地址的标记,但是已经通过Libuv绑定了一个地址(Libuv会设置UV_HANDLE_BOUND标记,用户也可能是直接调bind函数绑定了)。则不需要再次绑定,更新flags就行。
    如果流设置了绑定地址的标记,但是还没有通过Libuv绑定一个地址,这时候通过getsocketname判断用户是否自己通过bind函数绑定了一个地址,是的话则不需要再次执行绑定操作。否则随机绑定到一个地址。
以上两个函数的逻辑主要是申请一个socket和给socket绑定一个地址。下面我们开看一下连接流的实现。
int uv__tcp_connect(uv_connect_t* req,uv_tcp_t* handle,const struct sockaddr* addr,unsigned int addrlen,uv_connect_cb cb) {int err;int r;// 已经发起了connect了if (handle->connect_req != NULL)return UV_EALREADY;// 申请一个socket和绑定一个地址,如果还没有的话err = maybe_new_socket(handle, addr->sa_family,UV_HANDLE_READABLE | UV_HANDLE_WRITABLEif (err)return err;handle->delayed_error = 0;do {// 清除全局错误变量的值errno = 0;// 非阻塞发起三次握手r = connect(uv__stream_fd(handle), addr, addrlen);} while (r == -1 && errno == EINTR);if (r == -1 && errno != 0) {// 三次握手还没有完成if (errno == EINPROGRESS); /* not an error */else if (errno == ECONNREFUSED)// 对方拒绝建立连接,延迟报错handle->delayed_error = UV__ERR(errno);else// 直接报错return UV__ERR(errno);}// 初始化一个连接型request,并设置某些字段uv__req_init(handle->loop, req, UV_CONNECT);req->cb = cb;req->handle = (uv_stream_t*) handle;QUEUE_INIT(&req->queue);// 连接请求handle->connect_req = req;// 注册到Libuv观察者队列uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);// 连接出错,插入pending队尾if (handle->delayed_error)uv__io_feed(handle->loop, &handle->io_watcher);return 0;}
连接流的逻辑,大致如下
1 申请一个socket,绑定一个地址。
2 根据给定的服务器地址,发起三次握手,非阻塞的,会直接返回继续执行,不会等到三次握手完成。
3 往流上挂载一个connect型的请求。
4 设置IO观察者感兴趣的事件为可写。然后把IO观察者插入事件循环的IO观察者队列。等待可写的时候时候(完成三次握手),就会执行cb回调。
可写事件触发时,会执行uv__stream_io,我们看一下具体的逻辑。
if (stream->connect_req) {uv__stream_connect(stream);return;}
我们继续看uv__stream_connect。
static void uv__stream_connect(uv_stream_t* stream) {int error;uv_connect_t* req = stream->connect_req;socklen_t errorsize = sizeof(int);// 连接出错if (stream->delayed_error) {error = stream->delayed_error;stream->delayed_error = 0;} else {// 还是需要判断一下是不是出错了getsockopt(uv__stream_fd(stream),SOL_SOCKET,SO_ERROR,&error,&errorsize);error = UV__ERR(error);}// 还没连接成功,先返回,等待下次可写事件的触发if (error == UV__ERR(EINPROGRESS))return;// 清空stream->connect_req = NULL;uv__req_unregister(stream->loop, req);/*连接出错则注销之前注册的等待可写队列,连接成功如果待写队列为空,也注销事件,有数据需要写的时候再注册*/if (error < 0 || QUEUE_EMPTY(&stream->write_queue)) {uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);}// 执行回调,通知上层连接结果if (req->cb)req->cb(req, error);if (uv__stream_fd(stream) == -1)return;// 连接失败,清空待写的数据和执行写请求的回调(如果有的话)if (error < 0) {uv__stream_flush_write_queue(stream, UV_ECANCELED);uv__write_callbacks(stream);}}
连接流的逻辑是
1发起非阻塞式连接
2 注册等待可写事件
3 可写事件触发时,把连接结果告诉调用方
4 连接成功则发送写队列的数据,连接失败则清除写队列的数据并执行每个写请求的回调(有的话)。
5.8 监听流
监听流是针对TCP或Unix域的,主要是把一个socket变成listen状态。并且设置一些属性。
int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) {static int single_accept = -1;unsigned long flags;int err;if (tcp->delayed_error)return tcp->delayed_error;// 是否设置了不连续accept。默认是连续accept。if (single_accept == -1) {const char* val = getenv("UV_TCP_SINGLE_ACCEPT");single_accept = (val != NULL && atoi(val) != 0);}// 设置不连续acceptif (single_accept)tcp->flags |= UV_HANDLE_TCP_SINGLE_ACCEPT;flags = 0;/*可能还没有用于listen的fd,socket地址等。这里申请一个socket和绑定到一个地址(如果调listen之前没有调bind则绑定到随机地址)*/err = maybe_new_socket(tcp, AF_INET, flags);if (err)return err;// 设置fd为listen状态if (listen(tcp->io_watcher.fd, backlog))return UV__ERR(errno);// 建立连接后的业务回调tcp->connection_cb = cb;tcp->flags |= UV_HANDLE_BOUND;// 设置io观察者的回调,由epoll监听到连接到来时执行tcp->io_watcher.cb = uv__server_io;/*插入观察者队列,这时候还没有增加到epoll,Poll IO阶段再遍历观察者队列进行处理(epoll_ctl)*/uv__io_start(tcp->loop, &tcp->io_watcher, POLLIN);return 0;}
监听流的逻辑看起来很多,但是主要的逻辑是把流对的fd改成listen状态,这样流就可以接收连接请求了。接着设置连接到来时执行的回调。最后注册IO观察者到事件循环。等待连接到来。就会执行uvserver_io。uvserver_io再执行connection_cb。监听流和其它流有一个区别是,当IO观察者的事件触发时,监听流执行的回调是uvserver_io函数。而其它流是在uvstream_io里统一处理。我们看一下连接到来或者Unix域上有数据到来时的处理逻辑。
void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {uv_stream_t* stream;int err;stream = container_of(w, uv_stream_t, io_watcher);// 注册等待可读事件uv__io_start(stream->loop, &stream->io_watcher, POLLIN);while (uv__stream_fd(stream) != -1) {/*通过accept拿到和客户端通信的fd,我们看到这个fd和服务器的fd是不一样的*/err = uv__accept(uv__stream_fd(stream));// 错误处理if (err < 0) {/*uv__stream_fd(stream)对应的fd是非阻塞的,返回这个错说明没有连接可用accept了,直接返回*/if (err == -EAGAIN || err == -EWOULDBLOCK)return; /* Not an error. */if (err == -ECONNABORTED)continue;// 进程的打开的文件描述符个数达到阈值,看是否有备用的if (err == -EMFILE || err == -ENFILE) {err = uv__emfile_trick(loop, uv__stream_fd(stream));if (err == -EAGAIN || err == -EWOULDBLOCK)break;}// 发生错误,执行回调stream->connection_cb(stream, err);continue;}// 记录拿到的通信socket对应的fdstream->accepted_fd = err;// 执行上传回调stream->connection_cb(stream, 0);/*stream->accepted_fd为-1说明在回调connection_cb里已经消费了 accepted_fd,否则先注销服务器在epoll中的fd的读事件,等待消费后再注册,即不再处理请求了*/if (stream->accepted_fd != -1) {/*The user hasn't yet accepted called uv_accept()*/uv__io_stop(loop, &stream->io_watcher, POLLIN);return;}/*是TCP类型的流并且设置每次只accpet一个连接,则定时阻塞,被唤醒后再accept,否则一直accept(如果用户在connect回调里消费了accept_fd的话),定时阻塞用于多进程竞争处理连接*/if (stream->type == UV_TCP &&(stream->flags & UV_TCP_SINGLE_ACCEPT)) {struct timespec timeout = { 0, 1 };nanosleep(&timeout, NULL);}}}
我们看到连接到来时,Libuv会从已完成连接的队列中摘下一个节点,然后执行connection_cb回调。在connection_cb回调里,需要uv_accept消费accpet_fd。
int uv_accept(uv_stream_t* server, uv_stream_t* client) {int err;switch (client->type) {case UV_NAMED_PIPE:case UV_TCP:// 把文件描述符保存到clienterr = uv__stream_open(client,server->accepted_fd,UV_STREAM_READABLE| UV_STREAM_WRITABLE);if (err) {uv__close(server->accepted_fd);goto done;}break;case UV_UDP:err = uv_udp_open((uv_udp_t*) client,server->accepted_fd);if (err) {uv__close(server->accepted_fd);goto done;}break;default:return -EINVAL;}client->flags |= UV_HANDLE_BOUND;done:// 非空则继续放一个到accpet_fd中等待accept,用于文件描述符传递if (server->queued_fds != NULL) {uv__stream_queued_fds_t* queued_fds;queued_fds = server->queued_fds;// 把第一个赋值到accept_fdserver->accepted_fd = queued_fds->fds[0];/*offset减去一个单位,如果没有了,则释放内存,否则需要把后面的往前挪,offset执行最后一个*/if (--queued_fds->offset == 0) {uv__free(queued_fds);server->queued_fds = NULL;} else {memmove(queued_fds->fds,queued_fds->fds + 1,queued_fds->offset * sizeof(*queued_fds->fds));}} else {// 没有排队的fd了,则注册等待可读事件,等待accept新的fdserver->accepted_fd = -1;if (err == 0)uv__io_start(server->loop, &server->io_watcher, POLLIN);}return err;}
client是用于和客户端进行通信的流,accept就是把accept_fd保存到client中,client就可以通过fd和对端进行通信了。消费完accept_fd后,如果还有待处理的fd的话,需要补充一个到accept_fd(针对Unix域),其它的继续排队等待处理,如果没有待处理的fd则注册等待可读事件,继续处理新的连接。
5.9 销毁流
当我们不再需要一个流的时候,我们会首先调用uv_close关闭这个流,关闭流只是注销了事件和释放了文件描述符,调用uv_close之后,流对应的结构体就会被加入到closing队列,在closing阶段的时候,才会执行销毁流的操作,比如丢弃还没有写完成的数据,执行对应流的回调,我们看看销毁流的函数uv__stream_destroy。
void uv__stream_destroy(uv_stream_t* stream) {// 正在连接,则执行回调if (stream->connect_req) {uv__req_unregister(stream->loop, stream->connect_req);stream->connect_req->cb(stream->connect_req, -ECANCELED);stream->connect_req = NULL;}// 丢弃待写的数据,如果有的话uv__stream_flush_write_queue(stream, -ECANCELED);// 处理写完成队列,这里是处理被丢弃的数据uv__write_callbacks(stream);// 正在关闭流,直接回调if (stream->shutdown_req) {uv__req_unregister(stream->loop, stream->shutdown_req);stream->shutdown_req->cb(stream->shutdown_req, -ECANCELED);stream->shutdown_req = NULL;}}
我们看到,销毁流的时候,如果流中还有待写的数据,则会丢弃。我们看一下uvstream_flush_write_queue和uvwrite_callbacks。
void uv__stream_flush_write_queue(uv_stream_t* stream, int error) {uv_write_t* req;QUEUE* q;while (!QUEUE_EMPTY(&stream->write_queue)) {q = QUEUE_HEAD(&stream->write_queue);QUEUE_REMOVE(q);req = QUEUE_DATA(q, uv_write_t, queue);// 把错误写到每个请求中req->error = error;QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);}}
uvstream_flush_write_queue丢弃待写队列中的请求,并直接插入写完成队列中。uvwrite_callbacks是写完或者写出错时执行的函数,它逐个处理写完成队列中的节点,每个节点是一个写请求,执行它的回调,如何分配了堆内存,则释放内存。在写流章节已经分析,不再具体展开。
5.10 事件触发的处理
在流的实现中,读写等操作都只是注册事件到epoll,事件触发的时候,会执行统一的回调函数uv__stream_io。下面列一下该函数的代码,具体实现在其它章节已经分析。
static void uv__stream_io(uv_loop_t* loop,uv__io_t* w,unsigned int events) {uv_stream_t* stream;stream = container_of(w, uv_stream_t, io_watcher);// 是连接流,则执行连接处理函数if (stream->connect_req) {uv__stream_connect(stream);return;}/*Ignore POLLHUP here. Even it it's set,there may still be data to read.*/// 可读是触发,则执行读处理if (events & (POLLIN | POLLERR | POLLHUP))uv__read(stream);// 读回调关闭了流if (uv__stream_fd(stream) == -1)return; /* read_cb closed stream. *//* ¬¬POLLHUP说明对端关闭了,即不会发生数据过来了。如果流的模式是持续读,1 如果只读取了部分(设置UV_STREAM_READ_PARTIAL),并且没有读到结尾(没有设置UV_STREAM_READ_EOF),则直接作读结束处理,2 如果只读取了部分,上面的读回调执行了读结束操作,则这里就不需要处理了3 如果没有设置只读了部分,还没有执行读结束操作,则不能作读结束操作,因为对端虽然关闭了,但是之前的传过来的数据可能还没有被消费完4 如果没有设置只读了部分,执行了读结束操作,那这里也不需要处理*/if ((events & POLLHUP) &&(stream->flags & UV_STREAM_READING) &&(stream->flags & UV_STREAM_READ_PARTIAL) &&!(stream->flags & UV_STREAM_READ_EOF)) {uv_buf_t buf = { NULL, 0 };uv__stream_eof(stream, &buf);}if (uv__stream_fd(stream) == -1)return; /* read_cb closed stream. */// 可写事件触发if (events & (POLLOUT | POLLERR | POLLHUP)) {// 写数据uv__write(stream);// 写完后做后置处理,释放内存,执行回调等uv__write_callbacks(stream);// 待写队列为空,则注销等待写事件if (QUEUE_EMPTY(&stream->write_queue))uv__drain(stream);}}
