22.1 订阅模块的设计与实现

  1. 订阅模式整体的设计.

lars_dns/include/subscribe.h

  1. #pragma once
  2. #include <vector>
  3. #include <pthread.h>
  4. #include <ext/hash_set>
  5. #include <ext/hash_map>
  6. #include "lars_reactor.h"
  7. #include "lars.pb.h"
  8. #include "dns_route.h"
  9. using namespace __gnu_cxx;
  10. //定义订阅列表数据关系类型,key->modid/cmdid, value->fds(订阅的客户端文件描述符)
  11. typedef hash_map<uint64_t, hash_set<int>> subscribe_map;
  12. //定义发布列表的数据关系类型, key->fd(订阅客户端的文件描述符), value->modids
  13. typedef hash_map<int, hash_set<uint64_t>> publish_map;
  14. class SubscribeList {
  15. public:
  16. //设计单例
  17. static void init() {
  18. _instance = new SubscribeList();
  19. }
  20. static SubscribeList *instance() {
  21. //保证init方法在这个进程执行中,只执行一次
  22. pthread_once(&_once, init);
  23. return _instance;
  24. }
  25. //订阅
  26. void subscribe(uint64_t mod, int fd);
  27. //取消订阅
  28. void unsubscribe(uint64_t mod, int fd);
  29. //发布
  30. void publish(std::vector<uint64_t> &change_mods);
  31. //根据在线用户fd得到需要发布的列表
  32. void make_publish_map(listen_fd_set &online_fds,
  33. publish_map &need_publish);
  34. private:
  35. //设计单例
  36. SubscribeList();
  37. SubscribeList(const SubscribeList &);
  38. const SubscribeList& operator=(const SubscribeList);
  39. static SubscribeList *_instance;
  40. static pthread_once_t _once;
  41. subscribe_map _book_list; //订阅清单
  42. pthread_mutex_t _book_list_lock;
  43. publish_map _push_list; //发布清单
  44. pthread_mutex_t _push_list_lock;
  45. };
  1. 首先`SubscribeList`采用单例设计。这里面定义了两种数据类型
  1. //定义订阅列表数据关系类型,key->modid/cmdid, value->fds(订阅的客户端文件描述符)
  2. typedef hash_map<uint64_t, hash_set<int>> subscribe_map;
  3. //定义发布列表的数据关系类型, key->fd(订阅客户端的文件描述符), value->modids
  4. typedef hash_map<int, hash_set<uint64_t>> publish_map;
  1. `subscribe_map`是目前dns系统的总体订阅列表,记录了订阅的modid/cmdid都有哪些fds已经订阅了,其实一个fd就代表一个客户端。
  2. `publish_map`是即将发布的表,其实这里面是subscribe_map的一个反表,key是订阅的客户端fd,而value是该客户端需要接收的订阅modid/cmdid数据。

属性

_book_list:目前dns已经全部的订阅信息清单。

_push_list:目前dns即将发布的客户端及订阅信息清单。

方法

void subscribe(uint64_t mod, int fd): 加入modid/cmdid 和订阅的客户端fd到_book_list中。

void unsubscribe(uint64_t mod, int fd):取消一条订阅数据。

void publish(std::vector<uint64_t> &change_mods): 发布订阅数据,其中change_mods是需要发布的那些modid/cmdid组合。

void make_publish_map(listen_fd_set &online_fds, publish_map &need_publish): 根据目前在线的订阅用户,得到需要通信的发布订阅列表。

具体实现如下:

