先聊一道经典面试题:Redis为什么这么快? 因为Redis使用了纯内存结构,单线程IO多路复用事件驱动模型。 先说单线程的好处:

  1. 没有线程创建,销毁带来的开销
  2. 避免了cpu上下文切换的开销
  3. 避免了线程之间带来的竞争问题,例如,加锁,释放锁,死锁等等。

Redis使用单线程不是白白浪费了cpu的资源么? 事实上单线程已经够用了,CPU 不是 redis 的瓶颈。Redis 的瓶颈最有可能是机器内存或者网络带宽。既然单线程容易实现,而且 CPU 不会成为瓶颈,那就顺理成章地采用单线程的方案了。

一,Redis的事件驱动

Redis服务器是一个事件驱动程序,我们先来看一下什么是事件驱动?

所谓的事件驱动,其实就是当你输入一条命令之后,这条命令被组装成Redis协议的格式发送到了Redis的服务器,Redis服务器认为这是一个事件,Redis服务器会接收这个事件/命令,利用事件处理器对事件进行处理。当没有与Redis服务器进行交互的时候,服务器就会处于阻塞等待状态,这个时候CPU在干嘛?其实是在睡眠,直到事件触发的时候,就会再次被操作系统所唤醒。所以说,事件驱动对CPU的利用率更为高效。

事件驱动是一个抽象的说法,也可以叫做IO多路复用,上一篇文章介绍过,针对于不同的操作系统,其实现上是有一些不同的。

Redis没有选择libevent或者libev作为事件模型,而是自己定义了一个事件模型,主要支持了epoll,select,kqueue,以及基于Solaris的wvent ports。主要支持了两种类型的事件驱动:

  1. IO事件,包括有IO的读事件&写事件
  2. 定时器事件,包括一次性定时器和循环定时器

二,Redis对于事件的抽象

Redis将上面的IO事件&定时器事件分别抽象成为了一个数据结构。

1.文件事件结构

文件事件结构包括IO读事件和IO写事件。文件事件结构aeFileEvent的定义在ae.h文件中,包括一个整型的文件事件类型【AE_NONE,AE_READABLE,AE_WRITABLE】,两个函数指针可读处理函数和可写处理函数,还有一个用来指向客户端传入数据的指针。

这两个函数指针的类型函数为aeFileProc,这是一个回调函数,如果当前文件事件所指定的事件类型发生时,就会调用对应的回调函数处理该事件。

当事件就绪的时候,我们需要知道文件事件的文件描述符还有事件类型才能对应该事件,因为Redis定义了aeFiredEvent就绪事件结构体来进行统一管理,这个结构体的定义也是在ae.h头文件中,里面有两个字段:

  1. 就绪事件的文件描述符
  2. 就绪事件的类型【AE_NONE,AE_READABLE,AE_WRITABLE
    1. typedef struct aeFiredEvent {
    2. // 就绪事件的文件描述符
    3. int fd;
    4. // 就绪事件类型:AE_NONE,AE_READABLE,AE_WRITABLE
    5. int mask;
    6. } aeFiredEvent;

    2.时间事件结构

时间事件的结构aeTimeEvent同样定义在ae.h头文件中,其主要字段如下:

  1. 时间事件的id
  2. 时间事件到达的时间的毫秒数
  3. 引用计数:防止计时器事件在递归时间事件的调用中被释放
  4. 时间事件处理的回调函数
  5. 时间事件删除时的回调函数
  6. 客户端传入的数据
  7. 指向上一个事件 & 下一个事件的指针

由此可以看出,时间事件是通过指向上一个事件的指针和下一个事件的指针构成了一个双向链表。

类似于文件事件,当时间事件所指定的事件发生的时候,也会调用对应的回调函数aeTimeProc (定时器事件回调函数)& aeEventFinalizerProc(删除定时事件的回调函数)。

虽然Redis已经对文件事件和时间事件都做了抽象,但是实际上Redis仍然又对事件做了整体的抽象,也就是aeEventLoop结构体,这个结构体用于表示事件状态结构,同样定义与ae.h头文件中。

