TCP Server类封装

目的:使用之前已经封装好的Address通信地址类、Socket套接字类、IOManagerIO调度类组合封装出较为上层的TCP服务器的类。

关注要点:

  1. 服务器自己所拥有的监听套接字listen socket,负责去接受客户端的连接。
    1. 根据传入的Address对象将对应的监听套接字生成、绑定、监听
    2. 将已经成功监听的监听套接字,使用IO调度器管理调度起来,去接受客户端的连接
  2. 服务器接受连接后生成对应的客户端通信的套接字accept socket,负责和客户端完成数据交互
    1. 使用另一个新的IO调度器管理调度通信套接字,每生成一个新的通信套接字,就将其加入调度

· 类关系设计

TCP Server开发 - 图1

1. 成员变量

1.1 监听套接字组std::vector<Socket::ptr> m_listenSockets

  1. 存储多个监听socket 可能支持多协议 可能存在多个网卡 可能监听多个地址

1.2 IO调度器 IOManager* m_worker

  1. 作为一个 线程池 专门负责已经接收连接的客户端的调度

1.3 IO调度器 IOManager* m_acceptWorker

  1. 作为一个线程池 专门负责服务器监听套接字执行accept的调度

1.4 读取超时时间uint64_t m_recvTimeout

  1. 读取超时时间。防止攻击 ,限制读取时间间接限定数据发送量;防止死的客户端来浪费资源,一段时间没有来自客户端的响应,就要断开与其的连接,类似"心跳"功能。

1.5 服务器的名字std::string m_name

  1. 用一个名字来进行区分 socket所属的server 也作为一个服务器版本号方便更迭

1.6 服务器当前的工作状态 bool m_stopped

  1. 服务器当前的工作状态,判断服务器是运行/停止。

2. 接口

2.1 构造函数

  1. /**
  2. * @brief Tcp服务器类构造函数
  3. * @param[in] worker 负责和客户端通信的线程池
  4. * @param[in] accept_worker 负责接收客户端新连接的线程池
  5. */
  6. TcpServer(IOManager* worker = IOManager::GetThis(),
  7. IOManager* accept_worker = IOManager::GetThis());
  8. TcpServer::TcpServer(IOManager* worker, IOManager* accept_worker)
  9. :m_worker(worker)
  10. ,m_acceptWorker(accept_worker)
  11. ,m_recvTimeout(g_tcp_server_read_timeout->getValue())
  12. ,m_name("kit/1.0.0") //服务器的版本号
  13. ,m_stopped(true)
  14. {
  15. }

2.2 析构函数

  1. /**
  2. * @brief Tcp服务器类析构函数
  3. */
  4. virtual ~TcpServer();
  5. TcpServer::~TcpServer()
  6. {
  7. //关闭所有的监听套接字
  8. for(auto &x : m_listenSockets)
  9. {
  10. x->close();
  11. }
  12. m_listenSockets.clear();
  13. }

2.3 bind() (核心)

功能:绑定套接字的通信地址,并且完成监听。使用批量绑定和监听,中途一个套接字发生错误全部回滚无法完成绑定和监听。

组合:

  • 使用Address通信地址类生成对应协议(AF_INETAF_INET6等)、类型(TCPUDP)的Socket套接字
  • 使用Socket套接字类中已经封装好的bindlisten,内部使用真正系统调用::bind/::listen(这两个调用未被HOOK处理使用的原语) ```cpp /**
    • @brief 绑定一个地址
    • @param[in] addr 地址对象智能指针
    • @return true 绑定成功
    • @return false 绑定失败 */ virtual bool bind(Address::ptr addr);

//支持一个地址bind 直接复用多个地址的情况 只是容器里只有一个地址 bool TcpServer::bind(Address::ptr addr) { std::vector in_addrs, out_addrs; in_addrs.push_back(addr); return bind(in_addrs, out_addrs); }

/**

  • @brief 绑定多个地址
  • @param[in] in_addrs 传入要绑定的地址数组
  • @param[out] out_addrs 传出绑定失败的地址数组
  • @return true 全部绑定成功
  • @return false 全部绑定失败 */ virtual bool bind(const std::vector &in_addrs, std::vector &out_addrs);

bool TcpServer::bind(const std::vector &in_addrs, std::vector &out_addrs) { for(auto &x : in_addrs) { //创建协议不确定 但是传输类型是TCP的socket Socket::ptr sock = Socket::CreateTCP(x); //bind绑定地址 if(!sock->bind(x)) { KIT_LOG_ERROR(g_logger) << “bind error, errno=” << errno << “, is:” << strerror(errno) << “, addr=[“ << x->toString() << “]”;

  1. out_addrs.push_back(x);
  2. continue;
  3. }
  4. //listen监听地址
  5. if(!sock->listen())
  6. {
  7. KIT_LOG_ERROR(g_logger) << "listen error, errno=" << errno
  8. << ", is:" << strerror(errno)
  9. << ", addr=[" << x->toString() << "]";
  10. out_addrs.push_back(x);
  11. continue;
  12. }
  13. //成功监听
  14. m_listenSockets.push_back(sock);
  15. }
  16. //如果存在监听失败的套接字 要将成功监听那部分清除
  17. if(out_addrs.size())
  18. {
  19. m_listenSockets.clear();
  20. return false;
  21. }
  22. for(auto &x : m_listenSockets)
  23. {
  24. //利用重载<<符号 打印一下Socket的内容
  25. KIT_LOG_INFO(g_logger) << "sock bind success: " << *x;
  26. }
  27. return true;

}

  1. <a name="W5Lul"></a>
  2. ### 2.4 start()(核心)
  3. 功能:开启服务器,给每个监听套接字分配一个执行函数`startAccept()`去监测客户端的新连接,将其作为任务加入到IO调度器`m_acceptWorker`去进行调度管理。
  4. ```cpp
  5. /**
  6. * @brief 服务器启动
  7. * @return true 启动成功
  8. * @return false 启动失败
  9. */
  10. virtual bool start();
  11. bool TcpServer::start()
  12. {
  13. if(!m_stopped)
  14. return true;
  15. m_stopped = false;
  16. for(auto &x : m_listenSockets)
  17. {
  18. m_acceptWorker->schedule(std::bind(&TcpServer::startAccept, shared_from_this(), x));
  19. }
  20. return true;
  21. }

