现在我们发送的消息都是message结构的,有个message头里面其中有两个关键的字段,msgidmsglen,其中加入msgid的意义就是我们可以甄别是哪个消息,从而对这类消息做出不同的业务处理。但是现在我们无论是服务端还是客户端都是写死的两个业务,就是”回显业务”,显然这并不满足我们作为服务器框架的需求。我们需要开发者可以注册自己的回调业务。所以我们需要提供一个注册业务的入口,然后在后端根据不同的msgid来激活不同的回调业务函数。

8.1 添加消息分发路由类msg_router

  1. 下面我们提供这样一个中转的router模块,在include/message.h添加

lars_reactor/include/message.h

  1. #pragma once
  2. #include <ext/hash_map>
  3. //解决tcp粘包问题的消息头
  4. struct msg_head
  5. {
  6. int msgid;
  7. int msglen;
  8. };
  9. //消息头的二进制长度,固定数
  10. #define MESSAGE_HEAD_LEN 8
  11. //消息头+消息体的最大长度限制
  12. #define MESSAGE_LENGTH_LIMIT (65535 - MESSAGE_HEAD_LEN)
  13. //msg 业务回调函数原型
  14. //===================== 消息分发路由机制 ==================
  15. class tcp_client;
  16. typedef void msg_callback(const char *data, uint32_t len, int msgid, tcp_client *client, void *user_data);
  17. //消息路由分发机制
  18. class msg_router
  19. {
  20. public:
  21. msg_router():_router(),_args() {}
  22. //给一个消息ID注册一个对应的回调业务函数
  23. int register_msg_router(int msgid, msg_callback *msg_cb, void *user_data)
  24. {
  25. if(_router.find(msgid) != _router.end()) {
  26. //该msgID的回调业务已经存在
  27. return -1;
  28. }
  29. _router[msgid] = msg_cb;
  30. _args[msgid] = user_data;
  31. return 0;
  32. }
  33. //调用注册的对应的回调业务函数
  34. void call(int msgid, uint32_t msglen, const char *data, tcp_client *client)
  35. {
  36. //判断msgid对应的回调是否存在
  37. if (_router.find(msgid) == _router.end()) {
  38. fprintf(stderr, "msgid %d is not register!\n", msgid);
  39. return;
  40. }
  41. //直接取出回调函数,执行
  42. msg_callback *callback = _router[msgid];
  43. void *user_data = _args[msgid];
  44. callback(data, msglen, msgid, client, user_data);
  45. }
  46. private:
  47. //针对消息的路由分发,key为msgID, value为注册的回调业务函数
  48. __gnu_cxx::hash_map<int, msg_callback *> _router;
  49. //回调业务函数对应的参数,key为msgID, value为对应的参数
  50. __gnu_cxx::hash_map<int, void *> _args;
  51. };
  52. //===================== 消息分发路由机制 ==================
  1. 开发者需要注册一个`msg_callback`类型的函数,通过`msg_router`类的`register_msg_router()`方法来注册,同时通过`call()`方法来调用。
  2. 全部回调业务函数和msgid的对应关系保存在一个hash_map类型的`_router`map中,`_args`保存对应的参数。
  3. 但是这里有个小细节需要注意一下,`msg_callback`的函数类型声明是这样的。
  1. typedef void msg_callback(const char *data, uint32_t len, int msgid, tcp_client *client, void *user_data);
  1. 其中这里面第4个参数,只能是tcp_client类型的参数,也就是我们之前的设计的msg_callback只支持tcp_client的消息回调机制,但是很明显我们的需求是不仅是`tcp_client`要用,tcp_server中的`tcp_conn`也要用到这个机制,那么很显然这个参数在这就不是很合适,那么如果设定一个形参既能指向`tcp_client`又能能指向`tcp_conn`两个类型呢,当然答案就只能是将这两个类抽象出来一层,用父类指针指向子类然后通过多态特性来调用就可以了,所以我们需要先定义一个抽象类。

8.2 链接抽象类创建

  1. 经过分析,我们定义如下的抽象类,并提供一些接口。

lars_reactor/include/net_connection.h

  1. #pragma once
  2. /*
  3. *
  4. * 网络通信的抽象类,任何需要进行收发消息的模块,都可以实现该类
  5. *
  6. * */
  7. class net_connection
  8. {
  9. public:
  10. //发送消息的接口
  11. virtual int send_message(const char *data, int datalen, int msgid) = 0;
  12. };
  1. 然后让我们tcp_server端的`tcp_conn`类继承`net_connecton`, 客户端的`tcp_client` 继承`net_connection`

