4) 事件触发event_loop

接下来我们要尝试添加多路IO的处理机制,当然linux的平台下, 最优的选择就是使用epoll来做,但是用原生的epoll实际上编程起来扩展性不是很强,那么我们就需要封装一套IO事件处理机制。

4.1 io_event基于IO事件封装

我们首先定义一个IO事件类来包括一个时间需要拥有的基本成员信息.

lars_reactor/include/event_base.h

  1. #pragma once
  2. /*
  3. * 定义一些IO复用机制或者其他异常触发机制的事件封装
  4. *
  5. * */
  6. class event_loop;
  7. //IO事件触发的回调函数
  8. typedef void io_callback(event_loop *loop, int fd, void *args);
  9. /*
  10. * 封装一次IO触发实现
  11. * */
  12. struct io_event
  13. {
  14. io_event():read_callback(NULL),write_callback(NULL),rcb_args(NULL),wcb_args(NULL) {}
  15. int mask; //EPOLLIN EPOLLOUT
  16. io_callback *read_callback; //EPOLLIN事件 触发的回调
  17. io_callback *write_callback;//EPOLLOUT事件 触发的回调
  18. void *rcb_args; //read_callback的回调函数参数
  19. void *wcb_args; //write_callback的回调函数参数
  20. };
  1. 一个`io_event`对象应该包含 一个epoll的事件标识`EPOLLIN/EPOLLOUT`,和对应事件的处理函数`read_callback`,`write_callback`。他们都应该是`io_callback`类型。然后对应的函数形参。

4.2 event_loop事件循环处理机制

  1. 接下来我们就要通过event_loop类来实现io_event的基本增删操作,放在原生的`epoll`堆中。

lars_reactor/include/event_loop.h

  1. #pragma once
  2. /*
  3. *
  4. * event_loop事件处理机制
  5. *
  6. * */
  7. #include <sys/epoll.h>
  8. #include <ext/hash_map>
  9. #include <ext/hash_set>
  10. #include "event_base.h"
  11. #define MAXEVENTS 10
  12. // map: fd->io_event
  13. typedef __gnu_cxx::hash_map<int, io_event> io_event_map;
  14. //定义指向上面map类型的迭代器
  15. typedef __gnu_cxx::hash_map<int, io_event>::iterator io_event_map_it;
  16. //全部正在监听的fd集合
  17. typedef __gnu_cxx::hash_set<int> listen_fd_set;
  18. class event_loop
  19. {
  20. public:
  21. //构造,初始化epoll堆
  22. event_loop();
  23. //阻塞循环处理事件
  24. void event_process();
  25. //添加一个io事件到loop中
  26. void add_io_event(int fd, io_callback *proc, int mask, void *args=NULL);
  27. //删除一个io事件从loop中
  28. void del_io_event(int fd);
  29. //删除一个io事件的EPOLLIN/EPOLLOUT
  30. void del_io_event(int fd, int mask);
  31. private:
  32. int _epfd; //epoll fd
  33. //当前event_loop 监控的fd和对应事件的关系
  34. io_event_map _io_evs;
  35. //当前event_loop 一共哪些fd在监听
  36. listen_fd_set listen_fds;
  37. //一次性最大处理的事件
  38. struct epoll_event _fired_evs[MAXEVENTS];
  39. };

属性:

_epfd:是epoll原生堆的fd。

_io_evs:是一个hash_map对象,主要是方便我们管理fd<—>io_event的对应关系,方便我们来查找和处理。

_listen_fds:记录目前一共有多少个fd正在本我们的event_loop机制所监控.

_fried_evs:已经通过epoll_wait返回的被激活需要上层处理的fd集合.

方法:

event_loop():构造函数,主要初始化epoll.

event_process():永久阻塞,等待触发的事件,去调用对应的函数callback方法。

add_io_event():绑定一个fd和一个io_event的关系,并添加对应的事件到event_loop中。

del_io_event():从event_loop删除该事件。

  1. 具体实现方法如下:

