1 IO模型总结

阻塞模式:调用recv函数接收对端数据时,如果对端没有数据发送过来,进程会一直阻塞在那里什么都不能做,直到数据到来才能开始工作。图示如下:
imgclip1000.png

Socket支持设置为非阻塞模式,需要调用下面函数设置:

  1. fcntl(socketId, F_SETFL, flag | 0_NONBLOCK);

非阻塞模式下,如果recv没有发现数据,会返回-1,错误代码为EMOULDBLOCK, 进程不会被阻塞。根据错误代码可以判断当前对端没有发送数据,可以尝试继续调用recv函数。图示如下:
imgclip999.png

IO模型的差异总结如下:
image.png

2 IO复用

用select管理IO,监听多个文件描述符,若其中某个或多个描述符对应的通道有数据到来,select方法就会返回。然后再用recv去获取数据,就不会出现阻塞。
图示如下:
imgclip998.png

2.1 select复用

头文件:<sys/select.h>
函数定义: int select(int nfds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, timeval* timeout);
功能: 同时管理多个IO,IO有事件发生就会返回;返回值为IO事件的个数和哪些IO, 失败返回-1。
参数:

  • nfds: 所有集合中最大描述符值+1
  • readfds: 有数据可读时函数返回的socket集合
  • writefds: 有数据可写时函数返回的socket集合
  • exceptfds: 出现异常时函数返回的集合
  • timeout: 超时时间,NULL代表不超时

描述符集合相关的几个函数():

  • void FD_CLR(int fd, fd_set *set): 将fd从集合中移除
  • int FD_ISSET(int fd, fd_set *set): fd是否再集合之中
  • void FD_SET(int fd, fd_set *set): 将fd添加到集合
  • void FD_ZERO(fd_set *set): 清空集合

