Data Structures

Object-Oriented Programming in C

C 语言虽然是面向过程的,但是也可以通过特殊的语法实现一些面向对象的特征,如代码 1 所示,结构体内的所有字段都是函数,这类似于 Go 语言中的接口。通过不同的赋值,这个结构体表现出不同的特性,从而实现面向对象中的多态。

  1. typedef struct {
  2. ngx_int_t (*add)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
  3. ngx_int_t (*del)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
  4. ngx_int_t (*enable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
  5. ngx_int_t (*disable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
  6. ngx_int_t (*add_conn)(ngx_connection_t *c);
  7. ngx_int_t (*del_conn)(ngx_connection_t *c, ngx_uint_t flags);
  8. ngx_int_t (*notify)(ngx_event_handler_pt handler);
  9. ngx_int_t (*process_events)(ngx_cycle_t *cycle, ngx_msec_t timer,
  10. ngx_uint_t flags);
  11. ngx_int_t (*init)(ngx_cycle_t *cycle, ngx_msec_t timer);
  12. void (*done)(ngx_cycle_t *cycle);
  13. } ngx_event_actions_t;

代码 1-1:ngx_event_actions_t

代码 2 中出现的 ngx_event_actions 是一个全局变量,类型为 ngx_event_actions_t,这些宏定义是对调用其中函数的简单缩写。通过将这个全局变量赋予不同的值,在事件处理中就可以使用不同的底层特性,比如 epoll, kqueue, poll, select 等,这些实现都保存在 src/event/modules 中。

  1. #define ngx_process_events ngx_event_actions.process_events
  2. #define ngx_done_events ngx_event_actions.done
  3. #define ngx_add_event ngx_event_actions.add
  4. #define ngx_del_event ngx_event_actions.del
  5. #define ngx_add_conn ngx_event_actions.add_conn
  6. #define ngx_del_conn ngx_event_actions.del_conn
  7. #define ngx_notify ngx_event_actions.notify
  8. #define ngx_add_timer ngx_event_add_timer
  9. #define ngx_del_timer ngx_event_del_timer

代码 1-2

ngx_event_action 也可以放在其他结构体内,如果作为其他结构欧提的第一个字段,可以使用强制类型转换来调用,更类似于接口的使用方式。但是这样做就无法在一个结构体内放入多个接口。另一种方式是作为某一个字段,在调用的时候通过 . 或者 -> 进行调用,这样就可以嵌入多个接口类型。
oop-method.drawio.svg

oop-override.drawio.svg

Listen Port

创建 ngx_listening_t 结构体是在处理配置文件的时候,通过代码 3 中的 Nginx 配置文件不难看出,监听的端口信息被放在了 http 中的 server 下,通过查找所有调用 ngx_create_listening 的位置也不难佐证这一假设。

  1. user www www; ## Default: nobody
  2. worker_processes 5; ## Default: 1
  3. error_log logs/error.log;
  4. pid logs/nginx.pid;
  5. worker_rlimit_nofile 8192;
  6. events {
  7. worker_connections 4096; ## Default: 1024
  8. }
  9. http {
  10. include conf/mime.types;
  11. include /etc/nginx/proxy.conf;
  12. include /etc/nginx/fastcgi.conf;
  13. index index.html index.htm index.php;
  14. default_type application/octet-stream;
  15. log_format main '$remote_addr - $remote_user [$time_local] $status '
  16. '"$request" $body_bytes_sent "$http_referer" '
  17. '"$http_user_agent" "$http_x_forwarded_for"';
  18. access_log logs/access.log main;
  19. sendfile on;
  20. tcp_nopush on;
  21. server_names_hash_bucket_size 128; # this seems to be required for some vhosts
  22. server { # simple load balancing
  23. listen 80;
  24. server_name big.server.com;
  25. access_log logs/big.server.access.log main;
  26. location / {
  27. proxy_pass http://big_server_com;
  28. }
  29. }
  30. }

代码 2-1:nginx config

通过代码 4 的 L10 能够看出,所有被创建的 ngx_listening_t 都是来自于 cycle->listening,所有的结构体在创建的时候就与这个数组绑定了。函数剩余的工作就是初始化结构体内的一些字段,但是这里并没有创建真正的 socket。ngx_socket_ntop 这个函数名比较令人困惑,它的作用是将传入的 sockaddr 变为文本的形式存储到 text 变量中。

  1. ngx_listening_t *
  2. ngx_create_listening(ngx_conf_t *cf, struct sockaddr *sockaddr,
  3. socklen_t socklen)
  4. {
  5. size_t len;
  6. ngx_listening_t *ls;
  7. struct sockaddr *sa;
  8. u_char text[NGX_SOCKADDR_STRLEN];
  9. ls = ngx_array_push(&cf->cycle->listening);
  10. if (ls == NULL) {
  11. return NULL;
  12. }
  13. ngx_memzero(ls, sizeof(ngx_listening_t));
  14. sa = ngx_palloc(cf->pool, socklen);
  15. if (sa == NULL) {
  16. return NULL;
  17. }
  18. ngx_memcpy(sa, sockaddr, socklen);
  19. ls->sockaddr = sa;
  20. ls->socklen = socklen;
  21. len = ngx_sock_ntop(sa, socklen, text, NGX_SOCKADDR_STRLEN, 1);
  22. ls->addr_text.len = len;
  23. // ...
  24. return ls;
  25. }

代码 2-2:ngx_create_listening

代码 5 中的函数中出现了 worker_processes,这里就需要介绍一下 Nginx 的多进程处理模型了,Nginx 包含一个 Master 进程,多个 Worker 进程,这些 Worker 进程是平等的,一起处理客户端的请求。而这里对于监听同一个端口也有多种处理模型,这里对 listening 结构体的复制就是为每一个 worker 复制一份 ngx_listening_t。这里的复制并不是使用 memcpy,而是通过一个临时变量做周转。

  1. ngx_int_t
  2. ngx_clone_listening(ngx_cycle_t *cycle, ngx_listening_t *ls)
  3. {
  4. #if (NGX_HAVE_REUSEPORT)
  5. ngx_int_t n;
  6. ngx_core_conf_t *ccf;
  7. ngx_listening_t ols;
  8. if (!ls->reuseport || ls->worker != 0) {
  9. return NGX_OK;
  10. }
  11. ols = *ls;
  12. ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);
  13. for (n = 1; n < ccf->worker_processes; n++) {
  14. /* create a socket for each worker process */
  15. ls = ngx_array_push(&cycle->listening);
  16. if (ls == NULL) {
  17. return NGX_ERROR;
  18. }
  19. *ls = ols;
  20. ls->worker = n;
  21. }
  22. #endif
  23. return NGX_OK;
  24. }

代码 2-3:ngx_clone_listening

未命名绘图.drawio.svg
图 3:ngx_clone_listening

由于进程之间无法共享内存,所以复制操作肯定是在 fork() 之前进行的。通过查找代码可以发现,它是在 ngx_events_moduleinit_conf 函数中被调用的(如代码 6 所示),也就是在 ngx_init_cycle 中。在这里会遍历 cycle->listening 中的全部元素,进行一一复制。L4 说明如果当前的 socket 不能作为 reuseport 或者 worker 已经被赋值,就不再进行复制。

  1. ls = cycle->listening.elts;
  2. for (i = 0; i < cycle->listening.nelts; i++) {
  3. if (!ls[i].reuseport || ls[i].worker != 0) {
  4. continue;
  5. }
  6. if (ngx_clone_listening(cycle, &ls[i]) != NGX_OK) {
  7. return NGX_CONF_ERROR;
  8. }
  9. /* cloning may change cycle->listening.elts */
  10. ls = cycle->listening.elts;
  11. }

代码 2-4

顺着 ngx_init_cycle 继续看,接下来调用的是 ngx_open_listening_sockets,它会遍历 cycle->listening 中的每一个结构体,创建对应的 socket 并进行设置,包括端口重用和非阻塞,然后绑定端口开始监听。至此监听在主进程中的处理已经全部结束。

接下来在 ngx_event_process_init 中,也就是每个线程的模块初始化中,会调用如下代码。这段代码会筛选属于这个进程的 ngx_listening_t 结构体,然后为它们创建连接和事件,并且使用 ngx_add_event 添加到事件循环中,进行监听。所以在事件循环中并没有为监听使用专用的数据结构,而是直接使用 ngx_connection_t。这也就是为什么 ngx_listening_t 中要保存一个 ngx_connection_t 的原因。

  1. ls = cycle->listening.elts;
  2. for (i = 0; i < cycle->listening.nelts; i++) {
  3. #if (NGX_HAVE_REUSEPORT)
  4. if (ls[i].reuseport && ls[i].worker != ngx_worker) {
  5. continue;
  6. }
  7. #endif
  8. c = ngx_get_connection(ls[i].fd, cycle->log);
  9. if (c == NULL) {
  10. return NGX_ERROR;
  11. }
  12. c->type = ls[i].type;
  13. c->log = &ls[i].log;
  14. c->listening = &ls[i];
  15. ls[i].connection = c;
  16. rev = c->read;
  17. rev->log = c->log;
  18. rev->accept = 1;
  19. #if (NGX_HAVE_DEFERRED_ACCEPT)
  20. rev->deferred_accept = ls[i].deferred_accept;
  21. #endif
  22. if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)) {
  23. if (ls[i].previous) {
  24. /*
  25. * delete the old accept events that were bound to
  26. * the old cycle read events array
  27. */
  28. old = ls[i].previous->connection;
  29. if (ngx_del_event(old->read, NGX_READ_EVENT, NGX_CLOSE_EVENT)
  30. == NGX_ERROR)
  31. {
  32. return NGX_ERROR;
  33. }
  34. old->fd = (ngx_socket_t) -1;
  35. }
  36. }
  37. rev->handler = (c->type == SOCK_STREAM) ? ngx_event_accept
  38. : ngx_event_recvmsg;
  39. #if (NGX_HAVE_REUSEPORT)
  40. if (ls[i].reuseport) {
  41. if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
  42. return NGX_ERROR;
  43. }
  44. continue;
  45. }
  46. #endif
  47. if (ngx_use_accept_mutex) {
  48. continue;
  49. }
  50. #if (NGX_HAVE_EPOLLEXCLUSIVE)
  51. if ((ngx_event_flags & NGX_USE_EPOLL_EVENT)
  52. && ccf->worker_processes > 1)
  53. {
  54. ngx_use_exclusive_accept = 1;
  55. if (ngx_add_event(rev, NGX_READ_EVENT, NGX_EXCLUSIVE_EVENT)
  56. == NGX_ERROR)
  57. {
  58. return NGX_ERROR;
  59. }
  60. continue;
  61. }
  62. #endif
  63. if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
  64. return NGX_ERROR;
  65. }
  66. #endif
  67. }

