32.1 proto通信协议定义

  1. syntax = "proto3";
  2. package lars;
  3. /* Message IDs used across the Lars system */
  4. enum MessageId {
  5. ID_UNKNOW = 0; // placeholder: the first value of a proto3 enum must be 0
  6. ID_GetRouteRequest = 1; // message ID of the request sent to DNS for route information
  7. ID_GetRouteResponse = 2; // message ID of the DNS reply carrying route information
  8. ID_ReportStatusRequest = 3; // message ID of the request reporting host call-status information
  9. ID_GetHostRequest = 4; // message ID of the host-information request the API sends to the LB agent module
  10. ID_GetHostResponse = 5; // message ID of the agent's reply to the API carrying host information
  11. ID_ReportRequest = 6; // message ID of the API's report to the agent with the call result of a get_host host
  12. }
  13. //...
  14. //...
  15. // Call result reported by the API to the agent (UDP)
  16. message ReportRequest {
  17. int32 modid = 1;
  18. int32 cmdid = 2;
  19. HostInfo host = 3;
  20. int32 retcode = 4;
  21. }

`ID_ReportRequest`和`message ReportRequest`是针对API层与agent的请求互通协议。

32.2 Lars-API:Reporter()方法客户端实现

Lars/api/cpp/lars_api/lars_api.h

  1. #pragma once
  2. #include "lars_reactor.h"
  3. #include <string>
  4. class lars_client
  5. {
  6. public:
  7. lars_client();
  8. ~lars_client();
  9. // Ask the Lars system for host information: yields the ip and port of a usable host
  10. int get_host(int modid, int cmdid, std::string& ip, int &port);
  11. // Report the result of a call to a host to the Lars system
  12. void report(int modid, int cmdid, const std::string &ip, int port, int retcode);
  13. private:
  14. int _sockfd[3]; // three UDP socket fds, one for each of the agent's three UDP servers
  15. uint32_t _seqid; // message sequence number
  16. };
这里新增了`report()`方法。

Lars/api/cpp/lars_api/lars_api.cpp

  1. //lars 系统上报host调用信息
  2. void lars_client::report(int modid, int cmdid, const std::string &ip, int port, int retcode)
  3. {
  4. //1 封装上报消息
  5. lars::ReportRequest req;
  6. req.set_modid(modid);
  7. req.set_cmdid(cmdid);
  8. req.set_retcode(retcode);
  9. //1.1 host信息
  10. lars::HostInfo *hp = req.mutable_host();
  11. //ip
  12. struct in_addr inaddr;
  13. inet_aton(ip.c_str(), &inaddr);
  14. int ip_num = inaddr.s_addr;
  15. hp->set_ip(ip_num);
  16. //port
  17. hp->set_port(port);
  18. //2. send
  19. char write_buf[4096];
  20. //消息头
  21. msg_head head;
  22. head.msglen = req.ByteSizeLong();
  23. head.msgid = lars::ID_ReportRequest;
  24. memcpy(write_buf, &head, MESSAGE_HEAD_LEN);
  25. req.SerializeToArray(write_buf + MESSAGE_HEAD_LEN, head.msglen);
  26. int index = (modid+cmdid)%3;
  27. int ret = sendto(_sockfd[index], write_buf, head.msglen + MESSAGE_HEAD_LEN, 0, NULL, 0);
  28. if (ret == -1) {
  29. perror("sendto");
  30. }
  31. }

Lars/api/cpp/example/example.cpp

  1. int ret = api.get_host(modid, cmdid, ip, port);
  2. if (ret == 0) {
  3. std::cout << "host is " << ip << ":" << port << std::endl;
  4. //上报调用结果
  5. api.report(modid, cmdid, ip, port, 0);
  6. }
在example的业务应用中,每次调用完`get_host`之后,加上对上报api`report()`的调用。

32.3 report业务添加的配置参数信息

Lars/lars_loadbalance_agent/conf/lars_lb_agent.conf

  1. [reporter]
  2. ip = 127.0.0.1
  3. port = 7779
  4. [dnsserver]
  5. ip = 127.0.0.1
  6. port = 7778
  7. [loadbalance]
  8. ;after this many get-host requests, probe one overloaded node once
  9. probe_num=10
  10. ;initial success count given to a host_info, so that a few failures right after start-up do not mark the host as overloaded
  11. init_succ_cnt=180
  12. ;an idle node whose failure rate rises above this value becomes overloaded
  13. err_rate=0.1
  14. ;an overloaded node whose success rate rises above this value becomes idle
  15. succ_rate=0.95
  16. ;an idle node whose consecutive failures exceed this value becomes overloaded
  17. contin_err_limit=15
  18. ;an overloaded node whose consecutive successes exceed this value becomes idle
  19. contin_succ_limit=15

