1.存储原理

sorted set,有序的 set,每个元素有个 scorescore 相同时,按照 keyASCII 码排序。

数据结构 是否允许重复元素 是否有序 有序实现方式
列表list 索引下标
集合set
有序集合zset 分值score

同时满足以下条件时使用 ziplist 编码:

  1. 元素数量小于128个

  2. 所有成员的长度都小于64字节

在 ziplist 的内部,按照 score 排序递增来存储。插入的时候要移动之后的数据。(时间换空间的思想)

  1. # 与哈希和列表类似,排序后的集合也经过特殊编码,以节省大量空间。此编码仅在排序集的长度和元素低于以下限制时使用:
  2. zset-max-ziplist-entries 128
  3. zset-max-ziplist-value 64

超过阈值之后,使用 skiplist+dict 存储。

当 ziplist 作为 zset 的底层存储结构时,每个集合元素使用两个紧挨在一起的压缩列表节点来保存,第一个节点保存元素值,第二个元素保存元素的分值,而且分值小的靠近表头,大的靠近表尾。

zskiplist底层实现是跳跃表+字典。每个跳跃表节点都保存一个集合元素,并按分值从小到大排列,每个节点的 ele 属性保存了元素的值,score属性保存分值;字典的每个键值对保存一个集合元素,元素值包装为字典的键,元素分值保存为字典的值。注意集合的元素成员和分值是共享的,跳跃表和字典通过指针指向同一地址,不会浪费内存。
Redis整体结构图.jpg

2.数据结构

2.1 跳表结构定义

zset的数据结构定义在server.h文件中。

  1. typedef struct zset {
  2. //字典,键为成员,值为分值,可用于支持 O(1) 复杂度的按元素取分值操作
  3. dict *dict;
  4. //zsl: 跳跃表,按分值排序元素,用于支持平均复杂度为 O(log N) 的按分值定位元素的操作以及范围操作
  5. zskiplist *zsl;
  6. } zset;

字典前面我们已经说过了,再来看下zskiplist结构。

  1. typedef struct zskiplist {
  2. struct zskiplistNode *header, *tail; /* 指向跳跃表的头结点和尾节点 */
  3. /* 链表包含的节点总数。新创建的 skiplist 包含一个空的头指针,这个头指针不包含在 length 计数中 */
  4. unsigned long length;
  5. int level; /* 最大的层数 */
  6. } zskiplist;

接着我们再来看一下组成了zskiplist的节点定义。

  1. typedef struct zskiplistNode {
  2. sds ele; /* zset 的元素 */
  3. double score; /* 分值 */
  4. struct zskiplistNode *backward; /* 后退指针 */
  5. struct zskiplistLevel {
  6. struct zskiplistNode *forward; /* 前进指针,对应 level 的下一个节点 */
  7. unsigned long span; /* 从当前节点到下一个节点的跨度(跨越的节点数) */
  8. } level[]; /* 层 */
  9. } zskiplistNode;

2.2跳表的构建

