1. redis数据结构

image.png

1. 字典Dict

Dict概述

Redis是一个键值型(key-value)数据库,我们可以根据键值对进行增删改查,键值对的映射关系就是通过Dict结构来实现的,Dict分为3部分:字典(dict)、哈希表(dictht)、哈希结点(dictEntry)

  1. typedef struct dict {
  2. dictType *type; // dict类型,内置不同的hash函数
  3. void *privdata; // 也是用于hash计算的私有数据
  4. dictht ht[2]; // 定义一个哈希表数组,容量为2,一个正常使用,一个rehash时使用
  5. long rehashidx; // 记录rehash的进度,-1表示未进行rehash
  6. int16_t pauserehash;// 1表示rehash暂停,0表示rehash在运行
  7. } dict;
  1. typedef struct dictht {
  2. // 定义一个二级指针指向dictEntry结点,就相当于java中的dictEntry[] table
  3. dictEntry **table; // 一个*表示指针指向dictEntry,第二个*表示这个dictEntry是个数组
  4. unsigned long size; // 哈希表数组长度
  5. unsigned long sizemask; // 掩码值 sizemask = size - 1, 用于取余计算
  6. unsigned long used; // 哈希表中已经插入的结点的个数
  7. } dictht;
  1. typedef struct dictEntry {
  2. // void *代表无类型指针,可以指向任何数据
  3. void *key; // 键
  4. union {
  5. void *val;
  6. unit64_t u64;
  7. int64_t s64;
  8. double d;
  9. } v; // 值,类型为联合体中的一个
  10. struct dictEntry *next; // 指向洗衣歌结点
  11. } dictEntry;

Dict的数据结构与HashMap基本一致
image.png

Dict的添加过程

  1. 当向Dict添加键值对时,Redis首先根据key计算出hash值,然后对数组长度取余获取哈希表索引
  2. 将键值对插入到对应索引的位置

    Dict的扩容过程

    与HashMap相同,哈希冲突导致链表过长会使得查询效率大大降低,因此要对数组扩容,以减小链表的长度
    在两种情况下会触发redis哈希表的扩容:负载因子(LoadFactor = used/size)

  3. 负载因子 >= 1, 并且服务器没有进行BGSAVE等进程;

  4. 负载因子 >= 5;

    1. static int _dictExpandIfNeeded(dict *d)
    2. {
    3. /* 如果正在rehashing, 暂停扩容 */
    4. if (dictIsRehashing(d)) return DICT_OK;
    5. /* 哈希表为空,表示首次创建哈希表,初始数组长度为4 */
    6. if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);
    7. // d->ht[0].used >= d->ht[0].size: 负载因子 >= 1
    8. if (d->ht[0].used >= d->ht[0].size &&
    9. (dict_can_resize || // 检查BGSAVE等进程
    10. d->ht[0].used/d->ht[0].size > dict_force_resize_ratio) && // 负载因子 >= 5
    11. dictTypeExpandAllowed(d)){
    12. // 扩容为used+1,实际上会扩容为大于used+1的第一次2^n
    13. return dictExpand(d, d->ht[0].used + 1);
    14. }
    15. return DICT_OK;
    16. }

    Dict的收缩过程

    每次删除元素时,如果负载因子 < 0.1, 哈希表会收缩 ```c if (dictDelete((dict*)o->ptr, field) == C_OK) { deleted = 1;

    / 删除完元素后检查负载因子,小于0.1,收缩哈希表 / if (htNeedsResize(o->ptr)) dictResize(o->ptr); }

int htNeedsResize(dict *dict) { long long size, used;

  1. size = dictSlots(dict); // 哈希表容量
  2. used = dictSize(dict); // 哈希表已使用量
  3. // 如果size > 4并且used/size < 0.1,收缩哈希表
  4. return (size > DICT_HT_INITIAL_SIZE &&
  5. (used*100/size < HASHTABLE_MIN_FILL));

}

int dictResize(dict *d) { unsigned long minimal; // rehash或者后台进程在bgsave,返回错误 if (!dict_can_resize || dictIsRehashing(d)) return DICT_ERR; // 获取used minimal = d->ht[0].used; if (minimal < DICT_HT_INITIAL_SIZE) // used < 4, 则收缩至4 minimal = DICT_HT_INITIAL_SIZE; // 收缩至第一个大于等于used的2^n return dictExpand(d, minimal); }

  1. <a name="ZtYK1"></a>
  2. #### Dict的rehash过程
  3. 扩容还是搜索都要重新计算下表,即rehash过程
  4. 1. 计算哈希表新的size和sizemask
  5. 1. 根据size申请内存空间,创建dictht,并赋值给dict.ht[1]
  6. 1. 标记dict.rehashidx=1,表示开始rehash
  7. 1. 将dict.h[0]中的元素rehash到dict.h[1]中
  8. 1. 将h[1]赋值给h[0],将h[1]初始化为空哈希表,并释放原来的h[0]内存
  9. ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25887408/1655537227057-8176bb95-3141-439c-a48e-696e986c8d7a.png#clientId=ua193e0c1-9e27-4&crop=0&crop=0&crop=1&crop=1&from=paste&id=ua0cac871&margin=%5Bobject%20Object%5D&name=image.png&originHeight=343&originWidth=656&originalType=binary&ratio=1&rotation=0&showTitle=false&size=41995&status=done&style=none&taskId=uf3ad2cf1-bbfc-484c-80dd-2c4f1c880be&title=)![image.png](https://cdn.nlark.com/yuque/0/2022/png/25887408/1655537237306-20bd2348-0787-4db9-8e92-faf9ca6929fd.png#clientId=ua193e0c1-9e27-4&crop=0&crop=0&crop=1&crop=1&from=paste&id=ub5ddfa96&margin=%5Bobject%20Object%5D&name=image.png&originHeight=361&originWidth=682&originalType=binary&ratio=1&rotation=0&showTitle=false&size=43850&status=done&style=none&taskId=uc5aaee09-2abe-4b9a-a6a4-5ab5c2a9d63&title=)<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/25887408/1655537251883-ba9af272-637d-4ba6-9e43-5877bda25153.png#clientId=ua193e0c1-9e27-4&crop=0&crop=0&crop=1&crop=1&from=paste&id=uaf689be2&margin=%5Bobject%20Object%5D&name=image.png&originHeight=359&originWidth=669&originalType=binary&ratio=1&rotation=0&showTitle=false&size=45205&status=done&style=none&taskId=ua621701c-f80b-45f0-ad55-13ac9a7e6bf&title=)<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/25887408/1655537264134-7d07df43-1986-400e-8737-b306860562d8.png#clientId=ua193e0c1-9e27-4&crop=0&crop=0&crop=1&crop=1&from=paste&id=ufeb44065&margin=%5Bobject%20Object%5D&name=image.png&originHeight=362&originWidth=655&originalType=binary&ratio=1&rotation=0&showTitle=false&size=43249&status=done&style=none&taskId=u11f92ec1-d824-4d6f-9707-db9e781e376&title=)<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/25887408/1655537272276-abfd785e-b90c-4fd8-a13a-315f200942e8.png#clientId=ua193e0c1-9e27-4&crop=0&crop=0&crop=1&crop=1&from=paste&id=u6af099ff&margin=%5Bobject%20Object%5D&name=image.png&originHeight=357&originWidth=658&originalType=binary&ratio=1&rotation=0&showTitle=false&size=44695&status=done&style=none&taskId=u8e762fe9-7d67-470c-907a-b53b4e86bd8&title=)<br />渐进式rehash
  10. - 由于rehash需要将h[0]中的全部元素转移至h[1]中, 如果哈希表中存有很多数据,这样是十分耗时的
  11. - redis在插执行增删改查操作时,会检查dict的rehashidx属性,如果它大于-1,则表明正在rehash,那么就将h[0]中rehashidx对应的链表转移至h[1]上,并将rehashidx++,知道所有的数据都转移至h[1]
  12. - rehashidx == -1时,代表rehash结束
  13. - 在rehash过程中,新增操作直接添加至h[1]中,查询手改删除则要查询两个哈希表并执行
  14. PS: redis中哈希表中采用的是头插法,由于redis是单线程的,所以不用担心HashMap中的并发死链问题
  15. <a name="QscCr"></a>
  16. ### 2. 动态字符串 SDS
  17. <a name="fZuWl"></a>
  18. #### SDS概述
  19. ** 为什么redis要自定义字符串?**<br />因为c语言中的字符串存在很多问题:
  20. 1. c语言字符串底层使用给一个char[]数组,以‘\0’为结束符,获取字符串长度需要手动计算
  21. 1. 如果定义的字符串出现‘\0’字符将会出现错误,所以c语言字符串非二进制安全
  22. 1. 由于底层为char型数组,所以长度固定不能够被修改
  23. ```c
  24. //
  25. struct __attribute__ ((__packed__)) sdshdr8 {
  26. uint8_t len; // buf中字符串字节数
  27. uint8_t alloc; // buf申请的字节数
  28. unsigned char flags; // SDS的类型,用来控制SDS的头大小
  29. char buf[];
  30. }

