下面我们来给链接注册两个hook节点的函数,即服务端有新的客户端链接创建之后用户可以注册一个回调,有客户端断开链接的回调。还有客户端在成功与服务端创建链接之后创建的回调,和客户端与服务端断开链接之前的回调。

9.1 tcp_server服务端添加链接Hook函数

A. 定义Hook函数原型

lars_reactor/include/net_connection.h

  1. #pragma once
  2. /*
  3. *
  4. * 网络通信的抽象类,任何需要进行收发消息的模块,都可以实现该类
  5. *
  6. * */
  7. class net_connection
  8. {
  9. public:
  10. net_connection() {}
  11. //发送消息的接口
  12. virtual int send_message(const char *data, int datalen, int msgid) = 0;
  13. };
  14. //创建链接/销毁链接 要触发的 回调函数类型
  15. typedef void (*conn_callback)(net_connection *conn, void *args);

B. tcp_server定义相关hook函数的属性

lars_reactor/include/tcp_server.h

  1. #pragma once
  2. #include <netinet/in.h>
  3. #include "event_loop.h"
  4. #include "tcp_conn.h"
  5. #include "message.h"
  6. class tcp_server
  7. {
  8. public:
  9. //server的构造函数
  10. tcp_server(event_loop* loop, const char *ip, uint16_t port);
  11. //开始提供创建链接服务
  12. void do_accept();
  13. //链接对象释放的析构
  14. ~tcp_server();
  15. //注册消息路由回调函数
  16. void add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL) {
  17. router.register_msg_router(msgid, cb, user_data);
  18. }
  19. private:
  20. //基础信息
  21. int _sockfd; //套接字
  22. struct sockaddr_in _connaddr; //客户端链接地址
  23. socklen_t _addrlen; //客户端链接地址长度
  24. //event_loop epoll事件机制
  25. event_loop* _loop;
  26. public:
  27. //---- 消息分发路由 ----
  28. static msg_router router;
  29. //---- 客户端链接管理部分-----
  30. public:
  31. static void increase_conn(int connfd, tcp_conn *conn); //新增一个新建的连接
  32. static void decrease_conn(int connfd); //减少一个断开的连接
  33. static void get_conn_num(int *curr_conn); //得到当前链接的刻度
  34. static tcp_conn **conns; //全部已经在线的连接信息
  35. // ------- 创建链接/销毁链接 Hook 部分 -----
  36. //设置链接的创建hook函数
  37. static void set_conn_start(conn_callback cb, void *args = NULL) {
  38. conn_start_cb = cb;
  39. conn_start_cb_args = args;
  40. }
  41. //设置链接的销毁hook函数
  42. static void set_conn_close(conn_callback cb, void *args = NULL) {
  43. conn_close_cb = cb;
  44. conn_close_cb_args = args;
  45. }
  46. //创建链接之后要触发的 回调函数
  47. static conn_callback conn_start_cb;
  48. static void *conn_start_cb_args;
  49. //销毁链接之前要触发的 回调函数
  50. static conn_callback conn_close_cb;
  51. static void *conn_close_cb_args;
  52. private:
  53. //TODO
  54. //从配置文件中读取
  55. #define MAX_CONNS 10000
  56. static int _max_conns; //最大client链接个数
  57. static int _curr_conns; //当前链接刻度
  58. static pthread_mutex_t _conns_mutex; //保护_curr_conns刻度修改的锁
  59. };

C. tcp_conn在连接创建/销毁调用Hook函数

  1. //初始化tcp_conn
  2. tcp_conn::tcp_conn(int connfd, event_loop *loop)
  3. {
  4. _connfd = connfd;
  5. _loop = loop;
  6. //1. 将connfd设置成非阻塞状态
  7. int flag = fcntl(_connfd, F_GETFL, 0);
  8. fcntl(_connfd, F_SETFL, O_NONBLOCK|flag);
  9. //2. 设置TCP_NODELAY禁止做读写缓存,降低小包延迟
  10. int op = 1;
  11. setsockopt(_connfd, IPPROTO_TCP, TCP_NODELAY, &op, sizeof(op));//need netinet/in.h netinet/tcp.h
  12. //2.5 如果用户注册了链接建立Hook 则调用
  13. if (tcp_server::conn_start_cb) {
  14. tcp_server::conn_start_cb(this, tcp_server::conn_start_cb_args);
  15. }
  16. //3. 将该链接的读事件让event_loop监控
  17. _loop->add_io_event(_connfd, conn_rd_callback, EPOLLIN, this);
  18. //4 将该链接集成到对应的tcp_server中
  19. tcp_server::increase_conn(_connfd, this);
  20. }
  21. //...
  22. //...
  23. //销毁tcp_conn
  24. void tcp_conn::clean_conn()
  25. {
  26. // 如果注册了链接销毁Hook函数,则调用
  27. if (tcp_server::conn_close_cb) {
  28. tcp_server::conn_close_cb(this, tcp_server::conn_close_cb_args);
  29. }
  30. //链接清理工作
  31. //1 将该链接从tcp_server摘除掉
  32. tcp_server::decrease_conn(_connfd);
  33. //2 将该链接从event_loop中摘除
  34. _loop->del_io_event(_connfd);
  35. //3 buf清空
  36. ibuf.clear();
  37. obuf.clear();
  38. //4 关闭原始套接字
  39. int fd = _connfd;
  40. _connfd = -1;
  41. close(fd);
  42. }