t_zset.c文件中的zslInsert() 是跳跃表中非常关键的函数,这里体现了跳跃表的构建过程及其查找策略。可以看到 skiplist 为每个节点随机出一个层数(level),比如一个节点随机出的层数是3,那么就把它链入到从第1层到第3层这三层链表中。

  1. 首先通过分数 score 在跳跃表中查找到新增的节点应该插入的位置上的节点
  2. 调用 zslRandomLevel() 函数随机生成节点层数,使用该层数新建一个跳跃表节点
  3. 更新新增的节点应插入位置上的节点前后指针,将新增节点插入到跳跃表中
  1. /* 在skiplist中插入一个新节点。假设元素不存在(由调用者强制执行)。skiplist获取传递的SDS字符串“ele”的所有权。 */
  2. zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
  3. zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
  4. unsigned int rank[ZSKIPLIST_MAXLEVEL];
  5. int i, level;
  6. serverAssert(!isnan(score));
  7. //通过分数 score 在跳跃表中查找到新增的节点应该插入的位置上的节点
  8. x = zsl->header;
  9. for (i = zsl->level-1; i >= 0; i--) {
  10. /* store rank that is crossed to reach the insert position */
  11. rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
  12. while (x->level[i].forward &&
  13. (x->level[i].forward->score < score ||
  14. (x->level[i].forward->score == score &&
  15. sdscmp(x->level[i].forward->ele,ele) < 0)))
  16. {
  17. rank[i] += x->level[i].span;
  18. x = x->level[i].forward;
  19. }
  20. update[i] = x;
  21. }
  22. //随机生成节点层数
  23. level = zslRandomLevel();
  24. if (level > zsl->level) {
  25. for (i = zsl->level; i < level; i++) {
  26. rank[i] = 0;
  27. update[i] = zsl->header;
  28. update[i]->level[i].span = zsl->length;
  29. }
  30. zsl->level = level;
  31. }
  32. //使用生成的层数新建一个跳跃表节点
  33. x = zslCreateNode(level,score,ele);
  34. //更新新增的节点应插入位置上的节点前后指针,将新增节点插入到跳跃表中
  35. for (i = 0; i < level; i++) {
  36. x->level[i].forward = update[i]->level[i].forward;
  37. update[i]->level[i].forward = x;
  38. /* update span covered by update[i] as x is inserted here */
  39. x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
  40. update[i]->level[i].span = (rank[0] - rank[i]) + 1;
  41. }
  42. /* increment span for untouched levels */
  43. for (i = level; i < zsl->level; i++) {
  44. update[i]->level[i].span++;
  45. }
  46. x->backward = (update[0] == zsl->header) ? NULL : update[0];
  47. if (x->level[0].forward)
  48. x->level[0].forward->backward = x;
  49. else
  50. zsl->tail = x;
  51. zsl->length++;
  52. return x;
  53. }

我们来画一张图:
image.png

2.3跳表的查找

有序链表
image.png
在这样一个链表中,如果我们要查找某个数据,那么需要从头开始逐个进行比较,直到找到包含数据的那个节点,或者找到第一个比给定数据大的节点为止(没找到)。也就是说,时间复杂度为 O(n)。同样,当我们要插入新数据的时候,也要经历同样的查找过程,从而确定插入位置。而二分查找法只适用于有序数组,不适用于链表。假如我们每相邻两个节点增加一个指针(或者理解为有三个元素进入了第二层),让指针指向下下个节点。
image.png
这样所有新增加的指针连成了一个新的链表,但它包含的节点个数只有原来的一半(上图中是 7, 19, 26)。在插入一个数据的时候,决定要放到那一层,取决于一个算法(在 redis 中 t_zset.c 有一个 zslRandomLevel 这个方法)。现在当我们想查找数据的时候,可以先沿着这个新链表进行查找。当碰到比待查数据大的节点时,再回到原来的链表中的下一层进行查找。比如,我们想查找 23,查找的路径是沿着下图中标红的指针所指向的方向进行的:

image.png

  1. 23 首先和 7 比较,再和 19 比较,比它们都大,继续向后比较。
  2. 但 23 和 26 比较的时候,比 26 要小,因此回到下面的链表(原链表),与 22比较。
  3. 23 比 22 要大,沿下面的指针继续向后和 26 比较。23 比 26 小,说明待查数据 23 在原链表中不存在。在这个查找过程中,由于新增加的指针,我们不再需要与链表中每个节点逐个进行比较了。需要比较的节点数大概只有原来的一半。这就是跳跃表。

为什么不用 AVL 树或者红黑树?因为 skiplist 更加简洁。

3.存储过程

zset的操作命令的处理函数在t_zset.c 文件中,向zset添加元素调用的函数为t_zset.c文件的zaddCommand()。这个函数其实主要调用了t_zset.c文件的zaddGenericCommand()。

void zaddCommand(client *c) {
    zaddGenericCommand(c,ZADD_NONE);
}

