亮白风格-图解网络-小林coding-v2.0.pdf 暗黑风格-图解网络-小林coding-v2.0.pdf

select poll epoll 都是linux系统提供的 IO多路复用的函数

  1. #include<sys/select.h>
  2. #include<sys/poll.h>
  3. #include<sys/epoll.h>

1. 从操作系统角度看多路复用

点击查看【processon】

2. epoll与select, poll区别

  • epoll每次调用只会向多路复用器传入新需要关注的文件描述符
  • select和poll, 每次调用都会把所有需要关注的文件描述符都传进去, 非常浪费空间
  • select 入参是个数组, select返回的是含有整个句柄的数组,应用程序需要遍历整个数组才能发现哪些句柄发生了事件;
  • poll 入参是个链表
  • select 和 poll 是水平触发, 只要有IO数据可读, 就会触发通知
  • epoll开辟了一个内存空间, 专门保存需要监听的文件描述符(红黑树)
  • epoll是边缘触发, 只有新的IO活动到来, 才会触发通知; 即使通道中有数据也不会再触发

举个例子:
第一轮轮询

  • 需要关注100个文件描述符fd, epoll传入100个, select和poll也是传入100个

第二轮轮询

  • 我只需要在原来的100个基础上再多关注1个, 那么第二次轮询
  • epoll只需要传那新的一个就行了;
  • 而select和poll函数需要传入101个文件描述符

image.png

3. epoll_wait函数

函数声明:

  1. int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout)

该函数用于轮询I/O事件的发生;
参数:

  1. epfd :由epoll_create 生成的epoll专用的文件描述符;
  2. epoll_event:用于回传代处理事件的数组;
  3. maxevents:每次能处理的事件数;
  4. timeout:等待I/O事件发生的超时值(单位我也不太清楚); -1相当于阻塞,0相当于非阻塞。一般用 -1 即可返回发生事件数。

4. 用C实现服务端

启动:

  1. socket(); 创建一个文件描述符, 例如4
  2. bind(); 将这个文件描述符, 绑定到端口上, 例如4绑定到9090
  3. listen(); 开始监听9090端口的网络事件
  4. epoll_create(); 创建一个多路复用器的空间, 例如这个空间用文件描述符8表示; 相当于java中的 Selector.open()
  5. 5. epoll_ctl(8,EPOLL_CTL_ADD,4,{EPOLLIN,{u32=4,u64=12412342345234345345345}})=0; 把监听网络的文件描述符4, 注册到文件描述符8的空间中, 并标识监听 EPOLLIN 事件; 相当于java的server.register(selector, SelectorKey.OP_ACCEPT);

当新客户端连接发生的时候:

  1. epoll_wait(8,{{EPOLLIN,{u32=4,u64=4234234234234234}},4096,-1})=1 入参最后一个参数是timeout,-1是阻塞,0相当于非阻塞, 大于0的数相当于等待时间, 也就是说可以阻塞也可以非阻塞,正好对应了 java中的: select.select(0); 注意: epoll_wait()函数的入参中有个是指针, 函数会把发生事件的文件描述符存入那个指针的epoll_event结构体中 相当于java中的 select.select(0); 返回值=1 是告诉你有几个新的连接
  2. accept(4, .......) = 9 //9就是新拿到的文件描述符, 相当于java中的 (ServerSocketChannel)(key.channel()).accept(); 方法那么拿到的这个9就是新的连接, 分配到的文件描述符!!! 在java中相当于拿到了一个新的客户端的连接

客户端发送数据后:

  1. epoll_ctl(8, EPOLL_CTL_ADD, 9, {EPOLLIN, {....}}) = 0; 8是多路复用器的文件描述符, 9是新的客户端连接的文件描述符; 这个操作由java中的 client.register(selector, SelectionKey.OP_READ, buffer); 函数完成; 功能是: 把客户端的文件描述符9, 放入多路复用器8的空间中, 注册监听9的读取事件

  2. 这时候客户端发送数据; 服务端: epoll_wait(7, 接收文件描述符的结构体指针, -1) =1; 相当于java的, select.select(); ;那么C的函数,会把发生事件的文件描述符, 存入那个指针指向的结构体实例中; 我们就能从selector.selectedKeys()中找到那个客户端的文件描述符

  3. 然后服务端就可以读取socket缓冲区的数据了; C函数: read(8, ....); ; java中, 是直接读取到了buffer中

服务端发送数据:

  1. 服务端向客户端发送数据 C函数: write(8, ....);

