Backend Thread的后台总业务流程如下:

11-Lars-dns_BackendThread.png

23.1 数据库表相关查询方法实现

  1. 我们先实现一些基本的数据表达查询方法:

lars_dns/src/dns_route.cpp

  1. /*
  2. * return 0, 表示 加载成功,version没有改变
  3. * 1, 表示 加载成功,version有改变
  4. * -1 表示 加载失败
  5. * */
  6. int Route::load_version()
  7. {
  8. //这里面只会有一条数据
  9. snprintf(_sql, 1000, "SELECT version from RouteVersion WHERE id = 1;");
  10. int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));
  11. if (ret)
  12. {
  13. fprintf(stderr, "load version error: %s\n", mysql_error(&_db_conn));
  14. return -1;
  15. }
  16. MYSQL_RES *result = mysql_store_result(&_db_conn);
  17. if (!result)
  18. {
  19. fprintf(stderr, "mysql store result: %s\n", mysql_error(&_db_conn));
  20. return -1;
  21. }
  22. long line_num = mysql_num_rows(result);
  23. if (line_num == 0)
  24. {
  25. fprintf(stderr, "No version in table RouteVersion: %s\n", mysql_error(&_db_conn));
  26. return -1;
  27. }
  28. MYSQL_ROW row = mysql_fetch_row(result);
  29. //得到version
  30. long new_version = atol(row[0]);
  31. if (new_version == this->_version)
  32. {
  33. //加载成功但是没有修改
  34. return 0;
  35. }
  36. this->_version = new_version;
  37. printf("now route version is %ld\n", this->_version);
  38. mysql_free_result(result);
  39. return 1;
  40. }
  41. //加载RouteData到_temp_pointer
  42. int Route::load_route_data()
  43. {
  44. _temp_pointer->clear();
  45. snprintf(_sql, 100, "SELECT * FROM RouteData;");
  46. int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));
  47. if (ret)
  48. {
  49. fprintf(stderr, "load version error: %s\n", mysql_error(&_db_conn));
  50. return -1;
  51. }
  52. MYSQL_RES *result = mysql_store_result(&_db_conn);
  53. if (!result)
  54. {
  55. fprintf(stderr, "mysql store result: %s\n", mysql_error(&_db_conn));
  56. return -1;
  57. }
  58. long line_num = mysql_num_rows(result);
  59. MYSQL_ROW row;
  60. for (long i = 0;i < line_num; ++i)
  61. {
  62. row = mysql_fetch_row(result);
  63. int modid = atoi(row[1]);
  64. int cmdid = atoi(row[2]);
  65. unsigned ip = atoi(row[3]);
  66. int port = atoi(row[4]);
  67. uint64_t key = ((uint64_t)modid << 32) + cmdid;
  68. uint64_t value = ((uint64_t)ip << 32) + port;
  69. (*_temp_pointer)[key].insert(value);
  70. }
  71. printf("load data to tmep succ! size is %lu\n", _temp_pointer->size());
  72. mysql_free_result(result);
  73. return 0;
  74. }
  75. //将temp_pointer的数据更新到data_pointer
  76. void Route::swap()
  77. {
  78. pthread_rwlock_wrlock(&_map_lock);
  79. route_map *temp = _data_pointer;
  80. _data_pointer = _temp_pointer;
  81. _temp_pointer = temp;
  82. pthread_rwlock_unlock(&_map_lock);
  83. }
  84. //加载RouteChange得到修改的modid/cmdid
  85. //将结果放在vector中
  86. void Route::load_changes(std::vector<uint64_t> &change_list)
  87. {
  88. //读取当前版本之前的全部修改
  89. snprintf(_sql, 1000, "SELECT modid,cmdid FROM RouteChange WHERE version <= %ld;", _version);
  90. int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));
  91. if (ret)
  92. {
  93. fprintf(stderr, "mysql_real_query: %s\n", mysql_error(&_db_conn));
  94. return ;
  95. }
  96. MYSQL_RES *result = mysql_store_result(&_db_conn);
  97. if (!result)
  98. {
  99. fprintf(stderr, "mysql_store_result %s\n", mysql_error(&_db_conn));
  100. return ;
  101. }
  102. long lineNum = mysql_num_rows(result);
  103. if (lineNum == 0)
  104. {
  105. fprintf(stderr, "No version in table ChangeLog: %s\n", mysql_error(&_db_conn));
  106. return ;
  107. }
  108. MYSQL_ROW row;
  109. for (long i = 0;i < lineNum; ++i)
  110. {
  111. row = mysql_fetch_row(result);
  112. int modid = atoi(row[0]);
  113. int cmdid = atoi(row[1]);
  114. uint64_t key = (((uint64_t)modid) << 32) + cmdid;
  115. change_list.push_back(key);
  116. }
  117. mysql_free_result(result);
  118. }
  119. //将RouteChange
  120. //删除RouteChange的全部修改记录数据,remove_all为全部删除
  121. //否则默认删除当前版本之前的全部修改
  122. void Route::remove_changes(bool remove_all)
  123. {
  124. if (remove_all == false)
  125. {
  126. snprintf(_sql, 1000, "DELETE FROM RouteChange WHERE version <= %ld;", _version);
  127. }
  128. else
  129. {
  130. snprintf(_sql, 1000, "DELETE FROM RouteChange;");
  131. }
  132. int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));
  133. if (ret != 0)
  134. {
  135. fprintf(stderr, "delete RouteChange: %s\n", mysql_error(&_db_conn));
  136. return ;
  137. }
  138. return;
  139. }

