1 IO模型总结
阻塞模式:调用recv函数接收对端数据时,如果对端没有数据发送过来,进程会一直阻塞在那里什么都不能做,直到数据到来才能开始工作。图示如下:
Socket支持设置为非阻塞模式,需要调用下面函数设置:
fcntl(socketId, F_SETFL, flag | 0_NONBLOCK);
非阻塞模式下,如果recv没有发现数据,会返回-1,错误代码为EMOULDBLOCK, 进程不会被阻塞。根据错误代码可以判断当前对端没有发送数据,可以尝试继续调用recv函数。图示如下:
2 IO复用
用select管理IO,监听多个文件描述符,若其中某个或多个描述符对应的通道有数据到来,select方法就会返回。然后再用recv去获取数据,就不会出现阻塞。
图示如下:
2.1 select复用
头文件:<sys/select.h>
函数定义: int select(int nfds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, timeval* timeout);
功能: 同时管理多个IO,IO有事件发生就会返回;返回值为IO事件的个数和哪些IO, 失败返回-1。
参数:
- nfds: 所有集合中最大描述符值+1
- readfds: 有数据可读时函数返回的socket集合
- writefds: 有数据可写时函数返回的socket集合
- exceptfds: 出现异常时函数返回的集合
- timeout: 超时时间,NULL代表不超时
描述符集合相关的几个函数(
- void FD_CLR(int fd, fd_set *set): 将fd从集合中移除
- int FD_ISSET(int fd, fd_set *set): fd是否再集合之中
- void FD_SET(int fd, fd_set *set): 将fd添加到集合
- void FD_ZERO(fd_set *set): 清空集合
select代码示例:
//client.c:#include <stdio.h>#include <sys/socket.h>#include <arpa/inet.h>#include <string.h>#include <unistd.h>#include <sys/select.h>int main(){//create socketint socketId;if ((socketId = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0){printf("socket create failed\r\n");}sockaddr_in addr;memset(&addr, 0, sizeof(addr));addr.sin_family = AF_INET; //指定协议addr.sin_port = htons(5188); //指定网络字节序格式的端口号addr.sin_addr.s_addr = inet_addr("127.0.0.1");//connect to server via socketif (connect(socketId, (sockaddr*)&addr, sizeof(addr)) < 0){printf("connect to server failed\r\n");}//使用select实现IO复用监听、读写数据fd_set rset;FD_ZERO(&rset);//create a set and initial itint stdinFd = fileno(stdin); //获取标准输入的文件描述符int maxfd = (stdinFd < socketId) ? socketId : stdinFd;int nready;char recvbuf[1024];char sendbuf[1024];while(1){FD_SET(stdinFd, &rset);FD_SET(socketId, &rset);nready = select(maxfd + 1, &rset, NULL, NULL, NULL);if (nready == -1)printf("select failed\r\n");else if (nready == 0)continue;else{//产生可读事件,返回的是那个fd产生的//检查是那个fd产生的事件if (FD_ISSET(socketId, &rset)){//是socket产生可读事件,表示数据到来,获取数据read(socketId, recvbuf, sizeof(recvbuf));printf("received from server: %s\r\n", recvbuf);memset(recvbuf, 0, sizeof(recvbuf));}else if (FD_ISSET(stdinFd, &rset)){//是控制台输入产生的可读事件,表示用户输入数据了,需要发送到serverif (fgets(sendbuf, sizeof(sendbuf), stdin) == NULL)break;write(socketId, sendbuf, strlen(sendbuf));memset(sendbuf, 0, sizeof(sendbuf));}}}close(socketId);return 0;}//=============================================//server.c#include <stdio.h>#include <sys/socket.h>#include <arpa/inet.h>#include <string.h>#include <unistd.h>#include <signal.h>#include <sys/wait.h>#include <sys/select.h>#include <errno.h>void customHandler(int sig){while(waitpid(-1, NULL, WNOHANG) > 0);}int main(){//add SIGCHLD signal handlersignal(SIGCHLD, customHandler);//create socketint socketId;if ((socketId = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0){printf("socket create failed\r\n");}sockaddr_in addr;memset(&addr, 0, sizeof(addr));addr.sin_family = AF_INET; //指定协议addr.sin_port = htons(5188); //指定网络字节序格式的端口号/** addr.sin_addr = htonl(INADDR_ANY); //指定本地任意ip地址,使用网络字节序 */addr.sin_addr.s_addr = inet_addr("127.0.0.1");//set REUSEADDRint on = 1;if(setsockopt(socketId, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0){printf("socket set REUSEADDR failed\r\n");}//start bindif (bind(socketId, (sockaddr*)&addr, sizeof(addr)) < 0){printf("socket bind failed\r\n");}//start listenif (listen(socketId, SOMAXCONN) < 0){printf("socket listen failed\r\n");}sockaddr_in remoteAddr;socklen_t remoteLen = sizeof(remoteAddr);int remoteConnId;//select IO复用方式实现多客户端连接处理int connectedClient[FD_SETSIZE];//最大值为select函数可接受的最多fdfor (int i = 0; i < FD_SETSIZE; i++){connectedClient[i] = -1;}int nready;int maxFd = socketId;fd_set rset;fd_set allset;FD_ZERO(&rset);FD_ZERO(&allset);FD_SET(socketId, &allset);//第一次添加监听socket,此时还没有客户端socketwhile(1){rset = allset;nready = select(maxFd + 1, &rset, NULL, NULL, NULL);if (nready == -1){//select失败,判断是否为系统中断导致,是中断则继续if (errno == EINTR)continue;printf("select failed");return 0;}else if (nready == 0)continue;if (FD_ISSET(socketId, &rset)){//监听端口有可读事件,表示有客户端连接过来,需要建立连接remoteConnId = accept(socketId, (sockaddr*)&remoteAddr, &remoteLen);if (remoteConnId == -1){printf("accept failed");return 0;}int i;for (int i = 0; i < FD_SETSIZE; i++){if (connectedClient[i] == -1){connectedClient[i] = remoteConnId;break;}}if (i == FD_SETSIZE){printf("too many clients");return 0;}printf("IP = %s, port = %d\r\n", inet_ntoa(remoteAddr.sin_addr), ntohs(remoteAddr.sin_port));FD_SET(remoteConnId, &allset);//添加已连接的socket到下一次select集合中if (remoteConnId > maxFd) //更新最大fd值maxFd = remoteConnId;if (--nready <= 0)continue; //返回集合都处理了,继续下一次循环}//处理已连接socket的读写事件for (int i = 0; i < FD_SETSIZE; i++){remoteConnId = connectedClient[i];if (remoteConnId == -1)continue;if (FD_ISSET(remoteConnId, &rset)){char rcvBuf[1024] = {0};int ret = read(remoteConnId, rcvBuf, sizeof(rcvBuf));// 从socket中读取数据流if (ret == 0){printf("close client: [%d]\r\n", remoteConnId);connectedClient[i] = -1;FD_CLR(remoteConnId, &allset);close(remoteConnId);}fputs(rcvBuf, stdout);write(remoteConnId, rcvBuf, strlen(rcvBuf));//将数据再写回remote端if (--nready <= 0)continue;}}}close(socketId);return 0;}
select就绪的条件
可读:
- 套接口接收缓冲区有数据可读
- 连接的读通道关闭,即接收到FIN段,读操作将返回0
- 对于监听socket,已完成连接的对了不为空时(server端)
- 套接口发生错误待处理
可写:
- 套接口发送缓冲区有空间容纳数据
- 连接的写通道关闭
- 套接口发生错误待处理
异常:
- 套接口存在带外数据(普通数据触发可读)
select的限制
int main() { struct rlimit rl; if (getrlimit(RLIMIT_NOFILE, &rl) < 0) { printf(“getrlimit error\n”); return 0; }
printf("%d\n", (int)rl.rlim_max);rl.rlim_cur = 5000;rl.rlim_max = 5000;if (setrlimit(RLIMIT_NOFILE, &rl) < 0){printf("setrlimit error\n");return 0;}return 0;
}
- select中的fd_set**集合容量限制**(FD_SETSIZE=1024),要修改需要重新编译内核<a name="eu77b"></a>## 2.2 poll复用poll函数没有集合容量的限制(仍然有第一个限制),定义如下<br />头文件:`<poll.h>`<br />函数定义: `int poll(struct pollfd* fd, nfds_t nfds, int timeout);`<br />功能: 同时管理多个IO,IO有事件发生就会返回;返回值为IO事件的个数和哪些IO, 失败返回-1。<br />参数:- fds: 要监听的套接口和事件- nfds: 要检测的IO个数- timeout: 超时时间结构体pollfd结构如下:```cstruct pollfd {int fd; //要监听的IO接口short events; //要监听的事件short revnets; //由内核返回,发生在fd上的事件}
结构体pollfd中event的取值如下:
2.3 epoll复用
epoll函数
epoll是Linux特有的IO复用函数,epoll和select、poll的不同表现在:
- epoll使用一组函数,不是单个函数
- epoll把需要监听的文件描述符上的事件放在内核事件表中,不需要每次传入结构体和集合
- epoll的事件和poll一样,宏定义多了E前缀
- epoll需要一个文件描述符表示内核事件表
用到的函数如下:
头文件:<sys/epoll.h>
函数定义:
int epoll_create(int size); //创建内核事件表(哈希实现),size为个数。返回事件表的描述符int epoll_create1(int flags);//创建红黑树实现的内核事件表//操作事件表,参数分别为描述符,操作方式(增删改等),要监听的文件描述符,要监听的事件//op = EPOLL_CTL_ADD,添加fd上新的监听事件//op = EPOLL_CTL_MOD,修改fd上的监听事件//op = EPOLL_CTL_DEL,删除fd上的监听事件int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);//等待监听事件发生,参数分别为epoll描述符,监听到的事件,事件个数,超时时间//返回值为就绪的描述符个数int epoll_wait(int epfd, struct epoll_event* event, int maxevents, int timeout);
epoll效率高的原因:
- 不会随监听个数增加而降低效率,基于回调,如果有期望的事件发生会通过回调函数将其加入epoll队列,因此与个数无关。
- select/poll采用内存拷贝方法通知用户进程,epoll采用的是共享内存。
- epoll会告诉app相关信息(通过event里的socket id和事件类型)用于app直接定位到事件,而不必遍历整个集合(见下面的示例)
```c
//处理poll返回的就绪文件描述符
int ret = poll(fds, MAX_EVENT_NUMBER, -1);
//需要遍历fds中每个描述符
for (int i = 0; i < MAX_EVENT_NUMBER; i++)
{
if (fds[i].revents & POLLIN)//判断返回的事件是不是要监听的事件
}//do something
//处理epoll返回的就绪文件描述符 int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1); //只需要遍历ret个描述符 for (int i = 0; i < ret; i++) { int socketfd = event[i].data.fd; //do something }
<a name="uh4qK"></a>### epoll工作模式epoll对文件描述符的操作有两种模式:LT (Level Trigger,电平触发)模式和ET (Edge Trigger,边沿触发)模式:- LT模式是默认的工作模式,这种模式下epoll相当于一个效率较高的poll。对于采用LT工作模式的文件描述符,当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,**应用程序可以不立即处理该事件。当应用程序下一次调用epoll_wait时,epoll_wait还会再次向应用程序通告此事件,直到该事件被处理**。- ET模式是epoll的高效工作模式。对于采用ET工作模式的文件描述符,当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,**应用程序必须立即处理该事件,因为后续的epoll_wait调用将不再向应用程序通知这一事件**。ET模式在很大程度上降低了同一个epoll事件被重复触发的次数, 因此效率要比LT模式高。当往epoll内核事件表中注册一个文件描述符上的**EPOLLET事件**(poll没有该事件)时,epoll将以ET模式来操作该文件描述符。下面代码是两种模式处理的示例:```c//测试epoll函数的LT和ET模式#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>#include <assert.h>#include <stdio.h>#include <unistd.h>#include <errno.h>#include <string.h>#include <fcntl.h>#include <stdlib.h>#include <sys/epoll.h>#include <stdbool.h>#include <libgen.h>#define MAX_EVENT_NUMBER 1024#define BUFFER_SIZE 10//设置描述符为非阻塞模式int setNonblocking(int fd){int old_option = fcntl(fd, F_GETFL);int new_option = old_option | O_NONBLOCK;fcntl(fd, F_SETFL, new_option);return old_option;}//添加描述符到epoll事件表,可以指定LT or ET模式void addfd(int epollfd, int fd, bool enableET){struct epoll_event event;event.data.fd = fd;event.events = EPOLLIN; //监听数据可读事件if (enableET)event.events |= EPOLLET;epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);setNonblocking(fd);}//LT模式下工作流程void lt_flow(struct epoll_event *event, int number, int epollfd, int listenfd){char buf[BUFFER_SIZE];for (int i = 0; i < number; i++){int sockfd = event[i].data.fd;if (sockfd == listenfd) //epoll返回的是监听中的socket,说明有新客户端连接{struct sockaddr_in client_addr;socklen_t client_len = sizeof(client_addr);//接受客户端连接int connfd = accept(listenfd, (struct sockaddr *)&client_addr, &client_len);addfd(epollfd, connfd, false); //LT模式,加入到epoll事件表}else if (event[i].events & EPOLLIN) //有数据可读{//因为是LT模式,socket缓存中有未读出的数据,就会触发EPOLLIN事件printf("event trigger once\n");memset(buf, 0, BUFFER_SIZE);int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);if (ret <= 0){close(sockfd); //出异常了,关闭socketcontinue;}printf("get %d bytes of connets: %s\n", ret, buf);}else{printf("something else happened\n");}}}//ET模式下工作流程void et_flow(struct epoll_event *event, int number, int epollfd, int listenfd){char buf[BUFFER_SIZE];for (int i = 0; i < number; i++){int sockfd = event[i].data.fd;if (sockfd == listenfd) //epoll返回的是监听中的socket,说明有新客户端连接{struct sockaddr_in client_addr;socklen_t client_len = sizeof(client_addr);//接受客户端连接int connfd = accept(listenfd, (struct sockaddr *)&client_addr, &client_len);addfd(epollfd, connfd, true); //ET模式,加入到epoll事件表}else if (event[i].events & EPOLLIN) //有数据可读{//因为是ET模式,只会触发一次EPOLLIN事件,所以需要在这里一次性读完数据printf("event trigger once\n");while (1){memset(buf, 0, BUFFER_SIZE);int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);if (ret < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){ //判断用于非阻塞IO,表示数据全部读完,等待下一次数据可读,不关闭sockfdprintf("read later\n");break;}close(sockfd); //出异常了,关闭socketbreak;}else if (ret == 0){close(sockfd); //数据读完,关闭socket}else{printf("get %d bytes of connets: %s\n", ret, buf);}}}else{printf("something else happened\n");}}}int main(int argc, char *argv[]){if (argc <= 2){printf("Usage: %s ip_address port_number\n", basename(argv[0]));return 1;}const char *ip = argv[1];const int port = atoi(argv[2]);int ret = 0;struct sockaddr_in address;bzero(&address, sizeof(address));address.sin_family = AF_INET; //IPv4address.sin_addr.s_addr = inet_addr(ip);address.sin_port = htons(port);int listenfd = socket(PF_INET, SOCK_STREAM, 0);assert(listenfd >= 0);ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address));assert(ret >= 0);ret = listen(listenfd, 5); //最大连接数5assert(ret >= 0);//创建epoll并添加socket到事件表中struct epoll_event events[MAX_EVENT_NUMBER];int epollfd = epoll_create(5);assert(epollfd >= 0);addfd(epollfd, listenfd, true);while(1){//使用epoll等待事件触发,可能是新客户端连接,可能是数据可写ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);if(ret < 0){printf("epoll failed\n");break;}//选择LT或ET模式处理lt_flow(events, ret, epollfd, listenfd);// et_flow(events, ret, epollfd, listenfd);}close(listenfd);return 0;}
当用telnet充当客户端发送大于10字节的数据时,由于buffer只有10字节,所以LT和ET会出现不同的现象,ET上报的可读事件只有一次:
EPOLLONESHOT事件
即使使用ET模式,一个socket上的某个事件还是可能被触发多次。这在并发程序中就会引起一个问题。比如一个线程(或进程)在读取完某个socket上的数据后开始处理这些数据,而在数据的处理过程中该socket上又有新数据可读(EPOLLIN再次被触发),此时另外一个线程被唤醒来读取这些新的数据。于是就出现了两个线程同时操作一个 socket的局面。
我们期望的是一个socket连接在任一时刻都只被一 个线程处理。这一点可以使用epoll的EPOLLONESHOT事件实现。
- 对于注册了EPOLLONESHOT事件的文件描述符,OS最多触发其上注册的一个可读、可写、异常事件,且只触发一次。
- 在某个线程处理完后,需要重置描述符上的EPOLLONESHOT事件,确保下次事件可以被正常触发。
使用结构如下:
void addfd(int epollfd, int fd, bool oneshot){struct epoll_event event;event.data.fd = fd;event.events = EPOLLIN | EPOLLET; //监听数据可读事件if (oneshot)event.events |= EPOLLONESHOT;epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);setNonblocking(fd);}//正常处理可读写、异常事件后,调用reset函数重置描述符void reset_oneshot(int epollfd, int fd){struct epoll_event event;event.data.fd = fd;event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event); //modify操作重置}
2.4 三种复用方式的比较
3 信号驱动IO
3.1 信号处理概述和函数
Linux信号产生的条件:
- 对于前台进程,用户可以通过输入特殊的终端字符来给它发送信号。比如输人Ctrl+C通常会给进程发送一个中断信号
- 系统异常。比如浮点异常和非法内存段访问
- 系统状态变化。比如alarm定时器到期将引起SIGALRM信号
- 运行kill命令或调用kill函数。
Linux系统信号的含义如下(也可以通过man signal查看),服务器程序必须处理(或至少忽略)一些常见的信号,以免异常终止。
Unix系统信号含义
信号发送函数
#include <sys/types.h>#include <signal.h>int kill(pid_t pid, int sig); //将信号sig发送给进程pid
pid是进程号,不同值具有不同的含义:
| pid参数 | 含义 |
|---|---|
| pid > 0 | 发送给进程号为pid的进程 |
| pid = 0 | 发送给本进程组内其他进程 |
| pid = -1 | 发送给出init进程外的所有进程,要求发送者有相应的权限 |
| pid < -1 | 发送给组ID为-pid的进程组中所有进程 |
自定义信号处理
//默认信号处理函数,宏定义:SIG_DFL //系统默认处理SIG_IGN //忽略信号//自定义的信号处理函数的格式,输入参数int为指定的信号typedef void (*sighandler_t)(int);//下面两个函数用于为指定信号设置处理函数sighandler_t signal(int signum, sighandler_t handler);int sigaction(int signum, const struct sigaction *act, struct sigaction *oldact);
signal只能生效一次,下次需要重新指定处理函数,而sigaction接口更加健壮,act是指定的新的信号处理方式,oldact是返回的之前的处理方式。自定义的信号处理函数需要填充到sigaction结构体中:
struct sigaction {void (*sa_handler)(int); //自定义信号处理函数void (*sa_sigaction)(int, siginfo_t *, void *); //更强大的信号处理函数,选一个指定sigset_t sa_mask; //进程的信号掩码,可以用sigprocmask获取和设置int sa_flags; //程序收到信号时的行为void (*sa_restorer)(void); //过时,不要使用};
信号集函数
结构体sigset_t可以用来指定一组信号,和文件描述符集合fd_set很相似,其处理函数如下:
#include <signal.h>int sigemptyset(sigset_t *set); //清空集合int sigfillset(sigset_t *set); //设置所有信号int sigaddset(sigset_t *set, int signum); //添加指定信号int sigdelset(sigset_t *set, int signum); //删除指定信号int sigismember(const sigset_t *set, int signum); 测试信号是否集合中int sigpending(sigset_t *set); //获取当前被系统挂起的信号
3.2 信号处理IO事件
对SIGIO信号注册专门的信号处理函数。在有数据到来的时候,SIGIO信号被触发,处理函数就可以通知程序用recv去接收数据。等待信号不会阻塞进程。
图示如下:
示例代码如下:信号通过pipe管道传递给while循环,进行数据读取或其他处理:
//信号传输IO事件#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>#include <assert.h>#include <stdio.h>#include <unistd.h>#include <errno.h>#include <string.h>#include <fcntl.h>#include <stdlib.h>#include <sys/epoll.h>#include <stdbool.h>#include <libgen.h>#include <signal.h>#define MAX_EVENT_NUMBER 1024#define BUFFER_SIZE 1024static int pipefd[2];//双向通道//设置描述符为非阻塞模式int setNonblocking(int fd){int old_option = fcntl(fd, F_GETFL);int new_option = old_option | O_NONBLOCK;fcntl(fd, F_SETFL, new_option);return old_option;}//添加描述符到epoll事件表,指定ET模式void addfd(int epollfd, int fd){struct epoll_event event;event.data.fd = fd;event.events = EPOLLIN | EPOLLET; //监听数据可读事件epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);setNonblocking(fd);}//自定义信号处理函数void sig_handler(int sig){int save_errno = errno;//保留原来的errno,保证函数的可重入性int msg = sig;send(pipefd[1], (char*)&msg, 1, 0);//发送信号到管道,让while循环处理errno = save_errno;}//添加信号处理映射的函数void addsighandler(int sig){struct sigaction sa;memset(&sa, 0, sizeof(sa));sa.sa_handler = sig_handler;sa.sa_flags |= SA_RESTART;sigfillset(&sa.sa_mask);assert(sigaction(sig, &sa, NULL) != -1);}//ET模式下工作流程, 返回值为是否结束循环bool et_flow(struct epoll_event* event, int number, int epollfd, int listenfd){char buf[BUFFER_SIZE];for (int i = 0; i < number; i++){int sockfd = event[i].data.fd;if (sockfd == listenfd) //epoll返回的是监听中的socket,说明有新客户端连接{struct sockaddr_in client_addr;socklen_t client_len = sizeof(client_addr);//接受客户端连接int connfd = accept(listenfd, (struct sockaddr*)&client_addr, &client_len);addfd(epollfd, connfd); //加入到epoll事件表}else if (sockfd == pipefd[0] && event[i].events & EPOLLIN){//是管道发来的信号,触发了可读事件int sig;int ret = recv(pipefd[0], buf, BUFFER_SIZE - 1, 0);if (ret <= 0){return false;}else{for (int i = 0; i < ret; i++){printf("receive signal from pipe: %d\n", buf[i]);switch (buf[i]) //遍历每个收到的信号{case SIGCHLD:case SIGHUP:{return false;}case SIGTERM:case SIGINT:{return true;}}}}}else if (event[i].events & EPOLLIN) //有数据可读{//因为是ET模式,只会触发一次EPOLLIN事件,所以需要在这里一次性读完数据printf("event trigger once\n");while (1){memset(buf, 0, BUFFER_SIZE);int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);if (ret < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){//判断用于非阻塞IO,表示数据全部读完,等待下一次数据可读,不关闭sockfdprintf("read later\n");break;}close(sockfd); //出异常了,关闭socketbreak;}else if (ret == 0){close(sockfd); //数据读完,关闭socket}else{printf("get %d bytes of connets: %s\n", ret, buf);}}}else{printf("something else happened\n");}}return false;}int main(int argc, char* argv[]){if (argc <= 2){printf("Usage: %s ip_address port_number\n", basename(argv[0]));return 1;}const char* ip = argv[1];const int port = atoi(argv[2]);int ret = 0;struct sockaddr_in address;bzero(&address, sizeof(address));address.sin_family = AF_INET; //IPv4address.sin_addr.s_addr = inet_addr(ip);address.sin_port = htons(port);int listenfd = socket(PF_INET, SOCK_STREAM, 0);assert(listenfd >= 0);ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));assert(ret >= 0);ret = listen(listenfd, 5); //最大连接数5assert(ret >= 0);//创建epoll并添加socket到事件表中struct epoll_event events[MAX_EVENT_NUMBER];int epollfd = epoll_create(5);assert(epollfd >= 0);addfd(epollfd, listenfd);//创建本地域管道,用于父子进程通信,并监听可读事件ret = socketpair(PF_UNIX, SOCK_STREAM, 0, pipefd);assert(ret >= 0);setNonblocking(pipefd[1]);addfd(epollfd, pipefd[0]);//设置一些信号的自定义处理addsighandler(SIGHUP);addsighandler(SIGCHLD);addsighandler(SIGTERM);addsighandler(SIGINT);bool stop_while = false;while (!stop_while){//使用epoll等待事件触发,可能是新客户端连接,可能是数据可写ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);if (ret < 0 && errno != EINTR){printf("epoll failed\n");break;}stop_while = et_flow(events, ret, epollfd, listenfd);}close(listenfd);close(pipefd[0]);close(pipefd[1]);return 0;}
4 异步IO
异步IO方式是效率最高的模式,使用aio_read取代recv。aio_read会向内核空间提交一个buffer,即使没有数据到来,aio_read会立刻返回,buffer会保留在内核空间。
当有数据到来的时候,数据会被复制到buffer,同时会触发一个信号(信号定义在aio_read参数中)通知应用程序。应用程序只需要为该信号添加处理函数即可。
图示如下:
4.1 POSIX异步IO结构体
POSIX异步IO接口使用aiocb来描述IO操作:
#include <aiocb.h>struct aiocb {int aio_fildes; /* 被打开用于读写的文件描述符 */off_t aio_offset; /* 偏移量,必须显式的定义*/volatile void *aio_buf; /* 要读写的内容*/size_t aio_nbytes; /* 要读写的字节数 */int aio_reqprio; /* 异步IO请求优先级 */struct sigevent aio_sigevent; /* 控制如何通知应用程序*/int aio_lio_opcode; /* Operation to be performed lio_listio() only */};
sigevent结构体用于定义如何通知进程:
#include <signal.h>struct sigevent {int sigev_notify; /* 通知的类型:不通知SIGEV_NONE,信号通知SIGEV_SIGNAL,函数调用 SIGEV_THREAD*/int sigev_signo; /* 信号通知指定的信号 */union sigval sigev_value; /* 信号通知附件的info */void (*sigev_notify_function) (union sigval); /* 指定的函数调用*/void *sigev_notify_attributes; /* Attributes for notification thread (SIGEV_THREAD) */pid_t sigev_notify_thread_id; /* ID of thread to signal (SIGEV_THREAD_ID) */};
4.2 POSIX异步IO函数
#include <aio.h>int aio_read(struct aiocb *aiocbp);int aio_write(struct aiocb *aiocbp);int aio_fsync(int op, struct aiocb *aiocbp); //同步数据到存储,op为操作int aio_error(const struct aiocb *aiocbp); //获取异步读写和同步操作的完成状态,返回值为状态ssize_t aio_return(struct aiocb *aiocbp); //获取异步操作的返回值int aio_suspend(const struct aiocb * const aiocb_list[], int nitems, const struct timespec *timeout); //阻塞进程,等待异步操作完成int aio_cancel(int fd, struct aiocb *aiocbp); //取消异步操作
aio_error返回值:
| 返回值 | 含义 |
|---|---|
| 0 | 异步操作成功 |
| -1 | aio_error调用失败 |
| EINPROGRESS | 操作仍在等待 |
aio_cancel返回值:
| 返回值 | 含义 |
|---|---|
| AIO_ALLDONE | 所有操作在取消前已经完成 |
| AIO_CANCELED | 所有操作已经取消 |
| AIO_NOTCANCELED | 至少有一个要求的操作没有被取消 |
| -1 | aio_cancel调用失败 |
4.3 异步IO代码示例
//ROT-13翻译算法异步操作版#include "../include/apue.h"#include <ctype.h>#include <fcntl.h>#include <aio.h>#include <errno.h>#define BSZ 4096#define NBUF 8enum rwop {UNUSED = 0,READ_PENDING = 1,WRITE_PENDING = 2};struct buf {enum rwop op;int last;struct aiocb aiocb;unsigned char data[BSZ];};struct buf bufs[NBUF];unsigned chartranslate(unsigned char c){if (isalpha(c)) {if (c >= 'n')c -= 13;else if (c >= 'a')c += 13;else if (c >= 'N')c -= 13;elsec += 13;}return(c);}intmain(int argc, char* argv[]){int ifd, ofd, i, j, n, err, numop;struct stat sbuf;const struct aiocb *aiolist[NBUF];off_t off = 0;if (argc != 3)err_quit("usage: rot13 infile outfile");if ((ifd = open(argv[1], O_RDONLY)) < 0)//打开输入文件err_sys("can't open %s", argv[1]);if ((ofd = open(argv[2], O_RDWR|O_CREAT|O_TRUNC, FILE_MODE)) < 0)//打开输出文件err_sys("can't create %s", argv[2]);if (fstat(ifd, &sbuf) < 0)err_sys("fstat failed");/* 初始化8个缓冲区 */for (i = 0; i < NBUF; i++) {bufs[i].op = UNUSED;bufs[i].aiocb.aio_buf = bufs[i].data;bufs[i].aiocb.aio_sigevent.sigev_notify = SIGEV_NONE;aiolist[i] = NULL;}numop = 0;for (;;) {for (i = 0; i < NBUF; i++) {switch (bufs[i].op) {case UNUSED:/** 从输入文件读取未读内容*/if (off < sbuf.st_size) {bufs[i].op = READ_PENDING;bufs[i].aiocb.aio_fildes = ifd;bufs[i].aiocb.aio_offset = off;off += BSZ;if (off >= sbuf.st_size)bufs[i].last = 1;bufs[i].aiocb.aio_nbytes = BSZ;if (aio_read(&bufs[i].aiocb) < 0) //异步读取err_sys("aio_read failed");aiolist[i] = &bufs[i].aiocb;numop++;}break;case READ_PENDING:if ((err = aio_error(&bufs[i].aiocb)) == EINPROGRESS)//检查异步操作是否完成,未完成则继续循环continue;if (err != 0) {if (err == -1)err_sys("aio_error failed");elseerr_exit(err, "read failed");}/** 异步读完成,翻译然后进行异步写*/if ((n = aio_return(&bufs[i].aiocb)) < 0) //获取异步读的返回值err_sys("aio_return failed");if (n != BSZ && !bufs[i].last)err_quit("short read (%d/%d)", n, BSZ);for (j = 0; j < n; j++)bufs[i].data[j] = translate(bufs[i].data[j]); //开始翻译bufs[i].op = WRITE_PENDING;bufs[i].aiocb.aio_fildes = ofd;bufs[i].aiocb.aio_nbytes = n;if (aio_write(&bufs[i].aiocb) < 0) //异步写入err_sys("aio_write failed");/* retain our spot in aiolist */break;case WRITE_PENDING:if ((err = aio_error(&bufs[i].aiocb)) == EINPROGRESS)continue;if (err != 0) {if (err == -1)err_sys("aio_error failed");elseerr_exit(err, "write failed");}/** 异步写完成*/if ((n = aio_return(&bufs[i].aiocb)) < 0)err_sys("aio_return failed");if (n != bufs[i].aiocb.aio_nbytes)err_quit("short write (%d/%d)", n, BSZ);aiolist[i] = NULL;bufs[i].op = UNUSED;//将此缓冲区置为空闲numop--;break;}}if (numop == 0) {if (off >= sbuf.st_size)break;} else {if (aio_suspend(aiolist, NBUF, NULL) < 0) //等待所有异步操作完成err_sys("aio_suspend failed");}}bufs[0].aiocb.aio_fildes = ofd;if (aio_fsync(O_SYNC, &bufs[0].aiocb) < 0) //异步同步操作,写入存储文件err_sys("aio_fsync failed");exit(0);}
