Redis

一、理解多路复用原理

在开始介绍 Redis 之前,有必要先来简单介绍下 epoll。
在传统的同步阻塞网络编程模型里(没有协程以前),性能上不来的根本原因在于进程线程都是笨重的家伙。让一个进(线)程只处理一个用户请求确确实实是有点浪费了。
深度解析单线程的 Redis 如何做到每秒数万 QPS 的超高处理能力 - 图1
先抛开高内存开销不说,在海量的网络请求到来的时候,光是频繁的进程线程上下文就让 CPU 疲于奔命了。
深度解析单线程的 Redis 如何做到每秒数万 QPS 的超高处理能力 - 图2
如果把进程比作牧羊人,一个进(线)程同时只能处理一个用户请求,相当于一个人只能看一只羊,放完这一只才能放下一只。如果同时来了 1000 只羊,那就得 1000 个人去放,这人力成本是非常高的。
深度解析单线程的 Redis 如何做到每秒数万 QPS 的超高处理能力 - 图3
性能提升思路很简单,就是让很多的用户连接来复用同一个进(线)程,这就是多路复用多路指的是许许多多个用户的网络连接。复用指的是对进(线)程的复用。换到牧羊人的例子里,就是一群羊只要一个牧羊人来处理就行了。
不过复用实现起来是需要特殊的 socket 事件管理机制的,最典型和高效的方案就是 epoll。放到牧羊人的例子来,epoll 就相当于一只牧羊犬。
在 epoll 的系列函数里, epoll_create 用于创建一个 epoll 对象,epoll_ctl 用来给 epoll 对象添加或者删除一个 socket。epoll_wait 就是查看它当前管理的这些 socket 上有没有可读可写事件发生。
深度解析单线程的 Redis 如何做到每秒数万 QPS 的超高处理能力 - 图4
当网卡上收到数据包后,Linux 内核进行一系列的处理后把数据放到 socket 的接收队列。然后会检查是否有 epoll 在管理它,如果是则在 epoll 的就绪队列中插入一个元素。epoll_wait 的操作就非常的简单了,就是到 epoll 的就绪队列上来查询有没有事件发生就行了。
在基于 epoll 的编程中,和传统的函数调用思路不同的是,并不能主动调用某个 API 来处理。因为无法知道想要处理的事件啥时候发生。所以只好提前把想要处理的事件的处理函数注册到一个事件分发器上去。当事件发生的时候,由这个事件分发器调用回调函数进行处理。这类基于实现注册事件分发器的开发模式也叫 Reactor 模型。

二、Redis 服务启动初始化

理解了 epoll 原理后,再来实际看 Redis 具体是如何使用 epoll 的。直接在 Github 上就可以非常方便地获取 Redis 的源码。切到 5.0.0 版本来看单线程版本的实现。

  1. # git clone https://github.com/redis/redis
  2. # cd redis
  3. # git checkout -b 5.0.0 5.0.0

其中整个 Redis 服务的代码总入口在 src/server.c 文件中,把入口函数的核心部分摘了出来,如下。

  1. //file: src/server.c
  2. int main(int argc, char **argv) {
  3. ......
  4. // 启动初始化
  5. initServer();
  6. // 运行事件处理循环,一直到服务器关闭为止
  7. aeMain(server.el);
  8. }