这里面提供了基本的对一些表的加载和删除操作:

load_version():加载当前route信息版本号。

load_route_data():加载RouteData信息表,到_temp_pointer中。

swap():将__temp_pointer的表数据同步到_data_temp表中.

load_changes():加载RouteChange得到修改的modid/cmdid,将结果放在vector中

remove_changes():清空之前的修改记录。

23.2 Backend Thread业务流程实现

lars_dns/src/dns_route.cpp

  1. //周期性后端检查db的route信息的更新变化业务
  2. //backend thread完成
  3. void *check_route_changes(void *args)
  4. {
  5. int wait_time = 10;//10s自动修改一次,也可以从配置文件读取
  6. long last_load_time = time(NULL);
  7. //清空全部的RouteChange
  8. Route::instance()->remove_changes(true);
  9. //1 判断是否有修改
  10. while (true) {
  11. sleep(1);
  12. long current_time = time(NULL);
  13. //1.1 加载RouteVersion得到当前版本号
  14. int ret = Route::instance()->load_version();
  15. if (ret == 1) {
  16. //version改版 有modid/cmdid修改
  17. //2 如果有修改
  18. //2.1 将最新的RouteData加载到_temp_pointer中
  19. if (Route::instance()->load_route_data() == 0) {
  20. //2.2 更新_temp_pointer数据到_data_pointer map中
  21. Route::instance()->swap();
  22. last_load_time = current_time;//更新最后加载时间
  23. }
  24. //2.3 获取被修改的modid/cmdid对应的订阅客户端,进行推送
  25. std::vector<uint64_t> changes;
  26. Route::instance()->load_changes(changes);
  27. //推送
  28. SubscribeList::instance()->publish(changes);
  29. //2.4 删除当前版本之前的修改记录
  30. Route::instance()->remove_changes();
  31. }
  32. else {
  33. //3 如果没有修改
  34. if (current_time - last_load_time >= wait_time) {
  35. //3.1 超时,加载最新的temp_pointer
  36. if (Route::instance()->load_route_data() == 0) {
  37. //3.2 _temp_pointer数据更新到_data_pointer map中
  38. Route::instance()->swap();
  39. last_load_time = current_time;
  40. }
  41. }
  42. }
  43. }
  44. return NULL;
  45. }
  1. 该实现与上面流程图描述的过程一样。那么`check_route_changes()`我们可以让一个后台线程进行承载。

lars_dns/src/dns_service.cpp

  1. int main(int argc, char **argv)
  2. {
  3. event_loop loop;
  4. //加载配置文件
  5. config_file::setPath("conf/lars_dns.conf");
  6. std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
  7. short port = config_file::instance()->GetNumber("reactor", "port", 7778);
  8. //创建tcp服务器
  9. server = new tcp_server(&loop, ip.c_str(), port);
  10. //注册链接创建/销毁Hook函数
  11. server->set_conn_start(create_subscribe);
  12. server->set_conn_close(clear_subscribe);
  13. //注册路由业务
  14. server->add_msg_router(lars::ID_GetRouteRequest, get_route);
  15. // =================================================
  16. //开辟backend thread 周期性检查db数据库route信息的更新状态
  17. pthread_t tid;
  18. int ret = pthread_create(&tid, NULL, check_route_changes, NULL);
  19. if (ret == -1) {
  20. perror("pthread_create backendThread");
  21. exit(1);
  22. }
  23. //设置分离模式
  24. pthread_detach(tid);
  25. // =================================================
  26. //开始事件监听
  27. printf("lars dns service ....\n");
  28. loop.event_process();
  29. return 0;
  30. }

23.3 完成dns模块的订阅功能测试V0.3

  1. 我们提供一个修改一个modid/cmdidsql语句来触发订阅条件,并且让dns service服务器主动给订阅的客户端发送该订阅消息。

lars_dns/test/test_insert_dns_route.sql

  1. USE lars_dns;
  2. SET @time = UNIX_TIMESTAMP(NOW());
  3. INSERT INTO RouteData(modid, cmdid, serverip, serverport) VALUES(1, 1, 3232235953, 9999);
  4. UPDATE RouteVersion SET version = @time WHERE id = 1;
  5. INSERT INTO RouteChange(modid, cmdid, version) VALUES(1, 1, @time);

客户端代码:

