22.1 订阅模块的设计与实现
订阅模式整体的设计.
lars_dns/include/subscribe.h
#pragma once#include <vector>#include <pthread.h>#include <ext/hash_set>#include <ext/hash_map>#include "lars_reactor.h"#include "lars.pb.h"#include "dns_route.h"using namespace __gnu_cxx;//定义订阅列表数据关系类型,key->modid/cmdid, value->fds(订阅的客户端文件描述符)typedef hash_map<uint64_t, hash_set<int>> subscribe_map;//定义发布列表的数据关系类型, key->fd(订阅客户端的文件描述符), value->modidstypedef hash_map<int, hash_set<uint64_t>> publish_map;class SubscribeList {public://设计单例static void init() {_instance = new SubscribeList();}static SubscribeList *instance() {//保证init方法在这个进程执行中,只执行一次pthread_once(&_once, init);return _instance;}//订阅void subscribe(uint64_t mod, int fd);//取消订阅void unsubscribe(uint64_t mod, int fd);//发布void publish(std::vector<uint64_t> &change_mods);//根据在线用户fd得到需要发布的列表void make_publish_map(listen_fd_set &online_fds,publish_map &need_publish);private://设计单例SubscribeList();SubscribeList(const SubscribeList &);const SubscribeList& operator=(const SubscribeList);static SubscribeList *_instance;static pthread_once_t _once;subscribe_map _book_list; //订阅清单pthread_mutex_t _book_list_lock;publish_map _push_list; //发布清单pthread_mutex_t _push_list_lock;};
首先`SubscribeList`采用单例设计。这里面定义了两种数据类型
//定义订阅列表数据关系类型,key->modid/cmdid, value->fds(订阅的客户端文件描述符)typedef hash_map<uint64_t, hash_set<int>> subscribe_map;//定义发布列表的数据关系类型, key->fd(订阅客户端的文件描述符), value->modidstypedef hash_map<int, hash_set<uint64_t>> publish_map;
`subscribe_map`是目前dns系统的总体订阅列表,记录了订阅的modid/cmdid都有哪些fds已经订阅了,其实一个fd就代表一个客户端。`publish_map`是即将发布的表,其实这里面是subscribe_map的一个反表,key是订阅的客户端fd,而value是该客户端需要接收的订阅modid/cmdid数据。
属性:
_book_list:目前dns已经全部的订阅信息清单。
_push_list:目前dns即将发布的客户端及订阅信息清单。
方法
void subscribe(uint64_t mod, int fd): 加入modid/cmdid 和订阅的客户端fd到_book_list中。
void unsubscribe(uint64_t mod, int fd):取消一条订阅数据。
void publish(std::vector<uint64_t> &change_mods): 发布订阅数据,其中change_mods是需要发布的那些modid/cmdid组合。
void make_publish_map(listen_fd_set &online_fds, publish_map &need_publish): 根据目前在线的订阅用户,得到需要通信的发布订阅列表。
具体实现如下:
lars_dns/src/subscribe.cpp
#include "subscribe.h"extern tcp_server *server;//单例对象SubscribeList *SubscribeList::_instance = NULL;//用于保证创建单例的init方法只执行一次的锁pthread_once_t SubscribeList::_once = PTHREAD_ONCE_INIT;SubscribeList::SubscribeList(){}//订阅void SubscribeList::subscribe(uint64_t mod, int fd){//将mod->fd的关系加入到_book_list中pthread_mutex_lock(&_book_list_lock);_book_list[mod].insert(fd);pthread_mutex_unlock(&_book_list_lock);}//取消订阅void SubscribeList::unsubscribe(uint64_t mod, int fd){//将mod->fd关系从_book_list中删除pthread_mutex_lock(&_book_list_lock);if (_book_list.find(mod) != _book_list.end()) {_book_list[mod].erase(fd);if (_book_list[mod].empty() == true) {_book_list.erase(mod);}}pthread_mutex_unlock(&_book_list_lock);}void push_change_task(event_loop *loop, void *args){SubscribeList *subscribe = (SubscribeList*)args;//1 获取全部的在线客户端fdlisten_fd_set online_fds;loop->get_listen_fds(online_fds);//2 从subscribe的_push_list中 找到与online_fds集合匹配,放在一个新的publish_map里publish_map need_publish;subscribe->make_publish_map(online_fds, need_publish);//3 依次从need_publish取出数据 发送给对应客户端链接publish_map::iterator it;for (it = need_publish.begin(); it != need_publish.end(); it++) {int fd = it->first; //fd//遍历 fd对应的 modid/cmdid集合hash_set<uint64_t>::iterator st;for (st = it->second.begin(); st != it->second.end(); st++) {//一个modid/cmdidint modid = int((*st) >> 32);int cmdid = int(*st);//组装pb消息,发送给客户lars::GetRouteResponse rsp;rsp.set_modid(modid);rsp.set_cmdid(cmdid);//通过route查询对应的host ip/port信息 进行组装host_set hosts = Route::instance()->get_hosts(modid, cmdid) ;for (host_set_it hit = hosts.begin(); hit != hosts.end(); hit++) {uint64_t ip_port_pair = *hit;lars::HostInfo host_info;host_info.set_ip((uint32_t)(ip_port_pair >> 32));host_info.set_port((int)ip_port_pair);//添加到rsp中rsp.add_host()->CopyFrom(host_info);}//给当前fd 发送一个更新消息std::string responseString;rsp.SerializeToString(&responseString);//通过fd取出链接信息net_connection *conn = tcp_server::conns[fd];if (conn != NULL) {conn->send_message(responseString.c_str(), responseString.size(), lars::ID_GetRouteResponse);}}}}//根据在线用户fd得到需要发布的列表void SubscribeList::make_publish_map(listen_fd_set &online_fds,publish_map &need_publish){publish_map::iterator it;pthread_mutex_lock(&_push_list_lock);//遍历_push_list 找到 online_fds匹配的数据,放到need_publish中for (it = _push_list.begin(); it != _push_list.end(); it++) {//it->first 是 fd//it->second 是 modid/cmdidif (online_fds.find(it->first) != online_fds.end()) {//匹配到//当前的键值对移动到need_publish中need_publish[it->first] = _push_list[it->first];//当该组数据从_push_list中删除掉_push_list.erase(it);}}pthread_mutex_unlock(&_push_list_lock);}//发布void SubscribeList::publish(std::vector<uint64_t> &change_mods){//1 将change_mods已经修改的mod->fd// 放到 发布清单_push_list中pthread_mutex_lock(&_book_list_lock);pthread_mutex_lock(&_push_list_lock);std::vector<uint64_t>::iterator it;for (it = change_mods.begin(); it != change_mods.end(); it++) {uint64_t mod = *it;if (_book_list.find(mod) != _book_list.end()) {//将mod下面的fd set集合拷迁移到 _push_list中hash_set<int>::iterator fds_it;for (fds_it = _book_list[mod].begin(); fds_it != _book_list[mod].end(); fds_it++) {int fd = *fds_it;_push_list[fd].insert(mod);}}}pthread_mutex_unlock(&_push_list_lock);pthread_mutex_unlock(&_book_list_lock);//2 通知各个线程去执行推送任务server->thread_poll()->send_task(push_change_task, this);}
这里需要注意的是`publish()`里的server变量是全局变量,全局唯一的server句柄。
22.2 开启订阅
那么订阅功能实现了,该如何是调用触发订阅功能能,我们可以在一个客户端建立连接成功之后来调用.
lars_dns/src/dns_service.cpp
#include <ext/hash_set>#include "lars_reactor.h"#include "subscribe.h"#include "dns_route.h"#include "lars.pb.h"tcp_server *server;using __gnu_cxx::hash_set;typedef hash_set<uint64_t> client_sub_mod_list;// ...//订阅route 的modid/cmdidvoid create_subscribe(net_connection * conn, void *args){conn->param = new client_sub_mod_list;}//退订route 的modid/cmdidvoid clear_subscribe(net_connection * conn, void *args){client_sub_mod_list::iterator it;client_sub_mod_list *sub_list = (client_sub_mod_list*)conn->param;for (it = sub_list->begin(); it != sub_list->end(); it++) {uint64_t mod = *it;SubscribeList::instance()->unsubscribe(mod, conn->get_fd());}delete sub_list;conn->param = NULL;}int main(int argc, char **argv){event_loop loop;//加载配置文件config_file::setPath("conf/lars_dns.conf");std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");short port = config_file::instance()->GetNumber("reactor", "port", 7778);//创建tcp服务器server = new tcp_server(&loop, ip.c_str(), port);//==========注册链接创建/销毁Hook函数============server->set_conn_start(create_subscribe);server->set_conn_close(clear_subscribe);//============================================//注册路由业务server->add_msg_router(lars::ID_GetRouteRequest, get_route);//开始事件监听printf("lars dns service ....\n");loop.event_process();return 0;}
这里注册了两个链接Hook。`create_subscribe()`和`clear_subscribe()`。
client_sub_mod_list为当前客户端链接所订阅的route信息列表。主要存放当前客户订阅的modid/cmdid的集合。因为不同的客户端订阅的信息不同,所以要将该列表与每个conn进行绑定。
