21.1 proto协议定义

  1. 获取Route信息,根据之前的架构图,可以看出来应该是Agent来获取,这里我们并没有实现Agent,所以用一个其他简单的客户端来完成单元测试。但是无论用什么,总需要一个传递数据,需要一定的消息协议,lars-dns也需要设置不同纤细的分发消息路由机制,所以我们需要先定义一些proto协议。
  2. `Lars/base`下创建`proto/`文件夹.

Lars/base/proto/lars.proto

  1. syntax = "proto3";
  2. package lars;
  3. /* Lars系统的消息ID */
  4. enum MessageId {
  5. ID_UNKNOW = 0; //proto3 enum第一个属性必须是0,用来占位
  6. ID_GetRouteRequest = 1; //向DNS请求Route对应的关系的消息ID
  7. ID_GetRouteResponse = 2; //DNS回复的Route信息的消息ID
  8. }
  9. //一个管理的主机的信息
  10. message HostInfo {
  11. int32 ip = 1;
  12. int32 port = 2;
  13. }
  14. //请求lars-dns route信息的消息内容
  15. message GetRouteRequest {
  16. int32 modid = 1;
  17. int32 cmdid = 2;
  18. }
  19. //lars-dns 回复的route信息消息内容
  20. message GetRouteResponse {
  21. int32 modid = 1;
  22. int32 cmdid = 2;
  23. repeated HostInfo host = 3;
  24. }
  1. 然后我们将proto文件编译成对应的C++文件, 我们还是提供一个脚本