代码 2-5
ngx_connection-listening event.drawio.svg
图 4:listening event

ngx_get_connection 简单来说就是从 free_connections 中获取一个空闲的 ngx_connection_t 结构体。然后使用文件描述符作为索引,在 files 数组中存储新获得的结构体。在获取结构体之前,还调用了一次 ngx_drain_connection,这个函数看名字就知道是释放一些连接,尽量保证有空闲的连接可以使用,这个函数会在后面关闭连接中详细介绍。

  1. ngx_connection_t *
  2. ngx_get_connection(ngx_socket_t s, ngx_log_t *log)
  3. {
  4. ngx_uint_t instance;
  5. ngx_event_t *rev, *wev;
  6. ngx_connection_t *c;
  7. /* disable warning: Win32 SOCKET is u_int while UNIX socket is int */
  8. if (ngx_cycle->files && (ngx_uint_t) s >= ngx_cycle->files_n) {
  9. ngx_log_error(NGX_LOG_ALERT, log, 0,
  10. "the new socket has number %d, "
  11. "but only %ui files are available",
  12. s, ngx_cycle->files_n);
  13. return NULL;
  14. }
  15. ngx_drain_connections((ngx_cycle_t *) ngx_cycle);
  16. c = ngx_cycle->free_connections;
  17. if (c == NULL) {
  18. ngx_log_error(NGX_LOG_ALERT, log, 0,
  19. "%ui worker_connections are not enough",
  20. ngx_cycle->connection_n);
  21. return NULL;
  22. }
  23. ngx_cycle->free_connections = c->data;
  24. ngx_cycle->free_connection_n--;
  25. if (ngx_cycle->files && ngx_cycle->files[s] == NULL) {
  26. ngx_cycle->files[s] = c;
  27. }
  28. rev = c->read;
  29. wev = c->write;
  30. ngx_memzero(c, sizeof(ngx_connection_t));
  31. c->read = rev;
  32. c->write = wev;
  33. c->fd = s;
  34. c->log = log;
  35. instance = rev->instance;
  36. ngx_memzero(rev, sizeof(ngx_event_t));
  37. ngx_memzero(wev, sizeof(ngx_event_t));
  38. rev->instance = !instance;
  39. wev->instance = !instance;
  40. rev->index = NGX_INVALID_INDEX;
  41. wev->index = NGX_INVALID_INDEX;
  42. rev->data = c;
  43. wev->data = c;
  44. wev->write = 1;
  45. return c;
  46. }

