InitDB
在初始化数据库的时候,redis 会监听所有的端口,打开 socket 连接。然后将它们绑定到 Eventloop 上。
/* Create the timer callback, this is our way to process many background* operations incrementally, like clients timeout, eviction of unaccessed* expired keys and so forth. */if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {serverPanic("Can't create event loop timers.");exit(1);}/* Create an event handler for accepting new connections in TCP and Unix* domain sockets. */if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) {serverPanic("Unrecoverable error creating TCP socket accept handler.");}if (createSocketAcceptHandler(&server.tlsfd, acceptTLSHandler) != C_OK) {serverPanic("Unrecoverable error creating TLS socket accept handler.");}if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");/* Register a readable event for the pipe used to awake the event loop* when a blocked client in a module needs attention. */if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,moduleBlockedClientPipeReadable,NULL) == AE_ERR) {serverPanic("Error registering the readable event for the module ""blocked clients subsystem.");}/* Register before and after sleep handlers (note this needs to be done* before loading persistence since it is used by processEventsWhileBlocked. */aeSetBeforeSleepProc(server.el,beforeSleep);aeSetAfterSleepProc(server.el,afterSleep);
、
这里出现了两种 Event,一种是 fileEvent, 这个是用于监听文件描述符的,每次文件描述符需要写入或者读取数据,都会调用 fileEvent 中存储的 Handler 函数,另外一种是 TimeEvent,是用于固定时间触发的。
这里还出现了 BeforeSleepProc 和 AfterSleepProc 这两个函数会在程序被阻塞之前和之后被调用,具体的内容会再后面分析。
图一:initServer
然后在 aeMain 中,会循环执行函数 aeProcessEvents ,函数的大体流程如下,首先遍历所有的 aeTimeEvent,找到最近需要执行的 aeTimeEvent,将这个事件下次执行的还需要等待的时间作为 aeApiPoll 的最长等待时间。aeApiPoll 产生 aeFiredEvent,这是都是需要进行处理的文件描述符。在进入等待之前会执行 beforeSleep 和 afterSleep。

图二:aeProcessEvent
Epoll

图三:epoll
在 Linux 中 Eventloop 的底层是 Epoll,epoll 会在 epoll_wait 时产生出来的 aeEventState,转化为 aeFiredEvent,让上层进行处理,添加删除需要监听的 fd 都是通过 epoll_ctl 进行管理的。
FileEvent

图四:aeFiredEvent
aeFiredEvent 中会保存需要处理的文件描述符,然后通过这个文件描述符能够找到对应的 aeFileEvent,里面有曾经注册过的处理函数,根据产生的事件 mask 的不同,会调用不同的处理函数,一般是先处理读取事件,然后再处理写入事件,因为读取的时候可能会产生写入的内容,应该尽量再一轮循环中进行处理。
TimeEvent

图五:aeTimeEvent
aeTimeEvent 在每轮循环的末尾会进行处理,每次处理都会遍历整个链表,然后查看是否应该执行,这里通过 timeProc 的返回值判断是否需要下次执行,如果返回的是 -1,就会将 Event 的 ID 标记为 AE_DELETE_EVENT_ID,然后再下一次循环的时候进行删除。
Process

图六:process
Accept
Eventloop 中每个文件描述符需要处理时,都会调用他们注册到 EventLoop 中的函数,对于我们监听的端口来说,就是调用 accpetTcpHander 函数。
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {int cport, cfd, max = MAX_ACCEPTS_PER_CALL;char cip[NET_IP_STR_LEN];UNUSED(el);UNUSED(mask);UNUSED(privdata);while(max--) {cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);if (cfd == ANET_ERR) {if (errno != EWOULDBLOCK)serverLog(LL_WARNING,"Accepting client connection: %s", server.neterr);return;}serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);}}
这个函数至多会进行 max 次循环,如果 max 次没有成功建立连接就退出了,每次循环中都会尝试调用 anetTcpAccept 方法,这个函数的核心就是系统调用 accept,调用成功会返回一个新的文件描述符,作为本次连接的通信 socket。
然后会基于这个文件描述符创建一个连接,然后调用 acceptCommonHandler 方法,这个方法中会创建 client 并且调用 connAccept 方法。这里创建的 client 虽然没有返回,但是会成为 conn 的一个字段,后面会从 conn 中取出来使用。

图一:accpetTcpHandler
在 createClient 中,会调用 connSocketSetReadHandler 函数,这个函数会将 fd 加入到 Eventloop 中,等待能够进行读取。但是这里的对应的 Handler 为 type->ae_handler,而不是新添加的 read_handler,在 ae_handler 中会对 mask 进行检查,判断时需要读取还是写入,然后调用 conn 上的 read_handler 或者 write_handler。
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {if (func == conn->read_handler) return C_OK;conn->read_handler = func;if (!conn->read_handler)aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);elseif (aeCreateFileEvent(server.el,conn->fd,AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;return C_OK;}

图二:CT_SOCKET
connAccept 就是调用 conn 中的 type 的 accept 函数,type 为 ConnectionType 类型,下面时结构体的字段,可以看到,这里面就是一系列函数的组合,这与 Golang 中的接口非常相似,通过将逻辑移植到 type 中,我们可以轻松的对 conn 类型实现多态。目前 ConnectType 的实例只有两种,分别是 CT_SOCKET 和 CT_TLS,但是通过实现 ConnectType,可以轻松的对连接进行扩展,增加新的功能。
static inline int connAccept(connection *conn, ConnectionCallbackFunc accept_handler) {return conn->type->accept(conn, accept_handler);}typedef struct ConnectionType {void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask);int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler);int (*write)(struct connection *conn, const void *data, size_t data_len);int (*read)(struct connection *conn, void *buf, size_t buf_len);void (*close)(struct connection *conn);int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);const char *(*get_last_error)(struct connection *conn);int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout);ssize_t (*sync_write)(struct connection *conn, char *ptr, ssize_t size, long long timeout);ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout);ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout);int (*get_type)(struct connection *conn);} ConnectionType;
connCreateAcceptSocket 函数会创建一个类型为 CT_SOCKET 类型的 conn,然后将状态设置为 CONN_STATE_ACCEPTING ,下面时 CONN 的状态转换图,在调用上面的 conn->type->accept 的时候,就会将状态转换为 CONN_STATE_CONNECTED,如果读取数据时,没有返回值,那就是链接已经关闭,将状态变为 CONN_STATE_CLOSED。
图三:conn State
进行到这里 Client 与 Server 的连接就完成了,剩下的就是等待 Client 给 Server 发送命令了,命令会在注册好的 read_handler 中进行解析,然后执行。
