下面我们来给链接注册两个hook节点的函数,即服务端有新的客户端链接创建之后用户可以注册一个回调,有客户端断开链接的回调。还有客户端在成功与服务端创建链接之后创建的回调,和客户端与服务端断开链接之前的回调。
9.1 tcp_server服务端添加链接Hook函数
A. 定义Hook函数原型
lars_reactor/include/net_connection.h
#pragma once/*** 网络通信的抽象类,任何需要进行收发消息的模块,都可以实现该类** */class net_connection{public:net_connection() {}//发送消息的接口virtual int send_message(const char *data, int datalen, int msgid) = 0;};//创建链接/销毁链接 要触发的 回调函数类型typedef void (*conn_callback)(net_connection *conn, void *args);
B. tcp_server定义相关hook函数的属性
lars_reactor/include/tcp_server.h
#pragma once#include <netinet/in.h>#include "event_loop.h"#include "tcp_conn.h"#include "message.h"class tcp_server{public://server的构造函数tcp_server(event_loop* loop, const char *ip, uint16_t port);//开始提供创建链接服务void do_accept();//链接对象释放的析构~tcp_server();//注册消息路由回调函数void add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL) {router.register_msg_router(msgid, cb, user_data);}private://基础信息int _sockfd; //套接字struct sockaddr_in _connaddr; //客户端链接地址socklen_t _addrlen; //客户端链接地址长度//event_loop epoll事件机制event_loop* _loop;public://---- 消息分发路由 ----static msg_router router;//---- 客户端链接管理部分-----public:static void increase_conn(int connfd, tcp_conn *conn); //新增一个新建的连接static void decrease_conn(int connfd); //减少一个断开的连接static void get_conn_num(int *curr_conn); //得到当前链接的刻度static tcp_conn **conns; //全部已经在线的连接信息// ------- 创建链接/销毁链接 Hook 部分 -----//设置链接的创建hook函数static void set_conn_start(conn_callback cb, void *args = NULL) {conn_start_cb = cb;conn_start_cb_args = args;}//设置链接的销毁hook函数static void set_conn_close(conn_callback cb, void *args = NULL) {conn_close_cb = cb;conn_close_cb_args = args;}//创建链接之后要触发的 回调函数static conn_callback conn_start_cb;static void *conn_start_cb_args;//销毁链接之前要触发的 回调函数static conn_callback conn_close_cb;static void *conn_close_cb_args;private://TODO//从配置文件中读取#define MAX_CONNS 10000static int _max_conns; //最大client链接个数static int _curr_conns; //当前链接刻度static pthread_mutex_t _conns_mutex; //保护_curr_conns刻度修改的锁};
C. tcp_conn在连接创建/销毁调用Hook函数
//初始化tcp_conntcp_conn::tcp_conn(int connfd, event_loop *loop){_connfd = connfd;_loop = loop;//1. 将connfd设置成非阻塞状态int flag = fcntl(_connfd, F_GETFL, 0);fcntl(_connfd, F_SETFL, O_NONBLOCK|flag);//2. 设置TCP_NODELAY禁止做读写缓存,降低小包延迟int op = 1;setsockopt(_connfd, IPPROTO_TCP, TCP_NODELAY, &op, sizeof(op));//need netinet/in.h netinet/tcp.h//2.5 如果用户注册了链接建立Hook 则调用if (tcp_server::conn_start_cb) {tcp_server::conn_start_cb(this, tcp_server::conn_start_cb_args);}//3. 将该链接的读事件让event_loop监控_loop->add_io_event(_connfd, conn_rd_callback, EPOLLIN, this);//4 将该链接集成到对应的tcp_server中tcp_server::increase_conn(_connfd, this);}//...//...//销毁tcp_connvoid tcp_conn::clean_conn(){// 如果注册了链接销毁Hook函数,则调用if (tcp_server::conn_close_cb) {tcp_server::conn_close_cb(this, tcp_server::conn_close_cb_args);}//链接清理工作//1 将该链接从tcp_server摘除掉tcp_server::decrease_conn(_connfd);//2 将该链接从event_loop中摘除_loop->del_io_event(_connfd);//3 buf清空ibuf.clear();obuf.clear();//4 关闭原始套接字int fd = _connfd;_connfd = -1;close(fd);}
9.2 tcp_client客户端添加链接Hook函数
A. tcp_client添加Hook属性
lars_reactor/include/tcp_client.h
#pragma once#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>#include "io_buf.h"#include "event_loop.h"#include "message.h"#include "net_connection.h"class tcp_client : public net_connection{public://初始化客户端套接字tcp_client(event_loop *loop, const char *ip, unsigned short port, const char *name);//发送message方法int send_message(const char *data, int msglen, int msgid);//创建链接void do_connect();//处理读业务int do_read();//处理写业务int do_write();//释放链接资源void clean_conn();~tcp_client();//设置业务处理回调函数//void set_msg_callback(msg_callback *msg_cb)//{//this->_msg_callback = msg_cb;//}//注册消息路由回调函数void add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL) {_router.register_msg_router(msgid, cb, user_data);}//----- 链接创建/销毁回调Hook ----//设置链接的创建hook函数void set_conn_start(conn_callback cb, void *args = NULL){_conn_start_cb = cb;_conn_start_cb_args = args;}//设置链接的销毁hook函数void set_conn_close(conn_callback cb, void *args = NULL) {_conn_close_cb = cb;_conn_close_cb_args = args;}//创建链接之后要触发的 回调函数conn_callback _conn_start_cb;void * _conn_start_cb_args;//销毁链接之前要触发的 回调函数conn_callback _conn_close_cb;void * _conn_close_cb_args;// ---------------------------------bool connected; //链接是否创建成功//server端地址struct sockaddr_in _server_addr;io_buf _obuf;io_buf _ibuf;private:int _sockfd;socklen_t _addrlen;//处理消息的分发路由msg_router _router;//msg_callback *_msg_callback; //单路由模式去掉//客户端的事件处理机制event_loop* _loop;//当前客户端的名称 用户记录日志const char *_name;};
B. tcp_client在创建/销毁调用Hook
lars_reactor/src/tcp_client.c
//创建链接void tcp_client::do_connect(){// ...// ...int ret = connect(_sockfd, (const struct sockaddr*)&_server_addr, _addrlen);if (ret == 0) {//链接创建成功connected = true;//调用开发者客户端注册的创建链接之后的hook函数if (_conn_start_cb != NULL) {_conn_start_cb(this, _conn_start_cb_args);}// ...// ...}}//判断链接是否是创建链接,主要是针对非阻塞socket 返回EINPROGRESS错误static void connection_delay(event_loop *loop, int fd, void *args){tcp_client *cli = (tcp_client*)args;loop->del_io_event(fd);int result = 0;socklen_t result_len = sizeof(result);getsockopt(fd, SOL_SOCKET, SO_ERROR, &result, &result_len);if (result == 0) {//链接是建立成功的cli->connected = true;printf("connect %s:%d succ!\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port));//调用开发者注册的创建链接Hook函数if (cli->_conn_start_cb != NULL) {cli->_conn_start_cb(cli, cli->_conn_start_cb_args);}// ....// ...}}//释放链接资源,重置连接void tcp_client::clean_conn(){if (_sockfd != -1) {printf("clean conn, del socket!\n");_loop->del_io_event(_sockfd);close(_sockfd);}connected = false;//调用开发者注册的销毁链接之前触发的Hookif (_conn_close_cb != NULL) {_conn_close_cb(this, _conn_close_cb_args);}//重新连接this->do_connect();}
9.3 完成Lars Reactor V0.7开发
server.cpp
#include "tcp_server.h"#include <string.h>//回显业务的回调函数void callback_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data){printf("callback_busi ...\n");//直接回显conn->send_message(data, len, msgid);}//打印信息回调函数void print_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data){printf("recv client: [%s]\n", data);printf("msgid: [%d]\n", msgid);printf("len: [%d]\n", len);}//新客户端创建的回调void on_client_build(net_connection *conn, void *args){int msgid = 101;const char *msg = "welcome! you online..";conn->send_message(msg, strlen(msg), msgid);}//客户端销毁的回调void on_client_lost(net_connection *conn, void *args){printf("connection is lost !\n");}int main(){event_loop loop;tcp_server server(&loop, "127.0.0.1", 7777);//注册消息业务路由server.add_msg_router(1, callback_busi);server.add_msg_router(2, print_busi);//注册链接hook回调server.set_conn_start(on_client_build);server.set_conn_close(on_client_lost);loop.event_process();return 0;}
client.cpp
#include "tcp_client.h"#include <stdio.h>#include <string.h>//客户端业务void busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data){//得到服务端回执的数据printf("recv server: [%s]\n", data);printf("msgid: [%d]\n", msgid);printf("len: [%d]\n", len);}//客户端销毁的回调void on_client_build(net_connection *conn, void *args){int msgid = 1;const char *msg = "Hello Lars!";conn->send_message(msg, strlen(msg), msgid);}//客户端销毁的回调void on_client_lost(net_connection *conn, void *args){printf("on_client_lost...\n");printf("Client is lost!\n");}int main(){event_loop loop;//创建tcp客户端tcp_client client(&loop, "127.0.0.1", 7777, "clientv0.6");//注册消息路由业务client.add_msg_router(1, busi);client.add_msg_router(101, busi);//设置hook函数client.set_conn_start(on_client_build);client.set_conn_close(on_client_lost);//开启事件监听loop.event_process();return 0;}
运行结果
服务端
$ ./servermsg_router init...add msg cb msgid = 1add msg cb msgid = 2begin acceptget new connection succ!read data: Hello Lars!call msgid = 1callback_busi ...=======connection closed by peerconnection is lost !
客户端:
$ ./clientmsg_router init...do_connect EINPROGRESSadd msg cb msgid = 1add msg cb msgid = 101connect 127.0.0.1:7777 succ!do write over, del EPOLLOUTcall msgid = 101recv server: [welcome! you online..]msgid: [101]len: [21]=======call msgid = 1recv server: [Hello Lars!]msgid: [1]len: [11]=======^C
这样我们的成功的将hook机制加入进去了。
