10.1 消息任务类型

lars_reactor/include/task_msg.h

  1. #pragma once
  2. #include "event_loop.h"
  3. struct task_msg
  4. {
  5. enum TASK_TYPE
  6. {
  7. NEW_CONN, //新建链接的任务
  8. NEW_TASK, //一般的任务
  9. };
  10. TASK_TYPE type; //任务类型
  11. //任务的一些参数
  12. union {
  13. //针对 NEW_CONN新建链接任务,需要传递connfd
  14. int connfd;
  15. /*==== 暂时用不上 ==== */
  16. //针对 NEW_TASK 新建任务,
  17. //那么可以给一个任务提供一个回调函数
  18. struct {
  19. void (*task_cb)(event_loop*, void *args);
  20. void *args;
  21. };
  22. };
  23. };
  1. 这里面task_msg一共有两个类型的type,一个是新链接的任务,一个是普通任务。两个任务所携带的参数不同,所以用了一个union

10.2 消息任务队列

lars_reactor/include/thread_queue.h

  1. #pragma once
  2. #include <queue>
  3. #include <pthread.h>
  4. #include <sys/eventfd.h>
  5. #include <stdio.h>
  6. #include <unistd.h>
  7. #include "event_loop.h"
  8. /*
  9. *
  10. * 每个thread对应的 消息任务队列
  11. *
  12. * */
  13. template <typename T>
  14. class thread_queue
  15. {
  16. public:
  17. thread_queue()
  18. {
  19. _loop = NULL;
  20. pthread_mutex_init(&_queue_mutex, NULL);
  21. _evfd = eventfd(0, EFD_NONBLOCK);
  22. if (_evfd == -1) {
  23. perror("evenfd(0, EFD_NONBLOCK)");
  24. exit(1);
  25. }
  26. }
  27. ~thread_queue()
  28. {
  29. pthread_mutex_destroy(&_queue_mutex);
  30. close(_evfd);
  31. }
  32. //向队列添加一个任务
  33. void send(const T& task) {
  34. //触发消息事件的占位传输内容
  35. unsigned long long idle_num = 1;
  36. pthread_mutex_lock(&_queue_mutex);
  37. //将任务添加到队列
  38. _queue.push(task);
  39. //向_evfd写,触发对应的EPOLLIN事件,来处理该任务
  40. int ret = write(_evfd, &idle_num, sizeof(unsigned long long));
  41. if (ret == -1) {
  42. perror("_evfd write");
  43. }
  44. pthread_mutex_unlock(&_queue_mutex);
  45. }
  46. //获取队列,(当前队列已经有任务)
  47. void recv(std::queue<T>& new_queue) {
  48. unsigned int long long idle_num = 1;
  49. pthread_mutex_lock(&_queue_mutex);
  50. //把占位的数据读出来,确保底层缓冲没有数据存留
  51. int ret = read(_evfd, &idle_num, sizeof(unsigned long long));
  52. if (ret == -1) {
  53. perror("_evfd read");
  54. }
  55. //将当前的队列拷贝出去,将一个空队列换回当前队列,同时清空自身队列,确保new_queue是空队列
  56. std::swap(new_queue, _queue);
  57. pthread_mutex_unlock(&_queue_mutex);
  58. }
  59. //设置当前thead_queue是被哪个事件触发event_loop监控
  60. void set_loop(event_loop *loop) {
  61. _loop = loop;
  62. }
  63. //设置当前消息任务队列的 每个任务触发的回调业务
  64. void set_callback(io_callback *cb, void *args = NULL)
  65. {
  66. if (_loop != NULL) {
  67. _loop->add_io_event(_evfd, cb, EPOLLIN, args);
  68. }
  69. }
  70. //得到当前loop
  71. event_loop * get_loop() {
  72. return _loop;
  73. }
  74. private:
  75. int _evfd; //触发消息任务队列读取的每个消息业务的fd
  76. event_loop *_loop; //当前消息任务队列所绑定在哪个event_loop事件触发机制中
  77. std::queue<T> _queue; //队列
  78. pthread_mutex_t _queue_mutex; //进行添加任务、读取任务的保护锁
  79. };
  1. 一个模板类,主要是消息任务队列里的元素类型未必一定是`task_msg`类型。

