我们之前在include/task_msg.h中, 其中task的消息类型我们只是实现了NEW_CONN,目的是thread_pool选择一个线程,让一个线程里的thread_queue去创建一个连接对象。但是并没有对NEW_TASK的任务类型进行定义。这种类型是允许服务端去执行某项具体的业务。并不是根据客户端来消息去被动回复的业务,而是服务端主动发送的业务给到客户端。

15.1 任务函数类型

  1. 我们先定义task的回调函数类型

lars_reactor/include/event_loop.h

  1. //...
  2. //定义异步任务回调函数类型
  3. typedef void (*task_func)(event_loop *loop, void *args);
  4. //...
  1. 为了防止循环头文件引用,我们把typedef定义在`event_loop.h`中。

lars_reactor/include/task_msg.h

  1. #pragma once
  2. #include "event_loop.h"
  3. //定义异步任务回调函数类型
  4. typedef void (*task_func)(event_loop *loop, void *args);
  5. struct task_msg
  6. {
  7. enum TASK_TYPE
  8. {
  9. NEW_CONN, //新建链接的任务
  10. NEW_TASK, //一般的任务
  11. };
  12. TASK_TYPE type; //任务类型
  13. //任务的一些参数
  14. union {
  15. //针对 NEW_CONN新建链接任务,需要传递connfd
  16. int connfd;
  17. //针对 NEW_TASK 新建任务,
  18. //可以给一个任务提供一个回调函数
  19. struct {
  20. task_func task_cb; //注册的任务函数
  21. void *args; //任务函数对应的形参
  22. };
  23. };
  24. };
  1. `task_func`是我们定义的一个任务的回调函数类型,第一个参数当然就是让哪个loop机制去执行这个task任务。很明显,一个loop是对应一个thread线程的。也就是让哪个thread去执行这个task任务。args`task_func`的函数形参。

15.2 event_loop模块添加task任务机制

  1. 我们知道,task绑定一个loop,很明显,一个`event_loop`应该拥有需要被执行的task集合。
  2. 在这里,我们将event_loop加上已经就绪的task任务的属性

lars_reactor/include/event_loop.h

  1. #pragma once
  2. /*
  3. *
  4. * event_loop事件处理机制
  5. *
  6. * */
  7. #include <sys/epoll.h>
  8. #include <ext/hash_map>
  9. #include <ext/hash_set>
  10. #include <vector>
  11. #include "event_base.h"
  12. #include "task_msg.h"
  13. #define MAXEVENTS 10
  14. // map: fd->io_event
  15. typedef __gnu_cxx::hash_map<int, io_event> io_event_map;
  16. //定义指向上面map类型的迭代器
  17. typedef __gnu_cxx::hash_map<int, io_event>::iterator io_event_map_it;
  18. //全部正在监听的fd集合
  19. typedef __gnu_cxx::hash_set<int> listen_fd_set;
  20. //定义异步任务回调函数类型
  21. typedef void (*task_func)(event_loop *loop, void *args);
  22. class event_loop
  23. {
  24. public:
  25. //构造,初始化epoll堆
  26. event_loop();
  27. //阻塞循环处理事件
  28. void event_process();
  29. //添加一个io事件到loop中
  30. void add_io_event(int fd, io_callback *proc, int mask, void *args=NULL);
  31. //删除一个io事件从loop中
  32. void del_io_event(int fd);
  33. //删除一个io事件的EPOLLIN/EPOLLOUT
  34. void del_io_event(int fd, int mask);
  35. // ===========================================
  36. //获取全部监听事件的fd集合
  37. void get_listen_fds(listen_fd_set &fds) {
  38. fds = listen_fds;
  39. }
  40. //=== 异步任务task模块需要的方法 ===
  41. //添加一个任务task到ready_tasks集合中
  42. void add_task(task_func func, void *args);
  43. //执行全部的ready_tasks里面的任务
  44. void execute_ready_tasks();
  45. // ===========================================
  46. private:
  47. int _epfd; //epoll fd
  48. //当前event_loop 监控的fd和对应事件的关系
  49. io_event_map _io_evs;
  50. //当前event_loop 一共哪些fd在监听
  51. listen_fd_set listen_fds;
  52. //一次性最大处理的事件
  53. struct epoll_event _fired_evs[MAXEVENTS];
  54. // ===========================================
  55. //需要被执行的task集合
  56. typedef std::pair<task_func, void*> task_func_pair;
  57. std::vector<task_func_pair> _ready_tasks;
  58. // ===========================================
  59. };