select代码示例:

  1. //client.c:
  2. #include <stdio.h>
  3. #include <sys/socket.h>
  4. #include <arpa/inet.h>
  5. #include <string.h>
  6. #include <unistd.h>
  7. #include <sys/select.h>
  8. int main()
  9. {
  10. //create socket
  11. int socketId;
  12. if ((socketId = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
  13. {
  14. printf("socket create failed\r\n");
  15. }
  16. sockaddr_in addr;
  17. memset(&addr, 0, sizeof(addr));
  18. addr.sin_family = AF_INET; //指定协议
  19. addr.sin_port = htons(5188); //指定网络字节序格式的端口号
  20. addr.sin_addr.s_addr = inet_addr("127.0.0.1");
  21. //connect to server via socket
  22. if (connect(socketId, (sockaddr*)&addr, sizeof(addr)) < 0)
  23. {
  24. printf("connect to server failed\r\n");
  25. }
  26. //使用select实现IO复用监听、读写数据
  27. fd_set rset;
  28. FD_ZERO(&rset);//create a set and initial it
  29. int stdinFd = fileno(stdin); //获取标准输入的文件描述符
  30. int maxfd = (stdinFd < socketId) ? socketId : stdinFd;
  31. int nready;
  32. char recvbuf[1024];
  33. char sendbuf[1024];
  34. while(1)
  35. {
  36. FD_SET(stdinFd, &rset);
  37. FD_SET(socketId, &rset);
  38. nready = select(maxfd + 1, &rset, NULL, NULL, NULL);
  39. if (nready == -1)
  40. printf("select failed\r\n");
  41. else if (nready == 0)
  42. continue;
  43. else
  44. {
  45. //产生可读事件,返回的是那个fd产生的
  46. //检查是那个fd产生的事件
  47. if (FD_ISSET(socketId, &rset))
  48. {
  49. //是socket产生可读事件,表示数据到来,获取数据
  50. read(socketId, recvbuf, sizeof(recvbuf));
  51. printf("received from server: %s\r\n", recvbuf);
  52. memset(recvbuf, 0, sizeof(recvbuf));
  53. }
  54. else if (FD_ISSET(stdinFd, &rset))
  55. {
  56. //是控制台输入产生的可读事件,表示用户输入数据了,需要发送到server
  57. if (fgets(sendbuf, sizeof(sendbuf), stdin) == NULL)
  58. break;
  59. write(socketId, sendbuf, strlen(sendbuf));
  60. memset(sendbuf, 0, sizeof(sendbuf));
  61. }
  62. }
  63. }
  64. close(socketId);
  65. return 0;
  66. }
  67. //=============================================
  68. //server.c
  69. #include <stdio.h>
  70. #include <sys/socket.h>
  71. #include <arpa/inet.h>
  72. #include <string.h>
  73. #include <unistd.h>
  74. #include <signal.h>
  75. #include <sys/wait.h>
  76. #include <sys/select.h>
  77. #include <errno.h>
  78. void customHandler(int sig)
  79. {
  80. while(waitpid(-1, NULL, WNOHANG) > 0)
  81. ;
  82. }
  83. int main()
  84. {
  85. //add SIGCHLD signal handler
  86. signal(SIGCHLD, customHandler);
  87. //create socket
  88. int socketId;
  89. if ((socketId = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
  90. {
  91. printf("socket create failed\r\n");
  92. }
  93. sockaddr_in addr;
  94. memset(&addr, 0, sizeof(addr));
  95. addr.sin_family = AF_INET; //指定协议
  96. addr.sin_port = htons(5188); //指定网络字节序格式的端口号
  97. /** addr.sin_addr = htonl(INADDR_ANY); //指定本地任意ip地址,使用网络字节序 */
  98. addr.sin_addr.s_addr = inet_addr("127.0.0.1");
  99. //set REUSEADDR
  100. int on = 1;
  101. if(setsockopt(socketId, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
  102. {
  103. printf("socket set REUSEADDR failed\r\n");
  104. }
  105. //start bind
  106. if (bind(socketId, (sockaddr*)&addr, sizeof(addr)) < 0)
  107. {
  108. printf("socket bind failed\r\n");
  109. }
  110. //start listen
  111. if (listen(socketId, SOMAXCONN) < 0)
  112. {
  113. printf("socket listen failed\r\n");
  114. }
  115. sockaddr_in remoteAddr;
  116. socklen_t remoteLen = sizeof(remoteAddr);
  117. int remoteConnId;
  118. //select IO复用方式实现多客户端连接处理
  119. int connectedClient[FD_SETSIZE];//最大值为select函数可接受的最多fd
  120. for (int i = 0; i < FD_SETSIZE; i++)
  121. {
  122. connectedClient[i] = -1;
  123. }
  124. int nready;
  125. int maxFd = socketId;
  126. fd_set rset;
  127. fd_set allset;
  128. FD_ZERO(&rset);
  129. FD_ZERO(&allset);
  130. FD_SET(socketId, &allset);//第一次添加监听socket,此时还没有客户端socket
  131. while(1)
  132. {
  133. rset = allset;
  134. nready = select(maxFd + 1, &rset, NULL, NULL, NULL);
  135. if (nready == -1)
  136. {
  137. //select失败,判断是否为系统中断导致,是中断则继续
  138. if (errno == EINTR)
  139. continue;
  140. printf("select failed");
  141. return 0;
  142. }
  143. else if (nready == 0)
  144. continue;
  145. if (FD_ISSET(socketId, &rset))
  146. {
  147. //监听端口有可读事件,表示有客户端连接过来,需要建立连接
  148. remoteConnId = accept(socketId, (sockaddr*)&remoteAddr, &remoteLen);
  149. if (remoteConnId == -1)
  150. {
  151. printf("accept failed");
  152. return 0;
  153. }
  154. int i;
  155. for (int i = 0; i < FD_SETSIZE; i++)
  156. {
  157. if (connectedClient[i] == -1)
  158. {
  159. connectedClient[i] = remoteConnId;
  160. break;
  161. }
  162. }
  163. if (i == FD_SETSIZE)
  164. {
  165. printf("too many clients");
  166. return 0;
  167. }
  168. printf("IP = %s, port = %d\r\n", inet_ntoa(remoteAddr.sin_addr), ntohs(remoteAddr.sin_port));
  169. FD_SET(remoteConnId, &allset);//添加已连接的socket到下一次select集合中
  170. if (remoteConnId > maxFd) //更新最大fd值
  171. maxFd = remoteConnId;
  172. if (--nready <= 0)
  173. continue; //返回集合都处理了,继续下一次循环
  174. }
  175. //处理已连接socket的读写事件
  176. for (int i = 0; i < FD_SETSIZE; i++)
  177. {
  178. remoteConnId = connectedClient[i];
  179. if (remoteConnId == -1)
  180. continue;
  181. if (FD_ISSET(remoteConnId, &rset))
  182. {
  183. char rcvBuf[1024] = {0};
  184. int ret = read(remoteConnId, rcvBuf, sizeof(rcvBuf));// 从socket中读取数据流
  185. if (ret == 0)
  186. {
  187. printf("close client: [%d]\r\n", remoteConnId);
  188. connectedClient[i] = -1;
  189. FD_CLR(remoteConnId, &allset);
  190. close(remoteConnId);
  191. }
  192. fputs(rcvBuf, stdout);
  193. write(remoteConnId, rcvBuf, strlen(rcvBuf));//将数据再写回remote端
  194. if (--nready <= 0)
  195. continue;
  196. }
  197. }
  198. }
  199. close(socketId);
  200. return 0;
  201. }

select就绪的条件

可读:

  • 套接口接收缓冲区有数据可读
  • 连接的读通道关闭,即接收到FIN段,读操作将返回0
  • 对于监听socket,已完成连接的对了不为空时(server端)
  • 套接口发生错误待处理

可写:

  • 套接口发送缓冲区有空间容纳数据
  • 连接的写通道关闭
  • 套接口发生错误待处理

异常:

  • 套接口存在带外数据(普通数据触发可读)

select的限制

  • 一个进程能打开的最大文件描述符限制,可以通过调整内核参数进行修改。下面代码可以用来测试: ```c

    include

    include

    include

    include

int main() { struct rlimit rl; if (getrlimit(RLIMIT_NOFILE, &rl) < 0) { printf(“getrlimit error\n”); return 0; }

  1. printf("%d\n", (int)rl.rlim_max);
  2. rl.rlim_cur = 5000;
  3. rl.rlim_max = 5000;
  4. if (setrlimit(RLIMIT_NOFILE, &rl) < 0)
  5. {
  6. printf("setrlimit error\n");
  7. return 0;
  8. }
  9. return 0;

}

  1. - select中的fd_set**集合容量限制**(FD_SETSIZE=1024),要修改需要重新编译内核
  2. <a name="eu77b"></a>
  3. ## 2.2 poll复用
  4. poll函数没有集合容量的限制(仍然有第一个限制),定义如下<br />头文件:`<poll.h>`<br />函数定义: `int poll(struct pollfd* fd, nfds_t nfds, int timeout);`<br />功能: 同时管理多个IOIO有事件发生就会返回;返回值为IO事件的个数和哪些IO 失败返回-1。<br />参数:
  5. - fds: 要监听的套接口和事件
  6. - nfds: 要检测的IO个数
  7. - timeout: 超时时间
  8. 结构体pollfd结构如下:
  9. ```c
  10. struct pollfd {
  11. int fd; //要监听的IO接口
  12. short events; //要监听的事件
  13. short revnets; //由内核返回,发生在fd上的事件
  14. }

结构体pollfd中event的取值如下:
image.png

2.3 epoll复用

epoll函数

epoll是Linux特有的IO复用函数,epoll和select、poll的不同表现在:

  • epoll使用一组函数,不是单个函数
  • epoll把需要监听的文件描述符上的事件放在内核事件表中,不需要每次传入结构体和集合
    • epoll的事件和poll一样,宏定义多了E前缀
  • epoll需要一个文件描述符表示内核事件表

用到的函数如下:
头文件:<sys/epoll.h>
函数定义:

  1. int epoll_create(int size); //创建内核事件表(哈希实现),size为个数。返回事件表的描述符
  2. int epoll_create1(int flags);//创建红黑树实现的内核事件表
  3. //操作事件表,参数分别为描述符,操作方式(增删改等),要监听的文件描述符,要监听的事件
  4. //op = EPOLL_CTL_ADD,添加fd上新的监听事件
  5. //op = EPOLL_CTL_MOD,修改fd上的监听事件
  6. //op = EPOLL_CTL_DEL,删除fd上的监听事件
  7. int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);
  8. //等待监听事件发生,参数分别为epoll描述符,监听到的事件,事件个数,超时时间
  9. //返回值为就绪的描述符个数
  10. int epoll_wait(int epfd, struct epoll_event* event, int maxevents, int timeout);

epoll效率高的原因:

  • 不会随监听个数增加而降低效率,基于回调,如果有期望的事件发生会通过回调函数将其加入epoll队列,因此与个数无关。
  • select/poll采用内存拷贝方法通知用户进程,epoll采用的是共享内存
  • epoll会告诉app相关信息(通过event里的socket id和事件类型)用于app直接定位到事件,而不必遍历整个集合(见下面的示例) ```c //处理poll返回的就绪文件描述符 int ret = poll(fds, MAX_EVENT_NUMBER, -1); //需要遍历fds中每个描述符 for (int i = 0; i < MAX_EVENT_NUMBER; i++) { if (fds[i].revents & POLLIN)//判断返回的事件是不是要监听的事件
    1. //do something
    }

//处理epoll返回的就绪文件描述符 int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1); //只需要遍历ret个描述符 for (int i = 0; i < ret; i++) { int socketfd = event[i].data.fd; //do something }

  1. <a name="uh4qK"></a>
  2. ### epoll工作模式
  3. epoll对文件描述符的操作有两种模式:LT (Level Trigger,电平触发)模式和ET (Edge Trigger,边沿触发)模式:
  4. - LT模式是默认的工作模式,这种模式下epoll相当于一个效率较高的poll。对于采用LT工作模式的文件描述符,当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,**应用程序可以不立即处理该事件。当应用程序下一次调用epoll_wait时,epoll_wait还会再次向应用程序通告此事件,直到该事件被处理**。
  5. - ET模式是epoll的高效工作模式。对于采用ET工作模式的文件描述符,当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,**应用程序必须立即处理该事件,因为后续的epoll_wait调用将不再向应用程序通知这一事件**。
  6. ET模式在很大程度上降低了同一个epoll事件被重复触发的次数, 因此效率要比LT模式高。当往epoll内核事件表中注册一个文件描述符上的**EPOLLET事件**(poll没有该事件)时,epoll将以ET模式来操作该文件描述符。
  7. 下面代码是两种模式处理的示例:
  8. ```c
  9. //测试epoll函数的LT和ET模式
  10. #include <sys/types.h>
  11. #include <sys/socket.h>
  12. #include <netinet/in.h>
  13. #include <arpa/inet.h>
  14. #include <assert.h>
  15. #include <stdio.h>
  16. #include <unistd.h>
  17. #include <errno.h>
  18. #include <string.h>
  19. #include <fcntl.h>
  20. #include <stdlib.h>
  21. #include <sys/epoll.h>
  22. #include <stdbool.h>
  23. #include <libgen.h>
  24. #define MAX_EVENT_NUMBER 1024
  25. #define BUFFER_SIZE 10
  26. //设置描述符为非阻塞模式
  27. int setNonblocking(int fd)
  28. {
  29. int old_option = fcntl(fd, F_GETFL);
  30. int new_option = old_option | O_NONBLOCK;
  31. fcntl(fd, F_SETFL, new_option);
  32. return old_option;
  33. }
  34. //添加描述符到epoll事件表,可以指定LT or ET模式
  35. void addfd(int epollfd, int fd, bool enableET)
  36. {
  37. struct epoll_event event;
  38. event.data.fd = fd;
  39. event.events = EPOLLIN; //监听数据可读事件
  40. if (enableET)
  41. event.events |= EPOLLET;
  42. epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
  43. setNonblocking(fd);
  44. }
  45. //LT模式下工作流程
  46. void lt_flow(struct epoll_event *event, int number, int epollfd, int listenfd)
  47. {
  48. char buf[BUFFER_SIZE];
  49. for (int i = 0; i < number; i++)
  50. {
  51. int sockfd = event[i].data.fd;
  52. if (sockfd == listenfd) //epoll返回的是监听中的socket,说明有新客户端连接
  53. {
  54. struct sockaddr_in client_addr;
  55. socklen_t client_len = sizeof(client_addr);
  56. //接受客户端连接
  57. int connfd = accept(listenfd, (struct sockaddr *)&client_addr, &client_len);
  58. addfd(epollfd, connfd, false); //LT模式,加入到epoll事件表
  59. }
  60. else if (event[i].events & EPOLLIN) //有数据可读
  61. {
  62. //因为是LT模式,socket缓存中有未读出的数据,就会触发EPOLLIN事件
  63. printf("event trigger once\n");
  64. memset(buf, 0, BUFFER_SIZE);
  65. int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
  66. if (ret <= 0)
  67. {
  68. close(sockfd); //出异常了,关闭socket
  69. continue;
  70. }
  71. printf("get %d bytes of connets: %s\n", ret, buf);
  72. }
  73. else
  74. {
  75. printf("something else happened\n");
  76. }
  77. }
  78. }
  79. //ET模式下工作流程
  80. void et_flow(struct epoll_event *event, int number, int epollfd, int listenfd)
  81. {
  82. char buf[BUFFER_SIZE];
  83. for (int i = 0; i < number; i++)
  84. {
  85. int sockfd = event[i].data.fd;
  86. if (sockfd == listenfd) //epoll返回的是监听中的socket,说明有新客户端连接
  87. {
  88. struct sockaddr_in client_addr;
  89. socklen_t client_len = sizeof(client_addr);
  90. //接受客户端连接
  91. int connfd = accept(listenfd, (struct sockaddr *)&client_addr, &client_len);
  92. addfd(epollfd, connfd, true); //ET模式,加入到epoll事件表
  93. }
  94. else if (event[i].events & EPOLLIN) //有数据可读
  95. {
  96. //因为是ET模式,只会触发一次EPOLLIN事件,所以需要在这里一次性读完数据
  97. printf("event trigger once\n");
  98. while (1)
  99. {
  100. memset(buf, 0, BUFFER_SIZE);
  101. int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
  102. if (ret < 0)
  103. {
  104. if (errno == EAGAIN || errno == EWOULDBLOCK)
  105. { //判断用于非阻塞IO,表示数据全部读完,等待下一次数据可读,不关闭sockfd
  106. printf("read later\n");
  107. break;
  108. }
  109. close(sockfd); //出异常了,关闭socket
  110. break;
  111. }
  112. else if (ret == 0)
  113. {
  114. close(sockfd); //数据读完,关闭socket
  115. }
  116. else
  117. {
  118. printf("get %d bytes of connets: %s\n", ret, buf);
  119. }
  120. }
  121. }
  122. else
  123. {
  124. printf("something else happened\n");
  125. }
  126. }
  127. }
  128. int main(int argc, char *argv[])
  129. {
  130. if (argc <= 2)
  131. {
  132. printf("Usage: %s ip_address port_number\n", basename(argv[0]));
  133. return 1;
  134. }
  135. const char *ip = argv[1];
  136. const int port = atoi(argv[2]);
  137. int ret = 0;
  138. struct sockaddr_in address;
  139. bzero(&address, sizeof(address));
  140. address.sin_family = AF_INET; //IPv4
  141. address.sin_addr.s_addr = inet_addr(ip);
  142. address.sin_port = htons(port);
  143. int listenfd = socket(PF_INET, SOCK_STREAM, 0);
  144. assert(listenfd >= 0);
  145. ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address));
  146. assert(ret >= 0);
  147. ret = listen(listenfd, 5); //最大连接数5
  148. assert(ret >= 0);
  149. //创建epoll并添加socket到事件表中
  150. struct epoll_event events[MAX_EVENT_NUMBER];
  151. int epollfd = epoll_create(5);
  152. assert(epollfd >= 0);
  153. addfd(epollfd, listenfd, true);
  154. while(1)
  155. {
  156. //使用epoll等待事件触发,可能是新客户端连接,可能是数据可写
  157. ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
  158. if(ret < 0)
  159. {
  160. printf("epoll failed\n");
  161. break;
  162. }
  163. //选择LT或ET模式处理
  164. lt_flow(events, ret, epollfd, listenfd);
  165. // et_flow(events, ret, epollfd, listenfd);
  166. }
  167. close(listenfd);
  168. return 0;
  169. }

当用telnet充当客户端发送大于10字节的数据时,由于buffer只有10字节,所以LT和ET会出现不同的现象,ET上报的可读事件只有一次:
image.png

EPOLLONESHOT事件

即使使用ET模式,一个socket上的某个事件还是可能被触发多次。这在并发程序中就会引起一个问题。比如一个线程(或进程)在读取完某个socket上的数据后开始处理这些数据,而在数据的处理过程中该socket上又有新数据可读(EPOLLIN再次被触发),此时另外一个线程被唤醒来读取这些新的数据。于是就出现了两个线程同时操作一个 socket的局面
我们期望的是一个socket连接在任一时刻都只被一 个线程处理。这一点可以使用epoll的EPOLLONESHOT事件实现。

  • 对于注册了EPOLLONESHOT事件的文件描述符,OS最多触发其上注册的一个可读、可写、异常事件,且只触发一次。
  • 在某个线程处理完后,需要重置描述符上的EPOLLONESHOT事件,确保下次事件可以被正常触发。

使用结构如下:

  1. void addfd(int epollfd, int fd, bool oneshot)
  2. {
  3. struct epoll_event event;
  4. event.data.fd = fd;
  5. event.events = EPOLLIN | EPOLLET; //监听数据可读事件
  6. if (oneshot)
  7. event.events |= EPOLLONESHOT;
  8. epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
  9. setNonblocking(fd);
  10. }
  11. //正常处理可读写、异常事件后,调用reset函数重置描述符
  12. void reset_oneshot(int epollfd, int fd)
  13. {
  14. struct epoll_event event;
  15. event.data.fd = fd;
  16. event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
  17. epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event); //modify操作重置
  18. }

2.4 三种复用方式的比较

image.png

3 信号驱动IO

3.1 信号处理概述和函数

Linux信号产生的条件:

  • 对于前台进程,用户可以通过输入特殊的终端字符来给它发送信号。比如输人Ctrl+C通常会给进程发送一个中断信号
  • 系统异常。比如浮点异常和非法内存段访问
  • 系统状态变化。比如alarm定时器到期将引起SIGALRM信号
  • 运行kill命令或调用kill函数。

Linux系统信号的含义如下(也可以通过man signal查看),服务器程序必须处理(或至少忽略)一些常见的信号,以免异常终止
Unix系统信号含义

信号发送函数

  1. #include <sys/types.h>
  2. #include <signal.h>
  3. int kill(pid_t pid, int sig); //将信号sig发送给进程pid

pid是进程号,不同值具有不同的含义:

pid参数 含义
pid > 0 发送给进程号为pid的进程
pid = 0 发送给本进程组内其他进程
pid = -1 发送给出init进程外的所有进程,要求发送者有相应的权限
pid < -1 发送给组ID为-pid的进程组中所有进程

自定义信号处理

  1. //默认信号处理函数,宏定义:
  2. SIG_DFL //系统默认处理
  3. SIG_IGN //忽略信号
  4. //自定义的信号处理函数的格式,输入参数int为指定的信号
  5. typedef void (*sighandler_t)(int);
  6. //下面两个函数用于为指定信号设置处理函数
  7. sighandler_t signal(int signum, sighandler_t handler);
  8. int sigaction(int signum, const struct sigaction *act, struct sigaction *oldact);

signal只能生效一次,下次需要重新指定处理函数,而sigaction接口更加健壮,act是指定的新的信号处理方式,oldact是返回的之前的处理方式。自定义的信号处理函数需要填充到sigaction结构体中:

  1. struct sigaction {
  2. void (*sa_handler)(int); //自定义信号处理函数
  3. void (*sa_sigaction)(int, siginfo_t *, void *); //更强大的信号处理函数,选一个指定
  4. sigset_t sa_mask; //进程的信号掩码,可以用sigprocmask获取和设置
  5. int sa_flags; //程序收到信号时的行为
  6. void (*sa_restorer)(void); //过时,不要使用
  7. };

sa_flags 字段指定对信号进行处理的各个选项:
image.png

信号集函数

结构体sigset_t可以用来指定一组信号,和文件描述符集合fd_set很相似,其处理函数如下:

  1. #include <signal.h>
  2. int sigemptyset(sigset_t *set); //清空集合
  3. int sigfillset(sigset_t *set); //设置所有信号
  4. int sigaddset(sigset_t *set, int signum); //添加指定信号
  5. int sigdelset(sigset_t *set, int signum); //删除指定信号
  6. int sigismember(const sigset_t *set, int signum); 测试信号是否集合中
  7. int sigpending(sigset_t *set); //获取当前被系统挂起的信号

3.2 信号处理IO事件

对SIGIO信号注册专门的信号处理函数。在有数据到来的时候,SIGIO信号被触发,处理函数就可以通知程序用recv去接收数据。等待信号不会阻塞进程。
图示如下:
imgclip997.png

示例代码如下:信号通过pipe管道传递给while循环,进行数据读取或其他处理:

  1. //信号传输IO事件
  2. #include <sys/types.h>
  3. #include <sys/socket.h>
  4. #include <netinet/in.h>
  5. #include <arpa/inet.h>
  6. #include <assert.h>
  7. #include <stdio.h>
  8. #include <unistd.h>
  9. #include <errno.h>
  10. #include <string.h>
  11. #include <fcntl.h>
  12. #include <stdlib.h>
  13. #include <sys/epoll.h>
  14. #include <stdbool.h>
  15. #include <libgen.h>
  16. #include <signal.h>
  17. #define MAX_EVENT_NUMBER 1024
  18. #define BUFFER_SIZE 1024
  19. static int pipefd[2];//双向通道
  20. //设置描述符为非阻塞模式
  21. int setNonblocking(int fd)
  22. {
  23. int old_option = fcntl(fd, F_GETFL);
  24. int new_option = old_option | O_NONBLOCK;
  25. fcntl(fd, F_SETFL, new_option);
  26. return old_option;
  27. }
  28. //添加描述符到epoll事件表,指定ET模式
  29. void addfd(int epollfd, int fd)
  30. {
  31. struct epoll_event event;
  32. event.data.fd = fd;
  33. event.events = EPOLLIN | EPOLLET; //监听数据可读事件
  34. epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
  35. setNonblocking(fd);
  36. }
  37. //自定义信号处理函数
  38. void sig_handler(int sig)
  39. {
  40. int save_errno = errno;//保留原来的errno,保证函数的可重入性
  41. int msg = sig;
  42. send(pipefd[1], (char*)&msg, 1, 0);//发送信号到管道,让while循环处理
  43. errno = save_errno;
  44. }
  45. //添加信号处理映射的函数
  46. void addsighandler(int sig)
  47. {
  48. struct sigaction sa;
  49. memset(&sa, 0, sizeof(sa));
  50. sa.sa_handler = sig_handler;
  51. sa.sa_flags |= SA_RESTART;
  52. sigfillset(&sa.sa_mask);
  53. assert(sigaction(sig, &sa, NULL) != -1);
  54. }
  55. //ET模式下工作流程, 返回值为是否结束循环
  56. bool et_flow(struct epoll_event* event, int number, int epollfd, int listenfd)
  57. {
  58. char buf[BUFFER_SIZE];
  59. for (int i = 0; i < number; i++)
  60. {
  61. int sockfd = event[i].data.fd;
  62. if (sockfd == listenfd) //epoll返回的是监听中的socket,说明有新客户端连接
  63. {
  64. struct sockaddr_in client_addr;
  65. socklen_t client_len = sizeof(client_addr);
  66. //接受客户端连接
  67. int connfd = accept(listenfd, (struct sockaddr*)&client_addr, &client_len);
  68. addfd(epollfd, connfd); //加入到epoll事件表
  69. }
  70. else if (sockfd == pipefd[0] && event[i].events & EPOLLIN)
  71. {
  72. //是管道发来的信号,触发了可读事件
  73. int sig;
  74. int ret = recv(pipefd[0], buf, BUFFER_SIZE - 1, 0);
  75. if (ret <= 0)
  76. {
  77. return false;
  78. }
  79. else
  80. {
  81. for (int i = 0; i < ret; i++)
  82. {
  83. printf("receive signal from pipe: %d\n", buf[i]);
  84. switch (buf[i]) //遍历每个收到的信号
  85. {
  86. case SIGCHLD:
  87. case SIGHUP:
  88. {
  89. return false;
  90. }
  91. case SIGTERM:
  92. case SIGINT:
  93. {
  94. return true;
  95. }
  96. }
  97. }
  98. }
  99. }
  100. else if (event[i].events & EPOLLIN) //有数据可读
  101. {
  102. //因为是ET模式,只会触发一次EPOLLIN事件,所以需要在这里一次性读完数据
  103. printf("event trigger once\n");
  104. while (1)
  105. {
  106. memset(buf, 0, BUFFER_SIZE);
  107. int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
  108. if (ret < 0)
  109. {
  110. if (errno == EAGAIN || errno == EWOULDBLOCK)
  111. {
  112. //判断用于非阻塞IO,表示数据全部读完,等待下一次数据可读,不关闭sockfd
  113. printf("read later\n");
  114. break;
  115. }
  116. close(sockfd); //出异常了,关闭socket
  117. break;
  118. }
  119. else if (ret == 0)
  120. {
  121. close(sockfd); //数据读完,关闭socket
  122. }
  123. else
  124. {
  125. printf("get %d bytes of connets: %s\n", ret, buf);
  126. }
  127. }
  128. }
  129. else
  130. {
  131. printf("something else happened\n");
  132. }
  133. }
  134. return false;
  135. }
  136. int main(int argc, char* argv[])
  137. {
  138. if (argc <= 2)
  139. {
  140. printf("Usage: %s ip_address port_number\n", basename(argv[0]));
  141. return 1;
  142. }
  143. const char* ip = argv[1];
  144. const int port = atoi(argv[2]);
  145. int ret = 0;
  146. struct sockaddr_in address;
  147. bzero(&address, sizeof(address));
  148. address.sin_family = AF_INET; //IPv4
  149. address.sin_addr.s_addr = inet_addr(ip);
  150. address.sin_port = htons(port);
  151. int listenfd = socket(PF_INET, SOCK_STREAM, 0);
  152. assert(listenfd >= 0);
  153. ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
  154. assert(ret >= 0);
  155. ret = listen(listenfd, 5); //最大连接数5
  156. assert(ret >= 0);
  157. //创建epoll并添加socket到事件表中
  158. struct epoll_event events[MAX_EVENT_NUMBER];
  159. int epollfd = epoll_create(5);
  160. assert(epollfd >= 0);
  161. addfd(epollfd, listenfd);
  162. //创建本地域管道,用于父子进程通信,并监听可读事件
  163. ret = socketpair(PF_UNIX, SOCK_STREAM, 0, pipefd);
  164. assert(ret >= 0);
  165. setNonblocking(pipefd[1]);
  166. addfd(epollfd, pipefd[0]);
  167. //设置一些信号的自定义处理
  168. addsighandler(SIGHUP);
  169. addsighandler(SIGCHLD);
  170. addsighandler(SIGTERM);
  171. addsighandler(SIGINT);
  172. bool stop_while = false;
  173. while (!stop_while)
  174. {
  175. //使用epoll等待事件触发,可能是新客户端连接,可能是数据可写
  176. ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
  177. if (ret < 0 && errno != EINTR)
  178. {
  179. printf("epoll failed\n");
  180. break;
  181. }
  182. stop_while = et_flow(events, ret, epollfd, listenfd);
  183. }
  184. close(listenfd);
  185. close(pipefd[0]);
  186. close(pipefd[1]);
  187. return 0;
  188. }

4 异步IO

异步IO方式是效率最高的模式,使用aio_read取代recv。aio_read会向内核空间提交一个buffer,即使没有数据到来,aio_read会立刻返回,buffer会保留在内核空间。
当有数据到来的时候,数据会被复制到buffer,同时会触发一个信号(信号定义在aio_read参数中)通知应用程序。应用程序只需要为该信号添加处理函数即可。
图示如下:
imgclip996.png

4.1 POSIX异步IO结构体

POSIX异步IO接口使用aiocb来描述IO操作:

  1. #include <aiocb.h>
  2. struct aiocb {
  3. int aio_fildes; /* 被打开用于读写的文件描述符 */
  4. off_t aio_offset; /* 偏移量,必须显式的定义*/
  5. volatile void *aio_buf; /* 要读写的内容*/
  6. size_t aio_nbytes; /* 要读写的字节数 */
  7. int aio_reqprio; /* 异步IO请求优先级 */
  8. struct sigevent aio_sigevent; /* 控制如何通知应用程序*/
  9. int aio_lio_opcode; /* Operation to be performed lio_listio() only */
  10. };

sigevent结构体用于定义如何通知进程:

  1. #include <signal.h>
  2. struct sigevent {
  3. int sigev_notify; /* 通知的类型:不通知SIGEV_NONE,信号通知SIGEV_SIGNAL,函数调用 SIGEV_THREAD*/
  4. int sigev_signo; /* 信号通知指定的信号 */
  5. union sigval sigev_value; /* 信号通知附件的info */
  6. void (*sigev_notify_function) (union sigval); /* 指定的函数调用*/
  7. void *sigev_notify_attributes; /* Attributes for notification thread (SIGEV_THREAD) */
  8. pid_t sigev_notify_thread_id; /* ID of thread to signal (SIGEV_THREAD_ID) */
  9. };

4.2 POSIX异步IO函数

  1. #include <aio.h>
  2. int aio_read(struct aiocb *aiocbp);
  3. int aio_write(struct aiocb *aiocbp);
  4. int aio_fsync(int op, struct aiocb *aiocbp); //同步数据到存储,op为操作
  5. int aio_error(const struct aiocb *aiocbp); //获取异步读写和同步操作的完成状态,返回值为状态
  6. ssize_t aio_return(struct aiocb *aiocbp); //获取异步操作的返回值
  7. int aio_suspend(const struct aiocb * const aiocb_list[], int nitems, const struct timespec *timeout); //阻塞进程,等待异步操作完成
  8. int aio_cancel(int fd, struct aiocb *aiocbp); //取消异步操作

aio_error返回值:

返回值 含义
0 异步操作成功
-1 aio_error调用失败
EINPROGRESS 操作仍在等待

aio_cancel返回值:

返回值 含义
AIO_ALLDONE 所有操作在取消前已经完成
AIO_CANCELED 所有操作已经取消
AIO_NOTCANCELED 至少有一个要求的操作没有被取消
-1 aio_cancel调用失败

4.3 异步IO代码示例

  1. //ROT-13翻译算法异步操作版
  2. #include "../include/apue.h"
  3. #include <ctype.h>
  4. #include <fcntl.h>
  5. #include <aio.h>
  6. #include <errno.h>
  7. #define BSZ 4096
  8. #define NBUF 8
  9. enum rwop {
  10. UNUSED = 0,
  11. READ_PENDING = 1,
  12. WRITE_PENDING = 2
  13. };
  14. struct buf {
  15. enum rwop op;
  16. int last;
  17. struct aiocb aiocb;
  18. unsigned char data[BSZ];
  19. };
  20. struct buf bufs[NBUF];
  21. unsigned char
  22. translate(unsigned char c)
  23. {
  24. if (isalpha(c)) {
  25. if (c >= 'n')
  26. c -= 13;
  27. else if (c >= 'a')
  28. c += 13;
  29. else if (c >= 'N')
  30. c -= 13;
  31. else
  32. c += 13;
  33. }
  34. return(c);
  35. }
  36. int
  37. main(int argc, char* argv[])
  38. {
  39. int ifd, ofd, i, j, n, err, numop;
  40. struct stat sbuf;
  41. const struct aiocb *aiolist[NBUF];
  42. off_t off = 0;
  43. if (argc != 3)
  44. err_quit("usage: rot13 infile outfile");
  45. if ((ifd = open(argv[1], O_RDONLY)) < 0)//打开输入文件
  46. err_sys("can't open %s", argv[1]);
  47. if ((ofd = open(argv[2], O_RDWR|O_CREAT|O_TRUNC, FILE_MODE)) < 0)//打开输出文件
  48. err_sys("can't create %s", argv[2]);
  49. if (fstat(ifd, &sbuf) < 0)
  50. err_sys("fstat failed");
  51. /* 初始化8个缓冲区 */
  52. for (i = 0; i < NBUF; i++) {
  53. bufs[i].op = UNUSED;
  54. bufs[i].aiocb.aio_buf = bufs[i].data;
  55. bufs[i].aiocb.aio_sigevent.sigev_notify = SIGEV_NONE;
  56. aiolist[i] = NULL;
  57. }
  58. numop = 0;
  59. for (;;) {
  60. for (i = 0; i < NBUF; i++) {
  61. switch (bufs[i].op) {
  62. case UNUSED:
  63. /*
  64. * 从输入文件读取未读内容
  65. */
  66. if (off < sbuf.st_size) {
  67. bufs[i].op = READ_PENDING;
  68. bufs[i].aiocb.aio_fildes = ifd;
  69. bufs[i].aiocb.aio_offset = off;
  70. off += BSZ;
  71. if (off >= sbuf.st_size)
  72. bufs[i].last = 1;
  73. bufs[i].aiocb.aio_nbytes = BSZ;
  74. if (aio_read(&bufs[i].aiocb) < 0) //异步读取
  75. err_sys("aio_read failed");
  76. aiolist[i] = &bufs[i].aiocb;
  77. numop++;
  78. }
  79. break;
  80. case READ_PENDING:
  81. if ((err = aio_error(&bufs[i].aiocb)) == EINPROGRESS)//检查异步操作是否完成,未完成则继续循环
  82. continue;
  83. if (err != 0) {
  84. if (err == -1)
  85. err_sys("aio_error failed");
  86. else
  87. err_exit(err, "read failed");
  88. }
  89. /*
  90. * 异步读完成,翻译然后进行异步写
  91. */
  92. if ((n = aio_return(&bufs[i].aiocb)) < 0) //获取异步读的返回值
  93. err_sys("aio_return failed");
  94. if (n != BSZ && !bufs[i].last)
  95. err_quit("short read (%d/%d)", n, BSZ);
  96. for (j = 0; j < n; j++)
  97. bufs[i].data[j] = translate(bufs[i].data[j]); //开始翻译
  98. bufs[i].op = WRITE_PENDING;
  99. bufs[i].aiocb.aio_fildes = ofd;
  100. bufs[i].aiocb.aio_nbytes = n;
  101. if (aio_write(&bufs[i].aiocb) < 0) //异步写入
  102. err_sys("aio_write failed");
  103. /* retain our spot in aiolist */
  104. break;
  105. case WRITE_PENDING:
  106. if ((err = aio_error(&bufs[i].aiocb)) == EINPROGRESS)
  107. continue;
  108. if (err != 0) {
  109. if (err == -1)
  110. err_sys("aio_error failed");
  111. else
  112. err_exit(err, "write failed");
  113. }
  114. /*
  115. * 异步写完成
  116. */
  117. if ((n = aio_return(&bufs[i].aiocb)) < 0)
  118. err_sys("aio_return failed");
  119. if (n != bufs[i].aiocb.aio_nbytes)
  120. err_quit("short write (%d/%d)", n, BSZ);
  121. aiolist[i] = NULL;
  122. bufs[i].op = UNUSED;//将此缓冲区置为空闲
  123. numop--;
  124. break;
  125. }
  126. }
  127. if (numop == 0) {
  128. if (off >= sbuf.st_size)
  129. break;
  130. } else {
  131. if (aio_suspend(aiolist, NBUF, NULL) < 0) //等待所有异步操作完成
  132. err_sys("aio_suspend failed");
  133. }
  134. }
  135. bufs[0].aiocb.aio_fildes = ofd;
  136. if (aio_fsync(O_SYNC, &bufs[0].aiocb) < 0) //异步同步操作,写入存储文件
  137. err_sys("aio_fsync failed");
  138. exit(0);
  139. }