lars_reactor/src/event_loop.cpp

  1. #include "event_loop.h"
  2. #include <assert.h>
  3. //构造,初始化epoll堆
  4. event_loop::event_loop()
  5. {
  6. //flag=0 等价于epll_craete
  7. _epfd = epoll_create1(0);
  8. if (_epfd == -1) {
  9. fprintf(stderr, "epoll_create error\n");
  10. exit(1);
  11. }
  12. }
  13. //阻塞循环处理事件
  14. void event_loop::event_process()
  15. {
  16. while (true) {
  17. io_event_map_it ev_it;
  18. int nfds = epoll_wait(_epfd, _fired_evs, MAXEVENTS, 10);
  19. for (int i = 0; i < nfds; i++) {
  20. //通过触发的fd找到对应的绑定事件
  21. ev_it = _io_evs.find(_fired_evs[i].data.fd);
  22. assert(ev_it != _io_evs.end());
  23. io_event *ev = &(ev_it->second);
  24. if (_fired_evs[i].events & EPOLLIN) {
  25. //读事件,掉读回调函数
  26. void *args = ev->rcb_args;
  27. ev->read_callback(this, _fired_evs[i].data.fd, args);
  28. }
  29. else if (_fired_evs[i].events & EPOLLOUT) {
  30. //写事件,掉写回调函数
  31. void *args = ev->wcb_args;
  32. ev->write_callback(this, _fired_evs[i].data.fd, args);
  33. }
  34. else if (_fired_evs[i].events &(EPOLLHUP|EPOLLERR)) {
  35. //水平触发未处理,可能会出现HUP事件,正常处理读写,没有则清空
  36. if (ev->read_callback != NULL) {
  37. void *args = ev->rcb_args;
  38. ev->read_callback(this, _fired_evs[i].data.fd, args);
  39. }
  40. else if (ev->write_callback != NULL) {
  41. void *args = ev->wcb_args;
  42. ev->write_callback(this, _fired_evs[i].data.fd, args);
  43. }
  44. else {
  45. //删除
  46. fprintf(stderr, "fd %d get error, delete it from epoll\n", _fired_evs[i].data.fd);
  47. this->del_io_event(_fired_evs[i].data.fd);
  48. }
  49. }
  50. }
  51. }
  52. }
  53. /*
  54. * 这里我们处理的事件机制是
  55. * 如果EPOLLIN 在mask中, EPOLLOUT就不允许在mask中
  56. * 如果EPOLLOUT 在mask中, EPOLLIN就不允许在mask中
  57. * 如果想注册EPOLLIN|EPOLLOUT的事件, 那么就调用add_io_event() 方法两次来注册。
  58. * */
  59. //添加一个io事件到loop中
  60. void event_loop::add_io_event(int fd, io_callback *proc, int mask, void *args)
  61. {
  62. int final_mask;
  63. int op;
  64. //1 找到当前fd是否已经有事件
  65. io_event_map_it it = _io_evs.find(fd);
  66. if (it == _io_evs.end()) {
  67. //2 如果没有操作动作就是ADD
  68. //没有找到
  69. final_mask = mask;
  70. op = EPOLL_CTL_ADD;
  71. }
  72. else {
  73. //3 如果有操作董酒是MOD
  74. //添加事件标识位
  75. final_mask = it->second.mask | mask;
  76. op = EPOLL_CTL_MOD;
  77. }
  78. //4 注册回调函数
  79. if (mask & EPOLLIN) {
  80. //读事件回调函数注册
  81. _io_evs[fd].read_callback = proc;
  82. _io_evs[fd].rcb_args = args;
  83. }
  84. else if (mask & EPOLLOUT) {
  85. _io_evs[fd].write_callback = proc;
  86. _io_evs[fd].wcb_args = args;
  87. }
  88. //5 epoll_ctl添加到epoll堆里
  89. _io_evs[fd].mask = final_mask;
  90. //创建原生epoll事件
  91. struct epoll_event event;
  92. event.events = final_mask;
  93. event.data.fd = fd;
  94. if (epoll_ctl(_epfd, op, fd, &event) == -1) {
  95. fprintf(stderr, "epoll ctl %d error\n", fd);
  96. return;
  97. }
  98. //6 将fd添加到监听集合中
  99. listen_fds.insert(fd);
  100. }
  101. //删除一个io事件从loop中
  102. void event_loop::del_io_event(int fd)
  103. {
  104. //将事件从_io_evs删除
  105. _io_evs.erase(fd);
  106. //将fd从监听集合中删除
  107. listen_fds.erase(fd);
  108. //将fd从epoll堆删除
  109. epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL);
  110. }
  111. //删除一个io事件的EPOLLIN/EPOLLOUT
  112. void event_loop::del_io_event(int fd, int mask)
  113. {
  114. //如果没有该事件,直接返回
  115. io_event_map_it it = _io_evs.find(fd);
  116. if (it == _io_evs.end()) {
  117. return ;
  118. }
  119. int &o_mask = it->second.mask;
  120. //修正mask
  121. o_mask = o_mask & (~mask);
  122. if (o_mask == 0) {
  123. //如果修正之后 mask为0,则删除
  124. this->del_io_event(fd);
  125. }
  126. else {
  127. //如果修正之后,mask非0,则修改
  128. struct epoll_event event;
  129. event.events = o_mask;
  130. event.data.fd = fd;
  131. epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &event);
  132. }
  133. }
  1. 这里`del_io_event`提供两个重载,一个是直接删除事件,一个是修正事件。

