29.1 Report设计与实现

report client主要是实现thread_queue的回调业务,udp server会定期的上传上报数据到reporter,那么请求对于report client就是透传给reporter serivce即可。

lars_loadbalance_agent/src/reporter_client.cpp

  1. #include "lars_reactor.h"
  2. #include "main_server.h"
  3. #include <string>
  4. #include <pthread.h>
  5. //typedef void io_callback(event_loop *loop, int fd, void *args);
  6. //只要thread_queue有数据,loop就会触发此回调函数来处理业务
  7. void new_report_request(event_loop *loop, int fd, void *args)
  8. {
  9. tcp_client *client = (tcp_client*)args;
  10. //1. 将请求数据从thread_queue中取出,
  11. std::queue<lars::ReportStatusRequest> msgs;
  12. //2. 将数据放在queue队列中
  13. report_queue->recv(msgs);
  14. //3. 遍历队列,通过client依次将每个msg发送给reporter service
  15. while (!msgs.empty()) {
  16. lars::ReportStatusRequest req = msgs.front();
  17. msgs.pop();
  18. std::string requestString;
  19. req.SerializeToString(&requestString);
  20. //client 发送数据
  21. client->send_message(requestString.c_str(), requestString.size(), lars::ID_ReportStatusRequest);
  22. }
  23. }
  24. void *report_client_thread(void* args)
  25. {
  26. printf("report client thread start\n");
  27. event_loop loop;
  28. //1 加载配置文件得到repoter ip + port
  29. std::string ip = config_file::instance()->GetString("reporter", "ip", "");
  30. short port = config_file::instance()->GetNumber("reporter", "port", 0);
  31. //2 创建客户端
  32. tcp_client client(&loop, ip.c_str(), port, "reporter client");
  33. //3 将 thread_queue消息回调事件,绑定到loop中
  34. report_queue->set_loop(&loop);
  35. report_queue->set_callback(new_report_request, &client);
  36. //4 启动事件监听
  37. loop.event_process();
  38. return NULL;
  39. }
  40. void start_report_client()
  41. {
  42. //开辟一个线程
  43. pthread_t tid;
  44. //启动线程业务函数
  45. int ret = pthread_create(&tid, NULL, report_client_thread, NULL);
  46. if (ret == -1) {
  47. perror("pthread_create");
  48. exit(1);
  49. }
  50. //设置分离模式
  51. pthread_detach(tid);
  52. }

29.2 Dns Client设计与实现

  1. dns client report client的业务十分相似,只是针对的协议不同了。dns clientthread_queue 回调业务主要是透传`lars::GetRouteRequest`数据包。

lars_loadbalance_agent/src/dns_client.cpp

  1. #include "lars_reactor.h"
  2. #include "main_server.h"
  3. #include <pthread.h>
  4. //typedef void io_callback(event_loop *loop, int fd, void *args);
  5. //只要thread_queue有数据,loop就会触发此回调函数来处理业务
  6. void new_dns_request(event_loop *loop, int fd, void *args)
  7. {
  8. tcp_client *client = (tcp_client*)args;
  9. //1. 将请求数据从thread_queue中取出,
  10. std::queue<lars::GetRouteRequest> msgs;
  11. //2. 将数据放在queue队列中
  12. dns_queue->recv(msgs);
  13. //3. 遍历队列,通过client依次将每个msg发送给reporter service
  14. while (!msgs.empty()) {
  15. lars::GetRouteRequest req = msgs.front();
  16. msgs.pop();
  17. std::string requestString;
  18. req.SerializeToString(&requestString);
  19. //client 发送数据
  20. client->send_message(requestString.c_str(), requestString.size(), lars::ID_GetRouteRequest);
  21. }
  22. }
  23. void *dns_client_thread(void* args)
  24. {
  25. printf("dns client thread start\n");
  26. event_loop loop;
  27. //1 加载配置文件得到dns service ip + port
  28. std::string ip = config_file::instance()->GetString("dnsserver", "ip", "");
  29. short port = config_file::instance()->GetNumber("dnsserver", "port", 0);
  30. //2 创建客户端
  31. tcp_client client(&loop, ip.c_str(), port, "dns client");
  32. //3 将thread_queue消息回调事件,绑定到loop中
  33. dns_queue->set_loop(&loop);
  34. dns_queue->set_callback(new_dns_request, &client);
  35. //4 启动事件监听
  36. loop.event_process();
  37. return NULL;
  38. }
  39. void start_dns_client()
  40. {
  41. //开辟一个线程
  42. pthread_t tid;
  43. //启动线程业务函数
  44. int ret = pthread_create(&tid, NULL, dns_client_thread, NULL);
  45. if (ret == -1) {
  46. perror("pthread_create");
  47. exit(1);
  48. }
  49. //设置分离模式
  50. pthread_detach(tid);
  51. }