InitDB

在初始化数据库的时候,redis 会监听所有的端口,打开 socket 连接。然后将它们绑定到 Eventloop 上。

  1. /* Create the timer callback, this is our way to process many background
  2. * operations incrementally, like clients timeout, eviction of unaccessed
  3. * expired keys and so forth. */
  4. if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
  5. serverPanic("Can't create event loop timers.");
  6. exit(1);
  7. }
  8. /* Create an event handler for accepting new connections in TCP and Unix
  9. * domain sockets. */
  10. if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) {
  11. serverPanic("Unrecoverable error creating TCP socket accept handler.");
  12. }
  13. if (createSocketAcceptHandler(&server.tlsfd, acceptTLSHandler) != C_OK) {
  14. serverPanic("Unrecoverable error creating TLS socket accept handler.");
  15. }
  16. if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
  17. acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
  18. /* Register a readable event for the pipe used to awake the event loop
  19. * when a blocked client in a module needs attention. */
  20. if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
  21. moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
  22. serverPanic(
  23. "Error registering the readable event for the module "
  24. "blocked clients subsystem.");
  25. }
  26. /* Register before and after sleep handlers (note this needs to be done
  27. * before loading persistence since it is used by processEventsWhileBlocked. */
  28. aeSetBeforeSleepProc(server.el,beforeSleep);
  29. aeSetAfterSleepProc(server.el,afterSleep);


这里出现了两种 Event,一种是 fileEvent, 这个是用于监听文件描述符的,每次文件描述符需要写入或者读取数据,都会调用 fileEvent 中存储的 Handler 函数,另外一种是 TimeEvent,是用于固定时间触发的。

这里还出现了 BeforeSleepProcAfterSleepProc 这两个函数会在程序被阻塞之前和之后被调用,具体的内容会再后面分析。
redis-server-InitServer.drawio.png

图一:initServer

然后在 aeMain 中,会循环执行函数 aeProcessEvents ,函数的大体流程如下,首先遍历所有的 aeTimeEvent,找到最近需要执行的 aeTimeEvent,将这个事件下次执行的还需要等待的时间作为 aeApiPoll 的最长等待时间。aeApiPoll 产生 aeFiredEvent,这是都是需要进行处理的文件描述符。在进入等待之前会执行 beforeSleepafterSleep

未命名绘图-第 1 页.drawio.png
图二:aeProcessEvent

Epoll

未命名绘图-epoll.drawio.png
图三:epoll

在 Linux 中 Eventloop 的底层是 Epoll,epoll 会在 epoll_wait 时产生出来的 aeEventState,转化为 aeFiredEvent,让上层进行处理,添加删除需要监听的 fd 都是通过 epoll_ctl 进行管理的。

FileEvent

未命名绘图-firedEvent.drawio.png
图四:aeFiredEvent

aeFiredEvent 中会保存需要处理的文件描述符,然后通过这个文件描述符能够找到对应的 aeFileEvent,里面有曾经注册过的处理函数,根据产生的事件 mask 的不同,会调用不同的处理函数,一般是先处理读取事件,然后再处理写入事件,因为读取的时候可能会产生写入的内容,应该尽量再一轮循环中进行处理。

TimeEvent

eventloop-第 4 页.drawio.png
图五:aeTimeEvent

aeTimeEvent 在每轮循环的末尾会进行处理,每次处理都会遍历整个链表,然后查看是否应该执行,这里通过 timeProc 的返回值判断是否需要下次执行,如果返回的是 -1,就会将 Event 的 ID 标记为 AE_DELETE_EVENT_ID,然后再下一次循环的时候进行删除。

Process

未命名绘图.drawio.png
图六:process

Accept

Eventloop 中每个文件描述符需要处理时,都会调用他们注册到 EventLoop 中的函数,对于我们监听的端口来说,就是调用 accpetTcpHander 函数。

  1. void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
  2. int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
  3. char cip[NET_IP_STR_LEN];
  4. UNUSED(el);
  5. UNUSED(mask);
  6. UNUSED(privdata);
  7. while(max--) {
  8. cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
  9. if (cfd == ANET_ERR) {
  10. if (errno != EWOULDBLOCK)
  11. serverLog(LL_WARNING,
  12. "Accepting client connection: %s", server.neterr);
  13. return;
  14. }
  15. serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
  16. acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
  17. }
  18. }

这个函数至多会进行 max 次循环,如果 max 次没有成功建立连接就退出了,每次循环中都会尝试调用 anetTcpAccept 方法,这个函数的核心就是系统调用 accept,调用成功会返回一个新的文件描述符,作为本次连接的通信 socket。

然后会基于这个文件描述符创建一个连接,然后调用 acceptCommonHandler 方法,这个方法中会创建 client 并且调用 connAccept 方法。这里创建的 client 虽然没有返回,但是会成为 conn 的一个字段,后面会从 conn 中取出来使用。

未命名绘图.drawio.png
图一:accpetTcpHandler

在 createClient 中,会调用 connSocketSetReadHandler 函数,这个函数会将 fd 加入到 Eventloop 中,等待能够进行读取。但是这里的对应的 Handler 为 type->ae_handler,而不是新添加的 read_handler,在 ae_handler 中会对 mask 进行检查,判断时需要读取还是写入,然后调用 conn 上的 read_handler 或者 write_handler。

  1. static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
  2. if (func == conn->read_handler) return C_OK;
  3. conn->read_handler = func;
  4. if (!conn->read_handler)
  5. aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
  6. else
  7. if (aeCreateFileEvent(server.el,conn->fd,
  8. AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
  9. return C_OK;
  10. }

未命名绘图-第 2 页.drawio.png
图二:CT_SOCKET

connAccept 就是调用 conn 中的 type 的 accept 函数,type 为 ConnectionType 类型,下面时结构体的字段,可以看到,这里面就是一系列函数的组合,这与 Golang 中的接口非常相似,通过将逻辑移植到 type 中,我们可以轻松的对 conn 类型实现多态。目前 ConnectType 的实例只有两种,分别是 CT_SOCKET 和 CT_TLS,但是通过实现 ConnectType,可以轻松的对连接进行扩展,增加新的功能。

  1. static inline int connAccept(connection *conn, ConnectionCallbackFunc accept_handler) {
  2. return conn->type->accept(conn, accept_handler);
  3. }
  4. typedef struct ConnectionType {
  5. void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask);
  6. int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler);
  7. int (*write)(struct connection *conn, const void *data, size_t data_len);
  8. int (*read)(struct connection *conn, void *buf, size_t buf_len);
  9. void (*close)(struct connection *conn);
  10. int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
  11. int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
  12. int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
  13. const char *(*get_last_error)(struct connection *conn);
  14. int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout);
  15. ssize_t (*sync_write)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
  16. ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
  17. ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
  18. int (*get_type)(struct connection *conn);
  19. } ConnectionType;

connCreateAcceptSocket 函数会创建一个类型为 CT_SOCKET 类型的 conn,然后将状态设置为 CONN_STATE_ACCEPTING ,下面时 CONN 的状态转换图,在调用上面的 conn->type->accept 的时候,就会将状态转换为 CONN_STATE_CONNECTED,如果读取数据时,没有返回值,那就是链接已经关闭,将状态变为 CONN_STATE_CLOSED
redis-server-connState.drawio.png
图三:conn State

进行到这里 Client 与 Server 的连接就完成了,剩下的就是等待 Client 给 Server 发送命令了,命令会在注册好的 read_handler 中进行解析,然后执行。