接下来,我们就来实现Lars系统第一个暴露给业务开发者的API:get_host,用于获取主机信息。
31.1 Lars-API:GetHost()方法客户端实现
我们首先提供一个`C++`语言的API接口层,以后可以根据不同业务所使用的开发语言,再实现其他语言的API接口层。
在/Lars下创建api/文件夹.
```
/Lars/api/
└── cpp/
    ├── example/
    │   ├── example.cpp
    │   └── Makefile
    ├── lars_api/
    │   ├── lars_api.cpp
    │   ├── lars_api.h
    │   └── Makefile
    └── lib/
```
lars_api/lars_api.h
#pragma once#include "lars_reactor.h"#include <string>class lars_client{public:lars_client();~lars_client();//lars 系统获取host信息 得到可用host的ip和portint get_host(int modid, int cmdid, std::string& ip, int &port);private:int _sockfd[3]; //3个udp socket fd 对应agent 3个udp serveruint32_t _seqid; //消息的序列号};
lars_api/lars_api.cpp
#include "lars_api.h"
#include "lars.pb.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

// Open one UDP socket per agent UDP server (127.0.0.1:8888, 8889, 8890).
// Each socket is connect()ed so that later sendto()/recvfrom() need no
// address. Any failure is fatal: the error is printed and the process exits.
lars_client::lars_client():_seqid(0)
{
    printf("lars_client()\n");

    // 1. Agent address: the API is meant to run on the same host as the agent
    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    inet_aton("127.0.0.1", &servaddr.sin_addr);

    // 2. One connected UDP socket per agent server; ports are 8888 + i
    for (int i = 0; i < 3; i ++) {
        _sockfd[i] = socket(AF_INET, SOCK_DGRAM | SOCK_CLOEXEC, IPPROTO_UDP);
        if (_sockfd[i] == -1) {
            perror("socket()");
            exit(1);
        }

        servaddr.sin_port = htons(8888 + i);
        int ret = connect(_sockfd[i], (const struct sockaddr *)&servaddr, sizeof(servaddr));
        if (ret == -1) {
            perror("connect()");
            exit(1);
        }
        printf("connection agent udp server succ!\n");
    }
}

// Close the three agent sockets.
lars_client::~lars_client()
{
    printf("~lars_client()\n");

    for (int i = 0; i < 3; ++i) {
        close(_sockfd[i]);
    }
}

