InitDB
在初始化数据库的时候,redis 会监听所有的端口,打开 socket 连接。然后将它们绑定到 Eventloop 上。
/* Create the timer callback, this is our way to process many background
* operations incrementally, like clients timeout, eviction of unaccessed
* expired keys and so forth. */
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) {
serverPanic("Unrecoverable error creating TCP socket accept handler.");
}
if (createSocketAcceptHandler(&server.tlsfd, acceptTLSHandler) != C_OK) {
serverPanic("Unrecoverable error creating TLS socket accept handler.");
}
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
/* Register a readable event for the pipe used to awake the event loop
* when a blocked client in a module needs attention. */
if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
serverPanic(
"Error registering the readable event for the module "
"blocked clients subsystem.");
}
/* Register before and after sleep handlers (note this needs to be done
* before loading persistence since it is used by processEventsWhileBlocked. */
aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
、
这里出现了两种 Event,一种是 fileEvent, 这个是用于监听文件描述符的,每次文件描述符需要写入或者读取数据,都会调用 fileEvent 中存储的 Handler 函数,另外一种是 TimeEvent,是用于固定时间触发的。
这里还出现了 BeforeSleepProc
和 AfterSleepProc
这两个函数会在程序被阻塞之前和之后被调用,具体的内容会再后面分析。
图一:initServer
然后在 aeMain 中,会循环执行函数 aeProcessEvents
,函数的大体流程如下,首先遍历所有的 aeTimeEvent,找到最近需要执行的 aeTimeEvent,将这个事件下次执行的还需要等待的时间作为 aeApiPoll 的最长等待时间。aeApiPoll 产生 aeFiredEvent,这是都是需要进行处理的文件描述符。在进入等待之前会执行 beforeSleep
和 afterSleep
。
图二:aeProcessEvent
Epoll
图三:epoll
在 Linux 中 Eventloop 的底层是 Epoll,epoll 会在 epoll_wait 时产生出来的 aeEventState,转化为 aeFiredEvent,让上层进行处理,添加删除需要监听的 fd 都是通过 epoll_ctl 进行管理的。
FileEvent
图四:aeFiredEvent
aeFiredEvent 中会保存需要处理的文件描述符,然后通过这个文件描述符能够找到对应的 aeFileEvent,里面有曾经注册过的处理函数,根据产生的事件 mask 的不同,会调用不同的处理函数,一般是先处理读取事件,然后再处理写入事件,因为读取的时候可能会产生写入的内容,应该尽量再一轮循环中进行处理。
TimeEvent
图五:aeTimeEvent
aeTimeEvent 在每轮循环的末尾会进行处理,每次处理都会遍历整个链表,然后查看是否应该执行,这里通过 timeProc 的返回值判断是否需要下次执行,如果返回的是 -1,就会将 Event 的 ID 标记为 AE_DELETE_EVENT_ID,然后再下一次循环的时候进行删除。
Process
图六:process
Accept
Eventloop 中每个文件描述符需要处理时,都会调用他们注册到 EventLoop 中的函数,对于我们监听的端口来说,就是调用 accpetTcpHander
函数。
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);
while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
}
}
这个函数至多会进行 max 次循环,如果 max 次没有成功建立连接就退出了,每次循环中都会尝试调用 anetTcpAccept
方法,这个函数的核心就是系统调用 accept
,调用成功会返回一个新的文件描述符,作为本次连接的通信 socket。
然后会基于这个文件描述符创建一个连接,然后调用 acceptCommonHandler
方法,这个方法中会创建 client 并且调用 connAccept
方法。这里创建的 client 虽然没有返回,但是会成为 conn 的一个字段,后面会从 conn 中取出来使用。
图一:accpetTcpHandler
在 createClient 中,会调用 connSocketSetReadHandler
函数,这个函数会将 fd 加入到 Eventloop 中,等待能够进行读取。但是这里的对应的 Handler 为 type->ae_handler
,而不是新添加的 read_handler
,在 ae_handler 中会对 mask 进行检查,判断时需要读取还是写入,然后调用 conn 上的 read_handler 或者 write_handler。
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
if (func == conn->read_handler) return C_OK;
conn->read_handler = func;
if (!conn->read_handler)
aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
else
if (aeCreateFileEvent(server.el,conn->fd,
AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
return C_OK;
}
图二:CT_SOCKET
connAccept 就是调用 conn 中的 type 的 accept 函数,type 为 ConnectionType
类型,下面时结构体的字段,可以看到,这里面就是一系列函数的组合,这与 Golang 中的接口非常相似,通过将逻辑移植到 type 中,我们可以轻松的对 conn 类型实现多态。目前 ConnectType 的实例只有两种,分别是 CT_SOCKET 和 CT_TLS,但是通过实现 ConnectType,可以轻松的对连接进行扩展,增加新的功能。
static inline int connAccept(connection *conn, ConnectionCallbackFunc accept_handler) {
return conn->type->accept(conn, accept_handler);
}
typedef struct ConnectionType {
void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask);
int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler);
int (*write)(struct connection *conn, const void *data, size_t data_len);
int (*read)(struct connection *conn, void *buf, size_t buf_len);
void (*close)(struct connection *conn);
int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
const char *(*get_last_error)(struct connection *conn);
int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout);
ssize_t (*sync_write)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
int (*get_type)(struct connection *conn);
} ConnectionType;
connCreateAcceptSocket
函数会创建一个类型为 CT_SOCKET 类型的 conn,然后将状态设置为 CONN_STATE_ACCEPTING
,下面时 CONN 的状态转换图,在调用上面的 conn->type->accept
的时候,就会将状态转换为 CONN_STATE_CONNECTED
,如果读取数据时,没有返回值,那就是链接已经关闭,将状态变为 CONN_STATE_CLOSED
。
图三:conn State
进行到这里 Client 与 Server 的连接就完成了,剩下的就是等待 Client 给 Server 发送命令了,命令会在注册好的 read_handler
中进行解析,然后执行。