添加了两个属性:

task_func_pair: 回调函数和参数的键值对.

_ready_tasks: 所有已经就绪的待执行的任务集合。

同时添加了两个主要方法:

void add_task(task_func func, void *args): 添加一个任务到_ready_tasks中.

void execute_ready_tasks():执行全部的_ready_tasks任务。

将这两个方法实现如下:

lars_reactor/src/event_loop.cpp

  1. //...
  2. //添加一个任务task到ready_tasks集合中
  3. void event_loop::add_task(task_func func, void *args)
  4. {
  5. task_func_pair func_pair(func, args);
  6. _ready_tasks.push_back(func_pair);
  7. }
  8. //执行全部的ready_tasks里面的任务
  9. void event_loop::execute_ready_tasks()
  10. {
  11. std::vector<task_func_pair>::iterator it;
  12. for (it = _ready_tasks.begin(); it != _ready_tasks.end(); it++) {
  13. task_func func = it->first;//任务回调函数
  14. void *args = it->second;//回调函数形参
  15. //执行任务
  16. func(this, args);
  17. }
  18. //全部执行完毕,清空当前的_ready_tasks
  19. _ready_tasks.clear();
  20. }
  21. //...
  1. 那么`execute_ready_tasks()`函数需要在一个恰当的时候被执行,我们这里就放在每次event_loop一次`epoll_wait()`处理完一组fd事件之后,触发一次额外的task任务。

lars_reactor/src/event_loop.cpp

  1. //阻塞循环处理事件
  2. void event_loop::event_process()
  3. {
  4. while (true) {
  5. io_event_map_it ev_it;
  6. int nfds = epoll_wait(_epfd, _fired_evs, MAXEVENTS, 10);
  7. for (int i = 0; i < nfds; i++) {
  8. //...
  9. //...
  10. }
  11. //每次处理完一组epoll_wait触发的事件之后,处理异步任务
  12. this->execute_ready_tasks();
  13. }
  14. }
  1. 这里补充一下,因为在task的回调函数中,有形参`event_loop *loop`,可能会使用当前loop中监控的fd信息,所以我们应该给event_loop补充一个获取当前loop监控的全部fd信息的方法
  1. class event_loop{
  2. //...
  3. //获取全部监听事件的fd集合
  4. void get_listen_fds(listen_fd_set &fds) {
  5. fds = listen_fds;
  6. }
  7. //...
  8. };

15.3 thread_pool模块添加task任务机制

  1. 接下来我们就要用thread_pool来想每个thread所绑定的event_pool中去发送task任务,很明显thread_pool应该具备能够将task加入到event_pool中的_ready_task集合的功能。

lars_reactor/include/thread_pool.h

  1. #pragma once
  2. #include <pthread.h>
  3. #include "task_msg.h"
  4. #include "thread_queue.h"
  5. class thread_pool
  6. {
  7. public:
  8. //构造,初始化线程池, 开辟thread_cnt个
  9. thread_pool(int thread_cnt);
  10. //获取一个thead
  11. thread_queue<task_msg>* get_thread();
  12. //发送一个task任务给thread_pool里的全部thread
  13. void send_task(task_func func, void *args = NULL);
  14. private:
  15. //_queues是当前thread_pool全部的消息任务队列头指针
  16. thread_queue<task_msg> ** _queues;
  17. //当前线程池中的线程个数
  18. int _thread_cnt;
  19. //已经启动的全部therad编号
  20. pthread_t * _tids;
  21. //当前选中的线程队列下标
  22. int _index;
  23. };
  1. `send_task()`方法就是发送给线程池中全部的thread去执行task任务.

