select

int select(int maxfdp1,fd_set readset,fd_set writeset,fd_set exceptset,const struct timeval timeout);

  • 参数1:等于所有需要使用select函数检测的事件fd中最大值+1,一般是在for循环的过程中维护一个最大值
  • 参数2:需要监听的可读事件fd集合
  • 参数3:需要监听的可写事件fd集合
  • 参数4:需要监听的异常事件fd集合
  • 参数5:超时时间,即在该时间内检查以上设置的fd事件,超时则立即返回
  • 返回值:-1出错,需要额外检测errno;=0监听超时;>0发生事件

timeval的结构体定义

  1. struct timeval
  2. {
  3. long tv_sec; // 秒
  4. long tv_usec; // 微妙
  5. // 总超时时间等于 timeout->tv_sec+timeout->tv_usec;
  6. };

代码示例:

  1. timeval tm;
  2. tm.tv_sec = 5;
  3. tm.tv_usec = 0;
  4. int ret = select(maxfd + 1, &readset, NULL, NULL, &tm);
  5. if (ret == -1){
  6. if (errno != EINTR) break;
  7. } else if (ret == 0){
  8. continue;
  9. } else{
  10. // do_somethings
  11. }

fd_set

fd_set 是一个大小为16,类型为long int,用来存储文件描述符的位图数组,每个位代表了一个文件描述符,最大支持1024个文件描述符,需要修改内核参数才可以更改

  1. // 实际数组的长度是由宏决定的,__FD_SETSIZE / _NFDBITS = 16
  2. typedef struct
  3. {
  4. long int __fds_bits[16]; // 可以看出是一个数量为16的long int 的位图数组,因此总长度等于 16 * 8(字节) * 8(位) = 1024. 所以select最大支持1024个文件描述符.
  5. } fd_set;

常用宏指令

  • FD_ZERO(&set): 将set清零使集合中不含任何fd
  • FD_SET(fd, &set): 将fd加入set集合
  • FD_CLR(fd, &set): 将fd从set集合中清除
  • FD_ISSET(fd, &set): 在调用select()函数后,用FD_ISSET来检测fd是否在set集合中,当检测到fd在set中则返回真,否则,返回假(0)

代码示例:

  1. std::vector<int> clientfds;
  2. int maxfd;
  3. while (true){
  4. fd_set readset;
  5. FD_ZERO(&readset); // 置零
  6. // 将监听socket加入待检测的可读事件中
  7. FD_SET(listenfd, &readset);
  8. maxfd = listenfd;
  9. // 将客户端fd加入待检测的可读事件中
  10. int clientfdslength = clientfds.size();
  11. for (int i = 0; i < clientfdslength; ++i){
  12. if (clientfds[i] != -1){
  13. FD_SET(clientfds[i], &readset);
  14. if (maxfd < clientfds[i]) maxfd = clientfds[i];
  15. }
  16. }
  17. timeval tm;
  18. tm.tv_sec = 5;
  19. tm.tv_usec = 0;
  20. int ret = select(maxfd + 1, &readset, NULL, NULL, &tm);
  21. if (ret == -1){
  22. if (errno != EINTR) break;
  23. } else if (ret == 0){
  24. continue;
  25. } else{
  26. // 检测到某个socket有事件
  27. if (FD_ISSET(listenfd, &readset)){
  28. // 应该使用非阻塞的accept进行连接,并存入clientfds内
  29. }
  30. int clientfdslength = clientfds.size();
  31. for (int i = 0; i < clientfdslength; ++i){
  32. if (clientfds[i] != -1 && FD_ISSET(clientfds[i], &readset)){
  33. // 应该使用非阻塞的recv接收数据,如果接收数据报错,则清除
  34. }
  35. }
  36. }
  37. }

总结

  • 每次调用select都需要把fd_set集合从用户态拷贝到内核态,且无法复用,每次都需要重新创建一个新的fd_set
  • 需要在用户态轮询fd_set来进行数据处理
  • fd_set的大小有限制,底层是一个long类型的位图数组,最大连接数为1024
  • 只能是LT模式触发