9.2 tcp_client客户端添加链接Hook函数

A. tcp_client添加Hook属性

lars_reactor/include/tcp_client.h

  1. #pragma once
  2. #include <sys/types.h>
  3. #include <sys/socket.h>
  4. #include <netinet/in.h>
  5. #include <arpa/inet.h>
  6. #include "io_buf.h"
  7. #include "event_loop.h"
  8. #include "message.h"
  9. #include "net_connection.h"
  10. class tcp_client : public net_connection
  11. {
  12. public:
  13. //初始化客户端套接字
  14. tcp_client(event_loop *loop, const char *ip, unsigned short port, const char *name);
  15. //发送message方法
  16. int send_message(const char *data, int msglen, int msgid);
  17. //创建链接
  18. void do_connect();
  19. //处理读业务
  20. int do_read();
  21. //处理写业务
  22. int do_write();
  23. //释放链接资源
  24. void clean_conn();
  25. ~tcp_client();
  26. //设置业务处理回调函数
  27. //void set_msg_callback(msg_callback *msg_cb)
  28. //{
  29. //this->_msg_callback = msg_cb;
  30. //}
  31. //注册消息路由回调函数
  32. void add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL) {
  33. _router.register_msg_router(msgid, cb, user_data);
  34. }
  35. //----- 链接创建/销毁回调Hook ----
  36. //设置链接的创建hook函数
  37. void set_conn_start(conn_callback cb, void *args = NULL)
  38. {
  39. _conn_start_cb = cb;
  40. _conn_start_cb_args = args;
  41. }
  42. //设置链接的销毁hook函数
  43. void set_conn_close(conn_callback cb, void *args = NULL) {
  44. _conn_close_cb = cb;
  45. _conn_close_cb_args = args;
  46. }
  47. //创建链接之后要触发的 回调函数
  48. conn_callback _conn_start_cb;
  49. void * _conn_start_cb_args;
  50. //销毁链接之前要触发的 回调函数
  51. conn_callback _conn_close_cb;
  52. void * _conn_close_cb_args;
  53. // ---------------------------------
  54. bool connected; //链接是否创建成功
  55. //server端地址
  56. struct sockaddr_in _server_addr;
  57. io_buf _obuf;
  58. io_buf _ibuf;
  59. private:
  60. int _sockfd;
  61. socklen_t _addrlen;
  62. //处理消息的分发路由
  63. msg_router _router;
  64. //msg_callback *_msg_callback; //单路由模式去掉
  65. //客户端的事件处理机制
  66. event_loop* _loop;
  67. //当前客户端的名称 用户记录日志
  68. const char *_name;
  69. };

B. tcp_client在创建/销毁调用Hook