一个包含字符串“name”的sds结构如下:
image.png
PS: attribute ((packed)): 紧凑的内存分配方式

  1. #include <stdio.h>
  2. struct test1 {
  3. char a;
  4. int b;
  5. } test1;
  6. struct __attribute__((packed)) test2 {
  7. char a;
  8. int b;
  9. } test2;

image.pngimage.png

SDS的优点

  1. 获取字符串长度时间复杂度为O(1)
  2. 支持动态扩容
  3. 扩容是内存预分配,可以减少内存分配次数
  4. SDS是二进制安全的

    SDS扩容

  • 如果新字符串小于1M,新扩容的字符串长度为原始的两倍 + 1
  • 如果新字符串大于1M,扩容后的长度为原来的长度 + 1M + 1

    3. 整数集合 IntSet

    Intset概述

    整数集合是Set的一种实现方式,基于整数数组实现,长度可变、有序

    1. typedef struct intset {
    2. uint32_t encoding; // 编码方式,支持存放16位、32位、64位整数
    3. uint32_t length; // 集合的长度
    4. int8_t contents[]; // 整数数组
    5. }

    为了方便查找,Redis会将intset中所有的整数按照升序依次保存在contents数组中,结构如图:
    image.png

    IntSet升级

    当一个新元素插入到Inset中,如果他的类型(int32_t)比原来集合的类型(int16_t)长,需要对整数集合进行升级,就是将所有元素的类型都升级为int32_t,并且要扩展contents数组的长度
    举个例子,假设有一个整数集合里有 3 个类型为 int16_t 的元素。
    image.png
    现在,往这个整数集合中加入一个新元素 65535,这个新元素需要用 int32_t 类型来保存,所以整数集合要进行升级操作,首先需要为 contents 数组扩容,在原本空间的大小之上再扩容多 80 位(4x32-3x16=80),这样就能保存下 4 个类型为 int32_t 的元素
    image.png
    扩容完 contents 数组空间大小后,需要将之前的三个元素转换为 int32_t 类型,并将转换后的元素放置到正确的位上面,并且需要维持底层数组的有序性不变,整个转换过程如下:
    image.png

    IntSet源码

    1. /* 往集合中添加一个整数 */
    2. intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
    3. // 获取添加的整数的编码
    4. uint8_t valenc = _intsetValueEncoding(value);
    5. // 插入的位置
    6. uint32_t pos;
    7. if (success) *success = 1;
    8. // 判断插入数值的编码是否超过了集合的编码
    9. if (valenc > intrev32ifbe(is->encoding)) {
    10. // 超过集合编码,进行升级
    11. return intsetUpgradeAndAdd(is,value);
    12. } else {
    13. // 从集合中寻找是否已经存在了value
    14. if (intsetSearch(is,value,&pos)) {
    15. if (success) *success = 0;
    16. // 存在value,无须插入,直接返回
    17. return is;
    18. }
    19. // 数组扩容
    20. is = intsetResize(is,intrev32ifbe(is->length)+1);
    21. // 将pos之后的元素移动至pos+1,腾出pos空间,就是整体往后一位
    22. if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);
    23. }
    24. // 插入新的元素
    25. _intsetSet(is,pos,value);
    26. // 重置元素长度
    27. is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
    28. return is;
    29. }
    1. /* 编码升级流程 */
    2. static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {
    3. uint8_t curenc = intrev32ifbe(is->encoding); // 当前集合编码
    4. uint8_t newenc = _intsetValueEncoding(value); // 升级的编码
    5. int length = intrev32ifbe(is->length); // 集合长度
    6. int prepend = value < 0 ? 1 : 0; // 负数 prepend=1, 插到最首部
    7. /* 设置新的编码 */
    8. is->encoding = intrev32ifbe(newenc);
    9. // 扩容
    10. is = intsetResize(is,intrev32ifbe(is->length)+1);
    11. // 倒叙将数组拷贝至扩容后正确的位置上
    12. while(length--)
    13. _intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));
    14. /* Set the value at the beginning or the end. */
    15. if (prepend)
    16. // 将元素放到集合首部
    17. _intsetSet(is,0,value);
    18. else
    19. // 将元素放到集合末尾
    20. _intsetSet(is,intrev32ifbe(is->length),value);
    21. // 修改数组长度
    22. is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
    23. return is;
    24. }

    4. 链表 List

    链表结点

    c语言中没有链表,Redis中自己设计了链表结构

    1. typedef struct listNode {
    2. struct listNode *prev; // 前一个阶段
    3. struct listNode *next; // 下一个结点
    4. void *val; // 结点中的值
    5. } listNode;

    listNode就是一个双向链表
    image.png

    链表结构设计

    redis在listNode的基础上封装了list数据结构

    1. typedef struct list {
    2. listNode *head; // 头结点
    3. listNode *tail; // 尾结点
    4. // 结点复制函数
    5. // 节点值复制函数
    6. void *(*dup)(void *ptr);
    7. //节点值释放函数
    8. void (*free)(void *ptr);
    9. //节点值比较函数
    10. int (*match)(void *ptr, void *key);
    11. //链表节点数量
    12. unsigned long len;
    13. } list;

    链表的结构
    image.png
    redis链表的优势:

  1. redis链表中有prev和next两个指针,获取某一结点的前置结点和后置结点的时间复杂度都为O(1)
  2. list中封装了head和tail,获取头尾结点的时间复杂度为O(1)
  3. list中记录了链表长度,获取链表长度的时间复杂度为O(1)
  4. list使用void*存储结点值,所以链表结点可以存储不同类型的值