lars_reactor/include/tcp_conn.h

  1. class tcp_conn : public net_connection
  2. {
  3. //...
  4. };

lars_reactor/include/tcp_client.h

  1. class tcp_client : public net_connection
  2. {
  3. //...
  4. }

这样,我们就可以用一个net_connection指针指向这两种不同的对象实例了。

  1. 接下来我们将`msg_callback`回调业务函数类型改成
  1. typedef void msg_callback(const char *data, uint32_t len, int msgid, net_connection *net_conn, void *user_data);
  1. 这样这个业务函数就可以支持tcp_conntcp_client了。

所以修改之后,我们的msg_router类定义如下:

lars_reactor/include/message.h

  1. //消息路由分发机制
  2. class msg_router
  3. {
  4. public:
  5. msg_router(): {
  6. printf("msg router init ...\n");
  7. }
  8. //给一个消息ID注册一个对应的回调业务函数
  9. int register_msg_router(int msgid, msg_callback *msg_cb, void *user_data)
  10. {
  11. if(_router.find(msgid) != _router.end()) {
  12. //该msgID的回调业务已经存在
  13. return -1;
  14. }
  15. printf("add msg cb msgid = %d\n", msgid);
  16. _router[msgid] = msg_cb;
  17. _args[msgid] = user_data;
  18. return 0;
  19. }
  20. //调用注册的对应的回调业务函数
  21. void call(int msgid, uint32_t msglen, const char *data, net_connection *net_conn)
  22. {
  23. printf("call msgid = %d\n", msgid);
  24. //判断msgid对应的回调是否存在
  25. if (_router.find(msgid) == _router.end()) {
  26. fprintf(stderr, "msgid %d is not register!\n", msgid);
  27. return;
  28. }
  29. //直接取出回调函数,执行
  30. msg_callback *callback = _router[msgid];
  31. void *user_data = _args[msgid];
  32. callback(data, msglen, msgid, net_conn, user_data);
  33. printf("=======\n");
  34. }
  35. private:
  36. //针对消息的路由分发,key为msgID, value为注册的回调业务函数
  37. __gnu_cxx::hash_map<int, msg_callback*> _router;
  38. //回调业务函数对应的参数,key为msgID, value为对应的参数
  39. __gnu_cxx::hash_map<int, void*> _args;
  40. };

8.3 msg_router集成到tcp_server中

A. tcp_server添加msg_router静态成员变量

lars_reactor/include/tcp_server.h

  1. class tcp_server
  2. {
  3. public:
  4. // ...
  5. //---- 消息分发路由 ----
  6. static msg_router router;
  7. // ...
  8. };

同时定义及初始化

lars_reactor/src/tcp_server.cpp

  1. //...
  2. // ==== 消息分发路由 ===
  3. msg_router tcp_server::router;
  4. //...

B. tcp_server提供注册路由方法

lars_reactor/include/tcp_server.c

  1. class tcp_server
  2. {
  3. public:
  4. //...
  5. //注册消息路由回调函数
  6. void add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL) {
  7. router.register_msg_router(msgid, cb, user_data);
  8. }
  9. //...
  10. public:
  11. //全部已经在线的连接信息
  12. //---- 消息分发路由 ----
  13. static msg_router router;
  14. //...
  15. };

C. 修正tcp_conn的do_read改成消息分发

