我们可以给客户端添加触发模型。同时也提供一系列的接口供开发者写客户端应用程序来使用。
6.1 tcp_client类设计
lars_reactor/include/tcp_client.h
#pragma once#include "io_buf.h"#include "event_loop.h"#include "message.h"#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>class tcp_client{public://初始化客户端套接字tcp_client(event_loop *loop, const char *ip, unsigned short port, const char *name);//发送message方法int send_message(const char *data, int msglen, int msgid);//创建链接void do_connect();//处理读业务int do_read();//处理写业务int do_write();//释放链接资源void clean_conn();~tcp_client();//设置业务处理回调函数void set_msg_callback(msg_callback *msg_cb){this->_msg_callback = msg_cb;}bool connected; //链接是否创建成功//server端地址struct sockaddr_in _server_addr;io_buf _obuf;io_buf _ibuf;private:int _sockfd;socklen_t _addrlen;//客户端的事件处理机制event_loop* _loop;//当前客户端的名称 用户记录日志const char *_name;msg_callback *_msg_callback;};
这里注意的是,tcp_client并不是tcp_server的一部分,而是单纯为写客户端提供的接口。所以这里也需要实现一套对读写事件处理的业务。 这里使用的读写缓冲是原始的`io_buf`,并不是服务器封装好的`reactor_buf`原因是后者是转为server做了一层封装,io_buf的基本方法比较全。
关键成员:
_sockfd:当前客户端套接字。
_server_addr: 链接的服务端的IP地址。
_loop: 客户端异步触发事件机制event_loop句柄。
_msg_callback: 当前客户端处理服务端的回调业务。
connected:是否已经成功connect服务端的标致。
方法:
tcp_client():构造函数,主要是在里面完成基本的套接字初始化及connect操作.
do_connect():创建链接
do_read():处理链接的读业务。
do_write():处理链接的写业务。
clean_conn():清空链接资源。
6.2 创建链接
lars_reactor/src/tcp_client.cpp
tcp_client::tcp_client(event_loop *loop, const char *ip, unsigned short port, const char *name):_ibuf(4194304),_obuf(4194304){_sockfd = -1;_msg_callback = NULL;_name = name;_loop = loop;bzero(&_server_addr, sizeof(_server_addr));_server_addr.sin_family = AF_INET;inet_aton(ip, &_server_addr.sin_addr);_server_addr.sin_port = htons(port);_addrlen = sizeof(_server_addr);this->do_connect();}
这里初始化tcp_client链接信息,然后调用`do_connect()`创建链接.
lars_reactor/src/tcp_client.cpp
//创建链接void tcp_client::do_connect(){if (_sockfd != -1) {close(_sockfd);}//创建套接字_sockfd = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, IPPROTO_TCP);if (_sockfd == -1) {fprintf(stderr, "create tcp client socket error\n");exit(1);}int ret = connect(_sockfd, (const struct sockaddr*)&_server_addr, _addrlen);if (ret == 0) {//链接创建成功connected = true;//注册读回调_loop->add_io_event(_sockfd, read_callback, EPOLLIN, this);//如果写缓冲去有数据,那么也需要触发写回调if (this->_obuf.length != 0) {_loop->add_io_event(_sockfd, write_callback, EPOLLOUT, this);}printf("connect %s:%d succ!\n", inet_ntoa(_server_addr.sin_addr), ntohs(_server_addr.sin_port));}else {if(errno == EINPROGRESS) {//fd是非阻塞的,可能会出现这个错误,但是并不表示链接创建失败//如果fd是可写状态,则为链接是创建成功的.fprintf(stderr, "do_connect EINPROGRESS\n");//让event_loop去触发一个创建判断链接业务 用EPOLLOUT事件立刻触发_loop->add_io_event(_sockfd, connection_delay, EPOLLOUT, this);}else {fprintf(stderr, "connection error\n");exit(1);}}}
6.3 有关非阻塞客户端socket创建链接问题
这里转载一篇文章,是有关非阻塞套接字,connect返回-1,并且errno是`EINPROGRESS`的情况。因为我们的client是采用event_loop形式,socket需要被设置为非阻塞。所以需要针对这个情况做处理。下面是说明。客户端测试程序时,由于出现很多客户端,经过connect成功后,代码卡在recv系统调用中,后来发现可能是由于socket默认是阻塞模式,所以会令很多客户端链接处于链接却不能传输数据状态。后来修改socket为非阻塞模式,但在connect的时候,发现返回值为-1,刚开始以为是connect出现错误,但在服务器上看到了链接是ESTABLISED状态。证明链接是成功的但为什么会出现返回值是-1呢? 经过查询资料,以及看stevens的APUE,也发现有这么一说。当connect在非阻塞模式下,会出现返回`-1`值,错误码是`EINPROGRESS`,但如何判断connect是联通的呢?stevens书中说明要在connect后,继续判断该socket是否可写?**若可写,则证明链接成功。**如何判断可写,有2种方案,一种是select判断是否可写,二用poll模型。
select:
int CheckConnect(int iSocket){fd_set rset;FD_ZERO(&rset);FD_SET(iSocket, &rset);timeval tm;tm. tv_sec = 0;tm.tv_usec = 0;if ( select(iSocket + 1, NULL, &rset, NULL, &tval) <= 0){close(iSocket);return -1;}if (FD_ISSET(iSocket, &rset)){int err = -1;socklen_t len = sizeof(int);if ( getsockopt(iSocket, SOL_SOCKET, SO_ERROR ,&err, &len) < 0 ){close(iSocket);printf("errno:%d %s\n", errno, strerror(errno));return -2;}if (err){errno = err;close(iSocket);return -3;}}return 0;}
poll:
int CheckConnect(int iSocket) {struct pollfd fd;int ret = 0;socklen_t len = 0;fd.fd = iSocket;fd.events = POLLOUT;while ( poll (&fd, 1, -1) == -1 ) {if( errno != EINTR ){perror("poll");return -1;}}len = sizeof(ret);if ( getsockopt (iSocket, SOL_SOCKET, SO_ERROR, &ret, &len) == -1 ) {perror("getsockopt");return -1;}if(ret != 0) {fprintf (stderr, "socket %d connect failed: %s\n",iSocket, strerror (ret));return -1;}return 0;}
6.3 针对EINPROGRESS的连接创建处理
看上面`do_connect()`的代码其中一部分:
if(errno == EINPROGRESS) {//fd是非阻塞的,可能会出现这个错误,但是并不表示链接创建失败//如果fd是可写状态,则为链接是创建成功的.fprintf(stderr, "do_connect EINPROGRESS\n");//让event_loop去触发一个创建判断链接业务 用EPOLLOUT事件立刻触发_loop->add_io_event(_sockfd, connection_delay, EPOLLOUT, this);}
这里是又触发一个写事件,直接让程序流程跳转到connection_delay()方法.那么我们需要在里面判断链接是否已经判断成功,并且做出一定的创建成功之后的业务动作。
lars_reactor/src/tcp_client.cpp
//判断链接是否是创建链接,主要是针对非阻塞socket 返回EINPROGRESS错误static void connection_delay(event_loop *loop, int fd, void *args){tcp_client *cli = (tcp_client*)args;loop->del_io_event(fd);int result = 0;socklen_t result_len = sizeof(result);getsockopt(fd, SOL_SOCKET, SO_ERROR, &result, &result_len);if (result == 0) {//链接是建立成功的cli->connected = true;printf("connect %s:%d succ!\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port));//建立连接成功之后,主动发送send_messageconst char *msg = "hello lars!";int msgid = 1;cli->send_message(msg, strlen(msg), msgid);loop->add_io_event(fd, read_callback, EPOLLIN, cli);if (cli->_obuf.length != 0) {//输出缓冲有数据可写loop->add_io_event(fd, write_callback, EPOLLOUT, cli);}}else {//链接创建失败fprintf(stderr, "connection %s:%d error\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port));}}
这是一个事件回调,所以用的是static方法而不是成员方法。首先是利用`getsockopt`判断链接是否创建成功,如果成功,那么 我们当前这个版本的客户端是直接写死主动调用`send_message()`方法发送给服务端一个`hello lars!`字符串。然后直接交给我们的`read_callback()`方法处理,当然如果写缓冲有数据,我们也会触发写的`write_callback()`方法。接下来,看看这两个callback以及send_message是怎么实现的。
callback
lars_reactor/src/tcp_client.cpp
static void write_callback(event_loop *loop, int fd, void *args){tcp_client *cli = (tcp_client *)args;cli->do_write();}static void read_callback(event_loop *loop, int fd, void *args){tcp_client *cli = (tcp_client *)args;cli->do_read();}//处理读业务int tcp_client::do_read(){//确定已经成功建立连接assert(connected == true);// 1. 一次性全部读取出来//得到缓冲区里有多少字节要被读取,然后将字节数放入b里面。int need_read = 0;if (ioctl(_sockfd, FIONREAD, &need_read) == -1) {fprintf(stderr, "ioctl FIONREAD error");return -1;}//确保_buf可以容纳可读数据assert(need_read <= _ibuf.capacity - _ibuf.length);int ret;do {ret = read(_sockfd, _ibuf.data + _ibuf.length, need_read);} while(ret == -1 && errno == EINTR);if (ret == 0) {//对端关闭if (_name != NULL) {printf("%s client: connection close by peer!\n", _name);}else {printf("client: connection close by peer!\n");}clean_conn();return -1;}else if (ret == -1) {fprintf(stderr, "client: do_read() , error\n");clean_conn();return -1;}assert(ret == need_read);_ibuf.length += ret;//2. 解包msg_head head;int msgid, length;while (_ibuf.length >= MESSAGE_HEAD_LEN) {memcpy(&head, _ibuf.data + _ibuf.head, MESSAGE_HEAD_LEN);msgid = head.msgid;length = head.msglen;/*if (length + MESSAGE_HEAD_LEN < _ibuf.length) {break;}*///头部读取完毕_ibuf.pop(MESSAGE_HEAD_LEN);//3. 交给业务函数处理if (_msg_callback != NULL) {this->_msg_callback(_ibuf.data + _ibuf.head, length, msgid, this, NULL);}//数据区域处理完毕_ibuf.pop(length);}//重置head指针_ibuf.adjust();return 0;}//处理写业务int tcp_client::do_write(){//数据有长度,切头部索引是起始位置assert(_obuf.head == 0 && _obuf.length);int ret;while (_obuf.length) {//写数据do {ret = write(_sockfd, _obuf.data, _obuf.length);} while(ret == -1 && errno == EINTR);//非阻塞异常继续重写if (ret > 0) {_obuf.pop(ret);_obuf.adjust();}else if (ret == -1 && errno != EAGAIN) {fprintf(stderr, "tcp client write \n");this->clean_conn();}else {//出错,不能再继续写break;}}if (_obuf.length == 0) {//已经写完,删除写事件printf("do write over, del EPOLLOUT\n");this->_loop->del_io_event(_sockfd, EPOLLOUT);}return 0;}//释放链接资源,重置连接void tcp_client::clean_conn(){if (_sockfd != -1) {printf("clean conn, del socket!\n");_loop->del_io_event(_sockfd);close(_sockfd);}connected = false;//重新连接this->do_connect();}tcp_client::~tcp_client(){close(_sockfd);}
这里是基本的读数据和写数据的处理业务实现。我们重点看`do_read()`方法,里面有段代码:
//3. 交给业务函数处理if (_msg_callback != NULL) {this->_msg_callback(_ibuf.data + _ibuf.head, length, msgid, this, NULL);}
是将我们从服务端读取到的代码,交给了`_msg_callback()`方法来处理的,这个实际上是用户开发者自己在业务上注册的回调业务函数。在tcp_client.h中我们已经提供了`set_msg_callback`暴露给开发者注册使用。
send_message
lars_reactor/src/tcp_client.cpp
//主动发送message方法int tcp_client::send_message(const char *data, int msglen, int msgid){if (connected == false) {fprintf(stderr, "no connected , send message stop!\n");return -1;}//是否需要添加写事件触发//如果obuf中有数据,没必要添加,如果没有数据,添加完数据需要触发bool need_add_event = (_obuf.length == 0) ? true:false;if (msglen + MESSAGE_HEAD_LEN > this->_obuf.capacity - _obuf.length) {fprintf(stderr, "No more space to Write socket!\n");return -1;}//封装消息头msg_head head;head.msgid = msgid;head.msglen = msglen;memcpy(_obuf.data + _obuf.length, &head, MESSAGE_HEAD_LEN);_obuf.length += MESSAGE_HEAD_LEN;memcpy(_obuf.data + _obuf.length, data, msglen);_obuf.length += msglen;if (need_add_event) {_loop->add_io_event(_sockfd, write_callback, EPOLLOUT, this);}return 0;}
将发送的数据写给obuf,然后出发write_callback将obuf的数据传递给对方服务端。
6.4 完成Lars Reactor V0.4开发
好了,现在我们框架部分已经完成,接下来我们就要实现一个serverapp 和 一个clientapp来进行测试.
我们创建example/lars_reactor_0.4文件夹。
Makefile
CXX=g++CFLAGS=-g -O2 -Wall -fPIC -Wno-deprecatedINC=-I../../includeLIB=-L../../lib -llreactor -lpthreadOBJS = $(addsuffix .o, $(basename $(wildcard *.cc)))all:$(CXX) -o server $(CFLAGS) server.cpp $(INC) $(LIB)$(CXX) -o client $(CFLAGS) client.cpp $(INC) $(LIB)clean:-rm -f *.o server client
服务端代码:
server.cpp
#include "tcp_server.h"int main(){event_loop loop;tcp_server server(&loop, "127.0.0.1", 7777);loop.event_process();return 0;}
客户端代码:
client.cpp
#include "tcp_client.h"#include <stdio.h>#include <string.h>//客户端业务void busi(const char *data, uint32_t len, int msgid, tcp_client *conn, void *user_data){//得到服务端回执的数据printf("recv server: [%s]\n", data);printf("msgid: [%d]\n", msgid);printf("len: [%d]\n", len);}int main(){event_loop loop;//创建tcp客户端tcp_client client(&loop, "127.0.0.1", 7777, "clientv0.4");//注册回调业务client.set_msg_callback(busi);//开启事件监听loop.event_process();return 0;}
编译并分别启动server 和client
服务端输出:
$ ./serverbegin acceptget new connection succ!read data: hello lars!server send_message: hello lars!:11, msgid = 1
客户端输出:
$ ./clientdo_connect EINPROGRESSconnect 127.0.0.1:7777 succ!do write over, del EPOLLOUTrecv server: [hello lars!]msgid: [1]len: [11]
现在客户端已经成功的发送数据给服务端,并且回显的数据也直接被客户端解析,我们的框架到现在就可以做一个基本的客户端和服务端的完成了,但是还差很多,接下来我们继续优化。