redis链表的缺点:

  1. 链表的结点是不连续的,无法利用CPU缓存局部性原理带来的优势
  2. 封装一个链表结点,需要存储指针,内存消耗较大

    5. 压缩列表 ZipList

    ZipList结构设计

    压缩列表是一种内存紧凑型的数据结构,由连续内存块组成的顺序型数据结构,类似于数组
    image.png
    image.png

    zipListEntry结构

    普通链表记录结点需要两个指针,需要16个字节,浪费内存。entry采用了下面的结构:
    image.png
  • previous_entry_length:前一节点的长度,占1个或5个字节
    • 如果前一节点的长度小于254字节,则采用1个字节来保存这个长度值
    • 如果前一节点的长度大于254字节,则采用5个字节来保存这个长度值,第一个字节为0xfe,后四个字节才是真实长度数据
  • encoding:编码属性,记录content的数据类型(字符串还是整数)以及长度,占用1个、2个或5个字节
  • contents:保存结点,可以是字符串或者整数

    Encoding编码

    压缩列表的编码方式分为字符串和整数两种
  1. 字符串

encoding以“00”、“01”或者“10”开头,则证明content是字符串
image.png
例如,我们要保存字符串:“ab”和 “bc”
image.png
image.png

  1. 整数

如果encoding为“11”,则表明content为整数,encoding固定只占1个字节
image.png

ZipList的连锁更新问题

连锁更新问题
压缩列表插入的元素较大是,可能会导致后序元素的prev_entry_length(前一个结点的长度)占用空间发生变化,进而引起连锁更新问题,导致每个元素的空间都要重新分配,造成压缩列表性能的下降
image.png
ZipList这种特殊情况下产生的连续多次空间扩展操作称之为连锁更新(Cascade Update)。新增、删除都可能导致连锁更新的发生。

压缩列表的优缺点

优点:

  1. 压缩列表占用连续的内存空间,可以利用CPU缓存提升查询性能
  2. 压缩列表设计紧凑,针对不同长度的数据,进行相应的编码,相对于数组数据长度都相同,可以更有效的节省内存开销

缺点:

  1. 压缩列表不可以存储过多元素,否则查询效率很低
  2. 新增或者修改某个元素时,结点占用的内存空间可能会重新分配,甚至引发连锁更新的问题

因此,Redis在存储的元素数量较少,或者元素值不大时,才会使用压缩列表作为底层数据结构

6. 快表 QuickList

quickList就是结点为压缩列表的双向链表

QuickList结构

  1. typedef struct quicklist {
  2. quicklistNode *head; // quciklist的链表头结点
  3. quicklistNode *tail; // quicklist的链表尾结点
  4. unsigned long count; // 压缩列表元素个数
  5. unsigned long len; // quicklist的结点个数
  6. ...
  7. } quciklist;
  1. typedef struct quicklistNode {
  2. struct quicklistNode *prev; // 前一个结点
  3. struct quicklistNode *next; // 下一个结点
  4. unsigned char *zl; // quicklistNode指向的压缩列表
  5. unsigned int sz; // 压缩列表的字节大小
  6. unsigned int count : 16; // 压缩列表中元素的个数
  7. ...
  8. } quciklistNode;

image.png
向quicklist中添加元素时,会检查插入位置的quicklistNode上的压缩链表能否容下该元素,如果可以,就直接插入这个Node中,不可以的话,就新建一个quicklistNode

QuickList解决了什么?

如果存入大量数据,超过了ZipList的最佳上限,我们可以创建多个分片来存储数据

7. 跳表 SkipList

跳表概述

2. redis持久化

1. RDB快照

RDB:Redis Database Backup file(即Redis数据库备份文件),也可以称之为Redis数据快照
就是把Redis内存中的数据就记录在磁盘中,当Redis实例出现故障时,从磁盘中读取快照文件,恢复数据

RDB触发机制

如何使用RDB快照
Redis提供了两个命令生成RDB文件:save和bgsave,他们的区别在于是否在主线程中执行数据的备份

  • 执行save命令,会在主线程中生成RDB文件,与redis操作在同一个线程,所以会阻塞主线程
  • 执行bgsave命令,会创建一个子进程,子进程在后台执行数据备份,不会阻塞主线程

在redis.conf中可以配置redis触发机制

  1. # 在900s内,如果有一个key被修改,则执行bgsave
  2. save 900 1
  3. save 300 10
  4. save 60 10000
  5. save "" # 表示禁用RDB
  6. -----------------------------------------------------
  7. # redis的其他配置
  8. # 是否压缩 ,建议不开启,压缩也会消耗cpu,磁盘的话不值钱
  9. rdbcompression yes
  10. # RDB文件名称
  11. dbfilename dump.rdb
  12. # 文件保存的路径目录
  13. dir ./

以下几点需要注意:

  1. RDB 文件的加载工作是在服务器启动时自动执行的,Redis 并没有提供专门用于加载 RDB 文件的命令。
  2. redis的快照是全量快照,也就是说每次执行快照都会吧内存中的所有数据记录在磁盘中。所以RDB是一个比较中的操作,频繁的执行RDB会对性能产生影响。频次太低,快照之间的数据可能会因为服务器的宕机丢失很多。

    RDB实现原理

    image.png
    当bgsave开始执行时,会fork主进程得到子进程,子进程共享主进程的内存数据,完成fork后将内存数据写入RDB文件。
    所谓fork,就是新建一个子进程,然后复制主进程的页表,这样主进程与子进程映射的就是同一块物理内存。并且这一块物理内存时只读(read-only)的,fork完毕后,子进程就开始吧内存中的数据写到磁盘中。持久化的过程中,如果主进程对内存进行写操作时,是不可以直接写的,因为子进程正在读取数据,这样可能会出现脏数据。fork中使用了copy-on-write技术,也就是说在发生写操作时,从原来的数据中拷贝一份数据,将数据写到拷贝的内存中,主进程读取数据时也从这个拷贝的数据中读取!
    简单来说,bgsave的流程为:

  3. fork主进程得到一个子进程,复制页表,共享一片内存空间

  4. 子进程读取内存数据,写入一个新的rdb文件
  5. 将新的rdb文件替换掉旧的rdb文件

RDB的缺点

  1. RDB执行期间间隔长,两次RDB之间的数据可能会丢失
  2. fork子进程、写RDB文件都比较耗时

几点需要注意:

  1. fork主进程创建子进程,会阻塞主进程,复制页表,不复制内存,也是为了加速子进程的创建
  2. redis在执行bgsave的过程中,如果发生了写操作,这次写操作不会记录在此次bgsave中,只能等到下一次bgsave才能存储;如果系统恰好在rdb文件创建完后崩溃了,redis将丢失主进程在快照期间的写操作
  3. 是是是

    2. AOF日志

    AOF:Append Only File(追加文件),redis处理的每一条写命令都会记录在AOF文件,可以看作日志记录文件
    image.png

    AOF配置

    AOF默认是关闭的,需要修改redis.conf配置文件来开启AOF:
    是所示

    1. # 是否开启AOF功能,默认是no
    2. appendonly yes
    3. # AOF文件的名称
    4. appendfilename "appendonly.aof"

    由于AOF记录的是命令,所以AOF文件要比RDB文件大的多,并且AOF会记录对一个key的多次写操作,但是只有最后一次的写操作才是有意义的。redis提供了bgrewriteaof命令,让AOF执行重写功能,以最少的命令实现相同的效果!
    redis触发阈值时会自动重写AOF文件,可在redis.conf中配置

    1. # AOF文件比上次文件 增长超过多少百分比则触发重写
    2. auto-aof-rewrite-percentage 100
    3. # AOF文件体积最小多大以上才触发重写
    4. auto-aof-rewrite-min-size 64mb

    AOF持久化的两个风险

  4. 当redis没有来的即将AOF文件写入到内存中,服务器就宕机了,此时数据就会有丢失的风险

  5. redis执行命令和写日志是同步进行的,瑞国写日志时间过长,下一条redis命令将会被阻塞

    AOF写回策略

    image.pngimage.png

  6. redis执行完写操作命令后,会将命令追加至server.aof_buf缓冲区中

  7. 然后通过write()系统调用,将aof_buf缓冲区中的数据添加至内核缓冲区page cache中,等待将数据写入硬盘
  8. 具体什么时候将内核缓冲区中的数据持久化到磁盘中,有内核决定

