1 Connector连接server

Connector负责根据IP地址创建socket连接server,并带有重试功能。其主要逻辑为:

  1. 在connect函数中创建socket并尝试连接server,根据errno不同调用不同的函数
  2. errno为EINPROGRESS,EINTR,EISCONN时,先进入connecting状态,初始化Channel对象,监听socket上的可读事件
  3. 发送可读事件时,需要先是否Channel对象,因为Connector类只负责建立连接并返回描述符,不负责监听。所有Channel可以reset,reset之后用于建立下一连接。
  4. 可写事件不表示连接建立,需要用getsockopt再确认一下err。之后通过newConnectionCallback_回调返回socket描述符给TcpClient

部分函数实现如下:

  1. void Connector::connecting(int sockfd)
  2. {
  3. setState(kConnecting);
  4. assert(!channel_);
  5. channel_.reset(new Channel(loop_, sockfd)); //根据socket描述符初始化channel
  6. //绑定回调
  7. channel_->setWriteCallback(std::bind(&Connector::handleWrite, this));
  8. channel_->setErrorCallback(std::bind(&Connector::handleError, this));
  9. channel_->enableWriting(); //将可写事件添加到epoll wait list中
  10. }
  11. void Connector::handleWrite()
  12. {
  13. if (state_ == kConnecting)
  14. {
  15. //本类只负责建立连接,确定连接后返回描述符,内部channel没用了,reset之后用于建立下一连接
  16. int sockfd = removeAndResetChannel();
  17. //有可写事件不表示连接建立,需要用getsockopt再确认一下err
  18. int err = sockets::getSocketError(sockfd);
  19. if (err)
  20. {
  21. retry(sockfd);
  22. }
  23. else if (sockets::isSelfConnect(sockfd)) //防止自连接
  24. {
  25. retry(sockfd);
  26. }
  27. else
  28. {
  29. setState(kConnected); //其他情况标识连接建立
  30. if (connect_)
  31. {
  32. newConnectionCallback_(sockfd); //调用回调,返回描述符
  33. }
  34. else
  35. {
  36. sockets::close(sockfd);
  37. }
  38. }
  39. }
  40. //...
  41. }

Connector还有重试机制,连接不成功会启动EvenLoop定时任务重连:

  1. void Connector::retry(int sockfd)
  2. {
  3. sockets::close(sockfd);
  4. setState(kDisconnected);
  5. if (connect_)
  6. {
  7. //设置定时任务,重试连接server
  8. loop_->runAfter(retryDelayMs_ / 1000.0,
  9. std::bind(&Connector::startInLoop, shared_from_this()));
  10. retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs);
  11. }
  12. //...
  13. }

2 TcpClient管理连接

TcpServer类通过唯一的Connector去连接server,连接建立后保存在类型为TCP Connection的成员变量connections_中。
TcpServer是供App直接使用的,内嵌在用户程序中,创建和释放由用户程序控制。

  1. 构造函数中设置connector的回调

    1. connector_->setNewConnectionCallback(std::bind(&TcpClient::newConnection, this, _1));
  2. App层调用connect()函数,触发connector开始连接server

  3. 连接建立后,connector回调newConnection,TcpClient创建TCP Connection对象(一对一关系) ```cpp TcpConnectionPtr conn(new TcpConnection(loop_, //创建新的TcpConnection对象
    1. connName,
    2. sockfd,
    3. localAddr,
    4. peerAddr));

conn->setConnectionCallback(connectionCallback); conn->setMessageCallback(messageCallback); conn->setWriteCompleteCallback(writeCompleteCallback); conn->setCloseCallback(std::bind(&TcpClient::removeConnection, this, _1)); { MutexLockGuard lock(mutex); connection_ = conn; } conn->connectEstablished(); //启动可读事件的监听 ```