Redis因为是纯内存操作,所以瓶颈在于内存和网络带宽,而不是cpu。Redis6.0版本引入的多线程也没有多么神奇,从Redis本身来讲,读写网络的 read / write 系统函数几乎占用了Redis的大部分cpu时间片,所以优化的核心还在于网络IO性能。Redis的优化方法是用多线程来处理同步IO的读写负载。
1.Redis多线程模型
前文说过老版本的Redis虽然是单Reactor单线程模型,但是他的多线程模型实际上却与传统的单Reactor多线程模型有一些区别。
- 传统的单Reactor多线程模型:把业务逻辑处理交给子线程
- Redis的多线程模型:把对网络数据的读写交给子线程来完成,业务逻辑依然是主线程来处理
2.初始化IO线程
前文在介绍redis服务端启动的时候说过,如果是在server.c的main函数。在main函数里面有一个方法InitServerLast
,这个方法最终会触发IO线程的初始化。
....
InitServerLast();
....
我们来看一下这个方法里面都做了什么:这里最主要的就是初始化了IO线程的数据结构。
void InitServerLast() {
...
//初始化 IO 线程的数据结构
initThreadedIO();
....
}
我们来继续来看下初始化IO线程数据结构的方法:
void initThreadedIO(void) {
server.io_threads_active = 0; /* 标志 IO 线程的激活状态,默认为 0 ,也就是非激活态 */
if (server.io_threads_num == 1) return;
/* 创建并初始化 I/O threads. */
for (int i = 0; i < server.io_threads_num; i++) {
/* 【io_threads_list】数组元素为 list 列表,每个 IO 线程需要处理的 client 都放在数组对应下标的 list 中 */
io_threads_list[i] = listCreate(); //为每个 IO 线程创建任务列表
if (i == 0) continue; /* Thread 0 is the main thread. */
...
pthread_mutex_init(&io_threads_mutex[i],NULL); //初始化 IO 线程互斥对象
setIOPendingCount(i, 0);//初始化 IO 线程当前未处理的任务数量为 0
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
...
}
io_threads[i] = tid;
}
}
总结一下,这里面的主要流程:
- 判断用户是不是在配置文件指定了仅仅以单线程运行,如果是的话就不必创建额外的线程了
- 循环创建并初始化io线程
- 为每个 IO 线程创建任务列表
- 初始化 IO 线程互斥对象
- 初始化 IO 线程当前未处理的任务数量为 0
- 设置 IO 线程运行时处理的函数为
IOThreadMain
至此,我们分析完了redis在启动的时候对IO线程的创建和初始化,接下来我们看这些创建好的线程是如何启动并工作的。
3.启动IO线程
即使用户配置了使用多线程来处理任务,也不是在实际处理的时候就一定会用到多线程处理网络IO的,而是根据系统状态动态的进行判断。
在初始化IO线程阶段有一个变量server.io_threads_active
,他表示IO线程的激活状态,默认=0,没激活。通过在启动过程中断点跟踪这个变量就可以找到IO线程的启动位置。
我们在server.c的main函数里面有一个方法:beforeSleep
函数,这里面会启动IO线程。这个方法在每次事件处理之前被调用。
void beforeSleep(struct aeEventLoop *eventLoop) {
....
handleClientsWithPendingReadsUsingThreads();
....
handleClientsWithPendingWritesUsingThreads();
...
}
�这个方法有一点小长,不过我们仅仅需要关注核心逻辑。handleClientsWithPendingReadsUsingThreads
函数式使用IO线程处理等待读取数据的客户端,handleClientsWithPendingWritesUsingThreads
函数式使用IO线程处理等待响应的客户端。handleClientsWithPendingWritesUsingThreads
函数的流程比较核心,我们来分析一下:
/*使用IO线程处理等待响应的客户端*/
int handleClientsWithPendingWritesUsingThreads(void) {
int processed = listLength(server.clients_pending_write);//clients_pending_write:server文件的全局数组:存放等待响应的客户端。
if (processed == 0) return 0; /* Return ASAP if there are no clients. */
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites();
}
if (!server.io_threads_active) startThreadedIO();
while((ln = listNext(&li))) {
...
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
io_threads_op = IO_THREADS_OP_WRITE;
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += getIOPendingCount(j);
//这部分逻辑主要靠io_threads_pending 数组记录
//根据每个 IO 线程待处理的 client 数量来判断,
// 如果各个 IO 线程待处理的 client 数量相加为 0,则任务处理完毕,主线程跳出循环
if (pending == 0) break;
}
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (clientHasPendingReplies(c) && connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR){
freeClientAsync(c);
}
}
...
}
这个方法也不小,我们来梳理一下他都做了什么:
- 判断是否有客户端在等待,如果没有则说明不需要使用io线程处理。
- 如果用户没有配置多线程 IO 或者系统动态判断当前不需要使用多线程 IO,则直接处理请求
- 将待响应的客户端根据io线程数取模,分配到各个 IO 线程的任务列表
- 设置 io_threads_op 全局 IO 操作标志为 IO_THREADS_OP_WRITE,则 IO 线程都处理写任务
- 主线程空循环,等待 IO 线程处理任务完毕。
- 如果还有待处理的客户端则继续处理,处理完毕清空存放等待响应的客户端数组
我们再来看一下startThreadedIO
方法是如何开启IO线程的。首先是循环将所有IO线程的互斥锁解锁,还记的上面我们分析初始化IO线程的时候说过IO线程刚创建好是加了互斥锁的。解开了所有线程的互斥锁以后,将全局变量io_threads_active
设置为1,表示IO线程已经激活。
void startThreadedIO(void) {
for (int j = 1; j < server.io_threads_num; j++)
//将创建 IO 线程时锁住的互斥对象解锁,也就是使 IO 线程得以运行
pthread_mutex_unlock(&io_threads_mutex[j]);
server.io_threads_active = 1;
}
4.IO线程处理读任务
在前面介绍Redis服务端启动的文章中曾经介绍过IO 线程处理网络读取的方法readQueryFromClient
,这个方法主要是作用就是将客户端传过来的数据转换成字符串形式的命令和参数,其和多线程 IO 相关的部分为调用networking.c
文件里面的postponeClientRead
函数将客户端放入等待队列中。如果客户端成功入队,则 readQueryFromClient
函数不再继续执行,直接 return 了。
void readQueryFromClient(connection *conn) {
if (postponeClientRead(c)) return;
...
}
我们再来看下客户端入队的逻辑:经过一些列判断以后,将客户端的标志位设置成CLIENT_PENDING_READ,并将其加入到等待IO线程处理的挂起客户端队列。具体的判断包括全局变量IO线程的激活状态必须是1,全局变量待处理的客户端数组不为空,并且客户端的标志位 flags 不能是 CLIENT_MASTER、CLIENT_SLAVE、CLIENT_PENDING_READ。
int postponeClientRead(client *c) {
if (server.io_threads_active && server.io_threads_do_reads && !ProcessingEventsWhileBlocked && !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ))){
c->flags |= CLIENT_PENDING_READ;
listAddNodeHead(server.clients_pending_read,c);
return 1;
} else {
return 0;
}
}
至此客户端任务已经入队,其处理则由beforeSleep
函数调用。
接下来看handleClientsWithPendingReadsUsingThreads
使用IO线程处理等待读取数据的客户端,这里的处理与多线程 IO 写数据响应客户端是差不多的。
- 判断 IO 线程激活状态和用户配置是否允许使用 IO线程处理读数据操作
- 判断server.clients_pending_read数组中待处理的客户端数量是否为 0
- 将待响应的客户端根据io线程数取模,分配到各个 IO 线程的任务列表
- 将全局的线程操作标志 io_threads_op 设置为IO_THREADS_OP_READ,也就是 IO 线程都处理读任务,并更新io_threads_pending 数组
- 主线程自己也处理 IO 读任务,完成后开启空循环等待 IO 线程处理任务完毕
再次处理server.clients_pending_read数组中的客户端,如果客户端 flags 标志位为 CLIENT_PENDING_COMMAND(也就是 IO 线程把客户端的命令及参数解析完成),则调用
processCommandAndResetClient
函数直接执行命令,否则调用processInputBuffer
函数继续解析客户端的命令int handleClientsWithPendingReadsUsingThreads(void) {
//...此处省略判断逻辑
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
setIOPendingCount(j, count);
}
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
readQueryFromClient(c->conn);
}
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += getIOPendingCount(j);
if (pending == 0) break;
}
while(listLength(server.clients_pending_read)) {
...
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~CLIENT_PENDING_COMMAND;
if (processCommandAndResetClient(c) == C_ERR) {
continue;
}
}
processInputBuffer(c);
if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
clientInstallWriteHandler(c);
}
...
}
IO 线程处理读写的核心在其初始化时设置的
IOThreadMain
函数,主要流程如下:开启空循环扫描 io_threads_pending 数组
- 如果找到属于当前线程的那个下标在数组中的值不为 0 则跳出扫描
- 再次检查当前线程待处理客户端的数量,如果为 0 ,则当前线程停止运行
- 从 io_threads_list 列表数组中取出当前线程待处理的 client 的列表
- 根据 io_threads_op 全局标志位决定对这些 client 做对应的处理
- IO_THREADS_OP_READ 读操作则调用
readQueryFromClient
函数继续处理
- IO_THREADS_OP_READ 读操作则调用
- 清空 io_threads_list 列表数组中当前线程待处理的 client 的列表
- 将 io_threads_pending 对应下标值置为 0,主线程利用该数组即可知道 IO 线程是否执行完所有读写任务
在这里再次调用函数void *IOThreadMain(void *myid) {
...
while(1) {
for (int j = 0; j < 1000000; j++) {
if (getIOPendingCount(id) != 0) break;
}
if (getIOPendingCount(id) == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
//从 io_threads_list 列表数组中取出当前线程待处理的 client 的列表
listRewind(io_threads_list[id],&li);
//根据 io_threads_op 全局标志位决定对这些 client 做对应的处理
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
setIOPendingCount(id, 0);
}
}
readQueryFromClient
函数来处理客户端的请求,不过这个时候不会在入队了,因为在客户端第一次入队的时候,flags 已经被打标成CLIENT_PENDING_READ
,所以可以往下继续执行,一直到执行函数processInputBuffer
函数。processInputBuffer
函数已经分析过了,不再重复说明。
但是当 IO 线程将客户端的命令解析完毕后不会立即执行,因为客户端的 flags 是 CLIENT_PENDING_READ
,此处的处理只是将客户端的 flags 标志位更新为 CLIENT_PENDING_COMMAND
并返回,命令的执行由主线程完成。也就是processCommandAndResetClient
方法。这个方法上面也已经分析过了,在此不再分析。
5.总结
至此,整个Redis多线程模型处理请求的流程我们就已经分析完了,现在带大家来做一个整体的回顾:
- 在Redis启动的时候会去初始化IO线程
- 判断用户是不是在配置文件指定了仅仅以单线程运行,如果是的话就不必创建额外的线程了
- 循环创建并初始化io线程
a. 为每个 IO 线程创建任务列表
b. 初始化 IO 线程互斥对象
c. 初始化 IO 线程当前未处理的任务数量为 0
d. 设置 IO 线程运行时处理的函数为 IOThreadMain()
- 每次事件处理之前会尝试去启动IO线程
- 判断是否有客户端在等待,如果没有则说明不需要使用io线程处理。
- 如果用户没有配置多线程 IO 或者系统动态判断当前不需要使用多线程 IO,则直接处理请求
- 将待响应的客户端根据io线程数取模,分配到各个 IO 线程的任务列表
- 设置 io_threads_op 全局 IO 操作标志为 IO_THREADS_OP_WRITE,则 IO 线程都处理写任务
- 主线程空循环,等待 IO 线程处理任务完毕。
- 如果还有待处理的客户端则继续处理,处理完毕清空存放等待响应的客户端数组
对于IO 线程处理网络读取的方法readQueryFromClient
- 第一次进入这个方法的时候,将客户端传过来的数据转换成字符串形式的命令和参数,其和多线程 IO 相关的部分为调用 networking.c#postponeClientRead() 函数将客户端放入等待队列中。如果客户端成功入队,则 readQueryFromClient() 函数不再继续执行,直接 return 了。
2. 具体的客户端入队逻辑就是经过一些判断以后,将客户端的标志位设置成CLIENT_PENDING_READ,并将其加入到等待IO线程处理的挂起客户端队列。
3. 客户端入队以后,会被beforeSleep() 方法处理
4. 使用IO线程处理等待读取数据的客户端,这里的处理与多线程 IO 写数据响应客户端是差不多的
5. IO 线程处理读写的核心在其初始化时设置的 IOThreadMain() 函数
1. 开启空循环扫描 io_threads_pending 数组
2. 如果找到属于当前线程的那个下标在数组中的值不为 0 则跳出扫描
3. 再次检查当前线程待处理客户端的数量,如果为 0 ,则当前线程停止运行
4. 从 io_threads_list 列表数组中取出当前线程待处理的 client 的列表
5. 根据 io_threads_op 全局标志位决定对这些 client 做对应的处理
a. IO_THREADS_OP_READ 读操作则调用readQueryFromClient() 函数继续处理
6. 清空 io_threads_list 列表数组中当前线程待处理的 client 的列表
7. 将 io_threads_pending 对应下标值置为 0,主线程利用该数组即可知道 IO 线程是否执行完所有读写任务IO 线程将客户端的命令解析完毕后不会立即执行,因为客户端的 flags 是 CLIENT_PENDING_READ,此处的处理只是将客户端的 flags 标志位更新为 CLIENT_PENDING_COMMAND 并返回,命令的执行由主线程完成。
- 第一次进入这个方法的时候,将客户端传过来的数据转换成字符串形式的命令和参数,其和多线程 IO 相关的部分为调用 networking.c#postponeClientRead() 函数将客户端放入等待队列中。如果客户端成功入队,则 readQueryFromClient() 函数不再继续执行,直接 return 了。