五、网络编程API
MSG_OOB
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
int main(int argc, const char *argv[])
{
if (argc <= 2)
{
printf("Usage: %s ip_address port_number\n", argv[0]);
return 1;
}
const char *ip = argv[1];
int port = atoi(argv[2]);
struct sockaddr_in server_address;
bzero(&server_address, sizeof(server_address));
server_address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &server_address.sin_addr);
server_address.sin_port = htons(port);
int sockfd = socket(PF_INET, SOCK_STREAM, 0);
assert(sockfd >= 0);
if (connect(sockfd, (struct sockaddr *)&server_address,
sizeof(server_address)) < 0)
{
printf("connection failed\n");
}
else
{
const char *oob_data = "abc";
const char *normal_data = "123";
send(sockfd, normal_data, strlen(normal_data), 0);
send(sockfd, oob_data, strlen(oob_data), MSG_OOB);
send(sockfd, normal_data, strlen(normal_data), 0);
}
close(sockfd);
return 0;
}
#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
//MSG_OOB的tcpdump测试
#define BUF_SIZE 1024
int main(int argc, const char *argv[])
{
if (argc <= 2)
{
printf("Usage: %s ip_address port_number\n", argv[0]);
return 1;
}
const char *ip = argv[1];
int port = atoi(argv[2]);
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);
int sock = socket(PF_INET, SOCK_STREAM, 0);
assert(sock >= 0);
int ret = bind(sock, (struct sockaddr *)&address, sizeof(address));
assert(ret != -1);
ret = listen(sock, 5);
assert(ret != -1);
struct sockaddr_in client;
socklen_t client_addrlength = sizeof(client);
int connfd = accept(sock, (struct sockaddr *)&client, &client_addrlength);
if (connfd < 0)
{
printf("errno is: %d, errstr: %s\n", errno, strerror(errno));
}
else
{
char buffer[ BUF_SIZE];
memset(buffer, '\0', BUF_SIZE);
ret = recv(connfd, buffer, BUF_SIZE -1, 0);
printf("got %d bytes of normal data '%s'\n", ret, buffer);
memset(buffer, '\0', BUF_SIZE);
ret = recv(connfd, buffer, BUF_SIZE -1, MSG_OOB);
printf("got %d bytes of oob data '%s'\n", ret, buffer);
memset(buffer, '\0', BUF_SIZE);
ret = recv(connfd, buffer, BUF_SIZE -1, 0);
printf("got %d bytes of normal data '%s'\n", ret, buffer);
close(connfd);
}
close(sock);
return 0;
}
六、高级I/O
CGI
CGI是什么
WEB之CGI——CGI详解(原理,配置及访问)
CGI是Web服务器和运行其上的应用程序进行“交流”的一种约定。
CGI:通用网关接口(Common Gateway Interface)是一个Web服务器主机提供信息服务的标准接口。通过CGI接口,Web服务器就能够获取客户端提交的信息,转交给服务器端的CGI程序进行处理,最后返回结果给客户端。
输出重定向 printf
#include <stdio.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <errno.h>2
#include <string.h>
#include <assert.h>
//CGI server
int main(int argc, const char *argv[])
{
if (argc <= 2)
{
fprintf(stderr, "Usage: %s ip_address port_number\n", argv[0]);
exit(1);
}
const char *ip = argv[1];
int port = atoi(argv[2]);
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);
int sock = socket(PF_INET, SOCK_STREAM, 0);
assert(sock >= 0);
int ret = bind(sock, (struct sockaddr *)&address, sizeof(address));
assert(ret != -1);
ret = listen(sock, 5);
assert(ret != -1);
struct sockaddr_in client;
socklen_t client_addrlength = sizeof(client);
int connfd = accept(sock, (struct sockaddr *)&client, &client_addrlength);
if (connfd < 0)
{
fprintf(stderr, "accept error, errno:%d, strerr:%s\n", errno, strerror(errno));
}
else
{
close(STDOUT_FILENO);
dup(connfd);
printf("abcd\n");
close(connfd);
}
close(sock);
return 0;
}
writev
struct stat;
stat()
fstat()
S_ISDIR()
#include <stdio.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <assert.h>
#include <sys/types.h>
#include <errno.h>
#include <stdbool.h> //c没有bool
#define BUF_SIZE 1024
static const char *status_line[2]={"200 OK","500 Internal server error"};
/*定义两种HTTP状态码和状态信息*/
int main(int argc, const char *argv[]){
if(argc != 3){
printf("Usage: %s <port> <filename> \n", argv[1]);
exit(1);
}
const char* file_name=argv[2];
int sockServ = socket(PF_INET, SOCK_STREAM, 0);
assert(sockServ>=0);
struct sockaddr_in sockServAddr;
memset(&sockServAddr, 0, sizeof(sockServAddr));
sockServAddr.sin_family = AF_INET;
sockServAddr.sin_addr.s_addr = htonl(INADDR_ANY);
sockServAddr.sin_port = htons(atoi(argv[1]));
int ret=bind(sockServ, (struct sockaddr*)& sockServAddr, sizeof(sockServAddr));
assert(ret!=-1);
ret=listen(sockServ, 5);
assert(ret!=-1);
struct sockaddr_in sockClientAddr;
socklen_t clientAddrLen = sizeof(sockClientAddr);
int sockClient = accept(sockServ, (struct sockaddr*)& sockClientAddr, &clientAddrLen);
if(sockClient<0){
//char *strerror(int errnum)返回一个指向错误字符串的指针 C 标准库 - <string.h>
fprintf(stderr,"accept error:errno:%d,strerr:%s \n ",errno,strerror(errno));
}else{
/*用于保存HTTP应答的状态行、头部字段和一个空行的缓存区*/
char header_buf[BUF_SIZE];
memset(&header_buf,0,sizeof(header_buf));
/*用于存放目标文件内容的应该用程序缓存*/
char* file_buf=NULL;
/*用于获取目标文件的属性,比如是否为目录, 文件大小等*/
struct stat file_stat;
/*记录目标文件是否是有效文件*/
bool valid = true;
int len=0;
//int stat(const char *path, struct stat *buf)返回值:成功返回0,失败返回-1;
if(stat(file_name,&file_stat)<0){ /*目标文件不存在*/
valid=false;
}else{
if(S_ISDIR(file_stat.st_mode)){ //目标文件是一个目录
valid=false;
}else if(file_stat.st_mode&S_IROTH){ /*当前用户有读取目标文件爱你的权限*/
/*动态分配缓存区file_buf,
* 并指定其大小为目标的大小file_stat.st_size加1,
* 然后将目标文件读入缓存区file_buf中*/
int fd=open(file_name,O_RDONLY);
// file_buf=new char[file_stat.st_size+1];
file_buf=malloc(sizeof(char)*file_stat.st_size);
memset(file_buf, '\0', file_stat.st_size + 1);
if(read(fd,file_buf,file_stat.st_size)<0){
valid=false;
}
}else{
valid=false;
}
}
/*如果目标文件有效, 则发送正常的HTTP应答 */
if(valid){
ret=snprintf(header_buf,BUF_SIZE-1,"%s %s\r\n","HTTP/1.1",status_line[0]);
len+=ret;
ret=snprintf(header_buf+len,BUF_SIZE-1-len,"Content-Length: %lld\r\n", (long long)file_stat.st_size);
/*利用writev将header_buf和file_buf的内容一并写出 */
struct iovec iv[2];
iv[0].iov_base=header_buf;
iv[0].iov_len=strlen(header_buf);
iv[1].iov_base=file_buf;
iv[1].iov_len=file_stat.st_size;
ret=writev(sockClient,iv,2);
}else{ /*如果目标文件无效, 则通知客户端服务器发生了“内部错误”*/
ret=snprintf(header_buf,BUF_SIZE-1,"%s %s\r\n","HTTP/1.1",status_line[1]);
len+=ret;
ret=snprintf(header_buf+len,BUF_SIZE-1-len,"Content-Length: %lld\r\n", (long long)file_stat.st_size);
write(sockClient,header_buf,strlen(header_buf));
}
close(sockClient);
if(file_buf){ //没有分配空间就不需要释放
free(file_buf);
}
}
close(sockServ);
return 0;
}
sendfile
#include <stdio.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <assert.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/sendfile.h>
int main(int argc, const char *argv[])
{
if (argc <= 3)
{
fprintf(stderr, "usage: %s ip_address port_number filename\n", argv[0]);
return 1;
}
const char *ip = argv[1];
int port = atoi(argv[2]);
const char *file_name = argv[3];
int filefd = open(file_name, O_RDONLY);
assert( filefd > 0);
struct stat stat_buf;
fstat(filefd, &stat_buf);
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);
int sock = socket(AF_INET, SOCK_STREAM, 0);
assert(sock > 0);
int ret = bind(sock, (struct sockaddr *)&address, sizeof(address));
assert(ret != -1);
ret = listen(sock, 5);
assert(ret != -1);
struct sockaddr_in client;
socklen_t client_addrlength = sizeof(client);
int connfd = accept(sock, (struct sockaddr *)&client, &client_addrlength);
if(connfd < 0)
{
fprintf(stderr, "accept error! errno:%d, errstr:%s\n", errno, strerror(errno));
}
else
{
sendfile(connfd, filefd, NULL, stat_buf.st_size);
close(connfd);
}
close(filefd);
close(sock);
return 0;
}
splice
#define _GNU_SOURCE
#include <stdio.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <fcntl.h>
#include <assert.h>
int main(int argc, const char *argv[])
{
if (argc <= 2)
{
fprintf(stderr, "usage: %s ip_address port_number\n", argv[0]);
return 1;
}
const char* ip = argv[1];
int port = atoi(argv[2]);
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);
int sock = socket(PF_INET, SOCK_STREAM, 0);
assert(sock >=0 );
int ret = bind(sock, (struct sockaddr *)&address, sizeof(address));
assert(ret != -1);
ret = listen(sock, 5);
assert(ret != -1);
struct sockaddr_in client;
socklen_t client_addrlength = sizeof(client);
int connfd = accept(sock, (struct sockaddr *)&client, &client_addrlength);
if(connfd < 0)
{
fprintf(stderr, "accept error! errno:%d, errstr:%s\n", errno, strerror(errno));
}
else
{
int pipefd[2];
assert(ret != -1);
ret = pipe(pipefd); /*创建管道, pipefd[0]: read end of the pipe, pipefd[1]:write end of the pipe */
/*将connfd上 流入的客户数据定向到管道中*/
ret = splice(connfd, NULL, pipefd[1], NULL, 32768,
SPLICE_F_MORE | SPLICE_F_MOVE);
assert(ret != -1);
/*将管道的输出定向到connfd客户连接文件描述 */
ret = splice(pipefd[0], NULL, connfd, NULL, 326768,
SPLICE_F_MORE | SPLICE_F_MOVE);
assert(ret != -1);
close(connfd);
}
close(sock);
return 0;
}
关于__GNU_SOURCE 这个宏
在编写网络程序时,会涉及到一些关于BSD系统保留下的结构体和宏定义,关于一切配置在linux系统的/usr/include/features.h文件中,这是用来让用户配置编译环境的头文件。_GUN_SOURCE这个宏,这个宏可以让用户打开所有feature。
tee
#define _GNU_SOURCE
#include <stdio.h>
#include <assert.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
//同时输出数据到终端和文件
int main(int argc, const char *argv[])
{
if (argc != 2)
{
fprintf(stderr, "usage: %s <file>\n", basename(argv[0]));
return 1;
}
int filefd = open(argv[1], O_CREAT | O_WRONLY | O_TRUNC, 0666);
assert(filefd > 0);
int pipefd_stdout[2];
int ret=pipe(pipefd_stdout);
assert(ret != -1);
int pipefd_file[2];
ret=pipe(pipefd_file);
assert(ret != -1);
/*将标准输入内容输入管道 pipefd_stdout */
ret=splice(STDIN_FILENO,NULL,pipefd_stdout[1],NULL,32678,SPLICE_F_MORE|SPLICE_F_MOVE);
assert(ret != -1);
/*将管道pipefd_stdout的输出复制到管道pipefd_file的输入端 */
ret=tee(pipefd_stdout[0],pipefd_file[1],32678,SPLICE_F_NONBLOCK);
assert(ret != -1);
/*将pipefd_file的输出定向到文件描述符file_fd上,从而将标准输入的内容写入文件
* */
ret=splice(pipefd_file[0],NULL,filefd,NULL,32678,SPLICE_F_NONBLOCK);
assert(ret != -1);
/*将管道pipefd_stdout的输出定向到标准输出, 其内容和写入文件的内容完全一致
* */
ret=splice(pipefd_stdout[0],NULL,STDOUT_FILENO,NULL,32678,SPLICE_F_NONBLOCK);
assert(ret != -1);
close(filefd);
close(pipefd_stdout[0]);
close(pipefd_stdout[1]);
close(pipefd_file[0]);
close(pipefd_file[1]);
return 0;
}
七、编写规范
守护进程
linux高级环境编程—守护进程
守护进程与后台进程(Python 创建守护进程)
一般的服务器程序都是以后台进程(守护进程)的方式运行,那么要如何使得服务器进程后台化呢?
下面介绍守护进程的编写遵循的步骤:
- 创建子进程,关闭父进程;2. 设置文件权限掩码。当进程创建新文件(使用open(const char *pathname, int flags, mode_t mode)系统调用时,文件的权限将是mode&0777;
- 创建新会话,设置本进程为进程组的首领;
- 切换工作目录;
- 关闭标准输入设备、标准输出设备和标准错误输出设备;
- 关闭其他已经打开的文件描述符;
- 将标准输入、标准输出、标准错误重定向到/dev/null文件
八、框架
server模型
C/S 与P2P
I/O模型
- 同步I/O模型
- 阻塞I/O
- 非阻塞I/O
- I/O复用
- 信号
- 异步I/O模型
事件处理模式
两种高效的事件处理模式:Reactor模式和Proactor模式
Reactor 模式要求 主线程(I/O 处理单元) 只负责监听文件描述符上是否有事件发生,有的话就立即将该事件通知工作线程(逻辑单元)。除此之外,主线程不做任何其他实质性的工作。 读写数据,接受新的连接,以及处理客户请求均在工作线程中完成。
与 Reactor 模式不同,Proactor 模式 将所有 I/O 操作都交给主线程和内核来处理, 工作线程仅仅负责业务逻辑。
有限状态机
处理HTTP
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#define BUFFER_SIZE 4096
//主状态机 正在分析的状态:正在分析请求行 还是头部
enum CHECK_STATE {CHECK_STATE_REQUESTLINE=0,CHECK_STATE_HEADER};
//从状态机获取行的状态:读到完整行,行出错,行数据不完整
enum LINE_STATUS {LINE_OK=0,LINE_BAD,LINE_OPEN};
//服务器处理的结果
enum HTTP_CODE {NO_REQUEST,GET_REQUEST,BAD_REQUEST,FORBIDDEN_REQUEST,INTERNAL_ERROR,CLOSED_CONNECTION};
//往client回复消息
static const char* szret[]={"I get a correct result\n","Something wrong\n"};
/*从状态机,用于解析出一行内容 */
LINE_STATUS parse_line(char *buffer,int &checked_index,int &read_index){
char temp;
/*checked_index指向buffer(应用程序的读缓冲区)中当前正在分析的字节,
* read_index指向buffer中客户数据的为尾部的下一字节。
* buffer中第0~check_index字节都已经分析完毕,第checked_index~(read_index
* -1)字节又下面的循环挨个分析 */
for(;checked_index<read_index;++checked_index){
/*获得当前要分析的字节 */
temp=buffer[checked_index];
/*如果当前的字节是"\r",即回车符, 则说明可能读到一个完整的行 */
if(temp=='\r'){
/*如果“\r”字符碰巧是目前buffer中的最后一个已经被读入的客户数据,那么这次分析没有读取到一个完整的行,
* 返回LINE_OPEN以表示还需要继续读取客户数据才能进一步分析 *
*/
if((checked_index+1)==read_index){
return LINE_OPEN;
}
/*如果下一个字符是"\n", 则说明我们成功读取到一个完整的行 */
else if(buffer[checked_index+1]=='\n'){
buffer[checked_index++]='\0';
buffer[checked_index++]='\0';
return LINE_OK;
}
/*否则的话, 说明客户发送的HTTP请求存在语法问题*/
return LINE_BAD;
}
/*如果当前字节是“\n”,即换行符,则也说明可能读取到一个完整的行 */
else if(temp=='\n'){
if((checked_index>1)&&buffer[checked_index-1]=='\r'){
buffer[checked_index-1]='\0';
buffer[checked_index++]='\0';
return LINE_OK;
}
return LINE_BAD;
}
}
/* 如果所有内容都分析完毕也没有遇到"\r"字符,则返回LINE_OPEN,
* 表示还需要继续读取客户数据才能近一步分析
*/
return LINE_OPEN;
}
HTTP_CODE parse_requestline(char *temp,CHECK_STATE &checkstate){
/*如果请求行中没有空白字符或“\t”字符,则HTTP请求必有问题 */
char *url=strpbrk(temp," \t");
if(!url){
return BAD_REQUEST;
}
*url++='\0';
/*仅支持GET方法*/
char *method=temp;
if(strcasecmp(method,"GET")==0){
printf("The request method is GET\n");
}else{
printf("only support GET method\n");
return BAD_REQUEST;
}
url+=strspn(url," \t");
char *version=strpbrk(url," \t");
if(!version){
return BAD_REQUEST;
}
/*仅支持HTTP/1.1 */
*version++='\0';
if(strcasecmp(version,"HTTP/1.1")!=0){
return BAD_REQUEST;
}
/*检查URL是否合法 */
if(strncasecmp(url,"http://",7)==0){
url+=7;
url=strchr(url,'/');
}
if(!url||url[0]!='/'){
return BAD_REQUEST;
}
printf("The request URL is : %s \n",url);
/*HTTP 请求行处理完毕, 状态转移到头部字段分析 */
checkstate=CHECK_STATE_HEADER;
return NO_REQUEST;
}
/*分析头部字段 */
HTTP_CODE parse_headers(char *temp){
/*遇到一个空行,说明我们得到一个正确的HTTP请求 */
if(temp[0]=='\0'){
return GET_REQUEST;
}
/*处理”HOST“头部字段 */
else if(strncasecmp(temp,"Host:",5)==0){
temp+=5;
temp+=strspn(temp," \t");
printf("the request host is:%s\n",temp);
}
/*其他头部字段都不处理 */
else{
printf("I can not handle this header\n");
}
return NO_REQUEST;
}
/*分析HTTP请求的入口函数 */
HTTP_CODE parse_content(char *buffer,int &checked_index,CHECK_STATE &checkstate,int &read_index,int &start_line){
/*记录当前行的读取状态 */
LINE_STATUS linestatus=LINE_OK;
/*记录HTTP请求的处理结果 */
HTTP_CODE retcode=NO_REQUEST;
/*主状态机, 用于从buffer中取出所有完整的行 */
while((linestatus=parse_line(buffer,checked_index,read_index))==LINE_OK){
/*start_line 是行在buffer中的起始位置*/
char *temp=buffer+start_line;
/*记录下一行的起始位置*/
start_line=checked_index;
/*checkstatus 记录主机状态当前的状态 */
switch(checkstate){
case CHECK_STATE_REQUESTLINE:
retcode=parse_requestline(temp,checkstate);
if(retcode==BAD_REQUEST){
return BAD_REQUEST;
}
break;
case CHECK_STATE_HEADER:
retcode=parse_headers(temp);
if(retcode==BAD_REQUEST){
return BAD_REQUEST;
}else if(retcode==GET_REQUEST){
return GET_REQUEST;
}
break;
default:
return INTERNAL_ERROR;
}
}
/*若没有读取到一个完整的行, 则表示还需要继续读取客户数据才能进一步分析 */
if(linestatus==LINE_OPEN){
return NO_REQUEST;
}else{
return BAD_REQUEST;
}
}
int main(int argc, char *argv[]) {
if (argc < 2) {
printf("usage: %s port_number\n", basename(argv[0]));
return 1;
}
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
address.sin_addr.s_addr=htonl(INADDR_ANY);
address.sin_port = htons(atoi(argv[1]));
int listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(listenfd >= 0);
int option = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(option));
int ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
assert(ret != -1);
ret = listen(listenfd, 5);
assert(ret != -1);
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int fd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
if (fd < 0) {
printf("errno is : %d\n", errno);
} else {
char buffer[BUFFER_SIZE];
memset(buffer, '\0', BUFFER_SIZE);
//一共要读到的字节数
int data_read=0;
//已经读取了多少
int read_index=0;
//已经分析完多少
int checked_index=0;
//行在buffer中的起始位置
int start_line=0;
/*set the main state machine's initial state */
CHECK_STATE checkstate=CHECK_STATE_REQUESTLINE;
while(1){
data_read=recv(fd,buffer+read_index,BUFFER_SIZE-1,0);
if(data_read==-1){
printf("reading failed\n");
break;
}else if(data_read==0){
printf("remote client has closed the connection\n");
break;
}
read_index+=data_read;
//解析本次数据
HTTP_CODE result=parse_content(buffer,checked_index,checkstate,read_index,start_line);
if(result==NO_REQUEST){
continue;
}else if(result==GET_REQUEST){
send(fd,szret[0],strlen(szret[0]),0);
break;
}else{
send(fd,szret[1],strlen(szret[1]),0);
break;
}
}
close(fd);
}
close(listenfd);
return 0;
}
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <assert.h>
int main(int argc, char *argv[]) {
if (argc < 3) {
printf("usage: %s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char *ip = argv[1];
int port = atoi(argv[2]);
struct sockaddr_in address;
memset(&address, 0, sizeof(address));
address.sin_family = AF_INET;
address.sin_port = htons(port);
inet_pton(AF_INET, ip, &address.sin_addr);
int sock = socket(PF_INET, SOCK_STREAM, 0);
assert(sock >= 0);
int ret = connect(sock, (struct sockaddr*)&address, sizeof(address));
assert(ret != -1);
const char *buf = "POST /http://example/hello.html HTTP/1.1\r\n\r\nHost:hostlocal";
buf = "GET /http://example/hello.html HTTP/1.1\r\n\r\nHost:hostlocal";
char *recvbuf[1024];
send(sock, buf, strlen(buf), 0);
sleep(1);
recv(sock, recvbuf, sizeof(recvbuf), 0);
printf("%s\n", recvbuf);
close(sock);
return 0;
}
九、I/O复用
select
同时接收普通数据和带外数据
#include<stdio.h>
#include<stdlib.h>
#include<arpa/inet.h>
#include<unistd.h>
#include<fcntl.h>
#include<string.h>
#include<errno.h>
#include<sys/socket.h>
#include<sys/select.h>
//select同时接收 ordinary data and oob data
#define BUF_SIZE 1024
void error_handling(char *msg);
int main(int argc,char *argv[]){
int serv_sock,clnt_sock;
struct sockaddr_in serv_addr,clnt_addr;
socklen_t clnt_addr_size;
char buf[BUF_SIZE];
if (argc != 2)
{
fprintf(stderr, "usage: %s <port>\n", argv[0]);
return 1;
}
serv_sock=socket(PF_INET,SOCK_STREAM,0);
memset(&serv_addr,0,sizeof(serv_addr));
serv_addr.sin_family=AF_INET;
serv_addr.sin_addr.s_addr=htonl(INADDR_ANY);
serv_addr.sin_port=htons(atoi(argv[1]));
//分配IP地址和端口号
if(bind(serv_sock,(struct sockaddr*)&serv_addr,sizeof(serv_addr))==-1){
error_handling("bind() error");
}
//进入等待连接请求状态
if(listen(serv_sock,5)==-1){
error_handling("listen() error");
}
clnt_addr_size=sizeof(clnt_addr);
clnt_sock=accept(serv_sock,(struct sockaddr*)&clnt_addr,&clnt_addr_size);
if(clnt_sock==-1){
error_handling("accept() error");
}
fd_set read_fds;
fd_set exception_fds;
FD_ZERO(&read_fds);
FD_ZERO(&exception_fds);
while(1){
memset(&buf,0,sizeof(buf));
//每次select 会修改fd集合,要再次传入
FD_SET(clnt_sock,&read_fds);
FD_SET(clnt_sock,&exception_fds);
int ret=select(clnt_sock+1,&read_fds,NULL,&exception_fds,NULL);
if(ret<0){
printf("selection failure\n");
break;
}
//对于可读事件
if(FD_ISSET(clnt_sock,&read_fds)){
ret=recv(clnt_sock,buf,sizeof(buf)-1,0);
if(ret<=0){
break;
}
printf("Get %d bytes of normal data:%s \n",ret,buf);
}
/*对于异常事件,采用带MSG_OOB标志的recv函数读取带外数据 */
else if(FD_ISSET(clnt_sock,&exception_fds)){
ret=recv(clnt_sock,buf,sizeof(buf)-1,MSG_OOB);
if(ret<=0){
break;
}
printf("Get %d bytes of oob data:%s \n",ret,buf);
}
}
close(clnt_sock);
close(serv_sock);
return 0;
}
void error_handling(char *msg){
fputs(msg,stderr);
fputc('\n',stderr);
exit(1);
}
epoll
#include<stdio.h>
#include<stdlib.h>
#include<arpa/inet.h>
#include<unistd.h>
#include<fcntl.h>
#include<string.h>
#include<errno.h>
#include<sys/socket.h>
#include<sys/epoll.h>
#include<stdbool.h>
//epoll的ET and LT
#define BUF_SIZE 10
#define MAX_EVENT_NUMBER 10
int setnonblocking(int fd);
void addfd(int epfd,int fd,bool enable_et);
void lt(struct epoll_event* events,int number,int epfd,int listenfd);
void et(struct epoll_event* events,int number,int epfd,int listenfd);
void error_handling(char *msg);
int main(int argc,char *argv[]){
int serv_sock,clnt_sock;
struct sockaddr_in serv_addr,clnt_addr;
socklen_t clnt_addr_size;
char buf[BUF_SIZE];
struct epoll_event events[MAX_EVENT_NUMBER];
if (argc != 2)
{
fprintf(stderr, "usage: %s <port>\n", argv[0]);
return 1;
}
serv_sock=socket(PF_INET,SOCK_STREAM,0);
memset(&serv_addr,0,sizeof(serv_addr));
serv_addr.sin_family=AF_INET;
serv_addr.sin_addr.s_addr=htonl(INADDR_ANY);
serv_addr.sin_port=htons(atoi(argv[1]));
int option=1;
setsockopt(serv_sock,SOL_SOCKET,SO_REUSEADDR,&option,sizeof(option));
//分配IP地址和端口号
if(bind(serv_sock,(struct sockaddr*)&serv_addr,sizeof(serv_addr))==-1){
error_handling("bind() error");
}
//进入等待连接请求状态
if(listen(serv_sock,5)==-1){
error_handling("listen() error");
}
int epfd=epoll_create(5);
if(epfd==-1){
error_handling("epoll_create() error");
}
addfd(epfd,serv_sock,true);
while(1){
int ret=epoll_wait(epfd,events,MAX_EVENT_NUMBER,-1);
if(ret<0){
printf("epoll failure\n");
break;
}
//分析就绪事件
// lt(events,ret,epfd,serv_sock);
et(events,ret,epfd,serv_sock);
}
close(serv_sock);
return 0;
}
//将fd设置为非阻塞
int setnonblocking(int fd){
int old_opt=fcntl(fd,F_GETFL);
fcntl(fd,old_opt|O_NONBLOCK);
return old_opt;
}
//注册事件,是否为条件触发
void addfd(int epfd,int fd,bool enable_et){
struct epoll_event event;
event.data.fd=fd;
event.events=EPOLLIN;
if(enable_et){
event.events|=EPOLLET;
}
epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&event);
setnonblocking(fd);
}
void lt(struct epoll_event* events,int number,int epfd,int listenfd){
for(int i=0;i<number;i++){
int sockfd=events[i].data.fd;
// new client request
if(sockfd==listenfd){
struct sockaddr_in clnt_addr;
socklen_t clnt_addr_size;
clnt_addr_size=sizeof(clnt_addr);
int clnt_sock=accept(sockfd,(struct sockaddr*)&clnt_addr,&clnt_addr_size);
if(clnt_sock==-1){
error_handling("accept() error");
}
//不使用ET
addfd(epfd,clnt_sock,false);
}else if(events[i].events & EPOLLIN){ //接收信息
printf("Event trigger once\n");
char buf[BUF_SIZE];
int ret=recv(sockfd,buf,BUF_SIZE-1,0);
if(ret<0){
close(sockfd);
continue;
}
printf("Get %d bytes of content: %s \n",ret,buf);
}else{
printf("Sonmething else happened\n");
}
}
}
void et(struct epoll_event* events,int number,int epfd,int listenfd){
for(int i=0;i<number;i++){
int sockfd=events[i].data.fd;
// new client request
if(sockfd==listenfd){
struct sockaddr_in clnt_addr;
socklen_t clnt_addr_size;
clnt_addr_size=sizeof(clnt_addr);
int clnt_sock=accept(sockfd,(struct sockaddr*)&clnt_addr,&clnt_addr_size);
if(clnt_sock==-1){
error_handling("accept() error");
}
//使用ET
addfd(epfd,clnt_sock,true);
}else if(events[i].events & EPOLLIN){ //接收信息
printf("Event trigger once\n");
char buf[BUF_SIZE];
while(1){
memset(&buf,0,sizeof(buf));
int ret=recv(sockfd,buf,BUF_SIZE-1,0);
if(ret<0){
// 判断errno
if((errno==EAGAIN)||(errno==EWOULDBLOCK)){
printf("Read later\n");
break;
}
close(sockfd);
break;
}else if(ret==0){
close(sockfd);
}else{
printf("Get %d bytes of content: %s \n",ret,buf);
}
}
}else{
printf("Sonmething else happened\n");
}
}
}
void error_handling(char *msg){
fputs(msg,stderr);
fputc('\n',stderr);
exit(1);
}
LT
ET
ET-EPOLLONESHOT
#include<stdio.h>
#include<stdlib.h>
#include<arpa/inet.h>
#include<unistd.h>
#include<fcntl.h>
#include<string.h>
#include<errno.h>
#include<sys/socket.h>
#include<sys/epoll.h>
#include<stdbool.h>
#include<pthread.h>
//epoll的ET的EPOLLONESHOT
#define BUF_SIZE 1024
#define MAX_EVENT_NUMBER 10
//fds结构体的目的是 新开线程接收数据时同时把sockfd和epollfd传过去以便使用
struct fds{
int sockfd;
int epollfd;
};
int setnonblocking(int fd);
void addfd(int epfd,int fd,bool enable_et);
void reset_oneshot(int efpd,int fd);
void* worker(void* arg);
void error_handling(char *msg);
int main(int argc,char *argv[]){
int serv_sock,clnt_sock;
struct sockaddr_in serv_addr,clnt_addr;
socklen_t clnt_addr_size;
char buf[BUF_SIZE];
struct epoll_event events[MAX_EVENT_NUMBER];
if (argc != 2)
{
fprintf(stderr, "usage: %s <port>\n", argv[0]);
return 1;
}
serv_sock=socket(PF_INET,SOCK_STREAM,0);
memset(&serv_addr,0,sizeof(serv_addr));
serv_addr.sin_family=AF_INET;
serv_addr.sin_addr.s_addr=htonl(INADDR_ANY);
serv_addr.sin_port=htons(atoi(argv[1]));
int option=1;
setsockopt(serv_sock,SOL_SOCKET,SO_REUSEADDR,&option,sizeof(option));
//分配IP地址和端口号
if(bind(serv_sock,(struct sockaddr*)&serv_addr,sizeof(serv_addr))==-1){
error_handling("bind() error");
}
//进入等待连接请求状态
if(listen(serv_sock,5)==-1){
error_handling("listen() error");
}
int epfd=epoll_create(5);
if(epfd==-1){
error_handling("epoll_create() error");
}
//不能注册ONESHOT,否则只能连接一个客户
addfd(epfd,serv_sock,false);
while(1){
int ret=epoll_wait(epfd,events,MAX_EVENT_NUMBER,-1);
if(ret<0){
printf("epoll failure\n");
break;
}
//分析就绪事件
for(int i=0;i<ret;i++){
int sockfd=events[i].data.fd;
//new client request
if(sockfd==serv_sock){
clnt_addr_size=sizeof(clnt_addr);
clnt_sock=accept(sockfd,(struct sockaddr*)&clnt_addr,&clnt_addr_size);
if(clnt_sock==-1){
error_handling("accept() error");
}
//使用ET
addfd(epfd,clnt_sock,true);
}
else if(events[i].events & EPOLLIN){ //接收信息
//新开一个线程来处理,处理完要重置EPOLLONESHOT
pthread_t thread;
struct fds new_worker;
new_worker.epollfd=epfd;
new_worker.sockfd=sockfd;
pthread_create(&thread,NULL,worker,(void*)&new_worker);
}else{
printf("Something else happened\n");
}
}
}
close(serv_sock);
return 0;
}
//将fd设置为非阻塞
int setnonblocking(int fd){
int old_opt=fcntl(fd,F_GETFL);
fcntl(fd,F_SETFL,old_opt|O_NONBLOCK);
return old_opt;
}
//注册事件,是否为条件触发
void addfd(int epfd,int fd,bool oneshot){
struct epoll_event event;
event.data.fd=fd;
event.events=EPOLLIN | EPOLLET;
if(oneshot){
event.events|=EPOLLONESHOT;
}
epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&event);
setnonblocking(fd);
}
//重置EPOLLONESHOT
void reset_oneshot(int efpd,int fd){
struct epoll_event event;
event.events=EPOLLIN | EPOLLET | EPOLLONESHOT;
event.data.fd=fd;
epoll_ctl(efpd,EPOLL_CTL_MOD,fd,&event);
}
void* worker(void* arg){
int sockfd = (( struct fds*)arg)->sockfd;
int epfd = (( struct fds*)arg)->epollfd;
pthread_t pid = pthread_self();
printf("Start new thread %u to receive data on fd:%d\n",pid,sockfd);
char buf[BUF_SIZE];
memset(&buf,0,sizeof(buf));
while(1){
int ret = recv(sockfd, buf, BUF_SIZE-1, 0);
if(ret==0){
close(sockfd);
printf("Foreigner closed the connection\n");
break;
}else if(ret<0){ //错误,且errno=EAGAIN
if(errno==EAGAIN){
reset_oneshot(epfd,sockfd);
printf("EAGAIN read later\n");
break;
}
}else{
printf("thread %u get content: %s\n", pid, buf);
//printf("thread %u about to sleep\n", pid);
sleep(5); //模拟数据处理
//printf("thread %u back from sleep\n", pid);
}
}
printf("End thread receiving data on fd:%d\n",sockfd);
}
void error_handling(char *msg){
fputs(msg,stderr);
fputc('\n',stderr);
exit(1);
}
过5s发送,发现是新的线程。
高级应用1-非阻塞connect
non-blocking connect ENINPROGRESS
#include<stdio.h>
#include<stdlib.h>
#include<arpa/inet.h>
#include<unistd.h>
#include<fcntl.h>
#include<string.h>
#include<errno.h>
#include<sys/socket.h>
#include<sys/select.h>
#include<sys/time.h>
//非阻塞connect select ENINPROGRESS
#define BUF_SIZE 1024
int setnonblocking(int fd);
void error_handling(char *msg);
int main(int argc,char *argv[]){
int serv_sock,clnt_sock;
struct sockaddr_in serv_addr,clnt_addr;
socklen_t clnt_addr_size;
char buf[BUF_SIZE];
if (argc != 2)
{
fprintf(stderr, "usage: %s <port>\n", argv[0]);
return 1;
}
serv_sock=socket(PF_INET,SOCK_STREAM,0);
memset(&serv_addr,0,sizeof(serv_addr));
serv_addr.sin_family=AF_INET;
serv_addr.sin_addr.s_addr=htonl(INADDR_ANY);
serv_addr.sin_port=htons(atoi(argv[1]));
int fdopt=setnonblocking(serv_sock);
int ret=connect(serv_sock,(struct sockaddr*)&serv_addr,sizeof(serv_addr));
if(ret==0){ //连接成功,恢复sockfd属性并返回
fprintf(stdout, "connect with server immediately\n");
fcntl(serv_sock, F_SETFL, fdopt);
return serv_sock;
}else if(errno!=EINPROGRESS){ //不是EINPROCESS错误
fprintf(stderr, "unblock connect not support\n");
return -1;
}
//EINPROCESS错误,通过select、poll等来监听可写事件,返回后getsockopt来读取和清除error
fd_set write_fds;
struct timeval timeout;
FD_SET(serv_sock,&write_fds);
timeout.tv_sec=5;
timeout.tv_usec=0;
ret=select(serv_sock+1,NULL,&write_fds,NULL,&timeout);
if(ret<=0){ //没有就绪事件
fprintf(stderr, "connection timeout\n");
close(serv_sock);
return -1;
}
if(!FD_ISSET(serv_sock,&write_fds)){
fprintf(stderr, "no event on sockfd found\n");
close(serv_sock);
return -1;
}
int error=0;
socklen_t err_len=sizeof(error);
//连接不成功
if(getsockopt(serv_sock,SOL_SOCKET,SO_ERROR,&error,&err_len)<0){
fprintf(stderr, "get socket option failed\n");
close(serv_sock);
return -1;
}
if(errno!=0){
printf("connection failed after select with the error: %d \n", error);
close(serv_sock);
return -1;
}
//0才连接成功
fprintf(stdout, "connection ready after select with the socket: %d\n", serv_sock);
//清除
fcntl(serv_sock,F_SETFL,fdopt);
close(serv_sock);
return 0;
}
//将fd设置为非阻塞
int setnonblocking(int fd){
int old_opt=fcntl(fd,F_GETFL);
fcntl(fd,F_SETFL,old_opt|O_NONBLOCK);
return old_opt;
}
void error_handling(char *msg){
fputs(msg,stderr);
fputc('\n',stderr);
exit(1);
}
高级应用2-聊天室
poll+splice
判断poll发生时间时,用 if(fds[1].revents & POLLRDHUP)
#define _GNU_SOURCE 1
#include<stdio.h>
#include<stdlib.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<unistd.h>
#include<fcntl.h>
#include<string.h>
#include<errno.h>
#include<sys/socket.h>
#include<poll.h>
//聊天室 server同时监听和连接socket。并使用牺牲空间换事件的策略来提高性能
#define BUF_SIZE 1024
#define USER_LIMIT 5 /*最大用户数量*/
#define FD_LIMIT 65535 //限制client_data对象数量,poll最大可65536
//对于每个clinet 有addr、有将data写入cinet的位置、有缓存buf
struct client_data{
struct sockaddr_in address;
char *write_buf;
char buf[BUF_SIZE];
};
int setnonblocking(int fd);
void error_handling(char *msg);
int main(int argc,char *argv[]){
int serv_sock,clnt_sock;
struct sockaddr_in serv_addr,clnt_addr;
socklen_t clnt_addr_size;
char buf[BUF_SIZE];
if (argc != 2)
{
printf( "usage: %s <port>\n", argv[0]);
return 1;
}
serv_sock=socket(PF_INET,SOCK_STREAM,0);
memset(&serv_addr,0,sizeof(serv_addr));
serv_addr.sin_family=AF_INET;
serv_addr.sin_addr.s_addr=htonl(INADDR_ANY);
serv_addr.sin_port=htons(atoi(argv[1]));
int option=1;
setsockopt(serv_sock,SOL_SOCKET,SO_REUSEADDR,&option,sizeof(option));
//分配IP地址和端口号
if(bind(serv_sock,(struct sockaddr*)&serv_addr,sizeof(serv_addr))==-1){
error_handling("bind() error");
}
//进入等待连接请求状态
if(listen(serv_sock,5)==-1){
error_handling("listen() error");
}
/*创建users数组,
* 分配FD_LIMIT个client_data对象。可以预期:每个可能的socket
* 连接可以获得一个这样的对象,
* 并且socket的值可以直接用来索引(作为数组的下标)socket连接对应的client_data对象,
* 这是将socket和客户数据关系的简单而高效的方式
*/
struct client_data *users=malloc(sizeof(struct client_data)*FD_LIMIT);
struct pollfd fds[USER_LIMIT+1];
int user_cnt=0;
for(int i=0;i<=USER_LIMIT;i++){
fds[i].fd=-1;
fds[i].events=0;
}
fds[0].fd=serv_sock;
fds[0].events=POLLIN|POLLERR;
fds[0].revents=0;
while(1){
int ret=poll(fds,user_cnt+1,-1);
if(ret<1){
printf("poll failure\n");
break;
}
for(int i=0;i<user_cnt+1;i++){
//对于new clinet connection
if((fds[i].fd==serv_sock)&&(fds[i].revents&POLLIN)){
clnt_addr_size=sizeof(clnt_addr);
int clnt_sock=accept(serv_sock,(struct sockaddr*)&clnt_addr,&clnt_addr_size);
if(clnt_sock==-1){
printf("errno is: %d\n",errno);
continue;
}
/*如果请求太多, 则关闭新到的连接 */
if(user_cnt>=USER_LIMIT){
const char *info="too many users\n";
fprintf(stdout, "%s\n", info);
send(clnt_sock,info,sizeof(info),0);
close(clnt_sock);
continue;
}
/*对于新的连接,同时修改fds和users数组*/
user_cnt++;
users[user_cnt].address=clnt_addr;
setnonblocking(clnt_sock);
fds[user_cnt].fd=clnt_sock;
fds[user_cnt].events=POLLIN | POLLRDHUP | POLLERR;
fds[user_cnt].revents=0;
fprintf(stdout, "comes a new user, now here %d users\n", user_cnt);
}
//对于错误
else if(fds[i].revents&POLLERR){
fprintf(stderr, "get an error from %d\n", fds[i].fd);
char errors[100];
memset(errors,0,100);
socklen_t length=sizeof(errors);
//清除错误
if(getsockopt(fds[i].fd,SOL_SOCKET,SO_ERROR,&errors,&length)){
fprintf(stderr,"get socket option failed\n");
}
continue;
}
//如果客户端关闭连接,client也close对应socket
else if(fds[i].revents&POLLRDHUP){
users[fds[i].fd]=users[fds[user_cnt].fd];
fds[i]=fds[user_cnt];
close(fds[i].fd);
i--;
user_cnt--;
printf("A client left\n");
}
//对于 客户端数据接收
else if(fds[i].revents&POLLIN){
clnt_sock=fds[i].fd;
memset(users[clnt_sock].buf,0,BUF_SIZE);
ret=recv(clnt_sock,users[clnt_sock].buf,BUF_SIZE-1,0);
fprintf(stdout,"get %d bytes of client data: %s from %d\n",ret,users[clnt_sock].buf,clnt_sock);
//receive failure
if(ret<0){
//close connection,把最后一个connection移到此处
if(errno!=EAGAIN){
close(clnt_sock);
users[fds[i].fd]=users[fds[user_cnt].fd];
fds[i]=fds[user_cnt];
i--;
user_cnt--;
}
}else if(ret==0){
}else{
//朝着每一个client发送, 产生POLLOUT事件
for(int j=0;j<=user_cnt;j++){
if(fds[j].fd==clnt_sock){
continue;
}
fds[j].events|=~POLLIN;
fds[j].events|=POLLOUT;
users[fds[j].fd].write_buf=users[clnt_sock].buf;
}
}
}
//对于 发送, 发送之后仍转回接收状态
else if(fds[i].revents&POLLOUT){
clnt_sock=fds[i].fd;
if(!users[clnt_sock].write_buf){
continue;
}
ret=send(clnt_sock,users[clnt_sock].write_buf,strlen(users[clnt_sock].write_buf),0);
users[clnt_sock].write_buf=NULL;
fds[i].events|=~POLLOUT;
fds[i].events|=POLLIN;
}
}
}
free(users);
close(serv_sock);
return 0;
}
//将fd设置为非阻塞
int setnonblocking(int fd){
int old_opt=fcntl(fd,F_GETFL);
int new_opt=old_opt|O_NONBLOCK;
fcntl(fd,F_SETFL,new_opt);
return old_opt;
}
void error_handling(char *msg){
fputs(msg,stderr);
fputc('\n',stderr);
exit(1);
}
#define _GNU_SOURCE 1
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<string.h>
#include<arpa/inet.h>
#include<sys/socket.h>
#include<poll.h>
#include <fcntl.h>
//poll同时监听用户输入和网络连接,并用splice将用户输入定向到网络连接上实现零拷贝
#define BUF_SIZE 100
void error_handling(char *msg);
int main(int argc, char *argv[]){
int sock;
struct sockaddr_in serv_addr;
if (argc != 3)
{
printf("Usage : %s <IP> <port> \n", argv[0]);
exit(1);
}
sock = socket(PF_INET, SOCK_STREAM, 0);
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = inet_addr(argv[1]);
serv_addr.sin_port = htons(atoi(argv[2]));
if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) == -1)
error_handling("connect() error");
struct pollfd fds[2];
//fd=0 标准输入
fds[0].fd=0;
fds[0].events=POLLIN;
fds[0].revents=0;
//fd=sock 可读
//POLLRDHUP sock的一端关闭了连接,或者是写端关闭了连接,_GNU_SOURCE 可以用来判断链路是否发生异常
fds[1].fd=sock;
fds[1].events=POLLIN|POLLRDHUP;
fds[1].revents=0;
char buf[BUF_SIZE];
int pipefd[2];
int ret=pipe(pipefd);
if(ret==-1){
error_handling("pipe() error");
}
while(1){
ret=poll(fds,2,-1);
if(ret<0){
fprintf(stderr, "poll failure\n");
break;
}
//关闭一端连接
if(fds[1].revents & POLLRDHUP){
fprintf(stdout, "server close the connection\n");
break;
}
//数据接收
else if(fds[1].revents & POLLIN){
memset(buf, '\0', sizeof(BUF_SIZE));
recv(fds[1].fd, buf, BUF_SIZE-1, 0);
fprintf(stdout, "%s\n", buf);
}
//标准输入就绪,从标准输入重定向的网络连接
if(fds[0].revents & POLLIN){
ret=splice(0,NULL,pipefd[1],NULL,32768,SPLICE_F_MORE|SPLICE_F_MOVE);
ret=splice(pipefd[0],NULL,sock,NULL,32768,SPLICE_F_MORE|SPLICE_F_MOVE);
}
}
close(sock);
return 0;
}
void error_handling(char *msg)
{
fputs(msg, stderr);
fputc('\n', stderr);
exit(1);
}
除发送者外,其他人都能收到消息。
高级应用3-同时处理TCP和UDP请求
epoll
#include<stdio.h>
#include<stdlib.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<unistd.h>
#include<fcntl.h>
#include<string.h>
#include<errno.h>
#include<sys/socket.h>
#include<sys/epoll.h>
#include<sys/types.h>
#define BUF_SIZE 1024
#define MAX_EVENT_NUMBER 64
//同时处理TCP和UDP
int setnonblocking(int fd);
void addfd(int epfd,int fd);
void error_handling(char *msg);
int main(int argc,char *argv[]){
int tcp_sock,udp_sock,clnt_sock;
struct sockaddr_in address,clnt_addr;
socklen_t clnt_addr_size;
char buf[BUF_SIZE];
struct epoll_event events[MAX_EVENT_NUMBER];
if (argc != 2)
{
fprintf(stderr, "usage: %s <port>\n", argv[0]);
return 1;
}
//tcp socket
tcp_sock=socket(PF_INET,SOCK_STREAM,0);
if(tcp_sock==-1){
error_handling("tcp socket() error");
}
memset(&address,0,sizeof(address));
address.sin_family=AF_INET;
address.sin_addr.s_addr=htonl(INADDR_ANY);
address.sin_port=htons(atoi(argv[1]));
int option=1;
setsockopt(tcp_sock,SOL_SOCKET,SO_REUSEADDR,&option,sizeof(option));
int ret=0;
//分配IP地址和端口号
if(bind(tcp_sock,(struct sockaddr*)&address,sizeof(address))==-1){
error_handling("bind() error");
}
//进入等待连接请求状态
if(listen(tcp_sock,5)==-1){
error_handling("listen() error");
}
udp_sock=socket(PF_INET,SOCK_DGRAM,0);
if(udp_sock==-1){
error_handling("udp socket() error");
}
if(bind(udp_sock,(struct sockaddr*)&address,sizeof(address))==-1){
error_handling("bind() error");
}
int epfd=epoll_create(5);
if(epfd==-1){
error_handling("epoll_create() error");
}
addfd(epfd,tcp_sock);
addfd(epfd,udp_sock);
while(1){
int ret=epoll_wait(epfd,events,MAX_EVENT_NUMBER,-1);
if(ret<0){
printf("epoll failure\n");
break;
}
for(int i=0;i<ret;i++){
int sockfd=events[i].data.fd;
// new tcp client request
if(sockfd==tcp_sock){
clnt_addr_size=sizeof(clnt_addr);
int clnt_sock=accept(sockfd,(struct sockaddr*)&clnt_addr,&clnt_addr_size);
if(clnt_sock==-1){
error_handling("accept() error");
}
//使用ET注册fd
addfd(epfd,clnt_sock);
}
//udp 传输数据 数据报 不用建立连接 用sendto recvfrom传递client address
else if(sockfd==udp_sock){
char buf[BUF_SIZE];
while(1){
memset(&buf,0,sizeof(buf));
clnt_addr_size=sizeof(clnt_addr);
//<sys/types.h>
ret=recvfrom(sockfd,buf,BUF_SIZE-1,0,(struct sockaddr*)&clnt_addr,&clnt_addr_size);
printf("UDP-Received: %s",buf);
if(ret>0){
sendto(sockfd,buf,BUF_SIZE-1,0,(struct sockaddr*)&clnt_addr,clnt_addr_size);
}
}
}
//tcp 传输数据
else if(events[i].events&EPOLLIN){
memset(&buf,0,sizeof(buf));
while(1){
clnt_addr_size=sizeof(clnt_addr);
//<sys/types.h>
ret=recv(sockfd,buf,BUF_SIZE-1,0);
if(ret<0){
if((errno==EAGAIN)||(errno==EWOULDBLOCK)){
break;
}
close(sockfd);
break;
}else if(ret==0){
close(sockfd);
}else{
printf("TCP-Received: %s",buf);
send(sockfd,buf,BUF_SIZE-1,0);
}
}
}else{
printf("Sonmething else happened\n");
}
}
}
close(tcp_sock);
return 0;
}
//将fd设置为非阻塞
int setnonblocking(int fd){
int old_opt=fcntl(fd,F_GETFL);
fcntl(fd,old_opt|O_NONBLOCK);
return old_opt;
}
//注册事件,是否为条件触发
void addfd(int epfd,int fd){
struct epoll_event event;
event.data.fd=fd;
event.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&event);
setnonblocking(fd);
}
void error_handling(char *msg){
fputs(msg,stderr);
fputc('\n',stderr);
exit(1);
}
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<string.h>
#include<arpa/inet.h>
#include<sys/socket.h>
#include <fcntl.h>
//TCP client
#define BUF_SIZE 100
void error_handling(char *msg);
int main(int argc, char *argv[]){
int sock;
struct sockaddr_in serv_addr;
int ret=0;
if (argc != 3)
{
printf("Usage : %s <IP> <port> \n", argv[0]);
exit(1);
}
sock = socket(PF_INET, SOCK_STREAM, 0);
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = inet_addr(argv[1]);
serv_addr.sin_port = htons(atoi(argv[2]));
if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) == -1)
error_handling("connect() error");
char buf[BUF_SIZE];
char *msg="Hello!\n";
ret=send(sock,msg,sizeof(msg),0);
//回声
recv(sock,buf,sizeof(buf),0);
printf("TCP-Receive: %s",buf);
close(sock);
return 0;
}
void error_handling(char *msg)
{
fputs(msg, stderr);
fputc('\n', stderr);
exit(1);
}
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<string.h>
#include<arpa/inet.h>
#include<sys/socket.h>
#include <fcntl.h>
//TCP client
#define BUF_SIZE 100
void error_handling(char *msg);
int main(int argc, char *argv[]){
int sock;
struct sockaddr_in serv_addr;
int ret=0;
if (argc != 3)
{
printf("Usage : %s <IP> <port> \n", argv[0]);
exit(1);
}
sock = socket(PF_INET, SOCK_DGRAM, 0);
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = inet_addr(argv[1]);
serv_addr.sin_port = htons(atoi(argv[2]));
char buf[BUF_SIZE];
char *msg="Hello!\n";
ret=sendto(sock,msg,sizeof(msg),0,(struct sockaddr *)&serv_addr, sizeof(serv_addr));
//回声
socklen_t length=sizeof(serv_addr);
recvfrom(sock,buf,sizeof(buf),0,(struct sockaddr *)&serv_addr,&length);
printf("UDP-Receive: %s",buf);
close(sock);
return 0;
}
void error_handling(char *msg)
{
fputs(msg, stderr);
fputc('\n', stderr);
exit(1);
}
十、信号
signal和sigaction
统一信号源
sigaction+pipe
#include<stdio.h>
#include<stdlib.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<unistd.h>
#include<fcntl.h>
#include<string.h>
#include<errno.h>
#include<sys/socket.h>
#include<sys/epoll.h>
#include<sys/types.h>
#include<signal.h>
#include<stdbool.h>
//将信号Signal与I/O事件统一,统一事件源
// signal_handler 向 pipe中写入信号值,main while 从pipe中用poll获取 pipe中的信号值 进行处理
#define MAX_EVENT_NUMBER 1024
//传输信号值的管道
static int pipefd[2];
void sig_handler(int sig);
void addsig(int sig);
int setnonblocking(int fd);
void addfd(int epfd,int fd);
void error_handling(char *msg);
int main(int argc,char *argv[]){
int serv_sock,clnt_sock;
struct sockaddr_in serv_addr,clnt_addr;
socklen_t clnt_addr_size;
if (argc != 2)
{
fprintf(stderr, "usage: %s <port>\n", argv[0]);
return 1;
}
serv_sock=socket(PF_INET,SOCK_STREAM,0);
memset(&serv_addr,0,sizeof(serv_addr));
serv_addr.sin_family=AF_INET;
serv_addr.sin_addr.s_addr=htonl(INADDR_ANY);
serv_addr.sin_port=htons(atoi(argv[1]));
int option=1;
setsockopt(serv_sock,SOL_SOCKET,SO_REUSEADDR,&option,sizeof(option));
//分配IP地址和端口号
if(bind(serv_sock,(struct sockaddr*)&serv_addr,sizeof(serv_addr))==-1){
error_handling("bind() error");
}
//进入等待连接请求状态
if(listen(serv_sock,5)==-1){
error_handling("listen() error");
}
struct epoll_event events[MAX_EVENT_NUMBER];
int epfd=epoll_create(5);
if(epfd==-1){
error_handling("epoll_create() error");
}
addfd(epfd,serv_sock);
/*使用socketpair创建管道, 注册pipefd[0]上的可读事件*/
int ret=socketpair(PF_UNIX,SOCK_STREAM,0,pipefd);
if(ret==-1){
error_handling("socketpair() error");
}
setnonblocking(pipefd[1]);
addfd(epfd,pipefd[0]);
/*设置一些信号的处理函数*/
addsig(SIGHUP);
addsig(SIGCHLD);
addsig(SIGTERM);
addsig(SIGINT);
bool stop_server=false;
while(!stop_server){
ret=epoll_wait(epfd,events,MAX_EVENT_NUMBER,-1);
if((ret<0)&&(errno!=EINTR)){
fprintf(stderr,"epoll wait failure\n");
break;
}
for(int i=0;i<ret;i++){
int sockfd=events[i].data.fd;
/*如果就绪的文件描述符是listenfd,则处理新的连接*/
if(sockfd==serv_sock){
clnt_addr_size=sizeof(clnt_addr);
clnt_sock=accept(serv_sock,(struct sockaddr*)&clnt_addr,&clnt_addr_size);
addfd(epfd,clnt_sock);
}
/*如果就绪的文件描述符是pipefd[0], 则处理信号*/
else if((sockfd==pipefd[0])&&(events[i].events&EPOLLIN)){
char signals[1024];
ret=recv(pipefd[0],signals,sizeof(signals),0);
if(ret==-1){
continue;
}else if(ret==0){
continue;
}else{
/* 因为每个信号值占1字节,所以按字节来逐个接收信号。我们以SIGTERM
* 为例,来说明如何安全地终止服务器主循环
*/
for(int i=0;i<ret;i++){
switch(signals[i]){
case SIGCHLD:
case SIGHUP:continue;
case SIGTERM:
case SIGINT:
stop_server=true;
printf("CTRL + C --> close fds\n");
}
}
}
}
else{}
}
}
close(serv_sock);
close(pipefd[0]);
close(pipefd[1]);
return 0;
}
void sig_handler(int sig){
int save_errno=errno;
send(pipefd[1],(char*)&sig,1,0);
errno=save_errno;
}
void addsig(int sig){
struct sigaction sa;
memset(&sa,0,sizeof(sa));
sa.sa_handler=sig_handler;
sa.sa_flags=SA_RESTART;
sigfillset(&sa.sa_mask);
if(sigaction(sig,&sa,NULL)==-1){
error_handling("sigaction error");
}
}
int setnonblocking(int fd){
int old_opt=fcntl(fd,F_GETFL);
int new_opt=old_opt|O_NONBLOCK;
fcntl(fd,F_SETFL,new_opt);
return old_opt;
}
void addfd(int epfd,int fd){
struct epoll_event event;
event.data.fd=fd;
event.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&event);
setnonblocking(fd);
}
void error_handling(char *msg){
fputs(msg,stderr);
fputc('\n',stderr);
exit(1);
}
SIGURG
sigaction
#include<stdio.h>
#include<stdlib.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<unistd.h>
#include<fcntl.h>
#include<string.h>
#include<errno.h>
#include<sys/socket.h>
#include<sys/epoll.h>
#include<sys/types.h>
#include<signal.h>
#include<stdbool.h>
//信号Signal处理 MSG_OOB
#define BUF_SIZE 1024
static int clnt_sock;
void sig_handler(int sig);
void addsig(int sig);
void error_handling(char *msg);
int main(int argc,char *argv[]){
int serv_sock;
struct sockaddr_in serv_addr,clnt_addr;
socklen_t clnt_addr_size;
if (argc != 2)
{
fprintf(stderr, "usage: %s <port>\n", argv[0]);
return 1;
}
serv_sock=socket(PF_INET,SOCK_STREAM,0);
memset(&serv_addr,0,sizeof(serv_addr));
serv_addr.sin_family=AF_INET;
serv_addr.sin_addr.s_addr=htonl(INADDR_ANY);
serv_addr.sin_port=htons(atoi(argv[1]));
int option=1;
setsockopt(serv_sock,SOL_SOCKET,SO_REUSEADDR,&option,sizeof(option));
//分配IP地址和端口号
if(bind(serv_sock,(struct sockaddr*)&serv_addr,sizeof(serv_addr))==-1){
error_handling("bind() error");
}
//进入等待连接请求状态
if(listen(serv_sock,5)==-1){
error_handling("listen() error");
}
clnt_addr_size=sizeof(clnt_addr);
clnt_sock=accept(serv_sock,(struct sockaddr*)&clnt_addr,&clnt_addr_size);
if(clnt_sock<0){
error_handling("accept() error");
}
addsig(SIGURG);
/* 使用SIGURG信號之前,我们必须设置socket的宿主进程或进程组*/
fcntl(clnt_sock,F_SETOWN,getpid());
char buf[BUF_SIZE];
while(1){
memset(buf,0,sizeof(buf));
int ret=recv(clnt_sock,buf,BUF_SIZE-1,0);
if(ret<=0){
break;
}
printf("get %d bytes of normal data: %s \n",ret,buf);
}
close(clnt_sock);
close(serv_sock);
return 0;
}
void sig_handler(int sig){
fprintf(stdout,"sig_urg function\n");
int save_errno=errno;
char buf[BUF_SIZE];
int ret=recv(clnt_sock,buf,BUF_SIZE-1,MSG_OOB);
printf("get %d bytes of oob %s \n",ret,buf);
errno=save_errno;
}
void addsig(int sig){
struct sigaction sa;
memset(&sa,0,sizeof(sa));
sa.sa_handler=sig_handler;
sa.sa_flags=SA_RESTART;
sigfillset(&sa.sa_mask);
if(sigaction(sig,&sa,NULL)==-1){
error_handling("sigaction error");
}
}
void error_handling(char *msg){
fputs(msg,stderr);
fputc('\n',stderr);
exit(1);
}
十一、定时器
基于升序链表
#ifndef LST_TIMER
#define LST_TIMER
#include <time.h>
#include <arpa/inet.h>
#include <iostream>
#define BUFFER_SIZE 64
class util_timer; // ~ 前向声明
// ~ 用户数据结构
struct client_data {
sockaddr_in address;
int sockfd;
char buf[BUFFER_SIZE];
util_timer* timer;
};
class util_timer {
public:
util_timer(): prev(NULL), next(NULL) {};
public:
time_t expire;
void (*cb_func) (client_data*);
client_data* user_data;
util_timer *prev;
util_timer *next;
};
class sort_timer_lst {
public:
sort_timer_lst() : head(NULL), tail(NULL) {} // ~ 构造函数
// ~ 链表被销毁时,删除其中所有的定时器
~sort_timer_lst() {
util_timer *tmp = head;
while (tmp) {
head = head->next;
delete tmp;
tmp = head;
}
}
// ~ add
void add_timer(util_timer *timer) {
if (!timer) return;
if (!head) { // ~ 链表为空
head = tail = timer;
return;
}
if (timer->expire < head->expire) { // ~ timer 期限比 head 还小
timer->next = head;
head->prev = timer;
head = timer;
return;
}
add_timer(timer, head); // ~ 辅助函数(重载) (本质就是往有序链表中插入数据)
}
// ~ adjust
void adjust_timer(util_timer *timer) {
if (!timer) return;
util_timer *tmp = timer->next;
// ~ 如果目标位于链表尾部,或者重新设置的超时时间仍然小于下一个定时器(每次调整只会增加)
if (!tmp || (timer->expire < tmp->expire)) return;
if (timer == head) { // ~ 如果被调整的定时器是head,则需要将head从头部拔下来,重新设置头节点,并插入修改后的head'
head = head->next;
head->prev = NULL;
timer->next = NULL;
add_timer(timer, head);
} else {
// ~ 拔下当前节点
timer->prev->next = timer->next;
timer->next->prev = timer->prev;
add_timer(timer, timer->next); // ~ 往后插入
}
}
// ~ del_timer
void del_timer(util_timer *timer) {
if (!timer) return;
if ((timer == head) && (timer == tail)) { // ~ 链表只有一个节点
delete timer;
head = NULL;
tail = NULL;
return;
}
if (timer == head) { // ~ 删除头节点
head = head->next;
head->prev = NULL;
delete timer;
return;
}
if (timer == tail) { // ~ 删除尾节点
tail = tail->prev;
tail->next = NULL;
delete timer;
return;
}
// ~ 删除中部节点
timer->prev->next = timer->next;
timer->next->prev = timer->prev;
delete(timer);
}
// ~ 心搏函数
void tick() {
if (!head) return; // ~ 链表空
std::cout << "timer tick" << std::endl;
time_t cur = time(NULL);
util_timer *tmp = head;
while (tmp) {
if (cur < tmp->expire) break;
tmp->cb_func(tmp->user_data);
head = tmp->next;
if (head) head->prev = NULL;
delete tmp;
tmp = head;
}
}
private:
void add_timer(util_timer *timer, util_timer *lst_head) {
util_timer* prev = lst_head;
util_timer* tmp = prev->next;
while (tmp) {
if (timer->expire < tmp->expire) {
prev->next = timer;
timer->prev = prev;
timer->next = tmp;
return;
}
prev = prev->next;
tmp = tmp->next;
}
prev->next = timer; // ~ 此时 prev 是 tail 位置
timer->prev = prev;
timer->next = NULL;
}
private:
util_timer *head;
util_timer *tail;
};
#endif
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>
#include "11_2_lst_timer.h"
#define FD_LIMIT 65535
#define MAX_EVENT_NUMBER 1024
#define TIMESLOT 5
// ~ 设置非阻塞函数
// ~ epoll 事件注册函数
// ~ 信号注册,信号处理函数
// ~ 定时处理函数
// ~ 回调函数 ↑ 用到
static int pipefd[2]; // ~ 管道
static sort_timer_lst timer_lst; // ~ 升序定时器链表
static int epfd = 0;
int setnonblocking(int fd) {
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}
void addfd(int epfd, int fd) {
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}
void sig_handler(int sig) {
int save_errno = errno;
int msg = sig;
send(pipefd[1], (char*)&msg, 1, 0);
errno = save_errno;
}
void addsig(int sig) {
struct sigaction sa;
memset(&sa, '\0', sizeof(sa));
sa.sa_handler = sig_handler;
sa.sa_flags |= SA_RESTART;
sigfillset(&sa.sa_mask);
assert(sigaction(sig, &sa, NULL) != -1);
}
void timer_handler() {
/*定时处理任务,实际上就是调用tick函数*/
timer_lst.tick();
/*因为一次alarm调用只会引起一次SIGALRM信号,所以我们要重新定时,以不断触发
*SIGALRM信号
**/
alarm(TIMESLOT);
}
/*定时器回调函数, 它删除非活动连接socket上的注册事件,并关闭之*/
void cb_func(client_data* user_data) {
epoll_ctl(epfd, EPOLL_CTL_DEL, user_data->sockfd, 0);
assert(user_data);
close(user_data->sockfd);
std::cout << "close fd " << user_data->sockfd << std::endl;
}
int main(int argc, char *argv[]) {
if (argc < 2) {
std::cout << "usage : " << basename(argv[0]) << " port_number " << std::endl;
return 1;
}
int ret = 0;
struct sockaddr_in serv_addr,clnt_addr;
int serv_sock,clnt_sock;
socklen_t clnt_addr_size;
bzero(&serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr=htonl(INADDR_ANY);
serv_addr.sin_port = htons(atoi(argv[1]));
serv_sock = socket(PF_INET, SOCK_STREAM, 0);
assert(serv_sock >= 0);
int option = 1;
setsockopt(serv_sock, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(option));
ret = bind(serv_sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
assert(ret != -1);
ret = listen(serv_sock, 5);
assert(ret != -1);
epoll_event events[MAX_EVENT_NUMBER];
int epfd = epoll_create(5);
assert(epfd != -1);
addfd(epfd, serv_sock);
ret = socketpair(PF_UNIX, SOCK_STREAM, 0, pipefd); // ~ 双向管道
assert(ret != -1);
setnonblocking(pipefd[1]);
addfd(epfd, pipefd[0]);
addsig(SIGALRM);
addsig(SIGTERM);
bool stop_server = false;
client_data *users = new client_data[FD_LIMIT];
bool timeout = false;
alarm(TIMESLOT);
while (!stop_server) {
int number = epoll_wait(epfd, events, MAX_EVENT_NUMBER, -1);
if ((number < 0) && (errno != EINTR)) {
std::cout << "epoll failure " << std::endl;
break;
}
for (int i = 0; i < number; i++) {
int sockfd = events[i].data.fd;
/*处理新到的客户连接*/
if (sockfd == serv_sock) {
clnt_addr_size = sizeof(clnt_addr);
clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_addr, &clnt_addr_size);
addfd(epfd, clnt_sock);
users[clnt_sock].address = clnt_addr;
users[clnt_sock].sockfd = clnt_sock;
/*创建定时器, 设置其回调函数与超时时间,
* 然后绑定定时器与用户数据,
* 最后将定时器添加到链表timer_lst中*/
util_timer *timer = new util_timer;
timer->user_data = &users[clnt_sock];
timer->cb_func = cb_func;
time_t cur = time(NULL);
timer->expire = cur + 3 * TIMESLOT;
users[clnt_sock].timer = timer;
timer_lst.add_timer(timer);
}
/*处理信号*/
else if ((sockfd == pipefd[0]) && (events[i].events & EPOLLIN)) {
int sig;
char signals[1024];
ret = recv(pipefd[0], signals, sizeof(signals), 0);
if (ret == -1) continue;
else if (ret == 0) continue;
else {
for (int i = 0; i < ret; ++i) {
switch(signals[i]) {
case SIGALRM: {
/*用timeout变量标记有定时任务需要处理,
* 但不立即处理定时任务,
* 这是因为定时任务的优先级不是很高,
* 我们优先处理其他更重要的任务*/
timeout = true;
break;
}
case SIGTERM: {
stop_server = true;
}
}
}
}
}
/*处理客户连接上接收到的数据*/
else if (events[i].events & EPOLLIN) {
memset(users[sockfd].buf, '\0', BUFFER_SIZE);
ret = recv(sockfd, users[sockfd].buf, BUFFER_SIZE - 1, 0);
std::cout << "get " << ret << " bytes of client data " << users[sockfd].buf << " from " << sockfd << std::endl;
util_timer *timer = users[sockfd].timer;
if (ret < 0) {
/*如果发生错误,则关闭连接,并移除其对应的定时器*/
if (errno != EAGAIN) {
cb_func(&users[sockfd]);
if (timer) timer_lst.del_timer(timer);
}
} else if (ret == 0) {
/*如果对方已经关闭连接,则我们也关闭连接,并移除对应的定时器*/
cb_func(&users[sockfd]);
if (timer) timer_lst.del_timer(timer);
} else {
if (timer) {
/*如果某个客户连接上有数据可读,
* 则我们要调整该连接对应的定时器,
* 以延迟该连接被关闭的时间*/
time_t cur = time(NULL);
timer->expire = cur + 3 * TIMESLOT;
std::cout << "adjust timer once " << std::endl;
timer_lst.add_timer(timer);
}
}
}
}
/*最后处理定时事件,
* 因为I/O事件有更高的优先级。当然,这样做将导致定时任务不能精确
* 地按照预期的时间执行
* */
if (timeout) {
timer_handler();
timeout = false;
}
}
close(serv_sock);
close(pipefd[1]);
close(pipefd[0]);
delete [] users;
return 0;
}
IO复用
#define TIMEOUT 5000
int timeout = TIMEOUT;
time_t start = time(NULL);
time_t end = time(NULL);
while(1)
{
printf("the timeout is now %d mil-seconds \n", timeout);
start = time(NULL);
int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, timeout);
if ((number < 0) && (errno != EINTER))
{
printf("epoll failure\n");
break;
}
/* 如果epoll_wait成功返回0, 则说明超时时间到,此时便可处理定时任务,
* 并重置定时时间
*/
if (number == 0)
{
timeout = TIMEOUT;
continue;
}
end = time(NULL);
/*如果epoll_wait的返回值大于0, 则本次epoll_wait调用持续的时间是(end -
* start)*1000 ms,
* 我们需要将定时时间timeout减去这段时间,以获得下次epoll_wait调用的超时参数
*/
timeout -= (end - start) * 1000;
/*重新计算之后的timeout值可能等于0, 说明本次epoll_wait调用返回时,
* 不仅有文件描述就绪,而且其超时时间也刚好到达,此时我们也要定时任务,并重置定时时间
*/
if (timeout <= 0)
{
timeout = TIMEOUT;
}
//handle connections
}
基于时间轮
#ifndef TIME_WHEEL_TIMER_H_
#define TIME_WHEEL_TIMER_H_
#include<stdio.h>
#include<time.h>
#include<netinet/in.h>
#define BUFFER_SIZE 64
class tw_timer;
/*绑定socket和定时器*/
struct client_data{
sockaddr_in address;
int sockfd;
char buf[BUFFER_SIZE];
tw_timer* timer;
};
class tw_timer{
public:
tw_timer(int rot,int rs):next(NULL),prev(NULL),rotation(rot),time_slot(ts){};
public:
int rotation; /*记录定时器在时间轮转多少圈后生效*/
int time_slot;/*记录定时器属于时间轮上哪个槽(对应的链表,下同) */
void (*cb_func)(client_data*);/*客户数据*/
client_data *user_data;/*客户数据*/
tw_timer *next;//指向下一个定时器
tw_timer *prev;//指向前一个定时器
};
//时间轮
class time_wheel{
public:
time_wheel():cur_slot(0){
for(int i=0;i<N;i++){
slots[i]=NULL; //初始化每个槽的头结点
}
}
//析构函数,遍历每个槽,并销毁其中的定时器
~time_wheel(){
for(int i=0;i<N;i++){
tw_timer *tmp=slots[i];
while(tmp){
slots[i]=tmp.next;
delete tmp;
tmp=slots[i];
}
}
}
//根据定时值timeout创建一个定时器,并把它插入合适的槽中
tw_timer* add_timer(int timeout){
if(timeout<0) return NULL;
int ticks=0;
/*下面根据待插入定时器的超时值计算它将在时间轮转动多少个滴答后被触发,
* 并将该滴答数存储于变量ticks中。如果待插入定时器的超时值小于时间轮的槽间隔SI,
* 则将ticks向上折合为1,否则就将ticks向下折合为timeout/SI
*/
if(timeout<SI){
ticks=1;
}else{
ticks=timeout/SI;
}
//计算待插入的定时器在时间轮转动多少圈后被触发
int rotation=ticks/N;
//计算待插入的定时器应该被插入哪个槽中
int ts=(cur_slot+(ticks%N))%N;
//创建新的定时器,它在时间轮转动rotation圈之后被触发,且位于第ts个槽上
tw_timer *timer=new tw_timer(rotation,ts);
//如果第ts个槽中尚无任何定时器,则把新建的定时器插入其中,
//并将该定时器设置为该槽的头结点
if(!slots[ts]){
printf("add timer,rotation is %d, ts is %d, cur_slot is %d\n",rotation,ts,cur_slot);
slots[ts]=timer;
}else{
//否则,将定时器插入第ts个槽中
timer->next=slots[ts];
slots[ts]->prev=timer;
slots[ts]=timer;
}
return timer;
}
void del_timer(tw_timer *timer){
if(!timer) return;
int ts=timer->time_slot;
// slots[ts]是目标定时器所在槽的头结点。如果目标定时器就是该头结点,
// 则需要重置第ts个槽的头结点
if(timer==slots[ts]){
slots[ts]=slots[ts]->next;
if(slots[ts]){
slots[ts]->prev=NULL;
}
delete timer;
}else{
timer->prev->next=timer->next;
if(timer->next){
timer->next->prev=timer->prev;
}
delete timer;
}
}
//SI 时间到后,调用该函数,时间轮向前滚动一个槽的间隔
void tick(){
tw_timer *tmp=slots[cur_slot];//取得时间轮上当前槽的头结点
printf("current slot is %d \n",cur_slot);
while(tmp){
printf("tick the timer once\n");
//如果定时器的rotation值大于0, 则本轮没到定时时间
if(tmp->rotation>0){
tmp->rotation--;
tmp=tmp->next;
}else{
//否则, 说明定时器已到期,于是执行定时任务,然后删除该定时器
tmp->cb_func(tmp->user_data);
if(tmp==slots[cur_slot]){
printf("delete header in cur_slot\n");
slots[cur_slot]=tmp->next;
delete tmp;
if(slots[cur_slot]){
slots[cur_slot]->prev=NULL;
}
tmp=slots[cur_slot];
}else{
tmp->prev->next=tmp->next;
if(tmp->next){
tmp->next->prev=tmp->prev;
}
tw_timer *tmp2=tmp->next;
delete tmp;
tmp=tmp2;
}
}
}
cur_slot=(cur_slot+1)%N;//更新时间轮的当前槽,以反应时间轮的转动
}
private:
static const int N 60; //时间轮上槽的数目
static const int SI 1;//每1s时间转动一次,即槽间隔为1s
tw_timer *slots[N]; //时间轮的槽, 其中每个元素指向一个定时器链表, 链表无序
int cur_slot; //时间轮的当前槽
};
#endif
基于最小堆
#ifndef MIN_HEAP_H
#define MIN_HEAP_H
#include<iostream>
#include<time.h>
#include<netinet/in.h>
using namespace std;
#define BUFFER_SIZE 64
class heap_timer;
/*绑定socket和定时器*/
struct client_data{
sockaddr_in address;
int sockfd;
char buf[BUFFER_SIZE];
tw_timer* timer;
};
//定时器类
class heap_timer{
public:
heap_timer(int delay){
expire=(time(NULL)+delay);
}
public:
time_t expire;//定时器生效的绝对时间
void (*cb_func)(client_data *);//定时器回调函数
struct client_data *user_data;//用户数据
};
//时间堆类
class time_heap{
public:
//构造函数之一, 初始化一个大小为cap的空堆
time_heap(int cap) throw (std::exception):capacity(cap),cur_size(0){
array=new heap_timer * [capacity];//创建堆数组
if(!array){
throw exception();
}
for(int i=0;i<capacity;i++){
array[i]=NULL;
}
}
time_heap(heap_timer **init_array,int size,int cap) throw (std::exception):cur_size(size),capacity(cap){
if(capacity<size){
throw exception();
}
array=new heap_timer * [capacity];//创建堆数组
if(!array){
throw exception();
}
for(int i=0;i<capacity;i++){
array[i]=NULL;
}
if(size!=0){
//初始化数组
for(int i=0;i<size;i++){
array[i]=init_array[i];
}
for(int i=(cur_size-1)/2;i>=0;i--){
//对数组中的第[(cur_size-1)/2]~0个元素执行下滤操作
percolate_down(i);
}
}
}
~time_heap(){
for(int i=0;i<cur_size;i++){
delete array[i];
}
delete [] array;
}
void add_timer(heap_timer* timer) throw (std::exception){
if(!timer) return;
if(cur_size>=capacity){
resize();//如果当前堆数组容量不够,则将其扩大一倍
}
//新插入了一个元素,当前堆大小加1, hole是新建空穴的位置
int hole=cur_size++;
int parent=0;
//对从空穴到根节点的路径上的所有节点执行上虑操作
for(;hole>0;hole=parent){
parent=(hole-1)/2;
if(array[parent]->expire<=timer->expire){
break;
}else{
array[hole]=array[parent];
}
array[hole]=timer;
}
}
void del_timer(heap_timer *timer){
if(!timer) return;
//仅仅将目标定时器的回调函数设置为空, 即所谓的延迟销毁。
//这样将节省正删除该定时器造成的开销, 但这样做容易是堆数组膨胀
timer->cb_func=NULL;
}
heap_timer* top() const{
if(empty()) return NULL;
return array[0];
}
void pop_timer(){
if(empty()) return;
if(array[0]){
delete array[0];
array[0]=array[--cur_size];
percolate_down(0);//对新的堆顶元素执行下虑操作
}
}
void tick(){
heap_timer *tmp=array[0];
time_t cur=time(NULL); //循环处理堆中到期的定时器
while(!empty()){
if(!tmp){
break;
}
//如果堆顶定时器没有到期,则退出循环
if(tmp->expire>cur){
break;
}
//否则就执行堆顶定时器中的任务
if(array[0]->cb_func){
array[0]->cb_func(array[0]->user_data);
}
//将堆顶元素删除,同时生成新的堆顶定时器(array[0])
pop_timer();
tmp=array[0];
}
}
bool empty() const {return cur_size==0;}
private:
//最小堆的下虑操作,
//它确保堆数组中以第hole个节点作为根的子树拥有最小堆性质
void percolate_down(int hole){
heap_timer *tmp=array[hole];
int child=0;
for(;(hole*2+1)<=(cur_size-1);hole=child){
child=hole*2+1;
//找左右子中的较小值
if((child<(cur_size-1))&&(array[child+1]->expire<array[child]->expire)){
child++;
}
if(array[child]->expire<tmp->expire){
array[hole]=array[child];
}else{
break;
}
array[hole]=tmp;
}
}
void resize() throw (std::exception){
heap_timer **tmp=new heap_timer * [2*capacity];
for(int i=0;i<2*capacity;i++){
tmp[i]=NULL;
}
if(!tmp){
throw std::exception;
}
capacity=2*capacity;
for(int i=0;i<cur_size;i++){
tmp[i]=array[i];
}
delete [] array;
array=tmp;
}
private:
heap_timer **array;//堆数组
int capacity; //堆数组容量
int cur_size; //堆数组当前包含元素的个数
};
#endif
二倍扩充内存
- vector在push_back以成倍增长可以在均摊后达到O(1)的事件复杂度,相对于增长指定大小的O(n)时间复杂度更好。
- 为了防止申请内存的浪费,现在使用较多的有2倍与1.5倍的增长方式,而1.5倍的增长方式可以更好的实现对内存的重复利用。
十二、I/O框架库Libevent
下载安装
下载安装包,解压,进入
$ sudo ./configure -prefix=/usr/lib/libevent
提示错误
configure: error: openssl is a must but can not be found.
一般为openssl库的缘故,需要安装openssl,一般ubuntu自带openssl,这里需要openssl-devel,要注意的是在ubuntu中需要使用sudo apt-get install libssl-dev来安装而不是sudo apt-get install openssl-devel。
编译安装
sudo make
sudo make install
测试是否安装成功
在用测试程序时出现
: fatal error: event.h: No such file or directory
解决:安装依赖库
sudo apt-get install libevent-dev
示例
十三、多进程编程
IPC_PRIVATE
#include<stdio.h>
#include<sys/sem.h>
#include<sys/wait.h>
#include<unistd.h>
#include<stdlib.h>
union semun{
int val; //SETVAL
struct semid_ds* buf; //IPC_STAT IPC_SET
unsigned short int* array; //GETALL SETALL
struct seminfo* __buf; //IPC_INFO
};
//op为-1时执行P操作, op为1时执行V操作
void pv(int sem_id,int op){
struct sembuf sem_b;
sem_b.sem_num=0;
sem_b.sem_op=op;
sem_b.sem_flg=SEM_UNDO;
semop(sem_id,&sem_b,1);
}
int main(int argc,char* argv[]){
int sem_id=semget(IPC_PRIVATE,1,0666);
union semun sem_un;
sem_un.val=1;
semctl(sem_id,0,SETVAL,sem_un);
pid_t id=fork();
if(id<0){
return 1;
}
else if(id==0){
printf("child try to get binary sem\n");
pv(sem_id,-1);
printf("child get the sem and would release it after 5 seconds\n");
sleep(5);
pv(sem_id,1);
exit(0);
}else{
printf("parent try to get binary sem\n");
pv(sem_id,-1);
printf("parent get the sem and would release it after 5 seconds\n");
sleep(5);
pv(sem_id,1);
}
waitpid(id,NULL,0);
semctl(sem_id,0,IPC_RMID,sem_un); ////删除信号量
return 0;
}
共享内存
#define _GNU_SOURCE 1
#include<stdio.h>
#include<stdlib.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<unistd.h>
#include<fcntl.h>
#include<string.h>
#include<errno.h>
#include<sys/socket.h>
#include<sys/epoll.h>
#include<signal.h>
#include<sys/mman.h>
#include<stdbool.h>
#include<sys/wait.h>
#define USER_LIMIT 5
#define BUFFER_SIZE 1024
#define FD_LIMIT 65535
#define MAX_EVENT_NUMBER 1024
#define PROCESS_LIMET 65535
struct client_data{
struct sockaddr_in address; //客户端的socket地址
int connfd; //socket文件描述符
pid_t pid; //处理这个连接的子进程的PID
int pipefd[2]; //和父进程通信用的管道
};
int sig_pipefd[2];
int epfd;
int serv_sock;
int shmfd;
static const char* shm_name="/my_shm";
char* share_mem=0;
bool stop_child=false;
//当前客户数量
int user_count=0;
//客户连接数组。进程用客户连接的编号来索引这个数组,即可取得相关的客户连接数据
struct client_data* users=0;
//子进程和客户连接的映射关系。用进程的PID来索引这个数组,即可取得该进程所处理的客户连接的编号
int* sub_process=0;
void error_handling(const char *msg){
fputs(msg,stderr);
fputc('\n',stderr);
exit(1);
}
int setnonblocking(int fd){
int old_opt=fcntl(fd,F_GETFL);
int new_opt=old_opt|O_NONBLOCK;
fcntl(fd,F_SETFL,new_opt);
return old_opt;
}
void addfd(int epfd,int fd){
struct epoll_event event;
event.data.fd=fd;
event.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&event);
setnonblocking(fd);
}
void sig_handler(int sig){
int save_errno=errno;
send(sig_pipefd[1],(char*)&sig,1,0);
errno=save_errno;
}
void addsig(int sig,void(*handler)(int),bool restart=true){
struct sigaction sa;
memset(&sa,0,sizeof(sa));
sa.sa_handler=handler;
if(restart){
sa.sa_flags|=SA_RESTART;
}
if(sigfillset(&sa.sa_mask)==-1){
error_handling("sigaction() error");
}
}
void child_term_handler(int sig){
stop_child=true;
}
//子进程运行的函数。参数idx指出该子进程处理的客户连接的编号,users是保存所有客户端连接数据的数组,参数share_mem指出共享内存的起始地址
int run_child(int idx,struct client_data* users,char* share_mem){
struct epoll_event events[MAX_EVENT_NUMBER];
//子进程使用I/O复用技术来同时监听两个文件描述符:客户连接socket、与父进程通信的管道文件描述符
int child_epfd=epoll_create(5);
if(child_epfd==-1){
error_handling("epoll_create() error");
}
int connfd=users[idx].connfd;
addfd(child_epfd,connfd);
int pipefd=users[idx].pipefd[1];
addfd(child_epfd,pipefd);
//子进程需要设置自己的信号处理函数
addsig(SIGTERM,child_term_handler,false);
int ret=0;
while(!stop_child){
int number=epoll_wait(child_epfd, events, MAX_EVENT_NUMBER, -1);
if((number < 0) && (errno != EINTR)){
printf("epoll failure\n");
break;
}
for(int i=0;i<number;i++){
int sockfd = events[i].data.fd;
//本子进程负责的客户连接有数据到达
if ((sockfd == connfd) && (events[i].events &EPOLLIN)){
memset(share_mem+idx*BUFFER_SIZE,0,BUFFER_SIZE);
ret=recv(connfd,share_mem+idx*BUFFER_SIZE,BUFFER_SIZE-1,0);
if(ret<0){
if(errno!=EAGAIN){
stop_child=true;
}
}else if(ret==0){
stop_child=true;
}else{
//成功读取客户数据后就通知主进程(通过管道)来处理
send(pipefd,(char*)&idx,sizeof(idx),0);
}
}
//主进程通知本进程(通过管道)将第client个客户的数据发送到本进程负责的客户端
else if((sockfd == pipefd) && (events[i].events & EPOLLIN)){
int client=0;
//接收主进程发送来的数据, 即有客户数据到达的连接的编号
ret = recv(sockfd, (char*)&client, sizeof(client), 0);
if(ret<0){
if(errno!=EAGAIN){
stop_child=true;
}
}else if(ret==0){
stop_child=true;
}else{
//成功读取客户数据后就通知主进程(通过管道)来处理
send(connfd,share_mem+client*BUFFER_SIZE,BUFFER_SIZE,0);
}
}else{
continue;
}
}
}
close(connfd);
close(pipefd);
close(child_epfd);
return 0;
}
int main(int argc,char *argv[]){
struct sockaddr_in serv_addr,clnt_addr;
socklen_t clnt_addr_size;
if (argc != 2)
{
printf( "usage: %s <port>\n", argv[0]);
return 1;
}
serv_sock=socket(PF_INET,SOCK_STREAM,0);
memset(&serv_addr,0,sizeof(serv_addr));
serv_addr.sin_family=AF_INET;
serv_addr.sin_addr.s_addr=htonl(INADDR_ANY);
serv_addr.sin_port=htons(atoi(argv[1]));
int option=1;
setsockopt(serv_sock,SOL_SOCKET,SO_REUSEADDR,&option,sizeof(option));
//分配IP地址和端口号
if(bind(serv_sock,(struct sockaddr*)&serv_addr,sizeof(serv_addr))==-1){
error_handling("bind() error");
}
//进入等待连接请求状态
if(listen(serv_sock,5)==-1){
error_handling("listen() error");
}
user_count=0;
//避免 不能将 "void *" 类型的值分配到 "task_struct *" 类型的实体
users=(struct client_data*)malloc(sizeof(struct client_data)*(1+FD_LIMIT));
sub_process=(int*)malloc(sizeof(int)*(PROCESS_LIMET+1));
for(int i=0;i<PROCESS_LIMET;i++){
sub_process[i]=-1;
}
epoll_event events[MAX_EVENT_NUMBER];
epfd=epoll_create(1);
if(epfd==-1){
error_handling("epoll_create() error");
}
addfd(epfd,serv_sock);
int ret=socketpair(PF_UNIX,SOCK_STREAM,0,sig_pipefd);
if(ret==-1){
error_handling("socketpair() error");
}
setnonblocking(sig_pipefd[1]);
addfd(epfd,sig_pipefd[0]);
addsig(SIGCHLD,sig_handler);
addsig(SIGTERM,sig_handler);
addsig(SIGINT,sig_handler);
addsig(SIGPIPE,sig_handler);
bool stop_server=false;
bool terminate=false;
//创建共享内存,作为所有客户socket连接的缓存
shmfd=shm_open(shm_name,O_CREAT|O_RDWR,0666);
if(shmfd==-1){
error_handling("shm_open() error");
}
//ftruncate ()会将参数fd 指定的文件大小改为参数length 指定的大小
ret=ftruncate(shmfd,USER_LIMIT*BUFFER_SIZE);
if(ret==-1){
error_handling("ftruncate() error");
}
//mmap将一个文件或者其它对象映射到进程的地址空间 <sys/mman.h>
/*
void * mmap (void *addr,留给内核来完成
size_t len,
int prot,访问权限PROT_READ(可读) , PROT_WRITE (可写), PROT_EXEC (可执行), PROT_NONE(不可访问)
int flags,MAP_SHARED , MAP_PRIVATE , MAP_FIXED,
int fd,
off_t offset);
*/
share_mem=(char*)mmap(NULL,USER_LIMIT*BUFFER_SIZE,PROT_READ|PROT_WRITE,MAP_SHARED,shmfd,0);
if(*share_mem==MAP_SHARED){
error_handling("mmap() error");
}
close(shmfd);
while(!stop_server){
int number=epoll_wait(epfd,events,MAX_EVENT_NUMBER,-1);
if((number<0)&&(errno!=EINTR)){
printf("epoll failuer\n");
break;
}
for(int i=0;i<number;i++){
int sockfd=events[i].data.fd;
if(sockfd==serv_sock){
clnt_addr_size=sizeof(clnt_addr);
int connfd=accept(serv_sock,(struct sockaddr*)&clnt_addr,&clnt_addr_size);
if(connfd<0){
printf("errno is : %d\n",errno);
continue;
}
if(user_count>=USER_LIMIT){
const char* info="too many users\n";
printf("%s",info);
send(connfd,info,strlen(info),0);
close(connfd);
continue;
}
users[user_count].address=clnt_addr;
users[user_count].connfd=connfd;
//在主进程和子进程之间建立管道
ret=socketpair(PF_UNIX,SOCK_STREAM,0,users[user_count].pipefd);
if(ret==-1){
error_handling("socketpair() error");
}
pid_t pid=fork();
if(pid<0){
close(connfd);
continue;
}else if(pid==0){
//发送信号
close(epfd);
close(serv_sock);
close(users[user_count].pipefd[0]);
close(sig_pipefd[1]);
close(sig_pipefd[0]);
//子进程开始处理
run_child(user_count,users,share_mem);
//解除映射
munmap((void*)share_mem,USER_LIMIT*BUFFER_SIZE);
exit(0);
}else{
close(connfd);
close(users[user_count].pipefd[1]);
//接收信号
addfd(epfd,users[user_count].pipefd[0]);
users[user_count].pid=pid;
//记录新的客户连接在数组users中的索引值,建立进程pid和该索引值之间的映射关系
sub_process[pid]=user_count;
user_count++;
}
}else if((sockfd==sig_pipefd[0])&&events[i].events&EPOLLIN){
//接收到信号
char signals[1024];
ret=recv(sig_pipefd[0],signals,sizeof(signals),0);
if(ret==-1){
continue;
}else if(ret==0){
continue;
}else{
for(int i=0;i<ret;i++){
switch(signals[i]){
//子进程退出,表示有某个客户端关闭了连接
case SIGCHLD:
{
pid_t pid;
int stat;
while((pid=waitpid(-1,&stat,WNOHANG))>0){
//清除第del_user个客户连接使用的相关数据
int del_user=sub_process[pid];
sub_process[pid]=-1;
if((del_user<0)||(del_user>USER_LIMIT)){
continue;
}
//清除第del_user个客户连接使用的相关数据
epoll_ctl(epfd,EPOLL_CTL_DEL,users[del_user].pipefd[0],0);
close(users[del_user].pipefd[0]);
users[del_user]=users[--user_count];
sub_process[users[del_user].pid]=del_user;
}
if(terminate&&user_count==0){
stop_server=true;
}
break;
}
case SIGTERM:
case SIGINT:
{
printf("kill all the child now\n");
if(user_count==0){
stop_server=true;
break;
}
for(int i=0;i<user_count;i++){
int pid=users[i].pid;
kill(pid,SIGTERM);
}
terminate=true;
break;
}
default: break;
}
}
}
}
//某个子进程向父进程写入了数据
else if(events[i].events&EPOLLIN){
//读取管道数据, child变量记录了是哪个客户连接有数据到达
int child=0;
ret = recv(sockfd, (char *)&child, sizeof(child), 0);
printf("read data from child accross pipe.\n");
if (ret == -1)
{
continue;
}
else if (ret == 0)
{
continue;
}
else{
//向负责处理第child个客户连接的子进程之外的其他子进程发送消息,通知它们有客户数据要写
for(int j=0;j<user_count;j++){
if(users[j].pipefd[0]!=sockfd){
printf("send data to child accross pipe.\n");
printf("%s",(char*)&child);
send(users[j].pipefd[0],(char*)&child,sizeof(child),0);
}
}
}
}
}
}
//释放资源
close(sig_pipefd[0]);
close(sig_pipefd[1]);
close(serv_sock);
close(epfd);
shm_unlink(shm_name);
free(users);
free(sub_process);
return 0;
}
#define _GNU_SOURCE 1
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<string.h>
#include<arpa/inet.h>
#include<sys/socket.h>
#include<poll.h>
#include <fcntl.h>
//聊天室
//poll同时监听用户输入和网络连接,并用splice将用户输入定向到网络连接上实现零拷贝
#define BUF_SIZE 100
void error_handling(char *msg);
int main(int argc, char *argv[]){
int sock;
struct sockaddr_in serv_addr;
if (argc != 3)
{
printf("Usage : %s <IP> <port> \n", argv[0]);
exit(1);
}
sock = socket(PF_INET, SOCK_STREAM, 0);
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = inet_addr(argv[1]);
serv_addr.sin_port = htons(atoi(argv[2]));
if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) == -1)
error_handling("connect() error");
struct pollfd fds[2];
//fd=0 标准输入
fds[0].fd=0;
fds[0].events=POLLIN;
fds[0].revents=0;
//fd=sock 可读
//POLLRDHUP sock的一端关闭了连接,或者是写端关闭了连接,_GNU_SOURCE 可以用来判断链路是否发生异常
fds[1].fd=sock;
fds[1].events=POLLIN|POLLRDHUP;
fds[1].revents=0;
char buf[BUF_SIZE];
int pipefd[2];
int ret=pipe(pipefd);
if(ret==-1){
error_handling("pipe() error");
}
while(1){
ret=poll(fds,2,-1);
if(ret<0){
fprintf(stderr, "poll failure\n");
break;
}
//关闭一端连接
if(fds[1].revents & POLLRDHUP){
fprintf(stdout, "server close the connection\n");
break;
}
//数据接收
else if(fds[1].revents & POLLIN){
memset(buf, '\0', sizeof(BUF_SIZE));
recv(fds[1].fd, buf, BUF_SIZE-1, 0);
fprintf(stdout, "%s\n", buf);
}
//标准输入就绪,从标准输入重定向的网络连接
if(fds[0].revents & POLLIN){
ret=splice(0,NULL,pipefd[1],NULL,32768,SPLICE_F_MORE|SPLICE_F_MOVE);
ret=splice(pipefd[0],NULL,sock,NULL,32768,SPLICE_F_MORE|SPLICE_F_MOVE);
}
}
close(sock);
return 0;
}
void error_handling(char *msg)
{
fputs(msg, stderr);
fputc('\n', stderr);
exit(1);
}
进程间传递fd
#include<stdio.h>
#include<sys/socket.h>
#include<fcntl.h>
#include<unistd.h>
#include<assert.h>
#include<string.h>
#include<stdlib.h>
static const int CONTROL_LEN=CMSG_LEN(sizeof(int));
//发送文件描述符,fd参数是用来传递信息的UNIX域socket,
//fd_to_send参数是待发送的文件描述符
void send_fd(int fd,int fd_to_send){
struct iovec iov[1];
struct msghdr msg;
char buf[0];
iov[0].iov_base=buf;
iov[0].iov_len=1;
msg.msg_name=NULL; // 消息的协议地址
msg.msg_namelen=0;
msg.msg_iov=iov;//多io缓冲区的地址
msg.msg_iovlen=1;
//附属信息
struct cmsghdr cm;
cm.cmsg_len=CONTROL_LEN;
cm.cmsg_level=SOL_SOCKET; //原始协议级别
cm.cmsg_type=SCM_RIGHTS; //控制信息类型
//CMSG_DATA()返回一个指向cmsghdr 的数据部分的指针。
*(int *)CMSG_DATA(&cm)=fd_to_send;
msg.msg_control=&cm; //设置辅助数据 / * 辅助数据的地址 * /
msg.msg_controllen=CONTROL_LEN;
sendmsg(fd,&msg,0);
}
int recv_fd(int fd){
struct iovec iov[1];
struct msghdr msg;
char buf[0];
iov[0].iov_base=buf;
iov[0].iov_len=1;
msg.msg_name=NULL;
msg.msg_namelen=0;
msg.msg_iov=iov;
msg.msg_iovlen=1;
struct cmsghdr cm;
msg.msg_control=&cm; //设置辅助数据
msg.msg_controllen=CONTROL_LEN;
recvmsg(fd,&msg,0);
//CMSG_DATA()返回一个指向cmsghdr 的数据部分的指针。
int fd_to_read=*(int *)CMSG_DATA(&cm);
return fd_to_read;
}
int main(int argc, const char *argv[]){
int pipefd[2];
int fd_to_pass=0;
//创建父、子进程间的管道,文件描述符pipefd[0]和pipefd[1]都是UNIX域socket
int ret=socketpair(PF_UNIX,SOCK_STREAM,0,pipefd);
assert(ret!=-1);
pid_t pid=fork();
assert(pid>=0);
if(pid==0){
close(pipefd[0]);
fd_to_pass=open("test.txt",O_RDWR,0666);
//子进程通过管道将文件描述符发送到父进程。如果文件text.txt打开失败,则子进程将标准输入文件描述符发送到父进程
send_fd(pipefd[1],(fd_to_pass>0)?fd_to_pass:0);
char buf[1024];
memset(buf,0,1024);
read(fd_to_pass,buf,1024); //读取目标文件描述符,以验证其有效性
printf("I got fd %d and data %s\n",fd_to_pass,buf);
close(fd_to_pass);
}
return 0;
}
十四、多线程编程
信号量
十五、进程池与线程池
半同步/半异步进程池
设计模型——单例模式
#ifndef __PROCESS_POOL_H
#define __PROCESS_POOL_H
#include<stdio.h>
#include<stdlib.h>
#include<fcntl.h>
#include<string.h>
#include<unistd.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<assert.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<errno.h>
#include<sys/epoll.h>
#include<sys/wait.h>
#include<sys/stat.h>
#include<signal.h>
#include<stdbool.h>
//描述一个子进程的类,m_pid是目标子进程的PID,m_pipefd是
//父进程和子进程通信用的管道
class process{
public:
process():m_pid(-1){}
public:
pid_t m_pid;
int m_pipefd[2];
};
template <typename T>
class processpool{
private:
//将构造函数定义为私有的,因此我们只能通过后面的create静态函数来创建processpool实例
processpool(int listenfd,int process_number=8);
public:
//单例模式,以保证程序最多创建一个processpool实例,这是程序正确处理信号的必要条件
static processpool<T> *create(int listenfd,int process_number=8){
if(!m_instance){
m_instance=new processpool<T>(listenfd,process_number);
}
return m_instance;
}
~processpool(){
}
//启动进程池
void run();
private:
void setup_sig_pipe();
void run_child();
void run_parent();
private:
//static instance of process pool 单例模式
static processpool<T> *m_instance;
//每个子进程最多能处理的客户数量
static const int MAX_PROCESS_NUMBER=16;
//epoll最多能处理的事件数
static const int USER_PER_PROCESS=65536;
//number of total processes
static const int MAX_EVENT_NUMBER=10000;
//总进程数
int m_process_number;
//子进程序号,从0开始
int m_idx;
//当前的epfd
int m_epollfd;
//记录所有的subprocess
process *m_sub_process;
//监听socket
int m_listenfd;
//子进程停止与否
int m_stop;
};
//static成员变量需要在 类定义体外 进行初始化与定义
template <typename T>
processpool<T> *processpool<T>::m_instance=NULL;
//处理信号的管道
static int sig_pipefd[2];
static int setnonblocking(int fd){
int old_opt=fcntl(fd,F_GETFL);
int new_opt=old_opt|O_NONBLOCK;
fcntl(fd,F_SETFL,new_opt);
return old_opt;
}
static void addfd(int epollfd,int fd){
struct epoll_event event;
event.data.fd=fd;
event.events=EPOLLIN|EPOLLET;
epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&event);
setnonblocking(fd);
}
static void removefd(int epollfd,int fd){
epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,0);
close(fd);
}
static void sig_handler(int sig)
{
int save_errno = errno;
send(sig_pipefd[1], &sig, 1, 0);
errno = save_errno;
}
static void addsig(int sig,void(handler)(int),bool restart=true){
struct sigaction sa;
sa.sa_handler=handler;
if(!restart){
sa.sa_flags|=SA_RESTART;
}
sigfillset(&sa.sa_mask);
assert(sigaction(sig,&sa,NULL)!=-1);
}
template <typename T>
processpool<T>::processpool(int listenfd,int process_number)
:m_listenfd(listenfd),m_process_number(process_number),m_idx(-1),m_stop(false){
assert((process_number>0)&&(process_number<=MAX_PROCESS_NUMBER));
m_sub_process=new process[process_number];
assert(m_sub_process);
////创建process_number个子进程,并建立它们和父进程之间的管道
for(int i=0;i<process_number;i++){
int ret=socketpair(PF_UNIX,SOCK_STREAM,0,m_sub_process[i].m_pipefd);
assert(ret==0);
m_sub_process[i].m_pid=fork();
assert(m_sub_process[i].m_pid>=0);
if(m_sub_process[i].m_pid>0){ //父进程
close(m_sub_process[i].m_pipefd[1]);
continue;
}else{ //子进程
close(m_sub_process[i].m_pipefd[0]);
m_idx=i;
break;
}
}
}
//统一事件源
template <typename T>
void processpool<T>::setup_sig_pipe(){
//创建epoll事件监听表和信号管道
m_epollfd=epoll_create(5);
assert(m_epollfd!=-1);
int ret=socketpair(PF_UNIX,SOCK_STREAM,0,sig_pipefd);
assert(ret!=-1);
setnonblocking(sig_pipefd[1]);
addfd(m_epollfd,sig_pipefd[0]);
//设置信号处理函数
addsig(SIGCHLD,sig_handler);
addsig(SIGTERM,sig_handler);
addsig(SIGINT,sig_handler);
//根据信号的默认处理规则SIGPIPE信号的默认执行动作是terminate(终止、退出),所以client会退出。若不想客户端退出可以把SIGPIPE设为SIG_IGN
addsig(SIGPIPE,SIG_IGN);
}
//父进程中m_idx值为-1, 子进程中m_idx值大于等于0,
//我们据此判断下来要运行的是父进程代码还是子进程代码
template <typename T>
void processpool<T>::run(){
if(m_idx!=-1){
run_child();
return;
}
run_parent();
}
template <typename T>
void processpool<T>::run_child(){
setup_sig_pipe();
//每个子进程都通过其在进程池中的序号值m_idx找到与父进程通信的管道
int pipefd=m_sub_process[m_idx].m_pipefd[1];
//子进程需要监听管道文件描述符pipefd,
//因为父进程将通过它来通知子进程accept新连接
addfd(m_epollfd,pipefd);
epoll_event events[MAX_EVENT_NUMBER];
T *users=new T[USER_PER_PROCESS];
assert(users);
int number=0;
int ret=-1;
while(!m_stop){
number=epoll_wait(m_epollfd,events,MAX_PROCESS_NUMBER,-1);
if((number<0)&&(errno!=EINTR)){
printf("epoll failure\n");
break;
}
for(int i=0;i<number;i++){
int sockfd=events[i].data.fd;
if((sockfd==pipefd)&&events[i].events&EPOLLIN){
int client=0;
ret=recv(sockfd,(char*)&client,sizeof(client),0);
if((ret<0)&&(errno!=EAGAIN)){
continue;
}
else if(ret==0){
continue;
}else{
struct sockaddr_in clinet_address;
socklen_t clnt_addr_size=sizeof(clinet_address);
int connfd=accept(m_listenfd,(struct sockaddr*)&clinet_address,&clnt_addr_size);
if(connfd<0){
printf("errno is:%d \n",errno);
continue;
}
addfd(m_epollfd,connfd);
//模板类T必须实现init方法,
//以初始化一个客户连接。我们直接使用connfd来索引逻辑处理对象(T类型的对象),
//以提高效率
users[connfd].init(m_epollfd,connfd,clinet_address);
}
}
//下面处理子进程接收到的信号
else if((sockfd==sig_pipefd[0])&&(events[i].events&EPOLLIN)){
int sig;
char signals[1024];
ret=recv(sig_pipefd[0],signals,sizeof(signals),0);
if(ret<0){
continue;
}else{
for(int i=0;i<ret;i++){
switch(signals[i]){
case SIGCHLD:
pid_t pid;
int stat;
while((pid=waitpid(-1,&stat,WNOHANG))>0){
continue;
}
break;
case SIGTERM:
case SIGINT:
m_stop=true;
break;
default:
break;
}
}
}
}
//如果是其他可读数据,
//那么必然是客户请求到来。调用逻辑处理对象的process方法处理之一
else if(events[i].events&EPOLLIN){
users[sockfd].process();
}else{
continue;
}
}
}
delete users;
users=NULL;
close(pipefd);
close(m_epollfd);
}
template <typename T>
void processpool<T>::run_parent(){
setup_sig_pipe();
//父进程监听m_listenfd
addfd(m_epollfd,m_listenfd);
int new_conn=1;
int number=0;
int ret=-1;
epoll_event events[MAX_EVENT_NUMBER];
int sub_process_counter=0;
while(!m_stop){
number=epoll_wait(m_epollfd,events,MAX_EVENT_NUMBER,-1);
if((number<0)&&(errno!=EINTR)){
printf("epoll failuer\n");
break;
}
for(int i=0;i<number;i++){
int sockfd=events[i].data.fd;
if(sockfd==m_listenfd){
//如果有新连接到来,就采用Round
//Robin方式将其分配给一个子进程处理
int i=sub_process_counter;
do{
if(m_sub_process[i].m_pid!=-1){
break;
}
i=(i+1)%m_process_number;
}while(i!=sub_process_counter);
if(m_sub_process[i].m_pid==-1){
m_stop=true;
break;
}
sub_process_counter=(i+1)%m_process_number;
send(m_sub_process[i].m_pipefd[0],(char*)&new_conn,sizeof(new_conn),0);
printf("send request to child %d\n",i);
}
//下面处理父进程接收的信号
else if((sockfd==sig_pipefd[0])&&events[i].events&EPOLLIN){
int sig;
char signals[1024];
ret=recv(sig_pipefd[0],signals,sizeof(signals),0);
if(ret<=0){
continue;
}
else{
for(int i=0;i<ret;i++){
switch(signals[i]){
case SIGCHLD:
pid_t pid;
int stat;
while(pid=waitpid(-1,&stat,WNOHANG)>0){
for(int i=0;i<m_process_number;i++){
//如果进程池中第i个子进程退出了,
//则主进程关闭相应的通信管道,
//并设置相应的m_pid为-1,以标记该子进程已退出
if(m_sub_process[i].m_pid==pid){
printf("child %d join\n",i);
close(m_sub_process[i].m_pipefd[0]);
m_sub_process[i].m_pid=-1;
}
}
}
//如果所有子进程都已经退出了,则父进程也退出
m_stop=true;
for(int i=0;i<m_process_number;i++){
if(m_sub_process[i].m_pid!=-1){
m_stop=false;
}
}
break;
case SIGTERM:
case SIGINT:
//如果父进程接收到终止信号,那么就杀死所有子进程,并等待
//它们全部结束。
printf("kill all the child now\n");
for(int i=0;i<m_process_number;i++){
int pid=m_sub_process[i].m_pid;
if(pid!=-1){
kill(pid,SIGTERM);
}
}
break;
default:
break;
}
}
}
}else{
continue;
}
}
}
close(m_epollfd);
}
#endif
CGI服务器
#include<iostream>
#include<stdlib.h>
#include<fcntl.h>
#include<string.h>
#include<unistd.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<assert.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<errno.h>
#include<sys/epoll.h>
#include<sys/wait.h>
#include<sys/stat.h>
#include<signal.h>
#include "15_1_process_pool.h"
//用于处理客户端CGI请求的类,它可以作为processpool类的模板参数
class cgi_conn{
public:
cgi_conn(){}
~cgi_conn(){}
//初始化客户连接,清空读缓冲区
//init(m_epollfd,connfd,clinet_address);
void init(int epollfd,int sockfd,const sockaddr_in &client_addr){
m_epollfd=epollfd;
m_sockfd=sockfd;
m_address=client_addr;
memset(m_buf,'\0',BUFFER_SIZE);
m_read_idx=0;
}
void process(){
int idx=0;
int ret=-1;
while(1){
ret=recv(m_sockfd,m_buf+idx,BUFFER_SIZE-1,0);
if(ret<0){
if(errno!=EAGAIN){
removefd(m_epollfd,m_sockfd);
}
break;
}
//如果对方关闭连接,则服务器也关闭连接
else if(ret==0){
removefd(m_epollfd,m_sockfd);
break;
}
else{
m_read_idx+=ret;
fprintf(stdout,"user content is: %s\n",m_buf);
//如果遇到字符“\r\n”, 则开始处理客户请求
for(;idx<m_read_idx;idx++){
if((idx>=1)&&(m_buf[idx-1]=='\r')&&(m_buf[idx]=='\n')){
break;
}
}
//如果没有遇到字符“\r\n”,则需要读取更多客户数据
if(idx==m_read_idx){
continue;
}
m_buf[idx-1]='\0';
char *file_name=m_buf;
//判断文件是否存在
////判断客户要运行的CGI程序是否存在
if(access(file_name,F_OK)==-1){
removefd(m_epollfd,m_sockfd);
break;
}
//创建子进程来执行CGI程序
ret=fork();
if(ret==-1){
removefd(m_epollfd,m_sockfd);
break;
}else if(ret>0){
//父进程只需关闭连接
removefd(m_epollfd,m_sockfd);
}else{
//子进程将标准输出定向到m_sockfd, 并执行CGI程序
close(STDOUT_FILENO);
dup(m_sockfd);
//把当前进程替换为一个新进程
execl(m_buf,m_buf,(char*)0);
exit(0);
}
}
}
}
private:
static const int BUFFER_SIZE=1024;
static int m_epollfd;
int m_sockfd;
sockaddr_in m_address;
char m_buf[BUFFER_SIZE];
//标记读缓冲区中已经读入的客户数据的最后一个字节的下一个位置
int m_read_idx;
};
int cgi_conn::m_epollfd=-1;
int main(int argc, const char *argv[])
{
int serv_sock,clnt_sock;
struct sockaddr_in serv_addr,clnt_addr;
socklen_t clnt_addr_size;
if (argc != 2)
{
fprintf(stderr, "usage: %s <port>\n", argv[0]);
return 1;
}
serv_sock=socket(PF_INET,SOCK_STREAM,0);
memset(&serv_addr,0,sizeof(serv_addr));
serv_addr.sin_family=AF_INET;
serv_addr.sin_addr.s_addr=htonl(INADDR_ANY);
serv_addr.sin_port=htons(atoi(argv[1]));
assert(serv_sock >= 0);
int ret = bind(serv_sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr));
assert(ret != -1);
ret = listen(serv_sock, 5);
assert(ret != -1);
processpool<cgi_conn> *pool=processpool<cgi_conn>::create(serv_sock);
if(pool){
pool->run();
delete pool;
}
close(serv_sock);//main 函数创建了文件描述符listenfd,那么就由它亲自关闭
return 0;
}
#define _GNU_SOURCE 1
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<string.h>
#include<arpa/inet.h>
#include<sys/socket.h>
#include<poll.h>
#include<assert.h>
#include <fcntl.h>
#define BUFFER_SIZE 64
void error_handling(char *msg);
int main(int argc, char *argv[]){
int sock;
struct sockaddr_in serv_addr;
if (argc != 3)
{
printf("Usage : %s <IP> <port> \n", argv[0]);
exit(1);
}
sock = socket(PF_INET, SOCK_STREAM, 0);
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = inet_addr(argv[1]);
serv_addr.sin_port = htons(atoi(argv[2]));
if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) == -1){
printf("connection failed \n");
close(sock);
return 1;
}
char *buf=(char*)"cgi\r\n";
char recvbuf[BUFFER_SIZE];
send(sock,buf,sizeof(buf),0);
sleep(1);
int ret=recv(sock,recvbuf,BUFFER_SIZE,0);
for(int i=0;i<ret;i++){
printf("%c",recvbuf[i]);
}
printf("\n");
close(sock);
return 0;
}
#include<stdio.h>
int main(){
printf("cgi_hello");
return 0;
}