0、Redis安装

redis的安装—基于虚拟机

Windows子系统安装redis:
https://blog.csdn.net/JAYU_37/article/details/106981067

子系统:/usr/local/redis/bin/redis.conf /usr/local/redis/bin
虚拟机: /usr/local/bin/redis.conf /usr/local/bin

redis 报错 RDB error 解决方式
config set stop-writes-on-bgsave-error no
lpush myColour “red”

关闭redis服务报错:
https://blog.csdn.net/u011868076/article/details/79539725

宿主连接到虚拟机redis

虚拟机安装redis及连接测试
https://www.cnblogs.com/wchaos/p/12219395.html

linux子系统在windows的C:\Users\admin\AppData\Local\Packages\CanonicalGroupLimited.UbuntuonWindows_79rhkp1fndgsc\LocalState\rootfs 目录下

Linux常用操作

Vim编辑器内操作

  • :%s/aaaa/bbbb 将aaaa全部替换为bbbb
  • 整页翻页:
    • Ctrl+f 下一页 f表示forward
    • Ctrl+b 上一页 b表示backward
  • 翻半页:
    • Ctrl+d 向下半页 d表示down
    • Ctrl+u 向上半页 u表示up
  • 跳转到最后一行:shift+g 或者大写的G或者输入$+回车
  • 跳转到第一行:gg或输入0或1+回车
  • 搜索目标内容 输入:/目标内容
  • 进入编辑模式:在目标内容处按**i**进入编辑模式;
  • 退出编辑模式:Esc
  • 保存修改:输入 :wq! 取消修改退出 :qa

查看防火墙状态:systemctl status firewall ; systemctl stop firewall 关闭防火墙
查看名字包含redis的进程:ps -ef | grep redis

1、数据类型

Redis字符串(String)

数据结构

String的数据结构为简单动态字符串(Simple Dynamic String,缩写SDS)。是可以修改的字符串,内部结构实现上类似于Java的ArrayList,采用预分配冗余空间的方式来减少内存的频繁分配。
image.png
如图中所示,内部为当前字符串实际分配的空间capacity一般要高于实际字符串长度len。当字符串长度小于1M时,扩容都是加倍现有的空间,如果超过1M,扩容时一次只会多扩1M的空间。需要注意的是字符串最大长度为512M。

Redis列表(List)

Redis集合(Set)

Redis哈希(hash)

数据结构

Hash类型对应的数据结构是两种:ziplist(压缩列表),hashtable(哈希表)。当field-value长度较短且个数较少时,使用ziplist,否则使用hashtable。

有序集合Zset(sorted set)

底层数据结构

Zset是Redis提供的一个非常特别的数据接口,一方面等价于Map,可以给每一个元素value赋予一个权重score,另一方面它又类似于TreeSet,内部的元素会按照权重score进行排序,可以得到每个元素的名次,还可以通过score的范围来获取元素的列表。
Zset底层使用了两个数据接口

  • hash,hash的作用就是关联元素value和权重score,保障元素value的唯一性,可以通过元素value找到相应的score值。
  • 跳跃表,跳跃表的目的在于给元素value排序,根据score的范围获取元素列表。

    跳表

    image.png
    从最上层开始查找,从第一层开始相当于是一个索引,越往上索引越少,在比较时可以快速

新数据类型

BitMaps

HyperLogLog

GeoSpatital

2、Java连接Redis

Java Redis测试

java连接redis时,linux系统需要开放端口6379的访问权限或者关闭防火墙(不推荐)

  1. public class JedisDemo1 {
  2. public static void main(String[] args) {
  3. //创建Jedis对象
  4. Jedis jedis = new Jedis("192.168.190.1", 6379);
  5. //通过密码登陆
  6. jedis.auth("10086");
  7. //测试
  8. String ping = jedis.ping();
  9. System.out.println(ping);
  10. }
  11. private Jedis jedis;
  12. @Before
  13. public void initial() {
  14. jedis = new Jedis("192.168.190.1", 6379);
  15. jedis.auth("10086");
  16. }
  17. //操作key
  18. @Test
  19. public void keyTest1() {
  20. jedis.set("k1", "v1");
  21. jedis.set("k2", "v2");
  22. jedis.set("k3", "v3");
  23. String k1 = jedis.get("k1");
  24. System.out.println(k1);
  25. //查看是否过期
  26. System.out.println(jedis.ttl("k2"));
  27. //判断是否存在
  28. System.out.println(jedis.exists("k5"));
  29. //设置存活时间
  30. // System.out.println(jedis.expire("k1", 2000L));
  31. //设置多个key-value
  32. jedis.mset("str1","v1","str2","v2","str3","v3");
  33. System.out.println(jedis.mget("str1","str2","str3"));
  34. //获取所有的key
  35. Set<String> keys = jedis.keys("*");
  36. keys.forEach(System.out::println);
  37. }
  38. //操作list
  39. @Test
  40. public void listTest() {
  41. jedis.lpush("key1", "lucy", "marry", "jack");
  42. List<String> lrange = jedis.lrange("key1", 0, -1);
  43. lrange.forEach(System.out::println);
  44. }
  45. //操作set
  46. @Test
  47. public void setTest() {
  48. jedis.sadd("name", "lucy", "jack");
  49. Set<String> set = jedis.smembers("name");
  50. set.forEach(System.out::println);
  51. jedis.srem("name", "jack");
  52. }
  53. //操作hash
  54. @Test
  55. public void hashTest() {
  56. jedis.hset("hash1","userName","lisi");
  57. jedis.hset("hash1","age","18");
  58. System.out.println(jedis.hget("hash1", "userName"));
  59. System.out.println(jedis.hget("hash1", "age"));
  60. }
  61. //操作zset
  62. @Test
  63. public void zsetTest() {
  64. jedis.zadd("China", 100d, "shanghai");
  65. jedis.zadd("China", 200d, "beijing");
  66. Set<String> china = jedis.zrange("China", 0, -1);
  67. china.forEach(System.out::println);
  68. }
  69. }