lars_reactor/src/thread_pool.cpp

  1. void thread_pool::send_task(task_func func, void *args)
  2. {
  3. task_msg task;
  4. //给当前thread_pool中的每个thread里的pool添加一个task任务
  5. for (int i = 0; i < _thread_cnt; i++) {
  6. //封装一个task消息
  7. task.type = task_msg::NEW_TASK;
  8. task.task_cb = func;
  9. task.args = args;
  10. //取出第i个thread的消息队列
  11. thread_queue<task_msg> *queue = _queues[i];
  12. //发送task消息
  13. queue->send(task);
  14. }
  15. }
  1. `send_task()`的实现实际上是告知全部的thread,封装一个`NEW_TASK`类型的消息,通过`task_queue`告知对应的thread.很明显当我们进行 `queue->send(task)`的时候,当前的thread绑定的loop,就会触发`deal_task_message()`回调了。

lars_reactor/src/thread_pool.cpp

  1. /*
  2. * 一旦有task消息过来,这个业务是处理task消息业务的主流程
  3. *
  4. * 只要有人调用 thread_queue:: send()方法就会触发次函数
  5. */
  6. void deal_task_message(event_loop *loop, int fd, void *args)
  7. {
  8. //得到是哪个消息队列触发的
  9. thread_queue<task_msg>* queue = (thread_queue<task_msg>*)args;
  10. //将queue中的全部任务取出来
  11. std::queue<task_msg> tasks;
  12. queue->recv(tasks);
  13. while (tasks.empty() != true) {
  14. task_msg task = tasks.front();
  15. //弹出一个元素
  16. tasks.pop();
  17. if (task.type == task_msg::NEW_CONN) {
  18. //是一个新建链接的任务
  19. //并且将这个tcp_conn加入当当前线程的loop中去监听
  20. tcp_conn *conn = new tcp_conn(task.connfd, loop);
  21. if (conn == NULL) {
  22. fprintf(stderr, "in thread new tcp_conn error\n");
  23. exit(1);
  24. }
  25. printf("[thread]: get new connection succ!\n");
  26. }
  27. else if (task.type == task_msg::NEW_TASK) {
  28. //===========是一个新的普通任务===============
  29. //当前的loop就是一个thread的事件监控loop,让当前loop触发task任务的回调
  30. loop->add_task(task.task_cb, task.args);
  31. //==========================================
  32. }
  33. else {
  34. //其他未识别任务
  35. fprintf(stderr, "unknow task!\n");
  36. }
  37. }
  38. }
  1. 我们判断task.type如果是`NEW_TASK`就将该task加入到当前loop中去.

通过上面的设计,可以看出来,thread_pool的send_task()应该是一个对外的开发者接口,所以我们要让服务器的tcp_server能够获取到thread_pool属性.

lars_reactor/include/tcp_server.h

  1. class tcp_server {
  2. //...
  3. //获取当前server的线程池
  4. thread_pool *thread_poll() {
  5. return _thread_pool;
  6. }
  7. //...
  8. };
  1. ok,这样我们基本上完成的task异步处理业务的机制. 下面我们来测试一下这个功能.

15.4 完成Lars Reactor V0.11开发

