1. redis数据结构
1. 字典Dict
Dict概述
Redis是一个键值型(key-value)数据库,我们可以根据键值对进行增删改查,键值对的映射关系就是通过Dict结构来实现的,Dict分为3部分:字典(dict)、哈希表(dictht)、哈希结点(dictEntry)
typedef struct dict {
dictType *type; // dict类型,内置不同的hash函数
void *privdata; // 也是用于hash计算的私有数据
dictht ht[2]; // 定义一个哈希表数组,容量为2,一个正常使用,一个rehash时使用
long rehashidx; // 记录rehash的进度,-1表示未进行rehash
int16_t pauserehash;// 1表示rehash暂停,0表示rehash在运行
} dict;
typedef struct dictht {
// 定义一个二级指针指向dictEntry结点,就相当于java中的dictEntry[] table
dictEntry **table; // 一个*表示指针指向dictEntry,第二个*表示这个dictEntry是个数组
unsigned long size; // 哈希表数组长度
unsigned long sizemask; // 掩码值 sizemask = size - 1, 用于取余计算
unsigned long used; // 哈希表中已经插入的结点的个数
} dictht;
typedef struct dictEntry {
// void *代表无类型指针,可以指向任何数据
void *key; // 键
union {
void *val;
unit64_t u64;
int64_t s64;
double d;
} v; // 值,类型为联合体中的一个
struct dictEntry *next; // 指向洗衣歌结点
} dictEntry;
Dict的添加过程
- 当向Dict添加键值对时,Redis首先根据key计算出hash值,然后对数组长度取余获取哈希表索引
-
Dict的扩容过程
与HashMap相同,哈希冲突导致链表过长会使得查询效率大大降低,因此要对数组扩容,以减小链表的长度
在两种情况下会触发redis哈希表的扩容:负载因子(LoadFactor = used/size) 负载因子 >= 1, 并且服务器没有进行BGSAVE等进程;
负载因子 >= 5;
static int _dictExpandIfNeeded(dict *d)
{
/* 如果正在rehashing, 暂停扩容 */
if (dictIsRehashing(d)) return DICT_OK;
/* 哈希表为空,表示首次创建哈希表,初始数组长度为4 */
if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);
// d->ht[0].used >= d->ht[0].size: 负载因子 >= 1
if (d->ht[0].used >= d->ht[0].size &&
(dict_can_resize || // 检查BGSAVE等进程
d->ht[0].used/d->ht[0].size > dict_force_resize_ratio) && // 负载因子 >= 5
dictTypeExpandAllowed(d)){
// 扩容为used+1,实际上会扩容为大于used+1的第一次2^n
return dictExpand(d, d->ht[0].used + 1);
}
return DICT_OK;
}
Dict的收缩过程
每次删除元素时,如果负载因子 < 0.1, 哈希表会收缩 ```c if (dictDelete((dict*)o->ptr, field) == C_OK) { deleted = 1;
/ 删除完元素后检查负载因子,小于0.1,收缩哈希表 / if (htNeedsResize(o->ptr)) dictResize(o->ptr); }
int htNeedsResize(dict *dict) { long long size, used;
size = dictSlots(dict); // 哈希表容量
used = dictSize(dict); // 哈希表已使用量
// 如果size > 4并且used/size < 0.1,收缩哈希表
return (size > DICT_HT_INITIAL_SIZE &&
(used*100/size < HASHTABLE_MIN_FILL));
}
int dictResize(dict *d) { unsigned long minimal; // rehash或者后台进程在bgsave,返回错误 if (!dict_can_resize || dictIsRehashing(d)) return DICT_ERR; // 获取used minimal = d->ht[0].used; if (minimal < DICT_HT_INITIAL_SIZE) // used < 4, 则收缩至4 minimal = DICT_HT_INITIAL_SIZE; // 收缩至第一个大于等于used的2^n return dictExpand(d, minimal); }
<a name="ZtYK1"></a>
#### Dict的rehash过程
扩容还是搜索都要重新计算下表,即rehash过程
1. 计算哈希表新的size和sizemask
1. 根据size申请内存空间,创建dictht,并赋值给dict.ht[1]
1. 标记dict.rehashidx=1,表示开始rehash
1. 将dict.h[0]中的元素rehash到dict.h[1]中
1. 将h[1]赋值给h[0],将h[1]初始化为空哈希表,并释放原来的h[0]内存
<br /><br /><br /><br />渐进式rehash
- 由于rehash需要将h[0]中的全部元素转移至h[1]中, 如果哈希表中存有很多数据,这样是十分耗时的
- redis在插执行增删改查操作时,会检查dict的rehashidx属性,如果它大于-1,则表明正在rehash,那么就将h[0]中rehashidx对应的链表转移至h[1]上,并将rehashidx++,知道所有的数据都转移至h[1]
- rehashidx == -1时,代表rehash结束
- 在rehash过程中,新增操作直接添加至h[1]中,查询手改删除则要查询两个哈希表并执行
PS: redis中哈希表中采用的是头插法,由于redis是单线程的,所以不用担心HashMap中的并发死链问题
<a name="QscCr"></a>
### 2. 动态字符串 SDS
<a name="fZuWl"></a>
#### SDS概述
** 为什么redis要自定义字符串?**<br />因为c语言中的字符串存在很多问题:
1. c语言字符串底层使用给一个char[]数组,以‘\0’为结束符,获取字符串长度需要手动计算
1. 如果定义的字符串出现‘\0’字符将会出现错误,所以c语言字符串非二进制安全
1. 由于底层为char型数组,所以长度固定不能够被修改
```c
//
struct __attribute__ ((__packed__)) sdshdr8 {
uint8_t len; // buf中字符串字节数
uint8_t alloc; // buf申请的字节数
unsigned char flags; // SDS的类型,用来控制SDS的头大小
char buf[];
}
一个包含字符串“name”的sds结构如下:
PS: attribute ((packed)): 紧凑的内存分配方式
#include <stdio.h>
struct test1 {
char a;
int b;
} test1;
struct __attribute__((packed)) test2 {
char a;
int b;
} test2;
SDS的优点
- 如果新字符串小于1M,新扩容的字符串长度为原始的两倍 + 1
如果新字符串大于1M,扩容后的长度为原来的长度 + 1M + 1
3. 整数集合 IntSet
Intset概述
整数集合是Set的一种实现方式,基于整数数组实现,长度可变、有序
typedef struct intset {
uint32_t encoding; // 编码方式,支持存放16位、32位、64位整数
uint32_t length; // 集合的长度
int8_t contents[]; // 整数数组
}
为了方便查找,Redis会将intset中所有的整数按照升序依次保存在contents数组中,结构如图:
IntSet升级
当一个新元素插入到Inset中,如果他的类型(int32_t)比原来集合的类型(int16_t)长,需要对整数集合进行升级,就是将所有元素的类型都升级为int32_t,并且要扩展contents数组的长度
举个例子,假设有一个整数集合里有 3 个类型为 int16_t 的元素。
现在,往这个整数集合中加入一个新元素 65535,这个新元素需要用 int32_t 类型来保存,所以整数集合要进行升级操作,首先需要为 contents 数组扩容,在原本空间的大小之上再扩容多 80 位(4x32-3x16=80),这样就能保存下 4 个类型为 int32_t 的元素。
扩容完 contents 数组空间大小后,需要将之前的三个元素转换为 int32_t 类型,并将转换后的元素放置到正确的位上面,并且需要维持底层数组的有序性不变,整个转换过程如下:IntSet源码
/* 往集合中添加一个整数 */
intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
// 获取添加的整数的编码
uint8_t valenc = _intsetValueEncoding(value);
// 插入的位置
uint32_t pos;
if (success) *success = 1;
// 判断插入数值的编码是否超过了集合的编码
if (valenc > intrev32ifbe(is->encoding)) {
// 超过集合编码,进行升级
return intsetUpgradeAndAdd(is,value);
} else {
// 从集合中寻找是否已经存在了value
if (intsetSearch(is,value,&pos)) {
if (success) *success = 0;
// 存在value,无须插入,直接返回
return is;
}
// 数组扩容
is = intsetResize(is,intrev32ifbe(is->length)+1);
// 将pos之后的元素移动至pos+1,腾出pos空间,就是整体往后一位
if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);
}
// 插入新的元素
_intsetSet(is,pos,value);
// 重置元素长度
is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
return is;
}
/* 编码升级流程 */
static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {
uint8_t curenc = intrev32ifbe(is->encoding); // 当前集合编码
uint8_t newenc = _intsetValueEncoding(value); // 升级的编码
int length = intrev32ifbe(is->length); // 集合长度
int prepend = value < 0 ? 1 : 0; // 负数 prepend=1, 插到最首部
/* 设置新的编码 */
is->encoding = intrev32ifbe(newenc);
// 扩容
is = intsetResize(is,intrev32ifbe(is->length)+1);
// 倒叙将数组拷贝至扩容后正确的位置上
while(length--)
_intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));
/* Set the value at the beginning or the end. */
if (prepend)
// 将元素放到集合首部
_intsetSet(is,0,value);
else
// 将元素放到集合末尾
_intsetSet(is,intrev32ifbe(is->length),value);
// 修改数组长度
is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
return is;
}
4. 链表 List
链表结点
c语言中没有链表,Redis中自己设计了链表结构
typedef struct listNode {
struct listNode *prev; // 前一个阶段
struct listNode *next; // 下一个结点
void *val; // 结点中的值
} listNode;
链表结构设计
redis在listNode的基础上封装了list数据结构
typedef struct list {
listNode *head; // 头结点
listNode *tail; // 尾结点
// 结点复制函数
// 节点值复制函数
void *(*dup)(void *ptr);
//节点值释放函数
void (*free)(void *ptr);
//节点值比较函数
int (*match)(void *ptr, void *key);
//链表节点数量
unsigned long len;
} list;
链表的结构
redis链表的优势:
- redis链表中有prev和next两个指针,获取某一结点的前置结点和后置结点的时间复杂度都为O(1)
- list中封装了head和tail,获取头尾结点的时间复杂度为O(1)
- list中记录了链表长度,获取链表长度的时间复杂度为O(1)
- list使用void*存储结点值,所以链表结点可以存储不同类型的值
redis链表的缺点:
- 链表的结点是不连续的,无法利用CPU缓存局部性原理带来的优势
- 封装一个链表结点,需要存储指针,内存消耗较大
5. 压缩列表 ZipList
ZipList结构设计
压缩列表是一种内存紧凑型的数据结构,由连续内存块组成的顺序型数据结构,类似于数组zipListEntry结构
普通链表记录结点需要两个指针,需要16个字节,浪费内存。entry采用了下面的结构:
- previous_entry_length:前一节点的长度,占1个或5个字节
- 如果前一节点的长度小于254字节,则采用1个字节来保存这个长度值
- 如果前一节点的长度大于254字节,则采用5个字节来保存这个长度值,第一个字节为0xfe,后四个字节才是真实长度数据
- encoding:编码属性,记录content的数据类型(字符串还是整数)以及长度,占用1个、2个或5个字节
- contents:保存结点,可以是字符串或者整数
Encoding编码
压缩列表的编码方式分为字符串和整数两种
- 字符串
encoding以“00”、“01”或者“10”开头,则证明content是字符串
例如,我们要保存字符串:“ab”和 “bc”
- 整数
如果encoding为“11”,则表明content为整数,encoding固定只占1个字节
ZipList的连锁更新问题
连锁更新问题
压缩列表插入的元素较大是,可能会导致后序元素的prev_entry_length(前一个结点的长度)占用空间发生变化,进而引起连锁更新问题,导致每个元素的空间都要重新分配,造成压缩列表性能的下降
ZipList这种特殊情况下产生的连续多次空间扩展操作称之为连锁更新(Cascade Update)。新增、删除都可能导致连锁更新的发生。
压缩列表的优缺点
优点:
- 压缩列表占用连续的内存空间,可以利用CPU缓存提升查询性能
- 压缩列表设计紧凑,针对不同长度的数据,进行相应的编码,相对于数组数据长度都相同,可以更有效的节省内存开销
缺点:
- 压缩列表不可以存储过多元素,否则查询效率很低
- 新增或者修改某个元素时,结点占用的内存空间可能会重新分配,甚至引发连锁更新的问题
因此,Redis在存储的元素数量较少,或者元素值不大时,才会使用压缩列表作为底层数据结构
6. 快表 QuickList
QuickList结构
typedef struct quicklist {
quicklistNode *head; // quciklist的链表头结点
quicklistNode *tail; // quicklist的链表尾结点
unsigned long count; // 压缩列表元素个数
unsigned long len; // quicklist的结点个数
...
} quciklist;
typedef struct quicklistNode {
struct quicklistNode *prev; // 前一个结点
struct quicklistNode *next; // 下一个结点
unsigned char *zl; // quicklistNode指向的压缩列表
unsigned int sz; // 压缩列表的字节大小
unsigned int count : 16; // 压缩列表中元素的个数
...
} quciklistNode;
向quicklist中添加元素时,会检查插入位置的quicklistNode上的压缩链表能否容下该元素,如果可以,就直接插入这个Node中,不可以的话,就新建一个quicklistNode
QuickList解决了什么?
如果存入大量数据,超过了ZipList的最佳上限,我们可以创建多个分片来存储数据
7. 跳表 SkipList
跳表概述
2. redis持久化
1. RDB快照
RDB:Redis Database Backup file(即Redis数据库备份文件),也可以称之为Redis数据快照
就是把Redis内存中的数据就记录在磁盘中,当Redis实例出现故障时,从磁盘中读取快照文件,恢复数据
RDB触发机制
如何使用RDB快照
Redis提供了两个命令生成RDB文件:save和bgsave,他们的区别在于是否在主线程中执行数据的备份
- 执行save命令,会在主线程中生成RDB文件,与redis操作在同一个线程,所以会阻塞主线程
- 执行bgsave命令,会创建一个子进程,子进程在后台执行数据备份,不会阻塞主线程
在redis.conf中可以配置redis触发机制
# 在900s内,如果有一个key被修改,则执行bgsave
save 900 1
save 300 10
save 60 10000
save "" # 表示禁用RDB
-----------------------------------------------------
# redis的其他配置
# 是否压缩 ,建议不开启,压缩也会消耗cpu,磁盘的话不值钱
rdbcompression yes
# RDB文件名称
dbfilename dump.rdb
# 文件保存的路径目录
dir ./
以下几点需要注意:
- RDB 文件的加载工作是在服务器启动时自动执行的,Redis 并没有提供专门用于加载 RDB 文件的命令。
redis的快照是全量快照,也就是说每次执行快照都会吧内存中的所有数据记录在磁盘中。所以RDB是一个比较中的操作,频繁的执行RDB会对性能产生影响。频次太低,快照之间的数据可能会因为服务器的宕机丢失很多。
RDB实现原理
当bgsave开始执行时,会fork主进程得到子进程,子进程共享主进程的内存数据,完成fork后将内存数据写入RDB文件。
所谓fork,就是新建一个子进程,然后复制主进程的页表,这样主进程与子进程映射的就是同一块物理内存。并且这一块物理内存时只读(read-only)的,fork完毕后,子进程就开始吧内存中的数据写到磁盘中。持久化的过程中,如果主进程对内存进行写操作时,是不可以直接写的,因为子进程正在读取数据,这样可能会出现脏数据。fork中使用了copy-on-write技术,也就是说在发生写操作时,从原来的数据中拷贝一份数据,将数据写到拷贝的内存中,主进程读取数据时也从这个拷贝的数据中读取!
简单来说,bgsave的流程为:fork主进程得到一个子进程,复制页表,共享一片内存空间
- 子进程读取内存数据,写入一个新的rdb文件
- 将新的rdb文件替换掉旧的rdb文件
RDB的缺点
- RDB执行期间间隔长,两次RDB之间的数据可能会丢失
- fork子进程、写RDB文件都比较耗时
几点需要注意:
- fork主进程创建子进程,会阻塞主进程,复制页表,不复制内存,也是为了加速子进程的创建
- redis在执行bgsave的过程中,如果发生了写操作,这次写操作不会记录在此次bgsave中,只能等到下一次bgsave才能存储;如果系统恰好在rdb文件创建完后崩溃了,redis将丢失主进程在快照期间的写操作
-
2. AOF日志
AOF:Append Only File(追加文件),redis处理的每一条写命令都会记录在AOF文件,可以看作日志记录文件
AOF配置
AOF默认是关闭的,需要修改redis.conf配置文件来开启AOF:
是所示# 是否开启AOF功能,默认是no
appendonly yes
# AOF文件的名称
appendfilename "appendonly.aof"
由于AOF记录的是命令,所以AOF文件要比RDB文件大的多,并且AOF会记录对一个key的多次写操作,但是只有最后一次的写操作才是有意义的。redis提供了bgrewriteaof命令,让AOF执行重写功能,以最少的命令实现相同的效果!
redis触发阈值时会自动重写AOF文件,可在redis.conf中配置# AOF文件比上次文件 增长超过多少百分比则触发重写
auto-aof-rewrite-percentage 100
# AOF文件体积最小多大以上才触发重写
auto-aof-rewrite-min-size 64mb
AOF持久化的两个风险
当redis没有来的即将AOF文件写入到内存中,服务器就宕机了,此时数据就会有丢失的风险
redis执行命令和写日志是同步进行的,瑞国写日志时间过长,下一条redis命令将会被阻塞
AOF写回策略
redis执行完写操作命令后,会将命令追加至server.aof_buf缓冲区中
- 然后通过write()系统调用,将aof_buf缓冲区中的数据添加至内核缓冲区page cache中,等待将数据写入硬盘
- 具体什么时候将内核缓冲区中的数据持久化到磁盘中,有内核决定
Redis 提供了 3 种写回硬盘的策略,控制的就是上面说的第三步的过程。
redis.conf文件中可以设置3中协会策略
# 表示每执行一次写命令,立即记录到AOF文件
appendfsync always
# 写命令执行完先放入AOF缓冲区,然后表示每隔1秒将缓冲区数据写到AOF文件,是默认方案
appendfsync everysec
# 写命令执行完先放入AOF缓冲区,由操作系统决定何时将缓冲区内容写回磁盘
appendfsync no
- 如果需要高性能,就选择no
- 如果需要高可靠,就选择Always
- 如果允许数据丢失一系诶,同时又想要性能高,就选择everysec
这三种策略只是在控制 fsync() 函数的调用时机
当应用程序向文件写入数据时,内核通常先将数据复制到内核缓冲区中,然后排入队列,然后由内核决定何时写入硬盘。
- Always 策略就是每次写入 AOF 文件数据后,就执行 fsync() 函数;
- Everysec 策略就会创建一个异步任务来执行 fsync() 函数;
-
AOF重写机制
AOF日志是一个文件,随着写操作的命令越来越多,文件也会越来越大。而文件过大会带来性能问题,如果redis重启后,读取redis加载数据,如果文件过大,加载过程就会很慢。
如果多次对一个key进行写入操作,只有最后一个是有效的
- 多次写入操作后,出现一个delete操作,所有操作无效
所以,为了消除这些无用的操作,redis启用了重写机制,来压缩AOF文件。
AOF重写机制:读取当前数据库的键值对,将键值用一条命令记录在记录到新的AOF文件中,全部记录完后,用新的AOF文件覆盖旧的AOF文件。这样就可以在多个键值对被反复修改,只保存最后一条,大大减少的AOF的命令数量
AOF后台重写
写入AOF的操作是在主线程中完成的,但是重写AOF文件实在后台的子进程bgrewriteaof中完成的,因为重写AOF文件是很耗时的!
与RDB的bgsave后台持久化一样,AOF也是使用fork()子进程使用copy-on-write技术重写的AOF
主进程通过fork()调用生成bgwriteaof子进程,操作系统将主进程的页表复制给子进程,这样主进程与子进程映射到同一片内存空间。这样主进程在进行增删改查的操作时,子进程可以在后台执行AOF读写,就不会阻塞redis的主进程了。同样,当主进程进行写操作时,会将被写的屋里内存拷贝一份,读写操作都作用在拷贝的内存上。
但是这样又带来了一个问题,重写AOF过程中,如果主进程修改了已经存在的key-value,此时子进程中的内存数据就和主进程中的内存数据不一致了,为了解决数据不一致问题,redis引入了AOF重写缓冲区,这个缓冲区在bgwriteaof进程之后使用。在重写AOF期间,redis执行完一个写命令后,会将这个命令写入AOF缓冲区和AOF重写缓冲区
当bgrewriteaof执行完AOF重写工作后,会将AOF重写缓冲区中的内容添加至新的AOF文件中,保证AOF记录与数据库的一致性,然后用新的AOF文件覆盖旧的文件
3. redis缓存
本节以项目tust-bilibili为背景,为项目添加redis缓存
1. 添加redis缓存
首页视频添加redis缓存
/**
* 分頁獲取視頻列表,在首頁瀑布流展示
*/
@GetMapping("/videos")
public JsonData<PageResult<Video>> pageListVideos(Integer size, Integer no, String area) {
PageResult<Video> videos = videoService.pageListVideos(size, no, area);
return new JsonData<>(videos);
}
/**
* 分页查询视频列表
* @param size 分页查询的数量
* @param no 第几页表
* @param area 视频分区信息
* @return 分页查询的结果
*/
@Override
public PageResult<Video> pageListVideos(Integer size, Integer no, String area){
if(size == null || no == null) {
throw new CustomException("参数异常");
}
Map<String, Object> map = new HashMap<>();
map.put("start", (no - 1) * size);
map.put("limit", size);
map.put("area", area);
Integer total = videoMapper.pageCountListVideos(map);
List<Video> videoList = new ArrayList<>();
if(total > 0) {
videoList = videoMapper.pageListVideos(map);
}
return new PageResult<>(total, videoList);
}
添加redis缓存的代码
@Override
public PageResult<Video> pageListVideosWithRedis(Integer size, Integer no, String area){
if(size == null || no == null) {
throw new CustomException("参数异常");
}
Map<String, Object> map = new HashMap<>();
map.put("start", (no - 1) * size);
map.put("limit", size);
map.put("area", area);
// 1. 从redis中查询缓存
String key = PAGE_VIDEO_KEY_PREFIX + size + "-" + no + "-" + area;
String s = stringRedisTemplate.opsForValue().get(key);
// 2. 判断缓存是否命中
if (!StringUtils.isNullOrEmpty(s)) {
// 3. 命中,直接返回
PageResult pageResult = JSONUtil.toBean(s, PageResult.class);
return pageResult;
}
// 4. 未命中,根据map查询数据库
Integer total = videoMapper.pageCountListVideos(map);
List<Video> videoList = new ArrayList<>();
if(total > 0) {
videoList = videoMapper.pageListVideos(map);
// 5. 存在信息,写回redis
PageResult<Video> result = new PageResult<>(total, videoList);
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(result), 5,
TimeUnit.MINUTES);
}
return new PageResult<>(total, videoList);
}
根据videoId查询视频添加redis缓存
逻辑是一样的
/**
* 获取视频详情
* @param videoId 视频id
* @return 视频信息和用户信息
*/
@Override
public Map<String, Object> getVideoDetailWithRedis(Long videoId){
// 1. 从redis中根据videoId查询视频信息
String key = VIDEO_ID_KEY_PREFIX + ":" + videoId;
String s = stringRedisTemplate.opsForValue().get(key);
// 2. 判断redis中是否包含数据
if (!StringUtils.isNullOrEmpty(s)) {
// 3. 命中, 直接返回
return JSONUtil.toBean(s, Map.class);
}
// 4. 未命中,查询数据库
// 4.1. 根据videoId获取视频信息
Map<String, Object> result = new HashMap<>();
Video video = videoMapper.getVideoById(videoId);
if(video != null) {
// 4.2. 根据视频信息获取userId
Long userId = video.getUserId();
// 4.3. 根据userId获取用户信息
UserVO userInfoById = userService.getUserInfoById(userId);
UserInfo userInfo = userInfoById.getUserInfo();
// 4.4. 将视频信息和用户信息封装在map中
result = new HashMap<>();
result.put("video", video);
result.put("userInfo", userInfo);
}
// 5. 将查询结果,写入redis
if(result != null) {
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(result));
}
return result;
}
2. 缓存更新
使用redis缓存会存在数据的一致性问题,比如对数据库中的数据进行更新操作,而查询redis缓存仍是原来的数据,这就就出现了数据的还不一致,因此在更新数据库时,需要对redis缓存进行更新!
那么应该更新缓存还是删除缓存呢?
- 更新缓存:如果每次更新数据库都更新缓存,只有最后一次更新是有效的,那么就对进行了多次无效写操作
- 删除缓存:更新数据库时,删除缓存,再次查询时再更新缓存
如果不查询数据,都没有必要对数据进行更新,所以应该选择删除缓存
需要注意的点:
- 操作数据库和删除缓存需要同时成功或者同时失败,所以要加上事务注解@Transactional
- 应先更新数据库,再删除缓存
为什么要先更新数据库,再删除缓存?
因为在多线程情况下,更新数据库和删除缓存会出现数据不一致的问题,如果先删除缓存,在没有更新数据库的情况下,新的线程查询缓存,会将数据库中原始值更新到缓存中,而这样更新数据库的值就与缓存中的数据不一致,由于查询缓存和写入缓存都非常快,所以这样的线程安全问题出现的可能性高。
/** 解决缓存更新问题 **/
@Transactional
public void updateVideoDetailWithRedis(Video video) {
if(video == null) {
throw new CustomException("参数不合法");
}
// 1. 操作数据库
videoMapper.updateVideo(video);
// 2. 删除缓存
stringRedisTemplate.delete(VIDEO_ID_KEY_PREFIX + video.getId());
}
3. 缓存穿透
缓存穿透:指请求的数据在缓存和数据库中都不存在,这样缓存永远不会生效,请求就会都打进数据库中
解决方案:
- 缓存空对象:
- 优点:实现简单、维护方便
- 缺点:占用了额外的内存消耗,会有短暂的数据不一致
- 布隆过滤器
- 优点:内存占用少,redis中没有多余的key
- 缺点:会出现误判,被放行的数据可能不在数据库中,拒绝的数据一定不在数据库中
布隆过滤器:布隆过滤器是数据库中的数据的哈希映射,使用较好的空间可以判断数据库中是否存有某个数据
缓存空对象流程图
/**
* 解决缓存穿透
*/
@Override
public Map<String, Object> getVideoDetailWithRedisPassThrough(Long videoId){
if(videoId == null) {
throw new CustomException("参数不合法");
}
// 1. 从redis中根据videoId查询视频信息
String key = VIDEO_ID_KEY_PREFIX + videoId;
String s = stringRedisTemplate.opsForValue().get(key);
// 2. 判断redis中是否包含数据
if (!StringUtils.isNullOrEmpty(s)) {
// 3. 命中, 直接返回
return JSONUtil.toBean(s, Map.class);
}
// 这里的s可能出现的情况(s == null, s == "")
if(s != null) {
// s != null 就意味着 s == "",即s为空值,直接返回
return null;
}
// 4. 未命中,查询数据库,s == null
// 4.1. 根据videoId获取视频信息
Map<String, Object> result = new HashMap<>();
Video video = videoMapper.getVideoById(videoId);
if(video != null) {
// 4.2. 根据视频信息获取userId
Long userId = video.getUserId();
// 4.3. 根据userId获取用户信息
UserVO userInfoById = userService.getUserInfoById(userId);
UserInfo userInfo = userInfoById.getUserInfo();
// 4.4. 将视频信息和用户信息封装在map中
result = new HashMap<>();
result.put("video", video);
result.put("userInfo", userInfo);
}
// 5. 将查询结果,写入redis
if(result == null) {
// 解决缓存击穿:查询结果video为null,缓存空值到redis中
stringRedisTemplate.opsForValue().set(key, "", 2, TimeUnit.MINUTES);
}
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(result));
return result;
}
4. 缓存雪崩
5. 缓存击穿
缓存击穿:缓存击穿问题也叫热点key问题,就是一个key被高并发访问并且缓存重建时间较长,导致大量的key同时访问数据库并进行缓存重建,给数据库带来巨大的冲击!
模拟缓存击穿
使用Jmeter压力测试,在5s中内执行1000个线程访问videoId=38的视频详情信息,理想情况下第一次访问redis未命中,查询数据库重建缓存,后面都应该从redis中读取数据。但是查询数据库和重建缓存都是需要时间的,在此期间发送的请求又会多次的请求数据库,重建缓存,从下图可以看到,数据库被访问了几十上百次!这就是缓存击穿问题,解决缓存击穿问题,就要只访问一次数据库!
缓存击穿的解决方案:
- 互斥锁
-
互斥锁
使用redis里的setnx作为互斥锁
只有当key不存在时才可以设置键值对,key存在时,不可以修改redis:0>setnx lock 1
"1"
redis:0>get lock
"1"
redis:0>setnx lock 2
"0"
redis:0>setnx lock 3
"0"
```java /**
解决缓存击穿:互斥锁 */ public Map
getVideoDetailWithRedisMutex(Long videoId){ if(videoId == null) { throw new CustomException(“参数不合法”); }
// 1. 从redis中根据videoId查询视频信息 String key = VIDEO_ID_KEY_PREFIX + videoId; String s = stringRedisTemplate.opsForValue().get(key);
// 2. 判断redis中是否包含数据 if (!StringUtils.isNullOrEmpty(s)) { // 3. 命中, 直接返回 return JSONUtil.toBean(s, Map.class); }
if(s != null) { return null; }
// 4. 未命中,查询数据库,缓存重建 String lock = VIDEO_MUTEX_KEY_PREFIX + key; Map
result = new HashMap<>(); try { if (! trylock(lock)) { // 没有获取到互斥锁,休眠
Thread.sleep(50);
// 递归重新查询redis缓存
return getVideoDetailWithRedisMutex(videoId);
}
// 获得互斥锁,查询redis是否命中 s = stringRedisTemplate.opsForValue().get(key);
// 这里重新查询了一遍redis,防止获取锁的时候,redis已经重建好缓存 if (!StringUtils.isNullOrEmpty(s)) {
// 3. 命中, 直接返回
return JSONUtil.toBean(s, Map.class);
}
if(s != null) {
return null;
} // 查询数据库 // 4.1. 根据videoId获取视频信息 result = new HashMap<>(); Video video = videoMapper.getVideoById(videoId);
if(video != null) {
// 4.2. 根据视频信息获取userId
Long userId = video.getUserId();
// 4.3. 根据userId获取用户信息
UserVO userInfoById = userService.getUserInfoById(userId);
UserInfo userInfo = userInfoById.getUserInfo();
// 4.4. 将视频信息和用户信息封装在map中
result = new HashMap<>();
result.put("video", video);
result.put("userInfo", userInfo);
} // 5. 将查询结果,写入redis if(result == null) {
// 解决缓存击穿:查询结果video为null,缓存空值到redis中
stringRedisTemplate.opsForValue().set(key, "", 2, TimeUnit.MINUTES);
} stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(result)); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 释放锁 unlock(lock); } return result; }
/ 互斥锁 / public boolean trylock(String key) { // 互斥锁,设置10s,10s后销毁,以防出现问题永远存在redis里 Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, “1”, 10, TimeUnit.SECONDS); return BooleanUtil.isTrue(flag); }
/ 解锁,就是将redis里的lock删除 / public void unlock(String key) { stringRedisTemplate.delete(key); }
使用互斥锁后,只查询了一次数据库<br />
<a name="n77WS"></a>
#### 逻辑过期
逻辑过期是在存储数据中设置一个过期时间,可以认为设置,他不是redis的TTL,理论上可以永远存在。先人为的将要缓存的数据存储到redis中,(例如双11时,将抢购的商品数据预先缓存至redis中)<br />当查询缓存时发现逻辑时间已经过期,表明需要重建缓存了。此时加上互斥锁,开启一个新的线程来中兴重建缓存操作,主线程中直接返回旧的数据。其他线程查询缓存获取互斥锁失败,也返回原来的过期数据。这样使用逻辑过期,就可以只重建一次缓存,并且可以保证每次请求可以快速响应,但是会有数据的不一致!<br /><br />
<a name="Biitc"></a>
### 6. redis缓存工具封装
<a name="zHi18"></a>
## 4. redis集群
<a name="VnpK8"></a>
### 1. redis主从集群
单点redis的并发能力是有上限的,搭建redis主从集群,实现读写分离,可以提高redis的并发能力<br />
<a name="iXurB"></a>
#### 主从数据同步原理
主从同步第一次是全量同步<br /><br />master如何判断接结点是不是第一次同步?
- Replication Id:简称replid,是数据集的标记,id一致则说明是同一数据集。每一个master都有唯一的replid,slave则会继承master节点的replid
- offset:偏移量,随着记录在repl_baklog中的数据增多而逐渐增大。slave完成同步时也会记录当前同步的offset。如果slave的offset小于master的offset,说明slave数据落后于master,需要更新。
<br />**简述全量同步的流程**
1. slave结点请求全量同步时,会发送自己的replid和offset,master结点判断自己的replid与slave结点是否相同,不相同的话,表明是第一次访问,开始全量同步
1. master结点将主节点的replid和offset发送给slave结点,slave结点保存信息
1. master结点在后台执行bgsave生成RDB文件,让后发送给slave结点
1. slave结点接收到RDB文件后,清空本地数据,加载RDB文件
1. master将RDB期间的命令保存在rep_baklog中,并将log发送给slave结点
1. slave结点执行接收到的命令
redis重启后,进行的是增量同步<br /><br />**简述增量同步流程**
1. slave结点的redis重启后,携带repIid和offset向master结点申请同步
1. master发现repIid结点与slave的一致,所以不是第一次同步,开始增量同步
1. master到repl_baklog中寻找offset之后的命令,并将offset之后的命令发送给slave结点
1. slave结点执行命令
repl_baklog<br />repl_baklog是一个环形数组,用于增量更新,其内部维护一个master的offset,记录master写到什么位置了,当slave的offset发送过来后,就将偏移量之间的数据发送给slave节点<br /><br />repl_baklog是有上限的,写满后就会覆盖最早的数据,如果slave结点断开时间太长,就会导致为备份的数据被覆盖掉,就无法基于log进行增量同步,就应该使用去量同步
<a name="KBO1Z"></a>
#### redis主从集群搭建
这里我们会在同一台虚拟机中开启3个redis实例,模拟主从集群,信息如下:
| **IP** | **PORT** | **角色** |
| --- | --- | --- |
| 192.168.150.101 | 7001 | master |
| 192.168.150.101 | 7002 | slave |
| 192.168.150.101 | 7003 | slave |
要在同一台虚拟机开启3个实例,必须准备三份不同的配置文件和目录,配置文件所在目录也就是工作目录。
<a name="cjapC"></a>
##### 1)创建目录
我们创建三个文件夹,名字分别叫7001、7002、7003:
```shell
# 进入/tmp目录
cd /temp
# 创建目录
mkdir 7001 7002 7003
2)恢复原始配置
修改redis-6.2.4/redis.conf文件,将其中的持久化模式改为默认的RDB模式,AOF保持关闭状态。
# 开启RDB
# save ""
save 3600 1
save 300 100
save 60 10000
# 关闭AOF
appendonly no
3)拷贝配置文件到每个实例目录
然后将redis-6.2.4/redis.conf文件拷贝到三个目录中(在/tmp目录执行下列命令):
# 方式一:逐个拷贝
cp redis-6.2.4/redis.conf 7001
cp redis-6.2.4/redis.conf 7002
cp redis-6.2.4/redis.conf 7003
# 方式二:管道组合命令,一键拷贝
echo 7001 7002 7003 | xargs -t -n 1 cp redis-6.2.4/redis.conf
4)修改每个实例的端口、工作目录
修改每个文件夹内的配置文件,将端口分别修改为7001、7002、7003,将rdb文件保存位置都修改为自己所在目录(在/tmp目录执行下列命令):
sed -i -e 's/6379/7001/g' -e 's/dir .\//dir \/home\/project\/redis-cluster\/7001\//g' 7001/redis.conf
sed -i -e 's/6379/7002/g' -e 's/dir .\//dir \/home\/project\/redis-cluster\/7002\//g' 7002/redis.conf
sed -i -e 's/6379/7003/g' -e 's/dir .\//dir \/home\/project\/redis-cluster\/7003\//g' 7003/redis.conf
sed -i -e 's/6379/7002/g' -e 's/dir .\//dir \/tmp\/7002\//g' 7002/redis.conf
sed -i -e 's/6379/7003/g' -e 's/dir .\//dir \/tmp\/7003\//g' 7003/redis.conf
5)修改每个实例的声明IP
虚拟机本身有多个IP,为了避免将来混乱,我们需要在redis.conf文件中指定每一个实例的绑定ip信息,格式如下
# redis实例的声明 IP
replica-announce-ip 172.26.34.179
每个目录都要改,我们一键完成修改(在/tmp目录执行下列命令):
sed -i '1a replica-announce-ip 172.26.34.179' 7001/redis.conf
# 逐一执行
sed -i '1a replica-announce-ip 192.168.150.101' 7001/redis.conf
sed -i '1a replica-announce-ip 192.168.150.101' 7002/redis.conf
sed -i '1a replica-announce-ip 192.168.150.101' 7003/redis.conf
# 或者一键修改
printf '%s\n' 7001 7002 7003 | xargs -I{} -t sed -i '1a replica-announce-ip 172.26.34.179' {}/redis.conf
3)启动
为了方便查看日志,我们打开3个ssh窗口,分别启动3个redis实例,启动命令:
# 第1个
redis-server 7001/redis.conf
# 第2个
redis-server 7002/redis.conf
# 第3个
redis-server 7003/redis.conf
启动后:
如果要一键停止,可以运行下面命令:
printf '%s\n' 7001 7002 7003 | xargs -I{} -t redis-cli -p {} shutdown
4)开启主从关系
现在三个实例还没有任何关系,要配置主从可以使用replicaof 或者slaveof(5.0以前)命令。
有临时和永久两种模式:
- 修改配置文件(永久生效)
- 在redis.conf中添加一行配置:
slaveof <masterip> <masterport>
- 在redis.conf中添加一行配置:
- 使用redis-cli客户端连接到redis服务,执行slaveof命令(重启后失效):
如果redis设置密码的话,必须在redis.conf中,加入auth密码认证,如下:slaveof <masterip> <masterport>
masterauth <password>
注意:在5.0以后新增命令replicaof,与salveof效果一致。
这里我们为了演示方便,使用方式二。
通过redis-cli命令连接7002,执行下面命令:
# 连接 7002
redis-cli -p 7002
# 执行slaveof
slaveof 192.168.150.101 7001
通过redis-cli命令连接7003,执行下面命令:
# 连接 7003
redis-cli -p 7003
# 执行slaveof
slaveof 172.26.34.179 7001
然后连接 7001节点,查看集群状态:
# 连接 7001
redis-cli -p 7001
# 查看状态
info replication
6) 测试
执行下列操作以测试:
- 利用redis-cli连接7001,执行
set num 123
- 利用redis-cli连接7002,执行
get num
,再执行set num 666
- 利用redis-cli连接7003,执行
get num
,再执行set num 888
可以发现,只有在7001这个master节点上可以执行写操作,7002和7003这两个slave节点只能执行读操作。
2. redis哨兵集群
redis哨兵作用与原理
redis从结点宕机了,可以使用master结点来同步数据
如果master主节点宕机了,该怎么办呢,redis哨兵就是解决这个问题的!
redis的哨兵机制主要是为了实现主从集群中主节点宕机自动恢复故障,哨兵sentinel机制的作用有:
- 监控:哨兵不断检查主从结点是否正常工作
- 自动恢复故障:如果master故障,哨兵会将一个slave结点晋升为master,故障恢复后仍以新的master为主
- 通知:当集群发生故障转移时,哨兵会将最新消息推送给redis客户端
可以看到哨兵承担着监控master的重任,因此它也必须是高可靠的,所以哨兵也要部署集群
哨兵是如何监控redis的状态的
sentinel通过心跳机制监测服务状态,每隔1s向每隔redis实例发型ping命令:
- 主观下线:某个哨兵结点发现某redis实例没有响应,就认为该结点主观下线
- 客观下线:超过指定数量(quorum)的哨兵结点发现某个redis实例没有响应,就认为它客观下线,就是真的宕机了
quoruo值最好超过哨兵实例的一半,其实就是一个投票机制
选举新的master
一旦master发生故障,redis会从slave中选择一个结点成为新的master,选择依据是:
- slave的offset值越大,说明slave结点中的数据越新,就选择它作为新结点
如何实现故障转移
当选择一个新的slave作为master后,
- 哨兵结点会向选择的slave结点,发送slave no one指令,让该节点成为新的master
- 哨兵向其他的所有结点发送指令,使得其他结点都成为新的master的从节点,并从新的master上同步数据
- 最后哨兵将故障结点标记为slave,故障恢复后,它也会成为master的slave节点
redis哨兵集群搭建
这里我们搭建一个三节点形成的Sentinel集群,来监管之前的Redis主从集群。如图:
三个sentinel实例信息如下:
节点 | IP | PORT |
---|---|---|
s1 | 192.168.150.101 | 27001 |
s2 | 192.168.150.101 | 27002 |
s3 | 192.168.150.101 | 27003 |
1)准备实例和配置
要在同一台虚拟机开启3个实例,必须准备三份不同的配置文件和目录,配置文件所在目录也就是工作目录。
我们创建三个文件夹,名字分别叫s1、s2、s3:
# 进入/tmp目录
cd /tmp
# 创建目录
mkdir s1 s2 s3
如图:
然后我们在s1目录创建一个sentinel.conf文件,添加下面的内容:
port 27001
sentinel announce-ip 172.26.34.179
sentinel monitor mymaster 172.26.34.179 7001 2
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 60000
dir "/home/project/redis-cluster/s1"
dir "/tmp/s1"
解读:
port 27001
:是当前sentinel实例的端口sentinel monitor mymaster 192.168.150.101 7001 2
:指定主节点信息mymaster
:主节点名称,自定义,任意写192.168.150.101 7001
:主节点的ip和端口2
:选举master时的quorum值
然后将s1/sentinel.conf文件拷贝到s2、s3两个目录中(在/tmp目录执行下列命令):
# 方式一:逐个拷贝
cp s1/sentinel.conf s2
cp s1/sentinel.conf s3
# 方式二:管道组合命令,一键拷贝
echo s2 s3 | xargs -t -n 1 cp s1/sentinel.conf
修改s2、s3两个文件夹内的配置文件,将端口分别修改为27002、27003:
sed -i -e 's/27001/27002/g' -e 's/s1/s2/g' s2/sentinel.conf
sed -i -e 's/27001/27003/g' -e 's/s1/s3/g' s3/sentinel.conf
2)启动
为了方便查看日志,我们打开3个ssh窗口,分别启动3个redis实例,启动命令:
# 第1个
redis-sentinel s1/sentinel.conf
# 第2个
redis-sentinel s2/sentinel.conf
# 第3个
redis-sentinel s3/sentinel.conf
3)测试
尝试让master节点7001宕机,查看sentinel日志:
查看7003的日志:
查看7002的日志:
3. redis分片集群
分片集群结构
主从和哨兵解决了高并发和高可用的问题,但是仍有问题:
- 海量数据存储问题:主从同步耗时,存储海量数据将会使得性能下降
- 高并发写的问题:只有一个master结点,在高并发写的情况下,性能会下降
分片集群的特征
- 集群中有多个master,每个master上保存不同的数据
- 每个master上有多个slave结点
- master之间彼此通过心跳监测健康状态
- 客户端可以访问集群的任意结点,最终都会转发至正确的结点上
散列插槽
redis把每一个结点映射到0-16383共16384个插槽上,查看集群信息可以看出:
数据的key不是与结点绑定,而是与插槽绑定,redis通过哈希算法获取一个hash值,然后对16384取余,就可以得到插槽值
key中包含”{}”,且“{}”中至少包含1个字符,“{}”中的部分是有效部分,就只使用{}的值计算slot
集群伸缩
通过redis-cli —cluster中的集群命令,可以向集群中添加结点或者删除结点
比如,添加节点的命令:
故障转移
当分片集群中的一个redis宕机时,会选择它的一个slave结点升级为master结点
使用cluster failover可以手动让集群中的某个master宕机