25.1 构建Lars-Reporter项目
创建Lars-Reporter项目目录
Lars/lars_reporter/bin/
Lars/lars_reporter/conf/
Lars/lars_reporter/include/
Lars/lars_reporter/src/
Lars/lars_reporter/test/
Lars/lars_reporter/Makefile
其中:
lars_reporter/conf/lars_reporter.conf
[reactor]maxConn = 1024threadNum = 5ip = 127.0.0.1port = 7779[mysql]db_host = 127.0.0.1db_port = 3306db_user = rootdb_passwd = acelddb_name = lars_dns[repoter]db_thread_cnt = 3
lars_reporter/Makefile
TARGET= bin/lars_reporterCXX=g++CFLAGS=-g -O2 -Wall -Wno-deprecatedBASE=../baseBASE_H=$(BASE)/includePROTO = $(BASE)/protoPROTO_H = $(BASE)/protoLARS_REACTOR=../lars_reactorLARS_REACTOR_H =$(LARS_REACTOR)/includeLARS_REACTOR_LIB=$(LARS_REACTOR)/lib -llreactorMYSQL=$(BASE)/mysql-connector-cMYSQL_H=$(MYSQL)/includeMYSQL_LIB=$(MYSQL)/lib/libmysqlclient.aOTHER_LIB = -lpthread -ldl -lprotobufSRC= ./srcINC= -I./include -I$(BASE_H) -I$(LARS_REACTOR_H) -I$(MYSQL_H) -I$(PROTO_H)LIB= $(MYSQL_LIB) -L$(LARS_REACTOR_LIB) $(OTHER_LIB)OBJS = $(addsuffix .o, $(basename $(wildcard $(SRC)/*.cpp)))OBJS += $(PROTO)/lars.pb.o$(TARGET): $(OBJS)mkdir -p bin$(CXX) $(CFLAGS) -o $(TARGET) $(OBJS) $(INC) $(LIB)%.o: %.cpp$(CXX) $(CFLAGS) -c -o $@ $< $(INC).PHONY: cleanclean:-rm -f src/*.o $(PROTO)/lars.pb.o $(TARGET)
25.2 完成Lars-Service Reporter接受处理业务
我们先完成客户端/或者agent的发送过来的reporter上报数据请求的处理业务。
lars_reporter/src/reporter_service.cpp
#include "lars_reactor.h"#include "lars.pb.h"#include "store_report.h"#include <string>void get_report_status(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data){lars::ReportStatusRequest req;req.ParseFromArray(data, len);//将上报数据存储到dbStoreReport sr;sr.store(req);}int main(int argc, char **argv){event_loop loop;//加载配置文件config_file::setPath("./conf/lars_reporter.conf");std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");short port = config_file::instance()->GetNumber("reactor", "port", 7779);//创建tcp servertcp_server server(&loop, ip.c_str(), port);//添加数据上报请求处理的消息分发处理业务server.add_msg_router(lars::ID_ReportStatusRequest, get_report_status);//启动事件监听loop.event_process();return 0;}
lars_reporter/include/store_report.h
#pragma once#include "mysql.h"#include "lars.pb.h"class StoreReport{public:StoreReport();void store(lars::ReportStatusRequest req);private:MYSQL _db_conn;};
lars_reporter/src/store_report.cpp
#include "store_report.h"#include "lars_reactor.h"#include <string>#include <unistd.h>StoreReport::StoreReport(){//1 初始化//1.1 多线程使用mysql需要先调用mysql_library_initmysql_library_init(0, NULL, NULL);//1.2 初始化链接,和设置超时时间mysql_init(&_db_conn);mysql_options(&_db_conn, MYSQL_OPT_CONNECT_TIMEOUT, "30");my_bool reconnect = 1;mysql_options(&_db_conn, MYSQL_OPT_RECONNECT, &reconnect);//2 加载配置std::string db_host = config_file::instance()->GetString("mysql", "db_host", "127.0.0.1");short db_port = config_file::instance()->GetNumber("mysql", "db_port", 3306);std::string db_user = config_file::instance()->GetString("mysql", "db_user", "root");std::string db_passwd = config_file::instance()->GetString("mysql", "db_passwd", "aceld");std::string db_name = config_file::instance()->GetString("mysql", "db_name", "lars_dns");//3 链接数据库if ( mysql_real_connect(&_db_conn, db_host.c_str(), db_user.c_str(), db_passwd.c_str(), db_name.c_str(), db_port, NULL, 0) == NULL) {fprintf(stderr, "mysql real connect error\n");exit(1);}}void StoreReport::store(lars::ReportStatusRequest req){for (int i = 0; i < req.results_size(); i++) {//一条report 调用记录const lars::HostCallResult &result = req.results(i);int overload = result.overload() ? 1: 0;char sql[1024];snprintf(sql, 1024, "INSERT INTO ServerCallStatus""(modid, cmdid, ip, port, caller, succ_cnt, err_cnt, ts, overload) ""VALUES (%d, %d, %u, %u, %u, %u, %u, %u, %d) ON DUPLICATE KEY ""UPDATE succ_cnt = %u, err_cnt = %u, ts = %u, overload = %d",req.modid(), req.cmdid(), result.ip(), result.port(), req.caller(),result.succ(), result.err(), req.ts(), overload,result.succ(), result.err(), req.ts(), overload);mysql_ping(&_db_conn);//ping 测试一下,防止链接断开,会触发重新建立连接if (mysql_real_query(&_db_conn, sql, strlen(sql)) != 0) {fprintf(stderr, "Fial to Insert into ServerCallStatus %s\n", mysql_error(&_db_conn));}}}
这里面的业务很简单,就是如果有客户端发送`ID_ReportStatusRequest`的消息过来,进行处理,然后入库即可。
25.3 完成Lars-reporterV0.1版本测试
lars_reporter/test/reportClient.cpp
#include "lars_reactor.h"#include "lars.pb.h"void report_status(net_connection *conn, void *user_data){tcp_client *client = (tcp_client*)conn;lars::ReportStatusRequest req;//组装测试消息req.set_modid(rand() % 3 + 1);req.set_cmdid(1);req.set_caller(123);req.set_ts(time(NULL));for (int i = 0; i < 3; i ++) {lars::HostCallResult result;result.set_ip(i + 1);result.set_port((i + 1) * (i + 1));result.set_succ(100);result.set_err(3);result.set_overload(true);req.add_results()->CopyFrom(result);}std::string requestString;req.SerializeToString(&requestString);//发送给reporter serviceclient->send_message(requestString.c_str(), requestString.size(), lars::ID_ReportStatusRequest);}void connection_build(net_connection *conn, void *args){report_status(conn, args);}int main(int argc, char **argv){event_loop loop;tcp_client client(&loop, "127.0.0.1", 7779, "reportClient");//添加建立连接成功业务client.set_conn_start(connection_build);loop.event_process();return 0;}
lars_reporter/test/Makefile
TARGET= reportClientCXX=g++CFLAGS=-g -O2 -Wall -Wno-deprecatedBASE=../../baseBASE_H=$(BASE)/includePROTO = $(BASE)/protoPROTO_H = $(BASE)/protoLARS_REACTOR=../../lars_reactorLARS_REACTOR_H =$(LARS_REACTOR)/includeLARS_REACTOR_LIB=$(LARS_REACTOR)/lib -llreactorOTHER_LIB = -lpthread -ldl -lprotobufSRC= ./srcINC= -I./include -I$(BASE_H) -I$(LARS_REACTOR_H) -I$(PROTO_H)LIB= $(MYSQL_LIB) -L$(LARS_REACTOR_LIB) $(OTHER_LIB)OBJS = reportClient.oOBJS += $(PROTO)/lars.pb.o$(TARGET): $(OBJS)$(CXX) $(CFLAGS) -o $(TARGET) $(OBJS) $(INC) $(LIB)%.o: %.cpp$(CXX) $(CFLAGS) -c -o $@ $< $(INC).PHONY: cleanclean:-rm -f ./*.o $(TARGET)
这里我们简单写了一个针对lars_reporter的一个客户端测试程序,模拟发送一个包,测试一下一个基本的reporter的正常功能。
编译并执行,我们发现数据库表中ServerCallStatus已经有了我们模拟封装的数据入库。
