32.1 proto通信协议定义
syntax = "proto3";

package lars;

/* Message IDs used by the Lars system */
enum MessageId {
    ID_UNKNOW = 0; // proto3 requires the first enum value to be 0; used as a placeholder
    ID_GetRouteRequest = 1; // message ID for asking the DNS service for a Route mapping
    ID_GetRouteResponse = 2; // message ID of the Route information returned by the DNS service
    ID_ReportStatusRequest = 3; // message ID for reporting host call-status information
    ID_GetHostRequest = 4; // message ID the API uses to ask the Lb Agent module for a host
    ID_GetHostResponse = 5; // message ID the agent uses to return host information to the API
    ID_ReportRequest = 6; // message ID the API uses to report a get_host call result to the agent
}

//...
//...

// The API reports a call result to the Agent (UDP)
message ReportRequest {
    int32 modid = 1;
    int32 cmdid = 2;
    HostInfo host = 3;
    int32 retcode = 4;
}
ID_ReportRequest和 message ReportRequest是针对API层与agent的请求互通协议。
32.2 Lars-API:Reporter()方法客户端实现
Lars/api/cpp/lars_api/lars_api.h
#pragma once#include "lars_reactor.h"#include <string>class lars_client{public:lars_client();~lars_client();//lars 系统获取host信息 得到可用host的ip和portint get_host(int modid, int cmdid, std::string& ip, int &port);//lars 系统上报host调用信息void report(int modid, int cmdid, const std::string &ip, int port, int retcode);private:int _sockfd[3]; //3个udp socket fd 对应agent 3个udp serveruint32_t _seqid; //消息的序列号};
新增`report()`方法。
Lars/api/cpp/lars_api/lars_api.cpp
//lars 系统上报host调用信息void lars_client::report(int modid, int cmdid, const std::string &ip, int port, int retcode){//1 封装上报消息lars::ReportRequest req;req.set_modid(modid);req.set_cmdid(cmdid);req.set_retcode(retcode);//1.1 host信息lars::HostInfo *hp = req.mutable_host();//ipstruct in_addr inaddr;inet_aton(ip.c_str(), &inaddr);int ip_num = inaddr.s_addr;hp->set_ip(ip_num);//porthp->set_port(port);//2. sendchar write_buf[4096];//消息头msg_head head;head.msglen = req.ByteSizeLong();head.msgid = lars::ID_ReportRequest;memcpy(write_buf, &head, MESSAGE_HEAD_LEN);req.SerializeToArray(write_buf + MESSAGE_HEAD_LEN, head.msglen);int index = (modid+cmdid)%3;int ret = sendto(_sockfd[index], write_buf, head.msglen + MESSAGE_HEAD_LEN, 0, NULL, 0);if (ret == -1) {perror("sendto");}}
Lars/api/cpp/example/example.cpp
int ret = api.get_host(modid, cmdid, ip, port);
if (ret == 0) {
    std::cout << "host is " << ip << ":" << port << std::endl;

    // Report the call result; this example always reports retcode 0
    api.report(modid, cmdid, ip, port, 0);
}
在example的业务应用中,每次调用完`get_host`之后,再调用上报api(`report`)上报本次调用结果。
32.3 report业务添加的配置参数信息
Lars/lars_loadbalance_agent/conf/lars_lb_agent.conf
[reporter]
ip = 127.0.0.1
port = 7779

[dnsserver]
ip = 127.0.0.1
port = 7778

[loadbalance]

; After this many get-host requests, probe one overloaded node once
probe_num=10

; Initial number of successful calls credited to a host_info, so that a few
; failures right after start-up are not treated as overload
init_succ_cnt=180

; An idle node whose failure rate exceeds this value becomes overload
err_rate=0.1

; An overload node whose success rate exceeds this value becomes idle
succ_rate=0.95

; An idle node whose consecutive failures exceed this value becomes overload
contin_err_limit=15

