客户端发起的一个set操作,服务端会如何处理呢?

1.回顾

上一篇在讲到initServer函数的时候,里面有一段逻辑如下:

  1. if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,acceptTcpHandler, NULL) == AE_ERR) {
  2. serverPanic("Unrecoverable error creating server.ipfd file event.");
  3. }

这里调用了aeCreateFileEvent函数,将读请求和读请求的处理函数acceptTcpHandler进行绑定,这样也就确定了服务端对于客户端请求的处理函数acceptTcpHandler

  1. int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
  2. aeFileProc *proc, void *clientData) {
  3. ...
  4. //因为我们在上层传入的是AE_READABLE ,所以会走这里的逻辑,
  5. // 将函数 acceptTcpHandler() 赋给了文件事件的读处理指针 rfileProc
  6. if (mask & AE_READABLE) fe->rfileProc = proc;
  7. if (mask & AE_WRITABLE) fe->wfileProc = proc;
  8. ....
  9. }

2.接收请求

服务端启动完成后就可以接受客户端的连接了,当客户端连接请求到来时,会在服务端的事件轮询时轮询出可读事件进行处理,这样就会触发启动过程中设置的处理函数 networking.c里面的acceptTcpHandler函数,这个函数会调用 connCreateAcceptedSocket函数创建客户端与服务端之间的 SocketacceptCommonHandler函数会对新建的 Socket 进行处理。

我们先来看一下上一篇最后提到的aeProcessEvents函数,这个函数会去轮训等待事件的到达,在这里我们暂时只需要关注读事件的处理即可。可能有的读者朋友会有疑问,为什么要这样跳着看呢?这就像是你来到一家新公司刚开始做需求,你不可能为了了解之前的需求把所有的代码都看了,这不现实,你只需要先了解和你本次需求相关的代码,随着你做的需求越来越多,自然而然就对某一个业务领域的代码都很熟悉了。

言归正传,fe->rfileProc(eventLoop, fd, fe->clientData, mask)这一行就调用了我们前面讲到的设置好的读请求处理函数acceptTcpHandler

  1. int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
  2. ....
  3. //等待所监听文件描述符上有事件发生。
  4. numevents = aeApiPoll(eventLoop, tvp);
  5. //处理触发的各个事件,包括网络事件和超时事件
  6. for (j = 0; j < numevents; j++) {
  7. // 获取就绪文件事件的地址
  8. aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
  9. ....
  10. if (!invert && fe->mask & mask & AE_READABLE) { //处理读事件
  11. fe->rfileProc(eventLoop, fd, fe->clientData, mask);
  12. fired++;
  13. fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
  14. }
  15. ....
  16. }

接下来我们来看一下acceptTcpHandler函数的逻辑:可以看到这里的逻辑就是循环接收到的事件数,然后调用connCreateAcceptedSocket函数创建服务端与客户端的tcp连接,拿到连接以后就可以通过acceptCommonHandler函数对请求进行处理。

  1. void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
  2. ....
  3. while(max--) {
  4. cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
  5. ...
  6. acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
  7. }
  8. }

3.创建客户端对象

acceptCommonHandler函数会调用 createClient函数新建一个 client 对象作为客户端的引用,以便维护服务端与客户端之间的连接。

  1. static void acceptCommonHandler(connection *conn, int flags, char *ip) {
  2. ...
  3. if ((c = createClient(conn)) == NULL) {
  4. //此处省略错误处理逻辑
  5. }
  6. ...
  7. }

接下来我们继续深究一下createClient函数,这个方法看着内容很多,但是实际上的逻辑并不复杂。

这个方法主要做的几件事包括:

  1. 如果已经成功建立了连接,就给连接设置读处理器readQueryFromClient函数
  2. 通过selectDb将客户端要操作的数据库默认设置为下标为 0 的数据库
  3. 设置一些其他属性为client对象赋能
  4. 通过listCreate把客户端连接通过尾插保存到服务端维护的客户端连接列表
    1. client *createClient(connection *conn) {
    2. ...
    3. if (conn) { //如果已经成功建立了连接
    4. ....
    5. //设置该连接的读处理器
    6. connSetReadHandler(conn, readQueryFromClient);
    7. connSetPrivateData(conn, c);
    8. }
    9. selectDb(c,0);
    10. //接下来省略设置一些其他属性为client对象赋能的代码
    11. }

    4.根据客户端发送事件的类型,调用不同的函数

