Data Structures
Object-Oriented Programming in C
C 语言虽然是面向过程的,但是也可以通过特殊的语法实现一些面向对象的特征,如代码 1 所示,结构体内的所有字段都是函数,这类似于 Go 语言中的接口。通过不同的赋值,这个结构体表现出不同的特性,从而实现面向对象中的多态。
typedef struct {
ngx_int_t (*add)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
ngx_int_t (*del)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
ngx_int_t (*enable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
ngx_int_t (*disable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
ngx_int_t (*add_conn)(ngx_connection_t *c);
ngx_int_t (*del_conn)(ngx_connection_t *c, ngx_uint_t flags);
ngx_int_t (*notify)(ngx_event_handler_pt handler);
ngx_int_t (*process_events)(ngx_cycle_t *cycle, ngx_msec_t timer,
ngx_uint_t flags);
ngx_int_t (*init)(ngx_cycle_t *cycle, ngx_msec_t timer);
void (*done)(ngx_cycle_t *cycle);
} ngx_event_actions_t;
代码 1-1:ngx_event_actions_t
代码 2 中出现的 ngx_event_actions
是一个全局变量,类型为 ngx_event_actions_t
,这些宏定义是对调用其中函数的简单缩写。通过将这个全局变量赋予不同的值,在事件处理中就可以使用不同的底层特性,比如 epoll, kqueue, poll, select 等,这些实现都保存在 src/event/modules
中。
#define ngx_process_events ngx_event_actions.process_events
#define ngx_done_events ngx_event_actions.done
#define ngx_add_event ngx_event_actions.add
#define ngx_del_event ngx_event_actions.del
#define ngx_add_conn ngx_event_actions.add_conn
#define ngx_del_conn ngx_event_actions.del_conn
#define ngx_notify ngx_event_actions.notify
#define ngx_add_timer ngx_event_add_timer
#define ngx_del_timer ngx_event_del_timer
代码 1-2
ngx_event_action 也可以放在其他结构体内,如果作为其他结构欧提的第一个字段,可以使用强制类型转换来调用,更类似于接口的使用方式。但是这样做就无法在一个结构体内放入多个接口。另一种方式是作为某一个字段,在调用的时候通过 .
或者 ->
进行调用,这样就可以嵌入多个接口类型。
Listen Port
创建 ngx_listening_t
结构体是在处理配置文件的时候,通过代码 3 中的 Nginx 配置文件不难看出,监听的端口信息被放在了 http 中的 server 下,通过查找所有调用 ngx_create_listening
的位置也不难佐证这一假设。
user www www; ## Default: nobody
worker_processes 5; ## Default: 1
error_log logs/error.log;
pid logs/nginx.pid;
worker_rlimit_nofile 8192;
events {
worker_connections 4096; ## Default: 1024
}
http {
include conf/mime.types;
include /etc/nginx/proxy.conf;
include /etc/nginx/fastcgi.conf;
index index.html index.htm index.php;
default_type application/octet-stream;
log_format main '$remote_addr - $remote_user [$time_local] $status '
'"$request" $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
access_log logs/access.log main;
sendfile on;
tcp_nopush on;
server_names_hash_bucket_size 128; # this seems to be required for some vhosts
server { # simple load balancing
listen 80;
server_name big.server.com;
access_log logs/big.server.access.log main;
location / {
proxy_pass http://big_server_com;
}
}
}
代码 2-1:nginx config
通过代码 4 的 L10 能够看出,所有被创建的 ngx_listening_t
都是来自于 cycle->listening
,所有的结构体在创建的时候就与这个数组绑定了。函数剩余的工作就是初始化结构体内的一些字段,但是这里并没有创建真正的 socket。ngx_socket_ntop
这个函数名比较令人困惑,它的作用是将传入的 sockaddr
变为文本的形式存储到 text
变量中。
ngx_listening_t *
ngx_create_listening(ngx_conf_t *cf, struct sockaddr *sockaddr,
socklen_t socklen)
{
size_t len;
ngx_listening_t *ls;
struct sockaddr *sa;
u_char text[NGX_SOCKADDR_STRLEN];
ls = ngx_array_push(&cf->cycle->listening);
if (ls == NULL) {
return NULL;
}
ngx_memzero(ls, sizeof(ngx_listening_t));
sa = ngx_palloc(cf->pool, socklen);
if (sa == NULL) {
return NULL;
}
ngx_memcpy(sa, sockaddr, socklen);
ls->sockaddr = sa;
ls->socklen = socklen;
len = ngx_sock_ntop(sa, socklen, text, NGX_SOCKADDR_STRLEN, 1);
ls->addr_text.len = len;
// ...
return ls;
}
代码 2-2:ngx_create_listening
代码 5 中的函数中出现了 worker_processes,这里就需要介绍一下 Nginx 的多进程处理模型了,Nginx 包含一个 Master 进程,多个 Worker 进程,这些 Worker 进程是平等的,一起处理客户端的请求。而这里对于监听同一个端口也有多种处理模型,这里对 listening 结构体的复制就是为每一个 worker 复制一份 ngx_listening_t
。这里的复制并不是使用 memcpy
,而是通过一个临时变量做周转。
ngx_int_t
ngx_clone_listening(ngx_cycle_t *cycle, ngx_listening_t *ls)
{
#if (NGX_HAVE_REUSEPORT)
ngx_int_t n;
ngx_core_conf_t *ccf;
ngx_listening_t ols;
if (!ls->reuseport || ls->worker != 0) {
return NGX_OK;
}
ols = *ls;
ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);
for (n = 1; n < ccf->worker_processes; n++) {
/* create a socket for each worker process */
ls = ngx_array_push(&cycle->listening);
if (ls == NULL) {
return NGX_ERROR;
}
*ls = ols;
ls->worker = n;
}
#endif
return NGX_OK;
}
代码 2-3:ngx_clone_listening
图 3:ngx_clone_listening
由于进程之间无法共享内存,所以复制操作肯定是在 fork()
之前进行的。通过查找代码可以发现,它是在 ngx_events_module
的 init_conf
函数中被调用的(如代码 6 所示),也就是在 ngx_init_cycle
中。在这里会遍历 cycle->listening
中的全部元素,进行一一复制。L4 说明如果当前的 socket 不能作为 reuseport 或者 worker 已经被赋值,就不再进行复制。
ls = cycle->listening.elts;
for (i = 0; i < cycle->listening.nelts; i++) {
if (!ls[i].reuseport || ls[i].worker != 0) {
continue;
}
if (ngx_clone_listening(cycle, &ls[i]) != NGX_OK) {
return NGX_CONF_ERROR;
}
/* cloning may change cycle->listening.elts */
ls = cycle->listening.elts;
}
代码 2-4
顺着 ngx_init_cycle
继续看,接下来调用的是 ngx_open_listening_sockets
,它会遍历 cycle->listening
中的每一个结构体,创建对应的 socket 并进行设置,包括端口重用和非阻塞,然后绑定端口开始监听。至此监听在主进程中的处理已经全部结束。
接下来在 ngx_event_process_init
中,也就是每个线程的模块初始化中,会调用如下代码。这段代码会筛选属于这个进程的 ngx_listening_t
结构体,然后为它们创建连接和事件,并且使用 ngx_add_event
添加到事件循环中,进行监听。所以在事件循环中并没有为监听使用专用的数据结构,而是直接使用 ngx_connection_t
。这也就是为什么 ngx_listening_t
中要保存一个 ngx_connection_t
的原因。
ls = cycle->listening.elts;
for (i = 0; i < cycle->listening.nelts; i++) {
#if (NGX_HAVE_REUSEPORT)
if (ls[i].reuseport && ls[i].worker != ngx_worker) {
continue;
}
#endif
c = ngx_get_connection(ls[i].fd, cycle->log);
if (c == NULL) {
return NGX_ERROR;
}
c->type = ls[i].type;
c->log = &ls[i].log;
c->listening = &ls[i];
ls[i].connection = c;
rev = c->read;
rev->log = c->log;
rev->accept = 1;
#if (NGX_HAVE_DEFERRED_ACCEPT)
rev->deferred_accept = ls[i].deferred_accept;
#endif
if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)) {
if (ls[i].previous) {
/*
* delete the old accept events that were bound to
* the old cycle read events array
*/
old = ls[i].previous->connection;
if (ngx_del_event(old->read, NGX_READ_EVENT, NGX_CLOSE_EVENT)
== NGX_ERROR)
{
return NGX_ERROR;
}
old->fd = (ngx_socket_t) -1;
}
}
rev->handler = (c->type == SOCK_STREAM) ? ngx_event_accept
: ngx_event_recvmsg;
#if (NGX_HAVE_REUSEPORT)
if (ls[i].reuseport) {
if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
return NGX_ERROR;
}
continue;
}
#endif
if (ngx_use_accept_mutex) {
continue;
}
#if (NGX_HAVE_EPOLLEXCLUSIVE)
if ((ngx_event_flags & NGX_USE_EPOLL_EVENT)
&& ccf->worker_processes > 1)
{
ngx_use_exclusive_accept = 1;
if (ngx_add_event(rev, NGX_READ_EVENT, NGX_EXCLUSIVE_EVENT)
== NGX_ERROR)
{
return NGX_ERROR;
}
continue;
}
#endif
if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
return NGX_ERROR;
}
#endif
}
代码 2-5
图 4:listening event
ngx_get_connection
简单来说就是从 free_connections
中获取一个空闲的 ngx_connection_t
结构体。然后使用文件描述符作为索引,在 files
数组中存储新获得的结构体。在获取结构体之前,还调用了一次 ngx_drain_connection
,这个函数看名字就知道是释放一些连接,尽量保证有空闲的连接可以使用,这个函数会在后面关闭连接中详细介绍。
ngx_connection_t *
ngx_get_connection(ngx_socket_t s, ngx_log_t *log)
{
ngx_uint_t instance;
ngx_event_t *rev, *wev;
ngx_connection_t *c;
/* disable warning: Win32 SOCKET is u_int while UNIX socket is int */
if (ngx_cycle->files && (ngx_uint_t) s >= ngx_cycle->files_n) {
ngx_log_error(NGX_LOG_ALERT, log, 0,
"the new socket has number %d, "
"but only %ui files are available",
s, ngx_cycle->files_n);
return NULL;
}
ngx_drain_connections((ngx_cycle_t *) ngx_cycle);
c = ngx_cycle->free_connections;
if (c == NULL) {
ngx_log_error(NGX_LOG_ALERT, log, 0,
"%ui worker_connections are not enough",
ngx_cycle->connection_n);
return NULL;
}
ngx_cycle->free_connections = c->data;
ngx_cycle->free_connection_n--;
if (ngx_cycle->files && ngx_cycle->files[s] == NULL) {
ngx_cycle->files[s] = c;
}
rev = c->read;
wev = c->write;
ngx_memzero(c, sizeof(ngx_connection_t));
c->read = rev;
c->write = wev;
c->fd = s;
c->log = log;
instance = rev->instance;
ngx_memzero(rev, sizeof(ngx_event_t));
ngx_memzero(wev, sizeof(ngx_event_t));
rev->instance = !instance;
wev->instance = !instance;
rev->index = NGX_INVALID_INDEX;
wev->index = NGX_INVALID_INDEX;
rev->data = c;
wev->data = c;
wev->write = 1;
return c;
}
代码 3-1:ngx_get_connection
还需要注意这里讲的都是基于监听指定了 reuseport 的情况。如果没有声明使用 reuseport 特性,不会调用 ngx_clone_listening
,而是多个 worker 监听同一个 socket,这时候需要在调用 accept
之前先竞争全局锁 ngx_accept_mutex
,保证每一个请求只有一个 worker 处理。
图 5:reuseport
Accept Connect
在 epoll 中,发起连接的请求会转换为 EPOLLIN
事件,Nginx 的事件模块会像处理普通的读事件一样,调用 ngx_connection_t->read->handler
方法,在进行监听的时候,就是 ngx_event_accept
。在图 4 中,也展示了这部分内容。
ngx_event_accept
的核心就是 accept
系统调用,还记得在上面创建 socket 的时候,已经将 socket 设置为非阻塞的,这就是 Nginx 实现 multi_accept 的基础,multi_accept 是指在建立连接的时候,会一次性将一个端口的所有连接都建立完毕,防止有的连接需要在下次处理事件时才进行接收,整体的代码框架其实非常简单,如代码 4-1 所示。
ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module);
if (!(ngx_event_flags & NGX_USE_KQUEUE_EVENT)) {
ev->available = ecf->multi_accept;
}
do {
if (use_accept4) {
s = accept4(lc->fd, &sa.sockaddr, &socklen, SOCK_NONBLOCK);
} else {
s = accept(lc->fd, &sa.sockaddr, &socklen);
}
c->pool = ngx_create_pool(ls->pool_size, ev->log);
if (c->pool == NULL) {
ngx_close_accepted_connection(c);
return;
}
c->recv = ngx_recv;
c->send = ngx_send;
c->recv_chain = ngx_recv_chain;
c->send_chain = ngx_send_chain;
rev = c->read;
wev = c->write;
wev->ready = 1;
if (ngx_event_flags & NGX_USE_IOCP_EVENT) {
rev->ready = 1;
}
if (ev->deferred_accept) {
rev->ready = 1;
#if (NGX_HAVE_KQUEUE || NGX_HAVE_EPOLLRDHUP)
rev->available = 1;
#endif
}
if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) {
if (ngx_add_conn(c) == NGX_ERROR) {
ngx_close_accepted_connection(c);
return;
}
}
ls->handler(c);
} while (ev->available);
代码 4 - 1:ngx_event_accept
代码 4 - 1 中还需要注意的部分就是 ngx_add_conn
,这个函数实现的模块只有 epoll,它会直接监听 ngx_connection_t
中存储的 fd 触发的所有事件。对于其他底层多路复用逻辑来说,事件监听是在 ls->handler
中完成的。而是即使是对于 epoll 来说,ngx_add_conn
将事件加入到了监听队列里,但是还没有为真正处理输入输出的 c->read->handler
和 c->write->handler
赋值。前面介绍的部分都是通用的建立连接的代码,接下来是需要涉及到不同协议的内容的具体处理,肯定需要根据模块来选择处理函数,之前也介绍过,所有的 ngx_listening_t
结构体都是在各自模块内部构建的,所以这个结构体内的 handler 应该就是存储着不同模块初始化连接的代码。
通过查找创建 ngx_listening_t
的函数不难找到,对于 http 来说,这里的 handler
是 ngx_http_init_connection
,它会为读写事件分别添加不同的 handler,这一部分就是各模块内部的处理了。
图 6:event handler
Close Connection
关闭连接主要是涉及 ngx_connection_t
结构复用的问题,这里分为两部分,空闲连接和可重用的连接。首先是空闲的连接,在代码 5-1 中,会存储在 cycle->free_connections
中,释放的过程与 ngx_get_connection
恰好相反。
void
ngx_free_connection(ngx_connection_t *c)
{
c->data = ngx_cycle->free_connections;
ngx_cycle->free_connections = c;
ngx_cycle->free_connection_n++;
if (ngx_cycle->files && ngx_cycle->files[c->fd] == c) {
ngx_cycle->files[c->fd] = NULL;
}
}
代码 5-1:ngx_free_connection
图 7:ngx_free_connection
Reuseable Connection
void
ngx_reusable_connection(ngx_connection_t *c, ngx_uint_t reusable)
{
ngx_log_debug1(NGX_LOG_DEBUG_CORE, c->log, 0,
"reusable connection: %ui", reusable);
if (c->reusable) {
ngx_queue_remove(&c->queue);
ngx_cycle->reusable_connections_n--;
#if (NGX_STAT_STUB)
(void) ngx_atomic_fetch_add(ngx_stat_waiting, -1);
#endif
}
c->reusable = reusable;
if (reusable) {
/* need cast as ngx_cycle is volatile */
ngx_queue_insert_head(
(ngx_queue_t *) &ngx_cycle->reusable_connections_queue, &c->queue);
ngx_cycle->reusable_connections_n++;
#if (NGX_STAT_STUB)
(void) ngx_atomic_fetch_add(ngx_stat_waiting, 1);
#endif
}
}
static void
ngx_drain_connections(ngx_cycle_t *cycle)
{
ngx_uint_t i, n;
ngx_queue_t *q;
ngx_connection_t *c;
if (cycle->free_connection_n > cycle->connection_n / 16
|| cycle->reusable_connections_n == 0)
{
return;
}
if (cycle->connections_reuse_time != ngx_time()) {
cycle->connections_reuse_time = ngx_time();
ngx_log_error(NGX_LOG_WARN, cycle->log, 0,
"%ui worker_connections are not enough, "
"reusing connections",
cycle->connection_n);
}
c = NULL;
n = ngx_max(ngx_min(32, cycle->reusable_connections_n / 8), 1);
for (i = 0; i < n; i++) {
if (ngx_queue_empty(&cycle->reusable_connections_queue)) {
break;
}
q = ngx_queue_last(&cycle->reusable_connections_queue);
c = ngx_queue_data(q, ngx_connection_t, queue);
ngx_log_debug0(NGX_LOG_DEBUG_CORE, c->log, 0,
"reusing connection");
c->close = 1;
c->read->handler(c->read);
}
if (cycle->free_connection_n == 0 && c && c->reusable) {
/*
* if no connections were freed, try to reuse the last
* connection again: this should free it as long as
* previous reuse moved it to lingering close
*/
ngx_log_debug0(NGX_LOG_DEBUG_CORE, c->log, 0,
"reusing connection again");
c->close = 1;
c->read->handler(c->read);
}
}