源码阅读方式建议:
VScode打开Redis源码,结合黄健宏老师的《Redis设计与实现》,先理解概念,再回归代码。 在阅读到数据库的具体实现时,建议在Linux系统下编译并运行Redis,对其的理解将更为直观。源码阅读顺序:
- 阅读数据结构的源码
- 阅读内存编码数据结构实现
- 阅读数据类型的实现
- 阅读单机数据库相关代码
- 阅读客户端和服务器相关代码
- 阅读多机(Cluster)的实现代码
黄健宏老师的《Redis设计与实现》(第二版)
部分资料及图片摘抄自网上,并附有参考链接,侵删。
一、基本数据结构
学习Redis源码建议自底向上,从底层数据结构入手,一步一步感受Redis的设计之巧妙,源码之美妙。动态字符串SDS
源码文件:sds.h 和 sds.c。Redis 构建了一种名为简单动态字符串(simple dynamic string,SDS)的抽象类型,并用作默认字符串表示。而传统的C语言字符串在Redis中只用来作为字符串字面量(常量),如打印日志等等。
结构定义
先来看看SDS结构的定义:由此观之,Redis的SDS其实是基于C语言传统的字符串数组进行封装。一个SDS实例的示意图如下:
// 类型别名,用于指向 sdshdr 的 buf 属性
typedef char *sds; //保存字符串对象的结构
struct sdshdr {
// buf 中已占用空间的长度
int len;
// buf 中剩余可用空间的长度
int free;
// 数据空间
char buf[];
};
printf("%s", s->buf);
### 对比C字符串
接着我们思考一个问题:这么封装C字符串有什么好处呢?
其一、快速获取字符串长度。通过len属性可以在O(1)的时间复杂度下获取一个字符串的长度,而C字符串需要O(n)。
其二、杜绝缓冲区溢出。C字符串由于没有记录自身长度,很可能在一些操作(如strcat等)时,造成缓冲区溢出,见下图:
其三、**减少修改字符串时带来的内存重分配次数。对于一个包含了 N 个字符的C字符串来说,这个C字符串的底层实现总是一个N+1个字符的数组**(额外一个字符用于存空字符)。
如果执行拼接操作(append),那么在执行这个操作之前,程序需要先通过内存重分配来扩展底层数组的空间大小——如果忘了这一步就会产生缓冲区溢出。 如果执行截断操作(trim),那么在执行这个操作之后,程序需要通过内存重分配来释放字符串不再使用的那部分空间——如果忘了这一步就会产生内存泄漏。 内存重分配可能会造成系统调用,一般程序很少进行字符串长度修改,但是Redis作为数据库,会进行大量的字符串修改。为了避免这种缺陷,SDS通过未使用空间解除了字符串长度和底层数组长度之间的关联: 在SDS中,buf 数组的长度不一定就是字符数量加一,数组里面可以包含未使用的字节,而这些字节的数量就由 SDS 的 free 属性记录。通过未使用空间,SDS 实现了空间预分配和惰性空间释放两种优化策略。
其四、空间预分配策略。当API对SDS进行修改时,不仅会分配必要空间还会分配额外空间,具体策略如下:
当修改后SDS长度小于1M(默认最大长度),则额外分配len长度; 当大于1M,则额外分配1M; 这种预分配策略,将连续增长N 次字符串所需的内存重分配次数从必定 N 次降低为最多 N 次。其五、惰性空间释放。当SDS的API需要缩短SDS保存的字符串时, 程序并不立即使用内存重分配来回收缩短后多出来的字节,而是使用free属性将这些字节的数量记录起来,并等待将来使用。
其六、支持二进制。C字符串是以’\0’结尾的,这使得C字符串只能保存文本数据,而不能保存像图片、音频、视频、压缩文件这样的二进制数据。但是Redis需要、也具备这样的特点。
可以说正是诸如此类的小改进,造就了Redis的高效和稳定。总结一下:
接口API
挑几个API来看看源码实现。sdslen
请大家结合sds的结构体定义,先想想,为什么这个API可以获取SDS长度? 在结构体里面, char* buf 和char buf[1]的效果差不多(对齐的情况),占4个字节;char buf[0] 和char buf[]是一样的,不占内存。链接 所以(void)(s-(sizeof(struct sdshdr)))其实就将内存地址转移到了*SDS结构体地址,于是就可以通过这个地址直接访问其成员变量。
/* * 返回 sds 实际保存的字符串的长度 * * T = O(1) */
static inline size_t sdslen(const sds s) {
struct sdshdr *sh = (void*)(s-(sizeof(struct sdshdr)));
return sh->len;
}
sdsclear惰性删除策略
//以O(1)的时间完成字符串的“清空”
//只需要将终止符放在0即可
void sdsclear(sds s) {
// 取出 sdshdr
struct sdshdr *sh = (void*) (s-(sizeof(struct sdshdr)));
// 重新计算属性
sh->free += sh->len;
sh->len = 0;
// 将结束符放到最前面(相当于惰性地删除 buf 中的内容)
sh->buf[0] = '\0';
}
sdsMakeRoomFor
这个函数对sds的free进行扩充,2倍原大小或者加上1M的额外空间。
sds sdsMakeRoomFor(sds s, size_t addlen) {
struct sdshdr *sh, *newsh;
// 获取 s 目前的空余空间长度
size_t free = sdsavail(s);
size_t len, newlen;
// s 目前的空余空间已经足够,无须再进行扩展,直接返回
if (free >= addlen) return s;
// 获取 s 目前已占用空间的长度
len = sdslen(s);
sh = (void*) (s-(sizeof(struct sdshdr)));
// s 最少需要的长度
newlen = (len+addlen);
// 根据新长度,为 s 分配新空间所需的大小
if (newlen < SDS_MAX_PREALLOC)
// 如果新长度小于 SDS_MAX_PREALLOC
// 那么为它分配两倍于所需长度的空间
newlen *= 2;
else
// 否则,分配长度为目前长度加上 SDS_MAX_PREALLOC
newlen += SDS_MAX_PREALLOC;
// T = O(N)
newsh = zrealloc(sh, sizeof(struct sdshdr)+newlen+1);
// 内存不足,分配失败,返回
if (newsh == NULL) return NULL;
// 更新 sds 的空余长度
newsh->free = newlen - len;
// 返回 sds
return newsh->buf;
}
链表List
源码文件:adlist.h 和 adlist.c。Redis实现的是双端链表,其被广泛用于实现 Redis 的各种功能,比如列表键,发布与订阅等等。通过将链表的void value设置为不同的类型,Redis的链表可以用于保存各种不同类型的值。 ### 结构定义 ```cpp / 双端链表结构定义 / typedef struct list {
// 表头节点
listNode head;
// 表尾节点
listNode tail;
// 以下这些是函数指针,负责对应的功能
// 节点值复制函数
void (dup)(void ptr);
// 节点值释放函数
void (free)(void ptr);
// 节点值对比函数
int (match)(void ptr, void key);
// 链表所包含的节点数量
unsigned long len; } list; ``` 其中链表节点定义如下:
一个链表的实例如下图所示:
// 双端链表节点
typedef struct listNode {
// 前置节点
struct listNode *prev;
// 后置节点
struct listNode *next;
// 节点的值
void *value;
} listNode;
接口分析
来挑几个API看看其具体实现:listAddNodeHead:插入节点到头部
list *listAddNodeHead(list *list, void *value) {
listNode *node;
// 为节点分配内存
if ((node = zmalloc(sizeof(*node))) == NULL)
return NULL;
// 保存值指针
node->value = value;
// 添加节点到空链表
if (list->len == 0) {
list->head = list->tail = node;
node->prev = node->next = NULL;
}
// 添加节点到非空链表
else {
node->prev = NULL;
node->next = list->head;
list->head->prev = node;
list->head = node;
}
// 更新链表节点数
list->len++;
return list;
}
listGetIterator:产生一个迭代器
/* 为给定链表创建一个迭代器,
* 之后每次对这个迭代器调用 listNext 都返回被迭代到的链表节点
* direction 参数决定了迭代器的迭代方向:
* AL_START_HEAD :从表头向表尾迭代
* AL_START_TAIL :从表尾想表头迭代 */
listIter *listGetIterator(list *list, int direction) {
// 为迭代器分配内存
listIter *iter;
if ((iter = zmalloc(sizeof(*iter))) == NULL)
return NULL;
// 根据迭代方向,设置迭代器的起始节点
if (direction == AL_START_HEAD)
iter->next = list->head;
else
iter->next = list->tail;
// 记录迭代方向
iter->direction = direction;
return iter;
}
其中,
//双端链表迭代器
typedef struct listIter {
// 当前迭代到的节点
listNode *next;
// 迭代的方向(前向还是后向)
int direction;
} listIter;
listSearchKey:在链表中查找指定值的节点,其实就是利用迭代器遍历
listNode *listSearchKey(list *list, void *key) {
listIter *iter;
listNode *node;
// 迭代整个链表
iter = listGetIterator(list, AL_START_HEAD);
while((node = listNext(iter)) != NULL) {
// 对比
if (list->match) {
if (list->match(node->value, key)) {
listReleaseIterator(iter);
// 找到
return node;
}
} else {
if (key == node->value) {
listReleaseIterator(iter);
// 找到
return node;
}
}
}
listReleaseIterator(iter);
// 未找到
return NULL; }
- 话说list迭代器有什么用?
字典Dict
源码文件dict.h 和 dict.c。Redis中字典使用哈希表实现。同时,为了实现渐进式Rehash的操作,每一个字典都有两个hash表(新/旧)。
结构定义
Redis中字典使用哈希表实现。同时,为了实现渐进式Rehash的操作,每一个字典都有两个hash表(新/旧)。其中的hash表定义如下:
// 字典定义
typedef struct dict {
// 类型特定函数
// 这一指针指向的结构体中存储了hash表常用的函数指针
dictType *type;
// 私有数据:保存了需要传给那些类型特定函数的可选参数(见后文)
void *privdata;
// 哈希表
dictht ht[2];
// 该值表示rehash进行到的下标索引位置
// 当 rehash 不在进行时,值为 -1
// 开始时值为0
// 正在进行中时,值处于0到size之间
int rehashidx;
int iterators;
// 目前正在运行的安全迭代器的数量
} dict;
//每个字典都使用两个哈希表,从而实现渐进式 rehash 。
typedef struct dictht {
// 哈希表数组(存储的是指向节点指针数组的指针)
// 都使用指针的方式可以节省空间
dictEntry **table;
// 哈希表大小
unsigned long size;
// 哈希表大小掩码,用于计算索引值
// 总是等于 size - 1
unsigned long sizemask;
// 该哈希表已有节点的数量
unsigned long used;
} dictht;
hash表的节点(Entry)定义如下,Redis是采用开链法来处理hash冲突的:
一个hash表的实例见下:
//哈希表节点
typedef struct dictEntry {
// 键
void *key;
// 值
union {
void *val;
uint64_t u64;
int64_t s64;
} v;
// 指向下个哈希表节点,形成链表
struct dictEntry *next;
} dictEntry;
PS:在redis的hash表结构中,如果出现hash冲突,新的key-value是在旧的key-value前面的,这么做也很好理解:如果插入到链表最后,那么还需要一个遍历链表的操作,O(N)的复杂度。一个普通状态下的字典实例见下:
接口分析
dictAddRaw:新增键值对
在其中有个函数:dictkeyindex()——计算该字典中可以插入该键值对的index,如果标志符号**dict_can_resize**为正,那么会在hash表的size和used比率大于1(即负载因子)时(没有执行BGSAVE)进行rehash或者大于5(执行BGSAVE)时进行强制rehash。 计算hash值时是这样的:
dictEntry *dictAddRaw(dict *d, void *key)
{
int index;
dictEntry *entry;
dictht *ht;
// 如果条件允许的话,进行单步 rehash
// T = O(1)
if (dictIsRehashing(d)) _dictRehashStep(d);
// 计算键在哈希表中的索引值
// 如果值为 -1 ,那么表示键已经存在
if ((index = _dictKeyIndex(d, key)) == -1)
return NULL;
// 如果字典正在 rehash ,那么将新键添加到 1 号哈希表
// 否则,将新键添加到 0 号哈希表
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
// 为新节点分配空间
entry = zmalloc(sizeof(*entry));
// 将新节点插入到链表表头
entry->next = ht->table[index];
ht->table[index] = entry;
// 更新哈希表已使用节点数量
ht->used++;
// 设置新节点的键
// T = O(1)
dictSetKey(d, entry, key);
return entry;
}
index = hash & dict->ht[0].sizemask
即是利用计算出的hash值跟sizemask相与,这个hash算法被称为MurmurHash2(目前有3了,但是Redis没用),这种算法的优点在于,即使输入的键是有规律的,算法仍能给出一个很好的随机分布性,并且算法的计算速度也非常快。
为什么负载因子一个是1一个是5?
根据 BGSAVE 命令或 BGREWRITEAOF 命令是否正在执行,服务器执行扩展操作所需的负载因子并不相同。 这是因为在执行BGSAVE命令或BGREWRITEAOF 命令的过程中,Redis 需要创建当前服务器进程的子进程, 而大多数操作系统都采用写时复制(copy-on-write)技术来优化子进程的使用效率,所以在子进程存在期间,服务器会提高执行扩展操作所需的负载因子,从而尽可能地避免在子进程存在期间进行哈希表扩展操作,这可以避免不必要的内存写入操作,最大限度地节约内存。
// 指示字典是否启用 rehash 的标识
static int dict_can_resize = 1;
// 强制 rehash 的比率(强制不可被上面的标志所阻止)
static unsigned int dict_force_resize_ratio = 5;
Rehash的具体操作见接下来的这个API。
通过上述源码我们可以得出:
int dictRehash(dict *d, int n) {
// 只可以在 rehash 进行中时执行
if (!dictIsRehashing(d)) return 0;
// 进行 N 步迁移
// T = O(N)
while(n--) {
dictEntry *de, *nextde;
// 如果 0 号哈希表为空,那么表示 rehash 执行完毕
// T = O(1)
if (d->ht[0].used == 0) {
// 释放 0 号哈希表
zfree(d->ht[0].table);
// 将原来的 1 号哈希表设置为新的 0 号哈希表
d->ht[0] = d->ht[1];
// 重置旧的 1 号哈希表
_dictReset(&d->ht[1]);
// 关闭 rehash 标识
d->rehashidx = -1;
// 返回 0 ,向调用者表示 rehash 已经完成
return 0;
}
// 确保 rehashidx 没有越界
assert(d->ht[0].size > (unsigned)d->rehashidx);
// 略过数组中为空的索引,找到下一个非空索引
// hash表的entry中第一个就是void* key,所以可以直接访问其是否为空
// 来判断该出是否存在键值对
while(d->ht[0].table[d->rehashidx] == NULL) d->rehashidx++;
// 指向该索引的链表表头节点
de = d->ht[0].table[d->rehashidx];
// 将链表中的所有节点迁移到新哈希表
// T = O(1)
while(de) {
unsigned int h;
// 保存下个节点的指针
nextde = de->next;
/* Get the index in the new hash table */
// 计算新哈希表的哈希值,以及节点插入的索引位置
h = dictHashKey(d, de->key) & d->ht[1].sizemask;
// 插入节点到新哈希表
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;
// 更新计数器
d->ht[0].used--;
d->ht[1].used++;
// 继续处理下个节点
de = nextde;
}
// 将刚迁移完的哈希表索引的指针设为空
d->ht[0].table[d->rehashidx] = NULL;
// 更新 rehash 索引
d->rehashidx++;
}
return 1;
}
- Redis的hash表采用渐进式分步hash的方法
- rehash的过程中字典的两个hash表同时存在,并且在迭代、更新、删除键的时候都需要考虑这两个hash表,值得注意的是:字典的删除(delete)、查找(find)、更新(update)等操作会在两个哈希表上进行,但插入只会在第二个表中,rehash完毕之后会重置第一个表。
- 在rehash时,如果扩展操作,那么ht[1]的大小为第一个大于等于 ht[0].used*2的2^n;如果收缩操作(负载因子小于0.1),那么ht[1]的大小为第一个大于等于ht[0].used的2^n。
- rehsh时,相关API会经常进行单步rehash,数据库操作主要是调用int dictRehashMilliseconds(dict *d, int ms),即在指定时间内执行rehash,时间到了就返回已迭代到的index,这种方式能够尽可能利用好CPU的执行时间。
迭代器
跟链表一样,Redis字典中也存在迭代器,主要是为了实现遍历一个字典。 其定义如下:安全迭代器
dictIterator *dictGetSafeIterator(dict *d) {
dictIterator *i = dictGetIterator(d);
// 设置安全迭代器标识
i->safe = 1;
return i;
}
非安全迭代器
两种迭代器都共用同一个函数接口:
dictIterator *dictGetIterator(dict *d)
{
dictIterator *iter = zmalloc(sizeof(*iter));
iter->d = d;
iter->table = 0;
iter->index = -1;
iter->safe = 0;
iter->entry = NULL;
iter->nextEntry = NULL;
return iter;
}
<font style="color:#333333;">dictEntry *dictNext(dictIterator *iter)</font>
该接口返回迭代器指向的当前节点。
如果是安全迭代器调用该函数,会更新字典的iterator计数器(安全迭代器);
如果是非安全迭代器调用该函数,会计算此时字典的fingerprint,以确定用户没有违规操作。有关迭代器的重点函数是:dictScan()。详见源码注释。
跳跃表skiplist
源码文件:t_zset.c 中所有以 zsl 开头的函数。Redis在两个地方用到了跳跃表,一个是实现有序集合,另一个是在集群节点中用作内部数据结构(后文会将,见Redis多机数据库)。 跳跃表是一种可以对有序链表进行近似二分查找的数据结构,它通过在每个节点中维持多个指向其他节点的指针,从而达到快速访问节点的目的。 跳跃表支持平均O(logN) 、最坏O(N)复杂度的节点查找,还可以通过顺序性操作来批量处理节点。 在大部分情况下,跳跃表的效率可以和平衡树相媲美,并且因为跳跃表的实现比平衡树要来得更为简单,所以有不少程序都使用跳跃表来代替平衡树。 通过下面这个图,来看看跳表这个数据结构:
其中的跳跃表节点定义如下:
typedef struct zskiplist {
// 表头节点和表尾节点
struct zskiplistNode *header, *tail;
// 表中节点的数量
unsigned long length;
// 表中层数最大的节点的层数
int level;
} zskiplist;
跳表每个节点都具有多个层标记,每一个层标记带有两个属性:前进指针和跨度,前进指针表示访问其之后(score比它大)的其他节点,跨度表示当前节点跟后续节点的距离。跳跃表 level 层级完全是随机的(插入节点时会随机生成1-32的数字)。一般来说,层级越多,访问节点的速度越快。 在跳跃表中,节点按各自所保存的分值从小到大排列。 obj是指向一个字符串对象,而字符串对象则保存着一个SDS值。
typedef struct zskiplistNode {
// 成员对象(存储的对象类型,见下)
robj *obj;
// 分值(按照这个大小,升序排列)
double score;
// 后退指针(用作链表的倒序遍历)
struct zskiplistNode *backward;
// 层
struct zskiplistLevel {
// 前进指针
struct zskiplistNode *forward;
// 跨度
unsigned int span;
} level[];
} zskiplistNode;
注意:在同一个跳跃表中,各个节点保存的成员对象必须是唯一的,但是多个节点保存的分值却可以是相同的:分值相同的节点将按照成员对象的大小(见后)来进行排序,成员对象较小的节点会排在前面(靠近表头的方向),而成员对象较大的节点则会排在后面(靠近表尾的方向)。其中的Redis成员对象定义如下:
typedef struct redisObject {
// 类型(日后会说)
unsigned type:4;
// 编码(日后会说)
unsigned encoding:4;
// 对象最后一次被访问的时间(日后会说)
unsigned lru:REDIS_LRU_BITS; /* lru time (relative to server.lruclock) */
// 引用计数(日后会说)
int refcount;
// 指向实际值的指针
void *ptr;
} robj;
注:成员对象的有关知识,参考Redis源码阅读(三)。
查找、删除、添加
我们以一个实际例子,来展现在跳跃表中,究竟是以何种方式进行节点的查找、删除和添加的。- 添加节点
- 删除操作呢?
接口分析
其中,Redis中的跳表最大层级及概率定义如下:
#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
创建新跳跃表如下:
从上诉代码可以看出,跳跃表的表头节点跟其他的跳跃表节点一样,只不过忽略了BW指针和分值、成员对象等信息。
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
// 分配空间
zsl = zmalloc(sizeof(*zsl));
// 设置高度和起始层数
zsl->level = 1;
zsl->length = 0;
// 初始化表头节点
// T = O(1)
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
// 设置表尾
zsl->tail = NULL;
return zsl;
}
创建新节点如下(请仔细思考源码的实现):
其中计算节点随机level的函数如下:
/*
* 创建一个成员为 obj ,分值为 score 的新节点,
* 并将这个新节点插入到跳跃表 zsl 中。
* 函数的返回值为新节点。
* T_wrost = O(N^2), T_avg = O(N log N)
*/
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
redisAssert(!isnan(score));
// 在各个层查找节点的插入位置
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
// 如果 i 不是 zsl->level-1 层
// 那么 i 层的起始 rank 值为 i+1 层的 rank 值
// 各个层的 rank 值一层层累积
// 最终 rank[0] 的值加一就是新节点的前置节点的排位
// rank[0] 会在后面成为计算 span 值和 rank 值的基础
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
// 沿着前进指针遍历跳跃表
// T_wrost = O(N^2), T_avg = O(N log N)
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
// 比对分值
(x->level[i].forward->score == score &&
// 比对成员, T = O(N)
compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
// 记录沿途跨越了多少个节点
rank[i] += x->level[i].span;
// 移动至下一指针
x = x->level[i].forward;
}
// 记录将要和新节点相连接的节点
update[i] = x;
}
/*
// zslInsert() 的调用者会确保同分值且同成员的元素不会出现,
* 所以这里不需要进一步进行检查,可以直接创建新元素。
*/
// 获取一个随机值作为新节点的层数level
// T = O(N)
level = zslRandomLevel();
// 如果新节点的层数比表中其他节点的层数都要大
// 那么初始化表头节点中未使用的层,并将它们记录到 update 数组中
// 将来也指向新节点
if (level > zsl->level) {
// 初始化未使用层
// T = O(1)
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
// 更新表中节点最大层数
zsl->level = level;
}
// 创建新节点
x = zslCreateNode(level,score,obj);
// 将前面记录的指针指向新节点,并做相应的设置
// T = O(1)
for (i = 0; i < level; i++) {
// 设置新节点的 forward 指针
x->level[i].forward = update[i]->level[i].forward;
// 将沿途记录的各个节点的 forward 指针指向新节点
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is inserted here */
// 计算新节点跨越的节点数量
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
// 更新新节点插入之后,沿途节点的 span 值
// 其中的 +1 计算的是新节点
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
/* increment span for untouched levels */
// 未接触的节点的 span 值也需要增一,这些节点直接从表头指向新节点
// T = O(1)
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
// 设置新节点的后退指针
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
// 跳跃表的节点计数增一
zsl->length++;
return x;
}
返回值介乎 1 和 ZSKIPLIST_MAXLEVEL 之间(包含ZSKIPLIST_MAXLEVEL),根据随机算法所使用的幂次定律,越大的值生成的几率越小。
int zslRandomLevel(void) {
int level = 1;
//ZSKIPLIST为0.25
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
补充:有序集合
Redis的有序集合(Zset)数据结构是结合字典跟跳跃表实现的,其定义如下:ZSET同时使用两种数据结构来持有同一个元素,从而提供 O(log(N)) 复杂度的有序数据结构的插入和移除操作。哈希表将 Redis 对象映射到分值上。而跳跃表则将分值映射到 Redis 对象上,以跳跃表的视角来看,可以说 Redis 对象是根据分值来排序的。更详细的介绍参见第三章。
/*
* 有序集合
*/
typedef struct zset {
// 字典,键为成员,值为分值
// 用于支持 O(1) 复杂度的按成员取分值操作
dict *dict;
// 跳跃表,按分值排序成员
// 用于支持平均复杂度为 O(log N) 的按分值定位成员操作
// 以及范围操作
zskiplist *zsl;
} zset;
HyperLogLog
源码文件:hyperloglog.c。Redis 在 2.8.9 版本添加了 HyperLogLog 结构。HyperLogLog 是用来做基数统计的算法,优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定的、并且是很小的。 在 Redis 里面,每个 HyperLogLog 键只需要花费 12 KB 内存,就可以计算接近 2^64 个不同元素的基数。这和计算基数时,元素越多耗费内存就越多的集合形成鲜明对比。 但是,因为 HyperLogLog 只会根据输入元素来计算基数,而不会储存输入元素本身,所以 HyperLogLog 不能像集合那样,返回输入的各个元素。
什么是基数? 比如数据集 {1, 3, 5, 7, 5, 7, 8}, 那么这个数据集的基数集为 {1, 3, 5 ,7, 8}, 基数(不重复元素)为5个。 基数估计就是在误差可接受的范围内,快速计算基数。
算法细节
它内部维护了 16384 个桶(bucket)来记录各自桶的元素数量。当一个元素到来时,它会散列到其中一个桶,以一定的概率影响这个桶的计数值。因为是概率算法,所以单个桶的计数值并不准确,但是将所有的桶计数值进行调合均值累加起来,结果就会非常接近真实的计数值。密集存储
密集存储的结构非常简单,就是连续 16384 个 6bit 串成的字符串位图。offset_bytes = (idx 6) / 8 offset_bits = (idx 6) % 8
需要注意的是字节位序是左边低位右边高位,而通常我们使用的字节都是左边高位右边低位,我们需要在脑海中进行倒置。稀疏存储
稀疏存储适用于很多计数值都是零的情况。下图表示了一般稀疏存储计数值的状态。计数缓存
前面提到 HyperLogLog 表示的总计数值是由 16384 个桶的计数值进行调和平均后再基于因子修正公式计算得出来的。它需要遍历所有的桶进行计算才可以得到这个值,中间还涉及到很多浮点运算。这个计算量相对来说还是比较大的。 所以 Redis 使用了一个额外的字段来缓存总计数值,这个字段有 64bit,最高位如果为 1 表示该值是否已经过期,如果为 0, 那么剩下的 63bit 就是计数值。 当 HyperLogLog 中任意一个桶的计数值发生变化时,就会将计数缓存设为过期,但是不会立即触发计算(惰性计算)。而是要等到用户显示调用 pfcount 指令时才会触发重新计算刷新缓存。缓存刷新在密集存储时需要遍历 16384 个桶的计数值进行调和平均,但是稀疏存储时没有这么大的计算量。也就是说只有当计数值比较大时才可能产生较大的计算量。另一方面如果计数值比较大,那么大部分 pfadd 操作根本不会导致桶中的计数值发生变化。 这意味着在一个极具变化的 HLL 计数器中频繁调用 pfcount 指令可能会有少许性能问题。关于这个性能方面的担忧在 Redis 作者 antirez 的博客中也提到了。不过作者做了仔细的压力的测试,发现这是无需担心的,pfcount 指令的平均时间复杂度就是 O(1)。 HHL数据结构暂时只需要理解其原理,基本介绍链接 文中具体算法细节转载自(强烈推荐)链接 # 二、内存编码数据结构的实现 > 这一部分主要介绍Redis特制的内存编码数据结构,建议结合图像来理解。 > ## 整数集合 > 源码:intset.h和intset.c。 > 整数集合(intset)是集合键的底层实现之一: 当一个集合只包含整数值元素,并且这个集合的元素数量(见后)不多时,Redis就会使用整数集合作为集合键的底层实现。结构定义
其中,content是整数集合的底层实现,所有的元素都是该contents数组的一项,值得注意的是,虽然该数组声明为int8_t类型,但是该数组中存储的数据类型由encoding来决定: encoding有三个选择:INTSET_ENC_INT16/32/64。
typedef struct intset {
// 编码方式
uint32_t encoding;
// 集合包含的元素数量
uint32_t length;
// 保存元素的数组
int8_t contents[];
} intset;
//intset 的编码方式
#define INTSET_ENC_INT16 (sizeof(int16_t))
#define INTSET_ENC_INT32 (sizeof(int32_t))
#define INTSET_ENC_INT64 (sizeof(int64_t))
升级操作
当向当前的整数集合插入一个大于当前encodeing类型的数据时,必须先进行一次不可逆的升级操作:即将所有的元素编码升级为足以容纳新元素的encoding。 具体分为三步: 1.根据新元素的类型, 扩展整数集合底层数组的空间大小。 2.将底层数组现有的所有元素都转换成与新元素相同的类型,并将类型转换后的元素放置到正确的位上,而且在放置元素的过程中,需要继续维持底层数组的有序性质不变。 3.将新元素添加到底层数组里面。举个例子:
开始时,整数集合的contents数据类型为INT16,其中存储的元素如下:升级之后不可降级。
接口分析
比较好奇的是从底层数组怎么返回我想要的值:不对,上述的操作只是在指定的pos插入,那么如何确定这个pos呢?其实整数集合中的整数都是有序的,从小到大排列,因此在插入或者查找某值时都是先使用二分查找的方式进行查找,以下是往整数集合中插入一个value的函数:
/*
* 根据给定的编码方式 enc ,返回集合的底层数组在 pos 索引上的元素。
* T = O(1)
*/
static int64_t _intsetGetEncoded(intset *is, int pos, uint8_t enc) {
int64_t v64;
int32_t v32;
int16_t v16;
// ((ENCODING*)is->contents) 首先将数组转换回被编码的类型
// 然后 ((ENCODING*)is->contents)+pos 计算出元素在数组中的正确位置
// 之后 memcpy(&vEnc, ..., sizeof(vEnc)) 再从数组中拷贝出正确数量的字节
// 如果有需要的话, memrevEncifbe(&vEnc) 会对拷贝出的字节进行大小端转换
// 最后将值返回
if (enc == INTSET_ENC_INT64) {
memcpy(&v64,((int64_t*)is->contents)+pos,sizeof(v64));
memrev64ifbe(&v64);
return v64;
} else if (enc == INTSET_ENC_INT32) {
memcpy(&v32,((int32_t*)is->contents)+pos,sizeof(v32));
memrev32ifbe(&v32);
return v32;
} else {
memcpy(&v16,((int16_t*)is->contents)+pos,sizeof(v16));
memrev16ifbe(&v16);
return v16;
}
}
/*
* 根据集合的编码方式,返回底层数组在 pos 索引上的值
*/
static int64_t _intsetGet(intset *is, int pos) {
return _intsetGetEncoded(is,pos,intrev32ifbe(is->encoding));
}
intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
// 计算编码 value 所需的长度
uint8_t valenc = _intsetValueEncoding(value);
uint32_t pos;
// 默认设置插入为成功
if (success) *success = 1;
// 如果 value 的编码比整数集合现在的编码要大
// 那么表示 value 必然可以添加到整数集合中
// 并且整数集合需要对自身进行升级,才能满足 value 所需的编码
if (valenc > intrev32ifbe(is->encoding)) {
return intsetUpgradeAndAdd(is,value);
} else {
// 运行到这里,表示整数集合现有的编码方式适用于 value
// 在整数集合中查找 value ,看他是否存在:
// - 如果存在,那么将 *success 设置为 0 ,并返回未经改动的整数集合
// - 如果不存在,那么可以插入 value 的位置将被保存到 pos 指针中
// 等待后续程序使用
if (intsetSearch(is,value,&pos)) {
if (success) *success = 0;
return is;
}
// 运行到这里,表示 value 不存在于集合中
// 程序需要将 value 添加到整数集合中
// 为 value 在集合中分配空间
is = intsetResize(is,intrev32ifbe(is->length)+1);
// 如果新元素不是被添加到底层数组的末尾
// 那么需要对现有元素的数据进行移动,空出 pos 上的位置,用于设置新值
// 举个例子
// 如果数组为:
// | x | y | z | ? |
// |<----->|
// 而新元素 n 的 pos 为 1 ,那么数组将移动 y 和 z 两个元素
// | x | y | y | z |
// |<----->|
// 这样就可以将新元素设置到 pos 上了:
// | x | n | y | z |
// T = O(N)
if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);
}
// 将新值设置到底层数组的指定位置中
_intsetSet(is,pos,value);
// 增一集合元素数量的计数器
is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
// 返回添加新元素后的整数集合
return is;
}
那么怎么插入一个数据呢?
看到其中有个memrev16/32/64ifbe函数,有些好奇,遂F12,找到了源码:
/*
* 根据集合的编码方式,将底层数组在 pos 位置上的值设为 value 。
*/
static void _intsetSet(intset *is, int pos, int64_t value) {
// 取出集合的编码方式
uint32_t encoding = intrev32ifbe(is->encoding);
// 根据编码 ((Enc_t*)is->contents) 将数组转换回正确的类型
// 然后 ((Enc_t*)is->contents)[pos] 定位到数组索引上
// 接着 ((Enc_t*)is->contents)[pos] = value 将值赋给数组
// 最后, ((Enc_t*)is->contents)+pos 定位到刚刚设置的新值上
// 如果有需要的话, memrevEncifbe 将对值进行大小端转换
if (encoding == INTSET_ENC_INT64) {
((int64_t*)is->contents)[pos] = value;
memrev64ifbe(((int64_t*)is->contents)+pos);
} else if (encoding == INTSET_ENC_INT32) {
((int32_t*)is->contents)[pos] = value;
memrev32ifbe(((int32_t*)is->contents)+pos);
} else {
((int16_t*)is->contents)[pos] = value;
memrev16ifbe(((int16_t*)is->contents)+pos);
}
}
补充关于大端和小端序的转换,值得注意的是,大端小端在内存的存储的顺序是按照字节,而不是按位! 例如小端中的:00000000 10000000,好 在大端中存储不是:00000001 00000000 (即按位逆序) 而是按字节相反:10000000 00000000。 示意图:
/*将16位小端序转为大端序*/
void memrev16(void *p) {
unsigned char *x = p, t;
t = x[0];
x[0] = x[1];
x[1] = t;
}
/* 将32位小端序转为大端序 */
void memrev32(void *p) {
unsigned char *x = p, t;
t = x[0];
x[0] = x[3];
x[3] = t;
t = x[1];
x[1] = x[2];
x[2] = t;
}


static void intsetMoveTail(intset *is, uint32_t from, uint32_t to) {
void *src, *dst;
// 要移动的元素个数
uint32_t bytes = intrev32ifbe(is->length)-from;
// 集合的编码方式
uint32_t encoding = intrev32ifbe(is->encoding);
// 根据不同的编码
// src = (Enc_t*)is->contents+from 记录移动开始的位置
// dst = (Enc_t*)is_.contents+to 记录移动结束的位置
// bytes *= sizeof(Enc_t) 计算一共要移动多少字节
if (encoding == INTSET_ENC_INT64) {
src = (int64_t*)is->contents+from;
dst = (int64_t*)is->contents+to;
bytes *= sizeof(int64_t);
} else if (encoding == INTSET_ENC_INT32) {
src = (int32_t*)is->contents+from;
dst = (int32_t*)is->contents+to;
bytes *= sizeof(int32_t);
} else {
src = (int16_t*)is->contents+from;
dst = (int16_t*)is->contents+to;
bytes *= sizeof(int16_t);
}
// 进行移动
// T = O(N)
memmove(dst,src,bytes);
}
压缩列表
源码参见:ziplist.h和ziplist.c。压缩列表是 Redis 为了节约内存而开发的,由一系列特殊编码的连续内存块组成的顺序型(sequential)数据结构。 Redis的有序集合、哈希以及列表都直接或者间接使用了压缩列表。
当有序集合或哈希的元素数目比较少,且元素都是短字符串或整形时,Redis便使用压缩列表作为其底层数据存储方式。列表使用快速链表(quicklist)数据结构存储,而快速链表就是双向链表与压缩列表的组合(见后)。
一个压缩列表可以包含任意多个节点(entry), 每个节点可以保存一个字节数组或者一个整数值。以下就是一个压缩列表的结构:为便于快速操作压缩列表获取各字段的数据,redis有以下宏定义(char * zl指向压缩列表首地址):
/*
* 创建并返回一个新的 ziplist
*/
unsigned char *ziplistNew(void) {
// ZIPLIST_HEADER_SIZE 是 ziplist 表头的大小
// +1 字节是表末端 ZIP_END 的大小
unsigned int bytes = ZIPLIST_HEADER_SIZE+1;
// 为表头和表末端分配空间
unsigned char *zl = zmalloc(bytes);
// 初始化表属性
ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);
ZIPLIST_LENGTH(zl) = 0;
// 设置表末端
zl[bytes-1] = ZIP_END;
return zl;
}
// 定位到bytes 属性,该属性记录了整个 ziplist 所占用的内存字节数
#define ZIPLIST_BYTES(zl) (*((uint32_t*)(zl)))
// 定位到offset 属性,该属性记录了到达表尾节点的偏移量
#define ZIPLIST_TAIL_OFFSET(zl) (*((uint32_t*)((zl)+sizeof(uint32_t))))
// 定位到length 属性,该属性记录了 ziplist 包含的节点数量
#define ZIPLIST_LENGTH(zl) (*((uint16_t*)((zl)+sizeof(uint32_t)*2)))
// 返回 ziplist 表头的大小
#define ZIPLIST_HEADER_SIZE (sizeof(uint32_t)*2+sizeof(uint16_t))
// 返回指向 ziplist 第一个节点(的起始位置)的指针
#define ZIPLIST_ENTRY_HEAD(zl) ((zl)+ZIPLIST_HEADER_SIZE)
// 返回指向 ziplist 最后一个节点(的起始位置)的指针
#define ZIPLIST_ENTRY_TAIL(zl) ((zl)+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl)))
// 返回指向 ziplist 末端 ZIP_END (的起始位置)的指针
#define ZIPLIST_ENTRY_END(zl) ((zl)+intrev32ifbe(ZIPLIST_BYTES(zl))-1)
结构定义
压缩列表的节点信息定义如下:
// 保存 ziplist 节点信息的结构
typedef struct zlentry {
// prevrawlen :前置节点的长度
// prevrawlensize :编码 prevrawlen 所需的字节大小
unsigned int prevrawlensize, prevrawlen;
// len :当前节点值的长度
// lensize :编码 len 所需的字节大小
unsigned int lensize, len;
// 当前节点 header 的大小
// 等于 prevrawlensize + lensize
unsigned int headersize;
// 当前节点值所使用的编码类型
unsigned char encoding;
// 指向当前节点的指针
unsigned char *p;
} zlentry;
节点细节
由上文节点定义代码可知,压缩节点信息可以分为三个部分:previous_entry_length,encoding,content,如下图:编码 | 编码长度 | content** 属性保存的值** |
---|---|---|
00bbbbbb | 1 字节 | 长度小于等于 63 字节的字节数组。 |
01bbbbbb xxxxxxxx | 2 字节 | 长度小于等于 16383 字节的字节数组。 |
10__ aaaaaaaa bbbbbbbb cccccccc dddddddd | 5 字节 | 长度小于等于 4294967295 的字节数组。 |
编码 | 编码长度 | content** 属性保存的值** |
---|---|---|
11000000 | 1 字节 | int16_t 类型的整数。 |
11010000 | 1 字节 | int32_t 类型的整数。 |
11100000 | 1 字节 | int64_t 类型的整数。 |
11110000 | 1 字节 | 24 位有符号整数。 |
11111110 | 1 字节 | 8 位有符号整数。 |
1111xxxx | 1 字节 | 使用这一编码的节点没有相应的 content 属性, 因为编码本身的 xxxx 四个位已经保存了一个介于 0 和12 之间的值, 所以它无须 content 属性。 |
接口分析
创建压缩列表插入元素 函数输入参数zl表示压缩列表首地址,p指向新元素的插入位置,s表示数据内容,slen表示数据长度,返回参数为压缩列表首地址。
unsigned char *ziplistNew(void);
/*创建空的压缩列表,只需要分配初始存储空间(11=4+4+2+1个字节),并对zlbytes、zltail、zllen和zlend字段初始化即可。*/
unsigned char *ziplistNew(void) {
//ZIPLIST_HEADER_SIZE = zlbytes + zltail + zllen;
//最后这个加1表示bytes本身
unsigned int bytes = ZIPLIST_HEADER_SIZE+1;
unsigned char *zl = zmalloc(bytes);
ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);
ZIPLIST_LENGTH(zl) = 0;
//结尾标识0XFF
zl[bytes-1] = ZIP_END;
return zl;
}
插入元素时,可以简要分为三个步骤: 第一步需要将元素内容编码为压缩列表的元素, 第二步重新分配空间, 第三步拷贝数据。 下面分别讨论每个步骤的实现逻辑。 1)编码 编码即计算previous_entry_length字段、encoding字段和content字段的内容。如何获取前一个元素的长度呢?这时候就需要根据插入元素的位置分情况讨论了,如图所示:
unsigned char *ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen);
unsigned int zipRawEntryLength(unsigned char *p) {
unsigned int prevlensize, encoding, lensize, len;
ZIP_DECODE_PREVLENSIZE(p, prevlensize);
ZIP_DECODE_LENGTH(p + prevlensize, encoding, lensize, len);
return prevlensize + lensize + len;
}
encoding字段标识的是当前元素存储的数据类型以及数据长度,编码时首先会尝试将数据内容解析为整数,如果解析成功则按照压缩列表整数类型编码存储,解析失败的话按照压缩列表字节数组类型编码存储。
2)重分配空间 空间大小是否是添加元素前的压缩列表长度与新添加元素元素长度之和呢?并不完全是,如图中所示的例子。
if (zipTryEncoding(s,slen,&value,&encoding)) {
reqlen = zipIntSize(encoding);
} else {
reqlen = slen;
}
reqlen += zipStorePrevEntryLength(NULL,prevlen);
reqlen += zipStoreEntryEncoding(NULL,encoding,slen);
size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl));
int forcelarge = 0;
nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;
if (nextdiff == -4 && reqlen < 4) {
nextdiff = 0;
forcelarge = 1;
}
//存储偏移量
offset = p-zl;
//调用realloc重新分配空间
zl = ziplistResize(zl,curlen+reqlen+nextdiff);
//重新偏移到插入位置P
p = zl+offset;
那么nextdiff与forcelarge在这里有什么用呢?
分析ziplistResize函数的3个输入参数,curlen表示插入元素前压缩列表的长度,reqlen表示插入元素元素的长度,而nextdiff表示的是entryX+1元素长度的变化,取值可能为0(长度不变)、4(长度增加4)和-4(长度减小4)。 我们再思考下,当nextdiff等于-4,而reqlen小于4时会发生什么呢?没错,插入元素导致压缩列表所需空间减少了,即函数ziplistResize底层调用realloc重新分配的空间小于指针zl指向的空间。这可能会存在问题,我们都知道realloc重新分配空间时,返回的地址可能不变,当重新分配的空间大小反而减少时,realloc底层实现可能会将多余的空间回收,此时可能会导致数据的丢失。因此需要避免这种情况的发生,即重新赋值nextdiff等于0,同时使用forcelarge标记这种情况。 可以再思考下,nextdiff等于-4时,reqlen会小于4吗?答案是可能的,连锁更新可能会导致这种情况的发生。连锁更新将在之后介绍。 3)数据拷贝 重新分配空间之后,需要将位置P后的元素移动到指定位置,将新元素插入到位置P。我们假设entryX+1元素的长度增加4(即nextdiff等于4),此时数据拷贝示意图如图所示:思考一下,第一次更新尾元素偏移量之后,为什么指向的元素可能不是尾元素呢?很显然,当entryX+1元素就是尾元素时,只需要更新一次尾元素的偏移量;但是当entryX+1元素不知尾元素,且entryX+1元素长度发生了改变,此时尾元素偏移量还需要加上nextdiff的值。 以上参考链接:强烈推荐 ### 连锁更新 当往ziplist中插入或删除节点时,由于previous节点字节数可为1或5,保存的前置节点大小不一致,可能就会引发后续节点依次影响,从而发生多次空间重分配,这就是连锁更新。 比如插入的new恰好大于254字节,而原本entry都是介于250-253之间:
memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);
//更新entryX+1元素的previous_entry_length字段字段
if (forcelarge)
//entryX+1元素的previous_entry_length字段依然占5个字节;
//但是entryNEW元素长度小于4字节
zipStorePrevEntryLengthLarge(p+reqlen,reqlen);
else
zipStorePrevEntryLength(p+reqlen,reqlen);
//更新zltail字段
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);
zipEntry(p+reqlen, &tail);
if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
}
//更新zllen字段
ZIPLIST_INCR_LENGTH(zl,1);
三、数据类型的实现
在前面,我们陆续介绍了 Redis 用到的所有主要数据结构。Redis 并没有直接使用这些数据结构来实现键值对数据库,而是基于这些数据结构创建了一个对象系统,这个系统包含字符串对象、列表对象、哈希对象、集合对象和有序集合对象这五种类型的对象。
对象类型及编码,源码文件:object.c每次当我们在Redis的数据库中新创建一个键值对时,我们至少会创建两个对象,一个对象用作键值对的键(键对象),另一个对象用作键值对的值(值对象)。 Redis中的每个对象都由一个redisObject结构表示:
之前在介绍跳表时也给出过该定义,不过当时没有详细说,接下来我们来仔细看看。
typedef struct redisObject {
// 对象类型
unsigned type:4;
// 底层编码
unsigned encoding:4;
// 对象最后一次被访问的时间(日后会说)
unsigned lru:REDIS_LRU_BITS; /* lru time (relative to server.lruclock) */
// 引用计数
int refcount;
// 指向实际存储单元的指针
void *ptr;
} robj;
- 类型 该属性记录对象的类型,可选常量如下:
类型常量 | 对象的名称 |
---|---|
REDIS_STRING | 字符串对象 |
REDIS_LIST | 列表对象 |
REDIS_HASH | 哈希对象 |
REDIS_SET | 集合对象 |
REDIS_ZSET | 有序集合对象 |
注:对于Redis键值对来说,键总是字符串对象,而值才是上述五种类型其中一种。
- 编码 对象的ptr指针指向对象的底层实现数据结构,而这些数据结构由对象的encoding属性决定。encoding属性记录了对象所使用的编码, 也即是说使用了什么数据结构作为对象的底层实现,可选的对象编码如下。
编码常量 | 编码所对应的底层数据结构 |
---|---|
REDIS_ENCODING_INT | long 类型的整数 |
REDIS_ENCODING_EMBSTR | embstr 编码的简单动态字符串 |
REDIS_ENCODING_RAW | 简单动态字符串 |
REDIS_ENCODING_HT | 字典 |
REDIS_ENCODING_LINKEDLIST | 双端链表 |
REDIS_ENCODING_ZIPLIST | 压缩列表 |
REDIS_ENCODING_INTSET | 整数集合 |
REDIS_ENCODING_SKIPLIST | 跳跃表和字典 |
举个例子,在列表对象包含的元素比较少时,Redis使用压缩列表作为列表对象的底层实现: 1)因为压缩列表比双端链表更节约内存,并且在元素数量较少时,压缩列表更合适; 2)元素数量增多或者单个元素过大时,对象就会将底层实现从压缩列表转向功能更强、也更适合保存大量元素的双端链表上面;
- 空转时长(lru):该属性记录了对象最后一次被访问的时间。该值在Redis的内存过期值淘汰时被用到。如果服务器用于回收内存的算法为 volatile-lru 或者 allkeys-lru ,那么当内存到上限值时,空转时长较高的那部分键对象会优先被服务器释放,从而回收内存。
- 引用计数 : Redis在自己的对象系统中构建了一个引用计数技术实现的内存回收机制,通过这一机制,程序可以通过跟踪对象的引用计数信息,在适当的时候自动释放对象并进行内存回收。
修改引用对象计数器有专门的API,如下:
- <font style="color:#333333;">在</font>**<font style="color:#333333;">创建一个新对象</font>**<font style="color:#333333;">时, 引用</font>**<font style="color:#333333;">计数的值会被初始化为1</font>**<font style="color:#333333;">;</font>
- <font style="color:#333333;">当对象被一个新程序使用时,它的引用计数值会被</font>**<font style="color:#333333;">增一</font>**<font style="color:#333333;">;</font>
- <font style="color:#333333;">当对象</font>**<font style="color:#333333;">不再被一个程序使用时</font>**<font style="color:#333333;">,</font>**<font style="color:#333333;">它的引用计数值会被减一</font>**<font style="color:#333333;">;</font>
- <font style="color:#333333;">当对象的引用计数值变为0时,</font>**<font style="color:#333333;">对象所占用的内存会被释放</font>**<font style="color:#333333;">;</font>
void decrRefCount(robj *o) {
if (o->refcount <= 0) redisPanic("decrRefCount against refcount <= 0");
// 释放对象
if (o->refcount == 1) {
switch(o->type) {
case REDIS_STRING: freeStringObject(o); break;
case REDIS_LIST: freeListObject(o); break;
case REDIS_SET: freeSetObject(o); break;
case REDIS_ZSET: freeZsetObject(o); break;
case REDIS_HASH: freeHashObject(o); break;
default: redisPanic("Unknown object type"); break;
}
zfree(o);
// 减少计数
} else {
o->refcount--;
}
}
- ptr,该指针指向实际存储的数据单元。
因为需要有一个验证时间!只有当共享目标和想要建立的键一样时,才会共享,验证整数只需要O(1),字符串需要O(n),列表或hash则需要O(n^2),这将造成CPU资源浪费。
字符串对象
源码文件:t_string.c。字符串对象是Redis数据库所有键值对的根本,因为键都是字符串对象。如下是Redis中string对象SET、SETEX等命令的底层API:
void setGenericCommand(redisClient *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
long long milliseconds = 0; /* initialized to avoid any harmness warning */
// 取出过期时间
if (expire) {
// 取出 expire 参数的值
// T = O(N)
if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != REDIS_OK)
return;
// expire 参数的值不正确时报错
if (milliseconds <= 0) {
addReplyError(c,"invalid expire time in SETEX");
return;
}
// 不论输入的过期时间是秒还是毫秒
// Redis 实际都以毫秒的形式保存过期时间
// 如果输入的过期时间为秒,那么将它转换为毫秒
if (unit == UNIT_SECONDS) milliseconds *= 1000;
}
// 如果设置了 NX 或者 XX 参数,那么检查条件是否不符合这两个设置
// 在条件不符合时报错,报错的内容由 abort_reply 参数决定
if ((flags & REDIS_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
(flags & REDIS_SET_XX && lookupKeyWrite(c->db,key) == NULL))
{
addReply(c, abort_reply ? abort_reply : shared.nullbulk);
return;
}
// 将键值关联到数据库
setKey(c->db,key,val);
// 将数据库设为脏
server.dirty++;
// 为键设置过期时间
if (expire) setExpire(c->db,key,mstime()+milliseconds);
// 发送事件通知
notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"set",key,c->db->id);
// 发送事件通知
if (expire) notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,
"expire",key,c->db->id);
// 设置成功,向客户端发送回复
// 回复的内容由 ok_reply 决定
addReply(c, ok_reply ? ok_reply : shared.ok);
}
字符串对象的编码可以是int、raw、embstr。
- int编码,如果字符串对象的整数值可以被long类型表示,那么该整数值会被保存在ptr指针中(编码格式跟实际参数类型不一样);
- raw编码,如果字符串对象保存的是字符串,且大于39字节,那就使用SDS字符串保存;
- embstr编码,如果该保存的字符串小于等于39字节,就用embstr格式保存。embstr是专门用来保存短字符串的一种优化编码方式,跟raw的区别在于,它只需要调用一次内存分配一块连续的内存空间,依次包含redisObject和sdshdr,而raw是分别调用两次内存分配来存放redisObject和sdshdr,embstr编码时实例如下图:
- embstr 编码将创建字符串对象所需的内存分配次数从 raw 编码的两次降低为一次。
- 释放embstr编码的字符串对象只需要调用一次内存释放函数,而释放raw编码的字符串对象需要调用两次内存释放函数。
- 因为embstr编码的字符串对象的所有数据都保存在一块连续的内存里面,所以这种编码的字符串对象比起raw编码的字符串对象能够更好地利用缓存带来的优势。
编码转换
int/embstr在特定的情况下会转换为raw编码的对象,且不可逆。- int编码的对象执行命令之后变成了字符串值,那就将从int转换为raw;
- Redis没有为embstr编码的字符串对象编写任何相应的修改程序 (只有int编码的字符串对象和raw编码的字符串对象有这些程序),embstr编码的字符串对象实际上是只读的:当我们对embstr编码的字符串对象执行任何修改命令时,程序会先将对象的编码从embstr转换成raw,然后再执行修改命令;因为这个原因,embstr编码的字符串对象在执行修改命令之后,总会变成一个raw编码的字符串对象。
列表对象
源码文件:t_list.c。
其底层编码是ziplist或linkedlist。
- ziplist编码,底层是压缩列表,每个entry保存一个列表项,有两个条件:列表对象保存的所有字符串元素的长度都小于 64 字节;列表对象保存的元素数量小于 512 个;
void listTypeConvert(robj *subject, int enc) {
listTypeIterator *li;
listTypeEntry entry;
redisAssertWithInfo(NULL,subject,subject->type == REDIS_LIST);
// 转换成双端链表
if (enc == REDIS_ENCODING_LINKEDLIST) {
list *l = listCreate();
listSetFreeMethod(l,decrRefCountVoid);
// 遍历 ziplist ,并将里面的值全部添加到双端链表中
li = listTypeInitIterator(subject,0,REDIS_TAIL);
while (listTypeNext(li,&entry)) listAddNodeTail(l,listTypeGet(&entry));
listTypeReleaseIterator(li);
// 更新编码
subject->encoding = REDIS_ENCODING_LINKEDLIST;
// 释放原来的 ziplist
zfree(subject->ptr);
// 更新对象值指针
subject->ptr = l;
} else {
redisPanic("Unsupported list conversion");
}
}
- linkedlist编码,底层是双端链表来保存SDS字符串对象;
注:字符串对象是 Redis 五种类型的对象中唯一一种会被其他四种类型对象嵌套的对象。
哈希对象
源码文件t_hash.c。编码是ziplist或者hashtable。
- ziplist编码,底层是压缩列表,有两个条件,哈希对象保存的所有键值对的键和值的字符串长度都小于 64 字节;哈希对象保存的键值对数量小于 512 个;每当有新的键值对要加入到哈希对象时,保存了同一键值对的两个节点总是紧挨在一起,保存键的节点在前,保存值的节点在后;且这些键值对遵循先来后到的原则。
注:在使用ziplist编码的时候,获取键对应值的时间复杂度不是O(1),而是O(N^2)(先遍历键,在遍历值是否相等)!但由于使用ziplist的时候,长度和键的长度较小,对性能影响不是很大。
- hashtable编码,当不满足上述的两个条件时,就使用字典作为底层实现。哈希对象中的每个键值对都使用一个字典键值对来保存:字典的每个键和值都是一个字符串对象;
注:当ziplist不满足两个条件其中之一时,原本保存在压缩列表里的所有键值对都会被转移并保存到字典里面。
集合对象
源码文件:t_set.c。集合对象的编码可以是intset或者hashtable 。
- intset编码,使用整数集合作为集合对象的实现,有两个条件:集合对象保存的所有元素都是整数值;集合对象保存的元素数量不超过512个;
- hashtable编码,当intset编码的两个条件不满足时,使用字典作为底层实现,字典的每个键都是一个字符串对象,每个字符串对象包含了一个集合元素,而字典的值则全部被设置为 NULL(因为没有没有键就代表了值)。
有序集合对象
源码文件t_zset.c。有序集合的编码可以是ziplist和skiplist。
- ziplist 编码,使用压缩列表作为底层实现有两个条件:有序集合保存的元素数量小于 128 个;有序集合保存的所有元素成员的长度都小于 64 字节;
注:ziplist为了保证有序性,每次插入删除操作时,都可能需要对其中的数据进行移动。每个集合元素使用两个紧挨在一起的压缩列表节点来保存,第一个节点保存元素的成员,而第二个元素则保存元素的分值(score)(排序)。
- skiplist 编码,不满足上述条件时的有序集合对象使用zset 结构作为底层实现,一个zset结构同时包含一个字典和一个跳跃表:
typedef struct zset {
zskiplist *zsl;
dict *dict;
} zset;
为什么需要这么做呢?
在理论上来说,有序集合可以单独使用字典或者跳跃表的其中一种数据结构来实现,但无论单独使用字典还是跳跃表,在性能上对比起同时使用字典和跳跃表都会有所降低。 举个例子,如果我们只使用字典来实现有序集合,那么虽然以O(1)复杂度查找成员的分值这一特性会被保留,但是,因为字典以无序的方式来保存集合元素,所以每次在执行范围型操作一一一比如ZRANK、ZRANG等命令时,程序都需要对字典保存的所有元素进行排序,完成这种排序需要至少O(NlogN)时间复杂度,以及额外的O(N)内存空间(因为要创建一个数组来保存排序后的元素) 另一方面,如果我们只使用跳跃表来实现有序集合,那么跳跃表执行范围型操作的所有优点都会被保留,但因为没有了字典,所以根据成员查找分值这一操作的复杂度将上升到到O(logn)。 因为以上原因,为了让有序集合的查找和范围型操作都尽可能快地执行,Redis选择了同时使用字典和跳跃表两种数据结构来实现有序集合。 注:常用命令Hyperloglog对象
源码文件:hyperloglog.c。 建议直接参考:链接 ## 类型检查和多态 Redis中用于操作键的命令基本分为两种,一种是键共有操作,比如说DEL命令、EXPIRE命令、RENAME命令、TYPE 命令、OBJECT命令等。另一种是键特定操作,如SET、GET、APPEND、STRLEN等命令只能对字符串键执行。类型检查
在执行一个类型特定的命令之前, Redis 会先检查输入键的类型是否正确, 然后再决定是否执行给定的命令。通过 redisObject 结构的 type 属性来实现的:多态命令
这里指的是同一个对象一般具有两种编码方式,比如hash对象是hashtable和ziplist,那么执行命令的时候是怎么确定的呢?也是根据其对象的编码格式来的:四、数据库实现
在Redis中,服务器中所有的数据库都保存在redis.h/redisServer结构中的db数组中:struct redisServer {
// ……
// 保存服务器中所有的数据库
redisDb *db;
//……
// 决定服务器初始化时创建的数据库数量
// 默认16
int dbnum;
}
默认配置下的服务器启动之后的状态如下:
注:可以通过select指令进行数据库的切换(其实就是更改db的指向)。
数据库键空间
Redis服务器中的每个数据库都由redis.h/redisDb结构表示,如下:typedef struct redisDb {
// 数据库键空间,保存着数据库中的所有键值对
dict *dict; /* The keyspace for this DB */
// 键的过期时间,字典的键为键,字典的值为过期事件 UNIX 时间戳
dict *expires; /* Timeout of keys with a timeout set */
// 正处于阻塞状态的键
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
// 可以解除阻塞的键
dict *ready_keys; /* Blocked keys that received a PUSH */
// 正在被 WATCH 命令监视的键
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
struct evictionPoolEntry *eviction_pool; /* Eviction pool of keys */
// 数据库号码
int id; /* Database ID */
// 数据库的键的平均 TTL ,统计信息
long long avg_ttl; /* Average TTL, just for stats */
} redisDb;
其中dict字典保存了该数据库的所有键值对,我们就将其称为键空间。
键空间和用户所见的数据库是直接对应的:
·键空间的键也就是数据库的键,每个键都是一个字符串对象。
·键空间的值也就是数据库的值,每个值可以是字符串对象、列表对象、哈希表对象、集合对象和有序集合对象在内的任意一种 Redis 对象。
例如下图就是一个拥有一个字符串对象、哈希对象、列表对象的数据库键空间的实例:
- 在读取一个键之后(读操作和写操作都要对键进行读取),服务器会根据键是否存在,以此来更新服务器的键空间命中(hit)次数或键空间不命中(miss)次数, 这两个值可以在 INFO stats 命令的 keyspace_hits 属性和 keyspace_misses 属性中查看。
- 在读取一个键之后,服务器会更新键的 LRU (最后一次使用)时间,这个值可以用于计算键的闲置时间,使用命令OBJECT idletime 命令可以查看键key的闲置时间。
- 如果服务器在读取一个键时,发现该键已经过期,那么服务器会先删除这个过期键,然后才执行余下的其他操作。
- 如果有客户端使用WATCH命令监视了某个键,那么服务器在对被监视的键进行修改之后, 会将这个键标记为脏(dirty), 从而让事务程序注意到这个键已经被修改过。
- 服务器每次修改一个键之后,都会对脏(dirty)键计数器的值增一, 这个计数器会触发服务器的持久化以及复制操作执行。
- 如果服务器开启了数据库通知功能,那么在对键进行修改之后, 服务器将按配置发送相应的数据库通知。
生存/过期时间
数据库redisDb结构中的expire字典保存过期时间,故称其为过期字典。 一个数据库的实例展示如下:expire(秒)/pexpire(毫秒)命令可以使得客户端以秒或者毫秒精度为某个键设置生存时间(TTL),经过指定时间之后,服务器就会删除生存时间为0的键。
expireat/pexpireat命令可以设置过期时间,即到达那个时刻就删除。 四种命令如下:cpp
// 命令的第二个参数可能是绝对值,也可能是相对值。
//当执行 *AT 命令时, basetime 为 0 ,在其他情况下,它保存的就是当前的绝对时间。
//unit 用于指定 argv[2] (传入过期时间)的格式,
//它可以是 UNIT_SECONDS 或 UNIT_MILLISECONDS ,
// basetime 参数则总是毫秒格式的。
void expireGenericCommand(redisClient *c, long long basetime, int unit) {
robj *key = c->argv[1], *param = c->argv[2];
long long when; /* unix time in milliseconds when the key will expire. */
// 取出 when 参数
if (getLongLongFromObjectOrReply(c, param, &when, NULL) != REDIS_OK)
return;
// 如果传入的过期时间是以秒为单位的,那么将它转换为毫秒
if (unit == UNIT_SECONDS) when *= 1000;
when += basetime;
/* No key, return zero. */
// 取出键
if (lookupKeyRead(c->db,key) == NULL) {
addReply(c,shared.czero);
return;
}
/* 在载入数据时,或者服务器为附属节点时,
* 即使 EXPIRE 的 TTL 为负数,或者 EXPIREAT 提供的时间戳已经过期,
* 服务器也不会主动删除这个键,而是等待主节点发来显式的 DEL 命令。
* 程序会继续将(一个可能已经过期的 TTL)设置为键的过期时间,
* 并且等待主节点发来 DEL 命令。
*/
if (when <= mstime() && !server.loading && !server.masterhost) {
// when 提供的时间已经过期,服务器为主节点,并且没在载入数据
robj *aux;
redisAssertWithInfo(c,key,dbDelete(c->db,key));
server.dirty++;
/* Replicate/AOF this as an explicit DEL. */
// 传播 DEL 命令
aux = createStringObject("DEL",3);
rewriteClientCommandVector(c,2,aux,key);
decrRefCount(aux);
signalModifiedKey(c->db,key);
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",key,c->db->id);
addReply(c, shared.cone);
return;
} else {
// 设置键的过期时间
// 如果服务器为附属节点,或者服务器正在载入,
// 那么这个 when 有可能已经过期的
setExpire(c->db,key,when);
addReply(c,shared.cone);
signalModifiedKey(c->db,key);
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"expire",key,c->db->id);
server.dirty++;
return;
}
}
另外,可以使用persist命令解除过期时间;TTL/PTTL返回键的剩余生存时间。
过期删除策略
有三种常见的删除策略:- 定时删除:在设置键过期时间时,创建一个定时器,使得当定时器时间来临时,立即删除键;
- 惰性删除:过期键并不立即删除,而是每次取键时看它过期没有,过期就删除,没有就返回该键;
- 定时删除:两者的一种折中方式,隔一段时间对数据库检查,删除其中过期的键,检查策略是重点:检查间隔跟检查数量。
惰性删除
源码在db.c/expireIfNeeded,Redis在读写所有键的时候会检查其输入键是否过期:
int expireIfNeeded(redisDb *db, robj *key) {
// 取出键的过期时间
mstime_t when = getExpire(db,key);
mstime_t now;
// 没有过期时间
if (when < 0) return 0; /* No expire for this key */
// 如果服务器正在进行载入,那么不进行任何过期检查
if (server.loading) return 0;
now = server.lua_caller ? server.lua_time_start : mstime();
// 当服务器运行在 replication 模式时
// 附属节点并不主动删除 key
// 它只返回一个逻辑上正确的返回值
// 真正的删除操作要等待主节点发来删除命令时才执行
// 从而保证数据的同步
if (server.masterhost != NULL) return now > when;
// 运行到这里,表示键带有过期时间,并且服务器为主节点
/* Return when this key has not expired */
// 如果未过期,返回 0
if (now <= when) return 0;
/* Delete the key */
server.stat_expiredkeys++;
// 向 AOF 文件和附属节点传播过期信息
propagateExpire(db,key);
// 发送事件通知
notifyKeyspaceEvent(REDIS_NOTIFY_EXPIRED,
"expired",key,db->id);
// 将过期键从数据库中删除
return dbDelete(db,key);
}
定期删除
源码在redis.c/activeExpireCycle,该函数会在规定时间内分多次遍历服务器的各个数据库,从expire字典中随机检查一部分键的过期时间,并删除其中的过期键: void activeExpireCycle(int type) {
// 静态变量,用来累积函数连续执行时的数据
static unsigned int current_db = 0; /* Last DB tested. */
static int timelimit_exit = 0; /* Time limit hit in previous call? */
static long long last_fast_cycle = 0; /* When last fast cycle ran. */
unsigned int j, iteration = 0;
// 默认每次处理的数据库数量
unsigned int dbs_per_call = REDIS_DBCRON_DBS_PER_CALL;
// 函数开始的时间
long long start = ustime(), timelimit;
// 快速模式
if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
// 如果上次函数没有触发 timelimit_exit ,那么不执行处理
if (!timelimit_exit) return;
// 如果距离上次执行未够一定时间,那么不执行处理
if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return;
// 运行到这里,说明执行快速处理,记录当前时间
last_fast_cycle = start;
}
/* 一般情况下,函数只处理 REDIS_DBCRON_DBS_PER_CALL 个数据库
* 除非:
* 当前数据库的数量小于 REDIS_DBCRON_DBS_PER_CALL
* 如果上次处理遇到了时间上限,那么这次需要对所有数据库进行扫描,
* 这可以避免过多的过期键占用空间
*/
if (dbs_per_call > server.dbnum || timelimit_exit)
dbs_per_call = server.dbnum;
// 函数处理的微秒时间上限
// ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 默认为 25 ,也即是 25 % 的 CPU 时间
timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERCrver.hz/100;
timelimit_exit = 0;
if (timelimit <= 0) timelimit = 1;
// 如果是运行在快速模式之下
// 那么最多只能运行 FAST_DURATION 微秒
// 默认值为 1000 (微秒)
if (type == ACTIVE_EXPIRE_CYCLE_FAST)
timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* in microseconds. */
// 遍历数据库
for (j = 0; j < dbs_per_call; j++) {
int expired;
// 指向要处理的数据库
redisDb *db = server.db+(current_db % server.dbnum);
// 为 DB 计数器加一,如果进入 do 循环之后因为超时而跳出
// 那么下次会直接从下个 DB 开始处理
current_db++;
do {
unsigned long num, slots;
long long now, ttl_sum;
int ttl_samples;
// 获取数据库中带过期时间的键的数量
// 如果该数量为 0 ,直接跳过这个数据库
if ((num = dictSize(db->expires)) == 0) {
db->avg_ttl = 0;
break;
}
// 获取数据库中键值对的数量
slots = dictSlots(db->expires);
// 当前时间
now = mstime();
// 这个数据库的使用率低于 1% ,扫描起来太费力了(大部分都会 MISS)
// 跳过,等待字典收缩程序运行
if (num && slots > DICT_HT_INITIAL_SIZE &&
(num*100ots < 1)) break;
/* The main collection cycle. Sample random keys among keys
* with an expire set, checking for expired ones.
*
* 样本计数器
*/
// 已处理过期键计数器
expired = 0;
// 键的总 TTL 计数器
ttl_sum = 0;
// 总共处理的键计数器
ttl_samples = 0;
// 每次最多只能检查 LOOKUPS_PER_LOOP 个键
if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;
// 开始遍历数据库
while (num--) {
dictEntry *de;
long long ttl;
// 从 expires 中随机取出一个带过期时间的键
if ((de = dictGetRandomKey(db->expires)) == NULL) break;
// 计算 TTL
ttl = dictGetSignedIntegerVal(de)-now;
// 如果键已经过期,那么删除它,并将 expired 计数器增一
if (activeExpireCycleTryExpire(db,de,now)) expired++;
if (ttl < 0) ttl = 0;
// 累积键的 TTL
ttl_sum += ttl;
// 累积处理键的个数
ttl_samples++;
}
/* Update the average TTL stats for this database. */
// 为这个数据库更新平均 TTL 统计数据
if (ttl_samples) {
// 计算当前平均值
long long avg_ttl = ttl_suml_samples;
// 如果这是第一次设置数据库平均 TTL ,那么进行初始化
if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
/* Smooth the value averaging with the previous one. */
// 取数据库的上次平均 TTL 和今次平均 TTL 的平均值
db->avg_ttl = (db->avg_ttl+avg_ttl)/2;
}
// 我们不能用太长时间处理过期键,
// 所以这个函数执行一定时间之后就要返回
// 更新遍历次数
iteration++;
// 每遍历 16 次执行一次
if ((iteration & 0xf) == 0 && /* check once every 16 iterations. */
(ustime()-start) > timelimit)
{
// 如果遍历次数正好是 16 的倍数
// 并且遍历的时间超过了 timelimit
// 那么断开 timelimit_exit
timelimit_exit = 1;
}
// 已经超时了,返回
if (timelimit_exit) return;
// 如果已删除的过期键占当前总数据库带过期时间的键数量的 25 %
// 那么不再遍历
} while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4);
}
}
AOF、RDB、复制对过期键的处理
- 若是生成RDB文件,即SAVE、BGSAVE命令,在保存过程中会对数据库中的键进行检查,过期的键将不保存;
若是载入RDB文件,当以主服务器运行时,只会载入未过期的键;若以从服务器运行,所有键都会载入;
- 若是写入AOF,某个键过期还未删除不会造成影响,只有当惰性删除/定期删除策略将其删除时,AOF会显式追加DEL命令,提示已删除;
- 若是复制操作时,从服务器不会在意该键时是过期还是不过期,它的过期删除操作由主服务器控制,也就是说当主服务器删除一个过期键后,它会通知所有从服务器删除;若是主服务器没有发送,那么从服务器依旧像是处理未过期键一样;这样做主要是为了保证主从服务器的数据一致性。
通知与订阅
Redis2.8版本中新增了数据库的通知功能:客户端可以订阅指定的频道/模式来获取数据库中键的变化及命令执行情况。 下面的指令展示了一个客户端订阅0号数据库中针对message键的命令(键空间通知:某个键执行了什么命令)
// notify.c
/*
* type 参数表示该通知类型
* event 参数是一个字符串表示的事件名
* key 参数是一个 Redis 对象表示的键名
* dbid 参数为键所在的数据库ID
*/
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) ;
每当一个Redis命令需要发送数据库通知时,该命令的实现函数就会调用notifyKeyspaceEvent函数,并向函数传递相关信息。例如执行SADD命令,其实现函数进行通知的部分代码如下:
void saddCommand(redisClient *c){
...
// 当至少有一个元素被成功添加
if(added){
...
// 传递该命令的相关信息
notifyKeyspaceEvent(REDIS_NOTIFY_SET,"sadd",c->argv[1].c->db->id);
}
...
}
持久化
为了避免因服务器宕机或错误造成数据严重丢失的问题,Redis提供了两种持久化(即将数据保存至磁盘)的方式,分别是RDB和AOF。RDB持久化
RDB持久化是将当前数据库状态生成快照,即一个二进制文件。通过该文件可以还原数据库状态。有两个命令可以生成RDB文件,一个是SAVE,另一个是BGSAVE。其实后者跟前者的区别主要在于BackGround,即后台保存。- 使用SAVE时,当前服务器进程阻塞,直到RDB文件完全生成;
- 使用BGSAVE时,当前进程派生一个子进程完成RDB文件的生成,原服务器进程照常工作。
注:子进程与父进程写时复制,所以字典dict的负载因子此时才为5,尽量减少此时rehash的可能性。现在结合源码分析一下,RDB持久化的源码文件为rdb.c,主要函数为rdbSave(char *filename),并且SAVE和BGSAVE命令底层都是通过它实现的,我们先来看看代码:
/*
* 将数据库保存到磁盘上。
* 保存成功返回 REDIS_OK ,出错/失败返回 REDIS_ERR 。
*/
int rdbSave(char *filename) {
dictIterator *di = NULL;
dictEntry *de;
char tmpfile[256];
char magic[10];
int j;
long long now = mstime();
FILE *fp;
rio rdb;
uint64_t cksum;
// 创建临时文件
snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
redisLog(REDIS_WARNING, "Failed opening .rdb for saving: %s",
strerror(errno));
return REDIS_ERR;
}
// 初始化 I/O
rioInitWithFile(&rdb,fp);
// 设置校验和函数
if (server.rdb_checksum)
rdb.update_cksum = rioGenericUpdateChecksum;
// 写入 RDB 版本号
snprintf(magic,sizeof(magic),"REDIS%04d",REDIS_RDB_VERSION);
if (rdbWriteRaw(&rdb,magic,9) == -1) goto werr;
// 遍历所有数据库
for (j = 0; j < server.dbnum; j++) {
// 指向数据库
redisDb *db = server.db+j;
// 指向数据库键空间
dict *d = db->dict;
// 跳过空数据库
if (dictSize(d) == 0) continue;
// 创建键空间迭代器
di = dictGetSafeIterator(d);
if (!di) {
fclose(fp);
return REDIS_ERR;
}
// 写入 DB 选择器符号 该符号后接数据库ID
if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_SELECTDB) == -1) goto werr;
if (rdbSaveLen(&rdb,j) == -1) goto werr;
// 遍历数据库,并写入每个键值对的数据
while((de = dictNext(di)) != NULL) {
sds keystr = dictGetKey(de);
robj key, *o = dictGetVal(de);
long long expire;
// 根据 keystr ,在栈中创建一个 key 对象
initStaticStringObject(key,keystr);
// 获取键的过期时间
expire = getExpire(db,&key);
// 保存键值对数据
if (rdbSaveKeyValuePair(&rdb,&key,o,expire,now) == -1) goto werr;
}
dictReleaseIterator(di);
}
di = NULL; /* So that we don't release it again on error. */
/* 写入 EOF 代码
*/
if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_EOF) == -1) goto werr;
// CRC64 校验和。
// 如果校验和功能已关闭,那么 rdb.cksum 将为 0
// 在这种情况下, RDB 载入时会跳过校验和检查。
cksum = rdb.cksum;
memrev64ifbe(&cksum);
rioWrite(&rdb,&cksum,8);
// 冲洗缓存,确保数据已写入磁盘
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
if (fclose(fp) == EOF) goto werr;
// 使用 RENAME ,原子性地对临时文件进行改名,覆盖原来的 RDB 文件。
if (rename(tmpfile,filename) == -1) {
redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
unlink(tmpfile);
return REDIS_ERR;
}
// 写入完成,打印日志
redisLog(REDIS_NOTICE,"DB saved on disk");
// 清零数据库脏状态
server.dirty = 0;
// 记录最后一次完成 SAVE 的时间
server.lastsave = time(NULL);
// 记录最后一次执行 SAVE 的状态
server.lastbgsave_status = REDIS_OK;
return REDIS_OK;
werr:
// 关闭文件
fclose(fp);
// 删除文件
unlink(tmpfile);
redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
if (di) dictReleaseIterator(di);
return REDIS_ERR;
}
SAVE命令底层几乎就是上述函数,BGSAVE的实现只是需要fork()一个子进程:
int rdbSaveBackground(char *filename) {
pid_t childpid;
long long start;
// 如果 BGSAVE 已经在执行,那么出错
if (server.rdb_child_pid != -1) return REDIS_ERR;
// 记录 BGSAVE 执行前的数据库被修改次数
server.dirty_before_bgsave = server.dirty;
// 最近一次尝试执行 BGSAVE 的时间
server.lastbgsave_try = time(NULL);
// fork() 开始前的时间,记录 fork() 返回耗时用
start = ustime();
if ((childpid = fork()) == 0) {
int retval;
/* Child */
// 关闭网络连接 fd
closeListeningSockets(0);
// 设置进程的标题,方便识别
redisSetProcTitle("redis-rdb-bgsave");
// 执行保存操作
retval = rdbSave(filename);
// 打印 copy-on-write 时使用的内存数
if (retval == REDIS_OK) {
size_t private_dirty = zmalloc_get_private_dirty()
if (private_dirty) {
redisLog(REDIS_NOTICE,
"RDB: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
}
// 向父进程发送信号
exitFromChild((retval == REDIS_OK) ? 0 : 1);
} else {
/* Parent */
// 计算 fork() 执行的时间
server.stat_fork_time = ustime()-start;
// 如果 fork() 出错,那么报告错误
if (childpid == -1) {
server.lastbgsave_status = REDIS_ERR;
redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
strerror(errno));
return REDIS_ERR;
}
// 打印 BGSAVE 开始的日志
redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
// 记录数据库开始 BGSAVE 的时间
server.rdb_save_time_start = time(NULL);
// 记录负责执行 BGSAVE 的子进程 ID
server.rdb_child_pid = childpid;
// 关闭自动 rehash
updateDictResizePolicy();
return REDIS_OK;
}
return REDIS_OK; /* unreached */
}
值得一提的是,s,可以对Redis服务器进行配置,在redis.conf文件中有默认配置:
save 900 1
save 300 10
save 60 10000
这表示只要满足上述三个条件任意一个,就执行BGSAVE。
例如save 900 1:900秒内对数据库进行了至少1次修改;
但其实用户还可以自定义保存条件,这主要由结构体redis.h/saveparam决定的:
// 服务器的保存条件(BGSAVE 自动执行的条件)
struct saveparam {
// 多少秒之内
time_t seconds;
// 发生多少次修改
int changes;
};
上述两个因素,其实是根据redisSever两个属性计算得来的,dirty计数器及lastsave属性。
- dirty计数器记录距离上次保存之后,对数据库进行了多少次修改(写入删除更新等),每进行一次修改,dirty属性值就加1;
- lastsave属性是Unix时间戳,记录上次保存的时间;
- REDIS是Redis RDB文件的标识;
- db_version是指RDB文件的版本号,这里的版本号是指Redis保存RDB文件的方式根据Redis版本可能是不同的;
- database包含0或任意个数据库的键值对数据;
- EOF长度1字节,标志正文结束;
- check_sum是8字节的校验和;
关于RDB文件结构,这里只做简单的介绍,如果有需要了解的地方可翻阅黄健宏老师的《Redis设计与实现》,或是留言。如果想分析RDB文件可以使用od -c或是od -x命令来打印RDB文件内容。
AOF持久化
AOF是Append Only File的简称。RDB是保存服务器快照,而AOF方式则是保存服务器执行的命令来“记录”服务器状态。简单解释一下这两者的区别: 假设有一个version-1.0版本的数据库,依次执行了SET DEL SADD三个指令,那么:- RDB方式会保存执行完三个指令之后的数据库状态,设为version1.1;
- AOF方式会保存执行的三个指令!如果数据库想恢复version1.1的状态,那么只需依次执行保存的三个指令即可。
具体实现时,AOF主要分为三个步骤:命令追加、文件写入、文件同步。
第一步,每当服务器执行完命令之后,就会将命令以AOF协议的格式写入redisServer的aof_buf缓冲区中。 何时保存该缓冲区内容到文件,即是第二步文件写入(写入AOF)及文件同步(保存AOF文件),这由服务器配置appendfsync来确定:注:默认为everysec。 注:现代操作系统为了提高文件写入效率,当用户调用write函数时,操作系统将数据暂存至内存缓冲区,等待缓冲区满或是指定时限而不是立即写入文件。为了强制写入数据,提供了fsync及fdatasync函数接口。上述三种同步方式将直接决定AOF的效率和安全: ·若是always:最安全,效率最慢; ·若是everysec:效率够快,丢失也仅1s的数据; ·若是no:效率最高,安全性较差; AOF文件的载入过程可以由下图简单表现:
RPUSH list “c” “D” “E” “F” “G”
AOF重写的实现原理即是:首先从数据库读取键现在的值,然后用一条命令去记录键值对,代替之前记录这个键值对的多条命令。其源码可以参阅aof.c/int rewriteAppendOnlyFile(char *filename); 为了提供重写效率,Redis提供了后台重写的功能,但有个问题需要解决即重写过程中,服务器可能又进行了更新,此时Redis服务器工作流程如下:事件
Redis服务器是一个事件驱动程序,需要处理两类事件: ·文件事件:客户端通过套接字与服务器连接的,客户端命令就是一种文件事件; ·时间事件:服务器的一些操作需要在给定时间点执行,如serverCron;文件事件
Redis基于Reactor模式开发了自己的网络(抽象的网络)事件处理器,称为文件事件处理器:
·基于IO多路复用监听多个套接字,并根据套接字执行的任务来关联合适的事件处理器; 文件事件处理器由四部分构成:套接字、多路复用、分派器、处理器// ae.c
// 选择当前系统最优的IO复用方式
// 根据性能降序排列
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
注:之所以如此排序是因为 Redis 会优先选择时间复杂度为O(1)的 I/O 多路复用函数作为底层实现,包括 Solaries 10 中的 evport、Linux 中的 epoll 和 macOS/FreeBSD 中的kqueue,上述的这些函数都使用了内核内部的结构,并且能够服务几十万的文件描述符。
但是如果当前编译环境没有上述函数,select函数是作为 POSIX 标准中的系统调用,在不同版本的操作系统上都会实现,所以将其作为保底方案。
事件类型
ae.h中定义了IO多路复用程序所监听的两类事件,AE_READABLE和AE_WEUTABLE: ·READABLE事件,当套接字可读(客户端write或close或新的可应答套接字)时产生; ·WRITABLE事件,套接字可写时,如客户端执行read; 当一个套接字同时产生了两类事件,即可读又可写,那么优先执行度再写;(为什么呢?可能是为了防止读操作被覆盖)时间事件
Redis中的时间事件主要分为两类,一是定时事件(返回AE_NOMORE),二是周期性事件(返回非AE_NOMORE整数值)。时间事件由三个属性构成: ·id:全局唯一ID,递增; ·when:unix时间戳,记录预设时间; ·timeProc:时间事件处理器,类似于回调函数,当时间事件到达时,就会调用相应的处理器进行处理; Redis的时间事件是基于一个无序(指when无序,而不是id无序)链表实现的,新来的放在表头,当时间事件执行器运行时,遍历整个链表,找出已到达的时间事件,并调用相应的事件处理器(这跟poll、select一样,效率很低?在3.0版本下,正常模式的Redis服务器仅有serverCron时间事件,benchmark模式下也只有两个,几乎不影响性能)。事件的调度与执行
Redis对时间和文件事件的调度策略如下图所示:客户端
Redis服务器可以与多个客户端建立连接,每个客户端连接时,服务器都保存了相应状态,如下:基本属性
typedef struct redisClient {
// 套接字描述符
int fd;
// 当前正在使用的数据库
redisDb *db;
// 当前正在使用的数据库的 id (号码)
int dictid;
// 客户端的名字
robj *name;
// 查询缓冲区
sds querybuf;
// 查询缓冲区长度峰值
size_t querybuf_peak;
// 参数数量
int argc;
// 参数对象数组
robj **argv;
// 记录被客户端执行的命令
struct redisCommand *cmd, *lastcmd;
// 请求的类型:内联命令还是多条命令
int reqtype;
// 剩余未读取的命令内容数量
int multibulklen;
// 命令内容的长度
long bulklen;
// 回复链表
list *reply;
// 回复链表中对象的总大小
unsigned long reply_bytes;
// 已发送字节,处理 short write 用
int sentlen; // 创建客户端的时间
time_t ctime; /* Client creation time */
// 客户端最后一次和服务器互动的时间
time_t lastinteraction;
// 客户端的输出缓冲区超过软性限制的时间
time_t obuf_soft_limit_reached_time;
// 客户端状态标志
int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */
// 当 server.requirepass 不为 NULL 时
// 代表认证的状态
// 0 代表未认证, 1 代表已认证
int authenticated;
// 复制状态
int replstate;
// 用于保存主服务器传来的 RDB 文件的文件描述符
int repldbfd; /* replication DB file descriptor */
// 读取主服务器传来的 RDB 文件的偏移量
off_t repldboff; /* replication DB file offset */
// 主服务器传来的 RDB 文件的大小
off_t repldbsize; /* replication DB file size */
sds replpreamble; /* replication DB preamble. */
// 主服务器的复制偏移量
long long reploff; /* replication offset if this is our master */
// 从服务器最后一次发送 REPLCONF ACK 时的偏移量
long long repl_ack_off; /* replication ack offset, if this is a slave */
// 从服务器最后一次发送 REPLCONF ACK 的时间
long long repl_ack_time;/* replication ack time, if this is a slave */
// 主服务器的 master run ID
// 保存在客户端,用于执行部分重同步
char replrunid[REDIS_RUN_ID_SIZE+1]; /* master run id if this is a master */
// 从服务器的监听端口号
int slave_listening_port; /* As configured with: SLAVECONF listening-port */
// 事务状态
multiState mstate; /* MULTI/EXEC state */
// 阻塞类型
int btype; /* Type of blocking op if REDIS_BLOCKED. */
// 阻塞状态
blockingState bpop; /* blocking state */
// 最后被写入的全局复制偏移量
long long woff; /* Last write global replication offset. */
// 被监视的键
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
// 这个字典记录了客户端所有订阅的频道
// 键为频道名字,值为 NULL
// 也即是,一个频道的集合
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
// 链表,包含多个 pubsubPattern 结构
// 记录了所有订阅频道的客户端的信息
// 新 pubsubPattern 结构总是被添加到表尾
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
sds peerid; /* Cached peer ID. */
// 回复偏移量
int bufpos;
// 回复缓冲区
char buf[REDIS_REPLY_CHUNK_BYTES];
} redisClient;
主要围绕其中几个重要属性(从上至下)进行分析:
·套接字描述符fd:取值为-1(伪客户端,处理的命令请求来自AOF文件或者LUA脚本)或大于-1(普通客户端);
client list 可以列出当前服务器的客户端及其所使用的描述符等;
·name:默认为空,可以通过client setname命令设置;
·flags标志:记录客户端的角色及其状态,即可以是单个标志,也可以是多个标志的或:
* Client flags */
#define REDIS_SLAVE (1<<0) /* This client is a slave server */
#define REDIS_MASTER (1<<1) /* This client is a master server */
#define REDIS_MONITOR (1<<2) /* This client is a slave monitor, see MONITOR */
#define REDIS_MULTI (1<<3) /* This client is in a MULTI context */
#define REDIS_BLOCKED (1<<4) /* The client is waiting in a blocking operation */
#define REDIS_DIRTY_CAS (1<<5) /* Watched keys modified. EXEC will fail. */
#define REDIS_CLOSE_AFTER_REPLY (1<<6) /* Close after writing entire reply. */
#define REDIS_UNBLOCKED (1<<7) /* This client was unblocked and is stored in
server.unblocked_clients */
#define REDIS_LUA_CLIENT (1<<8) /* This is a non connected client used by Lua */
#define REDIS_ASKING (1<<9) /* Client issued the ASKING command */
#define REDIS_CLOSE_ASAP (1<<10)/* Close this client ASAP */
#define REDIS_UNIX_SOCKET (1<<11) /* Client connected via Unix domain socket */
#define REDIS_DIRTY_EXEC (1<<12) /* EXEC will fail for errors while queueing */
#define REDIS_MASTER_FORCE_REPLY (1<<13) /* Queue replies even if is master */
#define REDIS_FORCE_AOF (1<<14) /* Force AOF propagation of current cmd. */
#define REDIS_FORCE_REPL (1<<15) /* Force replication of current cmd. */
#define REDIS_PRE_PSYNC (1<<16) /* Instance don't understand PSYNC. */
#define REDIS_READONLY (1<<17) /* Cluster client is in read-only state. */
·argv、argc命令与命令参数:即是客户端发送给服务器的命令,实例如下:
创建与关闭
服务器启动会创建负责执行Lua脚本的伪客户端(服务器生命周期内一直存在)、执行AOF中redis命令的伪客户端(载入完毕之后关闭)。 除此之外,当使用网络连接到服务器时,会创建普通客户端,普通客户端可能由于发送不合协议格式的命令而被关闭。服务器
命令请求执行过程
若客户端执行以下命令:set key value,那么从输入该命令到客户端回复ok,需要进行以下操作:/*
* Redis 命令
*/
struct redisCommand {
// 命令名字
char *name;
// 实现函数
redisCommandProc *proc;
// 参数个数
int arity;
// 字符串表示的 FLAG
char *sflags; /* Flags as string representation, one char per flag. */
// 实际 FLAG
int flags; /* The actual flags, obtained from the 'sflags' field. */
/* Use a function to determine keys arguments in a command line.
* Used for Redis Cluster redirect. */
// 从命令中判断命令的键参数。在 Redis 集群转向时使用。
redisGetKeysProc *getkeys_proc;
/* What keys should be loaded in background when calling this command? */
// 指定哪些参数是 key
int firstkey; /* The first argument that's a key (0 = no keys) */
int lastkey; /* The last argument that's a key */
int keystep; /* The step between first and last key */
// 统计信息
// microseconds 记录了命令执行耗费的总毫微秒数
// calls 是命令被执行的总次数
long long microseconds, calls;
};
注:redis命令不区分大小写;·命令回复发送给客户端:命令实现函数会将命令回复保存到客户端的输出缓冲区里面,并为客户端的套接字关联命令回复处理器,当客户端套接字变为可写状态时,服务器就会执行命令回复处理器,将保存在客户端输出缓冲区中的命令回复发送给客户端。 当命令回复发送完毕之后,回复处理器会清空客户端状态的输出缓冲区为处理下一个命令做准备。 ·客户端接收并打印:如下
serverCron函数
源码地址redis.c/serverCron,1300+行左右位置。 该函数默认100毫秒(可配置)执行一次,负责管理服务器资源,并保持服务器自身的良好运转。 建议结合源码及下图理解其功能:五、多机数据库
这个部分主要介绍跟多级数据库有关的复制、集群、sentinel。复制功能
源码参见:replication.c在Redis中可以使用SLAVEOF或者设置slaveof选项,使得一个服务器去复制另一个服务器。这种情形就是我们熟知的主从结构,一般用作读写分离,主服务器是负责读/写,从服务器负责读见下图。
新旧版本的复制对比
在Redis2.8版本之后的复制功能效率更高,安全系数更好,新旧版本复制功能对比如下(请点击或下载查看大图):复制的全过程
·设置主服务器的地址端口 ·建立套接字连接 ·发送PING命令(检查主服务器状态)哨兵Sentinel
源码参见:sentinel.c介绍
sentinal,哨兵是Redis多机架构中高可用性非常重要的一个组件,主要功能如下: (1)集群监控,负责监控redis master和slave进程是否正常工作(心跳检测); (2)消息通知,如果某个redis实例有故障,那么哨兵负责发送消息作为报警通知给管理员; (3)故障转移,如果master node挂掉了,会自动转移到slave node上; (4)配置中心,如果故障转移发生了,通知client客户端新的master地址; 哨兵本身也是分布式的,作为一个哨兵集群去运行,互相协同工作 (1)故障转移时,判断一个master node是宕机了,需要大部分(超过一半)的哨兵都同意才行,涉及到了分布式选举的问题; (2)即使部分哨兵节点挂掉了,哨兵集群还是能正常工作的,因为如果一个作为高可用机制重要组成部分的故障转移系统本身是单点的,那就很不稳定了; 下面几幅图展示sentinel系统是如何实现监视Redis服务器的: 初始状态如下:初始化Sentinel
主要有以下四个步骤: ·初始化服务器:sentinel本身就是一个运行在特殊模式下的Redis服务器! sentinel和普通Redis服务器的区别如下:获取主从服务器信息
在建立连接之后,sentinel会默认10s通过INFO命令获取主/从服务器的返回消息。 注:sentinel一开始连接的是主服务器,当发现其有新的从服务器时,就会创建它的实例(flags为SRI_MASTER)并建立连接并订阅。 ·获取主服务器信息: 一方面是关于主服务器本身的信息,包括run_id域记录的服务器运行ID,以及role域记录的服务器角色(主/从); 另一方面是关于主服务器属下所有从服务器的信息,每个从服务器都由一个”slave”字符串开头的行记录,每行的ip=域记录了从服务器的IP地址,而port=域则记录了从服务器的端口号。根据这些IP地址和端口号,Sentinel无须用户提供从服务器的地址信息,就可以自动发现从服务器。 sentinel会通过这些信息更新主服务器的实例结构。 ·从服务器信息: 从服务器的运行ID run_id。 从服务器的角色role。 主服务器的IP地址master_host,以及主服务器的端口号master_port。主从服务器的连接状态master_link_status。 从服务器的优先级s1ave_priority(跟后续选举有关)。 从服务器的复制偏移量slave repl offset. sentinel会通过这些信息更新主服务器的实例结构。跟主从服务器交互
sentinel会默认两秒一次通过命令连接向监视的主从服务器发送命令: PUBLTSHsentinel__:hello”检查下线
分为检测主观/客观下线状态。 ·主观下线:默认每秒一次向所有实例发送PING,根据服务器返回的回复/或者断线时长来判断是否掉线。 注:多个sentinel的down-after-milliseconds可能不同,只有设置最长间隔的sentinel判断为下线,那才是真正的下线。 ·客观下线:这正是上一点提到的:不同的sentinel的设置时长或是判断不一致,因此当某一个sentinel认为某一个服务器下线以后(主观下线),他会不停地询问“该服务器是否已下线”,当接收到足够数量(可配置)的下线判断后,才会将服务器认为真正的客观下线,并对从服务器执行客观转移。 注:客观下线是在主观下线的基础上进行的。故障转移
故障转移主要分为两个步骤: ·选举领头sentinel,当主服务器被判断为客观下线之后,监视这个下线服务器的各个sentinel会进行协商(广播给其他sentinel),选举一个领头sentinel,由它来进行故障转移,这个过程主要有以下规则: (仔细领会)一个配置纪元内最多只能有一个领头sentinel、每个sentinel都有资格、每次进行选举不管结果如何配置纪元(configuration epoch)加一(没有啥用)、同一个配置纪元内所有sentinel都有且仅有一次将某sentinel设为局部领头的机会、发现主服务器客观下线的sentinel都会要求同伴将它设为局部领头、要求设为局部领头是先到先得、被大于一半的的sentinel设为局部领头那就是全局领头、给定时限内没选出来那就重来一次。 ·故障转移:选出领头的sentienl之后,由领头的进行故障转移,主要包括三个步骤,见下图:集群
源码参加cluster.c。 集群其实就是运行多个Redis实例,跟分布式不同的是,集群相当于增加硬件资源来使得整个系统性能更好(利用负载均衡),而分布式则是各个部分之间分门别类,各司其职,IPC通信。 Redis启动时都是一个个单机,当使用CLUSTER MEET命令之后,对应的Redis实例就会握手,从而成为一个集群。指派哈希槽
Redis集群的数据分配是以一种叫做hash槽的方式分派的,具有极大的灵活性,可以自由配置。Redis集群通过分片的方式来保存数据库中的键值对:集群的整个数据库被分为16384个槽( slot ),数据库中的每个键都属于这16384个槽的其中一个,集群中的每个节点可以处理0个或最多16384个槽。 当16384个槽都被分配出去的时候,整个集群才是上线状态,否则处于下线(fail)状态。 可以通过CLUSTER ADDSLOTS来进行槽的分配。 每个cluster的槽信息是由clusternode中的slots信息来记录,其实就是一个数组:集群中如何执行命令
客户端执行命令的关键是计算当前命令应该对应哪个集群节点,见下:def slot__number (key): return CRC1 6 (key)&16383
·判断该槽在哪个节点 通过clusterstate.slots数组来判断是否是当前节点自身,不是则指明客户端重定向MOVDE错误。重新分片
Redis集群可以在线重分片,这是由其集群管理软件redis-trib负责执行的,重分配过程见下:故障转移
集群中的节点分为主节点和从节点,主节点处理槽,从节点复制主节点并在主节点宕机下线后替代该主节点。如以下的这个集群,7004和7005是两个从节点,复制节点7000的数据状态。六.应用场景
前面的内容主要关注于Redis的底层数据结构及运行逻辑,接下来的这个部分是笔者从网上收集、整理的Redis应用资料。Redis语法
建议直接参考以下链接:
https://www.runoob.com/redis/redis-commands.html
业务场景
https://segmentfault.com/a/1190000016188385
https://www.pdai.tech/md/db/nosql-redis/db-redis-x-cache.html
Redis6.0如何?
https://www.cnblogs.com/leijisong/p/14957013.html
Redis不是已经采用了多路复用技术吗?不是号称很高的性能了吗?为啥还要采用多线程模型呢?
Redis6.0中的多线程,也只是针对网络请求过程采用了多线程,而数据的读写命令仍然是单线程处理的。 为什么呢?不是多路复用技术已经大大提高IO利用率了吗? 主要对Redis有着更高的要求。根据测算,Redis 将所有数据放在内存中,内存的响应时长大约为 100 纳秒,对于小数据包,Redis 服务器可以处理 80,000 到 100,000 QPS,这么高的对于 80% 的公司来说,单线程的 Redis 已经足够使用了。 但随着越来越复杂的业务场景,有些公司动不动就上亿的交易量,因此需要更大的 QPS。为了提升QPS,很多公司的做法是部署Redis集群,并且尽可能提升Redis机器数。但是这种做法的资源消耗是巨大的。 而经过分析,限制Redis的性能的主要瓶颈出现在网络IO的处理上,虽然之前采用了多路复用技术。但是我们前面也提到过,多路复用的IO模型本质上仍然是同步阻塞型IO模型。下面是多路复用IO中select函数的处理过程:那么,在引入多线程之后,如何解决并发带来的线程安全问题呢?
这就是为什么我们前面多次提到的”Redis 6.0的多线程只用来处理网络请求,而数据的读写还是单线程”的原因。 Redis 6.0 只有在网络请求的接收和解析,以及请求后的数据通过网络返回给时,使用了多线程。而数据读写操作还是由单线程来完成的,所以,这样就不会出现并发问题了。