thread_queue需要绑定一个event_loop。来触发消息到达,捕获消息并且触发处理消息业务的动作。

  1. 这里面有个`_evfd`是为了触发消息队列消息到达,处理该消息作用的,将`_evfd`加入到对应线程的`event_loop`中,然后再通过`set_callback`设置一个通用的该queue全部消息所触发的处理业务call_back,在这个call_back里开发者可以自定义实现一些处理业务流程。
  1. 通过send将任务发送给消息队列。
  2. 通过event_loop触发注册的io_callback得到消息队列里的任务。
  3. 在io_callback中调用recv取得task任务,根据任务的不同类型,处理自定义不同业务流程。

10.3 线程池

  1. 接下来,我们定义线程池,将`thread_queue``thread_pool`进行关联。

lars_reactor/include/thread_pool.h

  1. #pragma once
  2. #include <pthread.h>
  3. #include "task_msg.h"
  4. #include "thread_queue.h"
  5. class thread_pool
  6. {
  7. public:
  8. //构造,初始化线程池, 开辟thread_cnt个
  9. thread_pool(int thread_cnt);
  10. //获取一个thead
  11. thread_queue<task_msg>* get_thread();
  12. private:
  13. //_queues是当前thread_pool全部的消息任务队列头指针
  14. thread_queue<task_msg> ** _queues;
  15. //当前线程池中的线程个数
  16. int _thread_cnt;
  17. //已经启动的全部therad编号
  18. pthread_t * _tids;
  19. //当前选中的线程队列下标
  20. int _index;
  21. };

属性:

_queues:是thread_queue集合,和当前线程数量一一对应,每个线程对应一个queue。里面存的元素是task_msg

_tids:保存线程池中每个线程的ID。

_thread_cnt:当前线程的个数.

_index:表示外层在选择哪个thead处理任务时的一个下标,因为是轮询处理,所以需要一个下标记录。

方法:

thread_pool():构造函数,初始化线程池。

get_thread():通过轮询方式,获取一个线程的thread_queue.

