好了,现在我们来将服务器的连接做一个简单的封装,在这之前,我们要将我我们所发的数据做一个规定,采用TLV的格式,来进行封装。目的是解决TCP传输的粘包问题。

5.1 Message消息封装

TCP粘包问题-拆包封包过程
先创建一个message.h头文件

lars_reactor/include/message.h

  1. #pragma once
  2. //解决tcp粘包问题的消息头
  3. struct msg_head
  4. {
  5. int msgid;
  6. int msglen;
  7. };
  8. //消息头的二进制长度,固定数
  9. #define MESSAGE_HEAD_LEN 8
  10. //消息头+消息体的最大长度限制
  11. #define MESSAGE_LENGTH_LIMIT (65535 - MESSAGE_HEAD_LEN)
  1. 接下来我们每次在server client之间传递数据的时候,都发送这种数据格式的头再加上后面的数据内容即可。

5.2 创建一个tcp_conn连接类

lars_reactor/include/tcp_conn.h

  1. #pragma once
  2. #include "reactor_buf.h"
  3. #include "event_loop.h"
  4. //一个tcp的连接信息
  5. class tcp_conn
  6. {
  7. public:
  8. //初始化tcp_conn
  9. tcp_conn(int connfd, event_loop *loop);
  10. //处理读业务
  11. void do_read();
  12. //处理写业务
  13. void do_write();
  14. //销毁tcp_conn
  15. void clean_conn();
  16. //发送消息的方法
  17. int send_message(const char *data, int msglen, int msgid);
  18. private:
  19. //当前链接的fd
  20. int _connfd;
  21. //该连接归属的event_poll
  22. event_loop *_loop;
  23. //输出buf
  24. output_buf obuf;
  25. //输入buf
  26. input_buf ibuf;
  27. };

简单说明一下里面的成员和方法:

成员:

_connfd:server刚刚accept成功的套接字

_loop:当前链接所绑定的事件触发句柄.

obuf:链接输出缓冲,向对端写数据

ibuf:链接输入缓冲,从对端读数据

方法

tcp_client():构造,主要在里面实现初始化及创建链接链接的connect过程。

do_read():读数据处理业务,主要是EPOLLIN事件触发。

do_write():写数据处理业务,主要是EPOLLOUT事件触发。

clean_conn():清空链接资源。

send_message():将消息打包成TLV格式发送给对端。

  1. 接下来,实现以下`tcp_conn`类.

