1.Redis安装
- Redis下载
- 将压缩包上传到Linux服务器上
/media/sf_share/redis-6.2.1 目录下
解压压缩文件
`tar -zxvf redis-6.2.1.tar.gz`
安装GCC编译器
yum install -y gcc
- 查看GCC版本
gcc --version
- 进入解压后的redis文件夹
cd redis-6.2.1
- 在redis-6.2.3目录下执行
make
命令 - 在redis-6.2.3目录下执行
make install
命令 - 若指定安装目录,可将上两步替换为以下命令,将安装目录修改为自己的
make PREFIX=/usr/local/redis install
- 默认安装目录
/usr/local/bin
Redis配置文件详解
查看默认安装目录:
redis-benchmark:性能测试工具,可以在自己本子运行,看看自己本子性能如何
redis-check-aof:修复有问题的AOF文件,rdb和aof后面讲
redis-check-dump:修复有问题的dump.rdb文件
redis-sentinel:Redis集群使用
redis-server:Redis服务器启动命令
redis-cli:客户端,操作入口
2.Redis启动
- 前台启动(不推荐使用)
cd /usr/local/bin
redis-server
- 后台启动
- 拷贝一份redis.conf到其他目录
cp /media/sf_share/redis-6.2.1/redis.conf /etc/redis.conf
- 后台启动设置redis.conf的daemonize值
vi /etc/redis.conf
- no改成yes
- 若用阿里云安装的redis,必须设置密码,不然会被挖矿
- 修改配置文件,设置requirepass值,即密码值
requirepass xxx
- redis启动
cd /usr/local/bin
redis-server /etc/redis.conf
- 客户端访问
- 无密码
redis-cli
- 有密码
redis-cli -a 密码
或者使用redis-cli
进入redis后,使用auth “密码” 认证
- redis关闭
- `redis-cli shutdown`
- 使用 kill -9 2977 杀掉进程
Redis关闭服务错误——(error) ERR Errors trying to SHUTDOWN. Check logs.
https://blog.csdn.net/qq_46127735/article/details/113933690
ps -ef | grep redis 找到redis的当前进程号
设置目录权限
chmod -R 777 /myRedis/
查看redis是否在运行
ps aux| grep redis
Redis是单线程+多路IO复用技术
多路复用是指使用一个线程来检查多个文件描述符(Socket)的就绪状态,比如调用select和poll函数,传入多个文件描述符,如果有一个文件描述符就绪,则返回,否则阻塞直到超时。得到就绪状态后进行真正的操作可以在同一个线程里执行,也可以启动线程执行(比如使用线程池)
串行 vs 多线程+锁(memcached)vs 单线程+多路IO复用(Redis)
(与Memcache三点不同: 支持多数据类型,支持持久化,单线程+多路IO复用)
3.Redis键(key)操作
set k1 lucy
—->key:k1 ; value:lucy
set k2 marry
- set k3 jack
keys *
查看当前库所有keyexists key
判断某个key是否存在 —>exists k1
type key
查看你的key是什么类型 —>type k1
del key
删除指定的key数据 —>del k1
unlink key
根据value选择非阻塞删除:仅将keys从keyspace元数据中删除,真正的删除会在后续异步操作。expire key 10
10秒钟:为给定的key设置过期时间 —>expire k2 10
ttl key
查看还有多少秒过期,-1表示永不过期,-2表示已过期 —>ttl k2
- dbsize查看当前数据库的key的数量
flushdb
清空当前库 ```java (error) MISCONF Redis is configured to save RDB snapshots,but it is currently not able to persist on disk. Commands that may modify the data set are disabled, because this instance is configured to report errors during writes if RDB snapshotting fails (stop-writes-on-bgsave-error option). Please check the Redis logs for details about the RDB error.
解决方法:
127.0.0.1:6379> config set stop-writes-on-bgsave-error no
<a name="r5Rej"></a>
## 4.Redis数据类型
redis到底有几种数据类型[传送查看](https://www.163.com/dy/article/G500BK4U053725XJ.html)
> Redis命令手册,可自行下载
> 链接: [https://pan.baidu.com/s/1KAabPz-WI7YurbmTEpusVw](https://pan.baidu.com/s/1KAabPz-WI7YurbmTEpusVw) 提取码: m6xt
<a name="gGlsg"></a>
### 4.1 String
**数据结构**<br />String的数据结构为简单动态字符串(Simple Dynamic String,缩写SDS)。是可以修改的字符串,内部结构实现上类似于Java的ArrayList,采用预分配冗余空间的方式来减少内存的频繁分配.<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/21480227/1640250990709-493f89a7-efb5-47bf-b805-a8fdb4f5a19a.png#clientId=uc99b79e0-d58e-4&from=paste&height=117&id=ub7dfccd6&margin=%5Bobject%20Object%5D&name=image.png&originHeight=126&originWidth=693&originalType=binary&ratio=1&size=7451&status=done&style=none&taskId=ud534b60b-b525-4f69-b96b-0500decd21c&width=642.5)<br />如图中所示,内部为当前字符串实际分配的空间capacity一般要高于实际字符串长度len。当字符串长度小于1M时,扩容都是加倍现有的空间,如果超过1M,扩容时一次只会多扩1M的空间。需要注意的是字符串最大长度为512M。
<a name="atHXD"></a>
### 4.2 Hash
- Redis hash是一个string类型的field和value的映射表,hash特别适合用于存储对象。
类似Java里面的Map<String,Object><br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/21480227/1640259080948-47e60a29-68e0-438f-8adb-3d5529df11e9.png#clientId=uc99b79e0-d58e-4&from=paste&height=166&id=u9fdd8c72&margin=%5Bobject%20Object%5D&name=image.png&originHeight=189&originWidth=400&originalType=binary&ratio=1&size=59026&status=done&style=none&taskId=u4915d570-24e2-4302-af0c-28092caa6a4&width=351)
- 通过 key(用户ID) + field(属性标签) 就可以操作对应属性数据了,既不需要重复存储数据,也不会带来序列化和并发修改控制的问题
**数据结构**<br />Hash类型对应的数据结构是两种:ziplist(压缩列表),hashtable(哈希表)。当field-value长度较短且个数较少时,使用ziplist,否则使用hashtable。
<a name="r0dlu"></a>
### 4.3 List
单键多值<br />Redis 列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边)。它的底层实际是个双向链表,对两端的操作性能很高,通过索引下标的操作中间的节点性能会较差。<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/21480227/1640251104731-1ba1fcd1-cf8d-41e0-9a97-f8462f721ac0.png#clientId=uc99b79e0-d58e-4&from=paste&height=89&id=u2fed571e&margin=%5Bobject%20Object%5D&name=image.png&originHeight=105&originWidth=695&originalType=binary&ratio=1&size=55444&status=done&style=none&taskId=ucbd851ff-cbd7-491b-8bf4-c214582eb09&width=589.5)<br />List的数据结构为快速链表quickList。
<a name="qwJH1"></a>
### 4.4 Set
Redis set对外提供的功能与list类似是一个列表的功能,特殊之处在于set是可以**自动排重**的<br />Redis的Set是string类型的无序集合。它底层其实是一个value为null的hash表,所以添加,删除,查找的**复杂度都是O(1)**。<br />一个算法,随着数据的增加,执行时间的长短,如果是O(1),数据增加,查找数据的时间不变<br />**数据结构**
- Set数据结构是dict字典,字典是用哈希表实现的。
- Java中HashSet的内部实现使用的是HashMap,只不过所有的value都指向同一个对象。Redis的set结构也是一样,它的内部也使用hash结构,所有的value都指向同一个内部值。
<a name="quKSr"></a>
### 4.5 Sorted Set
- Redis有序集合zset与普通集合set非常相似,是一个没有重复元素的字符串集合。
- 不同之处是有序集合的每个成员都关联了一个**评分(score)**,这个评分(score)被用来按照从最低分到最高分的方式排序集合中的成员。集合的成员是唯一的,但是评分可以是重复了 。
- 因为元素是有序的, 所以你也可以很快的根据评分(score)或者次序(position)来获取一个范围的元素。
**数据结构**<br />SortedSet(zset)是Redis提供的一个非常特别的数据结构,一方面它等价于Java的数据结构Map<String, Double>,可以给每一个元素value赋予一个权重score,另一方面它又类似于TreeSet,内部的元素会按照权重score进行排序,可以得到每个元素的名次,还可以通过score的范围来获取元素的列表。<br />zset底层使用了两个数据结构
- (1)hash,hash的作用就是关联元素value和权重score,保障元素value的唯一性,可以通过元素value找到相应的score值。
- (2)跳跃表,跳跃表的目的在于给元素value排序,根据score的范围获取元素列表。
**跳跃表(跳表)**<br />1、简介<br /> 有序集合在生活中比较常见,例如根据成绩对学生排名,根据得分对玩家排名等。对于有序集合的底层实现,可以用数组、平衡树、链表等。数组不便元素的插入、删除;平衡树或红黑树虽然效率高但结构复杂;链表查询需要遍历所有效率低。Redis采用的是跳跃表。跳跃表效率堪比红黑树,实现远比红黑树简单。<br />2、实例<br /> 对比有序链表和跳跃表,从链表中查询出51<br />(1) 有序链表<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/21480227/1640261121723-db0d3e23-bf28-44a1-b628-1d36a9f7ffbb.png#clientId=u718cd18d-3bfb-4&from=paste&height=41&id=u45cd1a57&margin=%5Bobject%20Object%5D&name=image.png&originHeight=50&originWidth=693&originalType=binary&ratio=1&size=14137&status=done&style=none&taskId=u477b165b-556b-47b5-8d00-390f367a983&width=573.5)<br />要查找值为51的元素,需要从第一个元素开始依次查找、比较才能找到。共需要6次比较。<br />(2) 跳跃表<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/21480227/1640261140402-3c127bbd-010d-4020-97a6-c50bb7fbaf7e.png#clientId=u718cd18d-3bfb-4&from=paste&height=165&id=u2c461af9&margin=%5Bobject%20Object%5D&name=image.png&originHeight=203&originWidth=693&originalType=binary&ratio=1&size=38729&status=done&style=none&taskId=udbe14f13-a1d4-4529-b049-55705037b0a&width=563.5)<br />从第2层开始,1节点比51节点小,向后比较。<br />21节点比51节点小,继续向后比较,后面就是NULL了,所以从21节点向下到第1层<br />在第1层,41节点比51节点小,继续向后,61节点比51节点大,所以从41向下<br />在第0层,51节点为要查找的节点,节点被找到,共查找4次。
从此可以看出跳跃表比有序链表效率要高。
<a name="FbqbP"></a>
### 4.6 Redis新数据类型
<a name="F39sF"></a>
#### 4.6.1 Bitmaps
<a name="KmKZo"></a>
#### 4.6.2 Geospatial
<a name="L1cI6"></a>
#### 4.6.3 HyperLogLog
<a name="GkxJw"></a>
## 5.0 Redis_Jedis_测试连接redis
**1.导入Jedis所需要的jar包**
```java
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.2.0</version>
</dependency>
2.连接Redis注意事项
- 1.禁用Linux的防火墙
- 2.redis.conf中注释掉bind 127.0.0.1 ,然后protected-mode no ``` 安装防火墙(默认安装) sudo apt install ufw
查看防火墙状态 sudo ufw status verbose
**3.测试**
public class JedisDemo1 { public static void main(String[] args) { //创建Jedis对象 Jedis jedis = new Jedis(“192.168.2.9”,6379); jedis.auth(“123456”);//设置密码 //测试 使用校园网(网络地址转换)无法连接 String ping = jedis.ping(); System.out.println(ping); } }
<a name="HJdwP"></a>
## 5.模拟验证码发送
**完成一个手机验证码功能**<br />要求:
- 1、输入手机号,点击发送后随机生成6位数字码,2分钟有效
- 2、输入验证码,点击验证,返回成功或失败
- 3、每个手机号每天只能输入3次
```java
public class PhoneCode {
public static void main(String[] args) {
//模拟验证码发送
verifyCode("18220890508");
//getRedisCode("18220890508","725249");
}
//验证码校验
public static void getRedisCode(String phone,String code){
//从redis中获取验证码
//创建Jedis对象
Jedis jedis = new Jedis("192.168.79.102",6379);
jedis.auth("123456");//设置密码
//验证码key
String codeKey = "VerifyCode" + phone + ":code";
String redisCode = jedis.get(codeKey);
//判断
if (code.equals(redisCode)){
System.out.println("成功");
}else {
System.out.println("失败");
}
jedis.close();
}
//每个手机每天只能发送3次,验证码放到redis中,设置过期时间120s
public static void verifyCode(String phone){
//创建Jedis对象
Jedis jedis = new Jedis("192.168.79.102",6379);
jedis.auth("123456");//设置密码
//拼接key 手机发送次数key
String countKey = "VerifyCode" + phone + ":count";
//验证码key
String codeKey = "VerifyCode" + phone + ":code";
//每个手机每天只能发送3次
String count = jedis.get(countKey);
if (count == null){
//没有发送次数,第一次发送
jedis.setex(countKey,24*60*60,"1");
}else if (Integer.parseInt(count) <= 2){
//发送次数+ 1
jedis.incr(countKey);
}else if(Integer.parseInt(count) > 2){
System.out.println("今天的发送次数已经超过3次");
jedis.close();
//确保第三次以后不再发验证码
return;
}
//发送的验证码放到redis里面
String vcode = getCode();
jedis.setex(codeKey,120,vcode);
jedis.close();
}
//1.生成6位数字验证码
public static String getCode(){
Random random = new Random();
String code = "";
for (int i = 0; i < 6; i++) {
int rand = random.nextInt(10);
code += rand;
}
return code;
}
}
6.SpringBoot整合Redis
1. 在pom.xml文件中引入redis相关依赖
<!-- redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- spring2.X集成redis所需common-pool2-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.6.0</version>
</dependency>
在SpringBoot2.x以后,原来使用的 jedis 被替换为了lettuce
jedis: 采用的直连,多个线程操作的话,是不安全的,如果要避免多线程不安全,使用 jedis pool 连接池 更像BIO模式
lettuce: 采用netty,实例可以在多个线程中进行共享,不存在线程不安全的情况!可以减少线程数量 更像NIO模式
源码:
@Configuration(
proxyBeanMethods = false
)
@ConditionalOnClass({RedisOperations.class})
@EnableConfigurationProperties({RedisProperties.class})
@Import({LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class})
public class RedisAutoConfiguration {
public RedisAutoConfiguration() {
}
@Bean
@ConditionalOnMissingBean(
name = {"redisTemplate"}
) //我们可以自己定义一个redisTemplate来替换默认的
@ConditionalOnSingleCandidate(RedisConnectionFactory.class)
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
//默认的RedisTemplate 没有过多的设置,redis对象都是需要序列化
//两个泛型都是<Object, Object> ,后面使用需要强制转换
RedisTemplate<Object, Object> template = new RedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}
@Bean
@ConditionalOnMissingBean //String是redis中最常使用的类型,所以单独提出来了一个bean
@ConditionalOnSingleCandidate(RedisConnectionFactory.class)
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
StringRedisTemplate template = new StringRedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}
}
@ConfigurationProperties(
prefix = "spring.redis"
)
public class RedisProperties {
private int database = 0;
private String url;
private String host = "localhost";
private String username;
private String password;
private int port = 6379;
private boolean ssl;
private Duration timeout;
private Duration connectTimeout;
private String clientName;
private RedisProperties.ClientType clientType;
private RedisProperties.Sentinel sentinel;
private RedisProperties.Cluster cluster;
private final RedisProperties.Jedis jedis = new RedisProperties.Jedis();
private final RedisProperties.Lettuce lettuce = new RedisProperties.Lettuce();
}
2、 application.properties配置redis配置
#Redis服务器地址
spring.redis.host=192.168.79.102
#密码
spring.redis.password=123456
#Redis服务器连接端口
spring.redis.port=6379
#Redis数据库索引(默认为0)
spring.redis.database= 0
#连接超时时间(毫秒)
spring.redis.timeout=1800000
#连接池最大连接数(使用负值表示没有限制)
spring.redis.lettuce.pool.max-active=20
#最大阻塞等待时间(负数表示没限制)
spring.redis.lettuce.pool.max-wait=-1
#连接池中的最大空闲连接
spring.redis.lettuce.pool.max-idle=5
#连接池中的最小空闲连接
spring.redis.lettuce.pool.min-idle=0
3. 添加redis配置类
package com.atguigu.redis_springboot.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.time.Duration;
/**
* @author zmh
* @create 2021-12-24 16:28
*/
@EnableCaching
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
template.setConnectionFactory(factory);
//key序列化方式
template.setKeySerializer(redisSerializer);
//value序列化
template.setValueSerializer(jackson2JsonRedisSerializer);
//value hashmap序列化
template.setHashValueSerializer(jackson2JsonRedisSerializer);
return template;
}
@Bean
public CacheManager cacheManager(RedisConnectionFactory factory) {
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
//解决查询缓存转换异常的问题
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
// 配置序列化(解决乱码的问题),过期时间600秒
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofSeconds(600))
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
.disableCachingNullValues();
RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
.cacheDefaults(config)
.build();
return cacheManager;
}
}
4.测试
RedisTestController中添加测试方法
@RestController
@RequestMapping("/redisTest")
public class RedisTestController {
@Autowired
private RedisTemplate redisTemplate;
@GetMapping
public String testRedis(){
//设置值到redis
redisTemplate.opsForValue().set("name","lucy");
//从redis获取值
String name = (String) redisTemplate.opsForValue().get("name");
return name;
}
}
7.Redis事务锁机制_秒杀
7.1定义
- Redis事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
-
7.2Multi、Exec、discard
从输入Multi命令开始,输入的命令都会依次进入命令队列中,但不会执行,直到输入Exec后,Redis会将之前的命令队列中的命令依次执行。
- 组队的过程中可以通过discard来放弃组队。
MULTI
标记一个事务块的开始。
事务块内的多条命令会按照先后顺序被放进一个队列当中,最后由 EXEC 命令原子性
(atomic)地执行。
EXEC
执行所有事务块内的命令。
DISCARD
取消事务,放弃执行事务块内的所有命令。
如果正在使用 WATCH 命令监视某个(或某些) key,那么取消所有监视,等同于执行命
令 UNWATCH 。
7.3事务的错误处理
- 1.组队中某个命令出现了报告错误,执行时整个的所有队列都会被取消。
2.如果执行阶段某个命令报出了错误,则只有报错的命令不会被执行,而其他的命令都会执行,不会回滚。
7.4事务冲突的问题(悲观锁乐观锁)
7.4.1悲观锁
悲观锁(Pessimistic Lock), 顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。
7.4.2乐观锁
乐观锁(Optimistic Lock), 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数 据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量。Redis就是利用这种check-and-set机制实现事务的。实现:版本号机制
场景:抢票7.4.3WATCH
WATCH key [key …]
在执行multi之前,先执行watch key1 [key2],可以监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。7.4.4unwatch
取消WATCH 命令对所有key 的监视。
如果在执行WATCH 命令之后,EXEC 命令或DISCARD 命令先被执行了的话,那么就不需要再执行UNWATCH 了。
7.5Redis事务三特性
单独的隔离操作
事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
- 没有隔离级别的概念
队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被实际执行
- 不保证原子性
事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚
7.6Redis事务秒杀案例
public class SecKill_redis {
public static void main(String[] args) {
Jedis jedis =new Jedis("192.168.79.102",6379);
System.out.println(jedis.ping());
jedis.close();
}
//秒杀过程
public static boolean doSecKill(String uid,String prodid) throws IOException {
//1 uid和prodid非空判断
if(uid == null || prodid == null) {
return false;
}
//2 连接redis
//Jedis jedis = new Jedis("192.168.79.102",6379);
//通过连接池得到jedis对象
JedisPool jedisPoolInstance = JedisPoolUtil.getJedisPoolInstance();
Jedis jedis = jedisPoolInstance.getResource();
//3 拼接key
// 3.1 库存key
String kcKey = "sk:"+prodid+":qt";
// 3.2 秒杀成功用户key
String userKey = "sk:"+prodid+":user";
//监视库存
jedis.watch(kcKey);
//4 获取库存,如果库存null,秒杀还没有开始
String kc = jedis.get(kcKey);
if(kc == null) {
System.out.println("秒杀还没有开始,请等待");
jedis.close();
return false;
}
// 5 判断用户是否重复秒杀操作
if(jedis.sismember(userKey, uid)) {
System.out.println("已经秒杀成功了,不能重复秒杀");
jedis.close();
return false;
}
//6 判断如果商品数量,库存数量小于1,秒杀结束
if(Integer.parseInt(kc)<=0) {
System.out.println("秒杀已经结束了");
jedis.close();
return false;
}
//7 秒杀过程
//使用事务
Transaction multi = jedis.multi();
//组队操作
multi.decr(kcKey);
multi.sadd(userKey,uid);
//执行
List<Object> results = multi.exec();
if(results == null || results.size()==0) {
System.out.println("秒杀失败了....");
jedis.close();
return false;
}
//7.1 库存-1
//jedis.decr(kcKey);
//7.2 把秒杀成功用户添加清单里面
//jedis.sadd(userKey,uid);
System.out.println("秒杀成功了..");
jedis.close();
return true;
}
}
第一版:简单版
第二版:加事务-乐观锁(解决超卖),但出现遗留库存和连接超时
第三版:连接池解决超时问题
第四版:解决库存依赖问题,LUA脚本
8.Redis持久化
Redis 提供了2个不同形式的持久化方式。
- RDB(Redis DataBase)
- AOF(Append Of File)
8.1RDB
传送查看
在指定的时间间隔内将内存中的数据集快照写入磁盘, 也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存里
备份是如何执行的
Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到 一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。 整个过程中,主进程是不进行任何IO操作的,这就确保了极高的性能 如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化后的数据可能丢失。
Fork
- Fork的作用是复制一个与当前进程一样的进程。新进程的所有数据(变量、环境变量、程序计数器等) 数值都和原进程一致,但是是一个全新的进程,并作为原进程的子进程
- 在Linux程序中,fork()会产生一个和父进程完全相同的子进程,但子进程在此后多会exec系统调用,出于效率考虑,Linux中引入了“写时复制技术”
一般情况父进程和子进程会共用同一段物理内存,只有进程空间的各段的内容要发生变化时,才会将父进程的内容复制一份给子进程。
8.1.1RDB持久化流程
优势
l 适合大规模的数据恢复
l 对数据完整性和一致性要求不高更适合使用
l 节省磁盘空间
l 恢复速度快
- 劣势
l Fork的时候,内存中的数据被克隆了一份,大致2倍的膨胀性需要考虑
l 虽然Redis在fork时使用了写时拷贝技术,但是如果数据庞大时还是比较消耗性能。
l 在备份周期在一定间隔时间做一次备份,所以如果Redis意外down掉的话,就会丢失最后一次快照后的所有 修改。
8.1.2rdb的备份
先通过config get dir 查询rdb文件的目录
将.rdb的文件拷贝到别的地方
*rdb的恢复:
- 关闭Redis
- 先把备份的文件拷贝到工作目录下cp dump2.rdb dump.rdb
-
8.1.3 停止
动态停止RDB:redis-cli config set save “” #save后给空值,表示禁用保存策略
8.1.4 小结
8.2AOF
8.2.1 定义
以日志的形式来记录每个写操作(增量保存),将Redis执行过的所有写指令记录下来(读操作不记录), 只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis 重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作
8.2.2AOF持久化流程
(1)客户端的请求写命令会被append追加到AOF缓冲区内;
- (2)AOF缓冲区根据AOF持久化策略[always,everysec,no]将操作sync同步到磁盘的AOF文件中;
- (3)AOF文件大小超过重写策略或手动重写时,会对AOF文件rewrite重写,压缩AOF文件容量;
- (4)Redis服务重启时,会重新load加载AOF文件中的写操作达到数据恢复的目的;
8.2.3AOF默认不开启
可以在redis.conf中配置文件名称,默认为 appendonly.aof
AOF和RDB同时开启,redis听谁的?
AOF和RDB同时开启,系统默认取AOF的数据(数据不会存在丢失)
8.2.4AOF启动/修复/恢复
AOF的备份机制和性能虽然和RDB不同, 但是备份和恢复的操作同RDB一样,都是拷贝备份文件,需要恢复时再拷贝到Redis工作目录下,启动系统即加载。
正常恢复
- 修改默认的appendonly no,改为yes
- 将有数据的aof文件复制一份保存到对应目录(查看目录:config get dir)
- 恢复:重启redis然后重新加载
异常恢复
- 修改默认的appendonly no,改为yes
- 如遇到AOF文件损坏,通过/usr/local/bin/redis-check-aof—fix appendonly.aof进行恢复
- 备份被写坏的AOF文件
-
8.2.5 AOF同步频率设置
appendfsync always
始终同步,每次Redis的写入都会立刻记入日志;性能较差但数据完整性比较好
appendfsync everysec
每秒同步,每秒记入日志一次,如果宕机,本秒的数据可能丢失。
appendfsync no
redis不主动进行同步,把同步时机交给操作系统。8.2.6 Rewrite压缩
1.是什么:
AOF采用文件追加方式,文件会越来越大为避免出现此种情况,新增了重写机制, 当AOF文件的大小超过所设定的阈值时,Redis就会启动AOF文件的内容压缩, 只保留可以恢复数据的最小指令集.可以使用命令bgrewriteaof
2.重写原理,如何实现重写
AOF文件持续增长而过大时,会fork出一条新进程来将文件重写(也是先写临时文件最后再rename),redis4.0版本后的重写,是指上就是把rdb 的快照,以二级制的形式附在新的aof头部,作为已有的历史数据,替换掉原来的流水账操作。
no-appendfsync-on-rewrite:
如果no-appendfsync-on-rewrite=yes ,不写入aof文件只写入缓存,用户请求不会阻塞,但是在这段时间如果宕机会丢失这段时间的缓存数据。(降低数据安全性,提高性能)
如果no-appendfsync-on-rewrite=no, 还是会把数据往磁盘里刷,但是遇到重写操作,可能会发生阻塞。(数据安全,但是性能降低)
触发机制,何时重写
Redis会记录上次重写时的AOF大小,默认配置是当AOF文件大小是上次rewrite后大小的一倍且文件大于64M时触发
重写虽然可以节约大量磁盘空间,减少恢复时间。但是每次重写还是有一定的负担的,因此设定Redis要满足一定条件才会进行重写。
auto-aof-rewrite-percentage:设置重写的基准值,文件达到100%时开始重写(文件是原来重写后文件的2倍时触发)
auto-aof-rewrite-min-size:设置重写的基准值,最小文件64MB。达到这个值开始重写。
例如:文件达到70MB开始重写,降到50MB,下次什么时候开始重写?100MB
系统载入时或者上次重写完毕时,Redis会记录此时AOF大小,设为base_size,
如果Redis的AOF当前大小>= base_size +base_size100% (默认)且当前大小>=64mb(默认)的情况下,Redis会对AOF进行重写。
*3.重写流程: (1)bgrewriteaof触发重写,判断是否当前有bgsave或bgrewriteaof在运行,如果有,则等待该命令结束后再继续执行。
- (2)主进程fork出子进程执行重写操作,保证主进程不会阻塞。
- (3)子进程遍历redis内存中数据到临时文件,客户端的写请求同时写入aof_buf缓冲区和aof_rewrite_buf重写缓冲区保证原AOF文件完整以及新AOF文件生成期间的新的数据修改动作不会丢失。
- (4)1).子进程写完新的AOF文件后,向主进程发信号,父进程更新统计信息。2).主进程把aof_rewrite_buf中的数据写入到新的AOF文件。
- (5)使用新的AOF文件覆盖旧的AOF文件,完成AOF重写。
优劣势:
优势
| 备份机制更稳健,丢失数据概率更低。
| 可读的日志文本,通过操作AOF稳健,可以处理误操作。
劣势
| 比起RDB占用更多的磁盘空间。
| 恢复备份速度要慢。
| 每次读写都同步的话,有一定的性能压力。
| 存在个别Bug,造成恢复不能。8.3 用哪个
-官方推荐两个都启用。 -如果对数据不敏感,可以选单独用RDB。 -不建议单独用 AOF,因为可能会出现Bug。 -如果只是做纯内存缓存,可以都不用。
9.Redis主从复制
定义:主机数据更新后根据配置和策略, 自动同步到备机的master/slaver机制,Master以写为主,Slave以读为主
优点:
[root@a etc]# mkdir myredis
- 复制一份redis.conf到myredis目录(etc目录下复制过redis.conf, 如果没有就去安装目录复制)
[root@a etc]# cp /etc/redis.conf /myredis/redis.conf
- 修改myredis目录下redis.conf,将daemonize设置为yes,Appendonly 关掉
- myredis目录下新建redis6379.conf ,并添加内容
[root@a myredis]# vi redis6379.conf
include /myredis/redis.conf
pidfile /var/run/redis_6379.pid
port 6379
dbfilename dump6379.rdb
- myredis目录下新建redis6380.conf ,并添加内容
[root@a myredis]# vi redis6380.conf
include /myredis/redis.conf
pidfile /var/run/redis_6380.pid
port 6380
dbfilename dump6380.rdb
masterauth 123456
- myredis目录下新建redis6381.conf ,并添加内容
[root@a myredis]# vi redis6381.conf
include /myredis/redis.conf
pidfile /var/run/redis_6381.pid
port 6381
dbfilename dump6381.rdb
masterauth 123456
- 关闭以前启动的redis服务
ps -ef | grep redis
查看端口号kill -9 port
- 使用三个配置文件分别启动redis
[root@a myredis]# redis-server redis6379.conf
[root@a myredis]# redis-server redis6380.conf
[root@a myredis]# redis-server redis6381.conf
- 打开server-cli,通过端口号连接
[root@a myredis]# redis-cli -p 6379
[root@a myredis]# redis-cli -p 6380
[root@a myredis]# redis-cli -p 6381
- 分别在客户端输入
info replication
打印主从复制的相关信息
- 分别在客户端输入
127.0.0.1:6379> info replication
127.0.0.1:6380> info replication
127.0.0.1:6381> info replication
- 配从库而不配主库,在从库中设置主从信息,选取6379为主
127.0.0.1:6380> slaveof 127.0.0.1 6379
127.0.0.1:6381> slaveof 127.0.0.1 6379
使用redis实现主从复制,搭建一主多从时master主机不显示slave从机的连接信息:
原因是我在redis.conf文件中指定了自己的密码,在进行主从复制时,主机master会要求密码验证。有两种解决办法:
- 1、将redis.conf配置文件中的密码部分requirepass注掉。
- 2、配置从机的masterauth,我用的第二种方法。
- 使用vim命令打开每个从机的配置文件例如我的从机配置文件叫redis6380.conf和redis6381.conf。
- 在配置文件末尾使用命令masterauth,格式:masterauth +你之前在redis.conf中设置的密码.
- 保存退出。再次启动redis的从机slave和主机master:
- 启动从机slave 6380:
- 启动从机slave 6381:
- 启动主机master:
再次使用info replication命令发现可以显示从机的连接信息,问题完美解决。
PS:
从机只能读数据而不能写数据
主机挂掉,重启就行,主从配置不会失效
从机重启需重设:slaveof 127.0.0.1 6379
可以将配置增加到文件中。永久生效。
9.2 主从复制原理
9.3 一主二从
特点:
- 主机宕机后,从机原地待命,不会成为主机
- 从机宕机后,重启后需要重新设置主机IP
-
9.4 薪火相传
上一个Slave可以是下一个slave的Master,Slave同样可以接收其他slaves的连接和同步请求,那么该slave作为了链条中下一个的master, 可以有效减轻master的写压力,去中心化降低风险。 用 slaveof
- 中途变更转向:会清除之前的数据,重新建立拷贝最新的
-
9.5 反客为主
当一个master宕机后,后面的slave可以立刻升为master,其后面的slave不用做任何修改。
9.6 哨兵模式
- 调整为一主二仆模式,6379带着6380、6381
- 自定义的/myredis目录下新建sentinel.conf文件,名字绝不能错
[root@a myredis]# vi sentinel.conf
- 配置哨兵 ``` sentinel monitor mymaster 127.0.0.1 6379 1 sentinel auth-pass mymaster 123456 如果redis主节点设置了密码,则需要进行这个配置。
PS: 其中mymaster为监控对象自定义的服务器名称, 1 为至少有多少个哨兵(启动的sentinel)同意从机切换为主机。
- 启动哨兵
`[root@a myredis]# redis-sentinel /myredis/sentinel.conf`
- 当主机挂掉,在从机中产生新的主机。原主机重启后会变为从机。
<a name="tb4Hf"></a>
### 9.7故障恢复
![image.png](https://cdn.nlark.com/yuque/0/2021/png/12805114/1621667744458-a26861ac-3360-44cf-9e36-4850e2388d90.png#clientId=u45c2a2ea-54ac-4&from=paste&id=u7fdae50b&margin=%5Bobject%20Object%5D&name=image.png&originHeight=301&originWidth=553&originalType=binary&ratio=1&size=54880&status=done&style=shadow&taskId=u9b9f8adf-77d3-424f-b58b-d129a7c45bd)
> PS
> 优先级在redis.conf中默认:**replica-priority 100**,值越小优先级越高
> 偏移量是指获得原主机数据最全的
> 每个redis实例启动后都会随机生成一个40位的**runid**
<a name="L7Hrw"></a>
### 9.8 复制延时
**缺点:**
- 由于所有的写操作都是先在Master上操作,然后同步更新到Slave上,所以从Master同步到Slave机器有一定
的延迟,当系统很繁忙的时候,延迟问题会更加严重,Slave机器数量的增加也会使这个问题更加严重。
<a name="RNmOK"></a>
### 9.9 主从复制java代码实现
```java
private static JedisSentinelPool jedisSentinelPool=null;
public static Jedis getJedisFromSentinel(){
if(jedisSentinelPool==null){
Set<String> sentinelSet=new HashSet<>();
sentinelSet.add("192.168.11.103:26379");
JedisPoolConfig jedisPoolConfig =new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(10); //最大可用连接数
jedisPoolConfig.setMaxIdle(5); //最大闲置连接数
jedisPoolConfig.setMinIdle(5); //最小闲置连接数
jedisPoolConfig.setBlockWhenExhausted(true); //连接耗尽是否等待
jedisPoolConfig.setMaxWaitMillis(2000); //等待时间
jedisPoolConfig.setTestOnBorrow(true); //取连接的时候进行一下测试 ping pong
jedisSentinelPool=new JedisSentinelPool("mymaster",sentinelSet,jedisPoolConfig);
return jedisSentinelPool.getResource();
}else{
return jedisSentinelPool.getResource();
}
}
10 Redis集群
1. 问题
容量不够,redis如何进行扩容?
并发写操作,redis如何分摊?
另外,主从模式,薪火相传模式,主机宕机,导致ip地址发生变化,应用程序中配置需要修改对应的主机地址、端口等信息。
之前通过代理主机来解决,但是redis3.0中提供了解决方案。就是无中心化集群配置。
2. 什么是集群
Redis 集群实现了对Redis的水平扩容,即启动N个redis节点,将整个数据库分布存储在这N个节点中,每个节点存储总数据的1/N。
Redis 集群通过分区(partition)来提供一定程度的可用性(availability):即使集群中有一部分节点失效或者无法进行通讯,集群也可以继续处理命令请求
3. 删除持久化数据
10.1 环境配置 (aliyun版本)
- 将myredis目录下的所有dump文件删除
rm -rf dump*
- 创建六个redis配置文件
redis cluster配置修改:
- cluster-enabled yes 打开集群模式
- cluster-config-file nodes-6379.conf 设定节点配置文件名
- cluster-node-timeout 15000 设定节点失联时间,超过该时间(毫秒),集群自动进行主从切换。
每个文件的内容分别为
可以使用 :%s/6379/6380/6381/6389/6390/6391 一键替换
redis6379.conf
include /myredis/redis.conf
pidfile /var/run/redis_6379.pid
port 6379
dbfilename dump6379.rdb
cluster-enabled yes
cluster-config-file nodes-6379.conf
cluster-node-timeout 15000
redis6380.conf
include /myredis/redis.conf
pidfile "/var/run/redis_6380.pid"
port 6380
dbfilename "dump6380.rdb"
cluster-enabled yes
cluster-config-file nodes-6380.conf
cluster-node-timeout 15000
redis6381.conf
include /myredis/redis.conf
pidfile /var/run/redis_6381.pid
port 6381
dbfilename dump6381.rdb
cluster-enabled yes
cluster-config-file nodes-6381.conf
cluster-node-timeout 15000
redis6389.conf
include /myredis/redis.conf
pidfile /var/run/redis_6389.pid
port 6389
dbfilename dump6389.rdb
cluster-enabled yes
cluster-config-file nodes-6389.conf
cluster-node-timeout 15000
redis6390.conf
include /myredis/redis.conf
pidfile /var/run/redis_6390.pid
port 6390
dbfilename dump6390.rdb
cluster-enabled yes
cluster-config-file nodes-6390.conf
cluster-node-timeout 15000
redis6391.conf
include /myredis/redis.conf
pidfile /var/run/redis_6391.pid
port 6391
dbfilename dump6391.rdb
cluster-enabled yes
cluster-config-file nodes-6391.conf
cluster-node-timeout 15000
- 阿里云控制台添加安全组
PS : 安全组的创建百度即可,注意安全组所属地区是否与你的服务器所在地区一致
- 使用六个配置文件启动六个redis-server
[root@a myredis]# redis-server redis6379.conf
[root@a myredis]# redis-server redis6380.conf
[root@a myredis]# redis-server redis6381.conf
[root@a myredis]# redis-server redis6389.conf
[root@a myredis]# redis-server redis6390.conf
[root@a myredis]# redis-server redis6391.conf
- 确保所有redis实例启动后,nodes-xxxx.conf文件都生成正常
合体:
- 进入下载的redis目录
cd /media/sf_share/redis-6.2.1/src
- 使用 redis-cli 创建整个 redis 集群(redis5.0版本之前使用的ruby脚本 redis-trib.rb,之后的版本已经集成该脚本)
redis-cli -a 123456 --cluster create --cluster-replicas 1 192.168.246.102:6379 192.168.246.102:6380 192.168.246.102:6381 192.168.246.102:6389 192.168.246.102:6390 192.168.246.102:6391
此处不要用127.0.0.1,请用真实IP地址,即服务器IP地址 —replicas 1 采用最简单的方式配置集群,一台主机,一台从机,正好三组。
redis-cli -a 123456 -c -p 6379
成功示例:
10.2 查看集群信息
- -c 采用集群策略连接,设置数据会自动切换到相应的写主机
[root@a src]# redis-cli -c -p 6379
- 通过 cluster nodes 命令查看集群信息
10.3 cluster 如何分配当前六个节点
- 一个集群至少要有三个主节点。
- 选项—cluster-replicas 1 表示我们希望为集群中的每个主节点创建一个从节点。
分配原则尽量保证每个主数据库运行在不同的IP地址(即不同的服务器),每个从库和主库不在一个IP地址上。
10.4 什么是slots
[OK] All 16384 slots covered.
一个 Redis 集群包含16384 个插槽(hash slot),数据库中的每个键都属于这16384 个插槽的其中一个,集群使用公式CRC16(key) % 16384 来计算键key 属于哪个槽,其中CRC16(key) 语句用于计算键key 的CRC16 校验和。集群中的每个节点负责处理一部分插槽。
举个例子,一个集群有三个主节点,其中:
节点 A 负责处理0号至5460号插槽。
节点 B 负责处理5461号至10922号插槽。
节点 C 负责处理10923号至16383号插槽。10.5 在集群中录入值
在redis-cli每次录入、查询键值,redis都会计算出该key应该送往的插槽,如果不是该客户端对应服务器的插槽,redis会报错,并告知应前往的redis实例地址和端口。
- redis-cli客户端提供了 –c 参数实现自动重定向。
- 如redis-cli -c –p 6379 登入后,再录入、查询键值对可以自动重定向。
- 不在一个slot下的键值,是不能使用mget,mset等多键操作。
:(error) CROSSSLOT Keys in request don’t hash to the same slot
- 通过{}定义组才可以使用mget,mset等多键操作
mset name{user} lucy age{user} 20
10.6 查询集群中的值
- cluster keyslot
:计算键 key 应该被放置在哪个槽上
cluster keyslot k1
- cluster countkeysinslot
:返回槽 slot 目前包含的键值对数量。(槽slot的值必须在当前节点范围内否则返回0) : 只能看自己的
- cluster getkeysinslot
:返回 count 个 slot 槽中的键 (槽slot的值必须在当前节点范围内否则返回empty array)
10.7 故障恢复
如果主节点下线?从节点能否自动升为主节点?注意:15秒超时
主节点恢复后,主从关系会如何?主节点回来变成从机。
如果所有某一段插槽的主从节点都宕掉,redis服务是否还能继续?
- 如果某一段插槽的主从都挂掉,而cluster-require-full-coverage 为yes ,那么 ,整个集群都挂掉
- 如果某一段插槽的主从都挂掉,而cluster-require-full-coverage 为no ,那么,该插槽数据全都不能使用,也无法存储。
redis.conf中的参数 cluster-require-full-coverage
10.8 集群好处与不足
好处
实现扩容
分摊压力
无中心配置相对简单
- 不足
多键操作是不被支持的
多键的Redis事务是不被支持的。lua脚本不被支持
由于集群方案出现较晚,很多公司已经采用了其他的集群方案,而代理或者客户端分片的方案想要迁移至redis cluster,需要整体迁移而不是逐步过渡,复杂度较大
11 Redis应用问题解决(高并发场景)
11.1 缓存穿透
11.1.1 问题描述
key对应的数据在数据源并不存在,此时若有大量并发请求过来,每次针对此key的请求从缓存获取不到,请求都会压到数据源,从而可能压垮数据源。比如用一个不存在的用户id获取用户信息,不论缓存还是数据库都没有,若黑客利用此漏洞进行攻击可能压垮数据库。
11.1.2 解决方案
一个一定不存在缓存及查询不到的数据,由于缓存是不命中时被动写的,并且出于容错考虑,如果从存储层查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。
解决方案:
- 对空值缓存:如果一个查询返回的数据为空(不管是数据是否不存在),我们仍然把这个空结果(null)进行缓存,设置空结果的过期时间会很短,最长不超过五分钟
- 设置可访问的名单(白名单):使用bitmaps类型定义一个可以访问的名单,名单id作为bitmaps的偏移量,每次访问和bitmap里面的id进行比较,如果访问id不在bitmaps里面,进行拦截,不允许访问。
- 采用布隆过滤器:(布隆过滤器(Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量(位图)和一系列随机映射函数(哈希函数)。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。)将所有可能存在的数据哈希到一个足够大的bitmaps中,一个一定不存在的数据会被这个bitmaps拦截掉,从而避免了对底层存储系统的查询压力。
- 进行实时监控:当发现Redis的命中率开始急速降低,需要排查访问对象和访问的数据,和运维人员配合,可以设置黑名单限制服务
11.2 缓存击穿
11.2.1 问题描述
key(某个)对应的数据存在,但在redis中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。11.2.2 解决方案
key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。这个时候,需要考虑一个问题:缓存被“击穿”的问题。
解决问题:
- 预先设置热门数据:在redis高峰访问之前,把一些热门数据提前存入到redis里面,加大这些热门数据key的时长
- 实时调整:现场监控哪些数据热门,实时调整key的过期时长
- 使用锁:
- 就是在缓存失效的时候(判断拿出来的值为空),不是立即去load db。
- 先使用缓存工具的某些带成功操作返回值的操作(比如Redis的SETNX)去set一个mutex key
- 当操作返回成功时,再进行load db的操作,并回设缓存,最后删除mutex key;当操作返回失败,证明有线程在load db,当前线程睡眠一段时间再重试整个get缓存的方法。
11.3 缓存雪崩
11.3.1 问题描述
key(大量)对应的数据存在,但在redis中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。
PS. 缓存雪崩与缓存击穿的区别在于缓存雪崩针对很多key缓存,缓存击穿则是某一个key
11.3.2 解决方案
缓存失效时的雪崩效应对底层系统的冲击非常可怕!
解决方案:
- 构建多级缓存架构:nginx缓存 + redis缓存 +其他缓存(ehcache等)
- 使用锁或队列:用加锁或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。不适用高并发情况
- 设置过期标志更新缓存:记录缓存数据是否过期(设置提前量),如果过期会触发通知另外的线程在后台去更新实际key的缓存。
- 将缓存失效时间分散开:比如我们可以在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。
11.4 分布式锁
11.4.1 问题描述
随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!
分布式锁主流的实现方案:
1. **基于数据库实现分布式锁**
1. **基于缓存(Redis等)**
1. **基于Zookeeper**
每一种分布式锁解决方案都有各自的优缺点:
1. **性能:redis最高**
1. ** 可靠性:zookeeper最高**
11.4.2 解决方案
redis命令(setnx)
setnx加锁,del释放锁
加锁:setnx user niubi 释放锁: del user 问题:setnx刚好获取到锁,业务逻辑出现异常,导致锁无法释放 解决:设置过期时间,自动释放锁。
设置过期时间防止锁一直不被释放
加锁:setnx user niubi 设置过期:expire user 10 缺乏原子性:如果在setnx和expire之间出现异常,锁也无法释放
同时设置锁和过期时间(推荐使用)
set user 10 nx ex 10 问题:可能会释放其他服务器的锁。
场景:如果业务逻辑的执行时间是7s,锁过期时间为3s。执行流程如下 1. index1业务逻辑没执行完,3秒后锁被自动释放。 2. index2获取到锁,执行业务逻辑,3秒后锁被自动释放。 3. index3获取到锁,执行业务逻辑 4. index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁,导致index3的业务只执行 1s就被别人释放。 最终等于没锁的情况。
解决:setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的锁设置UUID防误删(即将key的值设置为唯一值)
set user UUID nx ex 10 问题:删除操作缺乏原子性。
场景: 1. index1执行删除时,查询到的lock值确实和uuid相等 2. index1执行删除前,lock刚好过期时间已到,被redis自动释放 3. index2获取了lock开始执行方法 4. index1执行删除,此时会把index2的lock删除 (同一个锁) index1 因为已经在方法中了,所以不需要重新上锁。index1有执行的权限。
解决:LUA脚本保证删除的原子性
set 命令参数详解
EX second :设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value PX millisecond :设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX key millisecond value 。 NX :只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value 。 XX :只在键已经存在时,才对键进行设置操作。
-
11.4.3优化之设置锁的过期时间
设置过期时间有两种方式:
1. 首先想到通过expire设置过期时间(缺乏原子性:如果在setnx和expire之间出现异常,锁也无法释放)
- 2. 在set时指定过期时间(推荐)
java代码实现:
@GetMapping("testLock")
public void testLock(){
String uuid = UUID.randomUUID().toString();
//1获取锁,setne
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS);
//2获取锁成功、查询num的值
if(lock){
Object value = redisTemplate.opsForValue().get("num");
//2.1判断num为空return
if(StringUtils.isEmpty(value)){
return;
}
//2.2有值就转成成int
int num = Integer.parseInt(value+"");
//2.3把redis的num加1
redisTemplate.opsForValue().set("num", ++num);
//2.4释放锁,del
//判断比较uuid值是否一样
String lockUuid = (String)redisTemplate.opsForValue().get("lock");
if(lockUuid.equals(uuid)) {
redisTemplate.delete("lock");
}
}else{
//3获取锁失败、每隔0.1秒再获取
try {
Thread.sleep(100);
testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- 多个客户端同时获取锁(setnx)
- 获取成功,执行业务逻辑{从db获取数据,放入缓存},执行完成释放锁(del)
- 其他客户端等待重试
- 同时设置锁和过期时间(推荐使用)
@GetMapping("testLock")
public void testLock(){
//1获取锁,setnx-->setIfAbsent
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111", 10, TimeUnit.SECONDS);
//2获取锁成功、查询num的值
if(lock){
Object value = redisTemplate.opsForValue().get("num");
//2.1判断num为空return
if(StringUtils.isEmpty(value)){
return;
}
//2.2有值就转成成int
int num = Integer.parseInt(value+"");
//2.3把redis的num加1
redisTemplate.opsForValue().set("num", ++num);
//2.4释放锁,del,保证锁必须被释放
redisTemplate.delete("lock"); -->当业务执行时间小与过期时间时需要释放锁
}else{
//3获取锁失败、每隔0.1秒再获取
try {
Thread.sleep(100);
testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
11.4.4设置UUID防误删
@GetMapping("testLock")
public void testLock(){
//1获取锁,setnx-->setIfAbsent
String uuid = UUID.randomUUID().toString();
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 10, TimeUnit.SECONDS);
//2获取锁成功、查询num的值
if(lock){
Object value = redisTemplate.opsForValue().get("num");
//2.1判断num为空return
if(StringUtils.isEmpty(value)){
return;
}
//2.2有值就转成成int
int num = Integer.parseInt(value+"");
//2.3把redis的num加1
redisTemplate.opsForValue().set("num", ++num);
//2.4释放锁,del,保证锁必须被释放-->当业务执行时间小与过期时间时需要释放锁
if(uuid.equals((String)redisTemplate.opsForValue().get("lock"))){
redisTemplate.delete("lock"); -->删除自己的锁
}
}else{
//3获取锁失败、每隔0.1秒再获取
try {
Thread.sleep(100);
testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
11.4.5优化之LUA脚本保证删除的原子性
@RestController
@RequestMapping("/redisTest")
public class RedisTestController {
@Autowired
private RedisTemplate redisTemplate;
@GetMapping("testLockLua")
public void testLockLua() {
//1 声明一个uuid ,将做为一个value 放入我们的key所对应的值中
String uuid = UUID.randomUUID().toString();
//2 定义一个锁:lua 脚本可以使用同一把锁,来实现删除!
String skuId = "25"; // 访问skuId 为25号的商品 100008348542
String locKey = "lock:" + skuId; // 锁住的是每个商品的数据
// 3 获取锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid, 3, TimeUnit.SECONDS);
// 第一种: lock 与过期时间中间不写任何的代码。
// redisTemplate.expire("lock",10, TimeUnit.SECONDS);//设置过期时间
// 如果true
if (lock) {
// 执行的业务逻辑开始
// 获取缓存中的num 数据
Object value = redisTemplate.opsForValue().get("num");
// 如果是空直接返回
if (StringUtils.isEmpty(value)) {
return;
}
// 不是空 如果说在这出现了异常! 那么delete 就删除失败! 也就是说锁永远存在!
int num = Integer.parseInt(value + "");
// 使num 每次+1 放入缓存
redisTemplate.opsForValue().set("num", String.valueOf(++num));
/*使用lua脚本来锁*/
// 定义lua 脚本
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 使用redis执行lua执行
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
// 设置一下返回值类型 为Long
// 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型,
// 那么返回字符串与0 会有发生错误。
redisScript.setResultType(Long.class);
// 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。
redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid);
} else {
// 其他线程等待
try {
// 睡眠
Thread.sleep(1000);
// 睡醒了之后,调用方法。
testLockLua();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
11.4.6 总结
- 加锁 ```java // 1. 从redis中获取锁,set k1 v1 px 20000 nx String uuid = UUID.randomUUID().toString(); Boolean lock = this.redisTemplate.opsForValue().setIfAbsent(“lock”, uuid, 2, TimeUnit.SECONDS);
2. 解锁
```java
// 2. 释放锁 del
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 设置lua脚本返回的数据类型
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
// 设置lua脚本返回类型为Long
redisScript.setResultType(Long.class);
redisScript.setScriptText(script);
redisTemplate.execute(redisScript, Arrays.asList("lock"),uuid);
- 重试
Thread.sleep(500);
testLock();
为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:
- 互斥性。在任意时刻,只有一个客户端能持有锁。
- 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
- 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。
- 加锁和解锁必须具有原子性。
12 Redsi6.0新功能
- ACL
Redis ACL是Access Control List(访问控制列表)的缩写,该功能允许根据可以执行的命令和可以访问的键来限制某些连接。 在Redis 5版本之前,Redis 安全规则只有密码控制还有通过rename 来调整高危命令比如 flushdb , KEYS, shutdown 等。Redis 6 则提供ACL的功能对用户进行更细粒度的权限控制: (1)接入权限:用户名和密码 (2)可以执行的命令 (3)可以操作的 KEY *参考官网:https://redis.io/topics/acl
命令:
1、使用acl list命令展现用户权限列表
(1)数据说明
2、使用acl cat命令
(1)查看添加权限指令类别
(2)加参数类型名可以查看类型下具体命令
3、使用acl whoami命令查看当前用户
4、使用aclsetuser命令创建和编辑用户ACL
(1)ACL规则
ACL规则 | ||
---|---|---|
类型 | 参数 | 说明 |
启动和禁用用户 | on | 激活某用户账号 |
off | 禁用某用户账号。注意,已验证的连接仍然可以工作。如果默认用户被标记为off,则新连接将在未进行身份验证的情况下启动,并要求用户使用AUTH选项发送AUTH或HELLO,以便以某种方式进行身份验证。 | |
权限的添加删除 | + |
将指令添加到用户可以调用的指令列表中 |
- |
从用户可执行指令列表移除指令 | |
+@ |
添加该类别中用户要调用的所有指令,有效类别为@admin、@set、@sortedset…等,通过调用ACL CAT命令查看完整列表。特殊类别@all表示所有命令,包括当前存在于服务器中的命令,以及将来将通过模块加载的命令。 | |
-@ |
从用户可调用指令中移除类别 | |
allcommands | +@all的别名 | |
nocommand | -@all的别名 | |
可操作键的添加或删除 | ~ |
添加可作为用户可操作的键的模式。例如~*允许所有的键 |
(2)通过命令创建新用户默认权限
acl setuser user1
在上面的示例中,我根本没有指定任何规则。如果用户不存在,这将使用just created的默认属性来创建用户。如果用户已经存在,则上面的命令将不执行任何操作。
(3)设置有用户名、密码、ACL权限、并启用的用户
acl setuser user2 on >password ~cached:* +get
(4)切换用户,验证权限
auth user2 password
IO多线程
Redis 的多线程部分只是用来处理网络数据的读写和协议解析,执行命令仍然是单线程。之所以这么设计是不想因为多线程而变得复杂,需要去控制 key、lua、事务,LPUSH/LPOP 等等的并发问题。 多线程IO默认也是不开启的,需要在配置文件(redis.conf)中配置 io-threads-do-reads yes io-threads 4
工具支持 Cluster
老版本Redis想要搭集群需要单独安装ruby环境,Redis5开始 将 redis-trib.rb 的功能集成到 redis-cli 。另外官方 redis-benchmark 工具开始支持 cluster 模式了,通过多线程的方式对多个分片进行压测。
RESP3协议
新的 Redis 通信协议:优化服务端与客户端之间通信
Client side caching客户端缓存
基于 RESP3 协议实现的客户端缓存功能。为了进一步提升缓存的性能,将客户端经常访问的数据cache到客户端。减少TCP网络交互。
Proxy集群代理模式
Proxy 功能,让 Cluster 拥有像单实例一样的接入方式,降低大家使用cluster的门槛。不过需要注意的是代理不改变 Cluster 的功能限制,不支持的命令还是不会支持,比如跨 slot 的多Key操作。
Modules API
Redis 6中模块API开发进展非常大,因为Redis Labs为了开发复杂的功能,从一开始就用上Redis模块。Redis可以变成一个框架,利用Modules来构建不同系统,而不需要从头开始写然后还要BSD许可。Redis一开始就是一个向编写各种系统开放的平台。