1. 我们现在的reporter_serviceio入库操作,完全是在消息的callback中进行的,那么实际上,这回占用我们server的工作线程的阻塞时间,从而浪费cpu。所以我们应该将io的入库操作,交给一个专门做入库的消息队列线程池来做,这样我们的callback就会立刻返回该业务,从而可以继续处理下一个conn链接的消息事件业务。
    2. 所以我们就要在此给reporter_service设计一个存储数据的线程池及配套的消息队列。当然这里面我们还是直接用写好的`lars_reactor`框架里的接口即可。

    lars_reporter/src/reporter_service.cpp

    1. #include "lars_reactor.h"
    2. #include "lars.pb.h"
    3. #include "store_report.h"
    4. #include <string>
    5. thread_queue<lars::ReportStatusRequest> **reportQueues = NULL;
    6. int thread_cnt = 0;
    7. void get_report_status(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
    8. {
    9. lars::ReportStatusRequest req;
    10. req.ParseFromArray(data, len);
    11. //将上报数据存储到db
    12. StoreReport sr;
    13. sr.store(req);
    14. //轮询将消息平均发送到每个线程的消息队列中
    15. static int index = 0;
    16. //将消息发送给某个线程消息队列
    17. reportQueues[index]->send(req);
    18. index ++;
    19. index = index % thread_cnt;
    20. }
    21. void create_reportdb_threads()
    22. {
    23. thread_cnt = config_file::instance()->GetNumber("reporter", "db_thread_cnt", 3);
    24. //开线程池的消息队列
    25. reportQueues = new thread_queue<lars::ReportStatusRequest>*[thread_cnt];
    26. if (reportQueues == NULL) {
    27. fprintf(stderr, "create thread_queue<lars::ReportStatusRequest>*[%d], error", thread_cnt) ;
    28. exit(1);
    29. }
    30. for (int i = 0; i < thread_cnt; i++) {
    31. //给当前线程创建一个消息队列queue
    32. reportQueues[i] = new thread_queue<lars::ReportStatusRequest>();
    33. if (reportQueues == NULL) {
    34. fprintf(stderr, "create thread_queue error\n");
    35. exit(1);
    36. }
    37. pthread_t tid;
    38. int ret = pthread_create(&tid, NULL, store_main, reportQueues[i]);
    39. if (ret == -1) {
    40. perror("pthread_create");
    41. exit(1);
    42. }
    43. pthread_detach(tid);
    44. }
    45. }
    46. int main(int argc, char **argv)
    47. {
    48. event_loop loop;
    49. //加载配置文件
    50. config_file::setPath("./conf/lars_reporter.conf");
    51. std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
    52. short port = config_file::instance()->GetNumber("reactor", "port", 7779);
    53. //创建tcp server
    54. tcp_server server(&loop, ip.c_str(), port);
    55. //添加数据上报请求处理的消息分发处理业务
    56. server.add_msg_router(lars::ID_ReportStatusRequest, get_report_status);
    57. //为了防止在业务中出现io阻塞,那么需要启动一个线程池对IO进行操作的,接受业务的请求存储消息
    58. create_reportdb_threads();
    59. //启动事件监听
    60. loop.event_process();
    61. return 0;
    62. }
    1. 这里主线程启动了线程池,根据配置文件的`db_thread_cnt`数量来开辟。每个线程都会执行`store_main`方法,我们来看一下实现

    lars_reporter/src/store_thread.cpp

    1. #include "lars.pb.h"
    2. #include "lars_reactor.h"
    3. #include "store_report.h"
    4. struct Args
    5. {
    6. thread_queue<lars::ReportStatusRequest>* first;
    7. StoreReport *second;
    8. };
    9. //typedef void io_callback(event_loop *loop, int fd, void *args);
    10. void thread_report(event_loop *loop, int fd, void *args)
    11. {
    12. //1. 从queue里面取出需要report的数据(需要thread_queue)
    13. thread_queue<lars::ReportStatusRequest>* queue = ((Args*)args)->first;
    14. StoreReport *sr = ((Args*)args)->second;
    15. std::queue<lars::ReportStatusRequest> report_msgs;
    16. //1.1 从消息队列中取出全部的消息元素集合
    17. queue->recv(report_msgs);
    18. while ( !report_msgs.empty() ) {
    19. lars::ReportStatusRequest msg = report_msgs.front();
    20. report_msgs.pop();
    21. //2. 将数据存储到DB中(需要StoreReport)
    22. sr->store(msg);
    23. }
    24. }
    25. void *store_main(void *args)
    26. {
    27. //得到对应的thread_queue
    28. thread_queue<lars::ReportStatusRequest> *queue = (thread_queue<lars::ReportStatusRequest>*)args;
    29. //定义事件触发机制
    30. event_loop loop;
    31. //定义一个存储对象
    32. StoreReport sr;
    33. Args callback_args;
    34. callback_args.first = queue;
    35. callback_args.second = &sr;
    36. queue->set_loop(&loop);
    37. queue->set_callback(thread_report, &callback_args);
    38. //启动事件监听
    39. loop.event_process();
    40. return NULL;
    41. }
    1. 每个线程都会绑定一个`thread_queue<lars::ReportStatusRequest>`,然后一个线程里面有一个loop,来监控消息队列是否有消息事件过来,如果有消息实现过来,针对每个消息会触发`thread_report()`方法, `thread_report()`中,我们就直接将`lars::ReportStatusRequest`消息存储到db中。
    2. 那么,由谁来给每个线程的`thread_queue`发送消息呢,就是agent/客户端发送的请求,我们在处理`lars::ID_ReportStatusRequest` 消息分发业务的时候调用`get_report_status()`来触发。

    lars_reporter/src/reporter_service.cpp

    1. void get_report_status(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
    2. {
    3. lars::ReportStatusRequest req;
    4. req.ParseFromArray(data, len);
    5. //将上报数据存储到db
    6. StoreReport sr;
    7. sr.store(req);
    8. //轮询将消息平均发送到每个线程的消息队列中
    9. static int index = 0;
    10. //将消息发送给某个线程消息队列
    11. reportQueues[index]->send(req);
    12. index ++;
    13. index = index % thread_cnt;
    14. }
    1. 这里的分发机制,是采用最轮询的方式,是每个线程依次分配,去调用`thread_queue``send()`方法,将消息发送给消息队列。
    2. 最后我们进行测试,效果跟之前的效果是一样的。我们现在已经集成进来了存储线程池,现在就不用担心在处理业务的时候,因为DB等的io阻塞,使cpu得不到充分利用了。