35.1 proto通信协议定义

base/proto/lars.proto

  1. /* Message IDs used by the Lars system */
  2. enum MessageId {
  3. ID_UNKNOW = 0; // proto3 requires the first enum value to be 0; used only as a placeholder
  4. ID_GetRouteRequest = 1; // message ID of the request asking DNS for the route mapping
  5. ID_GetRouteResponse = 2; // message ID of the DNS reply carrying the route information
  6. ID_ReportStatusRequest = 3; // message ID of the request reporting host call-status information
  7. ID_GetHostRequest = 4; // message ID of the API's request for host information, sent to the Lb Agent module
  8. ID_GetHostResponse = 5; // message ID of the agent's reply to the API carrying host information
  9. ID_ReportRequest = 6; // message ID used by the API to report the result of a get_host call to the agent
  10. // =======================================================
  11. ID_API_GetRouteRequest = 7; // message ID of the API's request to the agent for the route (all hosts) of a modid/cmdid
  12. ID_API_GetRouteResponse = 8; // message ID of the agent's reply to the API carrying the route information of all hosts
  13. // =======================================================
  14. }
  1. 增加两个message ID:`ID_API_GetRouteRequest` 和 `ID_API_GetRouteResponse`,主要用于API层获取某个route下全部host节点信息时的通信。

35.2 Lars-API:get_route()方法客户端实现

api/cpp/lars_api/lars_api.h

  1. typedef std::pair<std::string, int> ip_port; // one host, stored as (ip string, port)
  2. typedef std::vector<ip_port> route_set; // list of hosts; lars_client::get_route() appends its results to one of these
  3. typedef route_set::iterator route_set_it; // iterator over a route_set

api/cpp/lars_api/lars_api.cpp

  1. //lars 系统获取某modid/cmdid全部的hosts(route)信息
  2. int lars_client::get_route(int modid, int cmdid, route_set &route)
  3. {
  4. //1. 封装请求消息
  5. lars::GetRouteRequest req;
  6. req.set_modid(modid);
  7. req.set_cmdid(cmdid);
  8. //2. send
  9. char write_buf[4096], read_buf[80*1024];
  10. //消息头
  11. msg_head head;
  12. head.msglen = req.ByteSizeLong();
  13. head.msgid = lars::ID_API_GetRouteRequest;
  14. memcpy(write_buf, &head, MESSAGE_HEAD_LEN);
  15. //消息体
  16. req.SerializeToArray(write_buf+MESSAGE_HEAD_LEN, head.msglen);
  17. //简单的hash来发给对应的agent udp server
  18. int index = (modid + cmdid) %3;
  19. int ret = sendto(_sockfd[index], write_buf, head.msglen + MESSAGE_HEAD_LEN, 0, NULL, 0);
  20. if (ret == -1) {
  21. perror("sendto");
  22. return lars::RET_SYSTEM_ERROR;
  23. }
  24. //3. recv
  25. lars::GetRouteResponse rsp;
  26. int message_len = recvfrom(_sockfd[index], read_buf, sizeof(read_buf), 0, NULL, NULL);
  27. if (message_len == -1) {
  28. perror("recvfrom");
  29. return lars::RET_SYSTEM_ERROR;
  30. }
  31. //消息头
  32. memcpy(&head, read_buf, MESSAGE_HEAD_LEN);
  33. if (head.msgid != lars::ID_API_GetRouteResponse) {
  34. fprintf(stderr, "message ID error!\n");
  35. return lars::RET_SYSTEM_ERROR;
  36. }
  37. //消息体
  38. ret = rsp.ParseFromArray(read_buf + MESSAGE_HEAD_LEN, message_len - MESSAGE_HEAD_LEN);
  39. if (!ret) {
  40. fprintf(stderr, "message format error: head.msglen = %d, message_len = %d, message_len - MESSAGE_HEAD_LEN = %d, head msgid = %d, ID_GetHostResponse = %d\n", head.msglen, message_len, message_len-MESSAGE_HEAD_LEN, head.msgid, lars::ID_GetRouteResponse);
  41. return lars::RET_SYSTEM_ERROR;
  42. }
  43. if (rsp.modid() != modid || rsp.cmdid() != cmdid) {
  44. fprintf(stderr, "message format error\n");
  45. return lars::RET_SYSTEM_ERROR;
  46. }
  47. //4 处理消息
  48. for (int i = 0; i < rsp.host_size(); i++) {
  49. const lars::HostInfo &host = rsp.host(i);
  50. struct in_addr inaddr;
  51. inaddr.s_addr = host.ip();
  52. std::string ip = inet_ntoa(inaddr);
  53. int port = host.port();
  54. route.push_back(ip_port(ip,port));
  55. }
  56. return lars::RET_SUCC;
  57. }

35.3 Agent UDP Server处理API-get_route请求