// Ask the local agent for one available host of (modid, cmdid).
//
// Returns 0 and fills ip / port on success. Otherwise it returns the agent's
// retcode, or lars::RET_SYSTEM_ERROR on a socket or protocol error; in those
// cases ip / port are left untouched.
//
// NOTE: the sockets have no receive timeout, so a lost reply blocks the
// caller in recvfrom().
int lars_client::get_host(int modid, int cmdid, std::string &ip, int &port)
{
    uint32_t seq = _seqid++;

    // 1. Build the request
    lars::GetHostRequest req;
    req.set_seq(seq);
    req.set_modid(modid);
    req.set_cmdid(cmdid);

    // 2. Send a datagram laid out as [msg_head][serialized request]
    char write_buf[4096], read_buf[80*1024];

    msg_head head;
    head.msglen = req.ByteSizeLong();
    head.msgid = lars::ID_GetHostRequest;
    memcpy(write_buf, &head, MESSAGE_HEAD_LEN);
    req.SerializeToArray(write_buf + MESSAGE_HEAD_LEN, head.msglen);

    // Simple hash to pick the agent UDP server. The agent routes the DNS
    // reply for this modid/cmdid to its route_lb with the same
    // (modid + cmdid) % 3, so both sides must agree.
    // Assumes modid + cmdid >= 0.
    int index = (modid + cmdid) % 3;
    int ret = sendto(_sockfd[index], write_buf, head.msglen + MESSAGE_HEAD_LEN, 0, NULL, 0);
    if (ret == -1) {
        perror("sendto");
        return lars::RET_SYSTEM_ERROR;
    }

    // 3. Receive, skipping replies that belong to earlier requests (seq < ours)
    int message_len;
    lars::GetHostResponse rsp;
    do {
        message_len = recvfrom(_sockfd[index], read_buf, sizeof(read_buf), 0, NULL, NULL);
        if (message_len == -1) {
            perror("recvfrom");
            return lars::RET_SYSTEM_ERROR;
        }

        // A datagram shorter than the header cannot be decoded. Without this
        // check the header would be copied from stale buffer bytes and the
        // body length passed to ParseFromArray would be negative.
        if (message_len < (int)MESSAGE_HEAD_LEN) {
            fprintf(stderr, "message too short: message_len = %d\n", message_len);
            return lars::RET_SYSTEM_ERROR;
        }

        // Message header
        memcpy(&head, read_buf, MESSAGE_HEAD_LEN);
        if (head.msgid != lars::ID_GetHostResponse) {
            fprintf(stderr, "message ID error!\n");
            return lars::RET_SYSTEM_ERROR;
        }

        // Message body
        ret = rsp.ParseFromArray(read_buf + MESSAGE_HEAD_LEN, message_len - MESSAGE_HEAD_LEN);
        if (!ret) {
            fprintf(stderr, "message format error: head.msglen = %d, message_len = %d, message_len - MESSAGE_HEAD_LEN = %d, head msgid = %d, ID_GetHostResponse = %d\n", head.msglen, message_len, message_len-MESSAGE_HEAD_LEN, head.msgid, lars::ID_GetHostResponse);
            return lars::RET_SYSTEM_ERROR;
        }
    } while (rsp.seq() < seq);

    if (rsp.seq() != seq || rsp.modid() != modid || rsp.cmdid() != cmdid) {
        fprintf(stderr, "message format error\n");
        return lars::RET_SYSTEM_ERROR;
    }

    // 4. Extract the chosen host on success
    if (rsp.retcode() == 0) {
        const lars::HostInfo &host = rsp.host();

        // The IPv4 address travels as a host-byte-order integer
        // (e.g. 3232235953 == 192.168.1.177), like the port. inet_ntoa(),
        // however, expects network byte order. Without htonl() the dotted
        // string comes out byte-reversed ("177.1.168.192") on little-endian
        // hosts. Code that converts the string back to an integer must apply
        // ntohl() symmetrically.
        struct in_addr inaddr;
        inaddr.s_addr = htonl(host.ip());
        ip = inet_ntoa(inaddr);
        port = host.port();
    }

    return rsp.retcode(); // 0 == lars::RET_SUCC
}
按照Lars支持的传输协议发送UDP请求,请求消息的ID是`ID_GetHostRequest`,返回的消息结构类型是`GetHostResponse`。每个消息都携带一个seq序列号,用于在UDP丢包或回包错位时识别并跳过属于之前请求的过期回复,保证请求与回复一一对应。
lars_api/Makefile
# Build the client API as the static library ../lib/liblarsclient.a
TARGET= ../lib/liblarsclient.a
CXX=g++
CFLAGS=-g -O2 -Wall -Wno-deprecated

BASE=../../../base
BASE_H=$(BASE)/include

PROTO = $(BASE)/proto
PROTO_H = $(BASE)/proto

LARS_REACTOR=../../../lars_reactor
LARS_REACTOR_H =$(LARS_REACTOR)/include
LARS_REACTOR_LIB=$(LARS_REACTOR)/lib -llreactor

OTHER_LIB = -lpthread -ldl -lprotobuf
SRC= ./
INC= -I./include -I$(BASE_H) -I$(LARS_REACTOR_H) -I$(PROTO_H)
# NOTE: LIB is not used by any rule below (ar does not link); consumers of
# liblarsclient.a must link -lprotobuf themselves.
LIB= -L$(LARS_REACTOR_LIB) $(OTHER_LIB)