代码 3-1:ngx_get_connection

还需要注意这里讲的都是基于监听指定了 reuseport 的情况。如果没有声明使用 reuseport 特性,不会调用 ngx_clone_listening,而是多个 worker 监听同一个 socket,这时候需要在调用 accept 之前先竞争全局锁 ngx_accept_mutex,保证每一个请求只有一个 worker 处理。
ngx_connection-第 3 页.drawio.png
图 5:reuseport

Accept Connect

在 epoll 中,发起连接的请求会转换为 EPOLLIN 事件,Nginx 的事件模块会像处理普通的读事件一样,调用 ngx_connection_t->read->handler 方法,在进行监听的时候,就是 ngx_event_accept。在图 4 中,也展示了这部分内容。

ngx_event_accept 的核心就是 accept 系统调用,还记得在上面创建 socket 的时候,已经将 socket 设置为非阻塞的,这就是 Nginx 实现 multi_accept 的基础,multi_accept 是指在建立连接的时候,会一次性将一个端口的所有连接都建立完毕,防止有的连接需要在下次处理事件时才进行接收,整体的代码框架其实非常简单,如代码 4-1 所示。

  1. ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module);
  2. if (!(ngx_event_flags & NGX_USE_KQUEUE_EVENT)) {
  3. ev->available = ecf->multi_accept;
  4. }
  5. do {
  6. if (use_accept4) {
  7. s = accept4(lc->fd, &sa.sockaddr, &socklen, SOCK_NONBLOCK);
  8. } else {
  9. s = accept(lc->fd, &sa.sockaddr, &socklen);
  10. }
  11. c->pool = ngx_create_pool(ls->pool_size, ev->log);
  12. if (c->pool == NULL) {
  13. ngx_close_accepted_connection(c);
  14. return;
  15. }
  16. c->recv = ngx_recv;
  17. c->send = ngx_send;
  18. c->recv_chain = ngx_recv_chain;
  19. c->send_chain = ngx_send_chain;
  20. rev = c->read;
  21. wev = c->write;
  22. wev->ready = 1;
  23. if (ngx_event_flags & NGX_USE_IOCP_EVENT) {
  24. rev->ready = 1;
  25. }
  26. if (ev->deferred_accept) {
  27. rev->ready = 1;
  28. #if (NGX_HAVE_KQUEUE || NGX_HAVE_EPOLLRDHUP)
  29. rev->available = 1;
  30. #endif
  31. }
  32. if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) {
  33. if (ngx_add_conn(c) == NGX_ERROR) {
  34. ngx_close_accepted_connection(c);
  35. return;
  36. }
  37. }
  38. ls->handler(c);
  39. } while (ev->available);