lars_reactor/src/thread_pool.cpp

  1. #include "thread_pool.h"
  2. #include "event_loop.h"
  3. #include "tcp_conn.h"
  4. #include <unistd.h>
  5. #include <stdio.h>
  6. /*
  7. * 一旦有task消息过来,这个业务是处理task消息业务的主流程
  8. *
  9. * 只要有人调用 thread_queue:: send()方法就会触发次函数
  10. */
  11. void deal_task_message(event_loop *loop, int fd, void *args)
  12. {
  13. //得到是哪个消息队列触发的
  14. thread_queue<task_msg>* queue = (thread_queue<task_msg>*)args;
  15. //将queue中的全部任务取出来
  16. std::queue<task_msg> tasks;
  17. queue->recv(tasks);
  18. while (tasks.empty() != true) {
  19. task_msg task = tasks.front();
  20. //弹出一个元素
  21. tasks.pop();
  22. if (task.type == task_msg::NEW_CONN) {
  23. //是一个新建链接的任务
  24. //并且将这个tcp_conn加入当当前线程的loop中去监听
  25. tcp_conn *conn = new tcp_conn(task.connfd, loop);
  26. if (conn == NULL) {
  27. fprintf(stderr, "in thread new tcp_conn error\n");
  28. exit(1);
  29. }
  30. printf("[thread]: get new connection succ!\n");
  31. }
  32. else if (task.type == task_msg::NEW_TASK) {
  33. //是一个新的普通任务
  34. //TODO
  35. }
  36. else {
  37. //其他未识别任务
  38. fprintf(stderr, "unknow task!\n");
  39. }
  40. }
  41. }
  42. //一个线程的主业务main函数
  43. void *thread_main(void *args)
  44. {
  45. thread_queue<task_msg> *queue = (thread_queue<task_msg>*)args;
  46. //每个线程都应该有一个event_loop来监控客户端链接的读写事件
  47. event_loop *loop = new event_loop();
  48. if (loop == NULL) {
  49. fprintf(stderr, "new event_loop error\n");
  50. exit(1);
  51. }
  52. //注册一个触发消息任务读写的callback函数
  53. queue->set_loop(loop);
  54. queue->set_callback(deal_task_message, queue);
  55. //启动阻塞监听
  56. loop->event_process();
  57. return NULL;
  58. }
  59. thread_pool::thread_pool(int thread_cnt)
  60. {
  61. _index = 0;
  62. _queues = NULL;
  63. _thread_cnt = thread_cnt;
  64. if (_thread_cnt <= 0) {
  65. fprintf(stderr, "_thread_cnt < 0\n");
  66. exit(1);
  67. }
  68. //任务队列的个数和线程个数一致
  69. _queues = new thread_queue<task_msg>*[thread_cnt];
  70. _tids = new pthread_t[thread_cnt];
  71. int ret;
  72. for (int i = 0; i < thread_cnt; ++i) {
  73. //创建一个线程
  74. printf("create %d thread\n", i);
  75. //给当前线程创建一个任务消息队列
  76. _queues[i] = new thread_queue<task_msg>();
  77. ret = pthread_create(&_tids[i], NULL, thread_main, _queues[i]);
  78. if (ret == -1) {
  79. perror("thread_pool, create thread");
  80. exit(1);
  81. }
  82. //将线程脱离
  83. pthread_detach(_tids[i]);
  84. }
  85. }
  86. thread_queue<task_msg>* thread_pool::get_thread()
  87. {
  88. if (_index == _thread_cnt) {
  89. _index = 0;
  90. }
  91. return _queues[_index];
  92. }
  1. 这里主要看`deal_task_message()`方法,是处理收到的task任务的。目前我们只对`NEW_CONN`类型的任务进行处理,一般任务先不做处理,因为暂时用不上。
  2. `NEW_CONN`的处理主要是让当前线程创建链接,并且将该链接由当前线程的event_loop接管。
  3. 接下来我们就要将线程池添加到reactor框架中去。

10.4 reactor线程池关联

  1. 将线程池添加到`tcp_server`中。

lars_reactor/include/tcp_server.h

  1. #pragma once
  2. #include <netinet/in.h>
  3. #include "event_loop.h"
  4. #include "tcp_conn.h"
  5. #include "message.h"
  6. #include "thread_pool.h"
  7. class tcp_server
  8. {
  9. public:
  10. // ...
  11. // ...
  12. private:
  13. // ...
  14. //线程池
  15. thread_pool *_thread_pool;
  16. };

