19-lars-agent_1.png

    首先我们要在主线程中,启动3个UDP Server线程,这个是提供业务层/API层的服务。然后分别启动report_client线程,用来和reporter Service进行通信,将请求上报信息发送给Reporter Service。 然后再启动dns_client线程,用来和dns service通信。

    lars_loadbalance_agent/include/main_server.h

    1. #pragma once
    2. #include "lars_reactor.h"
    3. #include "lars.pb.h"
    4. //与report_client通信的thread_queue消息队列
    5. extern thread_queue<lars::ReportStatusRequest>* report_queue;
    6. //与dns_client通信的thread_queue消息队列
    7. extern thread_queue<lars::GetRouteRequest>* dns_queue;
    8. // 启动udp server服务,用来接收业务层(调用者/使用者)的消息
    9. void start_UDP_servers(void);
    10. // 启动lars_reporter client 线程
    11. void start_report_client(void);
    12. // 启动lars_dns client 线程
    13. void start_dns_client(void);

    lars_loadbalance_agent/src/main_server.cpp

    1. #include "main_server.h"
    2. #include "lars.pb.h"
    3. //与report_client通信的thread_queue消息队列
    4. thread_queue<lars::ReportStatusRequest>* report_queue = NULL;
    5. //与dns_client通信的thread_queue消息队列
    6. thread_queue<lars::GetRouteRequest>* dns_queue = NULL;
    7. int main(int argc, char **argv)
    8. {
    9. //1 加载配置文件
    10. //2 启动udp server服务,用来接收业务层(调用者/使用者)的消息
    11. start_UDP_servers();
    12. //3 启动lars_reporter client 线程
    13. report_queue = new thread_queue<lars::ReportStatusRequest>();
    14. if (report_queue == NULL) {
    15. fprintf(stderr, "create report queue error!\n");
    16. exit(1);
    17. }
    18. start_report_client();
    19. //4 启动lars_dns client 线程
    20. dns_queue = new thread_queue<lars::GetRouteRequest>();
    21. if (dns_queue == NULL) {
    22. fprintf(stderr, "create dns queue error!\n");
    23. exit(1);
    24. }
    25. start_dns_client();
    26. std::cout <<"done!" <<std::endl;
    27. while (1) {
    28. sleep(10);
    29. }
    30. return 0;
    31. }

    这里我们分别在main()中 ,开启以上线程。

    1. 其中`report_client`线程需要携带`thread_queue<lars::ReportStatusRequest>`消息队列通道。`agent`负责将上报请求消息`lars::ReportStatusRequest`通过thread_queue发送给reporter service
    2. 其中`dns_client`线程需要携带`thread_queue<lars::GetRouteRequest>``agent`负责将请求modid/cmdidroute消息`lars::GetRouteRequest`通过thread_queue发送给dns service

    3个udp server的线程开辟实现如下:

    lars_loadbalance_agent/src/agent_udp_server.cpp

    1. #include "lars_reactor.h"
    2. #include "main_server.h"
    3. void * agent_server_main(void * args)
    4. {
    5. int *index = (int*)args;
    6. short port = *index + 8888;
    7. event_loop loop;
    8. udp_server server(&loop, "0.0.0.0", port);
    9. //TODO 给server注册消息分发路由业务
    10. printf("agent UDP server :port %d is started...\n", port);
    11. loop.event_process();
    12. return NULL;
    13. }
    14. void start_UDP_servers(void)
    15. {
    16. for (int i = 0; i < 3; i ++) {
    17. pthread_t tid;
    18. int ret = pthread_create(&tid, NULL, agent_server_main, &i);
    19. if (ret == -1) {
    20. perror("pthread_create");
    21. exit(1);
    22. }
    23. pthread_detach(tid);
    24. }
    25. }

    reporter thread创建实现如下:

    lars_loadbalance_agent/src/reporter_client.cpp

    1. #include "lars_reactor.h"
    2. #include "main_server.h"
    3. #include <pthread.h>
    4. void *report_client_thread(void* args)
    5. {
    6. printf("report client thread start\n");
    7. #if 0
    8. event_loop loop;
    9. //1 加载配置文件得到repoter ip + port
    10. std::string ip = config_file::instance()->GetString("reporter", "ip", "");
    11. short port = config_file::instance()->GetNumber("reporter", "port", 0);
    12. //2 创建客户端
    13. tcp_client client(&loop, ip.c_str(), port, "reporter client");
    14. //3 将 thread_queue消息回调事件,绑定到loop中
    15. report_queue->set_loop(&loop);
    16. report_queue->set_callback()
    17. //4 启动事件监听
    18. loop.event_process();
    19. #endif
    20. return NULL;
    21. }
    22. void start_report_client()
    23. {
    24. //开辟一个线程
    25. pthread_t tid;
    26. //启动线程业务函数
    27. int ret = pthread_create(&tid, NULL, report_client_thread, NULL);
    28. if (ret == -1) {
    29. perror("pthread_create");
    30. exit(1);
    31. }
    32. //设置分离模式
    33. pthread_detach(tid);
    34. }

    dns thread创建实现如下:

    lars_loadbalance_agent/src/dns_client.cpp

    1. #include "lars_reactor.h"
    2. #include "main_server.h"
    3. #include <pthread.h>
    4. void *dns_client_thread(void* args)
    5. {
    6. printf("dns client thread start\n");
    7. return NULL;
    8. }
    9. void start_dns_client()
    10. {
    11. //开辟一个线程
    12. pthread_t tid;
    13. //启动线程业务函数
    14. int ret = pthread_create(&tid, NULL, dns_client_thread, NULL);
    15. if (ret == -1) {
    16. perror("pthread_create");
    17. exit(1);
    18. }
    19. //设置分离模式
    20. pthread_detach(tid);
    21. }

    编译,然后我们简单启动一下./bin/lars_lb_agent

    1. $ ./bin/lars_lb_agent
    2. dns client thread start
    3. report client thread start
    4. done!
    5. msg_router init...
    6. server on 0.0.0.0:8888 is running...
    7. agent UDP server :port 8888 is started...
    8. msg_router init...
    9. server on 0.0.0.0:8888 is running...
    10. agent UDP server :port 8888 is started...
    11. msg_router init...
    12. server on 0.0.0.0:8888 is running...
    13. agent UDP server :port 8888 is started...
    14. ...