手机验证码功能

image.png

3、Redis事务&锁机制

Redis事务

Redis事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
Redis事务的主要作用就是串联多个命令防止别的命令插队。

Multi、Exec、Discard

从输入Multi命令开始,输入的命令都会依次进入命令队列中,但不会执行,直到输入Exec后,Redis会将之前的命令队列中的命令依次执行。
组队的过程中可以通过discard来放弃组队。

事务的错误处理

组队中某个命令出现了报告错误,执行时整个的所有队列都会被取消。
image.png
如果执行阶段某个命令报出了错误,则只有报错的命令不会被执行,而其他的命令都会执行,不会回滚。
image.png

事务冲突/锁机制

image.png

悲观锁

悲观锁(Pessimistic Lock),是指在访问共享变量之前就认为会出现线程安全问题,所以需要在上锁之后再访问共享变量,由于悲观锁每次访问时都会上锁,就会造成程序的并发性降低。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁表锁等,读锁写锁等,都是在做操作之前先上锁。

乐观锁

乐观锁(Optimistic Lock),是指在访问共享变量之前认为不会出现线程安全问题,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量。Redis就是利用这种CAS(compare and swap / check and set)机制实现事务的。如果在访问时出现了线程安全问题,就取消这次对共享变量的操作。原始的CAS机制会存在ABA问题,因此引入版本号来解决该问题CAS机制以及ABA问题
CAS的缺点:1、CPU开销过大;2、不能保证代码块的原子性:CAS只能保证一个变量的原子性操作;3、ABA问题

WATCH key1 [key2…] UNWATCH

在执行multi之前,先执行watch key1 [key2],可以监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。
unwatch可以取消对key的监视,如果在执行WATCH命令之后,EXEC命令或DISCARD命令先被执行了的话,那么就不需要再执行UNWATCH了。

Redis事务三特性

  • 单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
  • 没有隔离级别的概念:队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被实际执行
  • 不保证原子性:事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚

Redis事务——秒杀并发模拟

ubuntu安装apache2-utils
sudo apt-get install apache2-utils
在当前目录下创建一个postfile.txt文本文件,里面填写post请求需要提交的内容 本例:prodid=0101&
然后执行 sudo ab -n 500 -c 300 -p ./postfile.txt -T application/x-www-form-urlencoded http://192.168.109.1:8080/Seckill/doseckill
-n 表示请求提交500次,-c表示线程数为300,-p表示post请求 postfile中是post请求需要提交的的内容,-T 表示文本类型默认为’text/plain’ 这里为application/x-www-form-urlencoded 与jsp文件enctype一致,后面则为请求地址

通过连接池解决连接超时问题

节省每次连接redis服务带来的消耗,把连接好的实例反复利用。通过参数管理连接的行为

  1. public class JedisPoolUtil {
  2. private static volatile JedisPool jedisPool = null;
  3. private JedisPoolUtil() {
  4. }
  5. public static JedisPool getJedisPoolInstance() {
  6. if (null == jedisPool) {
  7. synchronized (JedisPoolUtil.class) {
  8. if (null == jedisPool) {
  9. JedisPoolConfig poolConfig = new JedisPoolConfig();
  10. poolConfig.setMaxTotal(200);
  11. poolConfig.setMaxIdle(32);
  12. poolConfig.setMaxWaitMillis(100*1000);
  13. poolConfig.setBlockWhenExhausted(true);
  14. poolConfig.setTestOnBorrow(true); // ping PONG
  15. jedisPool = new JedisPool(poolConfig, "192.168.44.168", 6379, 60000, "10086");
  16. }
  17. }
  18. }
  19. return jedisPool;
  20. }
  21. public static void release(JedisPool jedisPool, Jedis jedis) {
  22. if (null != jedis) {
  23. jedisPool.returnResource(jedis);
  24. }
  25. }
  26. }

连接池参数

  • MaxTotal:控制一个pool可分配多少个jedis实例,通过pool.getResource()来获取;如果赋值为-1,则表示不限制;如果pool已经分配了MaxTotal个jedis实例,则此时pool的状态为exhausted。
  • maxIdle:控制一个pool最多有多少个状态为idle(空闲)的jedis实例;
  • MaxWaitMillis:表示当borrow一个jedis实例时,最大的等待毫秒数,如果超过等待时间,则直接抛JedisConnectionException;
  • testOnBorrow:获得一个jedis实例的时候是否检查连接可用性(ping());如果为true,则得到的jedis实例均是可用的;

