1. 分布式锁
1. redis分布式锁
JVM层面的加锁,单机版的锁
- synchronized
ReentraLock ```java class X { private final ReentrantLock lock = new ReentrantLock(); // …
public void m() {
lock.lock(); // block until condition holds//不见不散
try {
// ... method body
} finally {
lock.unlock()
}
}
public void m2() {
if(lock.tryLock(timeout, unit)){//过时不候
try {
// ... method body
} finally {
lock.unlock()
}
}else{
// perform alternative actions
}
} }
分布式部署后,单机锁还是出现超卖现象,需要分布式锁<br />![图片.png](https://cdn.nlark.com/yuque/0/2021/png/12495364/1633740741651-2de0ca10-9025-43ae-be6e-f89031b9d4ac.png#clientId=u3a4c9737-7d45-4&from=paste&height=333&id=u8f25f2be&margin=%5Bobject%20Object%5D&name=%E5%9B%BE%E7%89%87.png&originHeight=333&originWidth=641&originalType=binary&ratio=1&size=124205&status=done&style=none&taskId=uc92f4fce-8216-48e3-9e96-5a53a3c43a0&width=641)<br />redis cluster<br />Nginx配置负载均衡,[Nginx学习笔记](https://blog.csdn.net/u011863024/article/details/107407905)or[备份](https://my.oschina.net/jallenkwong/blog/4400420)<br />Nginx配置文件修改内容
```java
upstream myserver{
server 127.0.0.1:1111;
server 127.0.0.1:2222;
}
server {
listen 80;
server_name localhost;
#charset koi8-r;
#access_log logs/host.access.log main;
location / {
# 负责用到的配置
proxy_pass http://myserver;
root html;
index index.html index.htm;
}
#error_page 404 /404.html;
# redirect server error pages to the static page /50x.html
#
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
}
启动两个微服务:1111,2222,多次访问http://localhost/buy_goods,服务提供端口在1111,2222两者之间横跳
上面手点,下面高并发模拟
redis:set goods:001 100,恢复到100
用到Apache JMeter,100个线程同时访问http://localhost/buy_goods。
启动测试,后台打印如下:
这就是所谓分布式部署后出现超卖现象。
Redis具有极高的性能,且其命令对分布式锁支持友好,借助SET命令即可实现加锁处理。
SET
- EX seconds – Set the specified expire time, in seconds.
- PX milliseconds – Set the specified expire time, in milliseconds.
- NX – Only set the key if it does not already exist.
- XX – Only set the key if it already exist.
[
](https://blog.csdn.net/u011863024/article/details/115270840)
在Java层面
public static final String REDIS_LOCK = "redis_lock";
@Autowired
private StringRedisTemplate stringRedisTemplate;
public void m(){
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK, value);
if(!flag) {
return "抢锁失败";
}
...//业务逻辑
stringRedisTemplate.delete(REDIS_LOCK);
}
上面Java源码分布式锁问题:出现异常的话,可能无法释放锁,必须要在代码层面finally释放锁。
解决方法:try…finally…
public static final String REDIS_LOCK = "redis_lock";
@Autowired
private StringRedisTemplate stringRedisTemplate;
public void m(){
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
try{
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK, value);
if(!flag) {
return "抢锁失败";
}
...//业务逻辑
}finally{
stringRedisTemplate.delete(REDIS_LOCK);
}
}
另一个问题:部署了微服务jar包的机器挂了,代码层面根本没有走到finally这块,没办法保证解锁,这个key没有被删除,需要加入一个过期时间限定key。
public static final String REDIS_LOCK = "redis_lock";
@Autowired
private StringRedisTemplate stringRedisTemplate;
public void m(){
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
try{
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK, value);
//设定时间
stringRedisTemplate.expire(REDIS_LOCK, 10L, TimeUnit.SECONDS);
if(!flag) {
return "抢锁失败";
}
...//业务逻辑
}finally{
stringRedisTemplate.delete(REDIS_LOCK);
}
}
新问题:设置key+过期时间分开了,必须要合并成一行具备原子性。
解决方法:
public static final String REDIS_LOCK = "redis_lock";
@Autowired
private StringRedisTemplate stringRedisTemplate;
public void m(){
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
try{
Boolean flag = stringRedisTemplate.opsForValue()//使用另一个带有设置超时操作的方法
.setIfAbsent(REDIS_LOCK, value, 10L, TimeUnit.SECONDS);
//设定时间
//stringRedisTemplate.expire(REDIS_LOCK, 10L, TimeUnit.SECONDS);
if(!flag) {
return "抢锁失败";
}
...//业务逻辑
}finally{
stringRedisTemplate.delete(REDIS_LOCK);
}
}
另一个新问题:张冠李戴,删除了别人的锁
解决方法:只能自己删除自己的,不许动别人的。
public static final String REDIS_LOCK = "redis_lock";
@Autowired
private StringRedisTemplate stringRedisTemplate;
public void m(){
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
try{
Boolean flag = stringRedisTemplate.opsForValue()//使用另一个带有设置超时操作的方法
.setIfAbsent(REDIS_LOCK, value, 10L, TimeUnit.SECONDS);
//设定时间
//stringRedisTemplate.expire(REDIS_LOCK, 10L, TimeUnit.SECONDS);
if(!flag) {
return "抢锁失败";
}
...//业务逻辑
}finally{
if(stringRedisTemplate.opsForValue().get(REDIS_LOCK).equals(value)) {
stringRedisTemplate.delete(REDIS_LOCK);
}
}
}
finally块的判断 + del删除操作不是原子性的
用lua脚本
用redis自身的事务
Redis事务复习,Redis学习笔记
事务介绍
- Redis的事条是通过MULTI,EXEC,DISCARD和WATCH这四个命令来完成。
- Redis的单个命令都是原子性的,所以这里确保事务性的对象是命令集合。
- Redis将命令集合序列化并确保处于一事务的命令集合连续且不被打断的执行。
- Redis不支持回滚的操作。
命令 | 描述 |
---|---|
DISCARD | 取消事务,放弃执行事务块内的所有命令。 |
EXEC | 执行所有事务块内的命令。 |
MULTI | 标记一个事务块的开始。 |
UNWATCH | 取消 WATCH 命令对所有 key 的监视。 |
WATCH key [key …] | 监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。 |
继续上一章节,解决之道
public static final String REDIS_LOCK = "redis_lock";
@Autowired
private StringRedisTemplate stringRedisTemplate;
public void m(){
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
try{
Boolean flag = stringRedisTemplate.opsForValue()//使用另一个带有设置超时操作的方法
.setIfAbsent(REDIS_LOCK, value, 10L, TimeUnit.SECONDS);
//设定时间
//stringRedisTemplate.expire(REDIS_LOCK, 10L, TimeUnit.SECONDS);
if(!flag) {
return "抢锁失败";
}
...//业务逻辑
}finally{
while(true){
stringRedisTemplate.watch(REDIS_LOCK);
if(stringRedisTemplate.opsForValue().get(REDIS_LOCK).equalsIgnoreCase(value)){
stringRedisTemplate.setEnableTransactionSupport(true);
stringRedisTemplate.multi();
stringRedisTemplate.delete(REDIS_LOCK);
List<Object> list = stringRedisTemplate.exec();
if (list == null) {
continue;
}
}
stringRedisTemplate.unwatch();
break;
}
}
}
Redis调用Lua脚本通过eval命令保证代码执行的原子性
RedisUtils:
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class RedisUtils {
private static JedisPool jedisPool;
static {
JedisPoolConfig jpc = new JedisPoolConfig();
jpc.setMaxTotal(20);
jpc.setMaxIdle(10);
jedisPool = new JedisPool(jpc);
}
public static JedisPool getJedis() throws Exception{
if(jedisPool == null)
throw new NullPointerException("JedisPool is not OK.");
return jedisPool;
}
}
public static final String REDIS_LOCK = "redis_lock";
@Autowired
private StringRedisTemplate stringRedisTemplate;
public void m(){
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
try{
Boolean flag = stringRedisTemplate.opsForValue()//使用另一个带有设置超时操作的方法
.setIfAbsent(REDIS_LOCK, value, 10L, TimeUnit.SECONDS);
//设定时间
//stringRedisTemplate.expire(REDIS_LOCK, 10L, TimeUnit.SECONDS);
if(!flag) {
return "抢锁失败";
}
...//业务逻辑
}finally{
Jedis jedis = RedisUtils.getJedis();
String script = "if redis.call('get', KEYS[1]) == ARGV[1] "
+ "then "
+ " return redis.call('del', KEYS[1]) "
+ "else "
+ " return 0 "
+ "end";
try {
Object o = jedis.eval(script, Collections.singletonList(REDIS_LOCK),//
Collections.singletonList(value));
if("1".equals(o.toString())) {
System.out.println("---del redis lock ok.");
}else {
System.out.println("---del redis lock error.");
}
}finally {
if(jedis != null)
jedis.close();
}
}
}
确保RedisLock过期时间大于业务执行时间的问题
Redis分布式锁如何续期?
集群 + CAP对比ZooKeeper 对比ZooKeeper,重点,CAP
- Redis - AP -redis异步复制造成的锁丢失,比如:主节点没来的及把刚刚set进来这条数据给从节点,就挂了。
-
CAP
C:Consistency(强一致性)
- A:Availability(可用性)
- P:Partition tolerance(分区容错性)
综上所述
Redis集群环境下,我们自己写的也不OK,直接上RedLock之Redisson落地实现。
[
](https://blog.csdn.net/u011863024/article/details/115270840)
Redisson官方网站
Redisson配置类
import org.redisson.Redisson;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedisConfig {
@Bean
public Redisson redisson() {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0);
return (Redisson)Redisson.create(config);
}
}
Redisson模板
public static final String REDIS_LOCK = "REDIS_LOCK";
@Autowired
private Redisson redisson;
@GetMapping("/doSomething")
public String doSomething(){
RLock redissonLock = redisson.getLock(REDIS_LOCK);
redissonLock.lock();
try {
//doSomething
}finally {
redissonLock.unlock();
}
}
回到实例
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class GoodController{
public static final String REDIS_LOCK = "REDIS_LOCK";
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${server.port}")
private String serverPort;
@Autowired
private Redisson redisson;
@GetMapping("/buy_goods")
public String buy_Goods(){
//String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
RLock redissonLock = redisson.getLock(REDIS_LOCK);
redissonLock.lock();
try {
String result = stringRedisTemplate.opsForValue().get("goods:001");// get key ====看看库存的数量够不够
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if(goodsNumber > 0){
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001", String.valueOf(realNumber));
System.out.println("成功买到商品,库存还剩下: "+ realNumber + " 件" + "\t服务提供端口" + serverPort);
return "成功买到商品,库存还剩下:" + realNumber + " 件" + "\t服务提供端口" + serverPort;
}else{
System.out.println("商品已经售完/活动结束/调用超时,欢迎下次光临" + "\t服务提供端口" + serverPort);
}
return "商品已经售完/活动结束/调用超时,欢迎下次光临" + "\t服务提供端口" + serverPort;
}finally {
redissonLock.unlock();
}
}
}
重启boot_redis01,boot_redis02,Nginx,重置Redis:set goods:001 100,启动JMeter,100个线程访问http://localhost/buy_goods。最后,后台输出:
让代码更加严谨
public static final String REDIS_LOCK = "REDIS_LOCK";
@Autowired
private Redisson redisson;
@GetMapping("/doSomething")
public String doSomething(){
RLock redissonLock = redisson.getLock(REDIS_LOCK);
redissonLock.lock();
try {
//doSomething
}finally {
//添加后,更保险
if(redissonLock.isLocked() && redissonLock.isHeldByCurrentThread()) {
redissonLock.unlock();
}
}
}
可避免如下异常:
IllegalMonitorStateException: attempt to unlock lock,not loked by current thread by node id:da6385f-81a5-4e6c-b8c0
2. redis分布式锁总结回顾
synchronized单机版oK,上分布式
nginx分布式微服务单机锁不行
取消单机锁,上Redis分布式锁setnx
只加了锁,没有释放锁,出异常的话,可能无法释放锁,必须要在代码层面finally释放锁
宕机了,部署了微服务代码层面根本没有走到finally这块,没办法保证解锁,这个key没有被删除,
需要有lockKey的过期时间设定
为redis的分布式锁key,增加过期时间,此外,还必须要setnx+过期时间必须同一行
必须规定只能自己删除自己的锁,你不能把别人的锁删除了,防止张冠李戴,1删2,2删3
Redis集群环境下,我们自己写的也不oK直接上RedLock之Redisson落地实现
3. redis内存调整默认查看
一些面试题:
- 生产上你们你们的redis内存设置多少?
- 如何配置、修改redis的内存大小
- 如果内存满了你怎么办?
- redis清理内存的方式?定期删除和惰性删除了解过吗
- redis缓存淘汰策略
- redis的LRU了解过吗?可否手写一个LRU算法
- Redis内存满了怎么办?Redis默认内存多少?在哪里查看?如何设置修改?
查看Redis最大占用内存
配置文件redis.conf的maxmemory参数,maxmemory是bytes字节类型,注意转换。
redis默认内存多少可以用?
如果不设置最大内存大小或者设置最大内存大小为0,在64位操作系统下不限制内存大小,在32位操作系统下最多使用3GB内存
一般生产上你如何配置?
一般推荐Redis设置内存为最大物理内存的四分之三。
如何修改redis内存设置
- 修改配置文件redis.conf的maxmemory参数,如:maxmemory 104857600
- 通过命令修改
info memory
[
](https://blog.csdn.net/u011863024/article/details/115270840)
4. redis打满内存OOM
真要打满了会怎么样?如果Redis内存使用超出了设置的最大值会怎样?
我改改配置,故意把最大值设为1个
127.0.0.1:6379> config get maxmemory
1) "maxmemory"
2) "0"
127.0.0.1:6379> config set maxmemory 1
OK
127.0.0.1:6379> set a 123
(error) OOM command not allowed when used memory > 'maxmemory'.
没有加上过期时间就会导致数据写满maxmemory为了避免类似情况,引出下一节内存淘汰策略[
](https://blog.csdn.net/u011863024/article/details/115270840)
5. redis内存淘汰策略
redis过期键的删除策略
如果一个键是过期的,那它到了过期时间之后是不是马上就从内存中被被删除呢?
如果回答yes,你自己走还是面试官送你?
如果不是,那过期后到底什么时候被删除呢??是个什么操作?
三种不同的删除策略
- 定时删除 - 总结:对CPU不友好,用处理器性能换取存储空间(拿时间换空间)
- 惰性删除 - 总结:对memory不友好,用存储空间换取处理器性能(拿空间换时间)
- 上面两种方案都走极端 - 定期删除 - 定期抽样key,判断是否过期(存在漏网之鱼)
定时删除
Redis不可能时时刻刻遍历所有被设置了生存时间的key,来检测数据是否已经到达过期时间,然后对它进行删除。
立即删除能保证内存中数据的最大新鲜度,因为它保证过期键值会在过期后马上被删除,其所占用的内存也会随之释放。但是立即删除对cpu是最不友好的。因为删除操作会占用cpu的时间,如果刚好碰上了cpu很忙的时候,比如正在做交集或排序等计算的时候,就会给cpu造成额外的压力,让CPU心累,时时需要删除,忙死。
这会产生大量的性能消耗,同时也会影响数据的读取操作。
惰性删除
数据到达过期时间,不做处理。等下次访问该数据时,
如果未过期,返回数据;
发现已过期,删除,返回不存在。
惰性删除策略的缺点是,它对内存是最不友好的。
如果一个键已经过期,而这个键又仍然保留在数据库中,那么只要这个过期键不被删除,它所占用的内存就不会释放。
在使用惰性删除策略时,如果数据库中有非常多的过期键,而这些过期键又恰好没有被访问到的话,那么它们也许永远也不会被删除(除非用户手动执行FLUSHDB),我们甚至可以将这种情况看作是一种内存泄漏 – 无用的垃圾数据占用了大量的内存,而服务器却不会自己去释放它们,这对于运行状态非常依赖于内存的Redis服务器来说,肯定不是一个好消息。
定期删除
定期删除策略是前两种策略的折中:
定期删除策略每隔一段时间执行一次删除过期键操作,并通过限制删除操作执行的时长和频率来减少删除操作对CPU时间的影响。
周期性轮询Redis库中的时效性数据,来用随机抽取的策略,利用过期数据占比的方式控制删除频度
特点1:CPU性能占用设置有峰值,检测频度可自定义设置
特点2:内存压力不是很大,长期占用内存的冷数据会被持续清理
举例
redis默认每个100ms检查,是否有过期的key,有过期key则删除。注意:redis不是每隔100ms将所有的key检查一次而是随机抽取进行检查(如果每隔100ms,全部key进行检查,redis直接进去ICU)。因此,如果只采用定期删除策略,会导致很多key到时间没有删除。
定期删除策略的难点是确定删除操作执行的时长和频率:如果删除操作执行得太频繁,或者执行的时间太长,定期删除策略就会退化成定时删除策略,以至于将CPU时间过多地消耗在删除过期键上面。如果删除操作执行得太少,或者执行的时间太短,定期删除策略又会和惰性删除束略一样,出现浪费内存的情况。因此,如果采用定期删除策略的话,服务器必须根据情况,合理地设置删除操作的执行时长和执行频率。
上述步骤都过堂了,还有漏洞吗?
- 定期删除时,从来没有被抽查到
- 惰性删除时,也从来没有被点中使用过
上述2步骤====>大量过期的key堆积在内存中,导致redis内存空间紧张或者很快耗尽
必须要有一个更好的兜底方案
内存淘汰策略登场(Redis 6.0.8版本)
- noeviction:不会驱逐任何key
- volatile-lfu:对所有设置了过期时间的key使用LFU算法进行删除
- volatile-Iru:对所有设置了过期时间的key使用LRU算法进行删除
- volatile-random:对所有设置了过期时间的key随机删除
- volatile-ttl:删除马上要过期的key
- allkeys-lfu:对所有key使用LFU算法进行删除
- allkeys-Iru:对所有key使用LRU算法进行删除
-
上面总结
2*4得8
- 2个维度
- 过期键中筛选
- 所有键中筛选
- 4个方面
- LRU
- LFU
- random
- ttl(Time To Live)
-
如何配置,修改
命令
- config set maxmemory-policy noeviction
- config get maxmemory
- 配置文件 - 配置文件redis.conf的maxmemory-policy参数
6. lru算法简介
是什么
LRU是Least Recently Used的缩写,即最近最少使用,是一种常用的页面置换算法,选择最近最久未使用的数据予以淘汰。
算法来源
LeetCode - Medium - 146. LRU Cache
7. lru的思想
设计思想
- 所谓缓存,必须要有读+写两个操作,按照命中率的思路考虑,写操作+读操作时间复杂度都需要为O(1)
- 特性要求
- 必须要有顺序之分,一区分最近使用的和很久没有使用的数据排序。
- 写和读操作一次搞定。
- 如果容量(坑位)满了要删除最不长用的数据,每次新访问还要把新的数据插入到队头(按照业务你自己设定左右那一边是队头)
查找快、插入快、删除快,且还需要先后排序—————>什么样的数据结构可以满足这个问题?
你是否可以在O(1)时间复杂度内完成这两种操作?
如果一次就可以找到,你觉得什么数据结构最合适?
答案:LRU的算法核心是哈希链表
编码手写如何实现LRU
本质就是HashMap + DoubleLinkedList
时间复杂度是O(1),哈希表+双向链表的结合体
8. 巧用LinkedHashMap完成lru算法
import java.util.LinkedHashMap;
public class LRUCache {
private LinkedHashMap<Integer, Integer> cache;
public LRUCache(int capacity) {
cache = new LinkedHashMap<Integer, Integer>(capacity, 0.75f, true){
private static final long serialVersionUID = 1L;
@Override
protected boolean removeEldestEntry(java.util.Map.Entry<Integer, Integer> eldest) {
return size() > capacity;
}
};
}
public int get(int key) {
return cache.getOrDefault(key, -1);
}
public void put(int key, int value) {
cache.put(key, value);
}
@Override
public String toString() {
return cache.toString();
}
}
9. 手写LRU
哈希表 + 双向链表
class LRUCache2{
class Node<K, V>{//双向链表节点
K key;
V value;
Node<K, V> prev;
Node<K, V> next;
public Node() {
this.prev = this.next = null;
}
public Node(K key, V value) {
super();
this.key = key;
this.value = value;
}
}
//新的插入头部,旧的从尾部移除
class DoublyLinkedList<K, V>{
Node<K, V> head;
Node<K, V> tail;
public DoublyLinkedList() {
//头尾哨兵节点
this.head = new Node<K, V>();
this.tail = new Node<K, V>();
this.head.next = this.tail;
this.tail.prev = this.head;
}
public void addHead(Node<K, V> node) {
node.next = this.head.next;
node.prev = this.head;
this.head.next.prev = node;
this.head.next = node;
}
public void removeNode(Node<K, V> node) {
node.prev.next = node.next;
node.next.prev = node.prev;
node.prev = null;
node.next = null;
}
public Node<K, V> getLast() {
if(this.tail.prev == this.head)
return null;
return this.tail.prev;
}
}
private int cacheSize;
private Map<Integer, Node<Integer, Integer>> map;
private DoublyLinkedList<Integer, Integer> doublyLinkedList;
public LRUCache2(int cacheSize) {
this.cacheSize = cacheSize;
map = new HashMap<>();
doublyLinkedList = new DoublyLinkedList<>();
}
public int get(int key) {
if(!map.containsKey(key)) {
return -1;
}
Node<Integer, Integer> node = map.get(key);
//更新节点位置,将节点移置链表头
doublyLinkedList.removeNode(node);
doublyLinkedList.addHead(node);
return node.value;
}
public void put(int key, int value) {
if(map.containsKey(key)) {
Node<Integer, Integer> node = map.get(key);
node.value = value;
map.put(key, node);
doublyLinkedList.removeNode(node);
doublyLinkedList.addHead(node);
}else {
if(map.size() == cacheSize) {//已达到最大容量了,把旧的移除,让新的进来
Node<Integer, Integer> lastNode = doublyLinkedList.getLast();
map.remove(lastNode.key);//node.key主要用处,反向连接map
doublyLinkedList.removeNode(lastNode);
}
Node<Integer, Integer> newNode = new Node<>(key, value);
map.put(key, newNode);
doublyLinkedList.addHead(newNode);
}
}
}