代码 4 - 1:ngx_event_accept

代码 4 - 1 中还需要注意的部分就是 ngx_add_conn,这个函数实现的模块只有 epoll,它会直接监听 ngx_connection_t 中存储的 fd 触发的所有事件。对于其他底层多路复用逻辑来说,事件监听是在 ls->handler 中完成的。而是即使是对于 epoll 来说,ngx_add_conn 将事件加入到了监听队列里,但是还没有为真正处理输入输出的 c->read->handlerc->write->handler 赋值。前面介绍的部分都是通用的建立连接的代码,接下来是需要涉及到不同协议的内容的具体处理,肯定需要根据模块来选择处理函数,之前也介绍过,所有的 ngx_listening_t 结构体都是在各自模块内部构建的,所以这个结构体内的 handler 应该就是存储着不同模块初始化连接的代码。

通过查找创建 ngx_listening_t 的函数不难找到,对于 http 来说,这里的 handlerngx_http_init_connection,它会为读写事件分别添加不同的 handler,这一部分就是各模块内部的处理了。

ngx_connection-第 7 页.drawio.svg
图 6:event handler

Close Connection

关闭连接主要是涉及 ngx_connection_t 结构复用的问题,这里分为两部分,空闲连接和可重用的连接。首先是空闲的连接,在代码 5-1 中,会存储在 cycle->free_connections 中,释放的过程与 ngx_get_connection 恰好相反。

  1. void
  2. ngx_free_connection(ngx_connection_t *c)
  3. {
  4. c->data = ngx_cycle->free_connections;
  5. ngx_cycle->free_connections = c;
  6. ngx_cycle->free_connection_n++;
  7. if (ngx_cycle->files && ngx_cycle->files[c->fd] == c) {
  8. ngx_cycle->files[c->fd] = NULL;
  9. }
  10. }