poll

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

  • 参数1:fds是一个struct pollfd类型的数组,用于存放需要检测其状态的socket描述符,并且调用poll函数之后fds数组不会被清空;一个pollfd结构体表示一个被监视的文件描述符,通过传递fds指示 poll() 监视多个文件描述符。其中,结构体的events域是监视该文件描述符的事件掩码,由用户来设置这个域,结构体的revents域是文件描述符的操作结果事件掩码,内核在调用返回时设置这个域
  • 参数2:记录数组fds中描述符的总数量
  • 参数3:超时时间,单位毫秒
  • 返回值:函数返回fds集合中就绪的读、写,或出错的描述符数量,返回0表示超时,返回-1表示出错

pollfd

  1. typedef struct pollfd {
  2. int fd; // 需要被检测或选择的文件描述符
  3. short events; // 对文件描述符fd上感兴趣的事件,程序员设置
  4. short revents; // 文件描述符fd上当前实际发生的事件,内核返回
  5. } pollfd_t;

events 和 revents的常用参数有:

  • POLLIN 数据可读
  • POLLOUT 数据可写
  • POLLERR 发生异常

代码示例:

  1. std::vector<pollfd> fds; // 如果只是创建一个,那么不需要使用vector,直接使用一个pollfd即可
  2. pollfd listen_fd_info;
  3. listen_fd_info.fd = listenfd;
  4. listen_fd_info.events = POLLIN; // 所要监听的事件是读事件
  5. listen_fd_info.revents = 0;
  6. fds.push_back(listen_fd_info);
  7. int n;
  8. while (true){
  9. n = poll(&fds[0], fds.size(), 1000); // 如果只是监听一个,那么 poll(&listen_fd_info, 1, 1000)即可
  10. if (n < 0){
  11. // 被信号中断
  12. if (errno == EINTR)
  13. continue;
  14. // 出错,退出
  15. break;
  16. } else if (n == 0){
  17. // 超时,继续
  18. continue;
  19. }
  20. // 可见还是要遍历所有的fd,和select本质上无异
  21. int i = 0;
  22. while (i < fds.size()){
  23. if (fds[i].revents & POLLIN){ // 发生的事件是可读事件
  24. if (fds[i].fd == listenfd){
  25. // 使用非阻塞的accept监听,并加入fds中
  26. // 注意vector的特殊性,当push_back发生时,可能会使原迭代器失效,因此需要使用下标遍历。
  27. ++i;
  28. } else {
  29. // 使用非阻塞的recv接收
  30. // 当发生异常时,需要close对应的套接字,并从vector中erase旧的套接字
  31. if (接收数据失败){
  32. close(fds[i].fd);
  33. fds.erase(fds.begin() + i); // 当erase时,不需要++i
  34. } else {
  35. ++i;
  36. }
  37. }
  38. } else if (fds[i].revents & POLLERR){
  39. // 发生异常
  40. ++i;
  41. } else if (fds[i].revents & POLLOUT){
  42. // 可写入
  43. ++i;
  44. } else {
  45. ++i;
  46. }
  47. }
  48. }

总结

  • poll在用户态使用数组存储pollfd,当copy复制到内核态时,内核态使用的是链表结构,因此没有最大连接数限制(select的最大连接数由内核的fd_set数据结构限制,默认是1024)
  • 与select类似,仍然需要在用户态遍历所有的套接字才可以得知哪些套接字有事件触发
  • 只能是LT模式触发

EPOLL

int epoll_create(int size)