server.cpp

  1. #include "tcp_server.h"
  2. #include <string>
  3. #include <string.h>
  4. #include "config_file.h"
  5. tcp_server *server;
  6. void print_lars_task(event_loop *loop, void *args)
  7. {
  8. printf("======= Active Task Func! ========\n");
  9. listen_fd_set fds;
  10. loop->get_listen_fds(fds);//不同线程的loop,返回的fds是不同的
  11. //可以向所有fds触发
  12. listen_fd_set::iterator it;
  13. //遍历fds
  14. for (it = fds.begin(); it != fds.end(); it++) {
  15. int fd = *it;
  16. tcp_conn *conn = tcp_server::conns[fd]; //取出fd
  17. if (conn != NULL) {
  18. int msgid = 101;
  19. const char *msg = "Hello I am a Task!";
  20. conn->send_message(msg, strlen(msg), msgid);
  21. }
  22. }
  23. }
  24. //回显业务的回调函数
  25. void callback_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
  26. {
  27. printf("callback_busi ...\n");
  28. //直接回显
  29. conn->send_message(data, len, msgid);
  30. }
  31. //打印信息回调函数
  32. void print_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
  33. {
  34. printf("recv client: [%s]\n", data);
  35. printf("msgid: [%d]\n", msgid);
  36. printf("len: [%d]\n", len);
  37. }
  38. //新客户端创建的回调
  39. void on_client_build(net_connection *conn, void *args)
  40. {
  41. int msgid = 101;
  42. const char *msg = "welcome! you online..";
  43. conn->send_message(msg, strlen(msg), msgid);
  44. //创建链接成功之后触发任务
  45. server->thread_poll()->send_task(print_lars_task);
  46. }
  47. //客户端销毁的回调
  48. void on_client_lost(net_connection *conn, void *args)
  49. {
  50. printf("connection is lost !\n");
  51. }
  52. int main()
  53. {
  54. event_loop loop;
  55. //加载配置文件
  56. config_file::setPath("./serv.conf");
  57. std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
  58. short port = config_file::instance()->GetNumber("reactor", "port", 8888);
  59. printf("ip = %s, port = %d\n", ip.c_str(), port);
  60. server = new tcp_server(&loop, ip.c_str(), port);
  61. //注册消息业务路由
  62. server->add_msg_router(1, callback_busi);
  63. server->add_msg_router(2, print_busi);
  64. //注册链接hook回调
  65. server->set_conn_start(on_client_build);
  66. server->set_conn_close(on_client_lost);
  67. loop.event_process();
  68. return 0;
  69. }
  1. 我们在每次建立连接成功之后,触发任务机制。其中`print_lars_task()`方法就是我们的异步任务。由于是全部thead都出发,所以该方法会被每个thread执行。但是不同的thread中的pool所返回的fd是不一样的,这里在`print_lars_task()`中,我们给对应的客户端做了一个简单的消息发送。

client.cpp

  1. #include "tcp_client.h"
  2. #include <stdio.h>
  3. #include <string.h>
  4. //客户端业务
  5. void busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
  6. {
  7. //得到服务端回执的数据
  8. char *str = NULL;
  9. str = (char*)malloc(len+1);
  10. memset(str, 0, len+1);
  11. memcpy(str, data, len);
  12. printf("recv server: [%s]\n", str);
  13. printf("msgid: [%d]\n", msgid);
  14. printf("len: [%d]\n", len);
  15. }
  16. //客户端销毁的回调
  17. void on_client_build(net_connection *conn, void *args)
  18. {
  19. int msgid = 1;
  20. const char *msg = "Hello Lars!";
  21. conn->send_message(msg, strlen(msg), msgid);
  22. }
  23. //客户端销毁的回调
  24. void on_client_lost(net_connection *conn, void *args)
  25. {
  26. printf("on_client_lost...\n");
  27. printf("Client is lost!\n");
  28. }
  29. int main()
  30. {
  31. event_loop loop;
  32. //创建tcp客户端
  33. tcp_client client(&loop, "127.0.0.1", 7777, "clientv0.6");
  34. //注册消息路由业务
  35. client.add_msg_router(1, busi);
  36. client.add_msg_router(101, busi);
  37. //设置hook函数
  38. client.set_conn_start(on_client_build);
  39. client.set_conn_close(on_client_lost);
  40. //开启事件监听
  41. loop.event_process();
  42. return 0;
  43. }
  1. 客户端代码无差别。

编译并运行
服务端:

  1. $ ./server
  2. msg_router init...
  3. ip = 127.0.0.1, port = 7777
  4. create 0 thread
  5. create 1 thread
  6. create 2 thread
  7. create 3 thread
  8. create 4 thread
  9. add msg cb msgid = 1
  10. add msg cb msgid = 2
  11. begin accept
  12. begin accept
  13. [thread]: get new connection succ!
  14. callback_busi ...
  15. ======= Active Task Func! ========
  16. ======= Active Task Func! ========
  17. ======= Active Task Func! ========
  18. ======= Active Task Func! ========
  19. ======= Active Task Func! ========

客户端:

  1. $ ./client
  2. msg_router init...
  3. do_connect EINPROGRESS
  4. add msg cb msgid = 1
  5. add msg cb msgid = 101
  6. connect 127.0.0.1:7777 succ!
  7. recv server: [welcome! you online..]
  8. msgid: [101]
  9. len: [21]
  10. recv server: [Hello Lars!]
  11. msgid: [1]
  12. len: [11]
  13. recv server: [Hello I am a Task!]
  14. msgid: [101]
  15. len: [18]
  1. task机制已经集成完毕,lars_reactor功能更加强大了。