客户端发起的一个set操作,服务端会如何处理呢?
1.回顾
上一篇在讲到initServer
函数的时候,里面有一段逻辑如下:
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,acceptTcpHandler, NULL) == AE_ERR) {
serverPanic("Unrecoverable error creating server.ipfd file event.");
}
这里调用了aeCreateFileEvent
函数,将读请求和读请求的处理函数acceptTcpHandler
进行绑定,这样也就确定了服务端对于客户端请求的处理函数acceptTcpHandler
。
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData) {
...
//因为我们在上层传入的是AE_READABLE ,所以会走这里的逻辑,
// 将函数 acceptTcpHandler() 赋给了文件事件的读处理指针 rfileProc
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
....
}
2.接收请求
服务端启动完成后就可以接受客户端的连接了,当客户端连接请求到来时,会在服务端的事件轮询时轮询出可读事件进行处理,这样就会触发启动过程中设置的处理函数 networking.c里面的acceptTcpHandler
函数,这个函数会调用 connCreateAcceptedSocket
函数创建客户端与服务端之间的 Socket
, acceptCommonHandler
函数会对新建的 Socket 进行处理。
我们先来看一下上一篇最后提到的aeProcessEvents
函数,这个函数会去轮训等待事件的到达,在这里我们暂时只需要关注读事件的处理即可。可能有的读者朋友会有疑问,为什么要这样跳着看呢?这就像是你来到一家新公司刚开始做需求,你不可能为了了解之前的需求把所有的代码都看了,这不现实,你只需要先了解和你本次需求相关的代码,随着你做的需求越来越多,自然而然就对某一个业务领域的代码都很熟悉了。
言归正传,fe->rfileProc(eventLoop, fd, fe->clientData, mask)
这一行就调用了我们前面讲到的设置好的读请求处理函数acceptTcpHandler
。
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
....
//等待所监听文件描述符上有事件发生。
numevents = aeApiPoll(eventLoop, tvp);
//处理触发的各个事件,包括网络事件和超时事件
for (j = 0; j < numevents; j++) {
// 获取就绪文件事件的地址
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
....
if (!invert && fe->mask & mask & AE_READABLE) { //处理读事件
fe->rfileProc(eventLoop, fd, fe->clientData, mask);
fired++;
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}
....
}
接下来我们来看一下acceptTcpHandler
函数的逻辑:可以看到这里的逻辑就是循环接收到的事件数,然后调用connCreateAcceptedSocket
函数创建服务端与客户端的tcp连接,拿到连接以后就可以通过acceptCommonHandler
函数对请求进行处理。
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
....
while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
...
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
}
}
3.创建客户端对象
acceptCommonHandler
函数会调用 createClient
函数新建一个 client 对象作为客户端的引用,以便维护服务端与客户端之间的连接。
static void acceptCommonHandler(connection *conn, int flags, char *ip) {
...
if ((c = createClient(conn)) == NULL) {
//此处省略错误处理逻辑
}
...
}
接下来我们继续深究一下createClient
函数,这个方法看着内容很多,但是实际上的逻辑并不复杂。
这个方法主要做的几件事包括:
- 如果已经成功建立了连接,就给连接设置读处理器
readQueryFromClient
函数 - 通过
selectDb
将客户端要操作的数据库默认设置为下标为 0 的数据库 - 设置一些其他属性为client对象赋能
- 通过
listCreate
把客户端连接通过尾插保存到服务端维护的客户端连接列表client *createClient(connection *conn) {
...
if (conn) { //如果已经成功建立了连接
....
//设置该连接的读处理器
connSetReadHandler(conn, readQueryFromClient);
connSetPrivateData(conn, c);
}
selectDb(c,0);
//接下来省略设置一些其他属性为client对象赋能的代码
}
4.根据客户端发送事件的类型,调用不同的函数
connection.c
里面的connSocketEventHandler
函数将在客户端发送命令后,服务端轮询到可读事件时触发(具体的绑定是在创建Socket连接的时候实现的绑定)。其逻辑是根据事件类型触发不同的处理函数,则对于读事件将触发连接上的读处理函数 readQueryFromClient
函数(具体的绑定是在创建Socket连接的时候实现的绑定)。
static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask){
...
int call_write = (mask & AE_WRITABLE) && conn->write_handler;
int call_read = (mask & AE_READABLE) && conn->read_handler;
/* 处理正常的输入/输出流 */
if (!invert && call_read) {
if (!callHandler(conn, conn->read_handler)) return;
}
/* 激发可写事件。 */
if (call_write) {
if (!callHandler(conn, conn->write_handler)) return;
}
...
}
5.读取客户端缓冲区的数据
�现在我们回到networking.c
,看readQueryFromClient
函数。这个方法主要就是两件事:首先读取socket中的数据,如果成功读取到数据那就将读取到的数据通过processInputBuffer
方法转换成字符串命令。
void readQueryFromClient(connection *conn) {
...
nread = connRead(c->conn, c->querybuf+qblen, readlen);
...
processInputBuffer(c);
}
6.将数据转换成命令
读取到数据以后,接下来我们来看processInputBuffer
函数式如何把从socket读取到的数据转换成字符串命令的。
首先做的是确定请求的类型,接着解析socket数据成redis识别的命令,然后根据请求的类型调用不同的处理函数。
processInlineBuffer
函数处理解析 telnet 发送的 PING 等内联命令processMultibulkBuffer
函数处理解析批量命令包括 get/set 等processCommandAndResetClient
函数开始执行解析出来的命令字符串void processInputBuffer(client *c) {
/* 当输入缓冲区中有东西时,继续处理 */
while(c->qb_pos < sdslen(c->querybuf)) {
...
/* 确定请求类型 */
if (!c->reqtype) {
....
}
if (c->reqtype == PROTO_REQ_INLINE) {
//处理 telnet 发送的 PING 等内联命令
if (processInlineBuffer(c) != C_OK) break;
....
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
//处理批量命令包括 get/set 等
if (processMultibulkBuffer(c) != C_OK) break;
} else {
serverPanic("Unknown request type");
}
/* 批处理有可能长度小于等于 0,这个时候重置客户端*/
if (c->argc == 0) {
resetClient(c);
} else {
/* 如果我们处于输入/输出线程的上下文中,我们就不能在这里真正执行命令。我们所能做的就是将客户端标记为需要处理命令的客户端。 */
if (c->flags & CLIENT_PENDING_READ) {
c->flags |= CLIENT_PENDING_COMMAND;
break;
}
/* 函数开始执行解析出来的命令字符串 */
if (processCommandAndResetClient(c) == C_ERR) {
return;
}
}
}
...
}
到了这里其实就是很轻松了,我们就关注一下
processCommandAndResetClient
函数的逻辑,看看他是如何执行解析出来的字符串命令。它的核心逻辑实际上就是调用processCommand
函数对命令进行处理。
int processCommandAndResetClient(client *c) {
...
if (processCommand(c) == C_OK) {
commandProcessed(c);
}
...
}
processCommand
函数逻辑大概如下:
- 调用
lookupCommand
函数将客户端传入的命令名称入参,从服务端启动过程中存储的命令列表中找到对应的redisCommand
- 如果 redis 以 cluster 集群模式启动,需要调用
getNodeByQuery
函数找到 key 所在的 slot 是否应该交给当前服务端处理,否则调用clusterRedirectClient
函数告诉客户端应该去找哪个服务端 - 异常情况的检测,包括是否超过内存限制、是否合法命令等
- 如果是事务执行命令, 除 EXEC 、 DISCARD 、 MULTI 和 WATCH 命令之外其他命令都会被函数
queueMultiCommand
函数入队到事务队列中处理 - 如果是常规命令,调用
call
函数执行命令
int processCommand(client *c) {
...
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
...
if (server.cluster_enabled && !(c->flags & CLIENT_MASTER) && !(c->flags & CLIENT_LUA && server.lua_caller->flags & CLIENT_MASTER) &&!(!cmdHasMovableKeys(c->cmd) && c->cmd->firstkey == 0 && c->cmd->proc != execCommand)){
int hashslot;
int error_code;
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
&hashslot,&error_code);
if (n == NULL || n != server.cluster->myself) {
if (c->cmd->proc == execCommand) {
discardTransaction(c);
} else {
flagTransaction(c);
}
clusterRedirectClient(c,n,hashslot,error_code);
c->cmd->rejected_calls++;
return C_OK;
}
}
....
/*如果是事务执行命令, 除 EXEC 、 DISCARD 、 MULTI 和 WATCH 命令之外其他命令都会被函数 queueMultiCommand() 入队到事务队列中处理 */
if (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && c->cmd->proc != multiCommand && c->cmd->proc != watchCommand && c->cmd->proc != resetCommand){
queueMultiCommand(c);
addReply(c,shared.queued);
} else {//如果是常规命令,调用 call() 函数执行命令
call(c,CMD_CALL_FULL);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientsBlockedOnKeys();
}
...
}
7.常规命令的处理流程
我们只分析正常的命令处理流程即可,所以直接看正常命令的处理call()
。这个方法过于巨大,大家忍一下,我只截取核心逻辑分析。
- 首先是根据命令调用对应的处理方法
然后做一些持久化相关的操作【aof】和 【slave】
void call(client *c, int flags) {
...
//调用客户端命令对应的 redisCommand 的处理方法,以 set 命令为例,会调用到 t_string.c#setCommand()方法
c->cmd->proc(c);
...
if (flags & CMD_CALL_PROPAGATE &&
if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
//同步数据到 AOF 文件和 slave节点。
//该函数会根据服务端相关配置决定同步策略,
// 其中调用 feedAppendOnlyFile() 函数会同步命令到AOF文件,
// replicationFeedSlaves() 同步命令到 Slave 节点
propagate(c->cmd, c->db->id, c->argv, c->argc, propagate_flags);
}
...
}
8.string类型的set操作
看一下string数据类型的setCommand
函数。我先把源码的方法注释拷贝过来。
/* SET key value [NX] [XX] [KEEPTTL] [GET] [EX <seconds>] [PX <milliseconds>] */
这里也能看出redis的string类型的使用。接下来看核心逻辑:
void setCommand(client *c) {
....
//将客户端传输过来的需保存的字符串对象尝试进行编码,以节约内存
c->argv[2] = tryObjectEncoding(c->argv[2]);
//将 key-value 保存到数据库中,接下来重点看这里
setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
....
}
我们放开断点,往下追踪这个方法:
- 命令检查
- 通过
genericSetKey
添加到数据库 把服务端的响应写入到缓冲区,发送给客户端
void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
//将数据添加到 redisDb 中
genericSetKey(c,c->db,key,val,flags & OBJ_SET_KEEPTTL,1);
server.dirty++;
//将key和过期时间保存在redis数据库的过期字典中
if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
"expire",key,c->db->id);
if (!(flags & OBJ_SET_GET)) {
//这个函数会把服务端对客户端的响应写入到缓冲区,发送给客户端
addReply(c, ok_reply ? ok_reply : shared.ok);
}
}
我们来看具体的添加逻辑:
void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, int signal) {
//查找 redis 数据库中是否存在指定的 key,因为 redis 的数据库结构可以看成是 HashMap,故其查找方式与 Java 中 HashMap 实现的方式相同。
if (lookupKeyWrite(db,key) == NULL) {
dbAdd(db,key,val);
} else {
dbOverwrite(db,key,val);
}
...
}
9.总结
至此,一个客户端的一个set命令,服务端的处理流程我们就分析完了。接下来做一个梳理总结。
- 首先在服务端创建好事件循环处理器启动后,就可以开始接收客户端的请求
- 当客户端连接请求到来时,会在服务端的事件轮询时轮询出可读事件进行处理,这样就会触发启动过程中设置的处理函数
- 事件处理函数会去创建客户端对象
- 根据客户端对象发送的事件类型,选择不同的方法执行
- 读取客户端缓冲区的数据
- 对数据进行处理转换成字符串命令
- 根据命令类型的不同调用不同的方法
- 一个最简单的set操作,就会去调用t_string.c的
setCommand
方法 - 把数据写入到数据库,然后将相应数据写入到客户端缓冲区,发送给客户端。