创建一个epoll句柄,参数size表明内核要监听的描述符数量(该参数已弃用,现在只需要传入一个大于0的值即可)。调用成功时返回一个epoll句柄描述符,失败时返回-1
每当一个进程调用了该方法,内核会创建一个eventpoll结构体,结构如下:

  1. struct eventpoll{
  2. ....
  3. /*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
  4. struct rb_root rbr;
  5. /*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
  6. struct list_head rdlist;
  7. ....
  8. }

代码示例:

  1. int epollfd = epoll_create(1);
  2. if (epollfd == -1){
  3. std::cout << "create epollfd error." << std::endl;
  4. close(listenfd);
  5. return -1;
  6. }

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

  • 参数1:epoll句柄,对应底层的eventpoll结构体
  • 参数2:fd操作类型,有以下3种:
    • EPOLL_CTL_ADD 注册新的fd到epfd中
    • EPOLL_CTL_MOD 修改已注册的fd的监听事件
    • EPOLL_CTL_DEL 从epfd中删除一个fd
  • 参数3:套接字描述符
  • 参数4:指向含有目标fd的epoll_event结构体的指针
  • 返回值:-1时表示异常

对于每一个事件fd,会建立一个epitem结构体,链接到eventpoll结构体的红黑树上(时间复杂度是O(log2n)),结构如下:

  1. struct epitem {
  2. // 对应红黑树的节点,包含3个指针,指向左右子树和上级节点)
  3. // tree的root保存在eventpoll rbr中
  4. struct rb_node rbn;
  5. // 对应eventpoll的rdllist
  6. // 当epitem对应的fd的存在已经ready的I/O事件,
  7. // 则ep_poll_callback回调函数会将该epitem链接到eventpoll中的rdllist循环链表中去,
  8. struct list_head rdllink;
  9. // epitem对应的fd
  10. struct epoll_filefd ffd;
  11. // 当前epitem属于哪个eventpoll
  12. struct eventpoll *ep;
  13. // 注册的感兴趣的事件,也就是用户空间的epoll_event,这个数据是调用epoll_ctl时从用户态传递过来
  14. struct epoll_event event;
  15. };

image.png
epitem既可以作为红黑树的节点,亦可作为链表的节点

epoll_event

  1. struct epoll_event {
  2. __uint32_t events; /* Epoll events */
  3. epoll_data_t data; /* User data variable */
  4. };
  5. typedef union epoll_data {
  6. void *ptr;
  7. int fd; // 一般用这个
  8. __uint32_t u32;
  9. __uint64_t u64;
  10. } epoll_data_t; // 用户自定义数据,本质是一个union对象,在64位操作系统中大小是8字节

events的常用参数有:

  • EPOLLIN 数据可读
  • EPOLLOUT 数据可写
  • EPOLLERR 发生异常

代码示例:

  1. epoll_event listen_fd_event;
  2. listen_fd_event.data.fd = listenfd;
  3. listen_fd_event.events = EPOLLIN;
  4. // 若取消注释这一行,则使用ET模式
  5. // listen_fd_event.events |= EPOLLET;
  6. // 返回值: == 0 : 成功调用,-1调用失败
  7. if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &listen_fd_event) == -1){
  8. std::cout << "epoll_ctl error." << std::endl;
  9. close(listenfd);
  10. return -1;
  11. }

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

  • 参数1:epoll句柄
  • 参数2:epoll_event结构体数组的首地址,当然如果只需要监听一个,直接传入一个epoll_event结构体的地址即可。当从内核返回后,内核会把有事件的n个fd置于前n位,可见相比较于poll、select的方式,不需要每次都重新计算epoll_event数组
  • 参数3:events数组的大小
  • 参数4:超时时间
  • 返回值:函数等待事件的就绪,成功时返回就绪的事件数目,调用失败时返回 -1,等待超时返回 0

epoll_wait会检查eventpoll结构体上的链表结构是否为空,当不为空时将数据拷贝至用户态指定的epoll_event数组中,否则阻塞等待一定的时间。

代码示例:

  1. int n;
  2. while (true){
  3. epoll_event epoll_events[1024];
  4. n = epoll_wait(epollfd, epoll_events, 1024, 1000);
  5. if (n < 0){
  6. // 被信号中断
  7. if (errno == EINTR)
  8. continue;
  9. // 出错,退出
  10. break;
  11. } else if (n == 0){
  12. // 超时,继续
  13. continue;
  14. }
  15. for (size_t i = 0; i < n; ++i){
  16. if (epoll_events[i].events & EPOLLIN){ // 发生的事件是可读事件
  17. if (epoll_events[i].data.fd == listenfd){
  18. // 监听socket,接受新连接
  19. sockaddr_in clientaddr;
  20. socklen_t clientaddrlen = sizeof(clientaddr);
  21. // 接受客户端连接并将产生的clientfd加入fds集合中
  22. int clientfd = accept(listenfd, (sockaddr*)&clientaddr, &clientaddrlen);
  23. if (clientfd != INVALID_FD){
  24. // 将客户端socket设置为非阻塞
  25. int oldSocketFlag = fcntl(clientfd, F_GETFL, 0);
  26. int newSocketFlag = oldSocketFlag | O_NONBLOCK;
  27. if (fcntl(clientfd, F_SETFL, newSocketFlag) == INVALID_FD){
  28. close(clientfd);
  29. std::cout << "set clientfd to nonblock error." << std::endl;
  30. } else {
  31. epoll_event client_fd_event;
  32. client_fd_event.data.fd = clientfd;
  33. client_fd_event.events = EPOLLIN;
  34. // 若取消注释这一行,则使用ET模式
  35. // client_fd_event.events |= EPOLLET;
  36. // 默认是LT模式,即有数据可读时总是返回EPOLLIN
  37. // 使用ET模式,则只有当有新数据输入时才会有事件,因此需要我们使用循环一直读取数据直到触发返回-1,且错误码为EWOULDBLOCK
  38. if (epoll_ctl(epollfd, EPOLL_CTL_ADD, clientfd, &client_fd_event) != INVALID_FD){
  39. std::cout << "new client accepted, clientfd: " << clientfd << std::endl;
  40. } else {
  41. std::cout << "add client fd to epollfd error." << std::endl;
  42. close(clientfd);
  43. }
  44. }
  45. }
  46. } else {
  47. // 普通的clientfd,收取数据
  48. std::cout << "client fd: " << epoll_events[i].data.fd << " recvdata." << std::endl;
  49. char buf[1] = {0};
  50. int m = recv(epoll_events[i].data.fd, buf, 1, 0);
  51. if (m <= 0){
  52. if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN){
  53. if (epoll_ctl(epollfd, EPOLL_CTL_DEL, epoll_events[i].data.fd, NULL) != -1){
  54. std::cout << "client disconnected, clientfd: " << epoll_events[i].data.fd << std::endl;
  55. }
  56. close(epoll_events[i].data.fd);
  57. }
  58. } else {
  59. std::cout << "recv from client: " << buf << ", clientfd: " << epoll_events[i].data.fd << std::endl;
  60. }
  61. }
  62. } else if (epoll_events[i].events & EPOLLERR){
  63. //TODO: 暂不处理
  64. }
  65. }
  66. }