; An overload node whose consecutive successes exceed this value becomes idle
contin_succ_limit=15
配置文件里在[loadbalance]中新增了一些字段。
那么我们需要在启动lb_agent的时候,加载这些配置文件参数.
lars_loadbalance_agent/include/main_server.h
#pragma once#include "lars_reactor.h"#include "lars.pb.h"#include "route_lb.h"struct load_balance_config{//经过若干次获取请求host节点后,试探选择一次overload过载节点int probe_num;//初始化host_info主机信息访问成功的个数,防止刚启动时少量失败就认为过载int init_succ_cnt;//**************************************************//当idle节点失败率高于此值,节点变overload状态float err_rate;//当overload节点成功率高于此值,节点变成idle状态float succ_rate;//当idle节点连续失败次数超过此值,节点变成overload状态int contin_err_limit;//当overload节点连续成功次数超过此值, 节点变成idle状态int contin_succ_limit;//当前agent本地ip地址(用于上报 填充caller字段)uint32_t local_ip;//**************************************************};
lars_loadbalance_agent/src/main_server.cpp
#include "main_server.h"
#include "lars.pb.h"
#include <netdb.h>
#include <string.h>

// ...

//--------- global resources ----------

// One-time initialisation of the lb agent: loads the [loadbalance] settings,
// creates the route_lb modules and determines the local ip address.
static void init_lb_agent()
{
    // 1. Load the configuration file; the last argument of each getter is the
    //    fallback passed for that key.
    config_file::setPath("./conf/lars_lb_agent.conf");
    lb_config.probe_num = config_file::instance()->GetNumber("loadbalance", "probe_num", 10);
    lb_config.init_succ_cnt = config_file::instance()->GetNumber("loadbalance", "init_succ_cnt", 180);
    lb_config.err_rate = config_file::instance()->GetFloat("loadbalance", "err_rate", 0.1);
    lb_config.succ_rate = config_file::instance()->GetFloat("loadbalance", "succ_rate", 0.92);
    lb_config.contin_succ_limit = config_file::instance()->GetNumber("loadbalance", "contin_succ_limit", 10);
    lb_config.contin_err_limit = config_file::instance()->GetNumber("loadbalance", "contin_err_limit", 10);

    // 2. Initialise the 3 route_lb modules
    create_route_lb();

    // 3. Determine the local ip, stored in host byte order.
    //    Start from 0 so the fallback test below does not depend on how
    //    lb_config was initialised.
    lb_config.local_ip = 0;

    char my_host_name[1024];
    if (gethostname(my_host_name, 1024) == 0) {
        struct hostent *hd = gethostbyname(my_host_name);
        // Only read the result as an IPv4 address when it really is one.
        if (hd && hd->h_addrtype == AF_INET
               && hd->h_length == (int)sizeof(struct in_addr)
               && hd->h_addr != NULL) {
            struct in_addr addr;
            // memcpy instead of a pointer cast: h_addr is a char* with no
            // alignment guarantee for struct in_addr.
            memcpy(&addr, hd->h_addr, sizeof(addr));
            lb_config.local_ip = ntohl(addr.s_addr);
        }
    }

    // Fall back to the loopback address when no local ip could be resolved.
    if (!lb_config.local_ip) {
        struct in_addr inaddr;
        inet_aton("127.0.0.1", &inaddr);
        lb_config.local_ip = ntohl(inaddr.s_addr);
    }
}

// ...
这里加载本地ip,是因为之后上报时发送的消息需要一个caller参数;我们暂时默认把当前agent的ip作为caller。
32.4 Agent UDP Server处理API-Report请求
接下来我们针对API发送的report的`ID_ReportRequest`进行处理.
lars_loadbalance_agent/src/agent_udp_server.cpp
#include "lars_reactor.h"
#include "main_server.h"
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

// ...

// Message handler for ID_ReportRequest: decodes the request and hands it to
// the route_lb passed as user_data.
static void report_cb(const char *data, uint32_t len, int msgid, net_connection *net_conn, void *user_data)
{
    lars::ReportRequest req;

    // Drop datagrams that do not decode instead of forwarding a partial message.
    if (!req.ParseFromArray(data, len)) {
        fprintf(stderr, "report_cb: failed to parse ReportRequest (%u bytes)\n", (unsigned)len);
        return;
    }

    route_lb *ptr_route_lb = (route_lb*)user_data;
    ptr_route_lb->report_host(req);
}

