客户端发起的一个set操作,服务端会如何处理呢?
1.回顾
上一篇在讲到initServer函数的时候,里面有一段逻辑如下:
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,acceptTcpHandler, NULL) == AE_ERR) {serverPanic("Unrecoverable error creating server.ipfd file event.");}
这里调用了aeCreateFileEvent函数,将读请求和读请求的处理函数acceptTcpHandler进行绑定,这样也就确定了服务端对于客户端请求的处理函数acceptTcpHandler。
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,aeFileProc *proc, void *clientData) {...//因为我们在上层传入的是AE_READABLE ,所以会走这里的逻辑,// 将函数 acceptTcpHandler() 赋给了文件事件的读处理指针 rfileProcif (mask & AE_READABLE) fe->rfileProc = proc;if (mask & AE_WRITABLE) fe->wfileProc = proc;....}
2.接收请求
服务端启动完成后就可以接受客户端的连接了,当客户端连接请求到来时,会在服务端的事件轮询时轮询出可读事件进行处理,这样就会触发启动过程中设置的处理函数 networking.c里面的acceptTcpHandler函数,这个函数会调用 connCreateAcceptedSocket函数创建客户端与服务端之间的 Socket, acceptCommonHandler函数会对新建的 Socket 进行处理。
我们先来看一下上一篇最后提到的aeProcessEvents函数,这个函数会去轮训等待事件的到达,在这里我们暂时只需要关注读事件的处理即可。可能有的读者朋友会有疑问,为什么要这样跳着看呢?这就像是你来到一家新公司刚开始做需求,你不可能为了了解之前的需求把所有的代码都看了,这不现实,你只需要先了解和你本次需求相关的代码,随着你做的需求越来越多,自然而然就对某一个业务领域的代码都很熟悉了。
言归正传,fe->rfileProc(eventLoop, fd, fe->clientData, mask)这一行就调用了我们前面讲到的设置好的读请求处理函数acceptTcpHandler。
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {....//等待所监听文件描述符上有事件发生。numevents = aeApiPoll(eventLoop, tvp);//处理触发的各个事件,包括网络事件和超时事件for (j = 0; j < numevents; j++) {// 获取就绪文件事件的地址aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];....if (!invert && fe->mask & mask & AE_READABLE) { //处理读事件fe->rfileProc(eventLoop, fd, fe->clientData, mask);fired++;fe = &eventLoop->events[fd]; /* Refresh in case of resize. */}....}
接下来我们来看一下acceptTcpHandler函数的逻辑:可以看到这里的逻辑就是循环接收到的事件数,然后调用connCreateAcceptedSocket函数创建服务端与客户端的tcp连接,拿到连接以后就可以通过acceptCommonHandler函数对请求进行处理。
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {....while(max--) {cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);...acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);}}
3.创建客户端对象
acceptCommonHandler函数会调用 createClient函数新建一个 client 对象作为客户端的引用,以便维护服务端与客户端之间的连接。
static void acceptCommonHandler(connection *conn, int flags, char *ip) {...if ((c = createClient(conn)) == NULL) {//此处省略错误处理逻辑}...}
接下来我们继续深究一下createClient函数,这个方法看着内容很多,但是实际上的逻辑并不复杂。
这个方法主要做的几件事包括:
- 如果已经成功建立了连接,就给连接设置读处理器
readQueryFromClient函数 - 通过
selectDb将客户端要操作的数据库默认设置为下标为 0 的数据库 - 设置一些其他属性为client对象赋能
- 通过
listCreate把客户端连接通过尾插保存到服务端维护的客户端连接列表client *createClient(connection *conn) {...if (conn) { //如果已经成功建立了连接....//设置该连接的读处理器connSetReadHandler(conn, readQueryFromClient);connSetPrivateData(conn, c);}selectDb(c,0);//接下来省略设置一些其他属性为client对象赋能的代码}
4.根据客户端发送事件的类型,调用不同的函数
connection.c里面的connSocketEventHandler函数将在客户端发送命令后,服务端轮询到可读事件时触发(具体的绑定是在创建Socket连接的时候实现的绑定)。其逻辑是根据事件类型触发不同的处理函数,则对于读事件将触发连接上的读处理函数 readQueryFromClient函数(具体的绑定是在创建Socket连接的时候实现的绑定)。
static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask){...int call_write = (mask & AE_WRITABLE) && conn->write_handler;int call_read = (mask & AE_READABLE) && conn->read_handler;/* 处理正常的输入/输出流 */if (!invert && call_read) {if (!callHandler(conn, conn->read_handler)) return;}/* 激发可写事件。 */if (call_write) {if (!callHandler(conn, conn->write_handler)) return;}...}
5.读取客户端缓冲区的数据
�现在我们回到networking.c,看readQueryFromClient函数。这个方法主要就是两件事:首先读取socket中的数据,如果成功读取到数据那就将读取到的数据通过processInputBuffer方法转换成字符串命令。
void readQueryFromClient(connection *conn) {...nread = connRead(c->conn, c->querybuf+qblen, readlen);...processInputBuffer(c);}
6.将数据转换成命令
读取到数据以后,接下来我们来看processInputBuffer函数式如何把从socket读取到的数据转换成字符串命令的。
首先做的是确定请求的类型,接着解析socket数据成redis识别的命令,然后根据请求的类型调用不同的处理函数。
processInlineBuffer函数处理解析 telnet 发送的 PING 等内联命令processMultibulkBuffer函数处理解析批量命令包括 get/set 等processCommandAndResetClient函数开始执行解析出来的命令字符串void processInputBuffer(client *c) {/* 当输入缓冲区中有东西时,继续处理 */while(c->qb_pos < sdslen(c->querybuf)) {.../* 确定请求类型 */if (!c->reqtype) {....}if (c->reqtype == PROTO_REQ_INLINE) {//处理 telnet 发送的 PING 等内联命令if (processInlineBuffer(c) != C_OK) break;....} else if (c->reqtype == PROTO_REQ_MULTIBULK) {//处理批量命令包括 get/set 等if (processMultibulkBuffer(c) != C_OK) break;} else {serverPanic("Unknown request type");}/* 批处理有可能长度小于等于 0,这个时候重置客户端*/if (c->argc == 0) {resetClient(c);} else {/* 如果我们处于输入/输出线程的上下文中,我们就不能在这里真正执行命令。我们所能做的就是将客户端标记为需要处理命令的客户端。 */if (c->flags & CLIENT_PENDING_READ) {c->flags |= CLIENT_PENDING_COMMAND;break;}/* 函数开始执行解析出来的命令字符串 */if (processCommandAndResetClient(c) == C_ERR) {return;}}}...}
到了这里其实就是很轻松了,我们就关注一下
processCommandAndResetClient函数的逻辑,看看他是如何执行解析出来的字符串命令。它的核心逻辑实际上就是调用processCommand函数对命令进行处理。
int processCommandAndResetClient(client *c) {...if (processCommand(c) == C_OK) {commandProcessed(c);}...}
processCommand函数逻辑大概如下:
- 调用
lookupCommand函数将客户端传入的命令名称入参,从服务端启动过程中存储的命令列表中找到对应的redisCommand - 如果 redis 以 cluster 集群模式启动,需要调用
getNodeByQuery函数找到 key 所在的 slot 是否应该交给当前服务端处理,否则调用clusterRedirectClient函数告诉客户端应该去找哪个服务端 - 异常情况的检测,包括是否超过内存限制、是否合法命令等
- 如果是事务执行命令, 除 EXEC 、 DISCARD 、 MULTI 和 WATCH 命令之外其他命令都会被函数
queueMultiCommand函数入队到事务队列中处理 - 如果是常规命令,调用
call函数执行命令
int processCommand(client *c) {...c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);...if (server.cluster_enabled && !(c->flags & CLIENT_MASTER) && !(c->flags & CLIENT_LUA && server.lua_caller->flags & CLIENT_MASTER) &&!(!cmdHasMovableKeys(c->cmd) && c->cmd->firstkey == 0 && c->cmd->proc != execCommand)){int hashslot;int error_code;clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);if (n == NULL || n != server.cluster->myself) {if (c->cmd->proc == execCommand) {discardTransaction(c);} else {flagTransaction(c);}clusterRedirectClient(c,n,hashslot,error_code);c->cmd->rejected_calls++;return C_OK;}}..../*如果是事务执行命令, 除 EXEC 、 DISCARD 、 MULTI 和 WATCH 命令之外其他命令都会被函数 queueMultiCommand() 入队到事务队列中处理 */if (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && c->cmd->proc != multiCommand && c->cmd->proc != watchCommand && c->cmd->proc != resetCommand){queueMultiCommand(c);addReply(c,shared.queued);} else {//如果是常规命令,调用 call() 函数执行命令call(c,CMD_CALL_FULL);c->woff = server.master_repl_offset;if (listLength(server.ready_keys))handleClientsBlockedOnKeys();}...}
7.常规命令的处理流程
我们只分析正常的命令处理流程即可,所以直接看正常命令的处理call()。这个方法过于巨大,大家忍一下,我只截取核心逻辑分析。
- 首先是根据命令调用对应的处理方法
然后做一些持久化相关的操作【aof】和 【slave】
void call(client *c, int flags) {...//调用客户端命令对应的 redisCommand 的处理方法,以 set 命令为例,会调用到 t_string.c#setCommand()方法c->cmd->proc(c);...if (flags & CMD_CALL_PROPAGATE &&if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))//同步数据到 AOF 文件和 slave节点。//该函数会根据服务端相关配置决定同步策略,// 其中调用 feedAppendOnlyFile() 函数会同步命令到AOF文件,// replicationFeedSlaves() 同步命令到 Slave 节点propagate(c->cmd, c->db->id, c->argv, c->argc, propagate_flags);}...}
8.string类型的set操作
看一下string数据类型的setCommand函数。我先把源码的方法注释拷贝过来。
/* SET key value [NX] [XX] [KEEPTTL] [GET] [EX <seconds>] [PX <milliseconds>] */
这里也能看出redis的string类型的使用。接下来看核心逻辑:
void setCommand(client *c) {....//将客户端传输过来的需保存的字符串对象尝试进行编码,以节约内存c->argv[2] = tryObjectEncoding(c->argv[2]);//将 key-value 保存到数据库中,接下来重点看这里setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);....}
我们放开断点,往下追踪这个方法:
- 命令检查
- 通过
genericSetKey添加到数据库 把服务端的响应写入到缓冲区,发送给客户端
void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {//将数据添加到 redisDb 中genericSetKey(c,c->db,key,val,flags & OBJ_SET_KEEPTTL,1);server.dirty++;//将key和过期时间保存在redis数据库的过期字典中if (expire) setExpire(c,c->db,key,mstime()+milliseconds);notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id);if (!(flags & OBJ_SET_GET)) {//这个函数会把服务端对客户端的响应写入到缓冲区,发送给客户端addReply(c, ok_reply ? ok_reply : shared.ok);}}
我们来看具体的添加逻辑:
void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, int signal) {//查找 redis 数据库中是否存在指定的 key,因为 redis 的数据库结构可以看成是 HashMap,故其查找方式与 Java 中 HashMap 实现的方式相同。if (lookupKeyWrite(db,key) == NULL) {dbAdd(db,key,val);} else {dbOverwrite(db,key,val);}...}
9.总结
至此,一个客户端的一个set命令,服务端的处理流程我们就分析完了。接下来做一个梳理总结。
- 首先在服务端创建好事件循环处理器启动后,就可以开始接收客户端的请求
- 当客户端连接请求到来时,会在服务端的事件轮询时轮询出可读事件进行处理,这样就会触发启动过程中设置的处理函数
- 事件处理函数会去创建客户端对象
- 根据客户端对象发送的事件类型,选择不同的方法执行
- 读取客户端缓冲区的数据
- 对数据进行处理转换成字符串命令
- 根据命令类型的不同调用不同的方法
- 一个最简单的set操作,就会去调用t_string.c的
setCommand方法 - 把数据写入到数据库,然后将相应数据写入到客户端缓冲区,发送给客户端。