4.3 Reactor集成event_loop机制

  1. 好了,那么接下来,就让让Lars Reactor框架集成`event_loop`机制。

首先简单修正一个tcp_server.cpp文件,对之前的do_accept()的调度时机做一下修正。

  1. 1. `tcp_server`成员新增`event_loop`成员。

lars_reactor/include/tcp_server.h

  1. #pragma once
  2. #include <netinet/in.h>
  3. #include "event_loop.h"
  4. class tcp_server
  5. {
  6. public:
  7. //server的构造函数
  8. tcp_server(event_loop* loop, const char *ip, uint16_t port);
  9. //开始提供创建链接服务
  10. void do_accept();
  11. //链接对象释放的析构
  12. ~tcp_server();
  13. private:
  14. int _sockfd; //套接字
  15. struct sockaddr_in _connaddr; //客户端链接地址
  16. socklen_t _addrlen; //客户端链接地址长度
  17. // ============= 新增 ======================
  18. //event_loop epoll事件机制
  19. event_loop* _loop;
  20. // ============= 新增 ======================
  21. };
  1. 构造函数在创建完listen fd之后,添加accept事件。

lars_reactor/src/tcp_server.cpp

  1. //listen fd 客户端有新链接请求过来的回调函数
  2. void accept_callback(event_loop *loop, int fd, void *args)
  3. {
  4. tcp_server *server = (tcp_server*)args;
  5. server->do_accept();
  6. }
  7. //server的构造函数
  8. tcp_server::tcp_server(event_loop *loop, const char *ip, uint16_t port)
  9. {
  10. bzero(&_connaddr, sizeof(_connaddr));
  11. //忽略一些信号 SIGHUP, SIGPIPE
  12. //SIGPIPE:如果客户端关闭,服务端再次write就会产生
  13. //SIGHUP:如果terminal关闭,会给当前进程发送该信号
  14. if (signal(SIGHUP, SIG_IGN) == SIG_ERR) {
  15. fprintf(stderr, "signal ignore SIGHUP\n");
  16. }
  17. if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
  18. fprintf(stderr, "signal ignore SIGPIPE\n");
  19. }
  20. //1. 创建socket
  21. _sockfd = socket(AF_INET, SOCK_STREAM /*| SOCK_NONBLOCK*/ | SOCK_CLOEXEC, IPPROTO_TCP);
  22. if (_sockfd == -1) {
  23. fprintf(stderr, "tcp_server::socket()\n");
  24. exit(1);
  25. }
  26. //2 初始化地址
  27. struct sockaddr_in server_addr;
  28. bzero(&server_addr, sizeof(server_addr));
  29. server_addr.sin_family = AF_INET;
  30. inet_aton(ip, &server_addr.sin_addr);
  31. server_addr.sin_port = htons(port);
  32. //2-1可以多次监听,设置REUSE属性
  33. int op = 1;
  34. if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, &op, sizeof(op)) < 0) {
  35. fprintf(stderr, "setsocketopt SO_REUSEADDR\n");
  36. }
  37. //3 绑定端口
  38. if (bind(_sockfd, (const struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
  39. fprintf(stderr, "bind error\n");
  40. exit(1);
  41. }
  42. //4 监听ip端口
  43. if (listen(_sockfd, 500) == -1) {
  44. fprintf(stderr, "listen error\n");
  45. exit(1);
  46. }
  47. // ============= 新增 ======================
  48. //5 将_sockfd添加到event_loop中
  49. _loop = loop;
  50. //6 注册_socket读事件-->accept处理
  51. _loop->add_io_event(_sockfd, accept_callback, EPOLLIN, this);
  52. // ============= 新增 ======================
  53. }
  1. 修改do_accept()方法

lars_reactor/src/tcp_server.cpp

  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <string.h>
  4. #include <strings.h>
  5. #include <unistd.h>
  6. #include <signal.h>
  7. #include <sys/types.h> /* See NOTES */
  8. #include <sys/socket.h>
  9. #include <arpa/inet.h>
  10. #include <errno.h>
  11. #include "tcp_server.h"
  12. #include "reactor_buf.h"
  13. //临时的收发消息
  14. struct message{
  15. char data[m4K];
  16. char len;
  17. };
  18. struct message msg;
  19. void server_rd_callback(event_loop *loop, int fd, void *args);
  20. void server_wt_callback(event_loop *loop, int fd, void *args);
  21. //...省略其他代码
  22. //...省略其他代码
  23. //server read_callback
  24. void server_rd_callback(event_loop *loop, int fd, void *args)
  25. {
  26. int ret = 0;
  27. struct message *msg = (struct message*)args;
  28. input_buf ibuf;
  29. ret = ibuf.read_data(fd);
  30. if (ret == -1) {
  31. fprintf(stderr, "ibuf read_data error\n");
  32. //删除事件
  33. loop->del_io_event(fd);
  34. //对端关闭
  35. close(fd);
  36. return;
  37. }
  38. if (ret == 0) {
  39. //删除事件
  40. loop->del_io_event(fd);
  41. //对端关闭
  42. close(fd);
  43. return ;
  44. }
  45. printf("ibuf.length() = %d\n", ibuf.length());
  46. //将读到的数据放在msg中
  47. msg->len = ibuf.length();
  48. bzero(msg->data, msg->len);
  49. memcpy(msg->data, ibuf.data(), msg->len);
  50. ibuf.pop(msg->len);
  51. ibuf.adjust();
  52. printf("recv data = %s\n", msg->data);
  53. //删除读事件,添加写事件
  54. loop->del_io_event(fd, EPOLLIN);
  55. loop->add_io_event(fd, server_wt_callback, EPOLLOUT, msg);
  56. }
  57. //server write_callback
  58. void server_wt_callback(event_loop *loop, int fd, void *args)
  59. {
  60. struct message *msg = (struct message*)args;
  61. output_buf obuf;
  62. //回显数据
  63. obuf.send_data(msg->data, msg->len);
  64. while(obuf.length()) {
  65. int write_ret = obuf.write2fd(fd);
  66. if (write_ret == -1) {
  67. fprintf(stderr, "write connfd error\n");
  68. return;
  69. }
  70. else if(write_ret == 0) {
  71. //不是错误,表示此时不可写
  72. break;
  73. }
  74. }
  75. //删除写事件,添加读事件
  76. loop->del_io_event(fd, EPOLLOUT);
  77. loop->add_io_event(fd, server_rd_callback, EPOLLIN, msg);
  78. }
  79. //...省略其他代码
  80. //...省略其他代码
  81. //开始提供创建链接服务
  82. void tcp_server::do_accept()
  83. {
  84. int connfd;
  85. while(true) {
  86. //accept与客户端创建链接
  87. printf("begin accept\n");
  88. connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen);
  89. if (connfd == -1) {
  90. if (errno == EINTR) {
  91. fprintf(stderr, "accept errno=EINTR\n");
  92. continue;
  93. }
  94. else if (errno == EMFILE) {
  95. //建立链接过多,资源不够
  96. fprintf(stderr, "accept errno=EMFILE\n");
  97. }
  98. else if (errno == EAGAIN) {
  99. fprintf(stderr, "accept errno=EAGAIN\n");
  100. break;
  101. }
  102. else {
  103. fprintf(stderr, "accept error");
  104. exit(1);
  105. }
  106. }
  107. else {
  108. //accept succ!
  109. // ============= 新增 ======================
  110. this->_loop->add_io_event(connfd, server_rd_callback, EPOLLIN, &msg);
  111. break;
  112. // ============= 新增 ======================
  113. }
  114. }
  115. }
  116. //...省略其他代码
  117. //...省略其他代码

4.4 完成Lars Reactor V0.3开发

  1. 我们将lars_reactor/example/lars_reactor_0.2的代码复制一份到 lars_reactor/example/lars_reactor_0.3中。

lars_reactor/example/lars_reactor_0.3/lars_reactor.cpp

  1. #include "tcp_server.h"
  2. int main()
  3. {
  4. event_loop loop;
  5. tcp_server server(&loop, "127.0.0.1", 7777);
  6. loop.event_process();
  7. return 0;
  8. }

编译。

启动服务器

  1. $ ./lars_reactor

分别启动2个客户端

client1

  1. $ nc 127.0.0.1 7777
  2. hello Iam client1
  3. hello Iam client1 回显

client2

  1. $ nc 127.0.0.1 7777
  2. hello Iam client2
  3. hello Iam client2 回显

服务端打印

  1. $ ./lars_reactor
  2. begin accept
  3. ibuf.length() = 18
  4. recv data = hello Iam client1
  5. begin accept
  6. ibuf.length() = 18
  7. recv data = hello Iam client2

目前我们已经成功将event_loop机制加入到reactor中了,接下来继续添加功能。