connection.c里面的connSocketEventHandler函数将在客户端发送命令后,服务端轮询到可读事件时触发(具体的绑定是在创建Socket连接的时候实现的绑定)。其逻辑是根据事件类型触发不同的处理函数,则对于读事件将触发连接上的读处理函数 readQueryFromClient函数(具体的绑定是在创建Socket连接的时候实现的绑定)。

  1. static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask){
  2. ...
  3. int call_write = (mask & AE_WRITABLE) && conn->write_handler;
  4. int call_read = (mask & AE_READABLE) && conn->read_handler;
  5. /* 处理正常的输入/输出流 */
  6. if (!invert && call_read) {
  7. if (!callHandler(conn, conn->read_handler)) return;
  8. }
  9. /* 激发可写事件。 */
  10. if (call_write) {
  11. if (!callHandler(conn, conn->write_handler)) return;
  12. }
  13. ...
  14. }

5.读取客户端缓冲区的数据

�现在我们回到networking.c,看readQueryFromClient函数。这个方法主要就是两件事:首先读取socket中的数据,如果成功读取到数据那就将读取到的数据通过processInputBuffer方法转换成字符串命令。

  1. void readQueryFromClient(connection *conn) {
  2. ...
  3. nread = connRead(c->conn, c->querybuf+qblen, readlen);
  4. ...
  5. processInputBuffer(c);
  6. }

6.将数据转换成命令

读取到数据以后,接下来我们来看processInputBuffer函数式如何把从socket读取到的数据转换成字符串命令的。

首先做的是确定请求的类型,接着解析socket数据成redis识别的命令,然后根据请求的类型调用不同的处理函数。

  1. processInlineBuffer函数处理解析 telnet 发送的 PING 等内联命令
  2. processMultibulkBuffer函数处理解析批量命令包括 get/set 等
  3. processCommandAndResetClient函数开始执行解析出来的命令字符串

    1. void processInputBuffer(client *c) {
    2. /* 当输入缓冲区中有东西时,继续处理 */
    3. while(c->qb_pos < sdslen(c->querybuf)) {
    4. ...
    5. /* 确定请求类型 */
    6. if (!c->reqtype) {
    7. ....
    8. }
    9. if (c->reqtype == PROTO_REQ_INLINE) {
    10. //处理 telnet 发送的 PING 等内联命令
    11. if (processInlineBuffer(c) != C_OK) break;
    12. ....
    13. } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
    14. //处理批量命令包括 get/set 等
    15. if (processMultibulkBuffer(c) != C_OK) break;
    16. } else {
    17. serverPanic("Unknown request type");
    18. }
    19. /* 批处理有可能长度小于等于 0,这个时候重置客户端*/
    20. if (c->argc == 0) {
    21. resetClient(c);
    22. } else {
    23. /* 如果我们处于输入/输出线程的上下文中,我们就不能在这里真正执行命令。我们所能做的就是将客户端标记为需要处理命令的客户端。 */
    24. if (c->flags & CLIENT_PENDING_READ) {
    25. c->flags |= CLIENT_PENDING_COMMAND;
    26. break;
    27. }
    28. /* 函数开始执行解析出来的命令字符串 */
    29. if (processCommandAndResetClient(c) == C_ERR) {
    30. return;
    31. }
    32. }
    33. }
    34. ...
    35. }

    到了这里其实就是很轻松了,我们就关注一下processCommandAndResetClient函数的逻辑,看看他是如何执行解析出来的字符串命令。它的核心逻辑实际上就是调用processCommand函数对命令进行处理。

  1. int processCommandAndResetClient(client *c) {
  2. ...
  3. if (processCommand(c) == C_OK) {
  4. commandProcessed(c);
  5. }
  6. ...
  7. }

processCommand函数逻辑大概如下:

  1. 调用 lookupCommand函数将客户端传入的命令名称入参,从服务端启动过程中存储的命令列表中找到对应的 redisCommand
  2. 如果 redis 以 cluster 集群模式启动,需要调用 getNodeByQuery函数找到 key 所在的 slot 是否应该交给当前服务端处理,否则调用 clusterRedirectClient函数告诉客户端应该去找哪个服务端
  3. 异常情况的检测,包括是否超过内存限制、是否合法命令等
  4. 如果是事务执行命令, 除 EXEC 、 DISCARD 、 MULTI 和 WATCH 命令之外其他命令都会被函数 queueMultiCommand函数入队到事务队列中处理
  5. 如果是常规命令,调用 call函数执行命令
  1. int processCommand(client *c) {
  2. ...
  3. c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
  4. ...
  5. if (server.cluster_enabled && !(c->flags & CLIENT_MASTER) && !(c->flags & CLIENT_LUA && server.lua_caller->flags & CLIENT_MASTER) &&!(!cmdHasMovableKeys(c->cmd) && c->cmd->firstkey == 0 && c->cmd->proc != execCommand)){
  6. int hashslot;
  7. int error_code;
  8. clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
  9. &hashslot,&error_code);
  10. if (n == NULL || n != server.cluster->myself) {
  11. if (c->cmd->proc == execCommand) {
  12. discardTransaction(c);
  13. } else {
  14. flagTransaction(c);
  15. }
  16. clusterRedirectClient(c,n,hashslot,error_code);
  17. c->cmd->rejected_calls++;
  18. return C_OK;
  19. }
  20. }
  21. ....
  22. /*如果是事务执行命令, 除 EXEC 、 DISCARD 、 MULTI 和 WATCH 命令之外其他命令都会被函数 queueMultiCommand() 入队到事务队列中处理 */
  23. if (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && c->cmd->proc != multiCommand && c->cmd->proc != watchCommand && c->cmd->proc != resetCommand){
  24. queueMultiCommand(c);
  25. addReply(c,shared.queued);
  26. } else {//如果是常规命令,调用 call() 函数执行命令
  27. call(c,CMD_CALL_FULL);
  28. c->woff = server.master_repl_offset;
  29. if (listLength(server.ready_keys))
  30. handleClientsBlockedOnKeys();
  31. }
  32. ...
  33. }