配置文件里在[loadbalance]中新增了一些字段。

那么我们需要在启动lb_agent的时候,加载这些配置文件参数.

lars_loadbalance_agent/include/main_server.h

  1. #pragma once
  2. #include "lars_reactor.h"
  3. #include "lars.pb.h"
  4. #include "route_lb.h"
  // Runtime settings of the load-balance agent.
  struct load_balance_config
  {
      // after this many get-host requests, probe one overloaded node once
      int probe_num;

      // initial (virtual) success count given to a host_info, so that a few
      // failures right after start-up do not mark the host as overloaded
      int init_succ_cnt;

      //**************************************************
      // an idle node whose failure rate rises above this value becomes overloaded
      float err_rate;

      // an overloaded node whose success rate rises above this value becomes idle
      float succ_rate;

      // an idle node whose consecutive failures exceed this value becomes overloaded
      int contin_err_limit;

      // an overloaded node whose consecutive successes exceed this value becomes idle
      int contin_succ_limit;

      // local ip address of this agent (used to fill the caller field when reporting)
      uint32_t local_ip;
      //**************************************************

      // initial (virtual) error count given to a node when it is switched to
      // overload; read by host_info::set_overload().
      // NOTE(review): appended last so the layout of the existing members is
      // unchanged; whoever fills this struct must also assign this field.
      int init_err_cnt;
  };

lars_loadbalance_agent/src/main_server.cpp

  1. #include "main_server.h"
  2. #include "lars.pb.h"
  3. #include <netdb.h>
  4. // ...
  5. //--------- 全局资源 ----------
  6. static void init_lb_agent()
  7. {
  8. //1. 加载配置文件
  9. config_file::setPath("./conf/lars_lb_agent.conf");
  10. lb_config.probe_num = config_file::instance()->GetNumber("loadbalance", "probe_num", 10);
  11. lb_config.init_succ_cnt = config_file::instance()->GetNumber("loadbalance", "init_succ_cnt", 180);
  12. lb_config.err_rate = config_file::instance()->GetFloat("loadbalance", "err_rate", 0.1);
  13. lb_config.succ_rate = config_file::instance()->GetFloat("loadbalance", "succ_rate", 0.92);
  14. lb_config.contin_succ_limit = config_file::instance()->GetNumber("loadbalance", "contin_succ_limit", 10);
  15. lb_config.contin_err_limit = config_file::instance()->GetNumber("loadbalance", "contin_err_limit", 10);
  16. //2. 初始化3个route_lb模块
  17. create_route_lb();
  18. //3. 加载本地ip
  19. char my_host_name[1024];
  20. if (gethostname(my_host_name, 1024) == 0) {
  21. struct hostent *hd = gethostbyname(my_host_name);
  22. if (hd)
  23. {
  24. struct sockaddr_in myaddr;
  25. myaddr.sin_addr = *(struct in_addr*)hd->h_addr;
  26. lb_config.local_ip = ntohl(myaddr.sin_addr.s_addr);
  27. }
  28. }
  29. if (!lb_config.local_ip) {
  30. struct in_addr inaddr;
  31. inet_aton("127.0.0.1", &inaddr);
  32. lb_config.local_ip = ntohl(inaddr.s_addr);
  33. }
  34. }
  35. // ...

这里加载本地ip,是因为之后上报时发送的消息需要一个caller参数,我们暂时默认以当前agent的ip作为caller。

32.4 Agent UDP Server处理API-Report请求

接下来我们针对API发送的report请求`ID_ReportRequest`进行处理。