lars_reactor/src/tcp_conn.cpp

  1. #include <unistd.h>
  2. #include <fcntl.h>
  3. #include <sys/types.h>
  4. #include <sys/socket.h>
  5. #include <netinet/in.h>
  6. #include <netinet/tcp.h>
  7. #include <string.h>
  8. #include "tcp_conn.h"
  9. #include "message.h"
  10. //回显业务
  11. void callback_busi(const char *data, uint32_t len, int msgid, void *args, tcp_conn *conn)
  12. {
  13. conn->send_message(data, len, msgid);
  14. }
  15. //连接的读事件回调
  16. static void conn_rd_callback(event_loop *loop, int fd, void *args)
  17. {
  18. tcp_conn *conn = (tcp_conn*)args;
  19. conn->do_read();
  20. }
  21. //连接的写事件回调
  22. static void conn_wt_callback(event_loop *loop, int fd, void *args)
  23. {
  24. tcp_conn *conn = (tcp_conn*)args;
  25. conn->do_write();
  26. }
  27. //初始化tcp_conn
  28. tcp_conn::tcp_conn(int connfd, event_loop *loop)
  29. {
  30. _connfd = connfd;
  31. _loop = loop;
  32. //1. 将connfd设置成非阻塞状态
  33. int flag = fcntl(_connfd, F_GETFL, 0);
  34. fcntl(_connfd, F_SETFL, O_NONBLOCK|flag);
  35. //2. 设置TCP_NODELAY禁止做读写缓存,降低小包延迟
  36. int op = 1;
  37. setsockopt(_connfd, IPPROTO_TCP, TCP_NODELAY, &op, sizeof(op));//need netinet/in.h netinet/tcp.h
  38. //3. 将该链接的读事件让event_loop监控
  39. _loop->add_io_event(_connfd, conn_rd_callback, EPOLLIN, this);
  40. //4 将该链接集成到对应的tcp_server中
  41. //TODO
  42. }
  43. //处理读业务
  44. void tcp_conn::do_read()
  45. {
  46. //1. 从套接字读取数据
  47. int ret = ibuf.read_data(_connfd);
  48. if (ret == -1) {
  49. fprintf(stderr, "read data from socket\n");
  50. this->clean_conn();
  51. return ;
  52. }
  53. else if ( ret == 0) {
  54. //对端正常关闭
  55. printf("connection closed by peer\n");
  56. clean_conn();
  57. return ;
  58. }
  59. //2. 解析msg_head数据
  60. msg_head head;
  61. //[这里用while,可能一次性读取多个完整包过来]
  62. while (ibuf.length() >= MESSAGE_HEAD_LEN) {
  63. //2.1 读取msg_head头部,固定长度MESSAGE_HEAD_LEN
  64. memcpy(&head, ibuf.data(), MESSAGE_HEAD_LEN);
  65. if(head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0) {
  66. fprintf(stderr, "data format error, need close, msglen = %d\n", head.msglen);
  67. this->clean_conn();
  68. break;
  69. }
  70. if (ibuf.length() < MESSAGE_HEAD_LEN + head.msglen) {
  71. //缓存buf中剩余的数据,小于实际上应该接受的数据
  72. //说明是一个不完整的包,应该抛弃
  73. break;
  74. }
  75. //2.2 再根据头长度读取数据体,然后针对数据体处理 业务
  76. //TODO 添加包路由模式
  77. //头部处理完了,往后偏移MESSAGE_HEAD_LEN长度
  78. ibuf.pop(MESSAGE_HEAD_LEN);
  79. //处理ibuf.data()业务数据
  80. printf("read data: %s\n", ibuf.data());
  81. //回显业务
  82. callback_busi(ibuf.data(), head.msglen, head.msgid, NULL, this);
  83. //消息体处理完了,往后便宜msglen长度
  84. ibuf.pop(head.msglen);
  85. }
  86. ibuf.adjust();
  87. return ;
  88. }
  89. //处理写业务
  90. void tcp_conn::do_write()
  91. {
  92. //do_write是触发玩event事件要处理的事情,
  93. //应该是直接将out_buf力度数据io写会对方客户端
  94. //而不是在这里组装一个message再发
  95. //组装message的过程应该是主动调用
  96. //只要obuf中有数据就写
  97. while (obuf.length()) {
  98. int ret = obuf.write2fd(_connfd);
  99. if (ret == -1) {
  100. fprintf(stderr, "write2fd error, close conn!\n");
  101. this->clean_conn();
  102. return ;
  103. }
  104. if (ret == 0) {
  105. //不是错误,仅返回0表示不可继续写
  106. break;
  107. }
  108. }
  109. if (obuf.length() == 0) {
  110. //数据已经全部写完,将_connfd的写事件取消掉
  111. _loop->del_io_event(_connfd, EPOLLOUT);
  112. }
  113. return ;
  114. }
  115. //发送消息的方法
  116. int tcp_conn::send_message(const char *data, int msglen, int msgid)
  117. {
  118. printf("server send_message: %s:%d, msgid = %d\n", data, msglen, msgid);
  119. bool active_epollout = false;
  120. if(obuf.length() == 0) {
  121. //如果现在已经数据都发送完了,那么是一定要激活写事件的
  122. //如果有数据,说明数据还没有完全写完到对端,那么没必要再激活等写完再激活
  123. active_epollout = true;
  124. }
  125. //1 先封装message消息头
  126. msg_head head;
  127. head.msgid = msgid;
  128. head.msglen = msglen;
  129. //1.1 写消息头
  130. int ret = obuf.send_data((const char *)&head, MESSAGE_HEAD_LEN);
  131. if (ret != 0) {
  132. fprintf(stderr, "send head error\n");
  133. return -1;
  134. }
  135. //1.2 写消息体
  136. ret = obuf.send_data(data, msglen);
  137. if (ret != 0) {
  138. //如果写消息体失败,那就回滚将消息头的发送也取消
  139. obuf.pop(MESSAGE_HEAD_LEN);
  140. return -1;
  141. }
  142. if (active_epollout == true) {
  143. //2. 激活EPOLLOUT写事件
  144. _loop->add_io_event(_connfd, conn_wt_callback, EPOLLOUT, this);
  145. }
  146. return 0;
  147. }
  148. //销毁tcp_conn
  149. void tcp_conn::clean_conn()
  150. {
  151. //链接清理工作
  152. //1 将该链接从tcp_server摘除掉
  153. //TODO
  154. //2 将该链接从event_loop中摘除
  155. _loop->del_io_event(_connfd);
  156. //3 buf清空
  157. ibuf.clear();
  158. obuf.clear();
  159. //4 关闭原始套接字
  160. int fd = _connfd;
  161. _connfd = -1;
  162. close(fd);
  163. }
  1. 具体每个方法的实现,都很清晰。其中`conn_rd_callback()``conn_wt_callback()`是注册读写事件的回调函数,设置为static是因为函数类型没有this指针。在里面分别再调用`do_read()``do_write()`方法。

5.3 修正tcp_server对accept之后的处理方法

lars_reactor/src/tcp_server.cpp

  1. //...
  2. //开始提供创建链接服务
  3. void tcp_server::do_accept()
  4. {
  5. int connfd;
  6. while(true) {
  7. //accept与客户端创建链接
  8. printf("begin accept\n");
  9. connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen);
  10. if (connfd == -1) {
  11. if (errno == EINTR) {
  12. fprintf(stderr, "accept errno=EINTR\n");
  13. continue;
  14. }
  15. else if (errno == EMFILE) {
  16. //建立链接过多,资源不够
  17. fprintf(stderr, "accept errno=EMFILE\n");
  18. }
  19. else if (errno == EAGAIN) {
  20. fprintf(stderr, "accept errno=EAGAIN\n");
  21. break;
  22. }
  23. else {
  24. fprintf(stderr, "accept error");
  25. exit(1);
  26. }
  27. }
  28. else {
  29. //accept succ!
  30. // ============= 将之前的触发回调的删掉,改成如下====
  31. tcp_conn *conn = new tcp_conn(connfd, _loop);
  32. if (conn == NULL) {
  33. fprintf(stderr, "new tcp_conn error\n");
  34. exit(1);
  35. }
  36. // ============================================
  37. printf("get new connection succ!\n");
  38. break;
  39. }
  40. }
  41. }
  42. //...
  1. 这样,每次accept成功之后,创建一个与当前客户端套接字绑定的tcp_conn对象。在构造里就完成了基本的对于EPOLLIN事件的监听和回调动作.
  2. 现在可以先编译一下,保证没有语法错误,但是如果想测试,就不能够使用`nc`指令测试了,因为现在服务端只能够接收我们自定义的TLV格式的报文。那么我们需要自己写一个客户端来完成基本的测试。