代码 5-1:ngx_free_connection

ngx_connection-第 8 页.drawio.svg
图 7:ngx_free_connection

Reuseable Connection

  1. void
  2. ngx_reusable_connection(ngx_connection_t *c, ngx_uint_t reusable)
  3. {
  4. ngx_log_debug1(NGX_LOG_DEBUG_CORE, c->log, 0,
  5. "reusable connection: %ui", reusable);
  6. if (c->reusable) {
  7. ngx_queue_remove(&c->queue);
  8. ngx_cycle->reusable_connections_n--;
  9. #if (NGX_STAT_STUB)
  10. (void) ngx_atomic_fetch_add(ngx_stat_waiting, -1);
  11. #endif
  12. }
  13. c->reusable = reusable;
  14. if (reusable) {
  15. /* need cast as ngx_cycle is volatile */
  16. ngx_queue_insert_head(
  17. (ngx_queue_t *) &ngx_cycle->reusable_connections_queue, &c->queue);
  18. ngx_cycle->reusable_connections_n++;
  19. #if (NGX_STAT_STUB)
  20. (void) ngx_atomic_fetch_add(ngx_stat_waiting, 1);
  21. #endif
  22. }
  23. }
  24. static void
  25. ngx_drain_connections(ngx_cycle_t *cycle)
  26. {
  27. ngx_uint_t i, n;
  28. ngx_queue_t *q;
  29. ngx_connection_t *c;
  30. if (cycle->free_connection_n > cycle->connection_n / 16
  31. || cycle->reusable_connections_n == 0)
  32. {
  33. return;
  34. }
  35. if (cycle->connections_reuse_time != ngx_time()) {
  36. cycle->connections_reuse_time = ngx_time();
  37. ngx_log_error(NGX_LOG_WARN, cycle->log, 0,
  38. "%ui worker_connections are not enough, "
  39. "reusing connections",
  40. cycle->connection_n);
  41. }
  42. c = NULL;
  43. n = ngx_max(ngx_min(32, cycle->reusable_connections_n / 8), 1);
  44. for (i = 0; i < n; i++) {
  45. if (ngx_queue_empty(&cycle->reusable_connections_queue)) {
  46. break;
  47. }
  48. q = ngx_queue_last(&cycle->reusable_connections_queue);
  49. c = ngx_queue_data(q, ngx_connection_t, queue);
  50. ngx_log_debug0(NGX_LOG_DEBUG_CORE, c->log, 0,
  51. "reusing connection");
  52. c->close = 1;
  53. c->read->handler(c->read);
  54. }
  55. if (cycle->free_connection_n == 0 && c && c->reusable) {
  56. /*
  57. * if no connections were freed, try to reuse the last
  58. * connection again: this should free it as long as
  59. * previous reuse moved it to lingering close
  60. */
  61. ngx_log_debug0(NGX_LOG_DEBUG_CORE, c->log, 0,
  62. "reusing connection again");
  63. c->close = 1;
  64. c->read->handler(c->read);
  65. }
  66. }