我们可以给客户端添加触发模型。同时也提供一系列的接口供开发者写客户端应用程序来使用。

6.1 tcp_client类设计

lars_reactor/include/tcp_client.h

  1. #pragma once
  2. #include "io_buf.h"
  3. #include "event_loop.h"
  4. #include "message.h"
  5. #include <sys/types.h>
  6. #include <sys/socket.h>
  7. #include <netinet/in.h>
  8. #include <arpa/inet.h>
  9. class tcp_client
  10. {
  11. public:
  12. //初始化客户端套接字
  13. tcp_client(event_loop *loop, const char *ip, unsigned short port, const char *name);
  14. //发送message方法
  15. int send_message(const char *data, int msglen, int msgid);
  16. //创建链接
  17. void do_connect();
  18. //处理读业务
  19. int do_read();
  20. //处理写业务
  21. int do_write();
  22. //释放链接资源
  23. void clean_conn();
  24. ~tcp_client();
  25. //设置业务处理回调函数
  26. void set_msg_callback(msg_callback *msg_cb)
  27. {
  28. this->_msg_callback = msg_cb;
  29. }
  30. bool connected; //链接是否创建成功
  31. //server端地址
  32. struct sockaddr_in _server_addr;
  33. io_buf _obuf;
  34. io_buf _ibuf;
  35. private:
  36. int _sockfd;
  37. socklen_t _addrlen;
  38. //客户端的事件处理机制
  39. event_loop* _loop;
  40. //当前客户端的名称 用户记录日志
  41. const char *_name;
  42. msg_callback *_msg_callback;
  43. };
  1. 这里注意的是,tcp_client并不是tcp_server的一部分,而是单纯为写客户端提供的接口。所以这里也需要实现一套对读写事件处理的业务。 这里使用的读写缓冲是原始的`io_buf`,并不是服务器封装好的`reactor_buf`原因是后者是转为server做了一层封装,io_buf的基本方法比较全。

关键成员:

_sockfd:当前客户端套接字。

_server_addr: 链接的服务端的IP地址。

_loop: 客户端异步触发事件机制event_loop句柄。

_msg_callback: 当前客户端处理服务端的回调业务。

connected:是否已经成功connect服务端的标致。

方法:

tcp_client():构造函数,主要是在里面完成基本的套接字初始化及connect操作.

do_connect():创建链接

do_read():处理链接的读业务。

do_write():处理链接的写业务。

clean_conn():清空链接资源。

6.2 创建链接

lars_reactor/src/tcp_client.cpp

  1. tcp_client::tcp_client(event_loop *loop, const char *ip, unsigned short port, const char *name):
  2. _ibuf(4194304),
  3. _obuf(4194304)
  4. {
  5. _sockfd = -1;
  6. _msg_callback = NULL;
  7. _name = name;
  8. _loop = loop;
  9. bzero(&_server_addr, sizeof(_server_addr));
  10. _server_addr.sin_family = AF_INET;
  11. inet_aton(ip, &_server_addr.sin_addr);
  12. _server_addr.sin_port = htons(port);
  13. _addrlen = sizeof(_server_addr);
  14. this->do_connect();
  15. }
  1. 这里初始化tcp_client链接信息,然后调用`do_connect()`创建链接.

lars_reactor/src/tcp_client.cpp

  1. //创建链接
  2. void tcp_client::do_connect()
  3. {
  4. if (_sockfd != -1) {
  5. close(_sockfd);
  6. }
  7. //创建套接字
  8. _sockfd = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, IPPROTO_TCP);
  9. if (_sockfd == -1) {
  10. fprintf(stderr, "create tcp client socket error\n");
  11. exit(1);
  12. }
  13. int ret = connect(_sockfd, (const struct sockaddr*)&_server_addr, _addrlen);
  14. if (ret == 0) {
  15. //链接创建成功
  16. connected = true;
  17. //注册读回调
  18. _loop->add_io_event(_sockfd, read_callback, EPOLLIN, this);
  19. //如果写缓冲去有数据,那么也需要触发写回调
  20. if (this->_obuf.length != 0) {
  21. _loop->add_io_event(_sockfd, write_callback, EPOLLOUT, this);
  22. }
  23. printf("connect %s:%d succ!\n", inet_ntoa(_server_addr.sin_addr), ntohs(_server_addr.sin_port));
  24. }
  25. else {
  26. if(errno == EINPROGRESS) {
  27. //fd是非阻塞的,可能会出现这个错误,但是并不表示链接创建失败
  28. //如果fd是可写状态,则为链接是创建成功的.
  29. fprintf(stderr, "do_connect EINPROGRESS\n");
  30. //让event_loop去触发一个创建判断链接业务 用EPOLLOUT事件立刻触发
  31. _loop->add_io_event(_sockfd, connection_delay, EPOLLOUT, this);
  32. }
  33. else {
  34. fprintf(stderr, "connection error\n");
  35. exit(1);
  36. }
  37. }
  38. }