lars_loadbalance_agent/src/agent_udp_server.cpp

  #include "lars_reactor.h"
  #include "main_server.h"
  #include <string.h>
  3. // ...
  4. static void report_cb(const char *data, uint32_t len, int msgid, net_connection *net_conn, void *user_data)
  5. {
  6. lars::ReportRequest req;
  7. req.ParseFromArray(data, len);
  8. route_lb *ptr_route_lb = (route_lb*)user_data;
  9. ptr_route_lb->report_host(req);
  10. }
  11. void * agent_server_main(void * args)
  12. {
  13. long index = (long)args;
  14. short port = index + 8888;
  15. event_loop loop;
  16. udp_server server(&loop, "0.0.0.0", port);
  17. //给server注册消息分发路由业务, 针对ID_GetHostRequest处理 每个udp拥有一个对应的route_lb
  18. server.add_msg_router(lars::ID_GetHostRequest, get_host_cb, r_lb[port-8888]);
  19. //======================================================
  20. //给server注册消息分发路由业务,针对ID_ReportRequest处理
  21. server.add_msg_router(lars::ID_ReportRequest, report_cb, r_lb[port-8888]);
  22. //======================================================
  23. printf("agent UDP server :port %d is started...\n", port);
  24. loop.event_process();
  25. return NULL;
  26. }
  27. void start_UDP_servers(void)
  28. {
  29. for (long i = 0; i < 3; i ++) {
  30. pthread_t tid;
  31. int ret = pthread_create(&tid, NULL, agent_server_main, (void*)i);
  32. if (ret == -1) {
  33. perror("pthread_create");
  34. exit(1);
  35. }
  36. pthread_detach(tid);
  37. }
  38. }

这里主要是通过一个udp server中的route_lb对象来调用的report_host(req)方法。我们来实现这个方法。

lars_loadbalance_agent/src/route_lb.cpp

  1. //agent 上报某主机的获取结果
  2. void route_lb::report_host(lars::ReportRequest req)
  3. {
  4. int modid = req.modid();
  5. int cmdid = req.cmdid();
  6. int retcode = req.retcode();
  7. int ip = req.host().ip();
  8. int port = req.host().port();
  9. uint64_t key = ((uint64_t)modid << 32) + cmdid;
  10. pthread_mutex_lock(&_mutex);
  11. if (_route_lb_map.find(key) != _route_lb_map.end()) {
  12. load_balance *lb = _route_lb_map[key];
  13. lb->report(ip, port, retcode);
  14. //上报信息给远程reporter服务器
  15. lb->commit();
  16. }
  17. pthread_mutex_unlock(&_mutex);
  18. }

当然,route_lb最终还是分管每个modid/cmdid对应的load_balance模块,那么选择一个可用的load_balance对象,调用`load_balance`的`report()`方法。而通过`commit()`方法,将report的上报结果提交到远程的report service中去。

接下来我们看一下load_balance的report方法实现.

lars_loadbalance_agent/src/load_balance.cpp

  1. //上报当前host主机调用情况给远端repoter service
  2. void load_balance::report(int ip, int port, int retcode)
  3. {
  4. uint64_t key = ((uint64_t)ip << 32) + port;
  5. if (_host_map.find(key) == _host_map.end()) {
  6. return;
  7. }
  8. //1 计数统计
  9. host_info *hi = _host_map[key];
  10. if (retcode == lars::RET_SUCC) { // retcode == 0
  11. //更新虚拟成功、真实成功次数
  12. hi->vsucc ++;
  13. hi->rsucc ++;
  14. //连续成功增加
  15. hi->contin_succ ++;
  16. //连续失败次数归零
  17. hi->contin_err = 0;
  18. }
  19. else {
  20. //更新虚拟失败、真实失败次数
  21. hi->verr ++;
  22. hi->rerr ++;
  23. //连续失败个数增加
  24. hi->contin_err++;
  25. //连续成功次数归零
  26. hi->contin_succ = 0;
  27. }
  28. //2.检查节点状态
  29. //检查idle节点是否满足overload条件
  30. //或者overload节点是否满足idle条件
  31. //--> 如果是dile节点,则只有调用失败才有必要判断是否达到overload条件
  32. if (hi->overload == false && retcode != lars::RET_SUCC) {
  33. bool overload = false;
  34. //idle节点,检查是否达到判定为overload的状态条件
  35. //(1).计算失败率,如果大于预设值失败率,则为overload
  36. double err_rate = hi->verr * 1.0 / (hi->vsucc + hi->verr);
  37. if (err_rate > lb_config.err_rate) {
  38. overload = true;
  39. }
  40. //(2).连续失败次数达到阈值,判定为overload
  41. if( overload == false && hi->contin_err >= (uint32_t)lb_config.contin_err_limit) {
  42. overload = true;
  43. }
  44. //判定overload需要做的更改流程
  45. if (overload) {
  46. struct in_addr saddr;
  47. saddr.s_addr = htonl(hi->ip);
  48. printf("[%d, %d] host %s:%d change overload, succ %u err %u\n",
  49. _modid, _cmdid, inet_ntoa(saddr), hi->port, hi->vsucc, hi->verr);
  50. //设置hi为overload状态
  51. hi->set_overload();
  52. //移出_idle_list,放入_overload_list
  53. _idle_list.remove(hi);
  54. _overload_list.push_back(hi);
  55. return;
  56. }
  57. }
  58. //--> 如果是overload节点,则只有调用成功才有必要判断是否达到idle条件
  59. else if (hi->overload == true && retcode == lars::RET_SUCC) {
  60. bool idle = false;
  61. //overload节点,检查是否达到回到idle状态的条件
  62. //(1).计算成功率,如果大于预设值的成功率,则为idle
  63. double succ_rate = hi->vsucc * 1.0 / (hi->vsucc + hi->verr);
  64. if (succ_rate > lb_config.succ_rate) {
  65. idle = true;
  66. }
  67. //(2).连续成功次数达到阈值,判定为idle
  68. if (idle == false && hi->contin_succ >= (uint32_t)lb_config.contin_succ_limit) {
  69. idle = true;
  70. }
  71. //判定为idle需要做的更改流程
  72. if (idle) {
  73. struct in_addr saddr;
  74. saddr.s_addr = htonl(hi->ip);
  75. printf("[%d, %d] host %s:%d change idle, succ %u err %u\n",
  76. _modid, _cmdid, inet_ntoa(saddr), hi->port, hi->vsucc, hi->verr);
  77. //设置为idle状态
  78. hi->set_idle();
  79. //移出overload_list, 放入_idle_list
  80. _overload_list.remove(hi);
  81. _idle_list.push_back(hi);
  82. return;
  83. }
  84. }
  85. //TODO 窗口检查和超时机制
  86. }