Redis 提供了 3 种写回硬盘的策略,控制的就是上面说的第三步的过程。
redis.conf文件中可以设置3中协会策略

  1. # 表示每执行一次写命令,立即记录到AOF文件
  2. appendfsync always
  3. # 写命令执行完先放入AOF缓冲区,然后表示每隔1秒将缓冲区数据写到AOF文件,是默认方案
  4. appendfsync everysec
  5. # 写命令执行完先放入AOF缓冲区,由操作系统决定何时将缓冲区内容写回磁盘
  6. appendfsync no

image.png

  • 如果需要高性能,就选择no
  • 如果需要高可靠,就选择Always
  • 如果允许数据丢失一系诶,同时又想要性能高,就选择everysec

这三种策略只是在控制 fsync() 函数的调用时机
当应用程序向文件写入数据时,内核通常先将数据复制到内核缓冲区中,然后排入队列,然后由内核决定何时写入硬盘。
image.png

  • Always 策略就是每次写入 AOF 文件数据后,就执行 fsync() 函数;
  • Everysec 策略就会创建一个异步任务来执行 fsync() 函数;
  • No 策略就是永不执行 fsync() 函数

    AOF重写机制

    AOF日志是一个文件,随着写操作的命令越来越多,文件也会越来越大。而文件过大会带来性能问题,如果redis重启后,读取redis加载数据,如果文件过大,加载过程就会很慢。

  • 如果多次对一个key进行写入操作,只有最后一个是有效的

  • 多次写入操作后,出现一个delete操作,所有操作无效

所以,为了消除这些无用的操作,redis启用了重写机制,来压缩AOF文件。
AOF重写机制:读取当前数据库的键值对,将键值用一条命令记录在记录到新的AOF文件中,全部记录完后,用新的AOF文件覆盖旧的AOF文件。这样就可以在多个键值对被反复修改,只保存最后一条,大大减少的AOF的命令数量

AOF后台重写

写入AOF的操作是在主线程中完成的,但是重写AOF文件实在后台的子进程bgrewriteaof中完成的,因为重写AOF文件是很耗时的!
与RDB的bgsave后台持久化一样,AOF也是使用fork()子进程使用copy-on-write技术重写的AOF
image.pngimage.png
主进程通过fork()调用生成bgwriteaof子进程,操作系统将主进程的页表复制给子进程,这样主进程与子进程映射到同一片内存空间。这样主进程在进行增删改查的操作时,子进程可以在后台执行AOF读写,就不会阻塞redis的主进程了。同样,当主进程进行写操作时,会将被写的屋里内存拷贝一份,读写操作都作用在拷贝的内存上。
但是这样又带来了一个问题,重写AOF过程中,如果主进程修改了已经存在的key-value,此时子进程中的内存数据就和主进程中的内存数据不一致了,为了解决数据不一致问题,redis引入了AOF重写缓冲区,这个缓冲区在bgwriteaof进程之后使用。在重写AOF期间,redis执行完一个写命令后,会将这个命令写入AOF缓冲区和AOF重写缓冲区
image.png
当bgrewriteaof执行完AOF重写工作后,会将AOF重写缓冲区中的内容添加至新的AOF文件中,保证AOF记录与数据库的一致性,然后用新的AOF文件覆盖旧的文件


3. redis缓存

本节以项目tust-bilibili为背景,为项目添加redis缓存

1. 添加redis缓存

首页视频添加redis缓存

image.png

  1. /**
  2. * 分頁獲取視頻列表,在首頁瀑布流展示
  3. */
  4. @GetMapping("/videos")
  5. public JsonData<PageResult<Video>> pageListVideos(Integer size, Integer no, String area) {
  6. PageResult<Video> videos = videoService.pageListVideos(size, no, area);
  7. return new JsonData<>(videos);
  8. }
  9. /**
  10. * 分页查询视频列表
  11. * @param size 分页查询的数量
  12. * @param no 第几页表
  13. * @param area 视频分区信息
  14. * @return 分页查询的结果
  15. */
  16. @Override
  17. public PageResult<Video> pageListVideos(Integer size, Integer no, String area){
  18. if(size == null || no == null) {
  19. throw new CustomException("参数异常");
  20. }
  21. Map<String, Object> map = new HashMap<>();
  22. map.put("start", (no - 1) * size);
  23. map.put("limit", size);
  24. map.put("area", area);
  25. Integer total = videoMapper.pageCountListVideos(map);
  26. List<Video> videoList = new ArrayList<>();
  27. if(total > 0) {
  28. videoList = videoMapper.pageListVideos(map);
  29. }
  30. return new PageResult<>(total, videoList);
  31. }

添加redis缓存的代码

  1. @Override
  2. public PageResult<Video> pageListVideosWithRedis(Integer size, Integer no, String area){
  3. if(size == null || no == null) {
  4. throw new CustomException("参数异常");
  5. }
  6. Map<String, Object> map = new HashMap<>();
  7. map.put("start", (no - 1) * size);
  8. map.put("limit", size);
  9. map.put("area", area);
  10. // 1. 从redis中查询缓存
  11. String key = PAGE_VIDEO_KEY_PREFIX + size + "-" + no + "-" + area;
  12. String s = stringRedisTemplate.opsForValue().get(key);
  13. // 2. 判断缓存是否命中
  14. if (!StringUtils.isNullOrEmpty(s)) {
  15. // 3. 命中,直接返回
  16. PageResult pageResult = JSONUtil.toBean(s, PageResult.class);
  17. return pageResult;
  18. }
  19. // 4. 未命中,根据map查询数据库
  20. Integer total = videoMapper.pageCountListVideos(map);
  21. List<Video> videoList = new ArrayList<>();
  22. if(total > 0) {
  23. videoList = videoMapper.pageListVideos(map);
  24. // 5. 存在信息,写回redis
  25. PageResult<Video> result = new PageResult<>(total, videoList);
  26. stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(result), 5,
  27. TimeUnit.MINUTES);
  28. }
  29. return new PageResult<>(total, videoList);
  30. }

根据videoId查询视频添加redis缓存