lars_dns/test/lars_dns_test1.cpp

  1. #include <string.h>
  2. #include <unistd.h>
  3. #include <string>
  4. #include "lars_reactor.h"
  5. #include "lars.pb.h"
  6. //命令行参数
  7. struct Option
  8. {
  9. Option():ip(NULL),port(0) {}
  10. char *ip;
  11. short port;
  12. };
  13. Option option;
  14. void Usage() {
  15. printf("Usage: ./lars_dns_test -h ip -p port\n");
  16. }
  17. //解析命令行
  18. void parse_option(int argc, char **argv)
  19. {
  20. for (int i = 0; i < argc; i++) {
  21. if (strcmp(argv[i], "-h") == 0) {
  22. option.ip = argv[i + 1];
  23. }
  24. else if (strcmp(argv[i], "-p") == 0) {
  25. option.port = atoi(argv[i + 1]);
  26. }
  27. }
  28. if ( !option.ip || !option.port ) {
  29. Usage();
  30. exit(1);
  31. }
  32. }
  33. //typedef void (*conn_callback)(net_connection *conn, void *args);
  34. void on_connection(net_connection *conn, void *args)
  35. {
  36. //发送Route信息请求
  37. lars::GetRouteRequest req;
  38. req.set_modid(1);
  39. req.set_cmdid(1);
  40. std::string requestString;
  41. req.SerializeToString(&requestString);
  42. conn->send_message(requestString.c_str(), requestString.size(), lars::ID_GetRouteRequest);
  43. }
  44. void deal_get_route(const char *data, uint32_t len, int msgid, net_connection *net_conn, void *user_data)
  45. {
  46. //解包得到数据
  47. lars::GetRouteResponse rsp;
  48. rsp.ParseFromArray(data, len);
  49. //打印数据
  50. printf("modid = %d\n", rsp.modid());
  51. printf("cmdid = %d\n", rsp.cmdid());
  52. printf("host_size = %d\n", rsp.host_size());
  53. for (int i = 0; i < rsp.host_size(); i++) {
  54. printf("-->ip = %u\n", rsp.host(i).ip());
  55. printf("-->port = %d\n", rsp.host(i).port());
  56. }
  57. }
  58. int main(int argc, char **argv)
  59. {
  60. parse_option(argc, argv);
  61. event_loop loop;
  62. tcp_client *client;
  63. //创建客户端
  64. client = new tcp_client(&loop, option.ip, option.port, "lars_dns_test");
  65. if (client == NULL) {
  66. fprintf(stderr, "client == NULL\n");
  67. exit(1);
  68. }
  69. //客户端成功建立连接,首先发送请求包
  70. client->set_conn_start(on_connection);
  71. //设置服务端回应包处理业务
  72. client->add_msg_router(lars::ID_GetRouteResponse, deal_get_route);
  73. loop.event_process();
  74. return 0;
  75. }

启动dns_server:

  1. $ ./bin/lars_dns
  2. msg_router init...
  3. create 0 thread
  4. create 1 thread
  5. create 2 thread
  6. create 3 thread
  7. create 4 thread
  8. add msg cb msgid = 1
  9. lars dns service ....
  10. now route version is 1571058286 modID = 1, cmdID = 1, ip = 3232235953, port = 9999

启动客户端:

  1. $ ./lars_dns_test1 -h 127.0.0.1 -p 7778
  2. msg_router init...
  3. do_connect EINPROGRESS
  4. add msg cb msgid = 2
  5. connect 127.0.0.1:7778 succ!
  6. modid = 1
  7. cmdid = 1
  8. host_size = 1
  9. -->ip = 3232235953
  10. -->port = 9999

我们知道,第一请求modid/cmdid就会订阅该Route模块。

然后我们通过外界修改modid=1,cmdid=1的模块,新开一个终端,执行test_insert_dns_route.sql

  1. Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
  2. mysql> use lars_dns;
  3. Reading table information for completion of table and column names
  4. You can turn off this feature to get a quicker startup with -A
  5. Database changed
  6. mysql> \. test_insert_dns_route.sql
  7. Database changed
  8. Query OK, 0 rows affected (0.00 sec)
  9. Query OK, 1 row affected (0.01 sec)
  10. Query OK, 1 row affected (0.01 sec)
  11. Rows matched: 1 Changed: 1 Warnings: 0
  12. Query OK, 1 row affected (0.02 sec)
  13. mysql>

然后我会会发现客户端已经得到一个新的消息,就是最新的route数据过来。是由dns_service主动推送过来的订阅消息.

客户端:

  1. $ ./lars_dns_test1 -h 127.0.0.1 -p 7778
  2. msg_router init...
  3. do_connect EINPROGRESS
  4. add msg cb msgid = 2
  5. connect 127.0.0.1:7778 succ!
  6. modid = 1
  7. cmdid = 1
  8. host_size = 1
  9. -->ip = 3232235953
  10. -->port = 9999
  11. modid = 1
  12. cmdid = 1
  13. host_size = 1
  14. -->ip = 3232235953
  15. -->port = 9999
  1. 这样我们的订阅功能就完成了,整体的lars_dns模块的工作到此的基本需求全部也已经满足。