现在我们已经把server端所创建的套接字包装成了tcp_conn类,那么我们就可以对他们进行一定的管理,比如限制最大的连接数量等等。

7.1 定义链接管理相关属性

lars_reactor/include/tcp_server.h

  1. #pragma once
  2. #include <netinet/in.h>
  3. #include "event_loop.h"
  4. #include "tcp_conn.h"
  5. class tcp_server
  6. {
  7. public:
  8. //server的构造函数
  9. tcp_server(event_loop* loop, const char *ip, uint16_t port);
  10. //开始提供创建链接服务
  11. void do_accept();
  12. //链接对象释放的析构
  13. ~tcp_server();
  14. private:
  15. //基础信息
  16. int _sockfd; //套接字
  17. struct sockaddr_in _connaddr; //客户端链接地址
  18. socklen_t _addrlen; //客户端链接地址长度
  19. //event_loop epoll事件机制
  20. event_loop* _loop;
  21. //---- 客户端链接管理部分-----
  22. public:
  23. static void increase_conn(int connfd, tcp_conn *conn); //新增一个新建的连接
  24. static void decrease_conn(int connfd); //减少一个断开的连接
  25. static void get_conn_num(int *curr_conn); //得到当前链接的刻度
  26. static tcp_conn **conns; //全部已经在线的连接信息
  27. private:
  28. //TODO
  29. //从配置文件中读取
  30. #define MAX_CONNS 2
  31. static int _max_conns; //最大client链接个数
  32. static int _curr_conns; //当前链接刻度
  33. static pthread_mutex_t _conns_mutex; //保护_curr_conns刻度修改的锁
  34. };

这里解释一下关键成员

  • conns:这个是记录已经建立成功的全部链接的struct tcp_conn*数组。
  • _curr_conns:表示当前链接个数,其中increase_conn,decrease_conn,get_conn_num三个方法分别是对链接个数增加、减少、和获取。
  • _max_conns:限制的最大链接数量。
  • _conns_mutex:保护_curr_conns的锁。

    好了,我们首先首先将这些静态变量初始化,并且对函数见一些定义:

lars_reactor/src/tcp_server.cpp

  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <string.h>
  4. #include <strings.h>
  5. #include <unistd.h>
  6. #include <signal.h>
  7. #include <sys/types.h> /* See NOTES */
  8. #include <sys/socket.h>
  9. #include <arpa/inet.h>
  10. #include <errno.h>
  11. #include "tcp_server.h"
  12. #include "tcp_conn.h"
  13. #include "reactor_buf.h"
  14. // ==== 链接资源管理 ====
  15. //全部已经在线的连接信息
  16. tcp_conn ** tcp_server::conns = NULL;
  17. //最大容量链接个数;
  18. int tcp_server::_max_conns = 0;
  19. //当前链接刻度
  20. int tcp_server::_curr_conns = 0;
  21. //保护_curr_conns刻度修改的锁
  22. pthread_mutex_t tcp_server::_conns_mutex = PTHREAD_MUTEX_INITIALIZER;
  23. //新增一个新建的连接
  24. void tcp_server::increase_conn(int connfd, tcp_conn *conn)
  25. {
  26. pthread_mutex_lock(&_conns_mutex);
  27. conns[connfd] = conn;
  28. _curr_conns++;
  29. pthread_mutex_unlock(&_conns_mutex);
  30. }
  31. //减少一个断开的连接
  32. void tcp_server::decrease_conn(int connfd)
  33. {
  34. pthread_mutex_lock(&_conns_mutex);
  35. conns[connfd] = NULL;
  36. _curr_conns--;
  37. pthread_mutex_unlock(&_conns_mutex);
  38. }
  39. //得到当前链接的刻度
  40. void tcp_server::get_conn_num(int *curr_conn)
  41. {
  42. pthread_mutex_lock(&_conns_mutex);
  43. *curr_conn = _curr_conns;
  44. pthread_mutex_unlock(&_conns_mutex);
  45. }
  46. //...
  47. //...
  48. //...

7.2 创建链接集合初始化

  1. 我们在初始化tcp_server的同时也将`conns`初始化.