其实整个 Redis 的工作过程,就只需要理解清楚 main 函数中调用的 initServer 和 aeMain 这两个函数就足够了。
本节中重点介绍 initServer,在下一节介绍事件处理循环 aeMain。在 initServer 这个函数内,Redis 做了这么三件重要的事情。
深度解析单线程的 Redis 如何做到每秒数万 QPS 的超高处理能力 - 图5

  • 创建一个 epoll 对象
  • 对配置的监听端口进行 listen
  • 把 listen socket 让 epoll 给管理起来

    1. //file: src/server.c
    2. void initServer() {
    3. // 2.1.1 创建 epoll
    4. server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
    5. // 2.1.2 绑定监听服务端口
    6. listenToPort(server.port,server.ipfd,&server.ipfd_count);
    7. // 2.1.3 注册 accept 事件处理器
    8. for (j = 0; j < server.ipfd_count; j++) {
    9. aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
    10. acceptTcpHandler,NULL);
    11. }
    12. ...
    13. }

    接下来分别来看。

    2.1 创建 epoll 对象

    本小节的逻辑看起来貌似不短,但其实只是创建了一个 epoll 对象出来而已。
    创建 epoll 对象的逻辑在 aeCreateEventLoop 中,在创建完后,Redis 将其保存在 redisServer 的 aeEventLoop 成员中,以备后续使用。

    1. struct redisServer {
    2. ...
    3. aeEventLoop *el;
    4. }

    来看 aeCreateEventLoop 详细逻辑。Redis 在操作系统提供的 epoll 对象基础上又封装了一个 eventLoop 出来,所以创建的时候是先申请和创建 eventLoop。

    1. //file:src/ae.c
    2. aeEventLoop *aeCreateEventLoop(int setsize) {
    3. aeEventLoop *eventLoop;
    4. eventLoop = zmalloc(sizeof(*eventLoop);
    5. //将来的各种回调事件就都会存在这里
    6. eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    7. ......
    8. aeApiCreate(eventLoop);
    9. return eventLoop;
    10. }

    在 eventLoop 里,稍微注意一下 eventLoop->events,将来在各种事件注册的时候都会保存到这个数组里。

    1. //file:src/ae.h
    2. typedef struct aeEventLoop {
    3. ......
    4. aeFileEvent *events; /* Registered events */
    5. }

    具体创建 epoll 的过程在 ae_epoll.c 文件下的 aeApiCreate 中。在这里,真正调用了 epoll_create

    1. //file:src/ae_epoll.c
    2. static int aeApiCreate(aeEventLoop *eventLoop) {
    3. aeApiState *state = zmalloc(sizeof(aeApiState));
    4. state->epfd = epoll_create(1024);
    5. eventLoop->apidata = state;
    6. return 0;
    7. }

    2.2 绑定监听服务端口

    再来看 Redis 中的 listen 过程,它在 listenToPort 函数中。虽然调用链条很长,但其实主要就是执行了个简单 listen 而已。

    1. //file: src/redis.c
    2. int listenToPort(int port, int *fds, int *count) {
    3. for (j = 0; j < server.bindaddr_count || j == 0; j++) {
    4. fds[*count] = anetTcpServer(server.neterr,port,NULL,
    5. server.tcp_backlog);
    6. }
    7. }

    Redis 是支持开启多个端口的,所以在 listenToPort 中看到是启用一个循环来调用 anetTcpServer。在 anetTcpServer 中,逐步会展开调用,直到执行到 bind 和 listen 系统调用。

    1. //file:src/anet.c
    2. int anetTcpServer(char *err, int port, char *bindaddr, int backlog)
    3. {
    4. return _anetTcpServer(err, port, bindaddr, AF_INET, backlog);
    5. }
    6. static int _anetTcpServer(......)
    7. {
    8. // 设置端口重用
    9. anetSetReuseAddr(err,s)
    10. // 监听
    11. anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog)
    12. }
    13. static int anetListen(......) {
    14. bind(s,sa,len);
    15. listen(s, backlog);
    16. ......
    17. }

    2.3 注册事件回调函数

    回头再看一下 initServer,它调用 aeCreateEventLoop 创建了 epoll,调用 listenToPort 进行了服务端口的 bind 和 listen。接着就开始调用 aeCreateFileEvent 来注册一个 accept 事件处理器。

    1. //file: src/server.c
    2. void initServer() {
    3. // 2.1.1 创建 epoll
    4. server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
    5. // 2.1.2 监听服务端口
    6. listenToPort(server.port,server.ipfd,&server.ipfd_count);
    7. // 2.1.3 注册 accept 事件处理器
    8. for (j = 0; j < server.ipfd_count; j++) {
    9. aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
    10. acceptTcpHandler,NULL);
    11. }
    12. ...
    13. }

    来注意看调用 aeCreateFileEvent 时传的重要参数是 acceptTcpHandler,它表示将来在 listen socket 上有新用户连接到达的时候,该函数将被调用执行。来看 aeCreateFileEvent 具体代码。

    1. //file: src/ae.c
    2. int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
    3. aeFileProc *proc, void *clientData)
    4. {
    5. // 取出一个文件事件结构
    6. aeFileEvent *fe = &eventLoop->events[fd];
    7. // 监听指定 fd 的指定事件
    8. aeApiAddEvent(eventLoop, fd, mask);
    9. // 设置文件事件类型,以及事件的处理器
    10. fe->mask |= mask;
    11. if (mask & AE_READABLE) fe->rfileProc = proc;
    12. if (mask & AE_WRITABLE) fe->wfileProc = proc;
    13. // 私有数据
    14. fe->clientData = clientData;
    15. }

    函数 aeCreateFileEvent 一开始,从 eventLoop->events 获取了一个 aeFileEvent 对象。在 2.1 中介绍过 eventLoop->events 数组,注册的各种事件处理器会保存在这个地方。
    接下来调用 aeApiAddEvent。这个函数其实就是对 epoll_ctl 的一个封装。主要就是实际执行 epoll_ctl EPOLL_CTL_ADD。

    1. //file:src/ae_epoll.c
    2. static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    3. // add or mod
    4. int op = eventLoop->events[fd].mask == AE_NONE ?
    5. EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    6. ......
    7. // epoll_ctl 添加事件
    8. epoll_ctl(state->epfd,op,fd,&ee);
    9. return 0;
    10. }

    每一个 eventLoop->events 元素都指向一个 aeFileEvent 对象。在这个对象上,设置了三个关键东西

  • rfileProc:读事件回调

  • wfileProc:写事件回调
  • clientData:一些额外的扩展数据

