Redis
Redis 是一个高性能服务端的典范。它通过多路复用 epoll 来管理海量的用户连接,只使用一个线程来通过事件循环来处理所有用户请求,就可以达到每秒数万 QPS 的处理能力。下图是单线程版本 Redis 工作的核心原理图。
Redis 6 中多线程的实现原理 - 图1
单线程的 Redis 虽然性能很高,但是却有两个问题。一个问题是没有办法充分发挥现代 CPU 的多核处理能力,一个实例只能使用一个核的能力。二是如果某个用户请求的处理过程卡住一段时间,会导致其它所有的请求都会出现超时的情况。所以,在线上的 redis 使用过程时是明确禁止使用 keys * 等长耗时的操作的。
那如何改进呢,思路和方向其实很明确。那就是和其它的主流程序一样引入多线程,用更多的线程来分担这些可能耗时的操作。事实上 Redis 也确实这么干了,在 6.0 以后的版本里,开始支持了多线程。来领略一下 Redis 的多线程是如何实现的。

一、多线程 Redis 服务启动

首先获取多线程版本 Redis 的源码

  1. # git clone https://github.com/redis/redis
  2. # cd redis
  3. # git checkout -b 6.2.0 6.2.0

默认情况下多线程是默认关闭的。如果想要启动多线程,需要在配置文件中做适当的修改。相关的配置项是 io-threads 和 io-threads-do-reads 两个。

  1. #vi /usr/local/soft/redis6/conf/redis.conf
  2. io-threads 4 #启用的 io 线程数量
  3. io-threads-do-reads yes #读请求也使用io线程

其中 io-threads 表示要启动的 io 线程的数量。io-threads-do-reads 表示是否在读阶段也使用 io 线程,默认是只在写阶段使用 io 线程的。
现在假设已经打开了如上两项多线程配置。带着这个假设,进入到 Redis 的 main 入口函数。

  1. //file: src/server.c
  2. int main(int argc, char **argv) {
  3. ......
  4. // 1.1 主线程初始化
  5. initServer();
  6. // 1.2 启动 io 线程
  7. InitServerLast();
  8. // 进入事件循环
  9. aeMain(server.el);
  10. }

1.1 主线程初始化