逻辑是一样的

  1. /**
  2. * 获取视频详情
  3. * @param videoId 视频id
  4. * @return 视频信息和用户信息
  5. */
  6. @Override
  7. public Map<String, Object> getVideoDetailWithRedis(Long videoId){
  8. // 1. 从redis中根据videoId查询视频信息
  9. String key = VIDEO_ID_KEY_PREFIX + ":" + videoId;
  10. String s = stringRedisTemplate.opsForValue().get(key);
  11. // 2. 判断redis中是否包含数据
  12. if (!StringUtils.isNullOrEmpty(s)) {
  13. // 3. 命中, 直接返回
  14. return JSONUtil.toBean(s, Map.class);
  15. }
  16. // 4. 未命中,查询数据库
  17. // 4.1. 根据videoId获取视频信息
  18. Map<String, Object> result = new HashMap<>();
  19. Video video = videoMapper.getVideoById(videoId);
  20. if(video != null) {
  21. // 4.2. 根据视频信息获取userId
  22. Long userId = video.getUserId();
  23. // 4.3. 根据userId获取用户信息
  24. UserVO userInfoById = userService.getUserInfoById(userId);
  25. UserInfo userInfo = userInfoById.getUserInfo();
  26. // 4.4. 将视频信息和用户信息封装在map中
  27. result = new HashMap<>();
  28. result.put("video", video);
  29. result.put("userInfo", userInfo);
  30. }
  31. // 5. 将查询结果,写入redis
  32. if(result != null) {
  33. stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(result));
  34. }
  35. return result;
  36. }

2. 缓存更新

使用redis缓存会存在数据的一致性问题,比如对数据库中的数据进行更新操作,而查询redis缓存仍是原来的数据,这就就出现了数据的还不一致,因此在更新数据库时,需要对redis缓存进行更新!
那么应该更新缓存还是删除缓存呢?

  • 更新缓存:如果每次更新数据库都更新缓存,只有最后一次更新是有效的,那么就对进行了多次无效写操作
  • 删除缓存:更新数据库时,删除缓存,再次查询时再更新缓存

如果不查询数据,都没有必要对数据进行更新,所以应该选择删除缓存
需要注意的点:

  1. 操作数据库和删除缓存需要同时成功或者同时失败,所以要加上事务注解@Transactional
  2. 应先更新数据库,再删除缓存

为什么要先更新数据库,再删除缓存?
因为在多线程情况下,更新数据库和删除缓存会出现数据不一致的问题,如果先删除缓存,在没有更新数据库的情况下,新的线程查询缓存,会将数据库中原始值更新到缓存中,而这样更新数据库的值就与缓存中的数据不一致,由于查询缓存和写入缓存都非常快,所以这样的线程安全问题出现的可能性高。
image.png

  1. /** 解决缓存更新问题 **/
  2. @Transactional
  3. public void updateVideoDetailWithRedis(Video video) {
  4. if(video == null) {
  5. throw new CustomException("参数不合法");
  6. }
  7. // 1. 操作数据库
  8. videoMapper.updateVideo(video);
  9. // 2. 删除缓存
  10. stringRedisTemplate.delete(VIDEO_ID_KEY_PREFIX + video.getId());
  11. }

3. 缓存穿透

缓存穿透:指请求的数据在缓存和数据库中都不存在,这样缓存永远不会生效,请求就会都打进数据库中
解决方案:

  1. 缓存空对象:
  • 优点:实现简单、维护方便
  • 缺点:占用了额外的内存消耗,会有短暂的数据不一致
  1. 布隆过滤器
  • 优点:内存占用少,redis中没有多余的key
  • 缺点:会出现误判,被放行的数据可能不在数据库中,拒绝的数据一定不在数据库中

布隆过滤器:布隆过滤器是数据库中的数据的哈希映射,使用较好的空间可以判断数据库中是否存有某个数据
image.pngimage.pngimage.png
缓存空对象流程图
image.png

  1. /**
  2. * 解决缓存穿透
  3. */
  4. @Override
  5. public Map<String, Object> getVideoDetailWithRedisPassThrough(Long videoId){
  6. if(videoId == null) {
  7. throw new CustomException("参数不合法");
  8. }
  9. // 1. 从redis中根据videoId查询视频信息
  10. String key = VIDEO_ID_KEY_PREFIX + videoId;
  11. String s = stringRedisTemplate.opsForValue().get(key);
  12. // 2. 判断redis中是否包含数据
  13. if (!StringUtils.isNullOrEmpty(s)) {
  14. // 3. 命中, 直接返回
  15. return JSONUtil.toBean(s, Map.class);
  16. }
  17. // 这里的s可能出现的情况(s == null, s == "")
  18. if(s != null) {
  19. // s != null 就意味着 s == "",即s为空值,直接返回
  20. return null;
  21. }
  22. // 4. 未命中,查询数据库,s == null
  23. // 4.1. 根据videoId获取视频信息
  24. Map<String, Object> result = new HashMap<>();
  25. Video video = videoMapper.getVideoById(videoId);
  26. if(video != null) {
  27. // 4.2. 根据视频信息获取userId
  28. Long userId = video.getUserId();
  29. // 4.3. 根据userId获取用户信息
  30. UserVO userInfoById = userService.getUserInfoById(userId);
  31. UserInfo userInfo = userInfoById.getUserInfo();
  32. // 4.4. 将视频信息和用户信息封装在map中
  33. result = new HashMap<>();
  34. result.put("video", video);
  35. result.put("userInfo", userInfo);
  36. }
  37. // 5. 将查询结果,写入redis
  38. if(result == null) {
  39. // 解决缓存击穿:查询结果video为null,缓存空值到redis中
  40. stringRedisTemplate.opsForValue().set(key, "", 2, TimeUnit.MINUTES);
  41. }
  42. stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(result));
  43. return result;
  44. }

4. 缓存雪崩

5. 缓存击穿