7.常规命令的处理流程

我们只分析正常的命令处理流程即可,所以直接看正常命令的处理call()。这个方法过于巨大,大家忍一下,我只截取核心逻辑分析。

  1. 首先是根据命令调用对应的处理方法
  2. 然后做一些持久化相关的操作【aof】和 【slave】

    1. void call(client *c, int flags) {
    2. ...
    3. //调用客户端命令对应的 redisCommand 的处理方法,以 set 命令为例,会调用到 t_string.c#setCommand()方法
    4. c->cmd->proc(c);
    5. ...
    6. if (flags & CMD_CALL_PROPAGATE &&
    7. if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
    8. //同步数据到 AOF 文件和 slave节点。
    9. //该函数会根据服务端相关配置决定同步策略,
    10. // 其中调用 feedAppendOnlyFile() 函数会同步命令到AOF文件,
    11. // replicationFeedSlaves() 同步命令到 Slave 节点
    12. propagate(c->cmd, c->db->id, c->argv, c->argc, propagate_flags);
    13. }
    14. ...
    15. }

8.string类型的set操作

看一下string数据类型的setCommand函数。我先把源码的方法注释拷贝过来。

  1. /* SET key value [NX] [XX] [KEEPTTL] [GET] [EX <seconds>] [PX <milliseconds>] */

这里也能看出redis的string类型的使用。接下来看核心逻辑:

  1. void setCommand(client *c) {
  2. ....
  3. //将客户端传输过来的需保存的字符串对象尝试进行编码,以节约内存
  4. c->argv[2] = tryObjectEncoding(c->argv[2]);
  5. //将 key-value 保存到数据库中,接下来重点看这里
  6. setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
  7. ....
  8. }

我们放开断点,往下追踪这个方法:

  1. 命令检查
  2. 通过genericSetKey添加到数据库
  3. 把服务端的响应写入到缓冲区,发送给客户端

    1. void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
    2. //将数据添加到 redisDb 中
    3. genericSetKey(c,c->db,key,val,flags & OBJ_SET_KEEPTTL,1);
    4. server.dirty++;
    5. //将key和过期时间保存在redis数据库的过期字典中
    6. if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
    7. notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
    8. if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
    9. "expire",key,c->db->id);
    10. if (!(flags & OBJ_SET_GET)) {
    11. //这个函数会把服务端对客户端的响应写入到缓冲区,发送给客户端
    12. addReply(c, ok_reply ? ok_reply : shared.ok);
    13. }
    14. }

    我们来看具体的添加逻辑:

    1. void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, int signal) {
    2. //查找 redis 数据库中是否存在指定的 key,因为 redis 的数据库结构可以看成是 HashMap,故其查找方式与 Java 中 HashMap 实现的方式相同。
    3. if (lookupKeyWrite(db,key) == NULL) {
    4. dbAdd(db,key,val);
    5. } else {
    6. dbOverwrite(db,key,val);
    7. }
    8. ...
    9. }

    9.总结

至此,一个客户端的一个set命令,服务端的处理流程我们就分析完了。接下来做一个梳理总结。

  1. 首先在服务端创建好事件循环处理器启动后,就可以开始接收客户端的请求
  2. 当客户端连接请求到来时,会在服务端的事件轮询时轮询出可读事件进行处理,这样就会触发启动过程中设置的处理函数
  3. 事件处理函数会去创建客户端对象
  4. 根据客户端对象发送的事件类型,选择不同的方法执行
  5. 读取客户端缓冲区的数据
  6. 对数据进行处理转换成字符串命令
  7. 根据命令类型的不同调用不同的方法
  8. 一个最简单的set操作,就会去调用t_string.c的setCommand方法
  9. 把数据写入到数据库,然后将相应数据写入到客户端缓冲区,发送给客户端。

Redis服务端启动&命令执行.jpg