其中`set_idle()`和`set_overload()`方法实现如下:

lars_loadbalance_agent/src/host_info.cpp

  1. #include "host_info.h"
  2. #include "main_server.h"
  3. void host_info::set_idle()
  4. {
  5. vsucc = lb_config.init_succ_cnt;
  6. verr = 0;
  7. rsucc = 0;
  8. rerr = 0;
  9. contin_succ = 0;
  10. contin_err = 0;
  11. overload = false;
  12. }
  13. void host_info::set_overload()
  14. {
  15. vsucc = 0;
  16. verr = lb_config.init_err_cnt;//overload的初试虚拟err错误次数
  17. rsucc = 0;
  18. rerr = 0;
  19. contin_err = 0;
  20. contin_succ = 0;
  21. overload = true;
  22. }

`load_balance`的`report()`方法实现主要是针对两个链表做节点的交替处理,以及对成功率、失败率的判断。

节点失败率 = 节点verr / (vsucc + verr)
节点成功率 = 节点vsucc / (vsucc + verr)

当idle节点的失败率>预设值(默认10%),将节点判定为overload;
当overload节点的成功率>预设值(默认95%),将节点判定为idle;

而不可以idle/overload节点都只关注成功率or都只关注失败率,这样可能造成节点在idle/overload状态间频繁切换
为idle节点、overload节点设置不同的阈值可以区别对待。

接下来我们来实现`load_balance`的`commit()`方法。

lars_loadbalance_agent/src/load_balance.cpp

  1. //提交host的调用结果给远程reporter service上报结果
  2. void load_balance::commit()
  3. {
  4. if (this->empty() == true) {
  5. return;
  6. }
  7. //1. 封装请求消息
  8. lars::ReportStatusRequest req;
  9. req.set_modid(_modid);
  10. req.set_cmdid(_cmdid);
  11. req.set_ts(time(NULL));
  12. req.set_caller(lb_config.local_ip);
  13. //2. 从idle_list取值
  14. for (host_list_it it = _idle_list.begin(); it != _idle_list.end(); it++) {
  15. host_info *hi = *it;
  16. lars::HostCallResult call_res;
  17. call_res.set_ip(hi->ip);
  18. call_res.set_port(hi->port);
  19. call_res.set_succ(hi->rsucc);
  20. call_res.set_err(hi->rerr);
  21. call_res.set_overload(false);
  22. req.add_results()->CopyFrom(call_res);
  23. }
  24. //3. 从over_list取值
  25. for (host_list_it it = _overload_list.begin(); it != _overload_list.end(); it++) {
  26. host_info *hi = *it;
  27. lars::HostCallResult call_res;
  28. call_res.set_ip(hi->ip);
  29. call_res.set_port(hi->port);
  30. call_res.set_succ(hi->rsucc);
  31. call_res.set_err(hi->rerr);
  32. call_res.set_overload(true);
  33. req.add_results()->CopyFrom(call_res);
  34. }
  35. //4 发送给report_client 的消息队列
  36. report_queue->send(req);
  37. }