将来 当 epoll_wait 发现某个 fd 上有事件发生的时候,这样 redis 首先根据 fd 到 eventLoop->events 中查找 aeFileEvent 对象,然后再看 rfileProc、wfileProc 就可以找到读、写回调处理函数。
回头看 initServer 调用 aeCreateFileEvent 时传参来看。

  1. //file: src/server.c
  2. void initServer() {
  3. ......
  4. // 2.1.3 注册 accept 事件处理器
  5. for (j = 0; j < server.ipfd_count; j++) {
  6. aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
  7. acceptTcpHandler,NULL);
  8. }
  9. }

listen fd 对应的读回调函数 rfileProc 事实上就被设置成了 acceptTcpHandler,写回调没有设置,私有数据 client_data 也为 null。

三、Redis 事件处理循环

在上一节介绍完了 Redis 的启动初始化过程,创建了 epoll,也进行了绑定监听,也注册了 accept 事件处理函数为 acceptTcpHandler。

  1. //file: src/server.c
  2. int main(int argc, char **argv) {
  3. ......
  4. // 启动初始化
  5. initServer();
  6. // 运行事件处理循环,一直到服务器关闭为止
  7. aeMain(server.el);
  8. }

接下来,Redis 就会进入 aeMain 开始进行真正的用户请求处理了。在 aeMain 函数中,是一个无休止的循环。在每一次的循环中,要做如下几件事情。
深度解析单线程的 Redis 如何做到每秒数万 QPS 的超高处理能力 - 图6

  • 通过 epoll_wait 发现 listen socket 以及其它连接上的可读、可写事件
  • 若发现 listen socket 上有新连接到达,则接收新连接,并追加到 epoll 中进行管理
  • 若发现其它 socket 上有命令请求到达,则读取和处理命令,把命令结果写到缓存中,加入写任务队列
  • 每一次进入 epoll_wait 前都调用 beforesleep 来将写任务队列中的数据实际进行发送
  • 如若有首次未发送完毕的,当写事件发生时继续发送

    1. //file:src/ae.c
    2. void aeMain(aeEventLoop *eventLoop) {
    3. eventLoop->stop = 0;
    4. while (!eventLoop->stop) {
    5. // 如果有需要在事件处理前执行的函数,那么运行它
    6. // 3.4 beforesleep 处理写任务队列并实际发送之
    7. if (eventLoop->beforesleep != NULL)
    8. eventLoop->beforesleep(eventLoop);
    9. // 开始等待事件并处理
    10. // 3.1 epoll_wait 发现事件
    11. // 3.2 处理新连接请求
    12. // 3.3 处理客户连接上的可读事件
    13. aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    14. }
    15. }

    以上就是 aeMain 函数的核心逻辑所在,接下来分别对如上提到的四件事情进行详细的阐述。

    3.1 epoll_wait 发现事件

    Redis 不管有多少个用户连接,都是通过 epoll_wait 来统一发现和管理其上的可读(包括 liisten socket 上的 accept事件)、可写事件的。甚至连 timer,也都是交给 epoll_wait 来统一管理的。
    深度解析单线程的 Redis 如何做到每秒数万 QPS 的超高处理能力 - 图7
    每当 epoll_wait 发现特定的事件发生的时候,就会调用相应的事先注册好的事件处理函数进行处理。来详细看 aeProcessEvents 对 epoll_wait 的封装。 ```c //file:src/ae.c int aeProcessEvents(aeEventLoop *eventLoop, int flags) { // 获取最近的时间事件 tvp = xxx

    // 处理文件事件,阻塞时间由 tvp 决定 numevents = aeApiPoll(eventLoop, tvp); for (j = 0; j < numevents; j++) {

    1. // 从已就绪数组中获取事件
    2. aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
    3. //如果是读事件,并且有读回调函数
    4. fe->rfileProc()
    5. //如果是写事件,并且有写回调函数
    6. fe->wfileProc()

    } }

//file: src/ae_epoll.c static int aeApiPoll(aeEventLoop eventLoop, struct timeval tvp) { // 等待事件 aeApiState state = eventLoop->apidata; epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec1000 + tvp->tv_usec/1000) : -1); … }

  1. aeProcessEvents 就是调用 epoll_wait 来发现事件。当发现有某个 fd 上事件发生以后,则调为其事先注册的事件处理器函数 rfileProc wfileProc
  2. <a name="m3nk2"></a>
  3. ### 3.2 处理新连接请求
  4. 假设现在有新用户连接到达了。前面在看到 listen socket 上的 rfileProc 注册的是 acceptTcpHandler。也就是说,如果有连接到达的时候,会回调到 acceptTcpHandler。<br />在 acceptTcpHandler 中,主要做了几件事情<br />![](https://cdn.nlark.com/yuque/0/2022/png/396745/1653379252496-cdd296d2-8649-47fd-ab78-9e2895759433.png#clientId=u3b3eb21b-f394-4&crop=0&crop=0&crop=1&crop=1&from=paste&id=ua6f986d2&margin=%5Bobject%20Object%5D&originHeight=346&originWidth=493&originalType=url&ratio=1&rotation=0&showTitle=false&status=done&style=shadow&taskId=udc167658-7b8d-434b-af56-28792cc10a9&title=)
  5. - 调用 accept 系统调用把用户连接给接收回来
  6. - 为这个新连接创建一个唯一 redisClient 对象
  7. - 将这个新连接添加到 epoll,并注册一个读事件处理函数
  8. 接下来看上面这三件事情都分别是如何被处理的。
  9. ```c
  10. //file:src/networking.c
  11. void acceptTcpHandler(aeEventLoop *el, int fd, ...) {
  12. cfd = anetTcpAccept(server.neterr, fd, cip, ...);
  13. acceptCommonHandler(cfd,0);
  14. }