3.事件状态结构

3.1 aeEventLoop结构

aeEventLoop结构保存了文件事件,定时事件,底层IO多路复用模型等诸多信息,下面我们来具体看一下:

  1. /* State of an event based program 事件状态结构*/
  2. typedef struct aeEventLoop {
  3. //当前注册的最大文件描述符
  4. int maxfd; /* highest file descriptor currently registered */
  5. /**
  6. * 指定事件循环要监听的文件描述符集合的大小。这个值与配置文件中得maxclients有关。
  7. *
  8. * setsize参数表示了eventloop可以监听的网络事件fd的个数(不包含超时事件),
  9. * 如果当前监听的fd个数超过了setsize,eventloop将不能继续注册。
  10. */
  11. int setsize; /* max number of file descriptors tracked */
  12. // 下一个时间事件的ID
  13. long long timeEventNextId;
  14. // 最后一次执行事件的时间
  15. time_t lastTime; /* Used to detect system clock skew */
  16. /**
  17. * 存放所有注册的读写事件,是大小为setsize的数组。内核会保证新建连接的fd是当前可用描述符的最小值,
  18. * 所以最多监听setsize个描述符,那么最大的fd就是setsize - 1。这种组织方式的好处是,可以以fd为下标,
  19. * 索引到对应的事件,在事件触发后根据fd快速查找到对应的事件。
  20. */
  21. aeFileEvent *events; /* Registered events */
  22. //存放触发的读写事件。同样是setsize大小的数组。
  23. aeFiredEvent *fired; /* Fired events */
  24. //redis将定时器事件组织成链表,这个属性指向表头。
  25. aeTimeEvent *timeEventHead;
  26. // 事件处理开关
  27. int stop;
  28. //存放epoll、select等实现相关的数据。
  29. void *apidata; /* This is used for polling API specific data */
  30. //事件循环在每次迭代前会调用beforesleep执行一些异步处理。
  31. aeBeforeSleepProc *beforesleep;
  32. // 执行处理事件之后的回调函数
  33. aeBeforeSleepProc *aftersleep;
  34. } aeEventLoop;

3.2 底层IO多路复用模式的选择

aeEventLoop结构保存了一个void *类型的万能指针apidata,是用来保存轮训事件的状态的,也就是保存了底层调用的多路复用库的事件状态。关于Redis的多路复用库的选择,Redis包装了常见的selectepollevportkqueue,他们在编译阶段,根据不同的系统选择性能最高的一个多路复用库作为Redis的多路复用程序的实现,而且所有库实现的接口名称都是相同的,因此Redis多路复用程序底层实现是可以相互切换的,具体的实现逻辑在ae.c文件中。

多路复用库的选择顺序:evport->epoll->kqueue->select

#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE

#include "ae_kqueue.c"

#else
#include "ae_select.c"
#endif
#endif
#endif

我们如何判断当前系统所使用的多路复用库呢?可以在redis-cli端输入info server命令查看。我的当前系统是mac os 系统,使用的库函数实现为multiplexing_api:kqueue

3.3 epoll多路复用库

epoll多路复用库的相关函数都在ae_epoll.c文件中,实际上就是对将要监听的事件和epoll_create创建的文件描述符epfd封装到了一起。

Redis在每一种多路复用模型的实现文件中都定义了一个aeApiState结构体,用于记录需要用到的数据,epoll模型里面定义的aeApiState结构体包含了两个字段:

  1. 保存epoll实例的文件描述符
  2. 事件表:保存epoll监控的文件描述符发生的IO事件,是一个数组,用于epoll_wait

epoll模型的struct epoll_event的结构中定义了自己的事件类型,例如EPOLLIN POLLOUT等,但是Redis的文件事件结构aeFileEvent也在mask中定义了自己的事件类型,例如:AE_READABLE,AE_WRITEABLE等,所以就需要实现一个中间层将用户层的事件类型转换成系统底层事件类型,这也就是ae_epoll.c文件中实现的相同的api。