6.3 有关非阻塞客户端socket创建链接问题

  1. 这里转载一篇文章,是有关非阻塞套接字,connect返回-1,并且errno`EINPROGRESS`的情况。因为我们的client是采用event_loop形式,socket需要被设置为非阻塞。所以需要针对这个情况做处理。下面是说明。
  2. 客户端测试程序时,由于出现很多客户端,经过connect成功后,代码卡在recv系统调用中,后来发现可能是由于socket默认是阻塞模式,所以会令很多客户端链接处于链接却不能传输数据状态。
  3. 后来修改socket为非阻塞模式,但在connect的时候,发现返回值为-1,刚开始以为是connect出现错误,但在服务器上看到了链接是ESTABLISED状态。证明链接是成功的
  4. 但为什么会出现返回值是-1呢? 经过查询资料,以及看stevensAPUE,也发现有这么一说。
  5. connect在非阻塞模式下,会出现返回`-1`值,错误码是`EINPROGRESS`,但如何判断connect是联通的呢?stevens书中说明要在connect后,继续判断该socket是否可写?
  6. **若可写,则证明链接成功。**
  7. 如何判断可写,有2种方案,一种是select判断是否可写,二用poll模型。

select:

  1. int CheckConnect(int iSocket)
  2. {
  3. fd_set rset;
  4. FD_ZERO(&rset);
  5. FD_SET(iSocket, &rset);
  6. timeval tm;
  7. tm. tv_sec = 0;
  8. tm.tv_usec = 0;
  9. if ( select(iSocket + 1, NULL, &rset, NULL, &tval) <= 0)
  10. {
  11. close(iSocket);
  12. return -1;
  13. }
  14. if (FD_ISSET(iSocket, &rset))
  15. {
  16. int err = -1;
  17. socklen_t len = sizeof(int);
  18. if ( getsockopt(iSocket, SOL_SOCKET, SO_ERROR ,&err, &len) < 0 )
  19. {
  20. close(iSocket);
  21. printf("errno:%d %s\n", errno, strerror(errno));
  22. return -2;
  23. }
  24. if (err)
  25. {
  26. errno = err;
  27. close(iSocket);
  28. return -3;
  29. }
  30. }
  31. return 0;
  32. }

poll:

  1. int CheckConnect(int iSocket) {
  2. struct pollfd fd;
  3. int ret = 0;
  4. socklen_t len = 0;
  5. fd.fd = iSocket;
  6. fd.events = POLLOUT;
  7. while ( poll (&fd, 1, -1) == -1 ) {
  8. if( errno != EINTR ){
  9. perror("poll");
  10. return -1;
  11. }
  12. }
  13. len = sizeof(ret);
  14. if ( getsockopt (iSocket, SOL_SOCKET, SO_ERROR, &ret, &len) == -1 ) {
  15. perror("getsockopt");
  16. return -1;
  17. }
  18. if(ret != 0) {
  19. fprintf (stderr, "socket %d connect failed: %s\n",
  20. iSocket, strerror (ret));
  21. return -1;
  22. }
  23. return 0;
  24. }

6.3 针对EINPROGRESS的连接创建处理

  1. 看上面`do_connect()`的代码其中一部分:
  1. if(errno == EINPROGRESS) {
  2. //fd是非阻塞的,可能会出现这个错误,但是并不表示链接创建失败
  3. //如果fd是可写状态,则为链接是创建成功的.
  4. fprintf(stderr, "do_connect EINPROGRESS\n");
  5. //让event_loop去触发一个创建判断链接业务 用EPOLLOUT事件立刻触发
  6. _loop->add_io_event(_sockfd, connection_delay, EPOLLOUT, this);
  7. }

这里是又触发一个写事件,直接让程序流程跳转到connection_delay()方法.那么我们需要在里面判断链接是否已经判断成功,并且做出一定的创建成功之后的业务动作。