解决超卖问题——使用乐观锁

  1. //秒杀过程
  2. public static boolean doSecKill(String uid,String prodid) throws IOException {
  3. //1、uid和prodid非空判断
  4. if(uid == null || prodid == null) {
  5. return false;
  6. }
  7. //2、连接redis
  8. // Jedis jedis = new Jedis("192.168.190.1",6379);
  9. // jedis.auth("10086");
  10. //通过连接池得到jedis对象
  11. JedisPool jedisPool = JedisPoolUtil.getJedisPoolInstance();
  12. Jedis jedis = jedisPool.getResource();
  13. //3 拼接key
  14. // 3.1 库存key
  15. String kcKey = "sk:"+prodid+":qt";
  16. // 3.2 秒杀成功用户key
  17. String userKey = "sk:"+prodid+":user";
  18. //监视库存——加锁(乐观锁)
  19. jedis.watch(kcKey);
  20. //4 获取库存,如果库存null,秒杀还没有开始
  21. String kc = jedis.get(kcKey);
  22. if(kc == null) {
  23. System.out.println("秒杀还没有开始,请等待");
  24. jedis.close();
  25. return false;
  26. }
  27. //5 判断用户是否重复秒杀操作
  28. if (jedis.sismember(userKey, uid)) {
  29. System.out.println("已经秒杀成功了,不能重复秒杀");
  30. jedis.close();
  31. return false;
  32. }
  33. //6 判断如果商品数量,库存数量小于1,秒杀结束
  34. if (Integer.parseInt(kc) <= 0) {
  35. System.out.println("秒杀已经结束了");
  36. jedis.close();
  37. return false;
  38. }
  39. //7 秒杀过程
  40. //使用事务
  41. Transaction multi = jedis.multi();
  42. //组队操作
  43. multi.decr(kcKey); // 库存-1
  44. multi.sadd(userKey, uid);
  45. //执行
  46. List<Object> res = multi.exec();
  47. if (res == null || res.size() == 0) {
  48. System.out.println("秒杀失败....");
  49. jedis.close();
  50. return false;
  51. }
  52. //不使用事务
  53. //7.1 库存-1
  54. // jedis.decr(kcKey);
  55. // //7.2 把秒杀成功用户添加清单里面
  56. // jedis.sadd(userKey, uid);
  57. System.out.println("秒杀成功了..");
  58. jedis.close();
  59. return true;
  60. }

乐观锁的库存遗留问题

假设存在500个商品,有2000人抢购,存在一种极端情况,2000个人同时读到一个商品,其中某一个买到了该商品,那么该商品的版本号会发生变化,由于乐观锁的存在,剩下1999个人的操作都会失败。这样就会导致2000个人只买了1个商品。而事实上500个商品可以被同时购买。

解决方式

  • 悲观锁——redis中无法直接使用,而且悲观锁效率低。
  • LUA脚本——将复杂的或者多步的redis操作,写为一个脚本,一次提交给redis执行,减少反复连接redis的次数。提升性能。

LUA脚本是类似redis事务,有一定的原子性,不会被其他命令插队,可以完成一些redis事务性的操作。
但是注意redis的lua脚本功能,只有在Redis 2.6以上的版本才可以使用。redis 2.6版本以后,通过lua脚本解决争抢问题,实际上是redis 利用其单线程的特性,用任务队列的方式解决多任务并发问题
image.png

4、Redis持久化

RDB(Redis DataBase)

在指定的时间间隔内将内存中的数据集快照写入磁盘,也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存里。
image.png
save ms changes 表示xx秒内有多少次改动则进行一次持久化操作,如果300s内出现大于10次的改动,那么在改动达到10的时候进行一次持久化操作,剩余的部分下次再进行持久化操作。

备份是如何进行的

Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。整个过程中,主进程是不进行任何IO操作的,这就确保了极高的性能。如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化后的数据可能丢失

save命令 VS bgsave命令

save:save时只管保存,其它不管,全部阻塞。手动保存。不建议。
bgsave:Redis会在后台异步进行快照操作,快照同时还可以响应客户端请求。

stop-writes-on-bgsave-error

image.png当Redis无法写入磁盘(磁盘满了等情况),直接关掉redis 的写操作,推荐设置为yes

优缺点分析

优势

  • 适合大规模的数据恢复
  • 对数据完整性和一致性要求不高的长河更适合使用
  • 节省磁盘空间
  • 回复速度快

    劣势

  • Fork的时候,内存中的数据被克隆了一份,大致2倍的膨胀性需要考虑

  • 虽然Redis在fork时使用了写时拷贝技术(copy on write),但是如果数据庞大时还是比较消耗性能。
  • 在备份周期在一定间隔时间做一次备份,所以如果Redis意外down掉的话,就会丢失最后一次快照后的所有修改。

Redis cow细节

AOF(Append Of File)

AOF是以日志的形式来记录每个写操作(增量保存),将Redis执行过的所有写指令记录下来(读操作不记录),只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis 重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作。

