10.1 消息任务类型
lars_reactor/include/task_msg.h
#pragma once#include "event_loop.h"struct task_msg{enum TASK_TYPE{NEW_CONN, //新建链接的任务NEW_TASK, //一般的任务};TASK_TYPE type; //任务类型//任务的一些参数union {//针对 NEW_CONN新建链接任务,需要传递connfdint connfd;/*==== 暂时用不上 ==== *///针对 NEW_TASK 新建任务,//那么可以给一个任务提供一个回调函数struct {void (*task_cb)(event_loop*, void *args);void *args;};};};
这里面task_msg一共有两个类型的type,一个是新链接的任务,一个是普通任务。两个任务所携带的参数不同,所以用了一个union。
10.2 消息任务队列
lars_reactor/include/thread_queue.h
#pragma once#include <queue>#include <pthread.h>#include <sys/eventfd.h>#include <stdio.h>#include <unistd.h>#include "event_loop.h"/*** 每个thread对应的 消息任务队列** */template <typename T>class thread_queue{public:thread_queue(){_loop = NULL;pthread_mutex_init(&_queue_mutex, NULL);_evfd = eventfd(0, EFD_NONBLOCK);if (_evfd == -1) {perror("evenfd(0, EFD_NONBLOCK)");exit(1);}}~thread_queue(){pthread_mutex_destroy(&_queue_mutex);close(_evfd);}//向队列添加一个任务void send(const T& task) {//触发消息事件的占位传输内容unsigned long long idle_num = 1;pthread_mutex_lock(&_queue_mutex);//将任务添加到队列_queue.push(task);//向_evfd写,触发对应的EPOLLIN事件,来处理该任务int ret = write(_evfd, &idle_num, sizeof(unsigned long long));if (ret == -1) {perror("_evfd write");}pthread_mutex_unlock(&_queue_mutex);}//获取队列,(当前队列已经有任务)void recv(std::queue<T>& new_queue) {unsigned int long long idle_num = 1;pthread_mutex_lock(&_queue_mutex);//把占位的数据读出来,确保底层缓冲没有数据存留int ret = read(_evfd, &idle_num, sizeof(unsigned long long));if (ret == -1) {perror("_evfd read");}//将当前的队列拷贝出去,将一个空队列换回当前队列,同时清空自身队列,确保new_queue是空队列std::swap(new_queue, _queue);pthread_mutex_unlock(&_queue_mutex);}//设置当前thead_queue是被哪个事件触发event_loop监控void set_loop(event_loop *loop) {_loop = loop;}//设置当前消息任务队列的 每个任务触发的回调业务void set_callback(io_callback *cb, void *args = NULL){if (_loop != NULL) {_loop->add_io_event(_evfd, cb, EPOLLIN, args);}}//得到当前loopevent_loop * get_loop() {return _loop;}private:int _evfd; //触发消息任务队列读取的每个消息业务的fdevent_loop *_loop; //当前消息任务队列所绑定在哪个event_loop事件触发机制中std::queue<T> _queue; //队列pthread_mutex_t _queue_mutex; //进行添加任务、读取任务的保护锁};
一个模板类,主要是消息任务队列里的元素类型未必一定是`task_msg`类型。
thread_queue需要绑定一个event_loop。来触发消息到达,捕获消息并且触发处理消息业务的动作。
这里面有个`_evfd`是为了触发消息队列消息到达,处理该消息作用的,将`_evfd`加入到对应线程的`event_loop`中,然后再通过`set_callback`设置一个通用的该queue全部消息所触发的处理业务call_back,在这个call_back里开发者可以自定义实现一些处理业务流程。
- 通过
send将任务发送给消息队列。 - 通过
event_loop触发注册的io_callback得到消息队列里的任务。 - 在io_callback中调用
recv取得task任务,根据任务的不同类型,处理自定义不同业务流程。
10.3 线程池
接下来,我们定义线程池,将`thread_queue`和`thread_pool`进行关联。
lars_reactor/include/thread_pool.h
#pragma once#include <pthread.h>#include "task_msg.h"#include "thread_queue.h"class thread_pool{public://构造,初始化线程池, 开辟thread_cnt个thread_pool(int thread_cnt);//获取一个theadthread_queue<task_msg>* get_thread();private://_queues是当前thread_pool全部的消息任务队列头指针thread_queue<task_msg> ** _queues;//当前线程池中的线程个数int _thread_cnt;//已经启动的全部therad编号pthread_t * _tids;//当前选中的线程队列下标int _index;};
属性:
_queues:是thread_queue集合,和当前线程数量一一对应,每个线程对应一个queue。里面存的元素是task_msg。
_tids:保存线程池中每个线程的ID。
_thread_cnt:当前线程的个数.
_index:表示外层在选择哪个thead处理任务时的一个下标,因为是轮询处理,所以需要一个下标记录。
方法:
thread_pool():构造函数,初始化线程池。
get_thread():通过轮询方式,获取一个线程的thread_queue.
lars_reactor/src/thread_pool.cpp
#include "thread_pool.h"#include "event_loop.h"#include "tcp_conn.h"#include <unistd.h>#include <stdio.h>/** 一旦有task消息过来,这个业务是处理task消息业务的主流程** 只要有人调用 thread_queue:: send()方法就会触发次函数*/void deal_task_message(event_loop *loop, int fd, void *args){//得到是哪个消息队列触发的thread_queue<task_msg>* queue = (thread_queue<task_msg>*)args;//将queue中的全部任务取出来std::queue<task_msg> tasks;queue->recv(tasks);while (tasks.empty() != true) {task_msg task = tasks.front();//弹出一个元素tasks.pop();if (task.type == task_msg::NEW_CONN) {//是一个新建链接的任务//并且将这个tcp_conn加入当当前线程的loop中去监听tcp_conn *conn = new tcp_conn(task.connfd, loop);if (conn == NULL) {fprintf(stderr, "in thread new tcp_conn error\n");exit(1);}printf("[thread]: get new connection succ!\n");}else if (task.type == task_msg::NEW_TASK) {//是一个新的普通任务//TODO}else {//其他未识别任务fprintf(stderr, "unknow task!\n");}}}//一个线程的主业务main函数void *thread_main(void *args){thread_queue<task_msg> *queue = (thread_queue<task_msg>*)args;//每个线程都应该有一个event_loop来监控客户端链接的读写事件event_loop *loop = new event_loop();if (loop == NULL) {fprintf(stderr, "new event_loop error\n");exit(1);}//注册一个触发消息任务读写的callback函数queue->set_loop(loop);queue->set_callback(deal_task_message, queue);//启动阻塞监听loop->event_process();return NULL;}thread_pool::thread_pool(int thread_cnt){_index = 0;_queues = NULL;_thread_cnt = thread_cnt;if (_thread_cnt <= 0) {fprintf(stderr, "_thread_cnt < 0\n");exit(1);}//任务队列的个数和线程个数一致_queues = new thread_queue<task_msg>*[thread_cnt];_tids = new pthread_t[thread_cnt];int ret;for (int i = 0; i < thread_cnt; ++i) {//创建一个线程printf("create %d thread\n", i);//给当前线程创建一个任务消息队列_queues[i] = new thread_queue<task_msg>();ret = pthread_create(&_tids[i], NULL, thread_main, _queues[i]);if (ret == -1) {perror("thread_pool, create thread");exit(1);}//将线程脱离pthread_detach(_tids[i]);}}thread_queue<task_msg>* thread_pool::get_thread(){if (_index == _thread_cnt) {_index = 0;}return _queues[_index];}
这里主要看`deal_task_message()`方法,是处理收到的task任务的。目前我们只对`NEW_CONN`类型的任务进行处理,一般任务先不做处理,因为暂时用不上。`NEW_CONN`的处理主要是让当前线程创建链接,并且将该链接由当前线程的event_loop接管。接下来我们就要将线程池添加到reactor框架中去。
10.4 reactor线程池关联
将线程池添加到`tcp_server`中。
lars_reactor/include/tcp_server.h
#pragma once#include <netinet/in.h>#include "event_loop.h"#include "tcp_conn.h"#include "message.h"#include "thread_pool.h"class tcp_server{public:// ...// ...private:// ...//线程池thread_pool *_thread_pool;};
在构造函数中,添加_thread_pool的初始化工作。并且在accept成功之后交给线程处理客户端的读写事件。
#include <stdio.h>#include <stdlib.h>#include <string.h>#include <strings.h>#include <unistd.h>#include <signal.h>#include <sys/types.h> /* See NOTES */#include <sys/socket.h>#include <arpa/inet.h>#include <errno.h>#include "tcp_server.h"#include "tcp_conn.h"#include "reactor_buf.h"//server的构造函数tcp_server::tcp_server(event_loop *loop, const char *ip, uint16_t port){// ...//6 创建链接管理_max_conns = MAX_CONNS;//创建链接信息数组conns = new tcp_conn*[_max_conns+3];//3是因为stdin,stdout,stderr 已经被占用,再新开fd一定是从3开始,所以不加3就会栈溢出if (conns == NULL) {fprintf(stderr, "new conns[%d] error\n", _max_conns);exit(1);}//7 =============创建线程池=================int thread_cnt = 3;//TODO 从配置文件中读取if (thread_cnt > 0) {_thread_pool = new thread_pool(thread_cnt);if (_thread_pool == NULL) {fprintf(stderr, "tcp_server new thread_pool error\n");exit(1);}}// ========================================//8 注册_socket读事件-->accept处理_loop->add_io_event(_sockfd, accept_callback, EPOLLIN, this);}//开始提供创建链接服务void tcp_server::do_accept(){int connfd;while(true) {//accept与客户端创建链接printf("begin accept\n");connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen);if (connfd == -1) {if (errno == EINTR) {fprintf(stderr, "accept errno=EINTR\n");continue;}else if (errno == EMFILE) {//建立链接过多,资源不够fprintf(stderr, "accept errno=EMFILE\n");}else if (errno == EAGAIN) {fprintf(stderr, "accept errno=EAGAIN\n");break;}else {fprintf(stderr, "accept error\n");exit(1);}}else {//accept succ!int cur_conns;get_conn_num(&cur_conns);//1 判断链接数量if (cur_conns >= _max_conns) {fprintf(stderr, "so many connections, max = %d\n", _max_conns);close(connfd);}else {// ========= 将新连接由线程池处理 ==========if (_thread_pool != NULL) {//启动多线程模式 创建链接//1 选择一个线程来处理thread_queue<task_msg>* queue = _thread_pool->get_thread();//2 创建一个新建链接的消息任务task_msg task;task.type = task_msg::NEW_CONN;task.connfd = connfd;//3 添加到消息队列中,让对应的thread进程event_loop处理queue->send(task);// =====================================}else {//启动单线程模式tcp_conn *conn = new tcp_conn(connfd, _loop);if (conn == NULL) {fprintf(stderr, "new tcp_conn error\n");exit(1);}printf("[tcp_server]: get new connection succ!\n");break;}}}}}
10.5 完成Lars ReactorV0.8开发
0.8版本的server.cpp和client.cpp是不用改变的。开启服务端和客户端观察执行结果即可。
服务端:
$ ./servermsg_router init...create 0 threadcreate 1 threadcreate 2 threadadd msg cb msgid = 1add msg cb msgid = 2begin acceptbegin accept[thread]: get new connection succ!read data: Hello Lars!call msgid = 1call data = Hello Lars!call msglen = 11callback_busi ...=======
客户端
$ ./clientmsg_router init...do_connect EINPROGRESSadd msg cb msgid = 1add msg cb msgid = 101connect 127.0.0.1:7777 succ!do write over, del EPOLLOUTcall msgid = 101call data = welcome! you online..call msglen = 21recv server: [welcome! you online..]msgid: [101]len: [21]=======call msgid = 1call data = Hello Lars!call msglen = 11recv server: [Hello Lars!]msgid: [1]len: [11]=======
我们会发现,链接已经成功创建成功,并且是由于线程处理的读写任务。