这个方法很长,但是核心逻辑其实并不复杂,很多都不是我们需要关心的内容,我们来大概梳理一下:

  1. 解析参数得到每个元素及其对应的分值
  2. 查找key对应的 ZSet 集合对象是否存在,不存在则创建
    1. 根据zset_max_ziplist_entries & zset_max_ziplist_value 这两个参数决定是创建压缩列表还是跳表
    2. 加入到redis数据库
  3. 循环调用 zsetAdd()函数将元素添加到有序集合中 ```java void zaddGenericCommand(client *c, int flags) { …

    / 参数校验 / ….

    / After the options, we expect to have an even number of args, since we expect any number of score-element pairs. / elements = c->argc-scoreidx; if (elements % 2 || !elements) {

     addReplyErrorObject(c,shared.syntaxerr);
     return;
    

    } elements /= 2; / Now this holds the number of score-element pairs. /

    //省略一些校验信息 … / 解析参数得到每个元素及其对应的分值 / scores = zmalloc(sizeof(double)*elements); for (j = 0; j < elements; j++) {

     if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
         != C_OK) goto cleanup;
    

    }

    /查找key对应的 ZSet 集合对象是否存在,不存在则创建 / zobj = lookupKeyWrite(c->db,key); if (checkType(c,zobj,OBJ_ZSET)) goto cleanup; if (zobj == NULL) {

     if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
     /*根据zset_max_ziplist_entries & zset_max_ziplist_value 这两个参数决定是创建压缩列表还是跳表。 */
     if (server.zset_max_ziplist_entries == 0 ||
         server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
     {
         zobj = createZsetObject();
     } else {
         zobj = createZsetZiplistObject();
     }
     //加入到redis数据库。
     dbAdd(c->db,key,zobj);
    

    } //循环调用 zsetAdd()函数将元素添加到有序集合中 for (j = 0; j < elements; j++) {

     double newscore;
     score = scores[j];
     int retflags = flags;
    
     ele = c->argv[scoreidx+1+j*2]->ptr;
     int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);
     if (retval == 0) {
         addReplyError(c,nanerr);
         goto cleanup;
     }
     if (retflags & ZADD_ADDED) added++;
     if (retflags & ZADD_UPDATED) updated++;
     if (!(retflags & ZADD_NOP)) processed++;
     score = newscore;
    

    } server.dirty += (added+updated);

reply_to_client: …

cleanup: … }

<a name="LZxUz"></a>
### 3.1 zset对象的创建

我们在看以下两种zset对象的创建:
```c
robj *createZsetObject(void) {
    //分配内存
    zset *zs = zmalloc(sizeof(*zs));
    robj *o;
    /*创建一个字典对象*/
    zs->dict = dictCreate(&zsetDictType,NULL);
    /*创建一个skiplist对象*/
    zs->zsl = zslCreate();
    /*创建一个redisobject对象*/
    o = createObject(OBJ_ZSET,zs);
    /*设置redisobject对象的编码为跳表*/
    o->encoding = OBJ_ENCODING_SKIPLIST;
    return o;
}
robj *createZsetZiplistObject(void) {
    /*创建一个空的ziplist对象*/
    unsigned char *zl = ziplistNew();
    /*创建一个redisobject对象*/
    robj *o = createObject(OBJ_ZSET,zl);
    /*设置redisobject对象的编码为压缩列表*/
    o->encoding = OBJ_ENCODING_ZIPLIST;
    return o;
}

3.2 添加元素到集合