5. 示例

  1. #include <netinet/in.h> // sockaddr_in
  2. #include <sys/types.h> // socket
  3. #include <sys/socket.h> // socket
  4. #include <arpa/inet.h>
  5. #include <unistd.h>
  6. #include <sys/epoll.h> // epoll
  7. #include <sys/ioctl.h>
  8. #include <sys/time.h>
  9. #include <iostream>
  10. #include <vector>
  11. #include <string>
  12. #include <cstdlib>
  13. #include <cstdio>
  14. #include <cstring>
  15. using namespace std;
  16. #define BUFFER_SIZE 1024
  17. #define EPOLLSIZE 100
  18. struct PACKET_HEAD
  19. {
  20. int length;
  21. };
  22. class Server
  23. {
  24. private:
  25. struct sockaddr_in server_addr;
  26. socklen_t server_addr_len;
  27. int listen_fd; // 监听的fd
  28. int epfd; // epoll fd
  29. struct epoll_event events[EPOLLSIZE]; // epoll_wait返回的就绪事件
  30. public:
  31. Server(int port);
  32. ~Server();
  33. void Bind();
  34. void Listen(int queue_len = 20);
  35. void Accept();
  36. void Run();
  37. void Recv(int fd);
  38. };
  39. Server::Server(int port)
  40. {
  41. bzero(&server_addr, sizeof(server_addr));
  42. server_addr.sin_family = AF_INET;
  43. server_addr.sin_addr.s_addr = htons(INADDR_ANY);
  44. server_addr.sin_port = htons(port);
  45. // create socket to listen
  46. listen_fd = socket(PF_INET, SOCK_STREAM, 0);
  47. if (listen_fd < 0)
  48. {
  49. cout << "Create Socket Failed!";
  50. exit(1);
  51. }
  52. int opt = 1;
  53. setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
  54. }
  55. Server::~Server()
  56. {
  57. close(epfd);
  58. }
  59. void Server::Bind()
  60. {
  61. if (-1 == (bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr))))
  62. {
  63. cout << "Server Bind Failed!";
  64. exit(1);
  65. }
  66. cout << "Bind Successfully.\n";
  67. }
  68. void Server::Listen(int queue_len)
  69. {
  70. if (-1 == listen(listen_fd, queue_len))
  71. {
  72. cout << "Server Listen Failed!";
  73. exit(1);
  74. }
  75. cout << "Listen Successfully.\n";
  76. }
  77. void Server::Accept()
  78. {
  79. struct sockaddr_in client_addr;
  80. socklen_t client_addr_len = sizeof(client_addr);
  81. int new_fd = accept(listen_fd, (struct sockaddr *)&client_addr, &client_addr_len);
  82. if (new_fd < 0)
  83. {
  84. cout << "Server Accept Failed!";
  85. exit(1);
  86. }
  87. cout << "new connection was accepted.\n";
  88. // 在epfd中注册新建立的连接
  89. struct epoll_event event;
  90. event.data.fd = new_fd;
  91. event.events = EPOLLIN;
  92. epoll_ctl(epfd, EPOLL_CTL_ADD, new_fd, &event);
  93. }
  94. void Server::Run()
  95. {
  96. epfd = epoll_create(1); // 创建epoll句柄
  97. struct epoll_event event;
  98. event.data.fd = listen_fd;
  99. event.events = EPOLLIN;
  100. epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &event); // 注册listen_fd
  101. while (1)
  102. {
  103. int nums = epoll_wait(epfd, events, EPOLLSIZE, -1);
  104. if (nums < 0)
  105. {
  106. cout << "poll() error!";
  107. exit(1);
  108. }
  109. if (nums == 0)
  110. {
  111. continue;
  112. }
  113. for (int i = 0; i < nums; ++i) // 遍历所有就绪事件
  114. {
  115. int fd = events[i].data.fd;
  116. if ((fd == listen_fd) && (events[i].events & EPOLLIN))
  117. Accept(); // 有新的客户端请求
  118. else if (events[i].events & EPOLLIN)
  119. Recv(fd); // 读数据
  120. else
  121. ;
  122. }
  123. }
  124. }
  125. void Server::Recv(int fd)
  126. {
  127. bool close_conn = false; // 标记当前连接是否断开了
  128. PACKET_HEAD head;
  129. recv(fd, &head, sizeof(head), 0); // 先接受包头,即数据总长度
  130. char *buffer = new char[head.length];
  131. bzero(buffer, head.length);
  132. int total = 0;
  133. while (total < head.length)
  134. {
  135. int len = recv(fd, buffer + total, head.length - total, 0);
  136. if (len < 0)
  137. {
  138. cout << "recv() error!";
  139. close_conn = true;
  140. break;
  141. }
  142. total = total + len;
  143. }
  144. if (total == head.length) // 将收到的消息原样发回给客户端
  145. {
  146. int ret1 = send(fd, &head, sizeof(head), 0);
  147. int ret2 = send(fd, buffer, head.length, 0);
  148. if (ret1 < 0 || ret2 < 0)
  149. {
  150. cout << "send() error!";
  151. close_conn = true;
  152. }
  153. }
  154. delete buffer;
  155. if (close_conn) // 当前这个连接有问题,关闭它
  156. {
  157. close(fd);
  158. struct epoll_event event;
  159. event.data.fd = fd;
  160. event.events = EPOLLIN;
  161. epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &event); // Delete一个fd
  162. }
  163. }
  164. int main()
  165. {
  166. Server server(15000);
  167. server.Bind();
  168. server.Listen();
  169. server.Run();
  170. return 0;
  171. }

这有几个文章不错 https://www.cnblogs.com/apprentice89/p/3234677.html https://www.cnblogs.com/apprentice89/archive/2013/05/06/3063039.html https://zhuanlan.zhihu.com/p/422229233 https://gcmurphy.wordpress.com/2012/11/30/using-epoll-in-go/