
首先我们要在主线程中,启动3个UDP Server线程,这个是提供业务层/API层的服务。然后分别启动report_client线程,用来和reporter Service进行通信,将请求上报信息发送给Reporter Service。 然后再启动dns_client线程,用来和dns service通信。
lars_loadbalance_agent/include/main_server.h
#pragma once#include "lars_reactor.h"#include "lars.pb.h"//与report_client通信的thread_queue消息队列extern thread_queue<lars::ReportStatusRequest>* report_queue;//与dns_client通信的thread_queue消息队列extern thread_queue<lars::GetRouteRequest>* dns_queue;// 启动udp server服务,用来接收业务层(调用者/使用者)的消息void start_UDP_servers(void);// 启动lars_reporter client 线程void start_report_client(void);// 启动lars_dns client 线程void start_dns_client(void);
lars_loadbalance_agent/src/main_server.cpp
#include "main_server.h"#include "lars.pb.h"//与report_client通信的thread_queue消息队列thread_queue<lars::ReportStatusRequest>* report_queue = NULL;//与dns_client通信的thread_queue消息队列thread_queue<lars::GetRouteRequest>* dns_queue = NULL;int main(int argc, char **argv){//1 加载配置文件//2 启动udp server服务,用来接收业务层(调用者/使用者)的消息start_UDP_servers();//3 启动lars_reporter client 线程report_queue = new thread_queue<lars::ReportStatusRequest>();if (report_queue == NULL) {fprintf(stderr, "create report queue error!\n");exit(1);}start_report_client();//4 启动lars_dns client 线程dns_queue = new thread_queue<lars::GetRouteRequest>();if (dns_queue == NULL) {fprintf(stderr, "create dns queue error!\n");exit(1);}start_dns_client();std::cout <<"done!" <<std::endl;while (1) {sleep(10);}return 0;}
这里我们分别在main()中 ,开启以上线程。
其中`report_client`线程需要携带`thread_queue<lars::ReportStatusRequest>`消息队列通道。`agent`负责将上报请求消息`lars::ReportStatusRequest`通过thread_queue发送给reporter service。其中`dns_client`线程需要携带`thread_queue<lars::GetRouteRequest>`。`agent`负责将请求modid/cmdid的route消息`lars::GetRouteRequest`通过thread_queue发送给dns service。
3个udp server的线程开辟实现如下:
lars_loadbalance_agent/src/agent_udp_server.cpp
#include "lars_reactor.h"#include "main_server.h"void * agent_server_main(void * args){int *index = (int*)args;short port = *index + 8888;event_loop loop;udp_server server(&loop, "0.0.0.0", port);//TODO 给server注册消息分发路由业务printf("agent UDP server :port %d is started...\n", port);loop.event_process();return NULL;}void start_UDP_servers(void){for (int i = 0; i < 3; i ++) {pthread_t tid;int ret = pthread_create(&tid, NULL, agent_server_main, &i);if (ret == -1) {perror("pthread_create");exit(1);}pthread_detach(tid);}}
reporter thread创建实现如下:
lars_loadbalance_agent/src/reporter_client.cpp
#include "lars_reactor.h"#include "main_server.h"#include <pthread.h>void *report_client_thread(void* args){printf("report client thread start\n");#if 0event_loop loop;//1 加载配置文件得到repoter ip + portstd::string ip = config_file::instance()->GetString("reporter", "ip", "");short port = config_file::instance()->GetNumber("reporter", "port", 0);//2 创建客户端tcp_client client(&loop, ip.c_str(), port, "reporter client");//3 将 thread_queue消息回调事件,绑定到loop中report_queue->set_loop(&loop);report_queue->set_callback()//4 启动事件监听loop.event_process();#endifreturn NULL;}void start_report_client(){//开辟一个线程pthread_t tid;//启动线程业务函数int ret = pthread_create(&tid, NULL, report_client_thread, NULL);if (ret == -1) {perror("pthread_create");exit(1);}//设置分离模式pthread_detach(tid);}
dns thread创建实现如下:
lars_loadbalance_agent/src/dns_client.cpp
#include "lars_reactor.h"#include "main_server.h"#include <pthread.h>void *dns_client_thread(void* args){printf("dns client thread start\n");return NULL;}void start_dns_client(){//开辟一个线程pthread_t tid;//启动线程业务函数int ret = pthread_create(&tid, NULL, dns_client_thread, NULL);if (ret == -1) {perror("pthread_create");exit(1);}//设置分离模式pthread_detach(tid);}
编译,然后我们简单启动一下./bin/lars_lb_agent
$ ./bin/lars_lb_agentdns client thread startreport client thread startdone!msg_router init...server on 0.0.0.0:8888 is running...agent UDP server :port 8888 is started...msg_router init...server on 0.0.0.0:8888 is running...agent UDP server :port 8888 is started...msg_router init...server on 0.0.0.0:8888 is running...agent UDP server :port 8888 is started......