// 创建一个epoll实例,保存到eventLoop中
static int aeApiCreate(aeEventLoop *eventLoop)
// 调整事件表的大小
static int aeApiResize(aeEventLoop *eventLoop, int setsize)
// 释放epoll实例和事件表空间
static void aeApiFree(aeEventLoop *eventLoop)
// 在epfd标识的事件表上注册fd的事件
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)
// 在epfd标识的事件表上注删除fd的事件
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask)
// 等待所监听文件描述符上有事件发生
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)
// 返回正在使用的IO多路复用库的名字
static char *aeApiName(void)

这些API都是调用相应的底层多路复用库来将Redis事件状态结构aeEventLoop所关联,就是将epoll底层函数封装起来,Redis实现事件时,只需要调用这些接口即可。我们来分析两个重要的函数源码:

首先我们来看一下在epfd文件描述符标识的事件表上注册fd事件的过程:

  1. 根据之前有没有监听这个文件描述符的事件,判断是添加还是修改操作
  2. 如果是修改事件,就需要合并之前的事件类型
  3. 将Redis层面的事件类型映射为系统底层epoll的事件类型
  4. 设置事件所对应的目标文件的描述符
  5. 调用底层的epoll_ctl函数,将事件注册到epoll中

    static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
     aeApiState *state = eventLoop->apidata;
     struct epoll_event ee = {0}; /* avoid valgrind warning */
     //如果之前有监控这个文件描述符的事件,就是修改操作,否则是添加操作
     int op = eventLoop->events[fd].mask == AE_NONE ?
             EPOLL_CTL_ADD : EPOLL_CTL_MOD;
     ee.events = 0;
     //如果是修改事件,合并之前的事件类型
     mask |= eventLoop->events[fd].mask;
     //根据mask映射epoll的事件类型
     if (mask & AE_READABLE) ee.events |= EPOLLIN;
     if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
     //设置事件所对应的目标文件描述符
     ee.data.fd = fd;
     //调用epoll接口,将ee事件注册到epoll中
     if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
     return 0;
    }
    

    接下来我们再来看一下等待监听的文件描述符上有事件发生的过程:

  6. 调用epoll_wait监听事件表上是否有事件发生

  7. 如果至少有一个就绪的事件,那就遍历就绪的事件表,将其加入到eventLoop的就绪事件表
    1. 将就绪的epoll事件转换为统一的AE事件
    2. 添加到就绪的事件表中
  8. 返回就绪的事件个数

    static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
     aeApiState *state = eventLoop->apidata;
     int retval, numevents = 0;
     //调用epoll_wait监听事件表上是否有事件发生,tvp设置超时时间,没有就设置-1,永久阻塞
     retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
             tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
     //如果至少有一个就绪的事件
     if (retval > 0) {
         int j;
    
         numevents = retval;
         //遍历就绪的事件表,将其加入到eventLoop的就绪事件表
         for (j = 0; j < numevents; j++) {
             int mask = 0;
             struct epoll_event *e = state->events+j;
             //翻译就绪的epoll事件为统一的AE事件
             if (e->events & EPOLLIN) mask |= AE_READABLE;
             if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
             if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
             if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
             //添加到就绪的事件表中
             eventLoop->fired[j].fd = e->data.fd;
             eventLoop->fired[j].mask = mask;
         }
     }
     //返回就绪的事件个数
     return numevents;
    }
    

    4.回调函数类型

  • IO读写事件回调函数
  • 定时器事件回调函数
  • 删除定时器事件回调函数
  • 进入事件循环等待之前的回调函数

关于回调函数的类型,定义在了ae.h头文件中:

/*io读写事件的回调函数*/
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
/*定时器事件回调函数*/
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
/*删除定时事件的回调函数*/
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
/*进入循环等待之前的回调函数*/
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);

三,事件源码分析

1.事件模型的创建&释放

Redis通过以下接口完成eventLoop的创建和释放:

aeEventLoop *aeCreateEventLoop(int setsize);
void aeDEleteEventLoop(aeEventLoop *eventLoop);

1.1 eventLoop的创建

Redis将对应的事件注册到 eventLoop中,然后不断循环检测有无事件触发。目前 eventLoop 支持超时事件和网络IO读写事件的注册。我们可以通过aeCreateEventLoop来创建一个eventLoop,事实上,在Redis服务器初始化的时候就会调用server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);创建一个eventLoop,作为后续整个服务器运行期间的事件模型。在创建eventLoop的时候,必须指定一个setsize的参数,setsize参数表示 eventLoop可以监听的网络事件 fd 的个数(不包含超时事件),如果当前监听的fd个数超过了setsizeeventLoop不会再继续注册,而setsize的大小,实际上就是server.maxclients+CONFIG_FDSET_INCR,其中CONFIG_FDSET_INCR大小为128,定义在server.h头文件中。

aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

    monotonicInit();    /* just in case the calling app didn't initialize */

    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    /*创建两个尚未初始化的数组*/
    eventLoop->events = zmalloc(sizeof(aeFileEvent) * setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent) * setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    /*
     * setsize:server.maxclients+CONFIG_FDSET_INCR
     * 1. maxclients代表用户配置的最大连接数,可以在启动的时候通过 --maxclients指定,默认为10000。
     * 2. ONFIG_FDSET_INCR大小为128,给Redis预留一些安全的空间。
     * */
    eventLoop->setsize = setsize;
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    eventLoop->aftersleep = NULL;
    eventLoop->flags = 0;
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;

    err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}

1.2 eventLoop如何存储监听的事件

上一篇文章提到过,Linux内核会为每个进程维护一个文件描述符列表。而POSIX标准对于文件描述符进行了一些约束:

  • fd 为 0 ,1 ,2 分别表示标准输入,标准输出&错误输出
  • 每次新打开的fd,必须使用当前进程中最小可用的文件描述符

Redis充分利用了fd的这些特点,来存储每个fd对应的事件;在Redis的eventLoop中,直接用了一个连续的数组来存储事件信息:

eventLoop->events = zmalloc(sizeof(aeFileEvent) * setsize);
for (i = 0; i < setsize; i++)
    eventLoop->events[i].mask = AE_NONE;

可以看到数组的长度就是setisze,同时创建之后将每一个event的mask属性设置为AE_NONE,前面简单提到过,mask表示fd注册了哪些事件。对于eventLoop->events数组来说,fd就是这个数组的下标。比如当程序刚刚启动的时候,创建监听套接字,按照标准规定,该fd的值为3.此时就直接在eventLoop->events 下标为3的元素中存放相应的event数据。不过也基于fd的这些特点,意味着events数组的前三位一定不会有相应的fd赋值。

2.网络IO事件的创建&释放

Redis 通过以下接口进行网络 IO 事件的注册和删除:

//将某个 fd 的某些事件注册到 eventloop 中
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
                      aeFileProc *proc, void *clientData);
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);

当前版本可以注册事件有三种:

  1. AE_READABLE 可读事件
  2. AE_WRITEABLE 可写事件
  3. AE_BARRIER 该事件可以实现读写事件处理顺序的反转

而mask就是这几个事件经过或运算以后的值。

aeCreateFileEvent 调用了aeApiAddEvent函数,aeApiAddEvent函数真正将事件加入监听,但其具体实现与选择的底层IO模型相关,其实前面我们已经分析过,如果实在epoll模型下,将事件加入监听的流程,在 epoll 的实现中调用了 epoll_ctl 函数。Redis 会根据该事件对应之前的 mask 是否为 AE_NONE,来决定使用 EPOLL_CTL_ADD 还是 EPOLL_CTL_MOD。