在构造函数中,添加_thread_pool的初始化工作。并且在accept成功之后交给线程处理客户端的读写事件。

  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <string.h>
  4. #include <strings.h>
  5. #include <unistd.h>
  6. #include <signal.h>
  7. #include <sys/types.h> /* See NOTES */
  8. #include <sys/socket.h>
  9. #include <arpa/inet.h>
  10. #include <errno.h>
  11. #include "tcp_server.h"
  12. #include "tcp_conn.h"
  13. #include "reactor_buf.h"
  14. //server的构造函数
  15. tcp_server::tcp_server(event_loop *loop, const char *ip, uint16_t port)
  16. {
  17. // ...
  18. //6 创建链接管理
  19. _max_conns = MAX_CONNS;
  20. //创建链接信息数组
  21. conns = new tcp_conn*[_max_conns+3];//3是因为stdin,stdout,stderr 已经被占用,再新开fd一定是从3开始,所以不加3就会栈溢出
  22. if (conns == NULL) {
  23. fprintf(stderr, "new conns[%d] error\n", _max_conns);
  24. exit(1);
  25. }
  26. //7 =============创建线程池=================
  27. int thread_cnt = 3;//TODO 从配置文件中读取
  28. if (thread_cnt > 0) {
  29. _thread_pool = new thread_pool(thread_cnt);
  30. if (_thread_pool == NULL) {
  31. fprintf(stderr, "tcp_server new thread_pool error\n");
  32. exit(1);
  33. }
  34. }
  35. // ========================================
  36. //8 注册_socket读事件-->accept处理
  37. _loop->add_io_event(_sockfd, accept_callback, EPOLLIN, this);
  38. }
  39. //开始提供创建链接服务
  40. void tcp_server::do_accept()
  41. {
  42. int connfd;
  43. while(true) {
  44. //accept与客户端创建链接
  45. printf("begin accept\n");
  46. connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen);
  47. if (connfd == -1) {
  48. if (errno == EINTR) {
  49. fprintf(stderr, "accept errno=EINTR\n");
  50. continue;
  51. }
  52. else if (errno == EMFILE) {
  53. //建立链接过多,资源不够
  54. fprintf(stderr, "accept errno=EMFILE\n");
  55. }
  56. else if (errno == EAGAIN) {
  57. fprintf(stderr, "accept errno=EAGAIN\n");
  58. break;
  59. }
  60. else {
  61. fprintf(stderr, "accept error\n");
  62. exit(1);
  63. }
  64. }
  65. else {
  66. //accept succ!
  67. int cur_conns;
  68. get_conn_num(&cur_conns);
  69. //1 判断链接数量
  70. if (cur_conns >= _max_conns) {
  71. fprintf(stderr, "so many connections, max = %d\n", _max_conns);
  72. close(connfd);
  73. }
  74. else {
  75. // ========= 将新连接由线程池处理 ==========
  76. if (_thread_pool != NULL) {
  77. //启动多线程模式 创建链接
  78. //1 选择一个线程来处理
  79. thread_queue<task_msg>* queue = _thread_pool->get_thread();
  80. //2 创建一个新建链接的消息任务
  81. task_msg task;
  82. task.type = task_msg::NEW_CONN;
  83. task.connfd = connfd;
  84. //3 添加到消息队列中,让对应的thread进程event_loop处理
  85. queue->send(task);
  86. // =====================================
  87. }
  88. else {
  89. //启动单线程模式
  90. tcp_conn *conn = new tcp_conn(connfd, _loop);
  91. if (conn == NULL) {
  92. fprintf(stderr, "new tcp_conn error\n");
  93. exit(1);
  94. }
  95. printf("[tcp_server]: get new connection succ!\n");
  96. break;
  97. }
  98. }
  99. }
  100. }
  101. }

10.5 完成Lars ReactorV0.8开发

  1. 0.8版本的server.cppclient.cpp是不用改变的。开启服务端和客户端观察执行结果即可。

服务端:

  1. $ ./server
  2. msg_router init...
  3. create 0 thread
  4. create 1 thread
  5. create 2 thread
  6. add msg cb msgid = 1
  7. add msg cb msgid = 2
  8. begin accept
  9. begin accept
  10. [thread]: get new connection succ!
  11. read data: Hello Lars!
  12. call msgid = 1
  13. call data = Hello Lars!
  14. call msglen = 11
  15. callback_busi ...
  16. =======

客户端

  1. $ ./client
  2. msg_router init...
  3. do_connect EINPROGRESS
  4. add msg cb msgid = 1
  5. add msg cb msgid = 101
  6. connect 127.0.0.1:7777 succ!
  7. do write over, del EPOLLOUT
  8. call msgid = 101
  9. call data = welcome! you online..
  10. call msglen = 21
  11. recv server: [welcome! you online..]
  12. msgid: [101]
  13. len: [21]
  14. =======
  15. call msgid = 1
  16. call data = Hello Lars!
  17. call msglen = 11
  18. recv server: [Hello Lars!]
  19. msgid: [1]
  20. len: [11]
  21. =======
  1. 我们会发现,链接已经成功创建成功,并且是由于线程处理的读写任务。