调度执行函数:startAccept()

功能:真正负责客户端连接工作的处理函数,给每个通信套接字分配一个执行函数handleClient()去完成服务器和客户端的数据交互,将其作为任务加入到IO调度器m_worker去进行调度管理。

小技巧点:使用std::bind()函数适配器传入当前类自己的智能指针(即:this指针封装为智能指针形式)是为了增加对该TcpServer对象的引用,防止意外析构造成毁灭性错误。

组合:

  • 使用Socket套接字类封装的accept(被实施了HOOK处理,将执行自己封装的调用行为),由于原系统调用::accept必定造成阻塞,因此会将该函数使用之前开发的hook.cpp:do_io()利用协程切入/切出进行一个异步回调(当真的有连接到达时候才来执行相应的操作,而不必一直阻塞等待),提高处理效率。 ```cpp /**
    • @brief 监听客户端 处理客户端新连接
    • @param sock */ virtual void startAccept(Socket::ptr sock);

void TcpServer::startAccept(Socket::ptr sock) { // KIT_LOG_DEBUG(g_logger) << “TcpServer startAccept”;

  1. //循环 只要不停止就要一直去accept客户端
  2. while(!m_stopped)
  3. {
  4. Socket::ptr client = sock->accept();
  5. if(client)
  6. {
  7. //给每一通信套接字设置读超时
  8. client->setRecvTimeout(m_recvTimeout);
  9. //将通信套接字加入线程池管理
  10. m_worker->schedule(std::bind(&TcpServer::handleClient, shared_from_this(), client));
  11. }
  12. else
  13. {
  14. KIT_LOG_ERROR(g_logger) << "accept error, client=" << client.get();
  15. }
  16. }

}

  1. <a name="IZq5w"></a>
  2. #####
  3. <a name="QsfEB"></a>
  4. ##### 调度执行函数:`handleClient()`
  5. 功能:真正负责服务器和客户端之间数据交互的处理函数。假如我们要做即时通讯服务器,那么聊天信息的转发和交换就在该函数中完成;假如做游戏服务器,那么游戏角色的相关信息更新在该函数中完成······
  6. ```cpp
  7. /**
  8. * @brief 处理已经连接的客户端 完成数据交互通信
  9. * @param[in] client
  10. */
  11. virtual void handleClient(Socket::ptr client);
  12. void TcpServer::handleClient(Socket::ptr client)
  13. {
  14. KIT_LOG_INFO(g_logger) << "client handle start";
  15. //具体业务操作
  16. ...
  17. }

2.5 stop()(核心)

功能:停止服务器,通过往IO调度器m_acceptWorker中添加任务的形式,在该匿名任务中唤醒所有监听线程并且让协程调度停止、所有线程退出。

  1. /**
  2. * @brief 服务器停止
  3. * @return true 停止成功
  4. * @return false 停止失败
  5. */
  6. virtual bool stop();
  7. bool TcpServer::stop()
  8. {
  9. if(m_stopped)
  10. return true;
  11. m_stopped = true;
  12. auto self = shared_from_this();
  13. m_acceptWorker->schedule([this, self](){
  14. //唤醒所有线程 进行退出
  15. for(auto &x : self->m_listenSockets)
  16. {
  17. x->cancelAll();
  18. }
  19. self->m_listenSockets.clear();
  20. });
  21. return true;
  22. }

2.6 通用接口

  1. /**
  2. * @brief 设置读超时时间
  3. * @param[in] v 具体超时时间
  4. */
  5. void setRecvTimeout(uint64_t v) {m_recvTimeout = v;}
  6. /**
  7. * @brief 获取读超时时间
  8. * @return uint64_t
  9. */
  10. uint64_t getRecvTimeout() const {return m_recvTimeout;}
  11. /**
  12. * @brief 设置服务器名称
  13. * @param[in] v 具体名称
  14. */
  15. virtual void setName(const std::string& v) {m_name = v;}
  16. /**
  17. * @brief 获取服务器名称
  18. * @return std::string
  19. */
  20. std::string getName() const {return m_name;}
  21. /**
  22. * @brief 服务器工作状态
  23. * @return true 已经停止
  24. * @return false 还在运行
  25. */
  26. bool isStop() const {return m_stopped;}