// Thread entry point of one agent UDP server.
// args carries the server index; the server listens on port 8888 + index.
void * agent_server_main(void * args)
{
    long index = (long)args;

    short port = index + 8888;
    event_loop loop;

    udp_server server(&loop, "0.0.0.0", port);

    // Register the handler for ID_GetHostRequest; each udp server has its own route_lb
    server.add_msg_router(lars::ID_GetHostRequest, get_host_cb, r_lb[port-8888]);

    //======================================================
    // Register the handler for ID_ReportRequest
    server.add_msg_router(lars::ID_ReportRequest, report_cb, r_lb[port-8888]);
    //======================================================

    printf("agent UDP server :port %d is started...\n", port);
    loop.event_process();

    return NULL;
}

// Start the 3 agent UDP servers, each in its own detached thread.
// The process exits if a thread cannot be created.
void start_UDP_servers(void)
{
    for (long i = 0; i < 3; i ++) {
        pthread_t tid;

        // pthread_create() returns 0 on success and an error number on
        // failure; it does not return -1 and does not set errno.
        int ret = pthread_create(&tid, NULL, agent_server_main, (void*)i);
        if (ret != 0) {
            fprintf(stderr, "pthread_create: %s\n", strerror(ret));
            exit(1);
        }

        pthread_detach(tid);
    }
}
这里主要是通过一个udp server中的route_lb对象来调用的report_host(req)方法。我们来实现这个方法。
lars_loadbalance_agent/src/route_lb.cpp
//agent 上报某主机的获取结果void route_lb::report_host(lars::ReportRequest req){int modid = req.modid();int cmdid = req.cmdid();int retcode = req.retcode();int ip = req.host().ip();int port = req.host().port();uint64_t key = ((uint64_t)modid << 32) + cmdid;pthread_mutex_lock(&_mutex);if (_route_lb_map.find(key) != _route_lb_map.end()) {load_balance *lb = _route_lb_map[key];lb->report(ip, port, retcode);//上报信息给远程reporter服务器lb->commit();}pthread_mutex_unlock(&_mutex);}
当然,route_lb最终还是分管每个modid/cmdid对应的load_balance模块,那么选择一个可用的load_balance对象,调用load_balance的report()方法.而通过commit()方法,将report的上报结果提交到远程的report service中去。
接下来我们看一下load_balance的report方法实现.
lars_loadbalance_agent/src/load_balance.cpp
//上报当前host主机调用情况给远端repoter servicevoid load_balance::report(int ip, int port, int retcode){uint64_t key = ((uint64_t)ip << 32) + port;if (_host_map.find(key) == _host_map.end()) {return;}//1 计数统计host_info *hi = _host_map[key];if (retcode == lars::RET_SUCC) { // retcode == 0//更新虚拟成功、真实成功次数hi->vsucc ++;hi->rsucc ++;//连续成功增加hi->contin_succ ++;//连续失败次数归零hi->contin_err = 0;}else {//更新虚拟失败、真实失败次数hi->verr ++;hi->rerr ++;//连续失败个数增加hi->contin_err++;//连续成功次数归零hi->contin_succ = 0;}//2.检查节点状态//检查idle节点是否满足overload条件//或者overload节点是否满足idle条件//--> 如果是dile节点,则只有调用失败才有必要判断是否达到overload条件if (hi->overload == false && retcode != lars::RET_SUCC) {bool overload = false;//idle节点,检查是否达到判定为overload的状态条件//(1).计算失败率,如果大于预设值失败率,则为overloaddouble err_rate = hi->verr * 1.0 / (hi->vsucc + hi->verr);if (err_rate > lb_config.err_rate) {overload = true;}//(2).连续失败次数达到阈值,判定为overloadif( overload == false && hi->contin_err >= (uint32_t)lb_config.contin_err_limit) {overload = true;}//判定overload需要做的更改流程if (overload) {struct in_addr saddr;saddr.s_addr = htonl(hi->ip);printf("[%d, %d] host %s:%d change overload, succ %u err %u\n",_modid, _cmdid, inet_ntoa(saddr), hi->port, hi->vsucc, hi->verr);//设置hi为overload状态hi->set_overload();//移出_idle_list,放入_overload_list_idle_list.remove(hi);_overload_list.push_back(hi);return;}}//--> 如果是overload节点,则只有调用成功才有必要判断是否达到idle条件else if (hi->overload == true && retcode == lars::RET_SUCC) {bool idle = false;//overload节点,检查是否达到回到idle状态的条件//(1).计算成功率,如果大于预设值的成功率,则为idledouble succ_rate = hi->vsucc * 1.0 / (hi->vsucc + hi->verr);if (succ_rate > lb_config.succ_rate) {idle = true;}//(2).连续成功次数达到阈值,判定为idleif (idle == false && hi->contin_succ >= (uint32_t)lb_config.contin_succ_limit) {idle = true;}//判定为idle需要做的更改流程if (idle) {struct in_addr saddr;saddr.s_addr = htonl(hi->ip);printf("[%d, %d] host %s:%d change idle, succ %u err %u\n",_modid, _cmdid, inet_ntoa(saddr), hi->port, hi->vsucc, hi->verr);//设置为idle状态hi->set_idle();//移出overload_list, 
放入_idle_list_overload_list.remove(hi);_idle_list.push_back(hi);return;}}//TODO 窗口检查和超时机制}
其中set_idle()与set_overload()方法实现如下:
lars_loadbalance_agent/src/host_info.cpp
#include "host_info.h"
#include "main_server.h"

