- 当agent首次连接到dns service时,将全部的
load_balance均设置为NEW状态。如果dns service重新启动,或者断开链接重连,都会将之前的拉去中或者没拉取的load_balance状态都设置为NEW状态,只有NEW状态的load_balance才能定期自动拉取.
lars_loadbalance_agent/src/dns_client.cpp
//========================================================static void conn_init(net_connection *conn, void *args){for (int i = 0; i < 3; i++) {r_lb[i]->reset_lb_status();}}//========================================================void *dns_client_thread(void* args){printf("dns client thread start\n");event_loop loop;//1 加载配置文件得到dns service ip + portstd::string ip = config_file::instance()->GetString("dnsserver", "ip", "");short port = config_file::instance()->GetNumber("dnsserver", "port", 0);//2 创建客户端tcp_client client(&loop, ip.c_str(), port, "dns client");//3 将thread_queue消息回调事件,绑定到loop中dns_queue->set_loop(&loop);dns_queue->set_callback(new_dns_request, &client);//4 设置当收到dns service回执的消息ID_GetRouteResponse处理函数client.add_msg_router(lars::ID_GetRouteResponse, deal_recv_route);//========================================================//5.设置链接成功/链接断开重连成功之后,通过conn_init来清理之前的任务client.set_conn_start(conn_init);//========================================================//启动事件监听loop.event_process();return NULL;}
lars_loadbalance_agent/src/route_lb.cpp
//将全部的load_balance都重置为NEW状态void route_lb::reset_lb_status(){pthread_mutex_lock(&_mutex);for (route_map_it it = _route_lb_map.begin();it != _route_lb_map.end(); it++) {load_balance *lb = it->second;if (lb->status == load_balance::PULLING) {lb->status = load_balance::NEW;}}pthread_mutex_unlock(&_mutex);}
- 增加配置文件参数update_timeout, 表示一个NEW状态的load_balance下的modid/cmdid节点应该经历多长时间进行一次刷新拉取。
lars_loadbalance_agent/conf/lars_lb_agent.conf
;对于每个NEW状态的modid/cmdid,多久更新一下本地路由,秒update_timeout=15
lars_loadbalance_agent/include/main_server.h
struct load_balance_config{//...//对于每个NEW状态的modid/cmdid,多久更新一下本地路由,秒long update_timeout;};
lars_loadbalance_agent/src/main_server.cpp
static void init_lb_agent(){//1. 加载配置文件config_file::setPath("./conf/lars_lb_agent.conf");//...//...lb_config.update_timeout = config_file::instance()->GetNumber("loadbalance", "update_timeout", 15);//...}
- 给
load_balance设置最后update时间参数,及最后一次从dns service拉取下来更新host_map的时间.然后在route_lb每次执行get_host的时候,对每个已经存在的host节点做最后时间超时检查,如果超时,则重新从 dns service中拉取。
lars_loadbalance_agent/include/load_balance.h
/** 负载均衡算法核心模块* 针对一组(modid/cmdid)下的全部host节点的负载规则*/class load_balance {public:load_balance(int modid, int cmdid):status(PULLING),last_update_time(0),_modid(modid),_cmdid(cmdid){//load_balance 初始化构造}// ...long last_update_time; //最后更新host_map时间戳private:// ...};
lars_loadbalance_agent/src/load_balance.cpp
//根据dns service远程返回的结果,更新_host_mapvoid load_balance::update(lars::GetRouteResponse &rsp){long current_time = time(NULL);//...//...//更新最后update时间last_update_time = current_time;//重置状态为NEWstatus = NEW;}
`load_balance`每次调用`update()`都记录一次最后的更新时间,并标记为`NEW`表示当前`modid/cmdid`没有在PULLING,可以再更新。
lars_loadbalance_agent/src/route_lb.cpp
//agent获取一个host主机,将返回的主机结果存放在rsp中int route_lb::get_host(int modid, int cmdid, lars::GetHostResponse &rsp){int ret = lars::RET_SUCC;//1. 得到keyuint64_t key = ((uint64_t)modid << 32) + cmdid;pthread_mutex_lock(&_mutex);//2. 当前key已经存在_route_lb_map中if (_route_lb_map.find(key) != _route_lb_map.end()) {//2.1 取出对应的load_balanceload_balance *lb = _route_lb_map[key];if (lb->empty() == true) {//存在lb 里面的host为空,说明正在pull()中,还没有从dns_service返回来,所以直接回复不存在assert(lb->status == load_balance::PULLING);rsp.set_retcode(lars::RET_NOEXIST);}else {ret = lb->choice_one_host(rsp);rsp.set_retcode(ret);// =================================================//超时重拉路由//检查是否要重新拉路由信息//若路由并没有处于PULLING状态,且有效期已经超时,则重新拉取if (lb->status == load_balance::NEW && time(NULL) - lb->last_update_time > lb_config.update_timeout) {lb->pull();}// =================================================}}//3. ...// ...pthread_mutex_unlock(&_mutex);return ret;}