持久化流程

  • 客户端的请求写命令会被append追加到AOF缓冲区内;
  • AOF缓冲区根据AOF持久化策略[always,everysec,no]将操作sync同步到磁盘的AOF文件中;
    • appendfsync always:始终同步,每次Redis的写入都会立刻记入日志;性能较差但数据完整性比较好
    • appendfsync everysec:每秒同步,每秒记入日志一次,如果宕机,本秒的数据可能丢失。
    • appendfsync no:redis不主动进行同步,把同步时机交给操作系统。
  • AOF文件大小超过重写策略或手动重写时,会对AOF文件rewrite重写,压缩AOF文件容量;
  • Redis服务重启时,会重新load加载AOF文件中的写操作达到数据恢复的目的;

若RDB跟AOF同时开启,则优先使用AOF(数据不存在丢失)

优缺点分析

优势

  • 数据完整性更强
  • 可读的日志文本,通过操作AOF文件,可以处理误操作

    劣势

  • 比起RDB占用更多的磁盘空间。

  • 恢复备份速度要慢。
  • 每次读写都同步的话,有一定的性能压力。
  • 存在个别Bug,造成恢复不能。

image.png

5、主从复制

主机数据更新后根据配置和策略,自动同步到备机的master/slaver机制,Master以写为主,Slave以读为主

功能

  • 读写分离,性能扩展
  • 容灾快速恢复

    操作步骤

    创建主/从配置文件

    创建一个文件夹 myredis 我创建在了/usr/local/myredis (redis安装在/usr/local/redis 目录下) 将redis的redis.conf配置文件拷贝纸myredis目录下,创建三个conf配置文件 redisPort1.conf、redisPort2.conf、redisPort3.conf
    文件中的内容为:

    1. ===include 这里最好写绝对路径不然可能会报错===
    2. include /usr/local/myredis/redis.conf
    3. pidfile /var/run/redis_6379.pid
    4. port 6379
    5. dbfilename dump6379.rdb

    启动服务

    cd 到 redis/bin目录下分别按上述几个配置文件启动redis-server
    sudo ./redis-server ../../myredis/redis6381.conf

    设置主从

    1、sudo ./redis-cli -p xxxx(port) 启动客户端
    2、通过 info replication 命令可以查看当前主机运行情况
    3、配置从机,slaveof <ip> <port> 该方式若重启了主机则失效,需要重新配置,若需要长期绑定主从关系,则从配置文件配置主从。直接在从机配置文件中写 slaveof <ip> <port> 即可

    主从服务器关系

  • Redis2.8起,从机默认只读操作,可以通过设置配置文件中的slave-read-only来修改只读;

  • 若从机宕机(已经在配置文件中配置了主从关系),重启后会从主机中恢复数据
  • 主机器宕机,从机依然是主机的从机

    主从复制原理

    image.png

    全量同步

    Redis全量复制一般发生在Slave初始化阶段,这时Slave需要将Master上的所有数据都复制一份。具体步骤如下:
    - 从服务器连接主服务器,发送SYNC命令;
    - 主服务器接收到SYNC命名后,开始执行BGSAVE命令生成RDB文件并使用缓冲区记录此后执行的所有写命令;
    - 主服务器BGSAVE执行完后,向所有从服务器发送快照文件,并在发送期间继续记录被执行的写命令;
    - 从服务器收到快照文件后丢弃所有旧数据,载入收到的快照;
    - 主服务器快照发送完毕后开始向从服务器发送缓冲区中的写命令;
    - 从服务器完成对快照的载入,开始接收命令请求,并执行来自主服务器缓冲区的写命令;
    image.png

    增量同步

    Redis增量复制是指Slave初始化后开始正常工作时主服务器发生的写操作同步到从服务器的过程。
    增量复制的过程主要是主服务器每执行一个写命令就会向从服务器发送相同的写命令,从服务器接收并执行收到的写命令。

    Redis主从同步策略

    主从刚刚连接的时候,进行全量同步;全同步结束后,进行增量同步。当然,如果有需要,slave 在任何时候都可以发起全量同步。redis 策略是,无论如何,首先会尝试进行增量同步,如不成功,要求从机进行全量同步。
    Redis主从复制原理总结

    薪火相传

    如果多个Slave断线了,需要重启的时候,因为只要Slave启动,就会发送sync请求和主机全量同步,当多个同时出现的时候,可能会导致Master IO剧增宕机。因此,上一个Slave可以是下一个slave的Master,Slave同样可以接收其他slaves的连接和同步请求,那么该slave作为了链条中下一个的master, 可以有效减轻master的写压力,去中心化降低风险。
    该方式也存在缺点,当中间的某台slave出现问题时,后面的salve均无法同步数据。

    反客为主

    当一个master宕机后,后面的slave可以立刻升为master,其后面的slave不用做任何修改。用slaveof no one 将从机变为主机。但是该模式需要手动调整,后面的哨兵模式(sentinel)会实现反客为主的自动版,即当主机宕机时,从机自动变为主机。

    哨兵模式(sentinel)

    反客为主的自动版,能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库

    配置步骤

  • 在/user/local/myredis/ 文件夹在创建sentinel.conf文件

  • 在配置文件中填写:sentinel monitor mymaster 1 -主地址|主端口|参与选举个数配置基数3-2
    • 其中,mymaster为监控对象起的服务器名称,1表示需要有1个哨兵同意,则进行切换
  • 在配置文件中设置服务器密码(如果设置了):
    • sentinel auth-pass mymaster 10086 -主密码,不设置的话不能动态切换
  • 启动哨兵:cd 到redis-sentinel (就是redis-server的目录下),执行 ./redis-sentinel ../../myredis/sentinel.conf 开启哨兵

    复制延时

    由于所有的写操作都是先在Master上操作,然后同步更新到Slave上,所以从Master同步到Slave机器有一定的延迟,当系统很繁忙的时候,延迟问题会更加严重,Slave机器数量的增加也会使这个问题更加严重。

    故障恢复

    image.png

  • 优先级在redis.conf中默认:replica-priority 100,值越小优先级越高

  • 偏移量是指获得原主机数据最全的
  • 每个redis实例启动后都会随机生成一个40位的runid

