25.1 构建Lars-Reporter项目

创建Lars-Reporter项目目录

Lars/lars_reporter/bin/

Lars/lars_reporter/conf/

Lars/lars_reporter/include/

Lars/lars_reporter/src/

Lars/lars_reporter/test/

Lars/lars_reporter/Makefile

其中:

lars_reporter/conf/lars_reporter.conf

  1. [reactor]
  2. maxConn = 1024
  3. threadNum = 5
  4. ip = 127.0.0.1
  5. port = 7779
  6. [mysql]
  7. db_host = 127.0.0.1
  8. db_port = 3306
  9. db_user = root
  10. db_passwd = aceld
  11. db_name = lars_dns
  12. [reporter]
  13. db_thread_cnt = 3

lars_reporter/Makefile

  1. # Output binary of the reporter service
  2. TARGET= bin/lars_reporter
  3. CXX=g++
  4. CFLAGS=-g -O2 -Wall -Wno-deprecated
  5. # Shared base directory: common headers and the proto directory (lars.pb.o)
  6. BASE=../base
  7. BASE_H=$(BASE)/include
  8. PROTO = $(BASE)/proto
  9. PROTO_H = $(BASE)/proto
  10. # lars_reactor headers and library
  11. LARS_REACTOR=../lars_reactor
  12. LARS_REACTOR_H =$(LARS_REACTOR)/include
  13. # Holds the library directory AND the -llreactor flag; LIB prefixes it with -L
  14. LARS_REACTOR_LIB=$(LARS_REACTOR)/lib -llreactor
  15. # MySQL client headers and static library
  16. MYSQL=$(BASE)/mysql-connector-c
  17. MYSQL_H=$(MYSQL)/include
  18. MYSQL_LIB=$(MYSQL)/lib/libmysqlclient.a
  19. OTHER_LIB = -lpthread -ldl -lprotobuf
  20. SRC= ./src
  21. INC= -I./include -I$(BASE_H) -I$(LARS_REACTOR_H) -I$(MYSQL_H) -I$(PROTO_H)
  22. LIB= $(MYSQL_LIB) -L$(LARS_REACTOR_LIB) $(OTHER_LIB)
  23. # One object per .cpp file under src/, plus the lars.pb.o object from the proto directory
  24. OBJS = $(addsuffix .o, $(basename $(wildcard $(SRC)/*.cpp)))
  25. OBJS += $(PROTO)/lars.pb.o
  26. # Link step: creates bin/ first because TARGET lives there
  27. $(TARGET): $(OBJS)
  28. mkdir -p bin
  29. $(CXX) $(CFLAGS) -o $(TARGET) $(OBJS) $(INC) $(LIB)
  30. # Compile rule for every .cpp source
  31. %.o: %.cpp
  32. $(CXX) $(CFLAGS) -c -o $@ $< $(INC)
  33. .PHONY: clean
  34. # Removes the objects (including lars.pb.o in the proto directory) and the binary
  35. clean:
  36. -rm -f src/*.o $(PROTO)/lars.pb.o $(TARGET)

25.2 完成Lars-Service Reporter接受处理业务

  1. 我们先完成对客户端或agent发送过来的reporter上报数据请求的处理业务。

lars_reporter/src/reporter_service.cpp

  1. #include "lars_reactor.h"
  2. #include "lars.pb.h"
  3. #include "store_report.h"
  4. #include <stdio.h>
  5. #include <string>
  6. // Message handler registered for lars::ID_ReportStatusRequest.
  7. // data/len hold the serialized lars::ReportStatusRequest payload.
  8. // conn and user_data belong to the router callback signature and are unused here.
  9. void get_report_status(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
  10. {
  11. lars::ReportStatusRequest req;
  12. // Drop packets that fail to parse: storing a request that was not
  13. // decoded correctly would write bogus rows into the database.
  14. if (!req.ParseFromArray(data, len)) {
  15. fprintf(stderr, "parse ReportStatusRequest error, msgid = %d, len = %u\n", msgid, len);
  16. return;
  17. }
  18. // Store the reported data in the db. A StoreReport is constructed for
  19. // every request, and its constructor connects to MySQL each time.
  20. StoreReport sr;
  21. sr.store(req);
  22. }
  23. // Entry point: loads the config, starts the tcp server and runs the event loop.
  24. int main(int argc, char **argv)
  25. {
  26. event_loop loop;
  27. // Load the configuration file
  28. config_file::setPath("./conf/lars_reporter.conf");
  29. std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
  30. short port = config_file::instance()->GetNumber("reactor", "port", 7779);
  31. // Create the tcp server
  32. tcp_server server(&loop, ip.c_str(), port);
  33. // Register the handler that processes report-status requests
  34. server.add_msg_router(lars::ID_ReportStatusRequest, get_report_status);
  35. // Start listening for events
  36. loop.event_process();
  37. return 0;
  38. }

lars_reporter/include/store_report.h

  1. #pragma once
  2. #include "mysql.h"
  3. #include "lars.pb.h"
  4. // Owns one MySQL connection and writes reported call results into the
  5. // ServerCallStatus table.
  6. class StoreReport
  7. {
  8. public:
  9. // Connects to the database described by the [mysql] config section;
  10. // terminates the process with exit(1) if the connection fails.
  11. StoreReport();
  12. // Closes the MySQL connection opened by the constructor.
  13. ~StoreReport();
  14. // Inserts (or updates on duplicate key) one row per result in req.
  15. // Failures are logged to stderr; nothing is returned to the caller.
  16. void store(const lars::ReportStatusRequest &req);
  17. private:
  18. // Not copyable: two objects must never close the same MYSQL handle.
  19. StoreReport(const StoreReport &);
  20. StoreReport &operator=(const StoreReport &);
  21. MYSQL _db_conn;
  22. };

lars_reporter/src/store_report.cpp

  1. #include "store_report.h"
  2. #include "lars_reactor.h"
  3. #include <stdio.h>
  4. #include <stdlib.h>
  5. #include <string>
  6. #include <unistd.h>
  7. StoreReport::StoreReport()
  8. {
  9. // 1 Initialization
  10. // 1.1 mysql_library_init must be called first when mysql is used from multiple threads
  11. mysql_library_init(0, NULL, NULL);
  12. // 1.2 Initialize the connection handle and set the connect timeout.
  13. // MYSQL_OPT_CONNECT_TIMEOUT takes a pointer to unsigned int (seconds),
  14. // not a string.
  15. mysql_init(&_db_conn);
  16. unsigned int connect_timeout = 30;
  17. mysql_options(&_db_conn, MYSQL_OPT_CONNECT_TIMEOUT, &connect_timeout);
  18. my_bool reconnect = 1;
  19. mysql_options(&_db_conn, MYSQL_OPT_RECONNECT, &reconnect);
  20. // 2 Load the configuration
  21. std::string db_host = config_file::instance()->GetString("mysql", "db_host", "127.0.0.1");
  22. // unsigned int matches mysql_real_connect's port parameter; a short cannot hold ports above 32767
  23. unsigned int db_port = config_file::instance()->GetNumber("mysql", "db_port", 3306);
  24. std::string db_user = config_file::instance()->GetString("mysql", "db_user", "root");
  25. std::string db_passwd = config_file::instance()->GetString("mysql", "db_passwd", "aceld");
  26. std::string db_name = config_file::instance()->GetString("mysql", "db_name", "lars_dns");
  27. // 3 Connect to the database
  28. if ( mysql_real_connect(&_db_conn, db_host.c_str(), db_user.c_str(), db_passwd.c_str(), db_name.c_str(), db_port, NULL, 0) == NULL) {
  29. fprintf(stderr, "mysql real connect error\n");
  30. exit(1);
  31. }
  32. }
  33. StoreReport::~StoreReport()
  34. {
  35. // The constructor exits the process on connect failure, so the handle
  36. // is always an open connection here.
  37. mysql_close(&_db_conn);
  38. }
  39. void StoreReport::store(const lars::ReportStatusRequest &req)
  40. {
  41. for (int i = 0; i < req.results_size(); i++) {
  42. // One reported call record
  43. const lars::HostCallResult &result = req.results(i);
  44. int overload = result.overload() ? 1: 0;
  45. char sql[1024];
  46. int sql_len = snprintf(sql, sizeof(sql), "INSERT INTO ServerCallStatus"
  47. "(modid, cmdid, ip, port, caller, succ_cnt, err_cnt, ts, overload) "
  48. "VALUES (%d, %d, %u, %u, %u, %u, %u, %u, %d) ON DUPLICATE KEY "
  49. "UPDATE succ_cnt = %u, err_cnt = %u, ts = %u, overload = %d",
  50. req.modid(), req.cmdid(), result.ip(), result.port(), req.caller(),
  51. result.succ(), result.err(), req.ts(), overload,
  52. result.succ(), result.err(), req.ts(), overload);
  53. // Never send a truncated statement to the server; skip the record instead.
  54. if (sql_len < 0 || sql_len >= static_cast<int>(sizeof(sql))) {
  55. fprintf(stderr, "sql statement too long, skip record\n");
  56. continue;
  57. }
  58. // Ping first: with MYSQL_OPT_RECONNECT set, this re-establishes a dropped connection
  59. mysql_ping(&_db_conn);
  60. if (mysql_real_query(&_db_conn, sql, sql_len) != 0) {
  61. fprintf(stderr, "Fail to Insert into ServerCallStatus %s\n", mysql_error(&_db_conn));
  62. }
  63. }
  64. }
  1. 这里面的业务很简单,就是如果有客户端发送`ID_ReportStatusRequest`的消息过来,进行处理,然后入库即可。

25.3 完成Lars-reporterV0.1版本测试

lars_reporter/test/reportClient.cpp

  1. #include "lars_reactor.h"
  2. #include "lars.pb.h"
  3. #include <stdio.h>
  4. #include <stdlib.h>
  5. #include <time.h>
  6. #include <string>
  7. // Builds one test lars::ReportStatusRequest carrying three host results
  8. // and sends it over the given connection. user_data is unused.
  9. void report_status(net_connection *conn, void *user_data)
  10. {
  11. // NOTE(review): assumes the connection handed to the conn-start hook is
  12. // the tcp_client itself - confirm against lars_reactor.
  13. tcp_client *client = (tcp_client*)conn;
  14. lars::ReportStatusRequest req;
  15. // Assemble the test message
  16. req.set_modid(rand() % 3 + 1);
  17. req.set_cmdid(1);
  18. req.set_caller(123);
  19. req.set_ts(time(NULL));
  20. for (int i = 0; i < 3; i ++) {
  21. // Fill the new repeated entry in place instead of copying a temporary
  22. lars::HostCallResult *result = req.add_results();
  23. result->set_ip(i + 1);
  24. result->set_port((i + 1) * (i + 1));
  25. result->set_succ(100);
  26. result->set_err(3);
  27. result->set_overload(true);
  28. }
  29. std::string requestString;
  30. if (!req.SerializeToString(&requestString)) {
  31. fprintf(stderr, "serialize ReportStatusRequest error\n");
  32. return;
  33. }
  34. // Send to the reporter service
  35. client->send_message(requestString.c_str(), requestString.size(), lars::ID_ReportStatusRequest);
  36. }
  37. // Conn-start hook: sends the test report as soon as the connection is established.
  38. void connection_build(net_connection *conn, void *args)
  39. {
  40. report_status(conn, args);
  41. }
  42. // Test client entry point: connects to 127.0.0.1:7779 and runs the event loop.
  43. int main(int argc, char **argv)
  44. {
  45. // Seed rand() so the modid chosen in report_status varies between runs
  46. srand(time(NULL));
  47. event_loop loop;
  48. tcp_client client(&loop, "127.0.0.1", 7779, "reportClient");
  49. // Register the callback run once the connection is established
  50. client.set_conn_start(connection_build);
  51. loop.event_process();
  52. return 0;
  53. }

lars_reporter/test/Makefile

  1. # Output binary of the reporter test client
  2. TARGET= reportClient
  3. CXX=g++
  4. CFLAGS=-g -O2 -Wall -Wno-deprecated
  5. BASE=../../base
  6. BASE_H=$(BASE)/include
  7. PROTO = $(BASE)/proto
  8. PROTO_H = $(BASE)/proto
  9. LARS_REACTOR=../../lars_reactor
  10. LARS_REACTOR_H =$(LARS_REACTOR)/include
  11. # Holds the library directory AND the -llreactor flag; LIB prefixes it with -L
  12. LARS_REACTOR_LIB=$(LARS_REACTOR)/lib -llreactor
  13. OTHER_LIB = -lpthread -ldl -lprotobuf
  14. INC= -I./include -I$(BASE_H) -I$(LARS_REACTOR_H) -I$(PROTO_H)
  15. # MYSQL_LIB is not defined in this Makefile, so it is not referenced here
  16. LIB= -L$(LARS_REACTOR_LIB) $(OTHER_LIB)
  17. OBJS = reportClient.o
  18. OBJS += $(PROTO)/lars.pb.o
  19. $(TARGET): $(OBJS)
  20. $(CXX) $(CFLAGS) -o $(TARGET) $(OBJS) $(INC) $(LIB)
  21. %.o: %.cpp
  22. $(CXX) $(CFLAGS) -c -o $@ $< $(INC)
  23. .PHONY: clean
  24. # lars.pb.o in the proto directory is intentionally left in place
  25. clean:
  26. -rm -f ./*.o $(TARGET)
  1. 这里我们简单写了一个针对lars_reporter的客户端测试程序,模拟发送一个上报数据包,用来验证reporter的基本功能是否正常。

编译并执行,我们发现数据库表中ServerCallStatus已经有了我们模拟封装的数据入库。