缓存击穿:缓存击穿问题也叫热点key问题,就是一个key被高并发访问并且缓存重建时间较长,导致大量的key同时访问数据库并进行缓存重建,给数据库带来巨大的冲击!
image.png
模拟缓存击穿
image.png
image.png
使用Jmeter压力测试,在5s中内执行1000个线程访问videoId=38的视频详情信息,理想情况下第一次访问redis未命中,查询数据库重建缓存,后面都应该从redis中读取数据。但是查询数据库和重建缓存都是需要时间的,在此期间发送的请求又会多次的请求数据库,重建缓存,从下图可以看到,数据库被访问了几十上百次!这就是缓存击穿问题,解决缓存击穿问题,就要只访问一次数据库!
image.png
缓存击穿的解决方案:

  1. 互斥锁
  2. 逻辑过期

    互斥锁

    image.png
    使用redis里的setnx作为互斥锁
    只有当key不存在时才可以设置键值对,key存在时,不可以修改

    1. redis:0>setnx lock 1
    2. "1"
    3. redis:0>get lock
    4. "1"
    5. redis:0>setnx lock 2
    6. "0"
    7. redis:0>setnx lock 3
    8. "0"

    image.png ```java /**

    • 解决缓存击穿:互斥锁 */ public Map getVideoDetailWithRedisMutex(Long videoId){

      if(videoId == null) { throw new CustomException(“参数不合法”); }

      // 1. 从redis中根据videoId查询视频信息 String key = VIDEO_ID_KEY_PREFIX + videoId; String s = stringRedisTemplate.opsForValue().get(key);

      // 2. 判断redis中是否包含数据 if (!StringUtils.isNullOrEmpty(s)) { // 3. 命中, 直接返回 return JSONUtil.toBean(s, Map.class); }

      if(s != null) { return null; }

      // 4. 未命中,查询数据库,缓存重建 String lock = VIDEO_MUTEX_KEY_PREFIX + key; Map result = new HashMap<>(); try { if (! trylock(lock)) {

      1. // 没有获取到互斥锁,休眠
      2. Thread.sleep(50);
      3. // 递归重新查询redis缓存
      4. return getVideoDetailWithRedisMutex(videoId);

      }

      // 获得互斥锁,查询redis是否命中 s = stringRedisTemplate.opsForValue().get(key);

      // 这里重新查询了一遍redis,防止获取锁的时候,redis已经重建好缓存 if (!StringUtils.isNullOrEmpty(s)) {

      1. // 3. 命中, 直接返回
      2. return JSONUtil.toBean(s, Map.class);

      }

      if(s != null) {

      1. return null;

      } // 查询数据库 // 4.1. 根据videoId获取视频信息 result = new HashMap<>(); Video video = videoMapper.getVideoById(videoId);

      if(video != null) {

      1. // 4.2. 根据视频信息获取userId
      2. Long userId = video.getUserId();
      3. // 4.3. 根据userId获取用户信息
      4. UserVO userInfoById = userService.getUserInfoById(userId);
      5. UserInfo userInfo = userInfoById.getUserInfo();
      6. // 4.4. 将视频信息和用户信息封装在map中
      7. result = new HashMap<>();
      8. result.put("video", video);
      9. result.put("userInfo", userInfo);

      } // 5. 将查询结果,写入redis if(result == null) {

      1. // 解决缓存击穿:查询结果video为null,缓存空值到redis中
      2. stringRedisTemplate.opsForValue().set(key, "", 2, TimeUnit.MINUTES);

      } stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(result)); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 释放锁 unlock(lock); } return result; }

/ 互斥锁 / public boolean trylock(String key) { // 互斥锁,设置10s,10s后销毁,以防出现问题永远存在redis里 Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, “1”, 10, TimeUnit.SECONDS); return BooleanUtil.isTrue(flag); }

/ 解锁,就是将redis里的lock删除 / public void unlock(String key) { stringRedisTemplate.delete(key); }

  1. 使用互斥锁后,只查询了一次数据库<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/25887408/1655819080897-b57af184-bb10-4077-b854-ac32c3576dcd.png#clientId=u246580cf-6226-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=400&id=u91dc3d17&margin=%5Bobject%20Object%5D&name=image.png&originHeight=440&originWidth=1295&originalType=binary&ratio=1&rotation=0&showTitle=false&size=75413&status=done&style=none&taskId=u121d9ae7-9ef7-4fc2-bc52-50b7d912995&title=&width=1177.2727017560287)
  2. <a name="n77WS"></a>
  3. #### 逻辑过期
  4. 逻辑过期是在存储数据中设置一个过期时间,可以认为设置,他不是redisTTL,理论上可以永远存在。先人为的将要缓存的数据存储到redis中,(例如双11时,将抢购的商品数据预先缓存至redis中)<br />当查询缓存时发现逻辑时间已经过期,表明需要重建缓存了。此时加上互斥锁,开启一个新的线程来中兴重建缓存操作,主线程中直接返回旧的数据。其他线程查询缓存获取互斥锁失败,也返回原来的过期数据。这样使用逻辑过期,就可以只重建一次缓存,并且可以保证每次请求可以快速响应,但是会有数据的不一致!<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/25887408/1655820772436-b8e429cf-385e-45ad-beb4-e6bfa7c517ad.png#clientId=u246580cf-6226-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=455&id=ua73077c3&margin=%5Bobject%20Object%5D&name=image.png&originHeight=501&originWidth=668&originalType=binary&ratio=1&rotation=0&showTitle=false&size=24056&status=done&style=none&taskId=uf4d2330a-a48e-4f04-ad53-d62ac751ebc&title=&width=607.2727141104457)<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/25887408/1655822285045-00c26c65-f56f-4e2f-917a-ddacaeb3b91d.png#clientId=u87ca93c1-e2da-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=625&id=u0eea4e73&margin=%5Bobject%20Object%5D&name=image.png&originHeight=688&originWidth=1057&originalType=binary&ratio=1&rotation=0&showTitle=false&size=70599&status=done&style=none&taskId=ucc7ebce6-5dba-44ff-8dc2-9148632b2ca&title=&width=960.9090700819477)
  5. <a name="Biitc"></a>
  6. ### 6. redis缓存工具封装
  7. <a name="zHi18"></a>
  8. ## 4. redis集群
  9. <a name="VnpK8"></a>
  10. ### 1. redis主从集群
  11. 单点redis的并发能力是有上限的,搭建redis主从集群,实现读写分离,可以提高redis的并发能力<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/25887408/1655890144355-95861b82-8829-4de8-8170-233357da6e71.png#clientId=uc927b760-94f8-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=381&id=u112f5a01&margin=%5Bobject%20Object%5D&name=image.png&originHeight=419&originWidth=938&originalType=binary&ratio=1&rotation=0&showTitle=false&size=51140&status=done&style=none&taskId=u54356e5c-4572-434d-9801-b1795cf1af9&title=&width=852.7272542449073)
  12. <a name="iXurB"></a>
  13. #### 主从数据同步原理
  14. 主从同步第一次是全量同步<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/25887408/1655907516358-3638dabc-0d93-471f-a320-17d67bd1819e.png#clientId=uc927b760-94f8-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=442&id=u97e4b734&name=image.png&originHeight=486&originWidth=891&originalType=binary&ratio=1&rotation=0&showTitle=false&size=41956&status=done&style=none&taskId=ub1150c62-9550-47e1-9309-80f46b792b1&title=&width=809.9999824437232)<br />master如何判断接结点是不是第一次同步?
  15. - Replication Id:简称replid,是数据集的标记,id一致则说明是同一数据集。每一个master都有唯一的replidslave则会继承master节点的replid
  16. - offset:偏移量,随着记录在repl_baklog中的数据增多而逐渐增大。slave完成同步时也会记录当前同步的offset。如果slaveoffset小于masteroffset,说明slave数据落后于master,需要更新。
  17. ![image.png](https://cdn.nlark.com/yuque/0/2022/png/25887408/1655907701135-8d5d8cd0-d04b-410f-985c-9634030c0a99.png#clientId=uc927b760-94f8-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=282&id=u3c0b621c&name=image.png&originHeight=310&originWidth=1334&originalType=binary&ratio=1&rotation=0&showTitle=false&size=37201&status=done&style=none&taskId=u26a7a23e-aff5-4c7f-89f8-bb1cb52e581&title=&width=1212.7272464421176)<br />**简述全量同步的流程**
  18. 1. slave结点请求全量同步时,会发送自己的replidoffsetmaster结点判断自己的replidslave结点是否相同,不相同的话,表明是第一次访问,开始全量同步
  19. 1. master结点将主节点的replidoffset发送给slave结点,slave结点保存信息
  20. 1. master结点在后台执行bgsave生成RDB文件,让后发送给slave结点
  21. 1. slave结点接收到RDB文件后,清空本地数据,加载RDB文件
  22. 1. masterRDB期间的命令保存在rep_baklog中,并将log发送给slave结点
  23. 1. slave结点执行接收到的命令
  24. redis重启后,进行的是增量同步<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/25887408/1655908763392-3c7f7fb9-4882-4c90-bafd-c7498105e63d.png#clientId=uc927b760-94f8-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=500&id=ua335eb72&name=image.png&originHeight=550&originWidth=1553&originalType=binary&ratio=1&rotation=0&showTitle=false&size=73052&status=done&style=none&taskId=ufd9043a2-9970-4c3c-93e8-7f141ed844b&title=&width=1411.8181512178476)<br />**简述增量同步流程**
  25. 1. slave结点的redis重启后,携带repIidoffsetmaster结点申请同步
  26. 1. master发现repIid结点与slave的一致,所以不是第一次同步,开始增量同步
  27. 1. masterrepl_baklog中寻找offset之后的命令,并将offset之后的命令发送给slave结点
  28. 1. slave结点执行命令
  29. repl_baklog<br />repl_baklog是一个环形数组,用于增量更新,其内部维护一个masteroffset,记录master写到什么位置了,当slaveoffset发送过来后,就将偏移量之间的数据发送给slave节点<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/25887408/1655909688186-17fc2146-093c-4ebd-84a5-ff180805adde.png#clientId=ue709d869-acf1-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=223&id=u6fa852a4&name=image.png&originHeight=245&originWidth=250&originalType=binary&ratio=1&rotation=0&showTitle=false&size=9035&status=done&style=none&taskId=u9fc52d29-386c-4a2f-b21a-c933cb6b2f0&title=&width=227.27272234672367)![image.png](https://cdn.nlark.com/yuque/0/2022/png/25887408/1655909700194-0009077c-4fad-4d52-a3e2-2cc2b7a1b51e.png#clientId=ue709d869-acf1-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=230&id=ua214a6eb&name=image.png&originHeight=253&originWidth=253&originalType=binary&ratio=1&rotation=0&showTitle=false&size=11466&status=done&style=none&taskId=u35b82b7c-893f-4b35-beea-eeb98f92454&title=&width=229.99999501488438)![image.png](https://cdn.nlark.com/yuque/0/2022/png/25887408/1655909706633-7f266928-c4fe-4d2e-aa40-ee6c0344d7bf.png#clientId=ue709d869-acf1-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=232&id=uc79992b9&name=image.png&originHeight=255&originWidth=250&originalType=binary&ratio=1&rotation=0&showTitle=false&size=15176&status=done&style=none&taskId=u7c70fa03-ccac-41e7-9564-122c7956287&title=&width=227.27272234672367)![image.png](https://cdn.nlark.com/yuque/0/2022/png/25887408/1655909730684-19830e8f-8b8e-4484-8af6-c869b96c1fb2.png#clientId=ue709d869-acf1-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=235&id=u08d76cfe&name=image.png&originHeight=258&originWidth=229&originalType=binary&ratio=1&rotation=0&showTitle=false&size=15925&status=done&style=none&taskId=u1bf3d6ef-98a5-4390-9eee-8d4f92642ea&title=&width=208.18181366959888)![image.png](https://cdn.nlark.com/yuque/0/2022/png/25887408/1655909734383-4ac276c5-b685-41b4-a8e8-6b9e85ae4e47.png#clientId=ue709d869-acf1-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=240&id=u9aa25dac&name=image.png&originHeight=264&originWidth=252&originalType=binary&ratio=1&rotation=0&showTitle=false&size=18776&status=done&style=none&taskId=u7bc10bdf-2822-474b-a35e-5043ad6de97&title=&width=229.09090412549747)<br />repl_baklog是有上限的,写满后就会覆盖最早的数据,如果slave结点断开时间太长,就会导致为备份的数据被覆盖掉,就无法基于log进行增量同步,就应该使用去量同步
  30. <a name="KBO1Z"></a>
  31. #### redis主从集群搭建
  32. 这里我们会在同一台虚拟机中开启3redis实例,模拟主从集群,信息如下:
  33. | **IP** | **PORT** | **角色** |
  34. | --- | --- | --- |
  35. | 192.168.150.101 | 7001 | master |
  36. | 192.168.150.101 | 7002 | slave |
  37. | 192.168.150.101 | 7003 | slave |
  38. 要在同一台虚拟机开启3个实例,必须准备三份不同的配置文件和目录,配置文件所在目录也就是工作目录。
  39. <a name="cjapC"></a>
  40. ##### 1)创建目录
  41. 我们创建三个文件夹,名字分别叫700170027003
  42. ```shell
  43. # 进入/tmp目录
  44. cd /temp
  45. # 创建目录
  46. mkdir 7001 7002 7003