在 initServer 这个函数内,Redis 主线程做了这么几件重要的事情。
Redis 6 中多线程的实现原理 - 图2

  • 初始化读任务队列、写任务队列
  • 创建一个 epoll 对象
  • 对配置的监听端口进行 listen
  • 把 listen socket 让 epoll 给管理起来

    1. //file: src/server.c
    2. void initServer() {
    3. // 1 初始化 server 对象
    4. server.clients_pending_write = listCreate();
    5. server.clients_pending_read = listCreate();
    6. ......
    7. // 2 初始化回调 events,创建 epoll
    8. server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
    9. // 3 绑定监听服务端口
    10. listenToPort(server.port,server.ipfd,&server.ipfd_count);
    11. // 4 注册 accept 事件处理器
    12. for (j = 0; j < server.ipfd_count; j++) {
    13. aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
    14. acceptTcpHandler,NULL);
    15. }
    16. ...
    17. }

    接下来分别来看。

    初始化 server 对象

    在 initServer 的一开头,先是对 server 的各种成员变量进行初始化。值得注意的是 clients_pending_write 和 clients_pending_read 这两个成员,它们分别是写任务队列和读任务队列。将来主线程产生的任务都会放在放在这两个任务队列里。
    主线程会根据这两个任务队列来进行任务哈希散列,以将任务分配到多个线程中进行处理。

    aeCreateEventLoop 处理

    来看 aeCreateEventLoop 详细逻辑。它会初始化事件回调 event,并且创建了一个 epoll 对象出来。

    1. //file:src/ae.c
    2. aeEventLoop *aeCreateEventLoop(int setsize) {
    3. aeEventLoop *eventLoop;
    4. eventLoop = zmalloc(sizeof(*eventLoop);
    5. //将来的各种回调事件就都会存在这里
    6. eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    7. ......
    8. aeApiCreate(eventLoop);
    9. return eventLoop;
    10. }

    注意一下 eventLoop->events,将来在各种事件注册的时候都会保存到这个数组里。

    1. //file:src/ae.h
    2. typedef struct aeEventLoop {
    3. ......
    4. aeFileEvent *events; /* Registered events */
    5. }

    具体创建 epoll 的过程在 ae_epoll.c 文件下的 aeApiCreate 中。在这里,真正调用了 epoll_create

    1. //file:src/ae_epoll.c
    2. static int aeApiCreate(aeEventLoop *eventLoop) {
    3. aeApiState *state = zmalloc(sizeof(aeApiState));
    4. state->epfd = epoll_create(1024);
    5. eventLoop->apidata = state;
    6. return 0;
    7. }

    绑定监听服务端口

    再来看 Redis 中的 listen 过程,它在 listenToPort 函数中。调用链条很长,依次是 listenToPort => anetTcpServer => _anetTcpServer => anetListen。在 anetListen 中,就是简单的 bind 和 listen 的调用。

    1. //file:src/anet.c
    2. static int anetListen(......) {
    3. bind(s,sa,len);
    4. listen(s, backlog);
    5. ......
    6. }

    注册事件回调函数

    前面调用 aeCreateEventLoop 创建了 epoll,调用 listenToPort 进行了服务端口的 bind 和 listen。接着就调用的 aeCreateFileEvent 就是来注册一个 accept 事件处理器。
    来看 aeCreateFileEvent 具体代码。

    1. //file: src/ae.c
    2. int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
    3. aeFileProc *proc, void *clientData)
    4. {
    5. // 取出一个文件事件结构
    6. aeFileEvent *fe = &eventLoop->events[fd];
    7. // 监听指定 fd 的指定事件
    8. aeApiAddEvent(eventLoop, fd, mask);
    9. // 设置文件事件类型,以及事件的处理器
    10. fe->mask |= mask;
    11. if (mask & AE_READABLE) fe->rfileProc = proc;
    12. if (mask & AE_WRITABLE) fe->wfileProc = proc;
    13. // 私有数据
    14. fe->clientData = clientData;
    15. }

    函数 aeCreateFileEvent 一开始,从 eventLoop->events 获取了一个 aeFileEvent 对象。
    接下来调用 aeApiAddEvent。这个函数其实就是对 epoll_ctl 的一个封装。主要就是实际执行 epoll_ctl EPOLL_CTL_ADD。

    1. //file:src/ae_epoll.c
    2. static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    3. // add or mod
    4. int op = eventLoop->events[fd].mask == AE_NONE ?
    5. EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    6. ......
    7. // epoll_ctl 添加事件
    8. epoll_ctl(state->epfd,op,fd,&ee);
    9. return 0;
    10. }

    每一个 eventLoop->events 元素都指向一个 aeFileEvent 对象。在这个对象上,设置了三个关键东西

  • rfileProc:读事件回调

  • wfileProc:写事件回调
  • clientData:一些额外的扩展数据

将来 当 epoll_wait 发现某个 fd 上有事件发生的时候,这样 redis 首先根据 fd 到 eventLoop->events 中查找 aeFileEvent 对象,然后再看 rfileProc、wfileProc 就可以找到读、写回调处理函数。
回头看 initServer 调用 aeCreateFileEvent 时传参来看。

  1. //file: src/server.c
  2. void initServer() {
  3. ......
  4. for (j = 0; j < server.ipfd_count; j++) {
  5. aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
  6. acceptTcpHandler,NULL);
  7. }
  8. }

listen fd 对应的读回调函数 rfileProc 事实上就被设置成了 acceptTcpHandler,写回调没有设置,私有数据 client_data 也为 null。

1.2 io 线程启动

在主线程启动以后,会调用 InitServerLast => initThreadedIO 来创建多个 io 线程。
Redis 6 中多线程的实现原理 - 图3
将来这些 IO 线程会配合主线程一起共同来处理所有的 read 和 write 任务。