OBJS = $(addsuffix .o, $(basename $(wildcard $(SRC)/*.cpp)))
OBJS += $(PROTO)/lars.pb.o

# `r` (replace) instead of `q` (quick append): with `q`, every rebuild appended
# another copy of each object to the existing archive, leaving stale members.
$(TARGET): $(OBJS)
	mkdir -p ../lib
	ar crs $@ $^

%.o: %.cpp
	$(CXX) $(CFLAGS) -c -o $@ $< $(INC)

.PHONY: clean
clean:
	-rm -f ./*.o $(PROTO)/lars.pb.o $(TARGET)
最终生成一个liblarsclient.a静态库,供开发者使用。
接下来我们来实现一个模拟的业务端,example/
example/example.cpp
#include "lars_api.h"
#include <iostream>

// Print the command-line usage of this example.
void usage()
{
    printf("usage: ./example [modid] [cmdid]\n");
}

// Simulated business caller: asks the agent once for a host serving
// (modid, cmdid) taken from argv and prints it when one is returned.
int main(int argc, char **argv)
{
    if (argc != 3) {
        usage();
        return 1;
    }

    const int modid = atoi(argv[1]);
    const int cmdid = atoi(argv[2]);

    lars_client api;
    std::string ip;
    int port;

    const int retcode = api.get_host(modid, cmdid, ip, port);
    if (retcode != 0) {
        return 0;
    }

    std::cout << "host is " << ip << ":" << port << std::endl;
    // (reporting of the call result would go here)
    return 0;
}
example/Makefile
# Build the example program against the static client library
# ../lib/liblarsclient.a (which must be built first in ../lars_api).
CXX = g++
CFLAGS = -g -O2 -Wall -Wno-deprecated

LARS_REACTOR=../../../lars_reactor
LARS_REACTOR_H =$(LARS_REACTOR)/include
# NOTE: LARS_REACTOR_LIB is not referenced by any rule in this file.
LARS_REACTOR_LIB=$(LARS_REACTOR)/lib -llreactor

# `all` has no prerequisites and creates no file named `all`, so the
# example is recompiled and relinked on every invocation.
all:
	$(CXX) $(CFLAGS) -o example example.cpp -I$(LARS_REACTOR_H) -I../lars_api ../lib/liblarsclient.a -lprotobuf

.PHONY: clean
clean:
	-rm ./example
31.2 Agent UDP Server处理API-GetHost请求
API:get_host()—>Agent UDP Server—>route_lb—>load_balance
如上所示,get_host请求会先由agent udp server按消息ID_GetHostRequest进行处理,然后交给对应的route_lb,再由route_lb交给具体的load_balance选出host。
那么,API一旦发送get_host, Agent UDP Server需要添加一个对应的处理该消息的路由处理业务。
lars_loadbalance_agent/src/agent_udp_server.cpp
void * agent_server_main(void * args){long index = (long)args;short port = index + 8888;event_loop loop;udp_server server(&loop, "0.0.0.0", port);//给server注册消息分发路由业务, 每个udp拥有一个对应的route_lbserver.add_msg_router(lars::ID_GetHostRequest, get_host_cb, r_lb[port-8888]);printf("agent UDP server :port %d is started...\n", port);loop.event_process();return NULL;}
其中get_host_cb()实现如下:
lars_loadbalance_agent/src/agent_udp_server.cpp
static void get_host_cb(const char *data, uint32_t len, int msgid, net_connection *net_conn, void *user_data){printf("get_host_cb is called ....\n");//解析api发送的请求包lars::GetHostRequest req;req.ParseFromArray(data, len);int modid = req.modid();int cmdid = req.cmdid();//设置回执消息lars::GetHostResponse rsp;rsp.set_seq(req.seq());rsp.set_modid(modid);rsp.set_cmdid(cmdid);route_lb *ptr_route_lb = (route_lb*)user_data;//调用route_lb的获取host方法,得到rsp返回结果ptr_route_lb->get_host(modid, cmdid, rsp);//打包回执给api消息std::string responseString;rsp.SerializeToString(&responseString);net_conn->send_message(responseString.c_str(), responseString.size(), lars::ID_GetHostResponse);printf("rspstring size = %d\n", responseString.size());}
这里实际的业务逻辑交给了route_lb的`get_host()`方法,其中`lars::GetHostResponse rsp;`作为该函数的输出参数,用于带回结果。`r_lb`是全局定义的3个route_lb对象,定义实现如下。
lars_loadbalance_agent/src/main_server.cpp
//每个Agent UDP server的 负载均衡器路由 route_lbroute_lb * r_lb[3];static void create_route_lb(){for (int i = 0; i < 3; i++) {int id = i + 1; //route_lb的id从1 开始计数r_lb[i] = new route_lb(id);if (r_lb[i] == NULL) {fprintf(stderr, "no more space to new route_lb\n");exit(1);}}}static void init_lb_agent(){//1. 加载配置文件config_file::setPath("./conf/lars_lb_agent.conf");lb_config.probe_num = config_file::instance()->GetNumber("loadbalance", "probe_num", 10);lb_config.init_succ_cnt = config_file::instance()->GetNumber("loadbalance", "init_succ_cnt", 180);//2. 初始化3个route_lb模块create_route_lb();}int main(int argc, char **argv){//1 初始化环境init_lb_agent();//1.5 初始化LoadBalance的负载均衡器//2 启动udp server服务,用来接收业务层(调用者/使用者)的消息start_UDP_servers();// ...return 0;}
接下来我们再看看`route_lb->get_host(modid, cmdid, rsp);`的实现。
lars_loadbalance_agent/src/route_lb.cpp
#include "route_lb.h"
#include "lars.pb.h"

// Construct route_lb number `id` and initialise the mutex that guards
// _route_lb_map.
route_lb::route_lb(int id):_id(id)
{
    pthread_mutex_init(&_mutex, NULL);
}

// Choose a host for (modid, cmdid) and record the outcome in rsp.
//
// - A load_balance exists and has hosts: delegate to choice_one_host().
// - A load_balance exists but is empty: its hosts are still being pulled
//   from the dns service, so reply RET_NOEXIST.
// - No load_balance yet: create one, start pull(), and reply RET_NOEXIST.
//
// The returned value always equals the retcode written into rsp.
int route_lb::get_host(int modid, int cmdid, lars::GetHostResponse &rsp)
{
    int ret = lars::RET_SUCC;

    // modid in the high 32 bits, cmdid added in the low bits
    // (same key formula as update_host())
    uint64_t key = ((uint64_t)modid << 32) + cmdid;

    pthread_mutex_lock(&_mutex);

    // Single lookup instead of find() followed by operator[]
    auto it = _route_lb_map.find(key);
    if (it != _route_lb_map.end()) {
        load_balance *lb = it->second;
        if (lb->empty()) {
            // No hosts yet: pull() has not been answered by the dns service
            assert(lb->status == load_balance::PULLING);
            ret = lars::RET_NOEXIST;
        }
        else {
            ret = lb->choice_one_host(rsp);
            // TODO: re-pull the route when it has timed out
        }
    }
    else {
        // First request for this key: register a new load_balance and
        // fetch its hosts asynchronously. `new` throws on failure, so no
        // NULL check is needed.
        load_balance *lb = new load_balance(modid, cmdid);
        _route_lb_map[key] = lb;
        lb->pull();
        ret = lars::RET_NOEXIST;
    }

    rsp.set_retcode(ret);

    pthread_mutex_unlock(&_mutex);
    return ret;
}
get_host采用动态获取的方式:先根据当前请求的modid/cmdid查找对应的load_balance模块。如果load_balance已存在,则直接调用它的choice_one_host()方法选出一个host;如果不存在,则需要新建一个load_balance,并调用pull()方法从远程dns service拉取该load_balance所需的host信息。
本端选择host信息(load_balance存在情况)
lars_loadbalance_agent/src/load_balance.cpp
//从一个host_list中得到一个节点放到GetHostResponse 的HostInfo中static void get_host_from_list(lars::GetHostResponse &rsp, host_list &l){//选择list中第一个节点host_info *host = l.front();//HostInfo自定义类型,proto3并没提供set方法,而是通过mutable_接口返回HostInfo的指针,可根据此指针进行赋值操作lars::HostInfo* hip = rsp.mutable_host();hip->set_ip(host->ip);hip->set_port(host->port);//将上面处理过的第一个节点放在队列的末尾l.pop_front();l.push_back(host);}//从两个队列中获取一个host给到上层int load_balance::choice_one_host(lars::GetHostResponse &rsp){//1 判断_dile_list队列是否已经空,如果空表示没有空闲节点if (_idle_list.empty() == true) {// 1.1 判断是否已经超过了probe_numif (_access_cnt >= lb_config.probe_num) {_access_cnt = 0;//从 overload_list中选择一个已经过载的节点get_host_from_list(rsp, _overload_list);}else {//明确返回给API层,已经过载了++_access_cnt;return lars::RET_OVERLOAD;}}else {// 2 判断过载列表是否为空if (_overload_list.empty() == true) {//2.1 当前modid/cmdid所有节点均为正常//选择一个idle节点get_host_from_list(rsp, _idle_list);}else {//2.2 有部分节点过载//判断访问次数是否超过probe_num阈值,超过则从overload_list取出一个if (_access_cnt >= lb_config.probe_num) {_access_cnt = 0;get_host_from_list(rsp, _overload_list);}else {//正常从idle_list中选出一个节点++_access_cnt;get_host_from_list(rsp, _idle_list);}//选择一个idle节点get_host_from_list(rsp, _idle_list);}}return lars::RET_SUCC;}
即从_idle_list和_overload_list中取出合适的host节点返回给上层。
远程拉取host信息(load_balance不存在情况)
load_balance首先向dns_client的thread_queue发送GetRouteRequest请求,然后将自身状态更新为PULLING。
pull发送请求过程
load_balance->pull() ----> dns client ----> dns server
lars_loadbalance_agent/src/load_balance.cpp
//如果list中没有host信息,需要从远程的DNS Service发送GetRouteHost请求申请int load_balance::pull(){//请求dns service请求lars::GetRouteRequest route_req;route_req.set_modid(_modid);route_req.set_cmdid(_cmdid);//通过dns client的thread queue发送请求dns_queue->send(route_req);//由于远程会有一定延迟,所以应该给当前的load_balance模块标记一个正在拉取的状态status = PULLING;return 0;}
lars_loadbalance_agent/src/dns_client.cpp
#include "lars_reactor.h"
#include "main_server.h"
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <queue>
#include <string>

// thread_queue read callback; the signature matches io_callback:
//     typedef void io_callback(event_loop *loop, int fd, void *args);
// The dns client's event loop calls it whenever dns_queue holds pending
// GetRouteRequest messages, and each one is forwarded to the dns service.
// args is the tcp_client registered through dns_queue->set_callback().
void new_dns_request(event_loop *loop, int fd, void *args)
{
    tcp_client *client = static_cast<tcp_client *>(args);

    // 1. Drain every pending request from the thread queue at once
    std::queue<lars::GetRouteRequest> msgs;
    dns_queue->recv(msgs);

    // 2. Serialize each request and send it to the dns service.
    //    The message is serialized straight from the queue front (no copy);
    //    SerializeToString() overwrites the buffer, so one string is reused.
    std::string requestString;
    while (!msgs.empty()) {
        msgs.front().SerializeToString(&requestString);
        msgs.pop();

        client->send_message(requestString.c_str(), requestString.size(), lars::ID_GetRouteRequest);
    }
}

// ... (omitted in this excerpt)

// Dns client thread: connects to the dns service configured under
// [dnsserver] ip/port, forwards requests queued in dns_queue, and handles
// ID_GetRouteResponse replies with deal_recv_route.
void *dns_client_thread(void* args)
{
    printf("dns client thread start\n");

    event_loop loop;

    // 1. dns service ip + port from the configuration file
    std::string ip = config_file::instance()->GetString("dnsserver", "ip", "");
    short port = config_file::instance()->GetNumber("dnsserver", "port", 0);

    // 2. TCP client towards the dns service
    tcp_client client(&loop, ip.c_str(), port, "dns client");

    // 3. Bind the thread_queue message event to this loop
    dns_queue->set_loop(&loop);
    dns_queue->set_callback(new_dns_request, &client);

    // 4. Handler for the dns service's ID_GetRouteResponse replies
    client.add_msg_router(lars::ID_GetRouteResponse, deal_recv_route);

    // Run the event loop
    loop.event_process();

    return NULL;
}

// Start the dns client in a detached thread; exit the process on failure.
void start_dns_client()
{
    pthread_t tid;

    // pthread_create() returns 0 or a positive error number; it never
    // returns -1 and does not set errno, so test != 0 and report via strerror.
    int ret = pthread_create(&tid, NULL, dns_client_thread, NULL);
    if (ret != 0) {
        fprintf(stderr, "pthread_create: %s\n", strerror(ret));
        exit(1);
    }

    // Nobody joins this thread
    pthread_detach(tid);
}
接收远程host信息回执过程
dns service ----> dns client ----> route_lb::update_host() ----> load_balance::update()
lars_loadbalance_agent/src/dns_client.cpp
/** 处理远程dns service回复的modid/cmdid对应的路由信息* */void deal_recv_route(const char *data, uint32_t len, int msgid, net_connection *net_conn, void *user_data){lars::GetRouteResponse rsp;//解析远程消息到proto结构体中rsp.ParseFromArray(data, len);int modid = rsp.modid();int cmdid = rsp.cmdid();int index = (modid+cmdid)%3;// 将该modid/cmdid交给一个route_lb处理 将rsp中的hostinfo集合加入到对应的route_lb中r_lb[index]->update_host(modid, cmdid, rsp);}
lars_loadbalance_agent/src/route_lb.cpp
//根据Dns Service返回的结果更新自己的route_lb_mapint route_lb::update_host(int modid, int cmdid, lars::GetRouteResponse &rsp){//1. 得到keyuint64_t key = ((uint64_t)modid << 32) + cmdid;pthread_mutex_lock(&_mutex);//2. 在_route_map中找到对应的keyif (_route_lb_map.find(key) != _route_lb_map.end()) {load_balance *lb = _route_lb_map[key];if (rsp.host_size() == 0) {//2.1 如果返回的结果 lb下已经没有任何host信息,则删除该keydelete lb;_route_lb_map.erase(key);}else {//2.2 更新新host信息lb->update(rsp);}}pthread_mutex_unlock(&_mutex);return 0;}
lars_loadbalance_agent/src/load_balance.cpp
//根据dns service远程返回的结果,更新_host_mapvoid load_balance::update(lars::GetRouteResponse &rsp){//确保dns service返回的结果有host信息assert(rsp.host_size() != 0);std::set<uint64_t> remote_hosts;std::set<uint64_t> need_delete;//1. 插入新增的host信息 到_host_map中for (int i = 0; i < rsp.host_size(); i++) {//1.1 得到rsp中的一个hostconst lars::HostInfo & h = rsp.host(i);//1.2 得到ip+port的key值uint64_t key = ((uint64_t)h.ip() << 32) + h.port();remote_hosts.insert(key);//1.3 如果自身的_host_map找不到当下的key,说明是新增if (_host_map.find(key) == _host_map.end()) {//新增host_info *hi = new host_info(h.ip(), h.port(), lb_config.init_succ_cnt);if (hi == NULL) {fprintf(stderr, "new host_info error!\n");exit(1);}_host_map[key] = hi;//新增的host信息加入到 空闲列表中_idle_list.push_back(hi);}}//2. 删除减少的host信息 从_host_map中//2.1 得到哪些节点需要删除for (host_map_it it = _host_map.begin(); it != _host_map.end(); it++) {if (remote_hosts.find(it->first) == remote_hosts.end()) {//该key在host_map中存在,而在远端返回的结果集不存在,需要锁定被删除need_delete.insert(it->first);}}//2.2 删除for (std::set<uint64_t>::iterator it = need_delete.begin();it != need_delete.end(); it++) {uint64_t key = *it;host_info *hi = _host_map[key];if (hi->overload == true) {//从过载列表中删除_overload_list.remove(hi);}else {//从空闲列表删除_idle_list.remove(hi);}delete hi;}}
load balance agent V0.3—API get_host请求功能单元测试
为了方便我们编译Lars的全部模块,我们在/Lars/下提供一个Makefile来编译子目录模块
/Lars/Makefile
# Top-level Makefile: build or clean every Lars sub-module, in order.
# lars_reactor comes first because the other modules link against it.
SUBDIRS = lars_reactor lars_dns lars_reporter lars_loadbalance_agent

# Stop at the first sub-module that fails to build (`|| exit 1`). Otherwise
# later modules are built against a missing library, and the overall make
# still reports success.
.PHONY: all
all:
	@list='$(SUBDIRS)'; for subdir in $$list; do \
		echo "Making in $$subdir"; \
		$(MAKE) -C $$subdir || exit 1; \
	done

.PHONY: clean
clean:
	@echo Making clean
	@list='$(SUBDIRS)'; for subdir in $$list; do \
		echo "Clean in $$subdir"; \
		$(MAKE) -C $$subdir clean; \
	done
现在我们编译,然后分别启动lars_dns,lars_reporter,lars_loadbalance_agent
lars_dns
~/Lars/lars_dns$ ./bin/lars_dnsmsg_router init...create 0 threadcreate 1 threadcreate 2 threadcreate 3 threadcreate 4 threadadd msg cb msgid = 1lars dns service ....now route version is 1573034612modID = 1, cmdID = 1, ip = 3232235953, port = 7777modID = 1, cmdID = 2, ip = 3232235954, port = 7776modID = 2, cmdID = 1, ip = 3232235955, port = 7778modID = 2, cmdID = 2, ip = 3232235956, port = 7779load data to tmep succ! size is 4load data to tmep succ! size is 4...
lars_reporter
~/Lars/lars_reporter$ ./bin/lars_reportermsg_router init...create 0 threadcreate 1 threadcreate 2 threadcreate 3 threadcreate 4 threadadd msg cb msgid = 3
lars_lb_agent
~/Lars/lars_loadbalance_agent$ ./bin/lars_lb_agentmsg_router init...UDP server on 0.0.0.0:8888 is running...add msg cb msgid = 4agent UDP server :port 8888 is started...msg_router init...msg_router init...UDP server on 0.0.0.0:8890 is running...add msg cb msgid = 4agent UDP server :port 8890 is started...[report] client thread start...done!msg_router init...UDP server on 0.0.0.0:8889 is running...add msg cb msgid = 4agent UDP server :port 8889 is started...dns client thread startdo_connect EINPROGRESSconnect 127.0.0.1:7779 succ!msg_router init...do_connect EINPROGRESSadd msg cb msgid = 2connect 127.0.0.1:7778 succ!
启动API example/的单元测试程序
API:get_host example
~/Lars/api/cpp/example$ ./example 1 1lars_client()connection agent udp server succ!connection agent udp server succ!connection agent udp server succ!host is 177.1.168.192:7777~lars_client()