Lars/base/proto/build.sh

  1. #!/bin/bash
  2. # proto编译
  3. protoc --cpp_out=. ./*.proto
  4. # 将全部的cc 文件 变成 cpp文件
  5. oldsuffix="cc"
  6. newsuffix="cpp"
  7. dir=$(eval pwd)
  8. for file in $(ls $dir | grep .${oldsuffix})
  9. do
  10. name=$(ls ${file} | cut -d. -f1,2)
  11. mv $file ${name}.${newsuffix}
  12. done
  13. echo "build proto file successd!"
  1. 因为protoc会自动生成cc后缀的文件,为了方便我们Makefile的编译,所以将cc文件改成cpp的。

21.2 proto编译环境集成

  1. 现在我们将`lars-dns`Makefile加入针对proto文件的编译

Lars/lars_dns/Makefile

  1. TARGET= bin/lars_dns
  2. CXX=g++
  3. CFLAGS=-g -O2 -Wall -Wno-deprecated
  4. BASE=../base
  5. BASE_H=$(BASE)/include
  6. PROTO = $(BASE)/proto
  7. PROTO_H = $(BASE)/proto
  8. LARS_REACTOR=../lars_reactor
  9. LARS_REACTOR_H =$(LARS_REACTOR)/include
  10. LARS_REACTOR_LIB=$(LARS_REACTOR)/lib -llreactor
  11. MYSQL=$(BASE)/mysql-connector-c
  12. MYSQL_H=$(MYSQL)/include
  13. MYSQL_LIB=$(MYSQL)/lib/libmysqlclient.a
  14. OTHER_LIB = -lpthread -ldl -lprotobuf
  15. SRC= ./src
  16. INC= -I./include -I$(BASE_H) -I$(LARS_REACTOR_H) -I$(MYSQL_H) -I$(PROTO_H)
  17. LIB= $(MYSQL_LIB) -L$(LARS_REACTOR_LIB) $(OTHER_LIB)
  18. OBJS = $(addsuffix .o, $(basename $(wildcard $(SRC)/*.cpp)))
  19. OBJS += $(PROTO)/lars.pb.o
  20. $(TARGET): $(OBJS)
  21. mkdir -p bin
  22. $(CXX) $(CFLAGS) -o $(TARGET) $(OBJS) $(INC) $(LIB)
  23. %.o: %.cpp
  24. $(CXX) $(CFLAGS) -c -o $@ $< $(INC)
  25. .PHONY: clean
  26. clean:
  27. -rm -f src/*.o $(TARGET)
  1. 添加了两个部分一个`OBJS`增添一个`lars.pb.o`的依赖,然后再`OTHER_LIB`增加`-lprotobuf`动态库的连接。

21.3 实现Route获取

  1. 接下来我们来实现针对`ID_GetRouteRequest`消息指令的业务处理.

lars_dns/src/dns_service.cpp

  1. #include "lars_reactor.h"
  2. #include "dns_route.h"
  3. #include "lars.pb.h"
  4. void get_route(const char *data, uint32_t len, int msgid, net_connection *net_conn, void *user_data)
  5. {
  6. //1. 解析proto文件
  7. lars::GetRouteRequest req;
  8. req.ParseFromArray(data, len);
  9. //2. 得到modid 和 cmdid
  10. int modid, cmdid;
  11. modid = req.modid();
  12. cmdid = req.cmdid();
  13. //3. 根据modid/cmdid 获取 host信息
  14. host_set hosts = Route::instance()->get_hosts(modid, cmdid);
  15. //4. 将数据打包成protobuf
  16. lars::GetRouteResponse rsp;
  17. rsp.set_modid(modid);
  18. rsp.set_cmdid(cmdid);
  19. for (host_set_it it = hosts.begin(); it != hosts.end(); it ++) {
  20. uint64_t ip_port = *it;
  21. lars::HostInfo host;
  22. host.set_ip((uint32_t)(ip_port >> 32));
  23. host.set_port((int)(ip_port));
  24. rsp.add_host()->CopyFrom(host);
  25. }
  26. //5. 发送给客户端
  27. std::string responseString;
  28. rsp.SerializeToString(&responseString);
  29. net_conn->send_message(responseString.c_str(), responseString.size(), lars::ID_GetRouteResponse) ;
  30. }
  31. int main(int argc, char **argv)
  32. {
  33. event_loop loop;
  34. //加载配置文件
  35. config_file::setPath("conf/lars_dns.conf");
  36. std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
  37. short port = config_file::instance()->GetNumber("reactor", "port", 7778);
  38. //创建tcp服务器
  39. tcp_server *server = new tcp_server(&loop, ip.c_str(), port);
  40. //注册路由业务
  41. server->add_msg_router(lars::ID_GetRouteRequest, get_route);
  42. //开始事件监听
  43. printf("lars dns service ....\n");
  44. loop.event_process();
  45. return 0;
  46. }
  1. 需要给`Route`类,实现一个get_host()方法,来针对modid/cmdid取出对应的value

lars_dns/src/dns_route.cpp

  1. //获取modid/cmdid对应的host信息
  2. host_set Route::get_hosts(int modid, int cmdid)
  3. {
  4. host_set hosts;
  5. //组装key
  6. uint64_t key = ((uint64_t)modid << 32) + cmdid;
  7. pthread_rwlock_rdlock(&_map_lock);
  8. route_map_it it = _data_pointer->find(key);
  9. if (it != _data_pointer->end()) {
  10. //找到对应的ip + port对
  11. hosts = it->second;
  12. }
  13. pthread_rwlock_unlock(&_map_lock);
  14. return hosts;
  15. }

21.3 lars_dns获取Route信息测试-V0.2

  1. 下面我们写一个客户端检查测试一下该功能.

lars_dns/test/lars_dns_test1.cpp

  1. #include <string.h>
  2. #include <unistd.h>
  3. #include <string>
  4. #include "lars_reactor.h"
  5. #include "lars.pb.h"
  6. //命令行参数
  7. struct Option
  8. {
  9. Option():ip(NULL),port(0) {}
  10. char *ip;
  11. short port;
  12. };
  13. Option option;
  14. void Usage() {
  15. printf("Usage: ./lars_dns_test -h ip -p port\n");
  16. }
  17. //解析命令行
  18. void parse_option(int argc, char **argv)
  19. {
  20. for (int i = 0; i < argc; i++) {
  21. if (strcmp(argv[i], "-h") == 0) {
  22. option.ip = argv[i + 1];
  23. }
  24. else if (strcmp(argv[i], "-p") == 0) {
  25. option.port = atoi(argv[i + 1]);
  26. }
  27. }
  28. if ( !option.ip || !option.port ) {
  29. Usage();
  30. exit(1);
  31. }
  32. }
  33. //typedef void (*conn_callback)(net_connection *conn, void *args);
  34. void on_connection(net_connection *conn, void *args)
  35. {
  36. //发送Route信息请求
  37. lars::GetRouteRequest req;
  38. req.set_modid(1);
  39. req.set_cmdid(2);
  40. std::string requestString;
  41. req.SerializeToString(&requestString);
  42. conn->send_message(requestString.c_str(), requestString.size(), lars::ID_GetRouteRequest);
  43. }
  44. void deal_get_route(const char *data, uint32_t len, int msgid, net_connection *net_conn, void *user_data)
  45. {
  46. //解包得到数据
  47. lars::GetRouteResponse rsp;
  48. rsp.ParseFromArray(data, len);
  49. //打印数据
  50. printf("modid = %d\n", rsp.modid());
  51. printf("cmdid = %d\n", rsp.cmdid());
  52. printf("host_size = %d\n", rsp.host_size());
  53. for (int i = 0; i < rsp.host_size(); i++) {
  54. printf("-->ip = %u\n", rsp.host(i).ip());
  55. printf("-->port = %d\n", rsp.host(i).port());
  56. }
  57. //再请求
  58. lars::GetRouteRequest req;
  59. req.set_modid(rsp.modid());
  60. req.set_cmdid(rsp.cmdid());
  61. std::string requestString;
  62. req.SerializeToString(&requestString);
  63. net_conn->send_message(requestString.c_str(), requestString.size(), lars::ID_GetRouteRequest);
  64. }
  65. int main(int argc, char **argv)
  66. {
  67. parse_option(argc, argv);
  68. event_loop loop;
  69. tcp_client *client;
  70. //创建客户端
  71. client = new tcp_client(&loop, option.ip, option.port, "lars_dns_test");
  72. if (client == NULL) {
  73. fprintf(stderr, "client == NULL\n");
  74. exit(1);
  75. }
  76. //客户端成功建立连接,首先发送请求包
  77. client->set_conn_start(on_connection);
  78. //设置服务端回应包处理业务
  79. client->add_msg_router(lars::ID_GetRouteResponse, deal_get_route);
  80. loop.event_process();
  81. return 0;
  82. }
  1. 同时提供一个`Makefile`对客户端进行编译。

lars_dns/test/Makefile

  1. TARGET= lars_dns_test1
  2. CXX=g++
  3. CFLAGS=-g -O2 -Wall -Wno-deprecated
  4. BASE=../../base
  5. BASE_H=$(BASE)/include
  6. PROTO = $(BASE)/proto
  7. PROTO_H = $(BASE)/proto
  8. LARS_REACTOR=../../lars_reactor
  9. LARS_REACTOR_H =$(LARS_REACTOR)/include
  10. LARS_REACTOR_LIB=$(LARS_REACTOR)/lib -llreactor
  11. OTHER_LIB = -lpthread -ldl -lprotobuf
  12. SRC= ./src
  13. INC= -I./include -I$(BASE_H) -I$(LARS_REACTOR_H) -I$(PROTO_H)
  14. LIB= $(MYSQL_LIB) -L$(LARS_REACTOR_LIB) $(OTHER_LIB)
  15. OBJS = lars_dns_test1.o
  16. OBJS += $(PROTO)/lars.pb.o
  17. $(TARGET): $(OBJS)
  18. $(CXX) $(CFLAGS) -o $(TARGET) $(OBJS) $(INC) $(LIB)
  19. %.o: %.cpp
  20. $(CXX) $(CFLAGS) -c -o $@ $< $(INC)
  21. .PHONY: clean
  22. clean:
  23. -rm -f ./*.o $(TARGET)

我们分别启动bin/lars_dns进程,和test/lars_dns_test1进程

服务端

  1. $ ./bin/lars_dns
  2. msg_router init...
  3. create 0 thread
  4. create 1 thread
  5. create 2 thread
  6. create 3 thread
  7. create 4 thread
  8. add msg cb msgid = 1
  9. lars dns service ....
  10. begin accept
  11. begin accept
  12. [thread]: get new connection succ!
  13. modID = 1, cmdID = 1, ip = 3232235953, port = 7777
  14. modID = 1, cmdID = 2, ip = 3232235954, port = 7776
  15. modID = 2, cmdID = 1, ip = 3232235955, port = 7778
  16. modID = 2, cmdID = 2, ip = 3232235956, port = 7779
  17. modID = 1, cmdID = 1, ip = 3232235953, port = 7777
  18. modID = 1, cmdID = 2, ip = 3232235954, port = 7776
  19. modID = 2, cmdID = 1, ip = 3232235955, port = 7778
  20. modID = 2, cmdID = 2, ip = 3232235956, port = 7779
  21. read data from socket

客户端

  1. $ ./lars_dns_test1 -h 127.0.0.1 -p 7778
  2. msg_router init...
  3. do_connect EINPROGRESS
  4. add msg cb msgid = 2
  5. connect 127.0.0.1:7778 succ!
  6. modid = 1
  7. cmdid = 2
  8. host_size = 1
  9. -->ip = 3232235954
  10. -->port = 7776