3.2 epoll的反应堆模式实现

epoll 反应堆模式的实现,也就是 libevent 的实现原理。

  1. #include <stdlib.h>
  2. #include <stdio.h>
  3. #include <stdio.h>
  4. #include <sys/socket.h>
  5. #include <sys/epoll.h>
  6. #include <arpa/inet.h>
  7. #include <fcntl.h>
  8. #include <unistd.h>
  9. #include <errno.h>
  10. #include <string.h>
  11. #include <time.h>
  /* Number of client slots in g_events; the listen socket uses one additional slot. */
  #define MAX_EVENTS 1024
  /* Size in bytes of the per-connection buffer (struct myevent_s.buf). */
  #define BUFLEN 128
  /* Port main() listens on when no port is given on the command line. */
  #define SERV_PORT 8080
  /*
   * Per-descriptor event record handed to epoll through epoll_event.data.ptr.
   * status: 1 while the descriptor is being monitored, 0 when it is not
   * last_active: time of the last activity, used for idle-timeout handling
   */
  struct myevent_s {
      int fd; // descriptor tracked by this record: a client fd or the listen fd
      int events; // event mask last passed to eventadd(): EPOLLIN or EPOLLOUT
      void *arg; // argument handed to call_back; callers in this file pass the record itself
      void (*call_back)(int fd, int events, void *arg); // handler run by the event loop
      int status; // 1 = registered with epoll, 0 = not registered (slot is free)
      char buf[BUFLEN]; // data stored by recvdata() and sent back by senddata()
      int len; // number of bytes stored in buf
      long last_active; // time(NULL) value recorded by eventset()
  };
  int g_efd; /* descriptor returned by epoll_create() */
  struct myevent_s g_events[MAX_EVENTS+1]; /* +1: the last element is used for the listen fd */
  31. void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void *), void *arg)
  32. {
  33. ev->fd = fd;
  34. ev->call_back = call_back;
  35. ev->events = 0;
  36. ev->arg = arg;
  37. ev->status = 0;
  38. //memset(ev->buf, 0, sizeof(ev->buf));
  39. //ev->len = 0;
  40. ev->last_active = time(NULL);
  41. return;
  42. }
  /* Forward declarations: both callbacks are referenced before they are defined. */
  void recvdata(int fd, int events, void *arg);
  void senddata(int fd, int events, void *arg);
  45. void eventadd(int efd, int events, struct myevent_s *ev)
  46. {
  47. struct epoll_event epv = {0, {0}};
  48. int op;
  49. epv.data.ptr = ev;
  50. epv.events = ev->events = events;
  51. if (ev->status == 1) {
  52. op = EPOLL_CTL_MOD;
  53. }
  54. else {
  55. op = EPOLL_CTL_ADD;
  56. ev->status = 1;
  57. }
  58. if (epoll_ctl(efd, op, ev->fd, &epv) < 0)
  59. printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);
  60. else
  61. printf("event add OK [fd=%d], op=%d, events[%0X]\n", ev->fd, op, events);
  62. return;
  63. }
  64. void eventdel(int efd, struct myevent_s *ev)
  65. {
  66. struct epoll_event epv = {0, {0}};
  67. if (ev->status != 1)
  68. return;
  69. epv.data.ptr = ev;
  70. ev->status = 0;
  71. epoll_ctl(efd, EPOLL_CTL_DEL, ev->fd, &epv);
  72. return;
  73. }
  74. void acceptconn(int lfd, int events, void *arg)
  75. {
  76. struct sockaddr_in cin;
  77. socklen_t len = sizeof(cin);
  78. int cfd, i;
  79. if ((cfd = accept(lfd, (struct sockaddr *)&cin, &len)) == -1) {
  80. if (errno != EAGAIN && errno != EINTR) {
  81. /* 暂时不做出错处理 */
  82. }
  83. printf("%s: accept, %s\n", __func__, strerror(errno));
  84. return;
  85. }
  86. do {
  87. for (i = 0; i < MAX_EVENTS; i++) {
  88. if (g_events[i].status == 0)
  89. break;
  90. }
  91. if (i == MAX_EVENTS) {
  92. printf("%s: max connect limit[%d]\n", __func__, MAX_EVENTS);
  93. break;
  94. }
  95. int flag = 0;
  96. if ((flag = fcntl(cfd, F_SETFL, O_NONBLOCK)) < 0)
  97. {
  98. printf("%s: fcntl nonblocking failed, %s\n", __func__, strerror(errno));
  99. break;
  100. }
  101. eventset(&g_events[i], cfd, recvdata, &g_events[i]);
  102. eventadd(g_efd, EPOLLIN, &g_events[i]);
  103. } while(0);
  104. printf("new connect [%s:%d][time:%ld], pos[%d]\n", inet_ntoa(cin.sin_addr), ntohs(cin.sin_port), g_events[i].last_active, i);
  105. return;
  106. }
  107. void recvdata(int fd, int events, void *arg)
  108. {
  109. struct myevent_s *ev = (struct myevent_s *)arg;
  110. int len;
  111. len = recv(fd, ev->buf, sizeof(ev->buf), 0);
  112. eventdel(g_efd, ev);
  113. if (len > 0) {
  114. ev->len = len;
  115. ev->buf[len] = '\0';
  116. printf("C[%d]:%s\n", fd, ev->buf);
  117. /* 转换为发送事件 */
  118. eventset(ev, fd, senddata, ev);
  119. eventadd(g_efd, EPOLLOUT, ev);
  120. }
  121. else if (len == 0) {
  122. close(ev->fd);
  123. /* ev-g_events 地址相减得到偏移元素位置 */
  124. printf("[fd=%d] pos[%d], closed\n", fd, (int)(ev - g_events));
  125. }
  126. else {
  127. close(ev->fd);
  128. printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
  129. }
  130. return;
  131. }
  132. void senddata(int fd, int events, void *arg)
  133. {
  134. struct myevent_s *ev = (struct myevent_s *)arg;
  135. int len;
  136. len = send(fd, ev->buf, ev->len, 0);
  137. //printf("fd=%d\tev->buf=%s\ttev->len=%d\n", fd, ev->buf, ev->len);
  138. //printf("send len = %d\n", len);
  139. eventdel(g_efd, ev);
  140. if (len > 0) {
  141. printf("send[fd=%d], [%d]%s\n", fd, len, ev->buf);
  142. eventset(ev, fd, recvdata, ev);
  143. eventadd(g_efd, EPOLLIN, ev);
  144. }
  145. else {
  146. close(ev->fd);
  147. printf("send[fd=%d] error %s\n", fd, strerror(errno));
  148. }
  149. return;
  150. }
  151. void initlistensocket(int efd, short port)
  152. {
  153. int lfd = socket(AF_INET, SOCK_STREAM, 0);
  154. fcntl(lfd, F_SETFL, O_NONBLOCK);
  155. eventset(&g_events[MAX_EVENTS], lfd, acceptconn, &g_events[MAX_EVENTS]);
  156. eventadd(efd, EPOLLIN, &g_events[MAX_EVENTS]);
  157. struct sockaddr_in sin;
  158. memset(&sin, 0, sizeof(sin));
  159. sin.sin_family = AF_INET;
  160. sin.sin_addr.s_addr = INADDR_ANY;
  161. sin.sin_port = htons(port);
  162. bind(lfd, (struct sockaddr *)&sin, sizeof(sin));
  163. listen(lfd, 20);
  164. return;
  165. }
  166. int main(int argc, char *argv[])
  167. {
  168. unsigned short port = SERV_PORT;
  169. if (argc == 2)
  170. port = atoi(argv[1]);
  171. g_efd = epoll_create(MAX_EVENTS+1);
  172. if (g_efd <= 0)
  173. printf("create efd in %s err %s\n", __func__, strerror(errno));
  174. initlistensocket(g_efd, port);
  175. /* 事件循环 */
  176. struct epoll_event events[MAX_EVENTS+1];
  177. printf("server running:port[%d]\n", port);
  178. int checkpos = 0, i;
  179. while (1) {
  180. /* 超时验证,每次测试100个链接,不测试listenfd 当客户端60秒内没有和服务器通信,则关闭此客户端链接 */
  181. long now = time(NULL);
  182. for (i = 0; i < 100; i++, checkpos++) {
  183. if (checkpos == MAX_EVENTS)
  184. checkpos = 0;
  185. if (g_events[checkpos].status != 1)
  186. continue;
  187. long duration = now - g_events[checkpos].last_active;
  188. if (duration >= 60) {
  189. close(g_events[checkpos].fd);
  190. printf("[fd=%d] timeout\n", g_events[checkpos].fd);
  191. eventdel(g_efd, &g_events[checkpos]);
  192. }
  193. }
  194. /* 等待事件发生 */
  195. int nfd = epoll_wait(g_efd, events, MAX_EVENTS+1, 1000);
  196. if (nfd < 0) {
  197. printf("epoll_wait error, exit\n");
  198. break;
  199. }
  200. for (i = 0; i < nfd; i++) {
  201. struct myevent_s *ev = (struct myevent_s *)events[i].data.ptr;
  202. if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) {
  203. ev->call_back(ev->fd, events[i].events, ev->arg);
  204. }
  205. if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) {
  206. ev->call_back(ev->fd, events[i].events, ev->arg);
  207. }
  208. }
  209. }
  210. /* 退出前释放所有资源 */
  211. return 0;
  212. }