lars_reactor/src/tcp_server.cpp

  1. //server的构造函数
  2. tcp_server::tcp_server(event_loop *loop, const char *ip, uint16_t port)
  3. {
  4. bzero(&_connaddr, sizeof(_connaddr));
  5. //忽略一些信号 SIGHUP, SIGPIPE
  6. //SIGPIPE:如果客户端关闭,服务端再次write就会产生
  7. //SIGHUP:如果terminal关闭,会给当前进程发送该信号
  8. if (signal(SIGHUP, SIG_IGN) == SIG_ERR) {
  9. fprintf(stderr, "signal ignore SIGHUP\n");
  10. }
  11. if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
  12. fprintf(stderr, "signal ignore SIGPIPE\n");
  13. }
  14. //1. 创建socket
  15. _sockfd = socket(AF_INET, SOCK_STREAM /*| SOCK_NONBLOCK*/ | SOCK_CLOEXEC, IPPROTO_TCP);
  16. if (_sockfd == -1) {
  17. fprintf(stderr, "tcp_server::socket()\n");
  18. exit(1);
  19. }
  20. //2 初始化地址
  21. struct sockaddr_in server_addr;
  22. bzero(&server_addr, sizeof(server_addr));
  23. server_addr.sin_family = AF_INET;
  24. inet_aton(ip, &server_addr.sin_addr);
  25. server_addr.sin_port = htons(port);
  26. //2-1可以多次监听,设置REUSE属性
  27. int op = 1;
  28. if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, &op, sizeof(op)) < 0) {
  29. fprintf(stderr, "setsocketopt SO_REUSEADDR\n");
  30. }
  31. //3 绑定端口
  32. if (bind(_sockfd, (const struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
  33. fprintf(stderr, "bind error\n");
  34. exit(1);
  35. }
  36. //4 监听ip端口
  37. if (listen(_sockfd, 500) == -1) {
  38. fprintf(stderr, "listen error\n");
  39. exit(1);
  40. }
  41. //5 将_sockfd添加到event_loop中
  42. _loop = loop;
  43. //6 ============= 创建链接管理 ===============
  44. _max_conns = MAX_CONNS;
  45. //创建链接信息数组
  46. conns = new tcp_conn*[_max_conns+3];//3是因为stdin,stdout,stderr 已经被占用,再新开fd一定是从3开始,所以不加3就会栈溢出
  47. if (conns == NULL) {
  48. fprintf(stderr, "new conns[%d] error\n", _max_conns);
  49. exit(1);
  50. }
  51. //===========================================
  52. //7 注册_socket读事件-->accept处理
  53. _loop->add_io_event(_sockfd, accept_callback, EPOLLIN, this);
  54. }
  1. 这里有一段代码:
  1. conns = new tcp_conn*[_max_conns+3];
  1. 其中3是因为我们已经默认打开的stdin,stdout,stderr3个文件描述符,因为我们在conns管理的形式类似一个hash的形式,每个tcp_conn的对应的数组下标就是当前tcp_connconnfd文件描述符,所以我们应该开辟足够的大的宽度的数组来满足下标要求,所以要多开辟3个。虽然这里0,1,2下标在conns永远用不上。

7.3 创建链接判断链接数量

  1. 我们在tcp_serveraccept成功之后,判断链接数量,如果满足需求将连接创建起来,并添加到conns中。

lars_reactor/src/tcp_server.cpp

  1. //开始提供创建链接服务
  2. void tcp_server::do_accept()
  3. {
  4. int connfd;
  5. while(true) {
  6. //accept与客户端创建链接
  7. printf("begin accept\n");
  8. connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen);
  9. if (connfd == -1) {
  10. if (errno == EINTR) {
  11. fprintf(stderr, "accept errno=EINTR\n");
  12. continue;
  13. }
  14. else if (errno == EMFILE) {
  15. //建立链接过多,资源不够
  16. fprintf(stderr, "accept errno=EMFILE\n");
  17. }
  18. else if (errno == EAGAIN) {
  19. fprintf(stderr, "accept errno=EAGAIN\n");
  20. break;
  21. }
  22. else {
  23. fprintf(stderr, "accept error");
  24. exit(1);
  25. }
  26. }
  27. else {
  28. // ===========================================
  29. //accept succ!
  30. int cur_conns;
  31. get_conn_num(&cur_conns);
  32. //1 判断链接数量
  33. if (cur_conns >= _max_conns) {
  34. fprintf(stderr, "so many connections, max = %d\n", _max_conns);
  35. close(connfd);
  36. }
  37. else {
  38. tcp_conn *conn = new tcp_conn(connfd, _loop);
  39. if (conn == NULL) {
  40. fprintf(stderr, "new tcp_conn error\n");
  41. exit(1);
  42. }
  43. printf("get new connection succ!\n");
  44. }
  45. // ===========================================
  46. break;
  47. }
  48. }
  49. }

7.4 对链接数量进行内部统计

在tcp_conn创建时,将tcp_server中的conns增加。

lars_reactor/src/tcp_conn.cpp

  1. //初始化tcp_conn
  2. tcp_conn::tcp_conn(int connfd, event_loop *loop)
  3. {
  4. _connfd = connfd;
  5. _loop = loop;
  6. //1. 将connfd设置成非阻塞状态
  7. int flag = fcntl(_connfd, F_GETFL, 0);
  8. fcntl(_connfd, F_SETFL, O_NONBLOCK|flag);
  9. //2. 设置TCP_NODELAY禁止做读写缓存,降低小包延迟
  10. int op = 1;
  11. setsockopt(_connfd, IPPROTO_TCP, TCP_NODELAY, &op, sizeof(op));//need netinet/in.h netinet/tcp.h
  12. //3. 将该链接的读事件让event_loop监控
  13. _loop->add_io_event(_connfd, conn_rd_callback, EPOLLIN, this);
  14. // ============================
  15. //4 将该链接集成到对应的tcp_server中
  16. tcp_server::increase_conn(_connfd, this);
  17. // ============================
  18. }

在tcp_conn销毁时,将tcp_server中的conns减少。

lars_reactor/src/tcp_conn.cpp

  1. //销毁tcp_conn
  2. void tcp_conn::clean_conn()
  3. {
  4. //链接清理工作
  5. //1 将该链接从tcp_server摘除掉
  6. tcp_server::decrease_conn(_connfd);
  7. //2 将该链接从event_loop中摘除
  8. _loop->del_io_event(_connfd);
  9. //3 buf清空
  10. ibuf.clear();
  11. obuf.clear();
  12. //4 关闭原始套接字
  13. int fd = _connfd;
  14. _connfd = -1;
  15. close(fd);
  16. }

7.5 完成Lars Reactor V0.5开发

  1. serverclient 应用app端的代码和v0.4一样,这里我们先修改tcp_server中的MAX_CONN宏为

lars_reacotr/include/tcp_server.h

  1. #define MAX_CONNS 2

方便我们测试。这个这个数值是要在配置文件中可以配置的。

我们启动服务端,然后分别启动两个client可以正常连接。

当我们启动第三个就发现已经连接不上。然后server端会打出如下结果.

  1. so many connections, max = 2