EPOLLONESHOT

当多个线程同时处理一个socket fd时,可能会出现这样一个情况,当事件被触发时,A线程读取了数据并开始处理,此时又有新数据到来,此时又使用了另外的线程去接收数据并处理,这样会导致数据乱序的异常问题。在LT和ET模式都会有这种情况。
解决方法有:

  1. 规定只能使用一个线程去处理该fd
  2. 加锁处理
  3. 使用EPOLLONESHOT

EPOLLONESHOT 的原理是当 socket 事件被处理之后,就必须重新通过 EPOLLONESHOT 注册,否则不会再次触发,这样就可以达到有序处理的目的。

多进程EPOLL惊群现象

非IO多路复用下多进程的惊群问题

在不使用 IO 多路复用的情况下,一个进程只能监听一个 tcp 连接,为了提高效率一般使用多进程处理,有两种模式:
模式1:由一个主进程进行 accept 监听,接受一个连接之后 fork 出一个子进程,把新连接传给子进程处理。由于只有一个监听的进程,因此这种方式不会有惊群问题。
模式2:由主进程 fork 出多个子进程,子进程继承父进程的监听 tcp 连接fd,所有进程一起监听。这种方式下,由于内核调用的是 prepare_to_wait_exclusive 方法把进程(task_struct)添加到 socket 监听队列下,在对一个等待队列进程唤醒时只会唤醒一个进程,因此也不存在惊群现象

内核对进程唤醒提供了两种模式,一种是 prepare_to_wati,此种方式不会设置互斥位,会将挂在这个等待队列上的所有进程都唤醒;一种是 prepare_to_wait_exclusive,这种方式会设置互斥位,因此每次只会唤醒一个进程。

image.png

IO多路复用下多进程的惊群问题

当使用单进程的 EPOLL 时,由于只有一个进程的关系,不存在进群现象。
但是在多进程下,由于各进程都使用 epoll_wait() 同一个 listen_fd,因此当有事件触发时,会同时唤醒所有的进程。
原因在于底层的实现上,当 EPOLL 把当前进程挂在 fd 的等待队列时,默认不会添加互斥位。
解决方法有:

  1. 使用 epoll_ctl 为 fd 添加一个 EPOLLEXCLUSIVE 标志位
  2. 在建立 listen_fd 时使用 SO_REUSEPORT 标志,底层会进行负载均衡,使事件只有一个进程处理。
    image.png

总结

  • epoll能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了,极大的减少了不必要的数据拷贝。注意对于游戏来说,一般是连接少(单服几万),而大量活跃,select/poll效率高,因为epoll每次都需要触发一次回调,而select/poll可以一次遍历即可。
  • epoll提供水平触发(Level Triggered),边缘触发(Edge Triggered)
  • ET模式很大程度上减少了epoll事件的触发次数,因此效率比LT模式下高。

image.png