方法有一点长,但是逻辑不难理解,我们来梳理一下:先判断是什么编码

  1. 此时是压缩列表编码
    1. 如果要添加的元素已经存在?
      1. 一模一样,直接返回
      2. 看看是不是要增加分数
      3. 如果元素 score 改变了,则先删除 ziplist 中的元素然后重新添加
    2. 此事说明集合中原本不存在要添加的元素
      1. 这里的添加是根据 score 分数有序插入的,分数小的在 ziplist 头部,分数大的在尾部,元素和分数分两次操作插入到 ziplist 中,并且紧挨在一起
      2. 添加完成后,判断元素的长度是否超出限制或 ziplist 存储的元素个数是否超过最大限制?
      3. 超出限制则将底层结构转为 skiplist
  2. 此时是跳表编码,在跳跃表中元素是按照分数有序存储的,这样范围查找的时间复杂度为 O(logN);使用字典则能达到单个元素查找O(1)的时间复杂度,非常高效

    1. 如果要添加的元素已经存在?
      1. 一模一样,直接返回
      2. 看看是不是要增加分数
      3. 已经存在且分数变了,在跳跃表中查找到目标节点,将其删除并重新插入一个节点,完成后返回新的跳跃表节点用于更新字典中元素的分数
    2. 此时说明不存在

      1. 将节点添加到跳表里面
      2. 新增一个节点到字典

        int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
        ....
        /* 此时是压缩列表编码 */
        if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *eptr;
        // [zzlFind] 判断要添加的元素是否已经存在
        if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
           /* 完全一样直接返回 */
           if (nx) {
               *flags |= ZADD_NOP;
               return 1;
           }
           /* 看看是不是要增加分数 */
           if (incr) {
               score += curscore;
               if (isnan(score)) {
                   *flags |= ZADD_NAN;
                   return 0;
               }
               if (newscore) *newscore = score;
           }
           /* 如果元素 score 改变了,则先删除 ziplist 中的元素然后重新添加 */
           if (score != curscore &&  
               /* LT? Only update if score is less than current. */
               (!lt || score < curscore) &&
               /* GT? Only update if score is greater than current. */
               (!gt || score > curscore)) 
           {   //删除
               zobj->ptr = zzlDelete(zobj->ptr,eptr);
               //重新添加
               zobj->ptr = zzlInsert(zobj->ptr,ele,score);
               *flags |= ZADD_UPDATED;
           }
           return 1;
        } else if (!xx) { //此事说明集合中原本不存在要添加的元素
           //这里的添加是根据 score 分数有序插入的,分数小的在 ziplist 头部,分数大的在尾部,
           // 元素和分数分两次操作插入到 ziplist 中,并且紧挨在一起
           zobj->ptr = zzlInsert(zobj->ptr,ele,score);
           //元素的长度是否超出限制或 ziplist 存储的元素个数是否超过最大限制?
           if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries ||
               sdslen(ele) > server.zset_max_ziplist_value)
               //超出限制则将底层结构转为 skiplist
               zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
           if (newscore) *newscore = score;
           *flags |= ZADD_ADDED;
           return 1;
        } else {
           *flags |= ZADD_NOP;
           return 1;
        }
        //此时是跳表编码,
        // 在跳跃表中元素是按照分数有序存储的,这样范围查找的时间复杂度为 O(logN);
        // 使用字典则能达到单个元素查找O(1)的时间复杂度,非常高效
        } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplistNode *znode;
        dictEntry *de;
        
        de = dictFind(zs->dict,ele);
        if (de != NULL) {
           /* 存在并一模一样,直接返回 */
           if (nx) {
               *flags |= ZADD_NOP;
               return 1;
           }
           curscore = *(double*)dictGetVal(de);
        
           /* 分数是否需要自增 */
           if (incr) {
               score += curscore;
               if (isnan(score)) {
                   *flags |= ZADD_NAN;
                   return 0;
               }
               if (newscore) *newscore = score;
           }
        
           /* 已经存在且分数变了,删掉重新插入 */
           if (score != curscore &&  
               /* LT? Only update if score is less than current. */
               (!lt || score < curscore) &&
               /* GT? Only update if score is greater than current. */
               (!gt || score > curscore)) 
           {//在跳跃表中查找到目标节点,将其删除并重新插入一个节点,完成后返回新的跳跃表节点用于更新字典中元素的分数
               znode = zslUpdateScore(zs->zsl,curscore,ele,score);
               /* Note that we did not removed the original element from
                * the hash table representing the sorted set, so we just
                * update the score. */
               dictGetVal(de) = &znode->score; /* Update score ptr. */
               *flags |= ZADD_UPDATED;
           }
           return 1;
        } else if (!xx) {//不存在
           ele = sdsdup(ele);
           //将节点添加到跳表里面
           znode = zslInsert(zs->zsl,score,ele);
           //新增一个节点到字典
           serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
           *flags |= ZADD_ADDED;
           if (newscore) *newscore = score;
           return 1;
        } else {
           *flags |= ZADD_NOP;
           return 1;
        }
        } else {
        serverPanic("Unknown sorted set encoding");
        }
        return 0; /* Never reached. */
        }
        

        zset添加流程.jpg

        4.应用场景

①排行榜

id 为 6001 的新闻点击数加 1:

zincrby hotNews:20190926 1 n6001

获取今天点击最多的 15 条:

zrevrange hotNews:20190926 0 15 withscores