如图:
image-20210630113929868.png

2)恢复原始配置

修改redis-6.2.4/redis.conf文件,将其中的持久化模式改为默认的RDB模式,AOF保持关闭状态。

  1. # 开启RDB
  2. # save ""
  3. save 3600 1
  4. save 300 100
  5. save 60 10000
  6. # 关闭AOF
  7. appendonly no

3)拷贝配置文件到每个实例目录
然后将redis-6.2.4/redis.conf文件拷贝到三个目录中(在/tmp目录执行下列命令):

  1. # 方式一:逐个拷贝
  2. cp redis-6.2.4/redis.conf 7001
  3. cp redis-6.2.4/redis.conf 7002
  4. cp redis-6.2.4/redis.conf 7003
  5. # 方式二:管道组合命令,一键拷贝
  6. echo 7001 7002 7003 | xargs -t -n 1 cp redis-6.2.4/redis.conf

4)修改每个实例的端口、工作目录
修改每个文件夹内的配置文件,将端口分别修改为7001、7002、7003,将rdb文件保存位置都修改为自己所在目录(在/tmp目录执行下列命令):

  1. sed -i -e 's/6379/7001/g' -e 's/dir .\//dir \/home\/project\/redis-cluster\/7001\//g' 7001/redis.conf
  2. sed -i -e 's/6379/7002/g' -e 's/dir .\//dir \/home\/project\/redis-cluster\/7002\//g' 7002/redis.conf
  3. sed -i -e 's/6379/7003/g' -e 's/dir .\//dir \/home\/project\/redis-cluster\/7003\//g' 7003/redis.conf
  4. sed -i -e 's/6379/7002/g' -e 's/dir .\//dir \/tmp\/7002\//g' 7002/redis.conf
  5. sed -i -e 's/6379/7003/g' -e 's/dir .\//dir \/tmp\/7003\//g' 7003/redis.conf

5)修改每个实例的声明IP
虚拟机本身有多个IP,为了避免将来混乱,我们需要在redis.conf文件中指定每一个实例的绑定ip信息,格式如下

  1. # redis实例的声明 IP
  2. replica-announce-ip 172.26.34.179

每个目录都要改,我们一键完成修改(在/tmp目录执行下列命令):

  1. sed -i '1a replica-announce-ip 172.26.34.179' 7001/redis.conf
  2. # 逐一执行
  3. sed -i '1a replica-announce-ip 192.168.150.101' 7001/redis.conf
  4. sed -i '1a replica-announce-ip 192.168.150.101' 7002/redis.conf
  5. sed -i '1a replica-announce-ip 192.168.150.101' 7003/redis.conf
  6. # 或者一键修改
  7. printf '%s\n' 7001 7002 7003 | xargs -I{} -t sed -i '1a replica-announce-ip 172.26.34.179' {}/redis.conf

3)启动

为了方便查看日志,我们打开3个ssh窗口,分别启动3个redis实例,启动命令:

  1. # 第1个
  2. redis-server 7001/redis.conf
  3. # 第2个
  4. redis-server 7002/redis.conf
  5. # 第3个
  6. redis-server 7003/redis.conf