lars_loadbalance_agent/src/agent_udp_server.cpp

  1. static void get_route_cb(const char *data, uint32_t len, int msgid, net_connection *net_conn, void *user_data)
  2. {
  3. //解析api发送的请求包
  4. lars::GetRouteRequest req;
  5. req.ParseFromArray(data, len);
  6. int modid = req.modid();
  7. int cmdid = req.cmdid();
  8. //设置回执消息
  9. lars::GetRouteResponse rsp;
  10. rsp.set_modid(modid);
  11. rsp.set_cmdid(cmdid);
  12. route_lb *ptr_route_lb = (route_lb*)user_data;
  13. //调用route_lb的获取host方法,得到rsp返回结果
  14. ptr_route_lb->get_route(modid, cmdid, rsp);
  15. //打包回执给api消息
  16. std::string responseString;
  17. rsp.SerializeToString(&responseString);
  18. net_conn->send_message(responseString.c_str(), responseString.size(), lars::ID_API_GetRouteResponse);
  19. }
  20. void * agent_server_main(void * args) // excerpt: only the route registration discussed here is shown
  21. {
  22. // ....
  23. // Register get_route_cb with the server's message router for ID_API_GetRouteRequest; r_lb[port-8888] is presumably what the callback receives as user_data
  24. server.add_msg_router(lars::ID_API_GetRouteRequest, get_route_cb, r_lb[port-8888]);
  25. // ...
  26. return NULL;
  27. }

针对ID_API_GetRouteRequest添加一个消息处理方法,在回调业务中,通过route_lb的get_route()方法获取信息。我们来实现这个方法。

lars_loadbalance_agent/src/route_lb.cpp

  1. //agent获取某个modid/cmdid的全部主机,将返回的主机结果存放在rsp中
  2. int route_lb::get_route(int modid, int cmdid, lars::GetRouteResponse &rsp)
  3. {
  4. int ret = lars::RET_SUCC;
  5. //1. 得到key
  6. uint64_t key = ((uint64_t)modid << 32) + cmdid;
  7. pthread_mutex_lock(&_mutex);
  8. //2. 当前key已经存在_route_lb_map中
  9. if (_route_lb_map.find(key) != _route_lb_map.end()) {
  10. //2.1 取出对应的load_balance
  11. load_balance *lb = _route_lb_map[key];
  12. std::vector<host_info*> vec;
  13. lb->get_all_hosts(vec);
  14. for (std::vector<host_info*>::iterator it = vec.begin(); it != vec.end(); it++) {
  15. lars::HostInfo host;
  16. host.set_ip((*it)->ip);
  17. host.set_port((*it)->port);
  18. rsp.add_host()->CopyFrom(host);
  19. }
  20. //超时重拉路由
  21. //检查是否要重新拉路由信息
  22. //若路由并没有处于PULLING状态,且有效期已经超时,则重新拉取
  23. if (lb->status == load_balance::NEW && time(NULL) - lb->last_update_time > lb_config.update_timeout) {
  24. lb->pull();
  25. }
  26. }
  27. //3. 当前key不存在_route_lb_map中
  28. else {
  29. //3.1 新建一个load_balance
  30. load_balance *lb = new load_balance(modid, cmdid);
  31. if (lb == NULL) {
  32. fprintf(stderr, "no more space to create loadbalance\n");
  33. exit(1);
  34. }
  35. //3.2 新建的load_balance加入到map中
  36. _route_lb_map[key] = lb;
  37. //3.3 从dns service服务拉取具体的host信息
  38. lb->pull();
  39. ret = lars::RET_NOEXIST;
  40. }
  41. pthread_mutex_unlock(&_mutex);
  42. return ret;
  43. }

其中,load_balance的get_all_hosts()方法实现如下:

lars_loadbalance_agent/src/load_balance.cpp

  1. //获取当前挂载下的全部host信息 添加到vec中
  2. void load_balance::get_all_hosts(std::vector<host_info*> &vec)
  3. {
  4. for (host_map_it it = _host_map.begin(); it != _host_map.end(); it++) {
  5. host_info *hi = it->second;
  6. vec.push_back(hi);
  7. }
  8. }

接下来,我们可以简单测试一下获取route信息的API。

api/cpp/example/example.cpp

  1. #include "lars_api.h"
  2. #include <iostream>
  // Print the expected command line of the example program to stdout.
  void usage()
  {
      fputs("usage: ./example [modid] [cmdid]\n", stdout);
  }
  7. int main(int argc, char **argv)
  8. {
  9. if (argc != 3) {
  10. usage();
  11. return 1;
  12. }
  13. int modid = atoi(argv[1]);
  14. int cmdid = atoi(argv[2]);
  15. lars_client api;
  16. std::string ip;
  17. int port;
  18. route_set route;
  19. int ret = api.get_route(modid, cmdid, route);
  20. if (ret == 0) {
  21. std::cout << "get route succ!" << std::endl;
  22. for (route_set_it it = route.begin(); it != route.end(); it++) {
  23. std::cout << "ip = " << (*it).first << ", port = " << (*it).second << std::endl;
  24. }
  25. }
  26. ret = api.get_host(modid, cmdid, ip, port);
  27. if (ret == 0) {
  28. std::cout << "host is " << ip << ":" << port << std::endl;
  29. //上报调用结果
  30. api.report(modid, cmdid, ip, port, 0);
  31. }
  32. return 0;
  33. }