Backend Thread的后台总业务流程如下:

23.1 数据库表相关查询方法实现
我们先实现一些基本的数据表达查询方法:
lars_dns/src/dns_route.cpp
/** return 0, 表示 加载成功,version没有改变* 1, 表示 加载成功,version有改变* -1 表示 加载失败* */int Route::load_version(){//这里面只会有一条数据snprintf(_sql, 1000, "SELECT version from RouteVersion WHERE id = 1;");int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));if (ret){fprintf(stderr, "load version error: %s\n", mysql_error(&_db_conn));return -1;}MYSQL_RES *result = mysql_store_result(&_db_conn);if (!result){fprintf(stderr, "mysql store result: %s\n", mysql_error(&_db_conn));return -1;}long line_num = mysql_num_rows(result);if (line_num == 0){fprintf(stderr, "No version in table RouteVersion: %s\n", mysql_error(&_db_conn));return -1;}MYSQL_ROW row = mysql_fetch_row(result);//得到versionlong new_version = atol(row[0]);if (new_version == this->_version){//加载成功但是没有修改return 0;}this->_version = new_version;printf("now route version is %ld\n", this->_version);mysql_free_result(result);return 1;}//加载RouteData到_temp_pointerint Route::load_route_data(){_temp_pointer->clear();snprintf(_sql, 100, "SELECT * FROM RouteData;");int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));if (ret){fprintf(stderr, "load version error: %s\n", mysql_error(&_db_conn));return -1;}MYSQL_RES *result = mysql_store_result(&_db_conn);if (!result){fprintf(stderr, "mysql store result: %s\n", mysql_error(&_db_conn));return -1;}long line_num = mysql_num_rows(result);MYSQL_ROW row;for (long i = 0;i < line_num; ++i){row = mysql_fetch_row(result);int modid = atoi(row[1]);int cmdid = atoi(row[2]);unsigned ip = atoi(row[3]);int port = atoi(row[4]);uint64_t key = ((uint64_t)modid << 32) + cmdid;uint64_t value = ((uint64_t)ip << 32) + port;(*_temp_pointer)[key].insert(value);}printf("load data to tmep succ! size is %lu\n", _temp_pointer->size());mysql_free_result(result);return 0;}//将temp_pointer的数据更新到data_pointervoid Route::swap(){pthread_rwlock_wrlock(&_map_lock);route_map *temp = _data_pointer;_data_pointer = _temp_pointer;_temp_pointer = temp;pthread_rwlock_unlock(&_map_lock);}//加载RouteChange得到修改的modid/cmdid//将结果放在vector中void Route::load_changes(std::vector<uint64_t> &change_list){//读取当前版本之前的全部修改snprintf(_sql, 1000, "SELECT modid,cmdid FROM RouteChange WHERE version <= %ld;", _version);int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));if (ret){fprintf(stderr, "mysql_real_query: %s\n", mysql_error(&_db_conn));return ;}MYSQL_RES *result = mysql_store_result(&_db_conn);if (!result){fprintf(stderr, "mysql_store_result %s\n", mysql_error(&_db_conn));return ;}long lineNum = mysql_num_rows(result);if (lineNum == 0){fprintf(stderr, "No version in table ChangeLog: %s\n", mysql_error(&_db_conn));return ;}MYSQL_ROW row;for (long i = 0;i < lineNum; ++i){row = mysql_fetch_row(result);int modid = atoi(row[0]);int cmdid = atoi(row[1]);uint64_t key = (((uint64_t)modid) << 32) + cmdid;change_list.push_back(key);}mysql_free_result(result);}//将RouteChange//删除RouteChange的全部修改记录数据,remove_all为全部删除//否则默认删除当前版本之前的全部修改void Route::remove_changes(bool remove_all){if (remove_all == false){snprintf(_sql, 1000, "DELETE FROM RouteChange WHERE version <= %ld;", _version);}else{snprintf(_sql, 1000, "DELETE FROM RouteChange;");}int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));if (ret != 0){fprintf(stderr, "delete RouteChange: %s\n", mysql_error(&_db_conn));return ;}return;}
这里面提供了基本的对一些表的加载和删除操作:
load_version():加载当前route信息版本号。
load_route_data():加载RouteData信息表,到_temp_pointer中。
swap():将__temp_pointer的表数据同步到_data_temp表中.
load_changes():加载RouteChange得到修改的modid/cmdid,将结果放在vector中
remove_changes():清空之前的修改记录。
23.2 Backend Thread业务流程实现
lars_dns/src/dns_route.cpp
//周期性后端检查db的route信息的更新变化业务//backend thread完成void *check_route_changes(void *args){int wait_time = 10;//10s自动修改一次,也可以从配置文件读取long last_load_time = time(NULL);//清空全部的RouteChangeRoute::instance()->remove_changes(true);//1 判断是否有修改while (true) {sleep(1);long current_time = time(NULL);//1.1 加载RouteVersion得到当前版本号int ret = Route::instance()->load_version();if (ret == 1) {//version改版 有modid/cmdid修改//2 如果有修改//2.1 将最新的RouteData加载到_temp_pointer中if (Route::instance()->load_route_data() == 0) {//2.2 更新_temp_pointer数据到_data_pointer map中Route::instance()->swap();last_load_time = current_time;//更新最后加载时间}//2.3 获取被修改的modid/cmdid对应的订阅客户端,进行推送std::vector<uint64_t> changes;Route::instance()->load_changes(changes);//推送SubscribeList::instance()->publish(changes);//2.4 删除当前版本之前的修改记录Route::instance()->remove_changes();}else {//3 如果没有修改if (current_time - last_load_time >= wait_time) {//3.1 超时,加载最新的temp_pointerif (Route::instance()->load_route_data() == 0) {//3.2 _temp_pointer数据更新到_data_pointer map中Route::instance()->swap();last_load_time = current_time;}}}}return NULL;}
该实现与上面流程图描述的过程一样。那么`check_route_changes()`我们可以让一个后台线程进行承载。
lars_dns/src/dns_service.cpp
int main(int argc, char **argv){event_loop loop;//加载配置文件config_file::setPath("conf/lars_dns.conf");std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");short port = config_file::instance()->GetNumber("reactor", "port", 7778);//创建tcp服务器server = new tcp_server(&loop, ip.c_str(), port);//注册链接创建/销毁Hook函数server->set_conn_start(create_subscribe);server->set_conn_close(clear_subscribe);//注册路由业务server->add_msg_router(lars::ID_GetRouteRequest, get_route);// =================================================//开辟backend thread 周期性检查db数据库route信息的更新状态pthread_t tid;int ret = pthread_create(&tid, NULL, check_route_changes, NULL);if (ret == -1) {perror("pthread_create backendThread");exit(1);}//设置分离模式pthread_detach(tid);// =================================================//开始事件监听printf("lars dns service ....\n");loop.event_process();return 0;}
23.3 完成dns模块的订阅功能测试V0.3
我们提供一个修改一个modid/cmdid的sql语句来触发订阅条件,并且让dns service服务器主动给订阅的客户端发送该订阅消息。
lars_dns/test/test_insert_dns_route.sql
USE lars_dns;SET @time = UNIX_TIMESTAMP(NOW());INSERT INTO RouteData(modid, cmdid, serverip, serverport) VALUES(1, 1, 3232235953, 9999);UPDATE RouteVersion SET version = @time WHERE id = 1;INSERT INTO RouteChange(modid, cmdid, version) VALUES(1, 1, @time);
客户端代码:
lars_dns/test/lars_dns_test1.cpp
#include <string.h>#include <unistd.h>#include <string>#include "lars_reactor.h"#include "lars.pb.h"//命令行参数struct Option{Option():ip(NULL),port(0) {}char *ip;short port;};Option option;void Usage() {printf("Usage: ./lars_dns_test -h ip -p port\n");}//解析命令行void parse_option(int argc, char **argv){for (int i = 0; i < argc; i++) {if (strcmp(argv[i], "-h") == 0) {option.ip = argv[i + 1];}else if (strcmp(argv[i], "-p") == 0) {option.port = atoi(argv[i + 1]);}}if ( !option.ip || !option.port ) {Usage();exit(1);}}//typedef void (*conn_callback)(net_connection *conn, void *args);void on_connection(net_connection *conn, void *args){//发送Route信息请求lars::GetRouteRequest req;req.set_modid(1);req.set_cmdid(1);std::string requestString;req.SerializeToString(&requestString);conn->send_message(requestString.c_str(), requestString.size(), lars::ID_GetRouteRequest);}void deal_get_route(const char *data, uint32_t len, int msgid, net_connection *net_conn, void *user_data){//解包得到数据lars::GetRouteResponse rsp;rsp.ParseFromArray(data, len);//打印数据printf("modid = %d\n", rsp.modid());printf("cmdid = %d\n", rsp.cmdid());printf("host_size = %d\n", rsp.host_size());for (int i = 0; i < rsp.host_size(); i++) {printf("-->ip = %u\n", rsp.host(i).ip());printf("-->port = %d\n", rsp.host(i).port());}}int main(int argc, char **argv){parse_option(argc, argv);event_loop loop;tcp_client *client;//创建客户端client = new tcp_client(&loop, option.ip, option.port, "lars_dns_test");if (client == NULL) {fprintf(stderr, "client == NULL\n");exit(1);}//客户端成功建立连接,首先发送请求包client->set_conn_start(on_connection);//设置服务端回应包处理业务client->add_msg_router(lars::ID_GetRouteResponse, deal_get_route);loop.event_process();return 0;}
启动dns_server:
$ ./bin/lars_dnsmsg_router init...create 0 threadcreate 1 threadcreate 2 threadcreate 3 threadcreate 4 threadadd msg cb msgid = 1lars dns service ....now route version is 1571058286 modID = 1, cmdID = 1, ip = 3232235953, port = 9999
启动客户端:
$ ./lars_dns_test1 -h 127.0.0.1 -p 7778msg_router init...do_connect EINPROGRESSadd msg cb msgid = 2connect 127.0.0.1:7778 succ!modid = 1cmdid = 1host_size = 1-->ip = 3232235953-->port = 9999
我们知道,第一请求modid/cmdid就会订阅该Route模块。
然后我们通过外界修改modid=1,cmdid=1的模块,新开一个终端,执行test_insert_dns_route.sql
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.mysql> use lars_dns;Reading table information for completion of table and column namesYou can turn off this feature to get a quicker startup with -ADatabase changedmysql> \. test_insert_dns_route.sqlDatabase changedQuery OK, 0 rows affected (0.00 sec)Query OK, 1 row affected (0.01 sec)Query OK, 1 row affected (0.01 sec)Rows matched: 1 Changed: 1 Warnings: 0Query OK, 1 row affected (0.02 sec)mysql>
然后我会会发现客户端已经得到一个新的消息,就是最新的route数据过来。是由dns_service主动推送过来的订阅消息.
客户端:
$ ./lars_dns_test1 -h 127.0.0.1 -p 7778msg_router init...do_connect EINPROGRESSadd msg cb msgid = 2connect 127.0.0.1:7778 succ!modid = 1cmdid = 1host_size = 1-->ip = 3232235953-->port = 9999modid = 1cmdid = 1host_size = 1-->ip = 3232235953-->port = 9999
这样我们的订阅功能就完成了,整体的lars_dns模块的工作到此的基本需求全部也已经满足。