启动后:
image-20210630183914491.png
如果要一键停止,可以运行下面命令:

  1. printf '%s\n' 7001 7002 7003 | xargs -I{} -t redis-cli -p {} shutdown

4)开启主从关系

现在三个实例还没有任何关系,要配置主从可以使用replicaof 或者slaveof(5.0以前)命令。
有临时和永久两种模式:

  • 修改配置文件(永久生效)
    • 在redis.conf中添加一行配置:slaveof <masterip> <masterport>
  • 使用redis-cli客户端连接到redis服务,执行slaveof命令(重启后失效):
    1. slaveof <masterip> <masterport>
    如果redis设置密码的话,必须在redis.conf中,加入auth密码认证,如下:
    1. masterauth <password>

注意:在5.0以后新增命令replicaof,与salveof效果一致。
这里我们为了演示方便,使用方式二。
通过redis-cli命令连接7002,执行下面命令:

  1. # 连接 7002
  2. redis-cli -p 7002
  3. # 执行slaveof
  4. slaveof 192.168.150.101 7001

通过redis-cli命令连接7003,执行下面命令:

  1. # 连接 7003
  2. redis-cli -p 7003
  3. # 执行slaveof
  4. slaveof 172.26.34.179 7001

然后连接 7001节点,查看集群状态:

  1. # 连接 7001
  2. redis-cli -p 7001
  3. # 查看状态
  4. info replication

结果:
image-20210630201258802.png

6) 测试

执行下列操作以测试:

  • 利用redis-cli连接7001,执行set num 123
  • 利用redis-cli连接7002,执行get num,再执行set num 666
  • 利用redis-cli连接7003,执行get num,再执行set num 888

可以发现,只有在7001这个master节点上可以执行写操作,7002和7003这两个slave节点只能执行读操作。

2. redis哨兵集群

redis哨兵作用与原理

redis从结点宕机了,可以使用master结点来同步数据
如果master主节点宕机了,该怎么办呢,redis哨兵就是解决这个问题的!
image.png
redis的哨兵机制主要是为了实现主从集群中主节点宕机自动恢复故障,哨兵sentinel机制的作用有:

  1. 监控:哨兵不断检查主从结点是否正常工作
  2. 自动恢复故障:如果master故障,哨兵会将一个slave结点晋升为master,故障恢复后仍以新的master为主
  3. 通知:当集群发生故障转移时,哨兵会将最新消息推送给redis客户端

可以看到哨兵承担着监控master的重任,因此它也必须是高可靠的,所以哨兵也要部署集群
哨兵是如何监控redis的状态的
sentinel通过心跳机制监测服务状态,每隔1s向每隔redis实例发型ping命令:

  • 主观下线:某个哨兵结点发现某redis实例没有响应,就认为该结点主观下线
  • 客观下线:超过指定数量(quorum)的哨兵结点发现某个redis实例没有响应,就认为它客观下线,就是真的宕机了

quoruo值最好超过哨兵实例的一半,其实就是一个投票机制
image.png
选举新的master
一旦master发生故障,redis会从slave中选择一个结点成为新的master,选择依据是:

  • slave的offset值越大,说明slave结点中的数据越新,就选择它作为新结点

如何实现故障转移
当选择一个新的slave作为master后,

  1. 哨兵结点会向选择的slave结点,发送slave no one指令,让该节点成为新的master
  2. 哨兵向其他的所有结点发送指令,使得其他结点都成为新的master的从节点,并从新的master上同步数据
  3. 最后哨兵将故障结点标记为slave,故障恢复后,它也会成为master的slave节点

    redis哨兵集群搭建

    这里我们搭建一个三节点形成的Sentinel集群,来监管之前的Redis主从集群。如图:
    三个sentinel实例信息如下:
节点 IP PORT
s1 192.168.150.101 27001
s2 192.168.150.101 27002
s3 192.168.150.101 27003

1)准备实例和配置

要在同一台虚拟机开启3个实例,必须准备三份不同的配置文件和目录,配置文件所在目录也就是工作目录。
我们创建三个文件夹,名字分别叫s1、s2、s3:

  1. # 进入/tmp目录
  2. cd /tmp
  3. # 创建目录
  4. mkdir s1 s2 s3

如图:
image-20210701215534714.png
然后我们在s1目录创建一个sentinel.conf文件,添加下面的内容:

  1. port 27001
  2. sentinel announce-ip 172.26.34.179
  3. sentinel monitor mymaster 172.26.34.179 7001 2
  4. sentinel down-after-milliseconds mymaster 5000
  5. sentinel failover-timeout mymaster 60000
  6. dir "/home/project/redis-cluster/s1"
  7. dir "/tmp/s1"

解读:

  • port 27001:是当前sentinel实例的端口
  • sentinel monitor mymaster 192.168.150.101 7001 2:指定主节点信息
    • mymaster:主节点名称,自定义,任意写
    • 192.168.150.101 7001:主节点的ip和端口
    • 2:选举master时的quorum值

然后将s1/sentinel.conf文件拷贝到s2、s3两个目录中(在/tmp目录执行下列命令):

  1. # 方式一:逐个拷贝
  2. cp s1/sentinel.conf s2
  3. cp s1/sentinel.conf s3
  4. # 方式二:管道组合命令,一键拷贝
  5. echo s2 s3 | xargs -t -n 1 cp s1/sentinel.conf

修改s2、s3两个文件夹内的配置文件,将端口分别修改为27002、27003:

  1. sed -i -e 's/27001/27002/g' -e 's/s1/s2/g' s2/sentinel.conf
  2. sed -i -e 's/27001/27003/g' -e 's/s1/s3/g' s3/sentinel.conf

2)启动

为了方便查看日志,我们打开3个ssh窗口,分别启动3个redis实例,启动命令:

  1. # 第1个
  2. redis-sentinel s1/sentinel.conf
  3. # 第2个
  4. redis-sentinel s2/sentinel.conf
  5. # 第3个
  6. redis-sentinel s3/sentinel.conf

启动后:
image-20210701220714104.png

3)测试

尝试让master节点7001宕机,查看sentinel日志:
image-20210701222857997.png
查看7003的日志:
image-20210701223025709.png
查看7002的日志:
image-20210701223131264.png

3. redis分片集群

分片集群结构

主从和哨兵解决了高并发和高可用的问题,但是仍有问题:

  1. 海量数据存储问题:主从同步耗时,存储海量数据将会使得性能下降
  2. 高并发写的问题:只有一个master结点,在高并发写的情况下,性能会下降

分片集群的特征

  1. 集群中有多个master,每个master上保存不同的数据
  2. 每个master上有多个slave结点
  3. master之间彼此通过心跳监测健康状态
  4. 客户端可以访问集群的任意结点,最终都会转发至正确的结点上

image.png

散列插槽

redis把每一个结点映射到0-16383共16384个插槽上,查看集群信息可以看出:
image.png
数据的key不是与结点绑定,而是与插槽绑定,redis通过哈希算法获取一个hash值,然后对16384取余,就可以得到插槽值
key中包含”{}”,且“{}”中至少包含1个字符,“{}”中的部分是有效部分,就只使用{}的值计算slot

集群伸缩

通过redis-cli —cluster中的集群命令,可以向集群中添加结点或者删除结点
比如,添加节点的命令:
image.png

故障转移

当分片集群中的一个redis宕机时,会选择它的一个slave结点升级为master结点
使用cluster failover可以手动让集群中的某个master宕机

5. redis网络模型