35.1 proto通信协议定义
base/proto/lars.proto
/* Lars系统的消息ID */enum MessageId {ID_UNKNOW = 0; //proto3 enum第一个属性必须是0,用来占位ID_GetRouteRequest = 1; //向DNS请求Route对应的关系的消息IDID_GetRouteResponse = 2; //DNS回复的Route信息的消息IDID_ReportStatusRequest = 3; //上报host调用状态信息请求消息IDID_GetHostRequest = 4; //API 发送请求host信息给 Lb Agent模块 消息IDID_GetHostResponse = 5; //agent 回执给 API host信息的 消息IDID_ReportRequest = 6; //API report get_host的调用结果给agent的 消息ID// =======================================================ID_API_GetRouteRequest = 7; //API 请求agent某个modid/cmdid的全部hosts信息的route 消息IDID_API_GetRouteResponse = 8; //agent 回执给 API的全部hosts的route信息 消息ID// =======================================================}
增加两个message ID, `ID_API_GetRouteRequest`和`ID_API_GetRouteResponse`,主要是针对API层获取route全部的host节点信息通信使用。
35.2 Lars-API:get_route()方法客户端实现
api/cpp/lars_api/lars_api.h
typedef std::pair<std::string, int> ip_port;typedef std::vector<ip_port> route_set;typedef route_set::iterator route_set_it;
api/cpp/lars_api/lars_api.cpp
//lars 系统获取某modid/cmdid全部的hosts(route)信息int lars_client::get_route(int modid, int cmdid, route_set &route){//1. 封装请求消息lars::GetRouteRequest req;req.set_modid(modid);req.set_cmdid(cmdid);//2. sendchar write_buf[4096], read_buf[80*1024];//消息头msg_head head;head.msglen = req.ByteSizeLong();head.msgid = lars::ID_API_GetRouteRequest;memcpy(write_buf, &head, MESSAGE_HEAD_LEN);//消息体req.SerializeToArray(write_buf+MESSAGE_HEAD_LEN, head.msglen);//简单的hash来发给对应的agent udp serverint index = (modid + cmdid) %3;int ret = sendto(_sockfd[index], write_buf, head.msglen + MESSAGE_HEAD_LEN, 0, NULL, 0);if (ret == -1) {perror("sendto");return lars::RET_SYSTEM_ERROR;}//3. recvlars::GetRouteResponse rsp;int message_len = recvfrom(_sockfd[index], read_buf, sizeof(read_buf), 0, NULL, NULL);if (message_len == -1) {perror("recvfrom");return lars::RET_SYSTEM_ERROR;}//消息头memcpy(&head, read_buf, MESSAGE_HEAD_LEN);if (head.msgid != lars::ID_API_GetRouteResponse) {fprintf(stderr, "message ID error!\n");return lars::RET_SYSTEM_ERROR;}//消息体ret = rsp.ParseFromArray(read_buf + MESSAGE_HEAD_LEN, message_len - MESSAGE_HEAD_LEN);if (!ret) {fprintf(stderr, "message format error: head.msglen = %d, message_len = %d, message_len - MESSAGE_HEAD_LEN = %d, head msgid = %d, ID_GetHostResponse = %d\n", head.msglen, message_len, message_len-MESSAGE_HEAD_LEN, head.msgid, lars::ID_GetRouteResponse);return lars::RET_SYSTEM_ERROR;}if (rsp.modid() != modid || rsp.cmdid() != cmdid) {fprintf(stderr, "message format error\n");return lars::RET_SYSTEM_ERROR;}//4 处理消息for (int i = 0; i < rsp.host_size(); i++) {const lars::HostInfo &host = rsp.host(i);struct in_addr inaddr;inaddr.s_addr = host.ip();std::string ip = inet_ntoa(inaddr);int port = host.port();route.push_back(ip_port(ip,port));}return lars::RET_SUCC;}
35.3 Agent UDP Server处理API-get_route请求
lars_loadbalance_agent/src/agent_udp_server.cpp
static void get_route_cb(const char *data, uint32_t len, int msgid, net_connection *net_conn, void *user_data){//解析api发送的请求包lars::GetRouteRequest req;req.ParseFromArray(data, len);int modid = req.modid();int cmdid = req.cmdid();//设置回执消息lars::GetRouteResponse rsp;rsp.set_modid(modid);rsp.set_cmdid(cmdid);route_lb *ptr_route_lb = (route_lb*)user_data;//调用route_lb的获取host方法,得到rsp返回结果ptr_route_lb->get_route(modid, cmdid, rsp);//打包回执给api消息std::string responseString;rsp.SerializeToString(&responseString);net_conn->send_message(responseString.c_str(), responseString.size(), lars::ID_API_GetRouteResponse);}void * agent_server_main(void * args){// ....//给server注册消息分发路由业务,针对ID_API_GetRouteRequest处理server.add_msg_router(lars::ID_API_GetRouteRequest, get_route_cb, r_lb[port-8888]);// ...return NULL;}
针对ID_API_GetRouteRequest添加一个消息处理方法,在回调业务中,通过route_lb的get_route()方法获取信息.我们来实现这个方法。
lars_loadbalance_agent/src/route_lb.cpp
//agent获取某个modid/cmdid的全部主机,将返回的主机结果存放在rsp中int route_lb::get_route(int modid, int cmdid, lars::GetRouteResponse &rsp){int ret = lars::RET_SUCC;//1. 得到keyuint64_t key = ((uint64_t)modid << 32) + cmdid;pthread_mutex_lock(&_mutex);//2. 当前key已经存在_route_lb_map中if (_route_lb_map.find(key) != _route_lb_map.end()) {//2.1 取出对应的load_balanceload_balance *lb = _route_lb_map[key];std::vector<host_info*> vec;lb->get_all_hosts(vec);for (std::vector<host_info*>::iterator it = vec.begin(); it != vec.end(); it++) {lars::HostInfo host;host.set_ip((*it)->ip);host.set_port((*it)->port);rsp.add_host()->CopyFrom(host);}//超时重拉路由//检查是否要重新拉路由信息//若路由并没有处于PULLING状态,且有效期已经超时,则重新拉取if (lb->status == load_balance::NEW && time(NULL) - lb->last_update_time > lb_config.update_timeout) {lb->pull();}}//3. 当前key不存在_route_lb_map中else {//3.1 新建一个load_balanceload_balance *lb = new load_balance(modid, cmdid);if (lb == NULL) {fprintf(stderr, "no more space to create loadbalance\n");exit(1);}//3.2 新建的load_balance加入到map中_route_lb_map[key] = lb;//3.3 从dns service服务拉取具体的host信息lb->pull();ret = lars::RET_NOEXIST;}pthread_mutex_unlock(&_mutex);return ret;}
其中,load_balance的get_all_hosts()方法实现如下:
lars_loadbalance_agent/src/load_balance.cpp
//获取当前挂载下的全部host信息 添加到vec中void load_balance::get_all_hosts(std::vector<host_info*> &vec){for (host_map_it it = _host_map.begin(); it != _host_map.end(); it++) {host_info *hi = it->second;vec.push_back(hi);}}
接下来,我们可以简单测试一些获取route信息的api
api/cpp/example/example.cpp
#include "lars_api.h"#include <iostream>void usage(){printf("usage: ./example [modid] [cmdid]\n");}int main(int argc, char **argv){if (argc != 3) {usage();return 1;}int modid = atoi(argv[1]);int cmdid = atoi(argv[2]);lars_client api;std::string ip;int port;route_set route;int ret = api.get_route(modid, cmdid, route);if (ret == 0) {std::cout << "get route succ!" << std::endl;for (route_set_it it = route.begin(); it != route.end(); it++) {std::cout << "ip = " << (*it).first << ", port = " << (*it).second << std::endl;}}ret = api.get_host(modid, cmdid, ip, port);if (ret == 0) {std::cout << "host is " << ip << ":" << port << std::endl;//上报调用结果api.report(modid, cmdid, ip, port, 0);}return 0;}