Redis 6 中多线程的实现原理 - 图4
来看 InitServerLast 创建 IO 线程的过程。

  1. //file:src/server.c
  2. void InitServerLast() {
  3. initThreadedIO();
  4. ......
  5. }
  6. //file:src/networking.c
  7. void initThreadedIO(void) {
  8. //如果没开启多 io 线程配置就不创建了
  9. if (server.io_threads_num == 1) return;
  10. //开始 io 线程的创建
  11. for (int i = 0; i < server.io_threads_num; i++) {
  12. pthread_t tid;
  13. pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i)
  14. io_threads[i] = tid;
  15. }
  16. }

在 initThreadedIO 中调用 pthread_create 库函数创建线程,并且注册线程回调函数 IOThreadMain。

  1. //file:src/networking.c
  2. void *IOThreadMain(void *myid) {
  3. long id = (unsigned long)myid;
  4. while(1) {
  5. //循环等待任务
  6. for (int j = 0; j < 1000000; j++) {
  7. if (getIOPendingCount(id) != 0) break;
  8. }
  9. //允许主线程来关闭自己
  10. ......
  11. //遍历当前线程等待队列里的请求 client
  12. listIter li;
  13. listNode *ln;
  14. listRewind(io_threads_list[id],&li);
  15. while((ln = listNext(&li))) {
  16. client *c = listNodeValue(ln);
  17. if (io_threads_op == IO_THREADS_OP_WRITE) {
  18. writeToClient(c,0);
  19. } else if (io_threads_op == IO_THREADS_OP_READ) {
  20. readQueryFromClient(c->conn);
  21. } else {
  22. serverPanic("io_threads_op value is unknown");
  23. }
  24. }
  25. listEmpty(io_threads_list[id]);
  26. }
  27. }

是将当前线程等待队列 io_threads_list[id] 里所有的请求 client,依次取出处理。其中读操作通过 readQueryFromClient 处理, 写操作通过 writeToClient 处理。其中 io_threads_list[id] 中的任务是主线程分配过来的,后面将会看到。

二、主线程事件循环

接着进入到 Redis 最重要的 aeMain,这个函数就是一个死循环(Redis 不退出的话),不停地执行 aeProcessEvents 函数。

  1. void aeMain(aeEventLoop *eventLoop) {
  2. eventLoop->stop = 0;
  3. while (!eventLoop->stop) {
  4. aeProcessEvents(eventLoop, AE_ALL_EVENTS|
  5. AE_CALL_BEFORE_SLEEP|
  6. AE_CALL_AFTER_SLEEP);
  7. }
  8. }

其中 aeProcessEvents 就是所谓的事件分发器。它通过调用 epoll_wait 来发现所发生的各种事件,然后调用事先注册好的处理函数进行处理。
Redis 6 中多线程的实现原理 - 图5
接着看 aeProcessEvents 函数。

  1. //file:src/ae.c
  2. int aeProcessEvents(aeEventLoop *eventLoop, int flags)
  3. {
  4. // 2.3 事件循环处理3:epoll_wait 前进行读写任务队列处理
  5. if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
  6. eventLoop->beforesleep(eventLoop);
  7. //epoll_wait发现事件并进行处理
  8. numevents = aeApiPoll(eventLoop, tvp);
  9. // 从已就绪数组中获取事件
  10. aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
  11. //如果是读事件,并且有读回调函数
  12. //2.1 如果是 listen socket 读事件,则处理新连接请求
  13. //2.2 如果是客户连接socket 读事件,处理客户连接上的读请求
  14. fe->rfileProc()
  15. //如果是写事件,并且有写回调函数
  16. fe->wfileProc()
  17. ......
  18. }

其中 aeApiPoll 就是对 epoll_wait 的一个封装而已。

  1. //file: src/ae_epoll.c
  2. static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
  3. // 等待事件
  4. aeApiState *state = eventLoop->apidata;
  5. epoll_wait(state->epfd,state->events,eventLoop->setsize,
  6. tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
  7. ...
  8. }

aeProcessEvents 就是调用 epoll_wait 来发现事件。当发现有某个 fd 上事件发生以后,则调为其事先注册的事件处理器函数 rfileProc 和 wfileProc。

2.1 事件循环处理1:新连接到达

