1 Connector连接server
Connector负责根据IP地址创建socket连接server,并带有重试功能。其主要逻辑为:
- 在connect函数中创建socket并尝试连接server,根据errno不同调用不同的函数
- errno为EINPROGRESS,EINTR,EISCONN时,先进入connecting状态,初始化Channel对象,监听socket上的可读事件
- 发送可读事件时,需要先是否Channel对象,因为Connector类只负责建立连接并返回描述符,不负责监听。所有Channel可以reset,reset之后用于建立下一连接。
- 可写事件不表示连接建立,需要用getsockopt再确认一下err。之后通过newConnectionCallback_回调返回socket描述符给TcpClient
部分函数实现如下:
void Connector::connecting(int sockfd){setState(kConnecting);assert(!channel_);channel_.reset(new Channel(loop_, sockfd)); //根据socket描述符初始化channel//绑定回调channel_->setWriteCallback(std::bind(&Connector::handleWrite, this));channel_->setErrorCallback(std::bind(&Connector::handleError, this));channel_->enableWriting(); //将可写事件添加到epoll wait list中}void Connector::handleWrite(){if (state_ == kConnecting){//本类只负责建立连接,确定连接后返回描述符,内部channel没用了,reset之后用于建立下一连接int sockfd = removeAndResetChannel();//有可写事件不表示连接建立,需要用getsockopt再确认一下errint err = sockets::getSocketError(sockfd);if (err){retry(sockfd);}else if (sockets::isSelfConnect(sockfd)) //防止自连接{retry(sockfd);}else{setState(kConnected); //其他情况标识连接建立if (connect_){newConnectionCallback_(sockfd); //调用回调,返回描述符}else{sockets::close(sockfd);}}}//...}
Connector还有重试机制,连接不成功会启动EvenLoop定时任务重连:
void Connector::retry(int sockfd){sockets::close(sockfd);setState(kDisconnected);if (connect_){//设置定时任务,重试连接serverloop_->runAfter(retryDelayMs_ / 1000.0,std::bind(&Connector::startInLoop, shared_from_this()));retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs);}//...}
2 TcpClient管理连接
TcpServer类通过唯一的Connector去连接server,连接建立后保存在类型为TCP Connection的成员变量connections_中。
TcpServer是供App直接使用的,内嵌在用户程序中,创建和释放由用户程序控制。
构造函数中设置connector的回调
connector_->setNewConnectionCallback(std::bind(&TcpClient::newConnection, this, _1));
App层调用connect()函数,触发connector开始连接server
- 连接建立后,connector回调newConnection,TcpClient创建TCP Connection对象(一对一关系)
```cpp
TcpConnectionPtr conn(new TcpConnection(loop_, //创建新的TcpConnection对象
connName,sockfd,localAddr,peerAddr));
conn->setConnectionCallback(connectionCallback); conn->setMessageCallback(messageCallback); conn->setWriteCompleteCallback(writeCompleteCallback); conn->setCloseCallback(std::bind(&TcpClient::removeConnection, this, _1)); { MutexLockGuard lock(mutex); connection_ = conn; } conn->connectEstablished(); //启动可读事件的监听 ```