同样的,aeDeleteFileEvent 调用了aeApiDelEvent函数, aeApiDelEvent函数真正将事件移除监听,但其具体实现与选择的底层IO模型相关,如果选择的是epoll模型,那么就会调用 epoll_ctl,Redis 判断用户是否是要完全删除该 fd 上所有事件,来决定使用 EPOLL_CTL_DEL 还是 EPOLL_CTL_MOD。

3.定时事件

Redis 通过以下两个接口进行定时器的注册和取消:

long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
                            aeTimeProc *proc, void *clientData,
                            aeEventFinalizerProc *finalizerProc);
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);

在调用 aeCreateTimeEvent 注册超时事件的时候,主要是绑定了调用方传递过来的两个函数aeTimeProc 和 aeEventFinalizerProc。

  • aeTimeProc 就是当超时事件触发时调用的 callback。但是,aeTimeProc 需要返回一个 int 值,代表下次该超时事件触发的时间间隔。如果返回 - 1,则说明超时时间不需要再触发了,标记为删除即可。

  • finalizerProc 当 timer 被删除的时候,会调用这个 callback。

Redis 的定时器其实非常简单,只是一个普通的双向链表,链表也并不是有序的。每次最新的超时事件,直接插入链表的最头部。当AE要遍历当前时刻的超时事件时,直接从头到尾遍历链表,看看有没有超时的事件。一般来说,定时器都会采用最小堆或者时间轮等有序数据结构进行存储,为什么 Redis 的定时器做的这么简陋?《Redis的设计与实现》一书中说,在 Redis 3.0 版本中,只使用到了serverCon这一个超时事件。所以这种情况下,也无所谓性能了,虽然是个链表,但其实用起来就只有一个元素,相当于当做一个指针在用。

Redis 在msUntilEarliestTimer函数的注释里面也说明了这事,并且给出了以后的优化方案:用skiplist代替现有普通链表,查询的时间复杂度将优化为O(1),插入的时间复杂度将变成O(log(N)),不过一般这么写的其实都不会优化的,如果你比较勤奋,这么干了,说不定你就是Redis的开源commter了。

4.事件循环

4.1 事件循环的主流程

在ae.c文件的主函数aeMain,实现事件模型的底层循环等待,其实在前面的文章里我们已经看过大体的流程,这次我们再来详细分析下。

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    //当事件组没有停止的时候
    while (!eventLoop->stop) {
        //就会执行这个函数,所以看这个函数的逻辑
        aeProcessEvents(eventLoop, AE_ALL_EVENTS | AE_CALL_BEFORE_SLEEP | AE_CALL_AFTER_SLEEP);

    }
}

这个函数的逻辑其实很简单,如果服务器一直处理事件,那么就是一个死循环,而一个最典型的事件驱动,就是一个死循环。调用处理事件的函数aeProcessEvents,他们参数是一个事件状态结构aeEventLoopAE_ALL_EVENTS|AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP

int aeProcessEvents(aeEventLoop *eventLoop, int flags);

函数形参flags 是 (AE_ALL_EVENTS | AE_CALL_BEFORE_SLEEP |AE_CALL_AFTER_SLEEP)的结果。

  • 如果flags = 0,函数什么都不做,直接返回
  • 如果flags设置了 AE_ALL_EVENTS ,则执行所有类型的事件
  • 如果flags设置了 AE_FILE_EVENTS ,则执行文件事件
  • 如果flags设置了 AE_TIME_EVENTS ,则执行时间事件
  • 如果flags设置了 AE_DONT_WAIT ,那么函数处理完事件后直接返回,不阻塞等待
  • 如果flags设置了 AE_CALL_BEFORE_SLEEP,那么函数在aeApiPoll函数返回之前就执行eventLoop->beforesleep(eventLoop);
  • 如果flags设置了 AE_CALL_AFTER_SLEEP,那么函数在aeApiPoll函数返回之后就执行eventLoop->aftersleep(eventLoop);