lars_reactor/src/tcp_conn.cpp

  1. //...
  2. //处理读业务
  3. void tcp_conn::do_read()
  4. {
  5. //1. 从套接字读取数据
  6. int ret = ibuf.read_data(_connfd);
  7. if (ret == -1) {
  8. fprintf(stderr, "read data from socket\n");
  9. this->clean_conn();
  10. return ;
  11. }
  12. else if ( ret == 0) {
  13. //对端正常关闭
  14. printf("connection closed by peer\n");
  15. clean_conn();
  16. return ;
  17. }
  18. //2. 解析msg_head数据
  19. msg_head head;
  20. //[这里用while,可能一次性读取多个完整包过来]
  21. while (ibuf.length() >= MESSAGE_HEAD_LEN) {
  22. //2.1 读取msg_head头部,固定长度MESSAGE_HEAD_LEN
  23. memcpy(&head, ibuf.data(), MESSAGE_HEAD_LEN);
  24. if(head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0) {
  25. fprintf(stderr, "data format error, need close, msglen = %d\n", head.msglen);
  26. this->clean_conn();
  27. break;
  28. }
  29. if (ibuf.length() < MESSAGE_HEAD_LEN + head.msglen) {
  30. //缓存buf中剩余的数据,小于实际上应该接受的数据
  31. //说明是一个不完整的包,应该抛弃
  32. break;
  33. }
  34. //2.2 再根据头长度读取数据体,然后针对数据体处理 业务
  35. //头部处理完了,往后偏移MESSAGE_HEAD_LEN长度
  36. ibuf.pop(MESSAGE_HEAD_LEN);
  37. //处理ibuf.data()业务数据
  38. printf("read data: %s\n", ibuf.data());
  39. //消息包路由模式
  40. tcp_server::router.call(head.msgid, head.msglen, ibuf.data(), this);
  41. ////回显业务
  42. //callback_busi(ibuf.data(), head.msglen, head.msgid, NULL, this);
  43. //消息体处理完了,往后便宜msglen长度
  44. ibuf.pop(head.msglen);
  45. }
  46. ibuf.adjust();
  47. return ;
  48. }
  49. //...

8.4 msg_router集成到tcp_client中

lars_reactor/include/tcp_client.h

  1. class tcp_client : public net_connection
  2. {
  3. public:
  4. // ...
  5. //设置业务处理回调函数
  6. //void set_msg_callback(msg_callback *msg_cb)
  7. //{
  8. //this->_msg_callback = msg_cb;
  9. //}
  10. //注册消息路由回调函数
  11. void add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL) {
  12. _router.register_msg_router(msgid, cb, user_data);
  13. }
  14. private:
  15. //处理消息的分发路由
  16. msg_router _router;
  17. //msg_callback *_msg_callback; //单路由模式去掉
  18. // ...
  19. // ...
  20. };
  1. 然后在修正`tcp_client``do_read()`方法。

lars_reactor/src/tcp_client.cpp

  1. //处理读业务
  2. int tcp_client::do_read()
  3. {
  4. //确定已经成功建立连接
  5. assert(connected == true);
  6. // 1. 一次性全部读取出来
  7. //得到缓冲区里有多少字节要被读取,然后将字节数放入b里面。
  8. int need_read = 0;
  9. if (ioctl(_sockfd, FIONREAD, &need_read) == -1) {
  10. fprintf(stderr, "ioctl FIONREAD error");
  11. return -1;
  12. }
  13. //确保_buf可以容纳可读数据
  14. assert(need_read <= _ibuf.capacity - _ibuf.length);
  15. int ret;
  16. do {
  17. ret = read(_sockfd, _ibuf.data + _ibuf.length, need_read);
  18. } while(ret == -1 && errno == EINTR);
  19. if (ret == 0) {
  20. //对端关闭
  21. if (_name != NULL) {
  22. printf("%s client: connection close by peer!\n", _name);
  23. }
  24. else {
  25. printf("client: connection close by peer!\n");
  26. }
  27. clean_conn();
  28. return -1;
  29. }
  30. else if (ret == -1) {
  31. fprintf(stderr, "client: do_read() , error\n");
  32. clean_conn();
  33. return -1;
  34. }
  35. assert(ret == need_read);
  36. _ibuf.length += ret;
  37. //2. 解包
  38. msg_head head;
  39. int msgid, length;
  40. while (_ibuf.length >= MESSAGE_HEAD_LEN) {
  41. memcpy(&head, _ibuf.data + _ibuf.head, MESSAGE_HEAD_LEN);
  42. msgid = head.msgid;
  43. length = head.msglen;
  44. /*
  45. if (length + MESSAGE_HEAD_LEN < _ibuf.length) {
  46. break;
  47. }
  48. */
  49. //头部读取完毕
  50. _ibuf.pop(MESSAGE_HEAD_LEN);
  51. // ===================================
  52. //3. 交给业务函数处理
  53. //if (_msg_callback != NULL) {
  54. //this->_msg_callback(_ibuf.data + _ibuf.head, length, msgid, this, NULL);
  55. //}
  56. // 消息路由分发
  57. this->_router.call(msgid, length, _ibuf.data + _ibuf.head, this);
  58. // ===================================
  59. //数据区域处理完毕
  60. _ibuf.pop(length);
  61. }
  62. //重置head指针
  63. _ibuf.adjust();
  64. return 0;
  65. }