lars_reactor/src/tcp_client.c

  1. //创建链接
  2. void tcp_client::do_connect()
  3. {
  4. // ...
  5. // ...
  6. int ret = connect(_sockfd, (const struct sockaddr*)&_server_addr, _addrlen);
  7. if (ret == 0) {
  8. //链接创建成功
  9. connected = true;
  10. //调用开发者客户端注册的创建链接之后的hook函数
  11. if (_conn_start_cb != NULL) {
  12. _conn_start_cb(this, _conn_start_cb_args);
  13. }
  14. // ...
  15. // ...
  16. }
  17. }
  18. //判断链接是否是创建链接,主要是针对非阻塞socket 返回EINPROGRESS错误
  19. static void connection_delay(event_loop *loop, int fd, void *args)
  20. {
  21. tcp_client *cli = (tcp_client*)args;
  22. loop->del_io_event(fd);
  23. int result = 0;
  24. socklen_t result_len = sizeof(result);
  25. getsockopt(fd, SOL_SOCKET, SO_ERROR, &result, &result_len);
  26. if (result == 0) {
  27. //链接是建立成功的
  28. cli->connected = true;
  29. printf("connect %s:%d succ!\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port));
  30. //调用开发者注册的创建链接Hook函数
  31. if (cli->_conn_start_cb != NULL) {
  32. cli->_conn_start_cb(cli, cli->_conn_start_cb_args);
  33. }
  34. // ....
  35. // ...
  36. }
  37. }
  38. //释放链接资源,重置连接
  39. void tcp_client::clean_conn()
  40. {
  41. if (_sockfd != -1) {
  42. printf("clean conn, del socket!\n");
  43. _loop->del_io_event(_sockfd);
  44. close(_sockfd);
  45. }
  46. connected = false;
  47. //调用开发者注册的销毁链接之前触发的Hook
  48. if (_conn_close_cb != NULL) {
  49. _conn_close_cb(this, _conn_close_cb_args);
  50. }
  51. //重新连接
  52. this->do_connect();
  53. }

9.3 完成Lars Reactor V0.7开发

server.cpp

  1. #include "tcp_server.h"
  2. #include <string.h>
  3. //回显业务的回调函数
  4. void callback_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
  5. {
  6. printf("callback_busi ...\n");
  7. //直接回显
  8. conn->send_message(data, len, msgid);
  9. }
  10. //打印信息回调函数
  11. void print_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
  12. {
  13. printf("recv client: [%s]\n", data);
  14. printf("msgid: [%d]\n", msgid);
  15. printf("len: [%d]\n", len);
  16. }
  17. //新客户端创建的回调
  18. void on_client_build(net_connection *conn, void *args)
  19. {
  20. int msgid = 101;
  21. const char *msg = "welcome! you online..";
  22. conn->send_message(msg, strlen(msg), msgid);
  23. }
  24. //客户端销毁的回调
  25. void on_client_lost(net_connection *conn, void *args)
  26. {
  27. printf("connection is lost !\n");
  28. }
  29. int main()
  30. {
  31. event_loop loop;
  32. tcp_server server(&loop, "127.0.0.1", 7777);
  33. //注册消息业务路由
  34. server.add_msg_router(1, callback_busi);
  35. server.add_msg_router(2, print_busi);
  36. //注册链接hook回调
  37. server.set_conn_start(on_client_build);
  38. server.set_conn_close(on_client_lost);
  39. loop.event_process();
  40. return 0;
  41. }

client.cpp

  1. #include "tcp_client.h"
  2. #include <stdio.h>
  3. #include <string.h>
  4. //客户端业务
  5. void busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
  6. {
  7. //得到服务端回执的数据
  8. printf("recv server: [%s]\n", data);
  9. printf("msgid: [%d]\n", msgid);
  10. printf("len: [%d]\n", len);
  11. }
  12. //客户端销毁的回调
  13. void on_client_build(net_connection *conn, void *args)
  14. {
  15. int msgid = 1;
  16. const char *msg = "Hello Lars!";
  17. conn->send_message(msg, strlen(msg), msgid);
  18. }
  19. //客户端销毁的回调
  20. void on_client_lost(net_connection *conn, void *args)
  21. {
  22. printf("on_client_lost...\n");
  23. printf("Client is lost!\n");
  24. }
  25. int main()
  26. {
  27. event_loop loop;
  28. //创建tcp客户端
  29. tcp_client client(&loop, "127.0.0.1", 7777, "clientv0.6");
  30. //注册消息路由业务
  31. client.add_msg_router(1, busi);
  32. client.add_msg_router(101, busi);
  33. //设置hook函数
  34. client.set_conn_start(on_client_build);
  35. client.set_conn_close(on_client_lost);
  36. //开启事件监听
  37. loop.event_process();
  38. return 0;
  39. }

运行结果

服务端

  1. $ ./server
  2. msg_router init...
  3. add msg cb msgid = 1
  4. add msg cb msgid = 2
  5. begin accept
  6. get new connection succ!
  7. read data: Hello Lars!
  8. call msgid = 1
  9. callback_busi ...
  10. =======
  11. connection closed by peer
  12. connection is lost !

客户端:

  1. $ ./client
  2. msg_router init...
  3. do_connect EINPROGRESS
  4. add msg cb msgid = 1
  5. add msg cb msgid = 101
  6. connect 127.0.0.1:7777 succ!
  7. do write over, del EPOLLOUT
  8. call msgid = 101
  9. recv server: [welcome! you online..]
  10. msgid: [101]
  11. len: [21]
  12. =======
  13. call msgid = 1
  14. recv server: [Hello Lars!]
  15. msgid: [1]
  16. len: [11]
  17. =======
  18. ^C
  1. 这样我们的成功的将hook机制加入进去了。