1. 当agent首次连接到dns service时,将全部的load_balance均设置为NEW状态。如果dns service重新启动,或者断开链接重连,都会将之前的拉去中或者没拉取的load_balance状态都设置为NEW状态,只有NEW状态的load_balance才能定期自动拉取.

    lars_loadbalance_agent/src/dns_client.cpp

    1. //========================================================
    2. static void conn_init(net_connection *conn, void *args)
    3. {
    4. for (int i = 0; i < 3; i++) {
    5. r_lb[i]->reset_lb_status();
    6. }
    7. }
    8. //========================================================
    9. void *dns_client_thread(void* args)
    10. {
    11. printf("dns client thread start\n");
    12. event_loop loop;
    13. //1 加载配置文件得到dns service ip + port
    14. std::string ip = config_file::instance()->GetString("dnsserver", "ip", "");
    15. short port = config_file::instance()->GetNumber("dnsserver", "port", 0);
    16. //2 创建客户端
    17. tcp_client client(&loop, ip.c_str(), port, "dns client");
    18. //3 将thread_queue消息回调事件,绑定到loop中
    19. dns_queue->set_loop(&loop);
    20. dns_queue->set_callback(new_dns_request, &client);
    21. //4 设置当收到dns service回执的消息ID_GetRouteResponse处理函数
    22. client.add_msg_router(lars::ID_GetRouteResponse, deal_recv_route);
    23. //========================================================
    24. //5.设置链接成功/链接断开重连成功之后,通过conn_init来清理之前的任务
    25. client.set_conn_start(conn_init);
    26. //========================================================
    27. //启动事件监听
    28. loop.event_process();
    29. return NULL;
    30. }

    lars_loadbalance_agent/src/route_lb.cpp

    1. //将全部的load_balance都重置为NEW状态
    2. void route_lb::reset_lb_status()
    3. {
    4. pthread_mutex_lock(&_mutex);
    5. for (route_map_it it = _route_lb_map.begin();
    6. it != _route_lb_map.end(); it++) {
    7. load_balance *lb = it->second;
    8. if (lb->status == load_balance::PULLING) {
    9. lb->status = load_balance::NEW;
    10. }
    11. }
    12. pthread_mutex_unlock(&_mutex);
    13. }
    1. 增加配置文件参数update_timeout, 表示一个NEW状态的load_balance下的modid/cmdid节点应该经历多长时间进行一次刷新拉取。

    lars_loadbalance_agent/conf/lars_lb_agent.conf

    1. ;对于每个NEW状态的modid/cmdid,多久更新一下本地路由,秒
    2. update_timeout=15

    lars_loadbalance_agent/include/main_server.h

    1. struct load_balance_config
    2. {
    3. //...
    4. //对于每个NEW状态的modid/cmdid,多久更新一下本地路由,秒
    5. long update_timeout;
    6. };

    lars_loadbalance_agent/src/main_server.cpp

    1. static void init_lb_agent()
    2. {
    3. //1. 加载配置文件
    4. config_file::setPath("./conf/lars_lb_agent.conf");
    5. //...
    6. //...
    7. lb_config.update_timeout = config_file::instance()->GetNumber("loadbalance", "update_timeout", 15);
    8. //...
    9. }
    1. load_balance设置最后update时间参数,及最后一次从dns service拉取下来更新host_map的时间.然后在route_lb每次执行get_host的时候,对每个已经存在的host节点做最后时间超时检查,如果超时,则重新从 dns service中拉取。

    lars_loadbalance_agent/include/load_balance.h

    1. /*
    2. * 负载均衡算法核心模块
    3. * 针对一组(modid/cmdid)下的全部host节点的负载规则
    4. */
    5. class load_balance {
    6. public:
    7. load_balance(int modid, int cmdid):
    8. status(PULLING),
    9. last_update_time(0),
    10. _modid(modid),
    11. _cmdid(cmdid)
    12. {
    13. //load_balance 初始化构造
    14. }
    15. // ...
    16. long last_update_time; //最后更新host_map时间戳
    17. private:
    18. // ...
    19. };

    lars_loadbalance_agent/src/load_balance.cpp

    1. //根据dns service远程返回的结果,更新_host_map
    2. void load_balance::update(lars::GetRouteResponse &rsp)
    3. {
    4. long current_time = time(NULL);
    5. //...
    6. //...
    7. //更新最后update时间
    8. last_update_time = current_time;
    9. //重置状态为NEW
    10. status = NEW;
    11. }
    1. `load_balance`每次调用`update()`都记录一次最后的更新时间,并标记为`NEW`表示当前`modid/cmdid`没有在PULLING,可以再更新。

    lars_loadbalance_agent/src/route_lb.cpp

    1. //agent获取一个host主机,将返回的主机结果存放在rsp中
    2. int route_lb::get_host(int modid, int cmdid, lars::GetHostResponse &rsp)
    3. {
    4. int ret = lars::RET_SUCC;
    5. //1. 得到key
    6. uint64_t key = ((uint64_t)modid << 32) + cmdid;
    7. pthread_mutex_lock(&_mutex);
    8. //2. 当前key已经存在_route_lb_map中
    9. if (_route_lb_map.find(key) != _route_lb_map.end()) {
    10. //2.1 取出对应的load_balance
    11. load_balance *lb = _route_lb_map[key];
    12. if (lb->empty() == true) {
    13. //存在lb 里面的host为空,说明正在pull()中,还没有从dns_service返回来,所以直接回复不存在
    14. assert(lb->status == load_balance::PULLING);
    15. rsp.set_retcode(lars::RET_NOEXIST);
    16. }
    17. else {
    18. ret = lb->choice_one_host(rsp);
    19. rsp.set_retcode(ret);
    20. // =================================================
    21. //超时重拉路由
    22. //检查是否要重新拉路由信息
    23. //若路由并没有处于PULLING状态,且有效期已经超时,则重新拉取
    24. if (lb->status == load_balance::NEW && time(NULL) - lb->last_update_time > lb_config.update_timeout) {
    25. lb->pull();
    26. }
    27. // =================================================
    28. }
    29. }
    30. //3. ...
    31. // ...
    32. pthread_mutex_unlock(&_mutex);
    33. return ret;
    34. }