在 1.1 节中看到,主线程初始化的时候,将 listen socket 上的读事件处理函数注册成了 acceptTcpHandler。也就是说如果有新连接到达的时候,acceptTcpHandler 将会被执行到。
Redis 6 中多线程的实现原理 - 图6
在这个函数内,主要完成如下几件事情。

  • 调用 accept 接收连接
  • 创建一个 redisClient对象
  • 添加到 epoll
  • 注册读事件处理函数

接下来进入 acceptTcpHandler 源码。

  1. //file:src/networking.c
  2. void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
  3. ......
  4. cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
  5. acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
  6. }

其中 netTcpAccept 调用 accept 系统调用获取连接,就不展开了。来看 acceptCommonHandler。

  1. //file: src/networking.c
  2. static void acceptCommonHandler(int fd, int flags) {
  3. // 创建客户端
  4. redisClient *c;
  5. if ((c = createClient(fd)) == NULL) {
  6. }
  7. }
  8. client *createClient(connection *conn) {
  9. client *c = zmalloc(sizeof(client));
  10. // 为用户连接注册读事件处理器
  11. if (conn) {
  12. ...
  13. connSetReadHandler(conn, readQueryFromClient);
  14. connSetPrivateData(conn, c);
  15. }
  16. selectDb(c,0);
  17. c->id = client_id;
  18. c->resp = 2;
  19. c->conn = conn;
  20. ......
  21. }

在上面的代码中,重点关注 connSetReadHandler(conn, readQueryFromClient),这一行是将这个新连接的读事件处理函数设置成了 readQueryFromClient。

2.2 事件循环处理2:用户命令请求到达

在上面看到了, Redis 把用户连接上的读请求处理函数设置成了 readQueryFromClient,这意味着当用户连接上有命令发送过来的时候,会进入 readQueryFromClient 开始执行。
在多线程版本的 readQueryFromClient 中,处理逻辑非常简单,仅仅只是将发生读时间的 client 放到了任务队列里而已。
Redis 6 中多线程的实现原理 - 图7
来详细看 readQueryFromClient 代码。

  1. //file:src/networking.c
  2. void readQueryFromClient(connection *conn) {
  3. client *c = connGetPrivateData(conn);
  4. //如果启动 threaded I/O 的话,直接入队
  5. if (postponeClientRead(c)) return;
  6. //处理用户连接读请求
  7. ......
  8. c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
  9. nread = connRead(c->conn, c->querybuf+qblen, readlen);
  10. processInputBuffer(c);
  11. }