在 anetTcpAccept 中执行非常的简单,就是调用 accept 把连接接收回来。

  1. //file: src/anet.c
  2. int anetTcpAccept(......) {
  3. anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)
  4. }
  5. static int anetGenericAccept(......) {
  6. fd = accept(s,sa,len)
  7. }

接下来在 acceptCommonHandler 为这个新的客户端连接 socket,创建一个 redisClient 对象。

  1. //file: src/networking.c
  2. static void acceptCommonHandler(int fd, int flags) {
  3. // 创建 redisClient 对象
  4. redisClient *c;
  5. c = createClient(fd);
  6. ......
  7. }

在 createClient 中,创建 client 对象,并且为该用户连接注册了读事件处理器。

  1. //file:src/networking.c
  2. redisClient *createClient(int fd) {
  3. // 为用户连接创建 client 对象
  4. redisClient *c = zmalloc(sizeof(redisClient));
  5. if (fd != -1) {
  6. ...
  7. // 为用户连接注册读事件处理器
  8. aeCreateFileEvent(server.el,fd,AE_READABLE,
  9. readQueryFromClient, c)
  10. }
  11. ...
  12. }

关于 aeCreateFileEvent 的处理过程这里就不赘述了,详情参见 2.3 节。其效果就是将该用户连接 socket fd 对应的读处理函数设置为 readQueryFromClient, 并且设置私有数据为 redisClient c。

