本章将带你学习 Linux 下最高效的 I/O 多路复用(事件通知)机制 epoll 的使用。

6-1 epoll基本知识

图片.png
图片.png
图片.png
图片.png
图片.png
图片.png
图片.png

6-2 epoll高性能服务器的实现

tcp_server_epoll.c

  #include <stdio.h>
  #include <unistd.h>
  #include <stdlib.h>
  #include <string.h>
  #include <fcntl.h>
  #include <errno.h>
  #include <sys/epoll.h>
  #include <sys/types.h>
  #include <sys/socket.h>
  #include <netinet/in.h>

  #define PORT 8888
  #define FD_SIZE 20
  #define MAX_EVENTS 20
  #define TIME_OUT 500
  #define MESSAGE_SIZE 1024

  /*
   * Switch fd to non-blocking mode.
   * Returns 0 on success, -1 on failure (errno is set by fcntl).
   */
  static int set_nonblocking(int fd)
  {
      int fl = fcntl(fd, F_GETFL, 0);

      if (fl == -1) {
          return -1;
      }
      return fcntl(fd, F_SETFL, fl | O_NONBLOCK) == -1 ? -1 : 0;
  }

  /*
   * Create a non-blocking TCP socket listening on INADDR_ANY:port.
   * Any unrecoverable failure is reported with perror() and terminates the
   * process, so the returned descriptor is always valid.
   */
  static int create_listener(unsigned short port, int backlog)
  {
      int fd = socket(AF_INET, SOCK_STREAM, 0);
      if (fd == -1) {
          perror("create socket error");
          exit(1);
      }

      /* Allow a quick restart on the same port; failure here is not fatal. */
      int on = 1;
      if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {
          perror("setsockopt error");
      }

      /* accept_clients() loops until EAGAIN, which requires a non-blocking fd. */
      if (set_nonblocking(fd) == -1) {
          perror("fcntl error");
          exit(1);
      }

      struct sockaddr_in local_addr;
      memset(&local_addr, 0, sizeof(local_addr));
      local_addr.sin_family = AF_INET;
      local_addr.sin_port = htons(port);
      local_addr.sin_addr.s_addr = htonl(INADDR_ANY);

      if (bind(fd, (struct sockaddr *)&local_addr, sizeof(local_addr)) == -1) {
          perror("bind error");
          exit(1);
      }
      if (listen(fd, backlog) == -1) {
          perror("listen error");
          exit(1);
      }
      return fd;
  }

  /* Report the disconnect, remove fd from the epoll set and close it. */
  static void close_client(int epoll_fd, int fd)
  {
      /* Kernels before 2.6.9 reject a NULL event pointer for EPOLL_CTL_DEL. */
      struct epoll_event unused;

      memset(&unused, 0, sizeof(unused));
      printf("the client is closed, fd:%d\n", fd);
      epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, &unused);
      close(fd);
  }

  /*
   * Accept every connection currently pending on listen_fd and register each
   * one with epoll_fd as a non-blocking, edge-triggered client.
   * Returns once accept() reports that nothing is pending (EAGAIN) or fails.
   */
  static void accept_clients(int epoll_fd, int listen_fd)
  {
      for (;;) {
          struct sockaddr_in remote_addr;
          socklen_t addr_len = sizeof(remote_addr);
          int client_fd = accept(listen_fd, (struct sockaddr *)&remote_addr, &addr_len);

          if (client_fd == -1) {
              if (errno == EINTR) {
                  continue;
              }
              if (errno != EAGAIN && errno != EWOULDBLOCK) {
                  perror("accept error");
              }
              return;
          }

          if (set_nonblocking(client_fd) == -1) {
              perror("fcntl error");
              close(client_fd);
              continue;
          }

          struct epoll_event ev;
          memset(&ev, 0, sizeof(ev));
          ev.events = EPOLLIN | EPOLLET;
          ev.data.fd = client_fd;
          if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &ev) == -1) {
              perror("epoll_ctl error");
              close(client_fd);
              continue;
          }
          printf("new accept fd:%d\n", client_fd);
      }
  }

  /*
   * Write len bytes of buf to fd, retrying after EINTR and short writes.
   * Returns 0 when finished, -1 on a hard error (the caller should close fd).
   * If the socket buffer is full (EAGAIN) the remainder is dropped and 0 is
   * returned; a production server would queue it and wait for EPOLLOUT.
   */
  static int send_all(int fd, const char *buf, size_t len)
  {
      size_t off = 0;

      while (off < len) {
          /* MSG_NOSIGNAL: a vanished peer must yield EPIPE, not kill us with SIGPIPE. */
          ssize_t n = send(fd, buf + off, len - off, MSG_NOSIGNAL);

          if (n >= 0) {
              off += (size_t)n;
              continue;
          }
          if (errno == EINTR) {
              continue;
          }
          if (errno == EAGAIN || errno == EWOULDBLOCK) {
              fprintf(stderr, "send would block, dropping %zu bytes on fd:%d\n",
                      len - off, fd);
              return 0;
          }
          perror("send error");
          return -1;
      }
      return 0;
  }

  /*
   * Echo everything currently readable on a client socket back to the peer.
   * The socket is registered edge-triggered, so it must be drained until
   * recv() reports EAGAIN; otherwise leftover data would never be signalled
   * again. The client is closed on EOF (recv() == 0) or on a hard error.
   */
  static void handle_client(int epoll_fd, int fd)
  {
      /* One spare byte so the data can always be NUL-terminated for printf. */
      char in_buf[MESSAGE_SIZE + 1];

      for (;;) {
          ssize_t n = recv(fd, in_buf, MESSAGE_SIZE, 0);

          if (n > 0) {
              in_buf[n] = '\0';
              printf(">>>receive message:%s\n", in_buf);
              if (send_all(fd, in_buf, (size_t)n) == -1) {
                  close_client(epoll_fd, fd);
                  return;
              }
              continue;
          }
          if (n == 0) {
              /* Orderly shutdown by the peer; errno is not meaningful here. */
              close_client(epoll_fd, fd);
              return;
          }
          if (errno == EINTR) {
              /* Interrupted by a signal: simply try again. */
              printf("recv EINTR... \n");
              continue;
          }
          if (errno == EAGAIN || errno == EWOULDBLOCK) {
              /* Fully drained; wait for the next edge notification. */
              return;
          }
          perror("recv error");
          close_client(epoll_fd, fd);
          return;
      }
  }

  /*
   * Single-process epoll echo server on PORT.
   * The listening socket is level-triggered; client sockets are edge-triggered.
   * Runs until epoll_wait() fails with something other than EINTR.
   */
  int main(void)
  {
      int backlog = 10;
      int socket_fd = create_listener(PORT, backlog);

      /* The size argument is ignored by modern kernels but must be > 0. */
      int epoll_fd = epoll_create(256);
      if (epoll_fd == -1) {
          perror("epoll_create error");
          exit(1);
      }

      struct epoll_event ev;
      memset(&ev, 0, sizeof(ev));
      ev.events = EPOLLIN;
      ev.data.fd = socket_fd;
      if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, socket_fd, &ev) == -1) {
          perror("epoll_ctl error");
          exit(1);
      }

      /* Sized by the same constant that is passed to epoll_wait(). */
      struct epoll_event events[MAX_EVENTS];

      for (;;) {
          /*
           * events       - output array filled with the ready events
           * MAX_EVENTS   - maximum number of events returned by one call
           * TIME_OUT     - maximum time to wait, in milliseconds
           * event_number - number of events actually returned (0 on timeout)
           */
          int event_number = epoll_wait(epoll_fd, events, MAX_EVENTS, TIME_OUT);
          if (event_number == -1) {
              if (errno == EINTR) {
                  continue;
              }
              perror("epoll_wait error");
              break;
          }

          for (int i = 0; i < event_number; i++) {
              int fd = events[i].data.fd;

              if (fd == socket_fd) {
                  /* Activity on the listening socket: new connection(s). */
                  printf("listen event... \n");
                  accept_clients(epoll_fd, socket_fd);
              } else if (events[i].events & (EPOLLIN | EPOLLHUP | EPOLLERR)) {
                  /* HUP/ERR are routed here too: recv() reports them and we close. */
                  handle_client(epoll_fd, fd);
              }
          }
      }

      close(epoll_fd);
      close(socket_fd);
      return EXIT_FAILURE;
  }

