现在我们发送的消息都是message结构的,有个message头里面其中有两个关键的字段,msgid和msglen,其中加入msgid的意义就是我们可以甄别是哪个消息,从而对这类消息做出不同的业务处理。但是现在我们无论是服务端还是客户端都是写死的两个业务,就是”回显业务”,显然这并不满足我们作为服务器框架的需求。我们需要开发者可以注册自己的回调业务。所以我们需要提供一个注册业务的入口,然后在后端根据不同的msgid来激活不同的回调业务函数。
8.1 添加消息分发路由类msg_router
下面我们提供这样一个中转的router模块,在include/message.h添加
lars_reactor/include/message.h
#pragma once#include <ext/hash_map>//解决tcp粘包问题的消息头struct msg_head{int msgid;int msglen;};//消息头的二进制长度,固定数#define MESSAGE_HEAD_LEN 8//消息头+消息体的最大长度限制#define MESSAGE_LENGTH_LIMIT (65535 - MESSAGE_HEAD_LEN)//msg 业务回调函数原型//===================== 消息分发路由机制 ==================class tcp_client;typedef void msg_callback(const char *data, uint32_t len, int msgid, tcp_client *client, void *user_data);//消息路由分发机制class msg_router{public:msg_router():_router(),_args() {}//给一个消息ID注册一个对应的回调业务函数int register_msg_router(int msgid, msg_callback *msg_cb, void *user_data){if(_router.find(msgid) != _router.end()) {//该msgID的回调业务已经存在return -1;}_router[msgid] = msg_cb;_args[msgid] = user_data;return 0;}//调用注册的对应的回调业务函数void call(int msgid, uint32_t msglen, const char *data, tcp_client *client){//判断msgid对应的回调是否存在if (_router.find(msgid) == _router.end()) {fprintf(stderr, "msgid %d is not register!\n", msgid);return;}//直接取出回调函数,执行msg_callback *callback = _router[msgid];void *user_data = _args[msgid];callback(data, msglen, msgid, client, user_data);}private://针对消息的路由分发,key为msgID, value为注册的回调业务函数__gnu_cxx::hash_map<int, msg_callback *> _router;//回调业务函数对应的参数,key为msgID, value为对应的参数__gnu_cxx::hash_map<int, void *> _args;};//===================== 消息分发路由机制 ==================
开发者需要注册一个`msg_callback`类型的函数,通过`msg_router`类的`register_msg_router()`方法来注册,同时通过`call()`方法来调用。全部回调业务函数和msgid的对应关系保存在一个hash_map类型的`_router`map中,`_args`保存对应的参数。但是这里有个小细节需要注意一下,`msg_callback`的函数类型声明是这样的。
typedef void msg_callback(const char *data, uint32_t len, int msgid, tcp_client *client, void *user_data);
其中这里面第4个参数,只能是tcp_client类型的参数,也就是我们之前的设计的msg_callback只支持tcp_client的消息回调机制,但是很明显我们的需求是不仅是`tcp_client`要用,tcp_server中的`tcp_conn`也要用到这个机制,那么很显然这个参数在这就不是很合适,那么如果设定一个形参既能指向`tcp_client`又能能指向`tcp_conn`两个类型呢,当然答案就只能是将这两个类抽象出来一层,用父类指针指向子类然后通过多态特性来调用就可以了,所以我们需要先定义一个抽象类。
8.2 链接抽象类创建
经过分析,我们定义如下的抽象类,并提供一些接口。
lars_reactor/include/net_connection.h
#pragma once/*** 网络通信的抽象类,任何需要进行收发消息的模块,都可以实现该类** */class net_connection{public://发送消息的接口virtual int send_message(const char *data, int datalen, int msgid) = 0;};
然后让我们tcp_server端的`tcp_conn`类继承`net_connecton`, 客户端的`tcp_client` 继承`net_connection`
lars_reactor/include/tcp_conn.h
class tcp_conn : public net_connection{//...};
lars_reactor/include/tcp_client.h
class tcp_client : public net_connection{//...}
这样,我们就可以用一个net_connection指针指向这两种不同的对象实例了。
接下来我们将`msg_callback`回调业务函数类型改成
typedef void msg_callback(const char *data, uint32_t len, int msgid, net_connection *net_conn, void *user_data);
这样这个业务函数就可以支持tcp_conn和tcp_client了。
所以修改之后,我们的msg_router类定义如下:
lars_reactor/include/message.h
//消息路由分发机制class msg_router{public:msg_router(): {printf("msg router init ...\n");}//给一个消息ID注册一个对应的回调业务函数int register_msg_router(int msgid, msg_callback *msg_cb, void *user_data){if(_router.find(msgid) != _router.end()) {//该msgID的回调业务已经存在return -1;}printf("add msg cb msgid = %d\n", msgid);_router[msgid] = msg_cb;_args[msgid] = user_data;return 0;}//调用注册的对应的回调业务函数void call(int msgid, uint32_t msglen, const char *data, net_connection *net_conn){printf("call msgid = %d\n", msgid);//判断msgid对应的回调是否存在if (_router.find(msgid) == _router.end()) {fprintf(stderr, "msgid %d is not register!\n", msgid);return;}//直接取出回调函数,执行msg_callback *callback = _router[msgid];void *user_data = _args[msgid];callback(data, msglen, msgid, net_conn, user_data);printf("=======\n");}private://针对消息的路由分发,key为msgID, value为注册的回调业务函数__gnu_cxx::hash_map<int, msg_callback*> _router;//回调业务函数对应的参数,key为msgID, value为对应的参数__gnu_cxx::hash_map<int, void*> _args;};
8.3 msg_router集成到tcp_server中
A. tcp_server添加msg_router静态成员变量
lars_reactor/include/tcp_server.h
class tcp_server{public:// ...//---- 消息分发路由 ----static msg_router router;// ...};
同时定义及初始化
lars_reactor/src/tcp_server.cpp
//...// ==== 消息分发路由 ===msg_router tcp_server::router;//...
B. tcp_server提供注册路由方法
lars_reactor/include/tcp_server.c
class tcp_server{public://...//注册消息路由回调函数void add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL) {router.register_msg_router(msgid, cb, user_data);}//...public://全部已经在线的连接信息//---- 消息分发路由 ----static msg_router router;//...};
C. 修正tcp_conn的do_read改成消息分发
lars_reactor/src/tcp_conn.cpp
//...//处理读业务void tcp_conn::do_read(){//1. 从套接字读取数据int ret = ibuf.read_data(_connfd);if (ret == -1) {fprintf(stderr, "read data from socket\n");this->clean_conn();return ;}else if ( ret == 0) {//对端正常关闭printf("connection closed by peer\n");clean_conn();return ;}//2. 解析msg_head数据msg_head head;//[这里用while,可能一次性读取多个完整包过来]while (ibuf.length() >= MESSAGE_HEAD_LEN) {//2.1 读取msg_head头部,固定长度MESSAGE_HEAD_LENmemcpy(&head, ibuf.data(), MESSAGE_HEAD_LEN);if(head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0) {fprintf(stderr, "data format error, need close, msglen = %d\n", head.msglen);this->clean_conn();break;}if (ibuf.length() < MESSAGE_HEAD_LEN + head.msglen) {//缓存buf中剩余的数据,小于实际上应该接受的数据//说明是一个不完整的包,应该抛弃break;}//2.2 再根据头长度读取数据体,然后针对数据体处理 业务//头部处理完了,往后偏移MESSAGE_HEAD_LEN长度ibuf.pop(MESSAGE_HEAD_LEN);//处理ibuf.data()业务数据printf("read data: %s\n", ibuf.data());//消息包路由模式tcp_server::router.call(head.msgid, head.msglen, ibuf.data(), this);////回显业务//callback_busi(ibuf.data(), head.msglen, head.msgid, NULL, this);//消息体处理完了,往后便宜msglen长度ibuf.pop(head.msglen);}ibuf.adjust();return ;}//...
8.4 msg_router集成到tcp_client中
lars_reactor/include/tcp_client.h
class tcp_client : public net_connection{public:// ...//设置业务处理回调函数//void set_msg_callback(msg_callback *msg_cb)//{//this->_msg_callback = msg_cb;//}//注册消息路由回调函数void add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL) {_router.register_msg_router(msgid, cb, user_data);}private://处理消息的分发路由msg_router _router;//msg_callback *_msg_callback; //单路由模式去掉// ...// ...};
然后在修正`tcp_client`的`do_read()`方法。
lars_reactor/src/tcp_client.cpp
//处理读业务int tcp_client::do_read(){//确定已经成功建立连接assert(connected == true);// 1. 一次性全部读取出来//得到缓冲区里有多少字节要被读取,然后将字节数放入b里面。int need_read = 0;if (ioctl(_sockfd, FIONREAD, &need_read) == -1) {fprintf(stderr, "ioctl FIONREAD error");return -1;}//确保_buf可以容纳可读数据assert(need_read <= _ibuf.capacity - _ibuf.length);int ret;do {ret = read(_sockfd, _ibuf.data + _ibuf.length, need_read);} while(ret == -1 && errno == EINTR);if (ret == 0) {//对端关闭if (_name != NULL) {printf("%s client: connection close by peer!\n", _name);}else {printf("client: connection close by peer!\n");}clean_conn();return -1;}else if (ret == -1) {fprintf(stderr, "client: do_read() , error\n");clean_conn();return -1;}assert(ret == need_read);_ibuf.length += ret;//2. 解包msg_head head;int msgid, length;while (_ibuf.length >= MESSAGE_HEAD_LEN) {memcpy(&head, _ibuf.data + _ibuf.head, MESSAGE_HEAD_LEN);msgid = head.msgid;length = head.msglen;/*if (length + MESSAGE_HEAD_LEN < _ibuf.length) {break;}*///头部读取完毕_ibuf.pop(MESSAGE_HEAD_LEN);// ===================================//3. 交给业务函数处理//if (_msg_callback != NULL) {//this->_msg_callback(_ibuf.data + _ibuf.head, length, msgid, this, NULL);//}// 消息路由分发this->_router.call(msgid, length, _ibuf.data + _ibuf.head, this);// ===================================//数据区域处理完毕_ibuf.pop(length);}//重置head指针_ibuf.adjust();return 0;}
8.5 完成Lars Reactor V0.6开发
我们现在重新写一下 server.cpp 和client.cpp的两个应用程序
lars_reacor/example/lars_reactor_0.6/server.cpp
#include "tcp_server.h"//回显业务的回调函数void callback_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data){printf("callback_busi ...\n");//直接回显conn->send_message(data, len, msgid);}//打印信息回调函数void print_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data){printf("recv client: [%s]\n", data);printf("msgid: [%d]\n", msgid);printf("len: [%d]\n", len);}int main(){event_loop loop;tcp_server server(&loop, "127.0.0.1", 7777);//注册消息业务路由server.add_msg_router(1, callback_busi);server.add_msg_router(2, print_busi);loop.event_process();return 0;}
lars_reacor/example/lars_reactor_0.6/client.cpp
#include "tcp_client.h"#include <stdio.h>#include <string.h>//客户端业务void busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data){//得到服务端回执的数据printf("recv server: [%s]\n", data);printf("msgid: [%d]\n", msgid);printf("len: [%d]\n", len);}int main(){event_loop loop;//创建tcp客户端tcp_client client(&loop, "127.0.0.1", 7777, "clientv0.6");//注册消息路由业务client.add_msg_router(1, busi);//开启事件监听loop.event_process();return 0;}
lars_reactor/src/tcp_client.cpp
//判断链接是否是创建链接,主要是针对非阻塞socket 返回EINPROGRESS错误static void connection_delay(event_loop *loop, int fd, void *args){tcp_client *cli = (tcp_client*)args;loop->del_io_event(fd);int result = 0;socklen_t result_len = sizeof(result);getsockopt(fd, SOL_SOCKET, SO_ERROR, &result, &result_len);if (result == 0) {//链接是建立成功的cli->connected = true;printf("connect %s:%d succ!\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port));// ================ 发送msgid:1 =====//建立连接成功之后,主动发送send_messageconst char *msg = "hello lars!";int msgid = 1;cli->send_message(msg, strlen(msg), msgid);// ================ 发送msgid:2 =====const char *msg2 = "hello Aceld!";msgid = 2;cli->send_message(msg2, strlen(msg2), msgid);// ================loop->add_io_event(fd, read_callback, EPOLLIN, cli);if (cli->_obuf.length != 0) {//输出缓冲有数据可写loop->add_io_event(fd, write_callback, EPOLLOUT, cli);}}else {//链接创建失败fprintf(stderr, "connection %s:%d error\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port));}}
运行结果:
服务端
$ ./servermsg_router init...add msg cb msgid = 1add msg cb msgid = 2begin acceptget new connection succ!read data: hello lars!call msgid = 1callback_busi ...server send_message: hello lars!:11, msgid = 1=======read data: hello Aceld!call msgid = 2recv client: [hello Aceld!]msgid: [2]len: [12]
客户端
$ ./clientmsg_router init...do_connect EINPROGRESSadd msg cb msgid = 1connect 127.0.0.1:7777 succ!do write over, del EPOLLOUTcall msgid = 1recv server: [hello lars!]msgid: [1]len: [11]=======