// Put this host into the idle state and restart its statistics: the virtual
// success count starts from the configured init_succ_cnt, everything else
// starts from zero.
void host_info::set_idle()
{
    overload = false;

    // virtual counters
    vsucc = lb_config.init_succ_cnt;
    verr = 0;

    // real counters
    rsucc = 0;
    rerr = 0;

    // streaks
    contin_succ = 0;
    contin_err = 0;
}

// Put this host into the overload state and restart its statistics: the
// virtual error count starts from the configured init_err_cnt, everything
// else starts from zero.
void host_info::set_overload()
{
    overload = true;

    // virtual counters
    vsucc = 0;
    verr = lb_config.init_err_cnt; // initial virtual error count of an overload node

    // real counters
    rsucc = 0;
    rerr = 0;

    // streaks
    contin_err = 0;
    contin_succ = 0;
}
load_balance的report()方法实现主要是针对两个链表做节点的交替处理。和成功率失败率的判断。
节点失败率 = 节点verr / (vsucc + verr)
节点成功率 = 节点vsucc/ (vsucc+verr)
当idle节点的失败率>预设值(默认10%),将节点判定为overload;
当overload节点的成功率>预设值(默认95%),将节点判定为idle;
注意:不能让idle节点和overload节点都只关注成功率,或者都只关注失败率,否则可能造成节点在idle/overload状态之间频繁切换。
为idle节点、overload节点设置不同的阈值可以区别对待。
接下来我们来实现`load_balance`的`commit()`方法。
lars_loadbalance_agent/src/load_balance.cpp
//提交host的调用结果给远程reporter service上报结果void load_balance::commit(){if (this->empty() == true) {return;}//1. 封装请求消息lars::ReportStatusRequest req;req.set_modid(_modid);req.set_cmdid(_cmdid);req.set_ts(time(NULL));req.set_caller(lb_config.local_ip);//2. 从idle_list取值for (host_list_it it = _idle_list.begin(); it != _idle_list.end(); it++) {host_info *hi = *it;lars::HostCallResult call_res;call_res.set_ip(hi->ip);call_res.set_port(hi->port);call_res.set_succ(hi->rsucc);call_res.set_err(hi->rerr);call_res.set_overload(false);req.add_results()->CopyFrom(call_res);}//3. 从over_list取值for (host_list_it it = _overload_list.begin(); it != _overload_list.end(); it++) {host_info *hi = *it;lars::HostCallResult call_res;call_res.set_ip(hi->ip);call_res.set_port(hi->port);call_res.set_succ(hi->rsucc);call_res.set_err(hi->rerr);call_res.set_overload(true);req.add_results()->CopyFrom(call_res);}//4 发送给report_client 的消息队列report_queue->send(req);}