6-3 epoll+fork进行性能优化

tcp_server_epoll_fork.c

  #include <stdio.h>
  #include <unistd.h>
  #include <stdlib.h>
  #include <string.h>
  #include <fcntl.h>
  #include <errno.h>
  #include <sys/epoll.h>
  #include <sys/types.h>
  #include <sys/wait.h>
  #include <sys/socket.h>
  #include <netinet/in.h>

  #define PORT 8111
  #define FD_SIZE 20
  #define MAX_EVENTS 20
  #define TIME_OUT 500
  #define MESSAGE_SIZE 1024
  /* Typically (number of CPU cores * 2) + 1 */
  #define NB_PROCESS 4

  /*
   * Switch fd to non-blocking mode.
   * Returns 0 on success, -1 on failure (errno is set by fcntl).
   */
  static int set_nonblocking(int fd)
  {
      int fl = fcntl(fd, F_GETFL, 0);

      if (fl == -1) {
          return -1;
      }
      return fcntl(fd, F_SETFL, fl | O_NONBLOCK) == -1 ? -1 : 0;
  }

  /*
   * Create a non-blocking TCP socket listening on INADDR_ANY:port.
   * Any unrecoverable failure is reported with perror() and terminates the
   * process, so the returned descriptor is always valid.
   */
  static int create_listener(unsigned short port, int backlog)
  {
      int fd = socket(AF_INET, SOCK_STREAM, 0);
      if (fd == -1) {
          perror("create socket error");
          exit(1);
      }

      /* Allow a quick restart on the same port; failure here is not fatal. */
      int on = 1;
      if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {
          perror("setsockopt error");
      }

      /*
       * Non-blocking is mandatory here: several workers are woken for the same
       * connection and all but one must get EAGAIN from accept() instead of
       * blocking.
       */
      if (set_nonblocking(fd) == -1) {
          perror("fcntl error");
          exit(1);
      }

      struct sockaddr_in local_addr;
      memset(&local_addr, 0, sizeof(local_addr));
      local_addr.sin_family = AF_INET;
      local_addr.sin_port = htons(port);
      local_addr.sin_addr.s_addr = htonl(INADDR_ANY);

      if (bind(fd, (struct sockaddr *)&local_addr, sizeof(local_addr)) == -1) {
          perror("bind error");
          exit(1);
      }
      if (listen(fd, backlog) == -1) {
          perror("listen error");
          exit(1);
      }
      return fd;
  }

  /* Report the disconnect, remove fd from the epoll set and close it. */
  static void close_client(int epoll_fd, int fd)
  {
      /* Kernels before 2.6.9 reject a NULL event pointer for EPOLL_CTL_DEL. */
      struct epoll_event unused;

      memset(&unused, 0, sizeof(unused));
      printf("the client is closed, fd:%d\n", fd);
      epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, &unused);
      close(fd);
  }

  /*
   * Accept every connection currently pending on listen_fd and register each
   * one with epoll_fd as a non-blocking, edge-triggered client.
   * Returns once accept() reports that nothing is pending (EAGAIN) or fails.
   * EAGAIN is the normal outcome for workers that lost the race for a
   * connection to a sibling process.
   */
  static void accept_clients(int epoll_fd, int listen_fd)
  {
      for (;;) {
          struct sockaddr_in remote_addr;
          socklen_t addr_len = sizeof(remote_addr);
          int client_fd = accept(listen_fd, (struct sockaddr *)&remote_addr, &addr_len);

          if (client_fd == -1) {
              if (errno == EINTR) {
                  continue;
              }
              if (errno != EAGAIN && errno != EWOULDBLOCK) {
                  perror("accept error");
              }
              return;
          }

          if (set_nonblocking(client_fd) == -1) {
              perror("fcntl error");
              close(client_fd);
              continue;
          }

          struct epoll_event ev;
          memset(&ev, 0, sizeof(ev));
          ev.events = EPOLLIN | EPOLLET;
          ev.data.fd = client_fd;
          if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &ev) == -1) {
              perror("epoll_ctl error");
              close(client_fd);
              continue;
          }
          printf("new accept fd:%d\n", client_fd);
      }
  }

  /*
   * Write len bytes of buf to fd, retrying after EINTR and short writes.
   * Returns 0 when finished, -1 on a hard error (the caller should close fd).
   * If the socket buffer is full (EAGAIN) the remainder is dropped and 0 is
   * returned; a production server would queue it and wait for EPOLLOUT.
   */
  static int send_all(int fd, const char *buf, size_t len)
  {
      size_t off = 0;

      while (off < len) {
          /* MSG_NOSIGNAL: a vanished peer must yield EPIPE, not kill us with SIGPIPE. */
          ssize_t n = send(fd, buf + off, len - off, MSG_NOSIGNAL);

          if (n >= 0) {
              off += (size_t)n;
              continue;
          }
          if (errno == EINTR) {
              continue;
          }
          if (errno == EAGAIN || errno == EWOULDBLOCK) {
              fprintf(stderr, "send would block, dropping %zu bytes on fd:%d\n",
                      len - off, fd);
              return 0;
          }
          perror("send error");
          return -1;
      }
      return 0;
  }

  /*
   * Echo everything currently readable on a client socket back to the peer.
   * The socket is registered edge-triggered, so it must be drained until
   * recv() reports EAGAIN; otherwise leftover data would never be signalled
   * again. The client is closed on EOF (recv() == 0) or on a hard error.
   */
  static void handle_client(int epoll_fd, int fd)
  {
      /* One spare byte so the data can always be NUL-terminated for printf. */
      char in_buf[MESSAGE_SIZE + 1];

      for (;;) {
          ssize_t n = recv(fd, in_buf, MESSAGE_SIZE, 0);

          if (n > 0) {
              in_buf[n] = '\0';
              printf(">>>receive message:%s\n", in_buf);
              if (send_all(fd, in_buf, (size_t)n) == -1) {
                  close_client(epoll_fd, fd);
                  return;
              }
              continue;
          }
          if (n == 0) {
              /* Orderly shutdown by the peer; errno is not meaningful here. */
              close_client(epoll_fd, fd);
              return;
          }
          if (errno == EINTR) {
              /* Interrupted by a signal: simply try again. */
              printf("recv EINTR... \n");
              continue;
          }
          if (errno == EAGAIN || errno == EWOULDBLOCK) {
              /* Fully drained; wait for the next edge notification. */
              return;
          }
          perror("recv error");
          close_client(epoll_fd, fd);
          return;
      }
  }

  /*
   * Event loop of one worker process.
   * Each worker owns a private epoll instance but shares the listening socket
   * inherited from the parent, so a new connection can wake several workers
   * at once (thundering herd); the losers see EAGAIN in accept_clients().
   * Linux 4.5+ provides EPOLLEXCLUSIVE to reduce these spurious wake-ups.
   * Returns only if the epoll instance cannot be set up or epoll_wait() fails.
   */
  static void run_worker(int socket_fd)
  {
      printf("create an new child process...");

      /* The size argument is ignored by modern kernels but must be > 0. */
      int epoll_fd = epoll_create(256);
      if (epoll_fd == -1) {
          perror("epoll_create error");
          return;
      }

      struct epoll_event ev;
      memset(&ev, 0, sizeof(ev));
      ev.events = EPOLLIN;
      ev.data.fd = socket_fd;
      if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, socket_fd, &ev) == -1) {
          perror("epoll_ctl error");
          close(epoll_fd);
          return;
      }

      /* Sized by the same constant that is passed to epoll_wait(). */
      struct epoll_event events[MAX_EVENTS];

      for (;;) {
          /*
           * events       - output array filled with the ready events
           * MAX_EVENTS   - maximum number of events returned by one call
           * TIME_OUT     - maximum time to wait, in milliseconds
           * event_number - number of events actually returned (0 on timeout)
           */
          int event_number = epoll_wait(epoll_fd, events, MAX_EVENTS, TIME_OUT);
          if (event_number == -1) {
              if (errno == EINTR) {
                  continue;
              }
              perror("epoll_wait error");
              break;
          }

          for (int i = 0; i < event_number; i++) {
              int fd = events[i].data.fd;

              if (fd == socket_fd) {
                  /* Activity on the listening socket: new connection(s). */
                  printf("listen event... \n");
                  accept_clients(epoll_fd, socket_fd);
              } else if (events[i].events & (EPOLLIN | EPOLLHUP | EPOLLERR)) {
                  /* HUP/ERR are routed here too: recv() reports them and we close. */
                  handle_client(epoll_fd, fd);
              }
          }
      }

      close(epoll_fd);
  }

  /*
   * Pre-fork epoll echo server on PORT.
   * The parent creates the listening socket, forks NB_PROCESS workers that
   * each run their own epoll loop on it, and then waits for all of them.
   */
  int main(void)
  {
      int backlog = 10;
      int max_subprocess = NB_PROCESS;
      int nb_children = 0;
      int socket_fd = create_listener(PORT, backlog);

      /* Only the parent forks; every child leaves the loop via exit(). */
      for (int a = 0; a < max_subprocess; a++) {
          pid_t pid = fork();

          if (pid == -1) {
              perror("fork error");
              continue;
          }
          if (pid == 0) {
              /* Child process: serve until the event loop fails. */
              run_worker(socket_fd);
              close(socket_fd);
              exit(EXIT_FAILURE);
          }
          nb_children++;
      }

      /* Parent process: the workers hold their own copies of the listener. */
      close(socket_fd);

      /* Reap every worker, not just the first one that quits. */
      while (nb_children > 0) {
          int status;

          if (wait(&status) == -1) {
              if (errno == EINTR) {
                  continue;
              }
              break;
          }
          nb_children--;
      }
      return 0;
  }

当多个进程各自用 epoll 监听同一个(非阻塞的)侦听套接字时,仍然会出现惊群现象:每当有新连接到来,多个进程会同时被唤醒去争抢 accept,但只有一个进程能够成功,其余进程的 accept 会返回 EAGAIN,白白被唤醒,做了无用功。