6、Redis集群(cluster)

问题

  • 容量不够,Redis如何进行扩容
  • 并发写操作,Redies如何进行分担
  • 另外,主从模式,薪火相传模式,主机宕机,导致ip地址发生变化,应用程序中配置需要修改对应的主机地址、端口等信息。

之前通过代理主机来解决,但是redis3.0中提供了解决方案。就是无中心化集群配置。
image.png
无中心化集群:任意一台服务器都可以作为集群的入口,他们之间可以互相联通,也就是说任意一台服务器接收到客户端的请求后,可以转发到目标服务器进行处理

什么是集群

  • Redis 集群实现了对Redis的水平扩容,即启动N个redis节点,将整个数据库分布存储在这N个节点中,每个节点存储总数据的1/N。
  • Redis 集群通过分区(partition)来提供一定程度的可用性(availability):即使集群中有一部分节点失效或者无法进行通讯,集群也可以继续处理命令请求。

    集群搭建

    首先我们既然要搭建集群,那么master节点至少要3个,slave节点也是3个,为什么呢?这是因为一个redis集群如果要对外提供可用的服务,那么集群中必须要有过半的master节点正常工作。基于这个特性,如果想搭建一个能够允许 n 个master节点挂掉的集群,那么就要搭建2n+1个master节点的集群。
    [

](https://blog.csdn.net/aloneno/article/details/96370167)

1、创建配置文件redisxxxx.conf 端口号分别为6379、6380、6381、6389、6390、6391(可以自己设定)

  1. include /usr/local/myredis/redis.conf
  2. pidfile "/var/run/redis_6391.pid"
  3. port 6391
  4. dbfilename "dump6391.rdb"
  5. cluster-enabled yes #打开集群模式
  6. cluster-config-file nodes-6391.conf #设定节点配置文件名 默认配置文件跟dbfile在同一目录下,对应redis.conf 中的dir设定的目录,也可以自己设定目录(写绝对路径)
  7. cluster-node-timeout 15000 #节点失联事件,若主节点超过指定时间不可达,她将由其丛书设备进行故障切换
  8. # 下面两项如果在redis.conf中设置了就不必重复设置
  9. masterauth 10086
  10. requirepass 10086

2、启动所有redis服务

image.png

3、将各个服务组合成一个集群

redis6已经集成了ruby环境,直接使用即可。
cd 到redis最开始解压的文件目录的src中 我这里为/home/mrlinxi/redis-6.0.5/src
./redis-cli --cluster create --cluster-replicas 1 192.168.190.1:6379 192.168.190.1:6380 192.168.190.1:6381 192.168.190.1:6389 192.168.190.1:6390 192.168.190.1:6391

  • 这里用真实的ip地址,1 表示从机数量,即一主一从
  • 如果设置了密码 需要追加 -a password 不然会报错 NOAUTH Authentication required

image.png
image.png 16384

4、通过集群方式连接

-c 采用集群方式连接,设置数据会自动切换到相应的写主机
./redis-cli -c -p 63xx
cluster nodes 可查看集群信息

5、redis cluster分配原则

分配原则尽量保证每个主数据库运行在不同的IP地址,每个从库和主库不在一个IP地址上。

6、什么是slotes

在搭建cluster时,我们可以看到最后输出 All 16384 slots covered

  • 一个Redis集群包含16384个插槽(hash slot),数据库中每个键都属于这16384个插槽的其中一个
  • 集群使用公式CRC16(key) % 16384 来计算键key 属于哪个槽,其中CRC16(key) 语句用于计算键key 的CRC16 校验和。CRC表示冗余循环校验。
  • 集群中的每个节点负责处理一部分插槽。举个例子,如果一个集群可以有主节点,其中:
    • 节点 A 负责处理0号至5460号插槽。
    • 节点 B 负责处理5461号至10922号插槽。
    • 节点 C 负责处理10923号至16383号插槽。

      主从复制的Jedis操作

      ```java private static JedisSentinelPool jedisSentinelPool=null;

public static Jedis getJedisFromSentinel(){ if(jedisSentinelPool==null){ Set sentinelSet=new HashSet<>(); sentinelSet.add(“192.168.11.103:26379”);

  1. JedisPoolConfig jedisPoolConfig =new JedisPoolConfig();
  2. jedisPoolConfig.setMaxTotal(10); //最大可用连接数
  3. jedisPoolConfig.setMaxIdle(5); //最大闲置连接数
  4. jedisPoolConfig.setMinIdle(5); //最小闲置连接数
  5. jedisPoolConfig.setBlockWhenExhausted(true); //连接耗尽是否等待
  6. jedisPoolConfig.setMaxWaitMillis(2000); //等待时间
  7. jedisPoolConfig.setTestOnBorrow(true); //取连接的时候进行一下测试 ping pong
  8. jedisSentinelPool=new JedisSentinelPool("mymaster",sentinelSet,jedisPoolConfig);
  9. return jedisSentinelPool.getResource();
  10. }else{
  11. return jedisSentinelPool.getResource();
  12. }

}

  1. <a name="obSuy"></a>
  2. ### 在集群中录入/查询值
  3. <a name="R30ui"></a>
  4. #### 录入
  5. 在redis-cli每次录入、查询键值,redis都会计算出该key应该送往的插槽,如果不是该客户端对应服务器的插槽,redis会报错,并告知应前往的redis实例地址和端口。<br />redis-cli客户端提供了–c 参数实现自动重定向。<br />如redis-cli -c–p 6379登入后,再录入、查询键值对可以自动重定向。<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/22423156/1630833129761-38f534fe-fc29-4ce4-b5eb-6adae7756aad.png#clientId=u6cac1c02-ccb9-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=71&id=u1079fc0f&margin=%5Bobject%20Object%5D&name=image.png&originHeight=71&originWidth=517&originalType=binary&ratio=1&rotation=0&showTitle=false&size=5366&status=done&style=none&taskId=u11386e14-7175-4dcf-9cce-db9e6c26b03&title=&width=517)<br />不在一个slot下的键值,不能使用mget、mset等多键操作;<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/22423156/1630833197404-138be4c4-8f0c-4fab-b609-1d5fc1e07d0f.png#clientId=u6cac1c02-ccb9-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=35&id=uabd37765&margin=%5Bobject%20Object%5D&name=image.png&originHeight=35&originWidth=495&originalType=binary&ratio=1&rotation=0&showTitle=false&size=3143&status=done&style=none&taskId=u2ee9d4d4-65b4-4a60-bd3b-bebb915dd30&title=&width=495)<br />可以通过{}来定义组的概念,从而使key中{}内相同内容的键值对放到一个slot中去。<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/22423156/1630833278974-461d1c05-aa08-43e2-8bfb-b53f2bb90baa.png#clientId=u6cac1c02-ccb9-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=36&id=u8f737088&margin=%5Bobject%20Object%5D&name=image.png&originHeight=36&originWidth=596&originalType=binary&ratio=1&rotation=0&showTitle=false&size=2809&status=done&style=none&taskId=u4c72db41-f84f-477b-ad45-bde81cb48f5&title=&width=596)
  6. <a name="IKTHV"></a>
  7. #### 查询
  8. CLUSTER GETKEYSINSLOT <slot><count> 返回 slot 槽中的count个键。<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/22423156/1630840265339-fffbde8c-f825-46d2-969f-861795222a40.png#clientId=u6cac1c02-ccb9-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=66&id=u3f1d11e4&name=image.png&originHeight=66&originWidth=399&originalType=binary&ratio=1&rotation=0&showTitle=false&size=3677&status=done&style=none&taskId=ucbaca110-ccc0-4076-9b74-f8d2f400f09&title=&width=399)<br />cluster keyslot <key/groupname> 查看key的插槽值 只能查看当前服务器slot范围内的key<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/22423156/1630840282388-b18b1a32-c4af-4574-8b56-8c1ee5bc0f6b.png#clientId=u6cac1c02-ccb9-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=36&id=ub07635be&name=image.png&originHeight=36&originWidth=333&originalType=binary&ratio=1&rotation=0&showTitle=false&size=2031&status=done&style=none&taskId=ub90e3014-21fc-4a4e-bd62-23df80ba922&title=&width=333)<br />cluster countkeysinslot <slot> 返回slot中键的个数<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/22423156/1630840297049-135e7896-023d-43b5-904d-27e0fad7a720.png#clientId=u6cac1c02-ccb9-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=36&id=ucf5f9862&name=image.png&originHeight=36&originWidth=394&originalType=binary&ratio=1&rotation=0&showTitle=false&size=2288&status=done&style=none&taskId=u4a8d3c94-24cb-4f7b-b438-c6a2fe39d3f&title=&width=394)
  9. <a name="GG3Ak"></a>
  10. ### 故障恢复
  11. - 主节点下限,从节点能否自动升为主节点? 能 注意cluster-node-timeout设定的超时时间
  12. - 主节点恢复后,主从关系会如何?主节点回来变成从机
  13. - 如果所有某一段插槽的主从节点都宕掉,redis服务是否还能继续?
  14. - 如果某一段插槽的主从都挂掉,而cluster-require-full-coverage 为yes ,那么,整个集群都挂掉
  15. - 如果某一段插槽的主从都挂掉,而cluster-require-full-coverage 为no ,那么,该插槽数据全都不能使用,也无法存储。
  16. redis.conf中的参数 cluster-require-full-coverage
  17. <a name="FRa8e"></a>
  18. ### 集群的jedis开发
  19. ```java
  20. public class RedisClusterTest {
  21. public static void main(String[] args) {
  22. //创建集群对象
  23. HostAndPort hostAndPort = new HostAndPort("192.168.190.1", 6379);
  24. //没密码的写法
  25. //JedisCluster jedisCluster = new JedisCluster(hostAndPort);
  26. JedisCluster jedisCluster = new JedisCluster(hostAndPort,
  27. 5000, 3000, 10,
  28. "10086", new JedisPoolConfig());
  29. //进行操作
  30. jedisCluster.set("b1", "value1");
  31. System.out.println(jedisCluster.get("b1"));
  32. }
  33. }
  34. ======================也可以用set存储多个节点============================
  35. public class JedisClusterTest {
  36. public static void main(String[] args) {
  37. Set<HostAndPort>set =new HashSet<HostAndPort>();
  38. set.add(new HostAndPort("192.168.31.211",6379));
  39. JedisCluster jedisCluster=new JedisCluster(set);
  40. jedisCluster.set("k1", "v1");
  41. System.out.println(jedisCluster.get("k1"));
  42. }
  43. }

优点与不足

优点

  • 实现了扩容、分摊压力(多台机器同时工作)、无中心配置相对简单(任意节点均可进入进行操作)

不足

  • 多键操作是不被支持的、多键的Redis事务是不被支持的。lua脚本不被支持
  • 由于集群方案出现较晚,很多公司已经采用了其他的集群方案,而代理或者客户端分片的方案想要迁移至redis cluster,需要整体迁移而不是逐步过渡,复杂度较大。

    7、(高频面试题)Redis应用中会出现的问题

    1、缓存穿透

    问题描述

    缓存穿透是指key对应的数据在缓存和数据库中都没有,每次针对该key的请求从缓存中获取不到,导致所有请求均落在数据库上,从而可能压垮数据源。
    现象:应用服务器压力面大、redis命中率降低、一直查询数据库、出现很多非正常的url访问
    image.png

    解决方案

    一个一定不存在缓存及查询不到的数据,由于缓存是不命中时被动写的,并且出于容错考虑,如果从存储层查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。

  • 对空值缓存:如果一个查询返回的数据为空(不管是数据是否不存在),我们仍然把这个空结果(null)进行缓存,设置空结果的过期时间会很短,最长不超过五分钟

  • 设置可访问的名单(白名单):使用bitmaps类型定义一个可以访问的名单,名单id作为bitmaps的偏移量,每次访问和bitmap里面的id进行比较,如果访问id不在bitmaps里面,进行拦截,不允许访问。
  • 采用布隆过滤器:(布隆过滤器(Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量(位图)和一系列随机映射函数(哈希函数)。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。将所有可能存在的数据哈希到一个足够大的bitmaps中,一个一定不存在的数据会被这个bitmaps拦截掉,从而避免了对底层存储系统的查询压力。
  • 进行实时监控:当发现Redis的命中率开始急速降低,需要排查访问对象和访问的数据,和运维人员配合,可以设置黑名单限制服务

    2、缓存击穿

    问题描述

    key所对应的数据存在,但在缓存中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。
    与缓存穿透的差别击穿缓存中没有db中有,而且处于高并发状态;而缓存穿透是缓存和db中都没有,请求不断的提交造成db压力增加
    现象:数据库访问压力瞬间增加、redis中的key并没有大量过期、redis正常运行
    redis某个key过期,大量访问使用这个key
    image.png

    解决方案

    key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。这个时候,需要考虑一个问题:缓存被“击穿”的问题。

  • 预先设置热门数据:在redis高峰访问之前,把一些热门数据提前存入到redis里面,加大这些热门数据key的时长

  • 实时调整:现场监控哪些数据热门,实时调整key的过期时长
  • 使用锁
    • 在缓存失效的时候(判断拿出来的值是否为空),不是立即去load db。
    • 先使用缓存工具的某些带成功操作返回值的操作(比如Redis的SETNX)去set一个mutex key
    • 当操作返回成功时,再进行load db的操作,并回设缓存,最后删除mutex key;
    • 当操作返回失败,证明有线程在load db,当前线程睡眠一段时间再重试整个get缓存的方法。

image.png

3、缓存雪崩

问题描述

缓存雪崩是指缓存同一时间大面积的失效,因此请求都会落在db上,造成数据库在短时间内承受大量请求而崩掉
缓存雪崩与缓存击穿的区别是,缓存雪崩是针对很多key,而缓存击穿是针对某一个key
image.png

解决方案

  • 构建多级缓存架构:nginx缓存 + redis缓存 +其他缓存(ehcache等)
  • 使用锁或队列:用加锁或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。不适用高并发情况
  • 设置过期标志更新缓存:记录缓存数据是否过期(设置提前量),如果过期会触发通知另外的线程在后台去更新实际key的缓存。
  • 将缓存失效时间分散开:比如我们可以在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。

    4、分布式锁

    问题描述

    随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!

    解决方案

    分布式锁的主流实现方案:
  1. 基于数据库实现分布式锁
  2. 基于缓存(Redis等)
  3. 基于Zookeeper

每种分布式锁都有自己的优缺点:

  1. 性能:Redis最高
  2. 可靠性:Zookeeper最高

我们这里通过redis 实现分布式锁

通过redis 实现分布式锁

通过setnx命令可以实现分布式锁,当key不存在时才能设定key的value,否则无法设定
image.png

  • 使用setnx上锁,通过del释放锁;但是如果上锁之后一直没有释放,会导致阻塞;
  • 因此通过设置key的过期时间,自动释放
    • setnx key value
    • expire key 100
  • 上述操作加锁和设置过期时间是两步,可能上锁之后突然出现异常,无法设置过期时间;
    • 上锁的时候同时设置过期时间 set key value nx ex 100

image.png

  1. @GetMapping("testLock")
  2. public void testLock(){
  3. //1获取锁,setne
  4. Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111", 3, TimeUnit.SECONDS);
  5. //2获取锁成功、查询num的值
  6. if(lock){
  7. Object value = redisTemplate.opsForValue().get("num");
  8. //2.1判断num为空return
  9. if(StringUtils.isEmpty(value)){
  10. return;
  11. }
  12. //2.2有值就转成成int
  13. int num = Integer.parseInt(value+"");
  14. //2.3把redis的num加1
  15. redisTemplate.opsForValue().set("num", ++num);
  16. //2.4释放锁,del
  17. redisTemplate.delete("lock");
  18. }else{
  19. //3获取锁失败、每隔0.1秒再获取
  20. try {
  21. Thread.sleep(100);
  22. testLock();
  23. } catch (InterruptedException e) {
  24. e.printStackTrace();
  25. }
  26. }
  27. }

分布式锁的误删问题

上面实现了分布锁的功能,同时避免了不释放锁带来的阻塞问题,但是会引发新的问题:可能会释放其他服务器的锁。
场景:如果业务逻辑的执行时间是7s。执行流程如下
1. index1业务逻辑没执行完,3秒后锁被自动释放。
2. index2获取到锁,执行业务逻辑,3秒后锁被自动释放。
3. index3获取到锁,执行业务逻辑
4. index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁,导致index3的业务只执行1s就被别人释放。
最终等于没锁的情况。
解决:setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的锁
image.png
通过uuid解决锁误删的问题

  1. @GetMapping("testLock")
  2. public void testLock(){
  3. //1获取锁,setne
  4. String uuid = UUID.randomUUID().toString();
  5. Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS);
  6. //2获取锁成功、查询num的值
  7. if(lock){
  8. Object value = redisTemplate.opsForValue().get("num");
  9. //2.1判断num为空return
  10. if(StringUtils.isEmpty(value)){
  11. return;
  12. }
  13. //2.2有值就转成成int
  14. int num = Integer.parseInt(value+"");
  15. //2.3把redis的num加1
  16. redisTemplate.opsForValue().set("num", ++num);
  17. //2.4释放锁,del
  18. //判断比较uuid值是否一样
  19. String stringUuid = (String) redisTemplate.opsForValue().get("lock");
  20. if (uuid.equals(stringUuid)) {
  21. redisTemplate.delete("lock");
  22. }
  23. }else{
  24. //3获取锁失败、每隔0.1秒再获取
  25. try {
  26. Thread.sleep(100);
  27. testLock();
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. }

删除缺乏原子性

image.png

  1. @GetMapping("testLockLua")
  2. public void testLockLua() {
  3. //1 声明一个uuid ,将做为一个value 放入我们的key所对应的值中
  4. String uuid = UUID.randomUUID().toString();
  5. //2 定义一个锁:lua 脚本可以使用同一把锁,来实现删除!
  6. String skuId = "25"; // 访问skuId 为25号的商品 100008348542
  7. String locKey = "lock:" + skuId; // 锁住的是每个商品的数据
  8. // 3 获取锁
  9. Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid, 3, TimeUnit.SECONDS);
  10. // 第一种: lock 与过期时间中间不写任何的代码。
  11. // redisTemplate.expire("lock",10, TimeUnit.SECONDS);//设置过期时间
  12. // 如果true
  13. if (lock) {
  14. // 执行的业务逻辑开始
  15. // 获取缓存中的num 数据
  16. Object value = redisTemplate.opsForValue().get("num");
  17. // 如果是空直接返回
  18. if (StringUtils.isEmpty(value)) {
  19. return;
  20. }
  21. // 不是空 如果说在这出现了异常! 那么delete 就删除失败! 也就是说锁永远存在!
  22. int num = Integer.parseInt(value + "");
  23. // 使num 每次+1 放入缓存
  24. redisTemplate.opsForValue().set("num", String.valueOf(++num));
  25. /*使用lua脚本来锁*/
  26. // 定义lua 脚本
  27. String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
  28. // 使用redis执行lua执行
  29. DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
  30. redisScript.setScriptText(script);
  31. // 设置一下返回值类型 为Long
  32. // 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型,
  33. // 那么返回字符串与0 会有发生错误。
  34. redisScript.setResultType(Long.class);
  35. // 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。
  36. redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid);
  37. } else {
  38. // 其他线程等待
  39. try {
  40. // 睡眠
  41. Thread.sleep(1000);
  42. // 睡醒了之后,调用方法。
  43. testLockLua();
  44. } catch (InterruptedException e) {
  45. e.printStackTrace();
  46. }
  47. }
  48. }

总结

为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件
- 互斥性。在任意时刻,只有一个客户端能持有锁。
- 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
- 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。
- 加锁和解锁必须具有原子性。

Redis6.0新功能

ACL(Access Control List)访问控制列表