lars_dns/src/subscribe.cpp

  1. #include "subscribe.h"
  2. extern tcp_server *server;
  3. //单例对象
  4. SubscribeList *SubscribeList::_instance = NULL;
  5. //用于保证创建单例的init方法只执行一次的锁
  6. pthread_once_t SubscribeList::_once = PTHREAD_ONCE_INIT;
  7. SubscribeList::SubscribeList()
  8. {
  9. }
  10. //订阅
  11. void SubscribeList::subscribe(uint64_t mod, int fd)
  12. {
  13. //将mod->fd的关系加入到_book_list中
  14. pthread_mutex_lock(&_book_list_lock);
  15. _book_list[mod].insert(fd);
  16. pthread_mutex_unlock(&_book_list_lock);
  17. }
  18. //取消订阅
  19. void SubscribeList::unsubscribe(uint64_t mod, int fd)
  20. {
  21. //将mod->fd关系从_book_list中删除
  22. pthread_mutex_lock(&_book_list_lock);
  23. if (_book_list.find(mod) != _book_list.end()) {
  24. _book_list[mod].erase(fd);
  25. if (_book_list[mod].empty() == true) {
  26. _book_list.erase(mod);
  27. }
  28. }
  29. pthread_mutex_unlock(&_book_list_lock);
  30. }
  31. void push_change_task(event_loop *loop, void *args)
  32. {
  33. SubscribeList *subscribe = (SubscribeList*)args;
  34. //1 获取全部的在线客户端fd
  35. listen_fd_set online_fds;
  36. loop->get_listen_fds(online_fds);
  37. //2 从subscribe的_push_list中 找到与online_fds集合匹配,放在一个新的publish_map里
  38. publish_map need_publish;
  39. subscribe->make_publish_map(online_fds, need_publish);
  40. //3 依次从need_publish取出数据 发送给对应客户端链接
  41. publish_map::iterator it;
  42. for (it = need_publish.begin(); it != need_publish.end(); it++) {
  43. int fd = it->first; //fd
  44. //遍历 fd对应的 modid/cmdid集合
  45. hash_set<uint64_t>::iterator st;
  46. for (st = it->second.begin(); st != it->second.end(); st++) {
  47. //一个modid/cmdid
  48. int modid = int((*st) >> 32);
  49. int cmdid = int(*st);
  50. //组装pb消息,发送给客户
  51. lars::GetRouteResponse rsp;
  52. rsp.set_modid(modid);
  53. rsp.set_cmdid(cmdid);
  54. //通过route查询对应的host ip/port信息 进行组装
  55. host_set hosts = Route::instance()->get_hosts(modid, cmdid) ;
  56. for (host_set_it hit = hosts.begin(); hit != hosts.end(); hit++) {
  57. uint64_t ip_port_pair = *hit;
  58. lars::HostInfo host_info;
  59. host_info.set_ip((uint32_t)(ip_port_pair >> 32));
  60. host_info.set_port((int)ip_port_pair);
  61. //添加到rsp中
  62. rsp.add_host()->CopyFrom(host_info);
  63. }
  64. //给当前fd 发送一个更新消息
  65. std::string responseString;
  66. rsp.SerializeToString(&responseString);
  67. //通过fd取出链接信息
  68. net_connection *conn = tcp_server::conns[fd];
  69. if (conn != NULL) {
  70. conn->send_message(responseString.c_str(), responseString.size(), lars::ID_GetRouteResponse);
  71. }
  72. }
  73. }
  74. }
  75. //根据在线用户fd得到需要发布的列表
  76. void SubscribeList::make_publish_map(
  77. listen_fd_set &online_fds,
  78. publish_map &need_publish)
  79. {
  80. publish_map::iterator it;
  81. pthread_mutex_lock(&_push_list_lock);
  82. //遍历_push_list 找到 online_fds匹配的数据,放到need_publish中
  83. for (it = _push_list.begin(); it != _push_list.end(); it++) {
  84. //it->first 是 fd
  85. //it->second 是 modid/cmdid
  86. if (online_fds.find(it->first) != online_fds.end()) {
  87. //匹配到
  88. //当前的键值对移动到need_publish中
  89. need_publish[it->first] = _push_list[it->first];
  90. //当该组数据从_push_list中删除掉
  91. _push_list.erase(it);
  92. }
  93. }
  94. pthread_mutex_unlock(&_push_list_lock);
  95. }
  96. //发布
  97. void SubscribeList::publish(std::vector<uint64_t> &change_mods)
  98. {
  99. //1 将change_mods已经修改的mod->fd
  100. // 放到 发布清单_push_list中
  101. pthread_mutex_lock(&_book_list_lock);
  102. pthread_mutex_lock(&_push_list_lock);
  103. std::vector<uint64_t>::iterator it;
  104. for (it = change_mods.begin(); it != change_mods.end(); it++) {
  105. uint64_t mod = *it;
  106. if (_book_list.find(mod) != _book_list.end()) {
  107. //将mod下面的fd set集合拷迁移到 _push_list中
  108. hash_set<int>::iterator fds_it;
  109. for (fds_it = _book_list[mod].begin(); fds_it != _book_list[mod].end(); fds_it++) {
  110. int fd = *fds_it;
  111. _push_list[fd].insert(mod);
  112. }
  113. }
  114. }
  115. pthread_mutex_unlock(&_push_list_lock);
  116. pthread_mutex_unlock(&_book_list_lock);
  117. //2 通知各个线程去执行推送任务
  118. server->thread_poll()->send_task(push_change_task, this);
  119. }
  1. 这里需要注意的是`publish()`里的server变量是全局变量,全局唯一的server句柄。

22.2 开启订阅

  1. 那么订阅功能实现了,该如何是调用触发订阅功能能,我们可以在一个客户端建立连接成功之后来调用.

lars_dns/src/dns_service.cpp

  1. #include <ext/hash_set>
  2. #include "lars_reactor.h"
  3. #include "subscribe.h"
  4. #include "dns_route.h"
  5. #include "lars.pb.h"
  6. tcp_server *server;
  7. using __gnu_cxx::hash_set;
  8. typedef hash_set<uint64_t> client_sub_mod_list;
  9. // ...
  10. //订阅route 的modid/cmdid
  11. void create_subscribe(net_connection * conn, void *args)
  12. {
  13. conn->param = new client_sub_mod_list;
  14. }
  15. //退订route 的modid/cmdid
  16. void clear_subscribe(net_connection * conn, void *args)
  17. {
  18. client_sub_mod_list::iterator it;
  19. client_sub_mod_list *sub_list = (client_sub_mod_list*)conn->param;
  20. for (it = sub_list->begin(); it != sub_list->end(); it++) {
  21. uint64_t mod = *it;
  22. SubscribeList::instance()->unsubscribe(mod, conn->get_fd());
  23. }
  24. delete sub_list;
  25. conn->param = NULL;
  26. }
  27. int main(int argc, char **argv)
  28. {
  29. event_loop loop;
  30. //加载配置文件
  31. config_file::setPath("conf/lars_dns.conf");
  32. std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
  33. short port = config_file::instance()->GetNumber("reactor", "port", 7778);
  34. //创建tcp服务器
  35. server = new tcp_server(&loop, ip.c_str(), port);
  36. //==========注册链接创建/销毁Hook函数============
  37. server->set_conn_start(create_subscribe);
  38. server->set_conn_close(clear_subscribe);
  39. //============================================
  40. //注册路由业务
  41. server->add_msg_router(lars::ID_GetRouteRequest, get_route);
  42. //开始事件监听
  43. printf("lars dns service ....\n");
  44. loop.event_process();
  45. return 0;
  46. }
  1. 这里注册了两个链接Hook`create_subscribe()``clear_subscribe()`

client_sub_mod_list为当前客户端链接所订阅的route信息列表。主要存放当前客户订阅的modid/cmdid的集合。因为不同的客户端订阅的信息不同,所以要将该列表与每个conn进行绑定。