Redis服务器在没有被事件触发时,就会阻塞等待,因为没有设置AE_DONT_WAIT标识。但是他不会一直的死等文件事件的到来,因为他还要处理时间事件,因此,在调用aeApiPoll进行监听之前,先从时间事件表中获取一个最近到达的时间事件,根据要等待的时间构建一个struct timeval tv, *tvp结构的变量,这个变量保存着服务器阻塞等待文件事件的最长时间,一旦时间到达而没有触发文件事件,aeApiPoll函数就会停止阻塞,进而调用processTimeEvents处理时间事件,因为Redis服务器设定一个对自身资源和状态进行周期性检查的定时时间事件,而该函数就是timeProc所指向的回调函数serverCron,该定时事件在服务器初始化的时候就会设置,创建位置在server.c的void initServer(void)函数中。

/* 创建计时器回调,指定一下定时事件的处理函数为【serverCron】,包括:主从节点&集群模式 各个节点的定时通信,客户端超时、驱逐未访问的过期密钥等。 */
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
    serverPanic("Can't create event loop timers.");
    exit(1);
}

回调函数serverCron位于server.c中,如果在阻塞等待的最长时间之间,触发了文件事件,就会先执行文件事件,后执行时间事件,因此处理时间事件通常比预设的会晚一点。

4.2 文件事件回调函数类型

执行文件事件rfileProc和wfileProc也是调用了回调函数,Redis将文件事件的处理分为了好几种,用于处理不同的网络通信需求,下面列出回调函数的原型:

  1. 网络客户端连接请求
  2. 本地客户端连接请求
  3. [客户端]读事件请求
  4. [服务端]写事件请求
// 用于accept client的connect。
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask);
// 用于acceptclient的本地connect。
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask);
// 用于读入client发送的请求。
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask);
// 用于向client发送命令回复。
void addReplySds(client *c, const char *s, size_t len);

4.3 获取下一个超时事件

获取最快达到的时间事件的函数msUntilEarliestTimer实现其实就是遍历链表,找到最小值。

4.4 定时事件的处理

我们重点看执行时间事件的函数processTimeEvents实现:

如果时间事件不存在,则就调用finalizerProc指向的回调函数,删除当前的时间事件,为什么会出现这种情况呢,这是因为定时事件到时处理结束后,根据timeProc函数的返回值判断该定时事件是否会继续存在,如果返回值不是-1,则说明该定时事件需要继续存在,则继续设置它的下次到期时间;如果返回值是-1,那么说明该定时事件不需要继续存在,则将该事件的id设置为AE_DELETED_EVENT_ID,然后在下一次的定时事件的遍历的时候对其进行删除。如果存在,就调用timeProc指向的回调函数处理时间事件。Redis的时间事件分为两类:

  1. 定时事件:让一段程序在指定的时间后执行一次。
  2. 周期性事件:让一段程序每隔指定的时间后执行一次。

如果当前的时间事件是周期性,那么就会在将时间周期添加到周期事件的到时时间中。如果是定时事件,则将该时间事件删除。


四,总结

至此,Redis 的事件模型源代码我们就分析完了,我们再来梳理一下:

image.png

  • Redis的事件模型是完全自定义的,底层封装了epoll,select,kqueue,wvent ports等IO多路复用模型,支持IO事件(IO读事件&IO写事件) & 定时器事件(一次性定时器和周期性定时器) 两种类型的事件驱动。

  • Redis将上面两种事件类型分别抽象成了一种专门的数据结构,不光如此,Redis又对事件做了个整体的抽象,也就是aeEventLoop结构体,这个结构体用于表示事件状态结构。

  • aeEventLoop结构保存了一个void *类型的万能指针apidata,是用来保存轮训事件的状态的,也就是保存了底层调用的多路复用库的事件状态。关于Redis的多路复用库的选择,在编译阶段,根据不同的系统选择性能最高的一个多路复用库作为Redis的多路复用程序的实现。

  • epoll模型的struct epoll_event的结构中定义了自己的事件类型,但是Redis的文件事件结构aeFileEvent也在mask中定义了自己的事件类型,所以就需要实现一个中间层将用户层的事件类型转换成系统底层事件类型,这也就是ae_epoll.c文件中实现的相同的api。

  • Redis将对应的事件注册到 eventLoop中,然后不断循环检测有无事件触发。目前 eventLoop 支持超时事件和网络IO读写事件的注册。我们可以通过aeCreateEventLoop来创建一个eventLoop,事实上,在Redis服务器初始化的时候就会创建一个eventLoop,作为后续整个服务器运行期间的事件模型。在创建eventLoop的时候,必须指定一个setsize的参数,setsize参数表示 eventLoop可以监听的网络事件 fd 的个数(不包含超时事件),如果当前监听的fd个数超过了setsizeeventLoop不会再继续注册,而setsize的大小,实际上就是server.maxclients+CONFIG_FDSET_INCR,其中CONFIG_FDSET_INCR大小为128,定义在server.h头文件中。

  • 在Redis的eventLoop中,直接用了一个连续的数组来存储事件信息,数组的长度就是setisze,同时创建之后将每一个event的mask属性设置为AE_NONE,mask表示fd注册了哪些事件。对于eventLoop->events数组来说,fd就是这个数组的下标。

  • 网络IO事件的创建:aeCreateFileEvent 调用了aeApiAddEvent函数,aeApiAddEvent函数真正将事件加入监听,但其具体实现与选择的底层IO模型相关,其实前面我们已经分析过,如果实在epoll模型下,将事件加入监听的流程,在 epoll 的实现中调用了 epoll_ctl 函数。Redis 会根据该事件对应之前的 mask 是否为 AE_NONE,来决定使用 EPOLL_CTL_ADD 还是 EPOLL_CTL_MOD。

  • Redis 的定时器是一个普通的双向链表,链表也并不是有序的。每次最新的超时事件,直接插入链表的最头部。当AE要遍历当前时刻的超时事件时,直接从头到尾遍历链表,看看有没有超时的事件。同时定时器上绑定了两个函数,一个是到期执行函数,一个是删除时执行的函数。

  • Redis服务器在没有被事件触发时,就会阻塞等待,因为没有设置AE_DONT_WAIT标识。但是他不会一直的死等文件事件的到来,因为他还要处理时间事件,因此,在调用aeApiPoll进行监听之前,先从时间事件表中获取一个最近到达的时间事件,根据要等待的时间构建一个struct timeval tv, *tvp结构的变量,这个变量保存着服务器阻塞等待文件事件的最长时间,一旦时间到达而没有触发文件事件,aeApiPoll函数就会停止阻塞,进而调用processTimeEvents处理时间事件,因为Redis服务器设定一个对自身资源和状态进行周期性检查的定时时间事件,而该函数就是timeProc所指向的回调函数serverCron,该定时事件在服务器初始化的时候就会设置。

  • Redis的时间事件分为两种:定时事件&周期事件。对于时间事件的处理:如果时间事件不存在,则就调用finalizerProc指向的回调函数,删除当前的时间事件,为什么会出现这种情况呢,这是因为定时事件到时处理结束后,根据timeProc函数的返回值判断该定时事件是否会继续存在,如果返回值不是-1,则说明该定时事件需要继续存在,则继续设置它的下次到期时间;如果返回值是-1,那么说明该定时事件不需要继续存在,则将该事件的id设置为AE_DELETED_EVENT_ID,然后在下一次的定时事件的遍历的时候对其进行删除。如果存在,就调用timeProc指向的回调函数处理时间事件。


参考文章:

[1]:https://blog.csdn.net/men_wen/article/details/71514524
[2]:https://blog.csdn.net/wynter_/article/details/53318353
[3]:https://blog.csdn.net/chosen0ne/article/details/42717571
[4]:https://www.cyhone.com/articles/analysis-of-redis-ae/
[5]:https://www.cnblogs.com/MrLiuZF/p/15001415.html

文章都是基于老版本的Redis 的源码分析,对于新版本,改动还是有些偏大,所以还是要结合源码自己实际对比才能看出Redis在不同版本的优化。