在 postponeClientRead 中判断,是不是开启了多 io 线程,如果开启了的话,那就将有请求数据到达的 client 直接放到读任务队列(server.clients_pending_read)中就算是完事。看下 postponeClientRead。

  1. //file:src/networking.c
  2. int postponeClientRead(client *c) {
  3. if (server.io_threads_active &&
  4. server.io_threads_do_reads &&
  5. !ProcessingEventsWhileBlocked &&
  6. !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
  7. {
  8. c->flags |= CLIENT_PENDING_READ;
  9. listAddNodeHead(server.clients_pending_read,c);
  10. return 1;
  11. } else {
  12. return 0;
  13. }
  14. }

listAddNodeHead 就是把这个 client 对象添加到 server.clients_pending_read 而已。

2.3 事件循环处理3:epoll_wait 前进行任务处理

在 aeProcessEvents 中假如 aeApiPoll(epoll_wait)中的事件都处理完了以后,则会进入下一次的循环再次进入 aeProcessEvents。
而这一次中 beforesleep 将会处理前面读事件处理函数添加的读任务队列了。

  1. //file:src/ae.c
  2. int aeProcessEvents(aeEventLoop *eventLoop, int flags)
  3. {
  4. // 参见 2.4 事件循环处理3:epoll_wait 前进行任务处理
  5. if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
  6. eventLoop->beforesleep(eventLoop);
  7. //epoll_wait发现事件并进行处理
  8. numevents = aeApiPoll(eventLoop, tvp);
  9. ......
  10. }

在 beforeSleep 里会依次处理两个任务队列。先处理读任务队列,解析其中的请求,并处理之。然后将处理结果写到缓存中,同时写到写任务队列中。紧接着 beforeSleep 会进入写任务队列处理,会将处理结果写到 socket 里,进行真正的数据发送。
Redis 6 中多线程的实现原理 - 图8
来看 beforeSleep 的代码,这个函数中最重要的两个调用是 handleClientsWithPendingReadsUsingThreads(处理读任务队列),handleClientsWithPendingWritesUsingThreads(处理写任务队列)

  1. //file:src/server.c
  2. void beforeSleep(struct aeEventLoop *eventLoop) {
  3. //处理读任务队列
  4. handleClientsWithPendingReadsUsingThreads();
  5. //处理写任务队列
  6. handleClientsWithPendingWritesUsingThreads();
  7. ......
  8. }

值得注意的是,如果开启了多 io 线程的话,handleClientsWithPendingReadsUsingThreads 和 handleClientsWithPendingWritesUsingThreads 中将会是主线程、io 线程一起配合来处理的。所以单独分两个小节来阐述。

三、主线程 && io 线程处理读请求

在 handleClientsWithPendingReadsUsingThreads 中,主线程会遍历读任务队列 server.clients_pending_read,把其中的请求分配到每个 io 线程的处理队列 io_threads_list[target_id] 中。然后通知各个 io 线程开始处理。
Redis 6 中多线程的实现原理 - 图9

3.1 主线程分配任务

来看 handleClientsWithPendingReadsUsingThreads 详细代码。

  1. //file:src/networking.c
  2. //当开启了 reading + parsing 多线程 I/O
  3. //read handler 仅仅只是把 clients 推到读队列里
  4. //而这个函数开始处理该任务队列
  5. int handleClientsWithPendingReadsUsingThreads(void) {
  6. //访问读任务队列 server.clients_pending_read
  7. listRewind(server.clients_pending_read,&li);
  8. //把每一个任务取出来
  9. //添加到指定线程的任务队列里 io_threads_list[target_id]
  10. while((ln = listNext(&li))) {
  11. client *c = listNodeValue(ln);
  12. int target_id = item_id % server.io_threads_num;
  13. listAddNodeTail(io_threads_list[target_id],c);
  14. item_id++;
  15. }
  16. //启动Worker线程,处理读请求
  17. io_threads_op = IO_THREADS_OP_READ;
  18. for (int j = 1; j < server.io_threads_num; j++) {
  19. int count = listLength(io_threads_list[j]);
  20. setIOPendingCount(j, count);
  21. }
  22. //主线程处理 0 号任务队列
  23. listRewind(io_threads_list[0],&li);
  24. while((ln = listNext(&li))) {
  25. //需要先干掉 CLIENT_PENDING_READ 标志
  26. //否则 readQueryFromClient 并不处理,而是入队
  27. client *c = listNodeValue(ln);
  28. readQueryFromClient(c->conn);
  29. }
  30. //主线程等待其它线程处理完毕
  31. while(1) {
  32. unsigned long pending = 0;
  33. for (int j = 1; j < server.io_threads_num; j++)
  34. pending += getIOPendingCount(j);
  35. if (pending == 0) break;
  36. }
  37. //再跑一遍任务队列,目的是处理输入
  38. while(listLength(server.clients_pending_read)) {
  39. ......
  40. processInputBuffer(c);
  41. if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
  42. clientInstallWriteHandler(c);
  43. }
  44. }

在主线程中将任务分别放到了 io_threads_list 的第 0 到第 N 个元素里。并对 1 : N 号线程通过 setIOPendingCount 发消息,告诉他们起来处理。这时候 io 线程将会在 IOThreadMain 中收到消息并开始处理读任务。

  1. //file:src/networking.c
  2. void *IOThreadMain(void *myid) {
  3. while(1) {
  4. //遍历当前线程等待队列里的请求 client
  5. listRewind(io_threads_list[id],&li);
  6. while((ln = listNext(&li))) {
  7. client *c = listNodeValue(ln);
  8. if (io_threads_op == IO_THREADS_OP_WRITE) {
  9. writeToClient(c,0);
  10. } else if (io_threads_op == IO_THREADS_OP_READ) {
  11. readQueryFromClient(c->conn);
  12. } else {
  13. serverPanic("io_threads_op value is unknown");
  14. }
  15. }
  16. }
  17. }

在 io 线程中,从自己的 io_threads_list[id] 中遍历获取待处理的 client。如果发现是读请求处理,则进入 readQueryFromClient 开始处理特定的 client。
而主线程在分配完 1 :N 任务队列让其它 io 线程处理后,自己则开始处理第 0 号任务池。同样是会进入到 readQueryFromClient 中来执行。

  1. //file:src/networking.c
  2. int handleClientsWithPendingReadsUsingThreads(void) {
  3. ......
  4. //主线程处理 0 号任务队列
  5. listRewind(io_threads_list[0],&li);
  6. while((ln = listNext(&li))) {
  7. //需要先干掉 CLIENT_PENDING_READ 标志
  8. //否则 readQueryFromClient 并不处理,而是入队
  9. client *c = listNodeValue(ln);
  10. readQueryFromClient(c->conn);
  11. }
  12. ......
  13. }

所以无论是主线程还是 io 线程,处理客户端的读事件都是会进入 readQueryFromClient。来看其源码。

3.2 读请求处理

  1. //file:src/networking.c
  2. void readQueryFromClient(connection *conn) {
  3. //读取请求
  4. nread = connRead(c->conn, c->querybuf+qblen, readlen);
  5. //处理请求
  6. processInputBuffer(c);
  7. }

在 connRead 中就是调用 read 将 socket 中的命令读取出来,就不展开看了。接着在 processInputBuffer 中将输入缓冲区中的数据解析成对应的命令。解析完命令后真正开始处理它。

  1. //file:src/networking.c
  2. void processInputBuffer(client *c) {
  3. while(c->qb_pos < sdslen(c->querybuf)) {
  4. //解析命令
  5. ......
  6. //真正开始处理 command
  7. processCommandAndResetClient(c);
  8. }
  9. }

函数 processCommandAndResetClient 会调用 processCommand,查询命令并开始执行。执行的核心方法是 call 函数,直接看它。

  1. //file:src/server.c
  2. void call(client *c, int flags) {
  3. // 查找处理命令,
  4. struct redisCommand *real_cmd = c->cmd;
  5. // 调用命令处理函数
  6. c->cmd->proc(c);
  7. ......
  8. }

在 server.c 中定义了每一个命令对应的处理函数

  1. //file:src/server.c
  2. struct redisCommand redisCommandTable[] = {
  3. {"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},
  4. {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
  5. {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
  6. {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
  7. {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
  8. ......
  9. {"mget",mgetCommand,-2,"rF",0,NULL,1,-1,1,0,0},
  10. {"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
  11. {"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
  12. {"rpushx",rpushxCommand,-3,"wmF",0,NULL,1,1,1,0,0},
  13. ......
  14. }

对于 get 命令来说,其对应的命令处理函数就是 getCommand。也就是说当处理 GET 命令执行到 c->cmd->proc 的时候会进入到 getCommand 函数中来。

  1. //file: src/t_string.c
  2. void getCommand(client *c) {
  3. getGenericCommand(c);
  4. }
  5. int getGenericCommand(client *c) {
  6. robj *o;
  7. if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == NULL)
  8. return C_OK;
  9. ...
  10. addReplyBulk(c,o);
  11. return C_OK;
  12. }

getGenericCommand 方法会调用 lookupKeyReadOrReply 来从内存中查找对应的 key值。如果找不到,则直接返回 C_OK;如果找到了,调用 addReplyBulk 方法将值添加到输出缓冲区中。

  1. //file: src/networking.c
  2. void addReplyBulk(client *c, robj *obj) {
  3. addReplyBulkLen(c,obj);
  4. addReply(c,obj);
  5. addReply(c,shared.crlf);
  6. }

3.3 写处理结果到发送缓存区

其主体是调用 addReply 来设置回复数据。在 addReply 方法中做了两件事情:

  • prepareClientToWrite 判断是否需要返回数据,并且将当前 client 添加到等待写返回数据队列中。
  • 调用 _addReplyToBuffer 和 _addReplyObjectToList 方法将返回值写入到输出缓冲区中,等待写入 socekt

    1. //file:src/networking.c
    2. void addReply(client *c, robj *obj) {
    3. if (prepareClientToWrite(c) != C_OK) return;
    4. if (sdsEncodedObject(obj)) {
    5. if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
    6. _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
    7. } else {
    8. ......
    9. }
    10. }

    先来看 prepareClientToWrite 的详细实现, ```c //file: src/networking.c int prepareClientToWrite(client *c) { ……

    if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))

    1. clientInstallWriteHandler(c);

    }

//file:src/networking.c void clientInstallWriteHandler(client *c) { c->flags |= CLIENT_PENDING_WRITE; listAddNodeHead(server.clients_pending_write,c); }

  1. 其中 server.clients_pending_write 就是说的任务队列,队列中的每一个元素都是有待写返回数据的 client 对象。在 prepareClientToWrite 函数中,把 client 添加到任务队列 server.clients_pending_write 里就算完事。<br />接下再来 _addReplyToBuffer,该方法是向固定缓存中写,如果写不下的话就继续调用 _addReplyStringToList 往链表里写。简单起见,只看 _addReplyToBuffer 的代码。
  2. ```c
  3. //file:src/networking.c
  4. int _addReplyToBuffer(client *c, const char *s, size_t len) {
  5. ......
  6. // 拷贝到 client 对象的 Response buffer 中
  7. memcpy(c->buf+c->bufpos,s,len);
  8. c->bufpos+=len;
  9. return C_OK;
  10. }

要注意的是,本节的读请求处理过程是主线程和 io 线程在并行执行的。主线程在处理完后会等待其它的 io 线程处理。在所有的读请求都处理完后,主线程 beforeSleep 中对 handleClientsWithPendingReadsUsingThreads 的调用就结束了。

四、主线程 && io 线程配合处理写请求

当所有的读请求处理完后,handleClientsWithPendingReadsUsingThreads 会退出。主线程会紧接着进入 handleClientsWithPendingWritesUsingThreads 中来处理。
Redis 6 中多线程的实现原理 - 图10

  1. //file:src/server.c
  2. void beforeSleep(struct aeEventLoop *eventLoop) {
  3. //处理读任务队列
  4. handleClientsWithPendingReadsUsingThreads();
  5. //处理写任务队列
  6. handleClientsWithPendingWritesUsingThreads();
  7. ......
  8. }

4.1 主线程分配任务

  1. //file:src/networking.c
  2. int handleClientsWithPendingWritesUsingThreads(void) {
  3. //没有开启多线程的话,仍然是主线程自己写
  4. if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
  5. return handleClientsWithPendingWrites();
  6. }
  7. ......
  8. //获取待写任务
  9. int processed = listLength(server.clients_pending_write);
  10. //在N个任务列表中分配该任务
  11. listIter li;
  12. listNode *ln;
  13. listRewind(server.clients_pending_write,&li);
  14. int item_id = 0;
  15. while((ln = listNext(&li))) {
  16. client *c = listNodeValue(ln);
  17. c->flags &= ~CLIENT_PENDING_WRITE;
  18. /* Remove clients from the list of pending writes since
  19. * they are going to be closed ASAP. */
  20. if (c->flags & CLIENT_CLOSE_ASAP) {
  21. listDelNode(server.clients_pending_write, ln);
  22. continue;
  23. }
  24. //hash的方式进行分配
  25. int target_id = item_id % server.io_threads_num;
  26. listAddNodeTail(io_threads_list[target_id],c);
  27. item_id++;
  28. }
  29. //告诉对应的线程该开始干活了
  30. io_threads_op = IO_THREADS_OP_WRITE;
  31. for (int j = 1; j < server.io_threads_num; j++) {
  32. int count = listLength(io_threads_list[j]);
  33. setIOPendingCount(j, count);
  34. }
  35. //主线程自己也会处理一些
  36. listRewind(io_threads_list[0],&li);
  37. while((ln = listNext(&li))) {
  38. client *c = listNodeValue(ln);
  39. writeToClient(c,0);
  40. }
  41. listEmpty(io_threads_list[0]);
  42. //循环等待其它线程结束处理
  43. while(1) {
  44. unsigned long pending = 0;
  45. for (int j = 1; j < server.io_threads_num; j++)
  46. pending += getIOPendingCount(j);
  47. if (pending == 0) break;
  48. }
  49. ......
  50. }

在 io 线程中收到消息后,开始遍历自己的任务队列 io_threads_list[id],并将其中的 client 挨个取出来开始处理。

  1. //file:src/networking.c
  2. void *IOThreadMain(void *myid) {
  3. while(1) {
  4. //遍历当前线程等待队列里的请求 client
  5. listRewind(io_threads_list[id],&li);
  6. while((ln = listNext(&li))) {
  7. client *c = listNodeValue(ln);
  8. if (io_threads_op == IO_THREADS_OP_WRITE) {
  9. writeToClient(c,0);
  10. } else if (io_threads_op == IO_THREADS_OP_READ) {
  11. readQueryFromClient(c->conn);
  12. }
  13. }
  14. listEmpty(io_threads_list[id]);
  15. }
  16. }

4.2 写请求处理

由于这次任务队列里都是写请求,所以 io 线程会进入 writeToClient。而主线程在分配完任务以后,自己开始处理起了 io_threads_list[0],并也进入到 writeToClient。

  1. //file:src/networking.c
  2. int writeToClient(int fd, client *c, int handler_installed) {
  3. while(clientHasPendingReplies(c)) {
  4. // 先发送固定缓冲区
  5. if (c->bufpos > 0) {
  6. nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
  7. if (nwritten <= 0) break;
  8. ......
  9. // 再发送回复链表中数据
  10. } else {
  11. o = listNodeValue(listFirst(c->reply));
  12. nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
  13. ......
  14. }
  15. }
  16. }

writeToClient 中的主要逻辑就是调用 write 系统调用让内核帮其把数据发送出去即可。由于每个命令的处理结果大小是不固定的。所以 Redis 采用的做法用固定的 buf + 可变链表来储存结果字符串。这里自然发送的时候就需要分别对固定缓存区和链表来进行发送了。
当所有的写请求也处理完后,beforeSleep 就退出了。主线程将会再次调用 epoll_wait 来发现请求,进入下一轮的用户请求处理。

五、总结

  1. //file: src/server.c
  2. int main(int argc, char **argv) {
  3. ......
  4. // 1.1 主线程初始化
  5. initServer();
  6. // 1.2 启动 io 线程
  7. InitServerLast();
  8. // 进入事件循环
  9. aeMain(server.el);
  10. }

在 initServer 这个函数内,Redis 做了这么三件重要的事情。

  • 创建一个 epoll 对象
  • 对配置的监听端口进行 listen
  • 把 listen socket 让 epoll 给管理起来

在 initThreadedIO 中调用 pthread_create 库函数创建线程,并且注册线程回调函数 IOThreadMain。在 IOThreadMain 中等待其队列 io_threads_list[id] 产生请求,当有请求到达的时候取出 client,依次处理。其中读操作通过 readQueryFromClient 处理, 写操作通过 writeToClient 处理。
主线程在 aeMain 函数中,是一个无休止的循环,它是 Redis 中最重要的部分。它先是调用事件分发器发现事件。如果有新连接请求到达的时候,执行 accept 接收新连接,并为其注册事件处理函数。
当用户连接上有命令请求到达的时候,主线程在 read 处理函数中将其添加到读发送队列中。然后接着在 beforeSleep 中开启对读任务队列和写任务队列的处理。总体工作过程如下图所示。
Redis 6 中多线程的实现原理 - 图11
在这个处理过程中,对读任务队列和写任务队列的处理都是多线程并行进行的(前提是开篇开启了多 IO 线程并且也并发处理读)。
当读任务队列和写任务队列的都处理完的时候,主线程再一次调用 epoll_wait 去发现新的待处理事件,如此往复循环进行处理。
至此,多线程版本的 Redis 的工作原理就介绍完了。坦白讲,这种多线程模型实现的并不足够的好。
原因是主线程是在处理读、写任务队列的时候还要等待其它的 io 线程处理完才能进入下一步。假设这时有 10 个用户请求到达,其中 9 个处理耗时需要 1 ms,而另外一个命令需要 1 s。则这时主线程仍然会等待这个 io 线程处理 1s 结束后才能进入后面的处理。整个 Redis 服务还是被一个耗时的命令给 block 住了。