8.5 完成Lars Reactor V0.6开发

我们现在重新写一下 server.cpp 和client.cpp的两个应用程序

lars_reacor/example/lars_reactor_0.6/server.cpp

  1. #include "tcp_server.h"
  2. //回显业务的回调函数
  3. void callback_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
  4. {
  5. printf("callback_busi ...\n");
  6. //直接回显
  7. conn->send_message(data, len, msgid);
  8. }
  9. //打印信息回调函数
  10. void print_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
  11. {
  12. printf("recv client: [%s]\n", data);
  13. printf("msgid: [%d]\n", msgid);
  14. printf("len: [%d]\n", len);
  15. }
  16. int main()
  17. {
  18. event_loop loop;
  19. tcp_server server(&loop, "127.0.0.1", 7777);
  20. //注册消息业务路由
  21. server.add_msg_router(1, callback_busi);
  22. server.add_msg_router(2, print_busi);
  23. loop.event_process();
  24. return 0;
  25. }

lars_reacor/example/lars_reactor_0.6/client.cpp

  1. #include "tcp_client.h"
  2. #include <stdio.h>
  3. #include <string.h>
  4. //客户端业务
  5. void busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
  6. {
  7. //得到服务端回执的数据
  8. printf("recv server: [%s]\n", data);
  9. printf("msgid: [%d]\n", msgid);
  10. printf("len: [%d]\n", len);
  11. }
  12. int main()
  13. {
  14. event_loop loop;
  15. //创建tcp客户端
  16. tcp_client client(&loop, "127.0.0.1", 7777, "clientv0.6");
  17. //注册消息路由业务
  18. client.add_msg_router(1, busi);
  19. //开启事件监听
  20. loop.event_process();
  21. return 0;
  22. }

lars_reactor/src/tcp_client.cpp

  1. //判断链接是否是创建链接,主要是针对非阻塞socket 返回EINPROGRESS错误
  2. static void connection_delay(event_loop *loop, int fd, void *args)
  3. {
  4. tcp_client *cli = (tcp_client*)args;
  5. loop->del_io_event(fd);
  6. int result = 0;
  7. socklen_t result_len = sizeof(result);
  8. getsockopt(fd, SOL_SOCKET, SO_ERROR, &result, &result_len);
  9. if (result == 0) {
  10. //链接是建立成功的
  11. cli->connected = true;
  12. printf("connect %s:%d succ!\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port));
  13. // ================ 发送msgid:1 =====
  14. //建立连接成功之后,主动发送send_message
  15. const char *msg = "hello lars!";
  16. int msgid = 1;
  17. cli->send_message(msg, strlen(msg), msgid);
  18. // ================ 发送msgid:2 =====
  19. const char *msg2 = "hello Aceld!";
  20. msgid = 2;
  21. cli->send_message(msg2, strlen(msg2), msgid);
  22. // ================
  23. loop->add_io_event(fd, read_callback, EPOLLIN, cli);
  24. if (cli->_obuf.length != 0) {
  25. //输出缓冲有数据可写
  26. loop->add_io_event(fd, write_callback, EPOLLOUT, cli);
  27. }
  28. }
  29. else {
  30. //链接创建失败
  31. fprintf(stderr, "connection %s:%d error\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port));
  32. }
  33. }

运行结果:

服务端

  1. $ ./server
  2. msg_router init...
  3. add msg cb msgid = 1
  4. add msg cb msgid = 2
  5. begin accept
  6. get new connection succ!
  7. read data: hello lars!
  8. call msgid = 1
  9. callback_busi ...
  10. server send_message: hello lars!:11, msgid = 1
  11. =======
  12. read data: hello Aceld!
  13. call msgid = 2
  14. recv client: [hello Aceld!]
  15. msgid: [2]
  16. len: [12]

客户端

  1. $ ./client
  2. msg_router init...
  3. do_connect EINPROGRESS
  4. add msg cb msgid = 1
  5. connect 127.0.0.1:7777 succ!
  6. do write over, del EPOLLOUT
  7. call msgid = 1
  8. recv server: [hello lars!]
  9. msgid: [1]
  10. len: [11]
  11. =======