3.3 处理客户连接上的可读事件

现在假设该用户连接有命令到达了,就假设用户发送了GET XXXXXX_KEY 命令。那么在 Redis 的时间循环中调用 epoll_wait 发现该连接上有读时间后,会调用在上一节中讨论的为其注册的读处理函数 readQueryFromClient。
深度解析单线程的 Redis 如何做到每秒数万 QPS 的超高处理能力 - 图8
在读处理函数 readQueryFromClient 中主要做了这么几件事情。

  • 解析并查找命令
  • 调用命令处理
  • 添加写任务到队列
  • 将输出写到缓存等待发送

来详细地看 readQueryFromClient 的代码。在 readQueryFromClient 中会调用 processInputBuffer,然后进入 processCommand 对命令进行处理。其调用链如下:

  1. //file: src/networking.c
  2. void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, ...) {
  3. redisClient *c = (redisClient*) privdata;
  4. processInputBufferAndReplicate(c);
  5. }
  6. void processInputBufferAndReplicate(client *c) {
  7. ...
  8. processInputBuffer(c);
  9. }
  10. // 处理客户端输入的命令内容
  11. void processInputBuffer(redisClient *c) {
  12. // 执行命令,
  13. processCommand(c);
  14. }

再来详细看 processCommand 。

  1. //file:
  2. int processCommand(redisClient *c) {
  3. // 查找命令,并进行命令合法性检查,以及命令参数个数检查
  4. c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
  5. ......
  6. // 处理命令
  7. // 如果是 MULTI 事务,则入队,否则调用 call 直接处理
  8. if (c->flags & CLIENT_MULTI && ...)
  9. {
  10. queueMultiCommand(c);
  11. } else {
  12. call(c,CMD_CALL_FULL);
  13. ...
  14. }
  15. return C_OK;
  16. }

先忽略 queueMultiCommand,直接看核心命令处理方法 call。

  1. //file:src/server.c
  2. void call(client *c, int flags) {
  3. // 查找处理命令,
  4. struct redisCommand *real_cmd = c->cmd;
  5. // 调用命令处理函数
  6. c->cmd->proc(c);
  7. ......
  8. }