lars_reactor/src/tcp_client.cpp

  1. //判断链接是否是创建链接,主要是针对非阻塞socket 返回EINPROGRESS错误
  2. static void connection_delay(event_loop *loop, int fd, void *args)
  3. {
  4. tcp_client *cli = (tcp_client*)args;
  5. loop->del_io_event(fd);
  6. int result = 0;
  7. socklen_t result_len = sizeof(result);
  8. getsockopt(fd, SOL_SOCKET, SO_ERROR, &result, &result_len);
  9. if (result == 0) {
  10. //链接是建立成功的
  11. cli->connected = true;
  12. printf("connect %s:%d succ!\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port));
  13. //建立连接成功之后,主动发送send_message
  14. const char *msg = "hello lars!";
  15. int msgid = 1;
  16. cli->send_message(msg, strlen(msg), msgid);
  17. loop->add_io_event(fd, read_callback, EPOLLIN, cli);
  18. if (cli->_obuf.length != 0) {
  19. //输出缓冲有数据可写
  20. loop->add_io_event(fd, write_callback, EPOLLOUT, cli);
  21. }
  22. }
  23. else {
  24. //链接创建失败
  25. fprintf(stderr, "connection %s:%d error\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port));
  26. }
  27. }
  1. 这是一个事件回调,所以用的是static方法而不是成员方法。首先是利用`getsockopt`判断链接是否创建成功,如果成功,那么 我们当前这个版本的客户端是直接写死主动调用`send_message()`方法发送给服务端一个`hello lars!`字符串。然后直接交给我们的`read_callback()`方法处理,当然如果写缓冲有数据,我们也会触发写的`write_callback()`方法。
  2. 接下来,看看这两个callback以及send_message是怎么实现的。

callback

lars_reactor/src/tcp_client.cpp

  1. static void write_callback(event_loop *loop, int fd, void *args)
  2. {
  3. tcp_client *cli = (tcp_client *)args;
  4. cli->do_write();
  5. }
  6. static void read_callback(event_loop *loop, int fd, void *args)
  7. {
  8. tcp_client *cli = (tcp_client *)args;
  9. cli->do_read();
  10. }
  11. //处理读业务
  12. int tcp_client::do_read()
  13. {
  14. //确定已经成功建立连接
  15. assert(connected == true);
  16. // 1. 一次性全部读取出来
  17. //得到缓冲区里有多少字节要被读取,然后将字节数放入b里面。
  18. int need_read = 0;
  19. if (ioctl(_sockfd, FIONREAD, &need_read) == -1) {
  20. fprintf(stderr, "ioctl FIONREAD error");
  21. return -1;
  22. }
  23. //确保_buf可以容纳可读数据
  24. assert(need_read <= _ibuf.capacity - _ibuf.length);
  25. int ret;
  26. do {
  27. ret = read(_sockfd, _ibuf.data + _ibuf.length, need_read);
  28. } while(ret == -1 && errno == EINTR);
  29. if (ret == 0) {
  30. //对端关闭
  31. if (_name != NULL) {
  32. printf("%s client: connection close by peer!\n", _name);
  33. }
  34. else {
  35. printf("client: connection close by peer!\n");
  36. }
  37. clean_conn();
  38. return -1;
  39. }
  40. else if (ret == -1) {
  41. fprintf(stderr, "client: do_read() , error\n");
  42. clean_conn();
  43. return -1;
  44. }
  45. assert(ret == need_read);
  46. _ibuf.length += ret;
  47. //2. 解包
  48. msg_head head;
  49. int msgid, length;
  50. while (_ibuf.length >= MESSAGE_HEAD_LEN) {
  51. memcpy(&head, _ibuf.data + _ibuf.head, MESSAGE_HEAD_LEN);
  52. msgid = head.msgid;
  53. length = head.msglen;
  54. /*
  55. if (length + MESSAGE_HEAD_LEN < _ibuf.length) {
  56. break;
  57. }
  58. */
  59. //头部读取完毕
  60. _ibuf.pop(MESSAGE_HEAD_LEN);
  61. //3. 交给业务函数处理
  62. if (_msg_callback != NULL) {
  63. this->_msg_callback(_ibuf.data + _ibuf.head, length, msgid, this, NULL);
  64. }
  65. //数据区域处理完毕
  66. _ibuf.pop(length);
  67. }
  68. //重置head指针
  69. _ibuf.adjust();
  70. return 0;
  71. }
  72. //处理写业务
  73. int tcp_client::do_write()
  74. {
  75. //数据有长度,切头部索引是起始位置
  76. assert(_obuf.head == 0 && _obuf.length);
  77. int ret;
  78. while (_obuf.length) {
  79. //写数据
  80. do {
  81. ret = write(_sockfd, _obuf.data, _obuf.length);
  82. } while(ret == -1 && errno == EINTR);//非阻塞异常继续重写
  83. if (ret > 0) {
  84. _obuf.pop(ret);
  85. _obuf.adjust();
  86. }
  87. else if (ret == -1 && errno != EAGAIN) {
  88. fprintf(stderr, "tcp client write \n");
  89. this->clean_conn();
  90. }
  91. else {
  92. //出错,不能再继续写
  93. break;
  94. }
  95. }
  96. if (_obuf.length == 0) {
  97. //已经写完,删除写事件
  98. printf("do write over, del EPOLLOUT\n");
  99. this->_loop->del_io_event(_sockfd, EPOLLOUT);
  100. }
  101. return 0;
  102. }
  103. //释放链接资源,重置连接
  104. void tcp_client::clean_conn()
  105. {
  106. if (_sockfd != -1) {
  107. printf("clean conn, del socket!\n");
  108. _loop->del_io_event(_sockfd);
  109. close(_sockfd);
  110. }
  111. connected = false;
  112. //重新连接
  113. this->do_connect();
  114. }
  115. tcp_client::~tcp_client()
  116. {
  117. close(_sockfd);
  118. }
  1. 这里是基本的读数据和写数据的处理业务实现。我们重点看`do_read()`方法,里面有段代码:
  1. //3. 交给业务函数处理
  2. if (_msg_callback != NULL) {
  3. this->_msg_callback(_ibuf.data + _ibuf.head, length, msgid, this, NULL);
  4. }
  1. 是将我们从服务端读取到的代码,交给了`_msg_callback()`方法来处理的,这个实际上是用户开发者自己在业务上注册的回调业务函数。在tcp_client.h中我们已经提供了`set_msg_callback`暴露给开发者注册使用。

send_message

lars_reactor/src/tcp_client.cpp

  1. //主动发送message方法
  2. int tcp_client::send_message(const char *data, int msglen, int msgid)
  3. {
  4. if (connected == false) {
  5. fprintf(stderr, "no connected , send message stop!\n");
  6. return -1;
  7. }
  8. //是否需要添加写事件触发
  9. //如果obuf中有数据,没必要添加,如果没有数据,添加完数据需要触发
  10. bool need_add_event = (_obuf.length == 0) ? true:false;
  11. if (msglen + MESSAGE_HEAD_LEN > this->_obuf.capacity - _obuf.length) {
  12. fprintf(stderr, "No more space to Write socket!\n");
  13. return -1;
  14. }
  15. //封装消息头
  16. msg_head head;
  17. head.msgid = msgid;
  18. head.msglen = msglen;
  19. memcpy(_obuf.data + _obuf.length, &head, MESSAGE_HEAD_LEN);
  20. _obuf.length += MESSAGE_HEAD_LEN;
  21. memcpy(_obuf.data + _obuf.length, data, msglen);
  22. _obuf.length += msglen;
  23. if (need_add_event) {
  24. _loop->add_io_event(_sockfd, write_callback, EPOLLOUT, this);
  25. }
  26. return 0;
  27. }
  1. 将发送的数据写给obuf,然后出发write_callbackobuf的数据传递给对方服务端。

6.4 完成Lars Reactor V0.4开发

  1. 好了,现在我们框架部分已经完成,接下来我们就要实现一个serverapp 一个clientapp来进行测试.

我们创建example/lars_reactor_0.4文件夹。

Makefile

  1. CXX=g++
  2. CFLAGS=-g -O2 -Wall -fPIC -Wno-deprecated
  3. INC=-I../../include
  4. LIB=-L../../lib -llreactor -lpthread
  5. OBJS = $(addsuffix .o, $(basename $(wildcard *.cc)))
  6. all:
  7. $(CXX) -o server $(CFLAGS) server.cpp $(INC) $(LIB)
  8. $(CXX) -o client $(CFLAGS) client.cpp $(INC) $(LIB)
  9. clean:
  10. -rm -f *.o server client

服务端代码:

server.cpp

  1. #include "tcp_server.h"
  2. int main()
  3. {
  4. event_loop loop;
  5. tcp_server server(&loop, "127.0.0.1", 7777);
  6. loop.event_process();
  7. return 0;
  8. }

客户端代码:

client.cpp

  1. #include "tcp_client.h"
  2. #include <stdio.h>
  3. #include <string.h>
  4. //客户端业务
  5. void busi(const char *data, uint32_t len, int msgid, tcp_client *conn, void *user_data)
  6. {
  7. //得到服务端回执的数据
  8. printf("recv server: [%s]\n", data);
  9. printf("msgid: [%d]\n", msgid);
  10. printf("len: [%d]\n", len);
  11. }
  12. int main()
  13. {
  14. event_loop loop;
  15. //创建tcp客户端
  16. tcp_client client(&loop, "127.0.0.1", 7777, "clientv0.4");
  17. //注册回调业务
  18. client.set_msg_callback(busi);
  19. //开启事件监听
  20. loop.event_process();
  21. return 0;
  22. }

编译并分别启动server 和client

服务端输出:

  1. $ ./server
  2. begin accept
  3. get new connection succ!
  4. read data: hello lars!
  5. server send_message: hello lars!:11, msgid = 1

客户端输出:

  1. $ ./client
  2. do_connect EINPROGRESS
  3. connect 127.0.0.1:7777 succ!
  4. do write over, del EPOLLOUT
  5. recv server: [hello lars!]
  6. msgid: [1]
  7. len: [11]
  1. 现在客户端已经成功的发送数据给服务端,并且回显的数据也直接被客户端解析,我们的框架到现在就可以做一个基本的客户端和服务端的完成了,但是还差很多,接下来我们继续优化。