IO模型的介绍
IO模型基本分类
(1)Blocking I/O(同步阻塞IO,BIO):最常见也最传统IO模型,即代码语句按顺序执行若某一条语句执行需等待那么后面的代码会被阻塞,例如常见顺序步骤:读取文件、等待内核返回数据、拿到数据、处理输出
用户线程同多系统调用recvfrom函数向内核发起IO读文件操作,从用户态切换到内核态(application switch to kernel),后面的代码将被阻塞,用户线程则会处于等待状态。当内核将数据从磁盘加载到内核空间,并拷贝到用户空间后(kernel switch to application),用户线程再进行后续的数据处理。
缺点:
用在多线程高并发场景(例如10万并发),服务端与客户端一对一连接,对于server端来说,将大量消耗内存和CPU资源(用户态到内核态的上下文切换),并发能力受限。
(2)同步非阻塞IO(Non-blocking IO,NIO):默认创建的socket为阻塞型,将socket设置为NONBLOCK,业务流程则变为同步非阻塞IO
同步非阻塞IO是在同步阻塞IO的基础上,将socket设置为NONBLOCK。这样做用户线程可以在发起IO请求后可以立即返回,原理图如下:
如图,用户线程连续三次执行系统调用函数recvfrom,但内核还未准备好数据,由于采用非阻塞,故直接返回错误,errno被置为EWOULDBLOCK,当第四次调用的时候,内核数据已经拷贝到了用户空间,故可以读取数据,返回OK,接下来执行数据处理。
缺点:
1、整个IO请求虽然是非阻塞的,但是为了等到数据,需要不断的轮训,反复请求。如果有大量的连接,将会消耗大量的CPU资源和网络带宽。
2、虽然设定了时间间隔去轮询,但是还是会有延迟,因为每隔一小段时间去执行一次系统调用,而数据很可能在轮询的间隔时间内已经就绪,这会导致整体数据的吞吐量降低。
用户线程需要循环的去系统调用,尝试读取数据,直到读取成功后,才进行后续的处理,
(3)IO多路复用(IO Multiplexing ):即经典的Reactor设计模式,有时也称为异步阻塞IO,Java中的Selector和Linux中的epoll都是这种模型。
IO多路复用是指内核一旦发现进程指定的一个或者多个IO事件准备读取,它就通知该进程,以select为例原理图如下:
前面两种IO模型用户线程直接调用recvfrom来等待内核返回数据,而IO复用则通过调用select(还有poll或者epoll)系统方法,此时用户线程会阻塞在select语句处,等待内核copy数据到用户态,用户再收到内核返回可读的socket文件描述符。
用户线程可以实现一个线程内同时发起和处理多个socket的IO请求,用户线程注册多个socket,(对于内核就是文件描述符集合),然后不断地调用select读取被激活的socket 文件描述符。(在这里,select看起就像是用户态和内核态之间的一个代理)
(4)异步IO(Asynchronous IO):即经典的Proactor设计模式,也称为异步非阻塞IO
用户进程发起read操作后立即返回去做其他事,kernel收到asynchronous read后也立刻返回
在数据准备完成后,kernel将数据拷贝到用户内存,并发送信号给用户告知数据已完成,或者调用用户注册的回调函数。
同步与异步:
同步阻塞IO,同步非阻塞IO,IO多路复用,从理论上来说,都是属于同步IO,因为这三种IO模型中,IO的读写操作都是在IO事件发生以后,由应用程序来完成的。而异步IO模型则不同,对于异步IO而言,不论是IO是否阻塞,异步IO的读写操作总是立即返回,因为真正的读写操作已经交由内核进行处理。也就是说,同步IO模型要求用户程序自行执行IO操作(将数据从内核缓冲区读入到用户缓冲区,或者将数据从用户缓冲区写入到内核缓冲区),而异步IO则是由内核来执行IO操作(数据在用户缓冲区和内核缓冲区的拷贝是由内核在“后台”执行的)。可以这么理解:同步IO向应用程序通知的是IO就绪事件,异步IO向应用程序通知的是IO完成事件。
深入理解多路IO模型select、poll、epoll
Select
#include <sys/select.h>
int select(int nfds, fd_set * readfds, fd_set * writefds, fd_set * exceptfds, struct timeval * timeout);
函数参数解释,参考文章
nfds:
非负整数的变量,表示当前线程打开的所有件文件描述符集的总数,nfds=maxfdp+1,计算方法就是当前线程打开的最大文件描述符+1
readfds:
fd_set集合类型的指针变量,表示当前线程接收到内核返回的可读事件文件描述符集合(有数据到了这个状态称之为读事件),如果这个集合中有一个文件可读,内核给select返回一个大于0的值,表示有文件可读,如果没有可读的文件,则根据timeout参数再判断是否超时,若内核阻塞当前线程的时长超出timeout,select返回0,若发生错误返回负值。传入NULL值,表示不关心任何文件的读变化
writefds:
当前有多少个写事件(关心输出缓冲区是否已满)
最后一个结构体表示每个几秒钟醒来做其他事件,用来设置select等待时间
*exceptfds:
监视文件描述符集合中的有抛出异常的fd
readfds、writefds和exceptfds参数分别为可读、可写和异常等事件对应的文件描述符集合;程序只需要传入自己感兴趣的文件描述符,内核将修改他们来通知程序哪些文件描述符已经准备就绪;fd_set结构体仅包含一个整形数组,该数组的每一个元素的每一位标志一个文件描述符,下面的一组宏用来操作fd_set的每一位:
FD_ZERO(fd_set * fdset);//清除fd_set的所有位
FD_SET(int fd, fd_set * fdset);//设置fdset的位fd
FD_CLR(int fd, fd_set * fdset);//清除fdset的位fd
int FD_ISSET(int fd, fd_set * fdset);//测试fdset的位fd是否被设置
timeout:
select()的超时结束时间,它可以使select处于三种状态:
(1)若将NULL以形参传入,select置于阻塞状态,当前线程一直等到内核监视文件描述符集合中某个文件描述符发生变化为止;
(2)若将时间值设为0秒0毫秒,表示非阻塞,不管文件描述符是否有变化,都立刻返回继续执行,文件无变化返回0,有变化返回一个正值;
(3)timeout的值大于0,等待时长,即select在timeout时间内阻塞,超时后返回-1,否则在超时后不管怎样一定返回。
(4)若在select等待事件内程序接收到信号,则select立即返回-1,并设置errno为EINTER。
select运行机制:
select()的机制中提供一种fd_set的数据结构,实际上是一个long类型的数组,每一个数组元素都能与一打开的文件句柄(不管是Socket句柄,还是其他文件或命名管道或设备句柄)建立联系,建立联系的工作由程序员完成,当调用select()时,由内核根据IO状态修改fd_set的内容,由此来通知执行了select()的进程哪一Socket或文件可读。
从流程上来看,使用select函数进行IO请求和同步阻塞模型没有太大的区别,甚至还多了添加监视socket,以及调用select函数的额外操作,效率更差。但是,使用select以后最大的优势是用户可以在一个线程内同时处理多个socket的IO请求。用户可以注册多个socket,然后不断地调用select读取被激活的socket,即可达到在同一个线程内同时处理多个IO请求的目的。而在同步阻塞模型中,必须通过多线程的方式才能达到这个目的。
select调用过程:
- 使用copy_from_user从用户空间拷贝fd_set到内核空间
- 注册回调函数__pollwait
- 遍历所有fd,调用其对应的poll方法(对于socket,这个poll方法是sock_poll,sock_poll根据情况会调用到tcp_poll,udp_poll或者datagram_poll)
- 以tcp_poll为例,其核心实现就是__pollwait,也就是上面注册的回调函数。
- __pollwait的主要工作就是把current(当前进程)挂到设备的等待队列中,不同的设备有不同的等待队列,对于tcp_poll来说,其等待队列是sk->sk_sleep(注意把进程挂到等待队列中并不代表进程已经睡眠了)。在设备收到一条消息(网络设备)或填写完文件数据(磁盘设备)后,会唤醒设备等待队列上睡眠的进程,这时current便被唤醒了。
- poll方法返回时会返回一个描述读写操作是否就绪的mask掩码,根据这个mask掩码给fd_set赋值。
- 如果遍历完所有的fd,还没有返回一个可读写的mask掩码,则会调用schedule_timeout是调用select的进程(也就是current)进入睡眠。当设备驱动发生自身资源可读写后,会唤醒其等待队列上睡眠的进程。如果超过一定的超时时间(schedule_timeout指定),还是没人唤醒,则调用select的进程会重新被唤醒获得CPU,进而重新遍历fd,判断有没有就绪的fd。
-
select的优点:
select缺点
每次调用select,都需要把fd_set集合从用户态拷贝到内核态,如果fd_set集合很大时,那这个开销也很大
- 同时每次调用select都需要在内核遍历传递进来的所有fd_set,如果fd_set集合很大时,那这个开销也很大
- 单个进程能够监视的文件描述符的数量存在最大限制,通常是1024,当然可以更改数量,但由于select采用轮询的方式扫描文件描述符,文件描述符数量越多,性能越差;(在linux内核头文件中,有这样的定义:#define __FD_SETSIZE 1024)
- 内核 / 用户空间内存拷贝问题,select需要复制大量的句柄数据结构,产生巨大的开销;
- select返回的是含有整个句柄的数组,应用程序需要遍历整个数组才能发现哪些句柄发生了事件;
- select的触发方式是水平触发,应用程序如果没有完成对一个已经就绪的文件描述符进行IO操作,那么之后每次select调用还是会将这些文件描述符通知进程。
POLL
poll的机制与select类似,与select在本质上没有多大差别,管理多个描述符也是进行轮询,根据描述符的状态进行处理。相比select模型,poll使用链表保存文件描述符,因此没有了最大文件描述符数量的限制。
poll的函数接口如下所示: ```cpp int poll(struct pollfd *fds, nfds_t nfds, int timeout);
typedef struct pollfd { int fd; // 需要被检测或选择的文件描述符 short events; // 对文件描述符fd上感兴趣的事件 short revents; // 文件描述符fd上当前实际发生的事件 } pollfd_t;
poll改变了文件描述符集合的数据结构,使用了pollfd结构而不是select的fd_set结构,使得poll支持的文件描述符集合限制远大于select的1024。<br />struct pollfd *fds fds:是一个struct pollfd类型的数组,用于存放需要检测其状态的socket描述符,并且调用poll函数之后fds数组不会被清空;一个pollfd结构体表示一个被监视的文件描述符,通过传递fds指示 poll() 监视多个文件描述符。其中,结构体的events域是监视该文件描述符的事件掩码,由用户来设置这个域,结构体的revents域是文件描述符的操作结果事件掩码,内核在调用返回时设置这个域。并且内核每次修改的是revents整个成员,而events成员保持不变,故下次调用poll的时候不需要重置pollfd类型的事件集参数。<br />nfds_t nfds: 记录数组fds中描述符的总数量。
<a name="VJKki"></a>
### epoll
epoll在Linux2.6内核正式提出,是基于事件驱动的I/O方式,相对于select来说,epoll没有描述符个数限制,使用一个文件描述符管理多个描述符,将用户关心的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。<br />epoll提供了三个函数:
```cpp
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
epoll使用步骤:
- epoll_create 函数创建一个epoll句柄,参数size表明内核要监听的描述符数量。调用成功时返回一个epoll句柄描述符,失败时返回-1。
- epoll_ctl 函数注册要监听的事件类型。四个参数解释如下:
- epfd 表示epoll句柄
- op 表示fd操作类型,有如下3种
- EPOLL_CTL_ADD 注册新的fd到epfd中
- EPOLL_CTL_MOD 修改已注册的fd的监听事件
- EPOLL_CTL_DEL 从epfd中删除一个fd
- fd 是要监听的描述符
- event 表示要监听的事件
epoll_event 结构体定义如下: ```cpp struct epoll_event { __uint32_t events; / Epoll events / epoll_data_t data; / User data variable / };
typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t;
epoll_wait 函数等待事件的就绪,成功时返回就绪的事件数目,调用失败时返回 -1,等待超时返回 0。
- epfd 是epoll句柄
- events 表示从内核得到的就绪事件集合
- maxevents 告诉内核events的大小
- timeout 表示等待的超时事件
epoll是Linux内核为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。<br />epoll除了提供select/poll那种IO事件的水平触发(Level Triggered)外,还提供了边缘触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。
- **水平触发(LT)**:默认工作模式,即当epoll_wait检测到某描述符事件就绪并通知应用程序时,应用程序可以不立即处理该事件;下次调用epoll_wait时,会再次通知此事件
- **边缘触发(ET)**: 当epoll_wait检测到某描述符事件就绪并通知应用程序时,应用程序必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次通知此事件。(直到你做了某些操作导致该描述符变成未就绪状态了,也就是说边缘触发只在状态由未就绪变为就绪时只通知一次)。
<a name="m7wap"></a>
#### epoll原理
当某一进程调用epoll_create方法时,Linux内核会创建一个eventpoll结构体,eventpoll结构体如下所示:
```cpp
// 每创建一个epollfd,内核就会分配一个eventpoll与之对应,可以理解成内核态的epollfd.
struct eventpoll {
/* Protect the access to this structure */
spinlock_t lock;
/*
* This mutex is used to ensure that files are not removed
* while epoll is using them. This is held during the event
* collection loop, the file cleanup path, the epoll file exit
* code and the ctl operations.
*/
/**
添加,修改或删除监听fd的时候,以及epoll_wait返回,向用户空间传递数据时,都会持有这个互斥锁.
因此,在用户空间中执行epoll相关操作是线程安全的,内核已经做了保护.
**/
struct mutex mtx;
/* Wait queue used by sys_epoll_wait() */
/**
等待队列头部.
当在该等待队列中的进程调用epoll_wait()时,会进入睡眠.
**/
wait_queue_head_t wq;
/* Wait queue used by file->poll() */
/**
用于epollfd被f_op->poll()的时候
**/
wait_queue_head_t poll_wait;
/* List of ready file descriptors */
/**
所有已经ready的epitem被存放在这个链表里
**/
struct list_head rdllist;
/* RB tree root used to store monitored fd structs */
/**
所有待监听的epitem被存放在这个红黑树里
**/
struct rb_root rbr;
/*
* This is a single linked list that chains all the "struct epitem" that
* happened while transferring ready events to userspace w/out
* holding ->lock.
*/
/**
当event转移到用户空间时,这个单链表存放着所有struct epitem
**/
struct epitem *ovflist;
/* wakeup_source used when ep_scan_ready_list is running */
struct wakeup_source *ws; // TODO
/* The user that created the eventpoll descriptor */
/**
这里存放了一些用户变量,比如fd监听数量的最大值等
**/
struct user_struct *user;
struct file *file;
/* used to optimize loop detection check */ // TODO
int visited;
struct list_head visited_list_link;
};
这个结构体中有两个成员与epoll的使用方式密切相关,一个代表红黑树的根节点,一个是双向链表的头指针。每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件。这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来。而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当相应的事件发生时会调用这个回调方法。这个回调方法在内核中叫ep_poll_callback,它会将就绪的事件添加到rdlist双链表中。
/**
所有已经ready的epitem被存放在这个链表里
**/
struct list_head rdllist;
/* RB tree root used to store monitored fd structs */
/**
所有待监听的epitem被存放在这个红黑树里
**/
struct rb_root rbr;
// epitem表示一个被监听的fd
struct epitem {
union {
/* RB tree node links this structure to the eventpoll RB tree */
/**
红黑树结点,当使用epoll_ctl()将一批fd加入到某个epollfd时,内核会分配一批epitem与fd一一对应,
并且以红黑树的形式来组织它们,tree的root被存放在struct eventpoll中.
**/
struct rb_node rbn;
/* Used to free the struct epitem */
struct rcu_head rcu; // TODO
};
/* List header used to link this structure to the eventpoll ready list */
/**
链表结点,所有已经ready的epitem都会被存放在eventpoll的rdllist链表中.
**/
struct list_head rdllink;
/*
* Works together "struct eventpoll"->ovflist in keeping the
* single linked chain of items.
*/
struct epitem *next; // 用于eventpoll的ovflist
/* The file descriptor information this item refers to */
/**
epitem对应的fd和struct file
**/
struct epoll_filefd ffd;
/* Number of active wait queue attached to poll operations */
int nwait; // 当前epitem被加入到多少个等待队列中
/* List containing poll wait queues */
struct list_head pwqlist;
/* The "container" of this item */
/**
当前epitem属于那个eventpoll
**/
struct eventpoll *ep;
/* List header used to link this item to the "struct file" items list */
struct list_head fllink;
/* wakeup_source used when EPOLLWAKEUP is set */
struct wakeup_source __rcu *ws;
/* The structure that describe the interested events and the source fd */
/**
当前epitem关心哪些event,这个数据是由执行epoll_ctl时从用户空间传递过来的
**/
struct epoll_event event;
};
在epoll中,对于每一个事件,都会建立一个epitem结构体,epitem中的成员与红黑树及双向链表的关系如下图所示。
epoll底层大致有三个步骤:
- 调用epoll_create()建立一个epoll对象(建立红黑树及双向链表的数据结构)
- 调用epoll_ctl将需要监听的事件插入到红黑树中,并注册对应的回调函数
- 当有事件就绪,就会调用回调函数,将就绪事件拷贝到就绪链表中去(零拷贝)。
- 调用epoll_wait检查就绪链表中是否有元素,并返回就绪链表中的数据。
epoll特点:
1.每次累加添加,不需要每次传入全部的监测fd。
2.每个fd只将本进程挂载到自己的等待队列一次,直到该fd被从epoll移除,不需要重复挂载。
3.fd事件回调函数是ep_epoll_callback,该函数将发生事件的fd加入到epoll专门的就绪队列rdllist中,同时唤醒本进程。
4.本进程不需要遍历每一个fd去监测事件是否发生,而只需要判断epoll中的就绪队列rdllist是否为空即可。
5.epoll返回时,只返回就绪队列rdllist中的项,避免了无关项的操作,应用层也就不需要再次重复遍历。
6.epoll内部使用红黑树存储监测fd,支持大量fd的快速查询、修改和删除操作。
select、poll、epoll对比
select | poll | epoll | |
---|---|---|---|
操作方式 | 用户通过三个参数分别传入感兴趣的可读、可写、异常等事件,内核通过对这些参数修改来反馈其中就绪的事件。这使得用户每次调用都要重置这三个参数。 | 统一处理所有事件类型,因此只需要一个事件集参数。用户通过pollfd.events传入感兴趣的事件,内核通过修改pollfd.revents参数反馈所有的(就绪未就绪)事件集。 | 内核通过一个事件表直接管理用户感兴趣的所有事件。因此每次调用epoll_wait时,无需反复传入用户感兴趣的事件。epoll_wait系统调用的参数events仅用来反馈就绪的事件集。 |
底层实现 | 数组 | 链表 | 红黑树、双向链表 |
IO效率 | 每次调用都进行线性遍历,时间复杂度为O(n) | 每次调用都进行线性遍历,时间复杂度为O(n) | 事件通知方式,每当fd就绪,系统注册的回调函数就会被调用,将就绪fd放到readyList里面,时间复杂度O(1) |
最大连接数 | 1024(x86)或2048(x64) | 无上限 | 无上限 |
fd拷贝 | 每次调用select,都需要把fd集合从用户态拷贝到内核态 | 每次调用poll,都需要把fd集合从用户态拷贝到内核态 | 调用epoll_ctl时拷贝进内核并保存,之后每次epoll_wait不拷贝 |
附录
epoll实现过程:
摘自:https://www.cnblogs.com/leohotfn/p/8516370.html
一. epoll_create
- 调用ep_alloc()来创建一个struct eventpoll对象.ep_alloc()的执行过程如下:
1a. 获取当前用户的一些信息.
1b. 分配一个struct eventpoll对象.
1c. 初始化相关数据成员,如等待队列,就绪链表,红黑树. - 创建一个匿名fd和与之对应的struct file对象.
- 将该eventpoll和该file关联起来,eventpoll对象保存在file对象的private_data指针中.
二. epoll_ctl
- 将event拷贝到内核空间.
- 判断加入的fd是否支持poll操作.
- 根据用户传入的op参数,以及是否在eventpoll的红黑树中找到该fd的结点,来执行相应的操作(插入,删除,修改).拿插入举例,执行ep_insert():
3a. 在slab缓存中分配一个epitem对象,并初始化相关数据成员,如保存待监听的fd和它的file结构.
3b. 指定调用poll_wait()时(再次强调,不是epoll_wait)时的回调函数,用于数据就绪时唤醒进程.(其实质是初始化文件的等待队列,将进程加入到等待队列).
3c. 到此该epitem就和这个待监听的fd关联起来了.
3d. 将该epitem插入到eventpoll的红黑树中.
三. epoll_wait
- 调用ep_poll():
1a. 计算睡眠时间(如果有).
1b. 判断eventpoll的就绪链表是否为空,不为空则直接处理而不是睡眠.
1c. 将当前进程添加到eventpoll的等待队列中.
1d. 进入循环.
1e. 将当前进程设置成TASK_INTERRUPTIBLE状态,然后判断是否有信号到来,如果没有,则进入睡眠.
1f. 如果超时或被信号唤醒,则跳出循环.
1g. 将当前进程从等待队列中删除,并把其状态设置成TASK_RUNNING.
1h. 将数据拷贝给用户空间.拷贝的过程是先把ready list转移到中间链表,然后遍历中间链表拷贝到用户空间,并且判断每个结点是否水平触发,是则再次插入
到ready list.
附录
epoll源码
// @ fs/eventpoll.c
/*
* This structure is stored inside the "private_data" member of the file
* structure and represents the main data structure for the eventpoll
* interface.
*/
// 每创建一个epollfd,内核就会分配一个eventpoll与之对应,可以理解成内核态的epollfd.
struct eventpoll {
/* Protect the access to this structure */
spinlock_t lock;
/*
* This mutex is used to ensure that files are not removed
* while epoll is using them. This is held during the event
* collection loop, the file cleanup path, the epoll file exit
* code and the ctl operations.
*/
/**
添加,修改或删除监听fd的时候,以及epoll_wait返回,向用户空间传递数据时,都会持有这个互斥锁.
因此,在用户空间中执行epoll相关操作是线程安全的,内核已经做了保护.
**/
struct mutex mtx;
/* Wait queue used by sys_epoll_wait() */
/**
等待队列头部.
当在该等待队列中的进程调用epoll_wait()时,会进入睡眠.
**/
wait_queue_head_t wq;
/* Wait queue used by file->poll() */
/**
用于epollfd被f_op->poll()的时候
**/
wait_queue_head_t poll_wait;
/* List of ready file descriptors */
/**
所有已经ready的epitem被存放在这个链表里
**/
struct list_head rdllist;
/* RB tree root used to store monitored fd structs */
/**
所有待监听的epitem被存放在这个红黑树里
**/
struct rb_root rbr;
/*
* This is a single linked list that chains all the "struct epitem" that
* happened while transferring ready events to userspace w/out
* holding ->lock.
*/
/**
当event转移到用户空间时,这个单链表存放着所有struct epitem
**/
struct epitem *ovflist;
/* wakeup_source used when ep_scan_ready_list is running */
struct wakeup_source *ws; // TODO
/* The user that created the eventpoll descriptor */
/**
这里存放了一些用户变量,比如fd监听数量的最大值等
**/
struct user_struct *user;
struct file *file;
/* used to optimize loop detection check */ // TODO
int visited;
struct list_head visited_list_link;
};
/*
* Each file descriptor added to the eventpoll interface will
* have an entry of this type linked to the "rbr" RB tree.
* Avoid increasing the size of this struct, there can be many thousands
* of these on a server and we do not want this to take another cache line.
*/
// epitem表示一个被监听的fd
struct epitem {
union {
/* RB tree node links this structure to the eventpoll RB tree */
/**
红黑树结点,当使用epoll_ctl()将一批fd加入到某个epollfd时,内核会分配一批epitem与fd一一对应,
并且以红黑树的形式来组织它们,tree的root被存放在struct eventpoll中.
**/
struct rb_node rbn;
/* Used to free the struct epitem */
struct rcu_head rcu; // TODO
};
/* List header used to link this structure to the eventpoll ready list */
/**
链表结点,所有已经ready的epitem都会被存放在eventpoll的rdllist链表中.
**/
struct list_head rdllink;
/*
* Works together "struct eventpoll"->ovflist in keeping the
* single linked chain of items.
*/
struct epitem *next; // 用于eventpoll的ovflist
/* The file descriptor information this item refers to */
/**
epitem对应的fd和struct file
**/
struct epoll_filefd ffd;
/* Number of active wait queue attached to poll operations */
int nwait; // 当前epitem被加入到多少个等待队列中
/* List containing poll wait queues */
struct list_head pwqlist;
/* The "container" of this item */
/**
当前epitem属于那个eventpoll
**/
struct eventpoll *ep;
/* List header used to link this item to the "struct file" items list */
struct list_head fllink;
/* wakeup_source used when EPOLLWAKEUP is set */
struct wakeup_source __rcu *ws;
/* The structure that describe the interested events and the source fd */
/**
当前epitem关心哪些event,这个数据是由执行epoll_ctl时从用户空间传递过来的
**/
struct epoll_event event;
};
struct epoll_filefd {
struct file *file;
int fd;
} __packed;
/* Wait structure used by the poll hooks */
struct eppoll_entry {
/* List header used to link this structure to the "struct epitem" */
struct list_head llink;
/* The "base" pointer is set to the container "struct epitem" */
struct epitem *base;
/*
* Wait queue item that will be linked to the target file wait
* queue head.
*/
wait_queue_t wait;
/* The wait queue head that linked the "wait" wait queue item */
wait_queue_head_t *whead;
};
/* Used by the ep_send_events() function as callback private data */
struct ep_send_events_data {
int maxevents;
struct epoll_event __user *events;
};
/**
调用epoll_create()的实质,就是调用epoll_create1().
**/
SYSCALL_DEFINE1(epoll_create, int, size)
{
if (size <= 0)
return -EINVAL;
return sys_epoll_create1(0);
}
/*
* Open an eventpoll file descriptor.
*/
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
int error, fd;
struct eventpoll *ep = NULL;
struct file *file;
/* Check the EPOLL_* constant for consistency. */
BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);
/**
对于epoll来说,目前唯一有效的FLAG是CLOSEXEC
**/
if (flags & ~EPOLL_CLOEXEC)
return -EINVAL;
/*
* Create the internal data structure ("struct eventpoll").
*/
/**
分配一个struct eventpoll,ep_alloc()的具体分析在下面
**/
error = ep_alloc(&ep);
if (error < 0)
return error;
/*
* Creates all the items needed to setup an eventpoll file. That is,
* a file structure and a free file descriptor.
*/
fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC)); // TODO
if (fd < 0) {
error = fd;
goto out_free_ep;
}
/**
创建一个匿名fd.
epollfd本身并不存在一个真正的文件与之对应,所以内核需要创建一个"虚拟"的文件,并为之分配
真正的struct file结构,并且具有真正的fd.
**/
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));
if (IS_ERR(file)) {
error = PTR_ERR(file);
goto out_free_fd;
}
ep->file = file;
fd_install(fd, file);
return fd;
out_free_fd:
put_unused_fd(fd);
out_free_ep:
ep_free(ep);
return error;
}
/**
分配一个eventpoll结构
**/
static int ep_alloc(struct eventpoll **pep)
{
int error;
struct user_struct *user;
struct eventpoll *ep;
/**
获取当前用户的一些信息,比如最大监听fd数目
**/
user = get_current_user();
error = -ENOMEM;
ep = kzalloc(sizeof(*ep), GFP_KERNEL); // 话说分配eventpoll对象是使用slab还是用buddy呢?TODO
if (unlikely(!ep))
goto free_uid;
/**
初始化
**/
spin_lock_init(&ep->lock);
mutex_init(&ep->mtx);
init_waitqueue_head(&ep->wq);
init_waitqueue_head(&ep->poll_wait);
INIT_LIST_HEAD(&ep->rdllist);
ep->rbr = RB_ROOT;
ep->ovflist = EP_UNACTIVE_PTR;
ep->user = user;
*pep = ep;
return 0;
free_uid:
free_uid(user);
return error;
}
/*
* The following function implements the controller interface for
* the eventpoll file that enables the insertion/removal/change of
* file descriptors inside the interest set.
*/
/**
调用epool_ctl来添加要监听的fd.
参数说明:
epfd,即epollfd
op,操作,ADD,MOD,DEL
fd,需要监听的文件描述符
event,关心的events
**/
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
int error;
int full_check = 0;
struct fd f, tf;
struct eventpoll *ep;
struct epitem *epi;
struct epoll_event epds;
struct eventpoll *tep = NULL;
error = -EFAULT;
/**
错误处理以及
将event从用户空间拷贝到内核空间.
**/
if (ep_op_has_event(op) &&
copy_from_user(&epds, event, sizeof(struct epoll_event)))
goto error_return;
error = -EBADF;
/**
获取epollfd的file结构,该结构在epoll_create1()中,由函数anon_inode_getfile()分配
**/
f = fdget(epfd);
if (!f.file)
goto error_return;
/* Get the "struct file *" for the target file */
/**
获取待监听的fd的file结构
**/
tf = fdget(fd);
if (!tf.file)
goto error_fput;
/* The target file descriptor must support poll */
error = -EPERM;
/**
待监听的文件一定要支持poll.
话说什么情况下文件不支持poll呢?TODO
**/
if (!tf.file->f_op->poll)
goto error_tgt_fput;
/* Check if EPOLLWAKEUP is allowed */
if (ep_op_has_event(op))
ep_take_care_of_epollwakeup(&epds);
/*
* We have to check that the file structure underneath the file descriptor
* the user passed to us _is_ an eventpoll file. And also we do not permit
* adding an epoll file descriptor inside itself.
*/
error = -EINVAL;
/**
epollfd不能监听自己
**/
if (f.file == tf.file || !is_file_epoll(f.file))
goto error_tgt_fput;
/*
* At this point it is safe to assume that the "private_data" contains
* our own data structure.
*/
/**
获取eventpoll结构,来自于epoll_create1()的分配
**/
ep = f.file->private_data;
/*
* When we insert an epoll file descriptor, inside another epoll file
* descriptor, there is the change of creating closed loops, which are
* better be handled here, than in more critical paths. While we are
* checking for loops we also determine the list of files reachable
* and hang them on the tfile_check_list, so we can check that we
* haven't created too many possible wakeup paths.
*
* We do not need to take the global 'epumutex' on EPOLL_CTL_ADD when
* the epoll file descriptor is attaching directly to a wakeup source,
* unless the epoll file descriptor is nested. The purpose of taking the
* 'epmutex' on add is to prevent complex toplogies such as loops and
* deep wakeup paths from forming in parallel through multiple
* EPOLL_CTL_ADD operations.
*/
/**
以下操作可能会修改数据结构内容,锁
**/
// TODO
mutex_lock_nested(&ep->mtx, 0);
if (op == EPOLL_CTL_ADD) {
if (!list_empty(&f.file->f_ep_links) ||
is_file_epoll(tf.file)) {
full_check = 1;
mutex_unlock(&ep->mtx);
mutex_lock(&epmutex);
if (is_file_epoll(tf.file)) {
error = -ELOOP;
if (ep_loop_check(ep, tf.file) != 0) {
clear_tfile_check_list();
goto error_tgt_fput;
}
} else
list_add(&tf.file->f_tfile_llink,
&tfile_check_list);
mutex_lock_nested(&ep->mtx, 0);
if (is_file_epoll(tf.file)) {
tep = tf.file->private_data;
mutex_lock_nested(&tep->mtx, 1);
}
}
}
/*
* Try to lookup the file inside our RB tree, Since we grabbed "mtx"
* above, we can be sure to be able to use the item looked up by
* ep_find() till we release the mutex.
*/
/**
对于每一个监听的fd,内核都有分配一个epitem结构,并且不允许重复分配,所以要查找该fd是否
已经存在.
ep_find()即在红黑树中查找,时间复杂度为O(lgN).
**/
epi = ep_find(ep, tf.file, fd);
error = -EINVAL;
switch (op) {
/**
首先关心添加
**/
case EPOLL_CTL_ADD:
if (!epi) {
/**
如果ep_find()没有找到相关的epitem,证明是第一次插入.
在此可以看到,内核总会关心POLLERR和POLLHUP.
**/
epds.events |= POLLERR | POLLHUP;
/**
红黑树插入,ep_insert()的具体分析在下面
**/
error = ep_insert(ep, &epds, tf.file, fd, full_check);
} else
/**
如果找到了,则是重复添加
**/
error = -EEXIST;
if (full_check) // TODO
clear_tfile_check_list();
break;
case EPOLL_CTL_DEL:
/**
删除
**/
if (epi)
error = ep_remove(ep, epi);
else
error = -ENOENT;
break;
case EPOLL_CTL_MOD:
/**
修改
**/
if (epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_modify(ep, epi, &epds);
} else
error = -ENOENT;
break;
}
if (tep != NULL)
mutex_unlock(&tep->mtx);
mutex_unlock(&ep->mtx); // 解锁
error_tgt_fput:
if (full_check)
mutex_unlock(&epmutex);
fdput(tf);
error_fput:
fdput(f);
error_return:
return error;
}
/*
* Must be called with "mtx" held.
*/
/**
ep_insert()在epoll_ctl()中被调用,其工作是往epollfd的红黑树中添加一个待监听fd.
**/
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd, int full_check)
{
int error, revents, pwake = 0;
unsigned long flags;
long user_watches;
struct epitem *epi;
struct ep_pqueue epq;
/**
struct ep_pqueue的定义如下:
@ fs/eventpoll.c
// Wrapper struct used by poll queueing
struct ep_pqueue {
poll_table pt;
struct epitem *epi;
};
**/
/**
查看是否达到当前用户的最大监听数
**/
user_watches = atomic_long_read(&ep->user->epoll_watches);
if (unlikely(user_watches >= max_user_watches))
return -ENOSPC;
/**
从slab中分配一个epitem
**/
if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
return -ENOMEM;
/* Item initialization follow here ... */
/**
相关数据成员的初始化
**/
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
/**
在该epitem中保存待监听的fd和它的file结构.
**/
ep_set_ffd(&epi->ffd, tfile, fd);
epi->event = *event;
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR;
if (epi->event.events & EPOLLWAKEUP) {
error = ep_create_wakeup_source(epi);
if (error)
goto error_create_wakeup_source;
} else {
RCU_INIT_POINTER(epi->ws, NULL);
}
/* Initialize the poll table using the queue callback */
epq.epi = epi;
/**
初始化一个poll_table,
其实质是指定调用poll_wait()时(不是epoll_wait)的回调函数,以及我们关心哪些event.
ep_ptable_queue_proc()就是我们的回调函数,初值是所有event都关心.
ep_ptable_queue_proc()的具体分析在下面.
**/
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
/*
* Attach the item to the poll hooks and get current event bits.
* We can safely use the file* here because its usage count has
* been increased by the caller of this function. Note that after
* this operation completes, the poll callback can start hitting
* the new item.
*/
revents = ep_item_poll(epi, &epq.pt);
/**
ep_item_poll()的定义如下:
@ fs/eventpoll.c
static inline unsigned int ep_item_poll(struct epitem *epi, poll_table *pt)
{
pt->_key = epi->event.events;
return epi->ffd.file->f_op->poll(epi->ffd.file, pt) & epi->event.events;
}
**/
/**
f_op->poll()一般来说只是个wrapper,它会调用真正的poll实现.
拿UDP的socket来举例,调用流程如下:
f_op->poll(),sock_poll(),udp_poll(),datagram_poll(),sock_poll_wait(),
最后调用到上面指定的ep_ptable_queue_proc().
完成这一步,该epitem就跟这个socket关联起来了,当后者有状态变化时,会通过ep_poll_callback()
来通知.
所以,f_op->poll()做了两件事情:
1.将该epitem和这个待监听的fd关联起来;
2.查询这个待监听的fd是否已经有event已经ready了,有的话就将event返回.
**/
/*
* We have to check if something went wrong during the poll wait queue
* install process. Namely an allocation for a wait queue failed due
* high memory pressure.
*/
error = -ENOMEM;
if (epi->nwait < 0)
goto error_unregister;
/* Add the current item to the list of active epoll hook for this file */
/**
把每个文件和对应的epitem关联起来
**/
spin_lock(&tfile->f_lock);
list_add_tail_rcu(&epi->fllink, &tfile->f_ep_links);
spin_unlock(&tfile->f_lock);
/*
* Add the current item to the RB tree. All RB tree operations are
* protected by "mtx", and ep_insert() is called with "mtx" held.
*/
/**
将epitem插入到eventpoll的红黑树中
**/
ep_rbtree_insert(ep, epi);
/* now check if we've created too many backpaths */
error = -EINVAL;
if (full_check && reverse_path_check())
goto error_remove_epi;
/* We have to drop the new item inside our item list to keep track of it */
spin_lock_irqsave(&ep->lock, flags); // TODO
/* If the file is already "ready" we drop it inside the ready list */
/**
在这里,如果待监听的fd已经有事件发生,就去处理一下
**/
if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
/**
将当前的epitem加入到ready list中去
**/
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
/* Notify waiting tasks that events are available */
/**
哪个进程在调用epoll_wait(),就唤醒它
**/
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
/**
先不通知对eventpoll进行poll的进程
**/
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irqrestore(&ep->lock, flags);
atomic_long_inc(&ep->user->epoll_watches);
/* We have to call this outside the lock */
if (pwake)
/**
安全地通知对eventpoll进行poll的进程
**/
ep_poll_safewake(&ep->poll_wait);
return 0;
error_remove_epi:
spin_lock(&tfile->f_lock);
list_del_rcu(&epi->fllink);
spin_unlock(&tfile->f_lock);
rb_erase(&epi->rbn, &ep->rbr);
error_unregister:
ep_unregister_pollwait(ep, epi);
/*
* We need to do this because an event could have been arrived on some
* allocated wait queue. Note that we don't care about the ep->ovflist
* list, since that is used/cleaned only inside a section bound by "mtx".
* And ep_insert() is called with "mtx" held.
*/
spin_lock_irqsave(&ep->lock, flags);
if (ep_is_linked(&epi->rdllink))
list_del_init(&epi->rdllink);
spin_unlock_irqrestore(&ep->lock, flags);
wakeup_source_unregister(ep_wakeup_source(epi));
error_create_wakeup_source:
kmem_cache_free(epi_cache, epi);
return error;
}
/*
* This is the callback that is used to add our wait queue to the
* target file wakeup lists.
*/
/**
该函数在调用f_op->poll()时被调用.
其作用是当epoll主动poll某个待监听fd时,将epitem和该fd关联起来.
关联的方法是使用等待队列.
**/
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
poll_table *pt)
{
struct epitem *epi = ep_item_from_epqueue(pt);
struct eppoll_entry *pwq;
/**
@ fs/eventpoll.c
// Wait structure used by the poll hooks
struct eppoll_entry {
// List header used to link this structure to the "struct epitem"
struct list_head llink;
// The "base" pointer is set to the container "struct epitem"
struct epitem *base;
// Wait queue item that will be linked to the target file wait
// queue head.
wait_queue_t wait;
// The wait queue head that linked the "wait" wait queue item
wait_queue_head_t *whead;
};
**/
if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
/**
初始化等待队列,指定ep_poll_callback()为唤醒时的回调函数.
当监听的fd发生状态改变时,即队列头被唤醒时,指定的回调函数会被调用.
**/
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback); // ep_poll_callback()的具体分析在下面
pwq->whead = whead;
pwq->base = epi;
add_wait_queue(whead, &pwq->wait);
list_add_tail(&pwq->llink, &epi->pwqlist);
epi->nwait++;
} else {
/* We have to signal that an error occurred */
epi->nwait = -1;
}
}
/*
* This is the callback that is passed to the wait queue wakeup
* mechanism. It is called by the stored file descriptors when they
* have events to report.
*/
/**
这是一个关键的回调函数.
当被监听的fd发生状态改变时,该函数会被调用.
参数key指向events.
**/
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
int pwake = 0;
unsigned long flags;
struct epitem *epi = ep_item_from_wait(wait); // 从等待队列获取epitem
struct eventpoll *ep = epi->ep;
spin_lock_irqsave(&ep->lock, flags);
/*
* If the event mask does not contain any poll(2) event, we consider the
* descriptor to be disabled. This condition is likely the effect of the
* EPOLLONESHOT bit that disables the descriptor when an event is received,
* until the next EPOLL_CTL_MOD will be issued.
*/
if (!(epi->event.events & ~EP_PRIVATE_BITS))
goto out_unlock;
/*
* Check the events coming with the callback. At this stage, not
* every device reports the events in the "key" parameter of the
* callback. We need to be able to handle both cases here, hence the
* test for "key" != NULL before the event match test.
*/
/**
没有我们关心的event
**/
if (key && !((unsigned long) key & epi->event.events))
goto out_unlock;
/*
* If we are transferring events to userspace, we can hold no locks
* (because we're accessing user memory, and because of linux f_op->poll()
* semantics). All the events that happen during that period of time are
* chained in ep->ovflist and requeued later on.
*/
/**
如果该函数被调用时,epoll_wait()已经返回了,
即此时应用程序已经在循环中获取events了,
这种情况下,内核将此刻发生状态改变的epitem用一个单独的链表保存起来,并且在下一次epoll_wait()
时返回给用户.这个单独的链表就是ovflist.
*/
if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
if (epi->next == EP_UNACTIVE_PTR) {
epi->next = ep->ovflist;
ep->ovflist = epi;
if (epi->ws) {
/*
* Activate ep->ws since epi->ws may get
* deactivated at any time.
*/
__pm_stay_awake(ep->ws);
}
}
goto out_unlock;
}
/* If this file is already in the ready list we exit soon */
/**
将当前epitem添加到ready list中
**/
if (!ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake_rcu(epi);
}
/*
* Wake up ( if active ) both the eventpoll wait list and the ->poll()
* wait list.
*/
/**
唤醒调用epoll_wait()的进程
**/
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
/**
先不通知对eventpoll进行poll的进程
**/
if (waitqueue_active(&ep->poll_wait))
pwake++;
out_unlock:
spin_unlock_irqrestore(&ep->lock, flags);
/* We have to call this outside the lock */
if (pwake)
/**
安全地通知对eventpoll进行poll的进程
**/
ep_poll_safewake(&ep->poll_wait);
if ((unsigned long)key & POLLFREE) {
/*
* If we race with ep_remove_wait_queue() it can miss
* ->whead = NULL and do another remove_wait_queue() after
* us, so we can't use __remove_wait_queue().
*/
list_del_init(&wait->task_list);
/*
* ->whead != NULL protects us from the race with ep_free()
* or ep_remove(), ep_remove_wait_queue() takes whead->lock
* held by the caller. Once we nullify it, nothing protects
* ep/epi or even wait.
*/
smp_store_release(&ep_pwq_from_wait(wait)->whead, NULL);
}
return 1;
}
/*
* Implement the event wait interface for the eventpoll file. It is the kernel
* part of the user space epoll_wait(2).
*/
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
int, maxevents, int, timeout)
{
int error;
struct fd f;
struct eventpoll *ep;
/* The maximum number of event must be greater than zero */
if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
return -EINVAL;
/* Verify that the area passed by the user is writeable */
/**
内核要验证这一段用户空间的内存是不是有效的,可写的.
**/
if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event)))
return -EFAULT;
/* Get the "struct file *" for the eventpoll file */
/**
获取epollfd的file结构
**/
f = fdget(epfd);
if (!f.file)
return -EBADF;
/*
* We have to check that the file structure underneath the fd
* the user passed to us _is_ an eventpoll file.
*/
error = -EINVAL;
/**
检查它是不是一个真正的epollfd
**/
if (!is_file_epoll(f.file))
goto error_fput;
/*
* At this point it is safe to assume that the "private_data" contains
* our own data structure.
*/
/**
获取eventpoll结构
**/
ep = f.file->private_data;
/* Time to fish for events ... */
/**
睡眠,等待事件到来.
ep_poll()的具体分析在下面.
**/
error = ep_poll(ep, events, maxevents, timeout);
error_fput:
fdput(f);
return error;
}
/**
* ep_poll - Retrieves ready events, and delivers them to the caller supplied
* event buffer.
*
* @ep: Pointer to the eventpoll context.
* @events: Pointer to the userspace buffer where the ready events should be
* stored.
* @maxevents: Size (in terms of number of events) of the caller event buffer.
* @timeout: Maximum timeout for the ready events fetch operation, in
* milliseconds. If the @timeout is zero, the function will not block,
* while if the @timeout is less than zero, the function will block
* until at least one event has been retrieved (or an error
* occurred).
*
* Returns: Returns the number of ready events which have been fetched, or an
* error code, in case of error.
*/
/**
执行epoll_wait()的进程在该函数进入休眠状态.
**/
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
int res = 0, eavail, timed_out = 0;
unsigned long flags;
long slack = 0;
wait_queue_t wait;
ktime_t expires, *to = NULL;
if (timeout > 0) {
/**
计算睡眠时间
**/
struct timespec end_time = ep_set_mstimeout(timeout);
slack = select_estimate_accuracy(&end_time);
to = &expires;
*to = timespec_to_ktime(end_time);
} else if (timeout == 0) {
/**
已经超时,直接检查ready list
**/
/*
* Avoid the unnecessary trip to the wait queue loop, if the
* caller specified a non blocking operation.
*/
timed_out = 1;
spin_lock_irqsave(&ep->lock, flags);
goto check_events;
}
fetch_events:
spin_lock_irqsave(&ep->lock, flags);
/**
没有可用的事件,即ready list和ovflist均为空.
**/
if (!ep_events_available(ep)) {
/*
* We don't have any available event to return to the caller.
* We need to sleep here, and we will be wake up by
* ep_poll_callback() when events will become available.
*/
/**
初始化一个等待队列成员,current是当前进程.
然后把该等待队列成员添加到ep的等待队列中,即当前进程把自己添加到等待队列中.
**/
init_waitqueue_entry(&wait, current);
__add_wait_queue_exclusive(&ep->wq, &wait);
for (;;) {
/*
* We don't want to sleep if the ep_poll_callback() sends us
* a wakeup in between. That's why we set the task state
* to TASK_INTERRUPTIBLE before doing the checks.
*/
/**
将当前进程的状态设置为睡眠时可以被信号唤醒.
仅仅是状态设置,还没有睡眠.
**/
set_current_state(TASK_INTERRUPTIBLE);
/**
如果此时,ready list已经有成员了,或者已经超时,则不进入睡眠.
**/
if (ep_events_available(ep) || timed_out)
break;
/**
如果有信号产生,不进入睡眠.
**/
if (signal_pending(current)) {
res = -EINTR;
break;
}
spin_unlock_irqrestore(&ep->lock, flags);
/**
挂起当前进程,等待被唤醒或超时
**/
if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
timed_out = 1;
spin_lock_irqsave(&ep->lock, flags);
}
__remove_wait_queue(&ep->wq, &wait); // 把当前进程从该epollfd的等待队列中删除.
__set_current_state(TASK_RUNNING); // 将当前进程的状态设置为可运行.
}
check_events:
/* Is it worth to try to dig for events ? */
eavail = ep_events_available(ep);
spin_unlock_irqrestore(&ep->lock, flags);
/*
* Try to transfer events to user space. In case we get 0 events and
* there's still timeout left over, we go trying again in search of
* more luck.
*/
/**
如果一切正常,并且有event发生,则拷贝数据给用户空间
**/
// ep_send_events()的具体分析在下面
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && !timed_out)
goto fetch_events;
return res;
}
static int ep_send_events(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents)
{
struct ep_send_events_data esed;
/**
@ fs/eventpoll.c
// Used by the ep_send_events() function as callback private data
struct ep_send_events_data {
int maxevents;
struct epoll_event __user *events;
};
**/
esed.maxevents = maxevents;
esed.events = events;
// ep_scan_ready_list()的具体分析在下面
return ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
}
/**
* ep_scan_ready_list - Scans the ready list in a way that makes possible for
* the scan code, to call f_op->poll(). Also allows for
* O(NumReady) performance.
*
* @ep: Pointer to the epoll private data structure.
* @sproc: Pointer to the scan callback.
* @priv: Private opaque data passed to the @sproc callback.
* @depth: The current depth of recursive f_op->poll calls.
* @ep_locked: caller already holds ep->mtx
*
* Returns: The same integer error code returned by the @sproc callback.
*/
static int ep_scan_ready_list(struct eventpoll *ep,
int (*sproc)(struct eventpoll *,
struct list_head *, void *),
void *priv, int depth, bool ep_locked)
{
int error, pwake = 0;
unsigned long flags;
struct epitem *epi, *nepi;
LIST_HEAD(txlist);
/*
* We need to lock this because we could be hit by
* eventpoll_release_file() and epoll_ctl().
*/
if (!ep_locked)
mutex_lock_nested(&ep->mtx, depth);
/*
* Steal the ready list, and re-init the original one to the
* empty list. Also, set ep->ovflist to NULL so that events
* happening while looping w/out locks, are not lost. We cannot
* have the poll callback to queue directly on ep->rdllist,
* because we want the "sproc" callback to be able to do it
* in a lockless way.
*/
spin_lock_irqsave(&ep->lock, flags);
/**
将ready list上的epitem(即监听事件发生状态改变的epitem)移动到txlist,
并且将ready list清空.
**/
list_splice_init(&ep->rdllist, &txlist);
/**
改变ovflist的值.
在上面的ep_poll_callback()中可以看到,如果ovflist != EP_UNACTIVE_PTR,当等待队列成员被激活时,
就会将对应的epitem加入到ep->ovflist中,否则加入到ep->rdllist中.
所以这里是为了防止把新来的发生状态改变的epitem加入到ready list中.
**/
ep->ovflist = NULL;
spin_unlock_irqrestore(&ep->lock, flags);
/*
* Now call the callback function.
*/
/**
调用扫描函数处理txlist.
该扫描函数就是ep_send_events_proc.具体分析在下面.
**/
error = (*sproc)(ep, &txlist, priv);
spin_lock_irqsave(&ep->lock, flags);
/*
* During the time we spent inside the "sproc" callback, some
* other events might have been queued by the poll callback.
* We re-insert them inside the main ready-list here.
*/
/**
在调用sproc()期间,可能会有新的事件发生(被添加到ovflist中),遍历这些发生新事件的epitem,
将它们插入到ready list中.
**/
for (nepi = ep->ovflist; (epi = nepi) != NULL;
nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
/**
@ fs/eventpoll.c
#define EP_UNACTIVE_PTR ((void *) -1L)
**/
/*
* We need to check if the item is already in the list.
* During the "sproc" callback execution time, items are
* queued into ->ovflist but the "txlist" might already
* contain them, and the list_splice() below takes care of them.
*/
/**
epitem不在ready list?插入!
**/
if (!ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
}
}
/*
* We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after
* releasing the lock, events will be queued in the normal way inside
* ep->rdllist.
*/
/**
还原ovflist的状态
**/
ep->ovflist = EP_UNACTIVE_PTR;
/*
* Quickly re-inject items left on "txlist".
*/
/**
将上次没有处理完的epitem,重新插入到ready list中.
**/
list_splice(&txlist, &ep->rdllist);
__pm_relax(ep->ws);
/**
如果ready list不为空,唤醒.
**/
if (!list_empty(&ep->rdllist)) {
/*
* Wake up (if active) both the eventpoll wait list and
* the ->poll() wait list (delayed after we release the lock).
*/
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irqrestore(&ep->lock, flags);
if (!ep_locked)
mutex_unlock(&ep->mtx);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);
return error;
}
/**
该函数作为callback在ep_scan_ready_list()中被调用.
head是一个链表头,链接着已经ready了的epitem.
这个链表不是eventpoll的ready list,而是上面函数中的txlist.
**/
static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
void *priv)
{
struct ep_send_events_data *esed = priv;
int eventcnt;
unsigned int revents;
struct epitem *epi;
struct epoll_event __user *uevent;
struct wakeup_source *ws;
poll_table pt;
init_poll_funcptr(&pt, NULL);
/*
* We can loop without lock because we are passed a task private list.
* Items cannot vanish during the loop because ep_scan_ready_list() is
* holding "mtx" during this call.
*/
/**
遍历整个链表
**/
for (eventcnt = 0, uevent = esed->events;
!list_empty(head) && eventcnt < esed->maxevents;) {
/**
取出第一个结点
**/
epi = list_first_entry(head, struct epitem, rdllink);
/*
* Activate ep->ws before deactivating epi->ws to prevent
* triggering auto-suspend here (in case we reactive epi->ws
* below).
*
* This could be rearranged to delay the deactivation of epi->ws
* instead, but then epi->ws would temporarily be out of sync
* with ep_is_linked().
*/
// TODO
ws = ep_wakeup_source(epi);
if (ws) {
if (ws->active)
__pm_stay_awake(ep->ws);
__pm_relax(ws);
}
/**
从ready list中删除该结点
**/
list_del_init(&epi->rdllink);
/**
获取ready事件掩码
**/
revents = ep_item_poll(epi, &pt);
/**
ep_item_poll()的具体分析在上面的ep_insert()中.
**/
/*
* If the event mask intersect the caller-requested one,
* deliver the event to userspace. Again, ep_scan_ready_list()
* is holding "mtx", so no operations coming from userspace
* can change the item.
*/
if (revents) {
/**
将ready事件和用户传入的数据都拷贝到用户空间
**/
if (__put_user(revents, &uevent->events) ||
__put_user(epi->event.data, &uevent->data)) {
list_add(&epi->rdllink, head);
ep_pm_stay_awake(epi);
return eventcnt ? eventcnt : -EFAULT;
}
eventcnt++;
uevent++;
if (epi->event.events & EPOLLONESHOT)
epi->event.events &= EP_PRIVATE_BITS;
else if (!(epi->event.events & EPOLLET)) {
/**
边缘触发(ET)和水平触发(LT)的区别:
如果是ET,就绪epitem不会再次被加入到ready list中,除非fd再次发生状态改变,ep_poll_callback被调用.
如果是LT,不论是否还有有效的事件和数据,epitem都会被再次加入到ready list中,在下次epoll_wait()时会
立即返回,并通知用户空间.当然如果这个被监听的fd确实没有事件和数据,epoll_wait()会返回一个0.
**/
/*
* If this file has been added with Level
* Trigger mode, we need to insert back inside
* the ready list, so that the next call to
* epoll_wait() will check again the events
* availability. At this point, no one can insert
* into ep->rdllist besides us. The epoll_ctl()
* callers are locked out by
* ep_scan_ready_list() holding "mtx" and the
* poll callback will queue them in ep->ovflist.
*/
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
}
}
}
return eventcnt;
}
/**
该函数在epollfd被close时调用,其工作是释放一些资源.
**/
static void ep_free(struct eventpoll *ep)
{
struct rb_node *rbp;
struct epitem *epi;
/* We need to release all tasks waiting for these file */
if (waitqueue_active(&ep->poll_wait))
ep_poll_safewake(&ep->poll_wait);
/*
* We need to lock this because we could be hit by
* eventpoll_release_file() while we're freeing the "struct eventpoll".
* We do not need to hold "ep->mtx" here because the epoll file
* is on the way to be removed and no one has references to it
* anymore. The only hit might come from eventpoll_release_file() but
* holding "epmutex" is sufficient here.
*/
mutex_lock(&epmutex);
/*
* Walks through the whole tree by unregistering poll callbacks.
*/
for (rbp = rb_first(&ep->rbr); rbp; rbp = rb_next(rbp)) {
epi = rb_entry(rbp, struct epitem, rbn);
ep_unregister_pollwait(ep, epi);
cond_resched();
}
/*
* Walks through the whole tree by freeing each "struct epitem". At this
* point we are sure no poll callbacks will be lingering around, and also by
* holding "epmutex" we can be sure that no file cleanup code will hit
* us during this operation. So we can avoid the lock on "ep->lock".
* We do not need to lock ep->mtx, either, we only do it to prevent
* a lockdep warning.
*/
mutex_lock(&ep->mtx);
/**
在epoll_ctl()中被添加的监听fd,在这里被关闭.
**/
while ((rbp = rb_first(&ep->rbr)) != NULL) {
epi = rb_entry(rbp, struct epitem, rbn);
ep_remove(ep, epi);
cond_resched();
}
mutex_unlock(&ep->mtx);
mutex_unlock(&epmutex);
mutex_destroy(&ep->mtx);
free_uid(ep->user);
wakeup_source_unregister(ep->ws);
kfree(ep);
}