在 server.c 中定义了每一个命令对应的处理函数

  1. //file:src/server.c
  2. struct redisCommand redisCommandTable[] = {
  3. {"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},
  4. {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
  5. {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
  6. {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
  7. {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
  8. ......
  9. {"mget",mgetCommand,-2,"rF",0,NULL,1,-1,1,0,0},
  10. {"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
  11. {"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
  12. {"rpushx",rpushxCommand,-3,"wmF",0,NULL,1,1,1,0,0},
  13. ......
  14. }

对于 get 命令来说,其对应的命令处理函数就是 getCommand。也就是说当处理 GET 命令执行到 c->cmd->proc 的时候会进入到 getCommand 函数中来。

  1. //file: src/t_string.c
  2. void getCommand(client *c) {
  3. getGenericCommand(c);
  4. }
  5. int getGenericCommand(client *c) {
  6. robj *o;
  7. if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == NULL)
  8. return C_OK;
  9. ...
  10. addReplyBulk(c,o);
  11. return C_OK;
  12. }

getGenericCommand 方法会调用 lookupKeyReadOrReply 来从内存中查找对应的 key值。如果找不到,则直接返回 C_OK;如果找到了,调用 addReplyBulk 方法将值添加到输出缓冲区中。

  1. //file: src/networking.c
  2. void addReplyBulk(client *c, robj *obj) {
  3. addReplyBulkLen(c,obj);
  4. addReply(c,obj);
  5. addReply(c,shared.crlf);
  6. }

其主题是调用 addReply 来设置回复数据。在 addReply 方法中做了两件事情:

  • prepareClientToWrite 判断是否需要返回数据,并且将当前 client 添加到等待写返回数据队列中。
  • 调用 _addReplyToBuffer 和 _addReplyObjectToList 方法将返回值写入到输出缓冲区中,等待写入 socekt

    1. //file:src/networking.c
    2. void addReply(client *c, robj *obj) {
    3. if (prepareClientToWrite(c) != C_OK) return;
    4. if (sdsEncodedObject(obj)) {
    5. if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
    6. _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
    7. } else {
    8. ......
    9. }
    10. }

    先来看 prepareClientToWrite 的详细实现, ```c //file: src/networking.c int prepareClientToWrite(client *c) { …… if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))

    1. clientInstallWriteHandler(c);

    }

//file:src/networking.c void clientInstallWriteHandler(client *c) { c->flags |= CLIENT_PENDING_WRITE; listAddNodeHead(server.clients_pending_write,c); }

  1. 其中 server.clients_pending_write 就是说的任务队列,队列中的每一个元素都是有待写返回数据的 client 对象。在 prepareClientToWrite 函数中,把 client 添加到任务队列 server.clients_pending_write 里就算完事。<br />接下再来 _addReplyToBuffer,该方法是向固定缓存中写,如果写不下的话就继续调用 _addReplyStringToList 往链表里写。简单起见,只看 _addReplyToBuffer 的代码。
  2. ```c
  3. //file:src/networking.c
  4. int _addReplyToBuffer(client *c, const char *s, size_t len) {
  5. ......
  6. // 拷贝到 client 对象的 Response buffer 中
  7. memcpy(c->buf+c->bufpos,s,len);
  8. c->bufpos+=len;
  9. return C_OK;
  10. }

3.4 beforesleep 处理写任务队列

回想在 aeMain 函数中,每次在进入 aeProcessEvents 前都需要先进行 beforesleep 处理。这个函数名字起的怪怪的,但实际上大有用处。

  1. //file:src/ae.c
  2. void aeMain(aeEventLoop *eventLoop) {
  3. eventLoop->stop = 0;
  4. while (!eventLoop->stop) {
  5. // beforesleep 处理写任务队列并实际发送之
  6. if (eventLoop->beforesleep != NULL)
  7. eventLoop->beforesleep(eventLoop);
  8. aeProcessEvents(eventLoop, AE_ALL_EVENTS);
  9. }
  10. }

该函数处理了许多工作,其中一项便是遍历发送任务队列,并将 client 发送缓存区中的处理结果通过 write 发送到客户端手中。
深度解析单线程的 Redis 如何做到每秒数万 QPS 的超高处理能力 - 图9
来看下 beforeSleep 的实际源码。

  1. //file:src/server.c
  2. void beforeSleep(struct aeEventLoop *eventLoop) {
  3. ......
  4. handleClientsWithPendingWrites();
  5. }
  6. //file:src/networking.c
  7. int handleClientsWithPendingWrites(void) {
  8. listIter li;
  9. listNode *ln;
  10. int processed = listLength(server.clients_pending_write);
  11. //遍历写任务队列 server.clients_pending_write
  12. listRewind(server.clients_pending_write,&li);
  13. while((ln = listNext(&li))) {
  14. client *c = listNodeValue(ln);
  15. c->flags &= ~CLIENT_PENDING_WRITE;
  16. listDelNode(server.clients_pending_write,ln);
  17. //实际将 client 中的结果数据发送出去
  18. writeToClient(c->fd,c,0)
  19. //如果一次发送不完则准备下一次发送
  20. if (clientHasPendingReplies(c)) {
  21. //注册一个写事件处理器,等待 epoll_wait 发现可写后再处理
  22. aeCreateFileEvent(server.el, c->fd, ae_flags,
  23. sendReplyToClient, c);
  24. }
  25. ......
  26. }
  27. }

在 handleClientsWithPendingWrites 中,遍历了发送任务队列 server.clients_pending_write,并调用 writeToClient 进行实际的发送处理。
值得注意的是,发送 write 并不总是能一次性发送完的。假如要发送的结果太大,而系统为每个 socket 设置的发送缓存区又是有限的。
在这种情况下,clientHasPendingReplies 判断仍然有未发送完的数据的话,就需要注册一个写事件处理函数到 epoll 上。等待 epoll 发现该 socket 可写的时候再次调用 sendReplyToClient进行发送。

  1. //file:src/networking.c
  2. int writeToClient(int fd, client *c, int handler_installed) {
  3. while(clientHasPendingReplies(c)) {
  4. // 先发送固定缓冲区
  5. if (c->bufpos > 0) {
  6. nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
  7. if (nwritten <= 0) break;
  8. ......
  9. // 再发送回复链表中数据
  10. } else {
  11. o = listNodeValue(listFirst(c->reply));
  12. nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
  13. ......
  14. }
  15. }
  16. }

writeToClient 中的主要逻辑就是调用 write 系统调用让内核帮其把数据发送出去即可。由于每个命令的处理结果大小是不固定的。所以 Redis 采用的做法用固定的 buf + 可变链表来储存结果字符串。这里自然发送的时候就需要分别对固定缓存区和链表来进行发送了。

四、高性能 Redis 网络原理总结

Redis 服务器端只需要单线程可以达到非常高的处理能力,每秒可以达到数万 QPS 的高处理能力。如此高性能的程序其实就是对 Linux 提供的多路复用机制 epoll 的一个较为完美的运用而已。
在 Redis 源码中,核心逻辑其实就是两个,一个是 initServer 启动服务,另外一个就是 aeMain 事件循环。把这两个函数弄懂了,Redis 就吃透一大半了。

  1. //file: src/server.c
  2. int main(int argc, char **argv) {
  3. ......
  4. // 启动初始化
  5. initServer();
  6. // 运行事件处理循环,一直到服务器关闭为止
  7. aeMain(server.el);
  8. }

在 initServer 这个函数内,Redis 做了这么三件重要的事情。

  • 创建一个 epoll 对象
  • 对配置的监听端口进行 listen
  • 把 listen socket 让 epoll 给管理起来

在 aeMain 函数中,是一个无休止的循环,它是 Redis 中最重要的部分。在每一次的循环中,要做的事情可以总结为如下图。
深度解析单线程的 Redis 如何做到每秒数万 QPS 的超高处理能力 - 图10

  • 通过 epoll_wait 发现 listen socket 以及其它连接上的可读、可写事件
  • 若发现 listen socket 上有新连接到达,则接收新连接,并追加到 epoll 中进行管理
  • 若发现其它 socket 上有命令请求到达,则读取和处理命令,把命令结果写到缓存中,加入写任务队列
  • 每一次进入 epoll_wait 前都调用 beforesleep 来将写任务队列中的数据实际进行发送

其实事件分发器还处理了一个不明显的逻辑,那就是如果 beforesleep 在将结果写回给客户端的时候,如果由于内核 socket 发送缓存区过小而导致不能一次发送完毕的时候,也会注册一个写事件处理器。等到 epoll_wait 发现对应的 socket 可写的时候,再执行 write 写处理。