NoSql
为什么要用NoSql
单机MySql的年代
90年代,一个基本的网站访问量一般不会太大,单个数据库完全足够
那个时候,更多的去使用静态网页Html~服务器根本没有太大的压力!
思考一下,这种情况下:整个网站的瓶颈是什么
1、数据量如果太大、一个机器放不下了
2、数据的索引(B+Tree),一个机器内存也放不下
3、访问量(读写混合),一个服务器承受不了
只要你开始出现以上的三种情况之一,那么你就必须要晋级
Memcached(缓存)+ MySQL+垂直拆分(读写分离)网站80%的情况都是在读,毎次都要去査询数据库的话就十分的麻烦!所以说我们希望减轻数据的压力,我们可以使用缓存来保证效率!
发展过程:优化数据结构和索引 ——> 文件缓存(IO) ——> Memcached(当时最热门的技术)
+%20MySQL+%E5%9E%82%E7%9B%B4%E6%8B%86%E5%88%86.png#id=c6uVg&originHeight=756&originWidth=1032&originalType=binary&status=done&style=none)
分库分表+水平拆分+MySQL集群
技术和业务在发展的同时,对人的要求也越来越高
本质:数据库(读,写)
早些年 MyISAM:表锁,十分影响效率!高并发下就会出现严重的锁问题
转战 innodb:行锁
慢慢的就开始使用分库分表来解决写的压力! MySQL在哪个年代推出了表分区!这个并没有多少公司使用!
MySQL的集群,很好满足哪个年代的所有需求!
如今的年代
2010-2020十年之间,世界已经发生了翻天覆地的变化;(定位,也是一种数据,音乐,热榜!)
MySQL等关系型数据库就不够用了!数据量很多,变化很快。
MySQL有的使用它来存储一些比较大的文件,博客,图片!数据库表很大,效率就低了!如果有一种数据库来专门处理这种数据,MySQL压力就变得十分小(研究如何处理这些问题!)大数据的IO压力下,表几乎没法更大!
目前一个基本的互联网项目!
为什么要用 NOSQL
用户的个人信息,社交网络,地理位置。用户自己产生的数据,用户日志等等爆发式增长
这时候我们就需要使用NOSQL数据库的,NoSql可以很好的处理以上的情况
什么是NoSql
NOSQL
NOSQL= Not Only SQL(不仅仅是SQL)
关系型数据库:表格,行,列
泛指非关系型数据库的,随着web2.0互联网的诞生!传统的关系型数据库很难对付web2.0时代!尤其是超大规模的高并发的社区!暴露岀来很多难以克服的问题,NosαL在当今大数据环境下发展的十分迅速, Redis是发展最快的,而且是我们当下必须要掌握的一个技术。
很多的数据类型用户的个人信息,社交网络,地理位置。这些数据类型的存储不需要一个固定的格式!不需要多余的操作就可以横向扩展的! Map<String,Object> 使用键值对来控制。
NOSQL特点
解耦
1、方便扩展(数据之间没有关系,很好扩展!)
2、大数据量高性能(Redis一秒写8万次,读取11万,NoSqL的缓存记录级,是一种细粒度的缓存,性能会比较高!)
3、数据类型是多样型的!(不需要事先设计数据库!随取随用!如果是数据量十分大的表,很多人就无法设计了!)
4、传统 RDBMS和 NoSql
传统的 RDBMS
-结构化组织
-SQL
-数据和关系都存在单独的表中 row col
-操作操作,数据定义语言
-严格的一致性
-基础的事务
-...
NoSql
-不仅仅是数据
-没有固定的查询语言
-键值对存储,列存储,文档存储,图形数据库(社交关系)
-最终一致性,
-CAP定理和BASE(异地多活)
-高性能,高可用,高可扩
-...
了解:3V+3高
大数据时代的3:主要是描述问题的
- 海量Volume
- 多样Variety
- 实时Velocity
大数据时代的3高:主要是对程序的要求
- 高并发
- 高可拓
- 高性能
真正在公司中的实践: NOSQL+ RDBMS一起使用才是最强的,阿里巴巴的架构演进
技术没有高低之分,就看你如何去使用!(提升内功,思维的提高!)
阿里巴巴实践分析理解数据架构演进
如果你未来相当一个架构师:没有什么是加一层解决不了的!
# 1、商品的基本信息
名称、价格、商家信息
关系型数据库就可以解决了!MysαL/ oracle(淘宝早年就去IOE了!-王坚:推荐文章:阿里云的这群疯子:40分钟重要!)
淘宝内部的 MySQL不是大家用的 MySQL
# 2、商品的描述、评论(文字比较多)
文档型数据库中, MongoDB
#3、图片
分布式文件系统 FastDFS
- 淘宝的 TFS
- Google的 GFS
- Hadoop的 HDFS
- 阿里云的 OSS
#4、商品的关键字(搜索)
- 搜索引擎 solr elasticsearch
- ISErach:多隆(多去了解一下这些技术大佬!)
#5、商品热门的波段信息
内存数据库
Redis tair、 Memache
#6、商品的交易,外部的支付接口
- 三方应用
NoSql的四大分类
KV键值对:
- 新浪:Reds
- 美团: Redis+Tair
- 阿里、百度: Redis+ memcache
文档型数据库(bson格式和json一样):
- MongoDB(一般必须要掌握)
- MongoDB是一个基于分布式文件存储的数据库,C+编写,主要用来处理大量的文档
- MongoDB是一个个于关系型数据库和非关系型数据中中的产品! MongoDB是非关系型数据库中功能最丰富,最像关系型数据库的!
- ConthDB
列存储数据库
- HBase
- 分布式文件系统
图关系数据库
它不是存图形,放的是关系,比如:朋友圈社交网络,广告推荐!
Neo4i, Info grid
对比
分类 | Examples举例 | 典型应用场景 | 数据模型 | 优点 | 缺点 |
---|---|---|---|---|---|
键值(key-value) | Tokyo Cabinet/Tyrant, Redis, Voldemort, Oracle BDB | 内容缓存,主要用于处理大量数据的高访问负载,也用于一些日志系统等等。 | Key 指向 Value 的键值对,通常用hash table来实现 | 查找速度快 | 数据无结构化,通常只被当作字符串或者二进制数据 |
列存储数据库 | Cassandra, HBase, Riak | 分布式的文件系统 | 以列簇式存储,将同一列数据存在一起 | 查找速度快,可扩展性强,更容易进行分布式扩展 | 功能相对局限 |
文档型数据库 | CouchDB, MongoDb | Web应用(与Key-Value类似,Value是结构化的,不同的是数据库能够了解Value的内容) | Key-Value对应的键值对,Value为结构化数据 | 数据结构要求不严格,表结构可变,不需要像关系型数据库一样需要预先定义表结构 | 查询性能不高,而且缺乏统一的查询语法。 |
图形(Graph)数据库 | Neo4J, InfoGrid, Infinite Graph | 社交网络,推荐系统等。专注于构建关系图谱 | 图结构 | 利用图结构相关算法。比如最短路径寻址,N度关系查找等 | 很多时候需要对整个图做计算才能得出需要的信息,而且这种结构不太好做分布式的集群方案。 |
Redis 入门
Redis 是什么?
Redis(Remote Dictionary Server ),即远程字典服务,
是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。
区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了 master- slave(主从)同步。
免费开源,是当下最热门的NoSql技术之一,也被人们称为结构化数据库。
Redis 可以用来干什么?
1、内存存储、持久化,内存中是断电即失、所以说持久化很重要(rdb、aof)
2、效率高,可以用于高速缓存
3、发布订阅系统
4、地图信息分析
5、计时器、计数器(浏览量 incr decre)
6、……
特性
1、多样的数据类型
2、持久化
3、集群
4、事务
……
需要用到的东西
下载地址:
注意:Windows在Github上下载(停更很久了!)
Redis推荐都是在Linux服务器上搭建的,我们是基于Linux学习!
Redis安装
Windows安装
1、下载安装包: https://github.com/dmajkic/redis/releases
2、下载完毕得到压缩包:
3、解压到自己电脑上的环境目录下的就可以的。
4、开启Redis ,双击运行服务即可。
5、使用Redis客户端连接redis
Linux安装
1、下载安装包!redis-6.2.1.tar.gz
2、解压Redis的安装包!程序/opt (我用的是WSL2 自己创建的environment)
3、进入解压后的文件,可以看到我们redis的配置
reids.conf
4、基本的环境安装
这个是CentOS的
yum install gcc-c++
make
make install
而在WSL2中装的Ubuntu 20
sudo apt-get install build-essential
sudo apt-get install gcc
sudo apt-get install g++
make
make install
5、redis的默认安装路径/usr/local/bin
6、将redis配置文件。复制到我们当前目录下
7、redis默认不是后台启动的,修改配置文件!
上面三步由于wsl2配置暂时找不到对应的设置
8、启动redis
redis-server kconfig/redis.conf --通过指定修改的启动
redis-cli -p 6379 -- 通过redis客户端进行连接
ping -- 测试
ps -ef|grep redis -- redis的进程是否开启
shutdown -- 关闭redis服务
exit -- 退出
9、使用redis-cli进行连接测试!
10、查看redis的进程是否开启
测试性能
redis-benchmark是一个压力测试工具!
官方自带的性能测试工具!
redis-benchmark命令参数!
序号 | 选项 | 描述 | 默认值 |
---|---|---|---|
1 | -h | 指定服务器主机名 | 127.0.0.1 |
2 | -p | 指定服务器端口 | 6379 |
3 | -s | 指定服务器 socket | |
4 | -c | 指定并发连接数 | 50 |
5 | -n | 指定请求数 | 10000 |
6 | -d | 以字节的形式指定 SET/GET 值的数据大小 | 2 |
7 | -k | 1=keep alive 0=reconnect | 1 |
8 | -r | SET/GET/INCR 使用随机 key, SADD 使用随机值 | |
9 | -P | 通过管道传输 请求 | 1 |
10 | -q | 强制退出 redis。仅显示 query/sec 值 | |
11 | —csv | 以 CSV 格式输出 | |
12 | -l | 生成循环,永久执行测试 | |
13 | -t | 仅运行以逗号分隔的测试命令列表。 | |
14 | -I | Idle 模式。仅打开 N 个 idle 连接并等待。 |
来简单测试下:
#测试:100个并发连接100000请求
redis-benchmark-h localhost-p6379-c100-n100000
基础的知识
redis默认有16个数据库
默认使用的是第0个
可以使用select进行切换数据库!
127.0.0.1:6379>select3#切换数据库
OK
127.0.0.1:6379[3]>DBSIZE#查看DB大小
(integer)0
127.0.0.1:6379[3]> keys*#查看数据库所有的key
1)"name"
清除当前数据库 flushdb
清除全部数据库的内容FLUSHALL
127.0.0.1:6379[3]>flushdb
OK
127.0.0.1:6379[3]>keys*
(empty list or set)
Redis 是单线程的
明白Redis是很快的,官方表示,Redis是基于内存操作,CPU不是Redis性能瓶颈,Redis的瓶颈是根据机器的内存和网络带宽,既然可以使用单线程来实现,就使用单线程了!所有就使用了单线程了!
Redis是C语言写的,官方提供的数据为100000+的QPS,完全不比同样是使用key-vale的Memecache差!
Redis为什么单线程还这么快?
1、误区1:高性能的服务器一定是多线程的?
2、误区2:多线程(CPU上下文会切换!)一定比单线程效率高!
先去CPU>内存>硬盘的速度要有所了解!
核心:redis是将所有的数据全部放在内存中的,所以说使用单线程去操作效率就是最高的,多线程(CPU上下文会切换:耗时的操作!!!),对于内存系统来说,如果没有上下文切换效率就是最高的!时的操作!多次读写都是在一个CPU上的,在内存情况下,这个就是最佳的方案!
五大数据类型
Redis是一个开源(BSD许可)的,内存中的数据结构存储系统,它可以用作数据库、缓存和消息中间件MQ。它支持多种类型的数据结构,如字符串(strings),散列(hashes),列表(lists),集合(sets),有序集合(sorted sets)与范围查询,bitmaps,hyperloglogs和地理空间(geospatial)索引半径查询。Redis内置了复制(replication),LUA脚本(Luascripting),LRU驱动事件(LRU eviction),事务(transactions)和不同级别的磁盘持久(persistence),并通过Redis哨兵(Sentinel)和自动分区(Cluster)提供高可用性(high availability)。
Redis-Key
在redis中无论什么数据类型,在数据库中都是以key-value形式保存,通过进行对Redis-key的操作,来完成对数据库中数据的操作。
下面学习的命令:
exists key
:判断键是否存在del key
:删除键值对move key db
:将键值对移动到指定数据库expire key second
:设置键值对的过期时间type key
:查看value的数据类型
127.0.0.1:6379> keys * # 查看当前数据库所有key
(empty list or set)
127.0.0.1:6379> set name dafran # set key
OK
127.0.0.1:6379> set age 20
OK
127.0.0.1:6379> keys *
1) "age"
2) "name"
127.0.0.1:6379> move age 1 # 将键值对移动到指定数据库
(integer) 1
127.0.0.1:6379> EXISTS age # 判断键是否存在
(integer) 0 # 不存在
127.0.0.1:6379> EXISTS name
(integer) 1 # 存在
127.0.0.1:6379> SELECT 1
OK
127.0.0.1:6379[1]> keys *
1) "age"
127.0.0.1:6379[1]> del age # 删除键值对
(integer) 1 # 删除个数
127.0.0.1:6379> set age 20
OK
127.0.0.1:6379> EXPIRE age 15 # 设置键值对的过期时间
(integer) 1 # 设置成功 开始计数
127.0.0.1:6379> ttl age # 查看key的过期剩余时间
(integer) -2 # -2 表示key过期,-1表示key未设置过期时间
127.0.0.1:6379> get age # 过期的key 会被自动delete
(nil)
127.0.0.1:6379> keys *
1) "name"
127.0.0.1:6379> type name # 查看value的数据类型
string
关于TTL
命令
Redis的key,通过TTL命令返回key的过期时间,一般来说有3种:
- 当前key没有设置过期时间,所以会返回-1.
- 当前key有设置过期时间,而且key已经过期,所以会返回-2.
- 当前key有设置过期时间,且key还没有过期,故会返回key的正常剩余时间.
关于重命名RENAME
和RENAMENX
RENAME key newkey
修改 key 的名称RENAMENX key newkey
仅当 newkey 不存在时,将 key 改名为 newkey 。
更多命令学习:https://www.redis.net.cn/order/
- String
- hash:map格式
- 列表类型list:linkedlist格式元素可重复
- **集合类型set 元素不重复
- 有序集合:sortedset元素不重复且排序
String(字符串)
可覆盖
基本指令:
- 存储:set key value
- 获取 :get key nil(没有)
- 删除:del key
高级用法:
追加:
# APPEND key value 向指定的key的value后追加字符串
127.0.0.1:6379> set msg hello
OK
127.0.0.1:6379> append msg " world" (integer) 11
127.0.0.1:6379> get msg “hello world”
自增自减
# INCR/DECR key 将指定key的value数值进行+1/-1(仅对于数字)
127.0.0.1:6379> set age 20
OK
127.0.0.1:6379> incr age
(integer) 21
127.0.0.1:6379> decr age
(integer) 20
步长
#`INCRBY/DECRBY key n` 按指定的步长对数值进行加减
127.0.0.1:6379> INCRBY age 5
(integer) 25
127.0.0.1:6379> DECRBY age 10
(integer) 15
浮点型
#`INCRBYFLOAT key n`为数值加上浮点型数值
127.0.0.1:6379> INCRBYFLOAT age 5.2
“20.2”
获取key保存值的字符串长度
# `STRLEN key`
127.0.0.1:6379> get msg “hello world”
127.0.0.1:6379> STRLEN msg
(integer) 11
截取字符串(闭区间,起止位置都取)
#`GETRANGE key start end`
127.0.0.1:6379> get msg “hello world”
127.0.0.1:6379> GETRANGE msg 3 9
“lo worl”
替换
#`SETRANGE key offset value`
127.0.0.1:6379> SETRANGE msg 2 hello
(integer) 7
127.0.0.1:6379> get msg
“tehello”
# 替换返回 key 的旧值(old value)GETSET key value`
127.0.0.1:6379> GETSET msg test
“hello world”
set
#`SETNX key value` key不存在时 设置 分布式锁应用
#成功返回1
#失败返回0
127.0.0.1:6379> SETNX msg test
(integer) 0
127.0.0.1:6379> SETNX name sakura
(integer) 1
#set 键值对并设置过期时间`SETEX key seconds value`
127.0.0.1:6379> setex name 10 root
OK
127.0.0.1:6379> get name
(nil)
批量set键值对
#`MSET key1 value1 [key2 value2..]` 批量set键值对
127.0.0.1:6379> MSET k1 v1 k2 v2 k3 v3 OK
#仅当参数中所有的key都不存在时执行 `MSETNX key1 value1 [key2 value2..]`
#成功返回1
#失败返回0
#原子性操作,要么一起成功,要么一起失败。
127.0.0.1:6379> MSETNX k1 v1 k4 v4
(integer) 0
#批量获取多个key保存的值`MGET key1 [key2..]`
127.0.0.1:6379> MGET k1 k2 k3
1) “v1”
2) “v2”
3) “v3”
getset
getset #先get后set
127.0.0.1:6379> getset db redis # 如果不存在值,返回nil
(nil)
127.0.0.1:6379> get db
"redis"
127.0.0.1:6379> getset db mongodb # 如果存在值,获取返回原来的值,并设置新的值
"redis"
127.0.0.1:6379> get db
"mongodb"
对象:
set user:1:{name:zhja,age:15}
#user:{id}:{field}
127.0.0.1:6379> mset user:1:name zhan user:1:age 15
127.0.0.1:6379> mget user:1:name user:1:age
1) "zhan"
2) "15"
生存时间
`PSETEX key milliseconds value`
#和 SETEX 命令相似,但它以毫秒为单位设置 key 的生存时间,
`getset key value`
#如果不存在值,则返回nil,
#如果存在值,获取原来的值,并设置新的值
#先get再set
String类似的使用场景:
value除了是字符串还可以是数字,用途举例:
计数器
统计多单位的数量:uid:22532676:follow 0
粉丝数
对象存储缓存 | 命令 | 描述 | 示例 | | —- | —- | —- | |APPEND key value
| 向指定的key的value后追加字符串 | 127.0.0.1:6379> set msg hello OK 127.0.0.1:6379> append msg “ world” (integer) 11 127.0.0.1:6379> get msg “hello world” | |DECR/INCR key
| 将指定key的value数值进行+1/-1(仅对于数字) | 127.0.0.1:6379> set age 20 OK 127.0.0.1:6379> incr age (integer) 21 127.0.0.1:6379> decr age (integer) 20 | |INCRBY/DECRBY key n
| 按指定的步长对数值进行加减 | 127.0.0.1:6379> INCRBY age 5 (integer) 25 127.0.0.1:6379> DECRBY age 10 (integer) 15 | |INCRBYFLOAT key n
| 为数值加上浮点型数值 | 127.0.0.1:6379> INCRBYFLOAT age 5.2 “20.2” | |STRLEN key
| 获取key保存值的字符串长度 | 127.0.0.1:6379> get msg “hello world” 127.0.0.1:6379> STRLEN msg (integer) 11 | |GETRANGE key start end
| 按起止位置获取字符串(闭区间,起止位置都取) | 127.0.0.1:6379> get msg “hello world” 127.0.0.1:6379> GETRANGE msg 3 9 “lo worl” | |SETRANGE key offset value
| 用指定的value 替换key中 offset开始的值 | 127.0.0.1:6379> SETRANGE msg 2 hello (integer) 7 127.0.0.1:6379> get msg “tehello” | |GETSET key value
| 将给定 key 的值设为 value ,并返回 key 的旧值(old value)。 | 127.0.0.1:6379> GETSET msg test “hello world” | |SETNX key value
| 仅当key不存在时进行set | 127.0.0.1:6379> SETNX msg test (integer) 0 127.0.0.1:6379> SETNX name sakura (integer) 1 | |SETEX key seconds value
| set 键值对并设置过期时间 | 127.0.0.1:6379> setex name 10 root OK 127.0.0.1:6379> get name (nil) | |MSET key1 value1 [key2 value2..]
| 批量set键值对 | 127.0.0.1:6379> MSET k1 v1 k2 v2 k3 v3 OK | |MSETNX key1 value1 [key2 value2..]
| 批量设置键值对,仅当参数中所有的key都不存在时执行 | 127.0.0.1:6379> MSETNX k1 v1 k4 v4 (integer) 0 | |MGET key1 [key2..]
| 批量获取多个key保存的值 | 127.0.0.1:6379> MGET k1 k2 k3 1) “v1” 2) “v2” 3) “v3” | |PSETEX key milliseconds value
| 和 SETEX 命令相似,但它以毫秒为单位设置 key 的生存时间, | | |getset key value
| 如果不存在值,则返回nil,如果存在值,获取原来的值,并设置新的值 | |
String类似的使用场景:value除了是字符串还可以是数字,用途举例:
- 计数器
- 统计多单位的数量:uid:123666:follow 0
- 粉丝数
- 对象存储缓存
List(列表)
Redis列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边)
一个列表最多可以包含 232 - 1 个元素 (4294967295, 每个列表超过40亿个元素)。
首先我们列表,可以经过规则定义将其变为队列、栈、双端队列等
所有的List命令都是以L开头的
正如图Redis中List是可以进行双端操作的,所以命令也就分为了LXXX和RLLL两类,有时候L也表示List例如LLEN
有无 “ ”
都会被识别成字符,但也不一定。
---------------------------LPUSH---RPUSH---LRANGE--------------------------------
127.0.0.1:6379> LPUSH mylist k1 # LPUSH mylist=>{1}
(integer) 1
127.0.0.1:6379> LPUSH mylist k2 # LPUSH mylist=>{2,1}
(integer) 2
127.0.0.1:6379> RPUSH mylist k3 # RPUSH mylist=>{2,1,3}
(integer) 3
127.0.0.1:6379> get mylist # 普通的get是无法获取list值的
(error) WRONGTYPE Operation against a key holding the wrong kind of value
127.0.0.1:6379> LRANGE mylist 0 4 # LRANGE 获取起止位置范围内的元素
1) "k2"
2) "k1"
3) "k3"
127.0.0.1:6379> LRANGE mylist 0 2
1) "k2"
2) "k1"
3) "k3"
127.0.0.1:6379> LRANGE mylist 0 1
1) "k2"
2) "k1"
127.0.0.1:6379> LRANGE mylist 0 -1 # 获取全部元素
1) "k2"
2) "k1"
3) "k3"
---------------------------LPUSHX---RPUSHX-----------------------------------
127.0.0.1:6379> LPUSHX list v1 # list不存在 LPUSHX失败
(integer) 0
127.0.0.1:6379> LPUSHX list v1 v2
(integer) 0
127.0.0.1:6379> LPUSHX mylist k4 k5 # 向mylist中 左边 PUSH k4 k5
(integer) 5
127.0.0.1:6379> LRANGE mylist 0 -1
1) "k5"
2) "k4"
3) "k2"
4) "k1"
5) "k3"
---------------------------LINSERT--LLEN--LINDEX--LSET----------------------------
127.0.0.1:6379> LINSERT mylist after k2 ins_key1 # 在k2元素后 插入ins_key1
(integer) 6
127.0.0.1:6379> LRANGE mylist 0 -1
1) "k5"
2) "k4"
3) "k2"
4) "ins_key1"
5) "k1"
6) "k3"
127.0.0.1:6379> LLEN mylist # 查看mylist的长度
(integer) 6
127.0.0.1:6379> LINDEX mylist 3 # 获取下标为3的元素
"ins_key1"
127.0.0.1:6379> LINDEX mylist 0
"k5"
127.0.0.1:6379> LSET mylist 3 k6 # 将下标3的元素 set值为k6 如果目标列表不存在或者对应的索引不存在则失败
OK
127.0.0.1:6379> LRANGE mylist 0 -1
1) "k5"
2) "k4"
3) "k2"
4) "k6"
5) "k1"
6) "k3"
---------------------------LPOP--RPOP--------------------------
127.0.0.1:6379> LPOP mylist # 左侧(头部)弹出
"k5"
127.0.0.1:6379> RPOP mylist # 右侧(尾部)弹出
"k3"
---------------------------RPOPLPUSH--------------------------
127.0.0.1:6379> LRANGE mylist 0 -1
1) "k4"
2) "k2"
3) "k6"
4) "k1"
127.0.0.1:6379> RPOPLPUSH mylist newlist # 将mylist的最后一个值(k1)弹出,加入到newlist的头部
"k1"
127.0.0.1:6379> LRANGE newlist 0 -1
1) "k1"
127.0.0.1:6379> LRANGE mylist 0 -1
1) "k4"
2) "k2"
3) "k6"
---------------------------LTRIM--------------------------
127.0.0.1:6379> LTRIM mylist 0 1 # 截取mylist中的 0~1部分
OK
127.0.0.1:6379> LRANGE mylist 0 -1
1) "k4"
2) "k2"
# 初始 mylist: k2,k2,k2,k2,k2,k2,k4,k2,k2,k2,k2
---------------------------Linstrt--Rinsert--------------------------
127.0.0.1:6379> Rpush mylist "hello"
(integer) 1
127.0.0.1:6379> Rpush mylist "world"
(integer) 2
127.0.0.1:6379> Linsert mylist before "world" "other" # 在world之前增加 other
(integer) 3
127.0.0.1:6379> Lrange mylist 0 -1
1) "hello"
2) "other"
3) "world"
127.0.0.1:6379> Linsert mylist after world new # 在world之前增加 new
(integer) 4
127.0.0.1:6379> Lrange mylist 0 -1
1) "hello"
2) "other"
3) "world"
4) "new"
---------------------------LREM--------------------------
127.0.0.1:6379> LREM mylist 3 k2 # 从头部开始搜索 至多删除3个 k2
(integer) 3
# 删除后:mylist: k2,k2,k2,k4,k2,k2,k2,k2
127.0.0.1:6379> LREM mylist -2 k2 #从尾部开始搜索 至多删除2个 k2
(integer) 2
# 删除后:mylist: k2,k2,k2,k4,k2,k2
---------------------------BLPOP--BRPOP--------------------------
mylist: k2,k2,k2,k4,k2,k2
newlist: k1
127.0.0.1:6379> BLPOP newlist mylist 30 # 从newlist中弹出第一个值,mylist作为候选
1) "newlist" # 弹出
2) "k1"
127.0.0.1:6379> BLPOP newlist mylist 30
1) "mylist" # 由于newlist空了 从mylist中弹出
2) "k2"
127.0.0.1:6379> BLPOP newlist 30
(30.10s) # 超时了
127.0.0.1:6379> BLPOP newlist 30 # 我们连接另一个客户端向newlist中push了test, 阻塞被解决。
1) "newlist"
2) "test"
(12.54s)
1、添加
左部:lpush key value1[value2]
右部:rpush key value1[value2]
向已存在的列名中push值(一个或者多个)- LPUSHX key value
- RPUSHX key value
2、获取
lrange key start end 获取list 起止元素==(索引从左往右 递增)==
0 -1 全部
LLEN key获取列表长度
LINDEX key index 通过索引获取列表元素
LTRIM key start end 通过下标截取指定范围内的列表127.0.0.1:6379> LTRIM mylist 0 1 # 截取mylist中的 0~1部分
OK
127.0.0.1:6379> LRANGE mylist 0 -1
1) "k4"
2) "k2"
3、删除
lpop key:删除最左边的元素,并将元素返回
rpop key:删除最右边的元素,并将元素返回
RPOPLPUSH source destination 将列表的尾部(右)最后一个值弹出,并返回,然后加到另一个列表(没有可自动创建)的头部
BLPOP/BRPOP key1[key2] timout
移出并获取列表的第一个/最后一个元素,
如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。#mylist: k2,k2,k2,k4,k2,k2
#newlist: k1
127.0.0.1:6379> BLPOP newlist mylist 30 # 从newlist中弹出第一个值,mylist作为候选
1) "newlist" # 弹出
2) "k1"
127.0.0.1:6379> BLPOP newlist mylist 30
1) "mylist" # 由于newlist空了 从mylist中弹出
2) "k2"
127.0.0.1:6379> BLPOP newlist 30
(30.10s) # 超时了
127.0.0.1:6379> BLPOP newlist 30 # 我们连接另一个客户端向newlist中push了test, 阻塞被解决。
1) "newlist"
2) "test"
(12.54s)
BRPOPLPUSH source destination timeout- 和
RPOPLPUSH
功能相同,如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
- 和
LREM key count value 删除指定key的重复value- List中是允许value重复的
- count > 0:从头部开始搜索 然后删除指定的value 至多删除count个
- count < 0
:从尾部开始搜索…
- count = 0`:删除列表中所有的指定value。
4、插入
LINSERT key BEFORE|AFTER pivot value
5、修改
- LSET key index value 通过索引为元素设值
| 命令 | 描述 |
| —- | —- |
|
LPUSH/RPUSH key value1[value2..]
| 从左边/右边向列表中PUSH值(一个或者多个)。 | |LRANGE key start end
| 获取list 起止元素==(索引从左往右 递增)== | |LPUSHX/RPUSHX key value
| 向已存在的列名中push值(一个或者多个) | |LINSERT key BEFORE|AFTER pivot value
| 在指定列表元素的前/后 插入value | |LLEN key
| 查看列表长度 | |LINDEX key index
| 通过索引获取列表元素 | |LSET key index value
| 通过索引为元素设值 | |LPOP/RPOP key
| 从最左边/最右边移除值 并返回 | |RPOPLPUSH source destination
| 将列表的尾部(右)最后一个值弹出,并返回,然后加到另一个列表的头部 | |LTRIM key start end
| 通过下标截取指定范围内的列表 | |LREM key count value
| List中是允许value重复的count > 0
:从头部开始搜索 然后删除指定的value 至多删除count个count < 0
:从尾部开始搜索…count = 0
:删除列表中所有的指定value。 | |BLPOP/BRPOP key1[key2] timout
| 移出并获取列表的第一个/最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。 | |BRPOPLPUSH source destination timeout
| 和RPOPLPUSH
功能相同,如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。 |
小结
- list实际上是一个链表,before Node after , left, right 都可以插入值
- 如果key不存在,则创建新的链表
- 如果key存在,新增内容
- 如果移除了所有值,空链表,也代表不存在
- 在两边插入或者改动值,效率最高!修改中间元素,效率相对较低。
应用:
消息排队!消息队列(Lpush Rpop),栈(Lpush Lpop)
Set(集合)
Redis的Set是string类型的无序集合。集合成员是唯一的,这就意味着集合中不能出现重复的数据。
Redis 中 集合是通过哈希表实现的,所以添加,删除,查找的复杂度都是O(1)。
集合中最大的成员数为 232 - 1 (4294967295, 每个集合可存储40多亿个成员)。
1、添加:
- sadd key value(可多个)
2、获取:
- smembers key 所有元素 nil(没有)
- SCARD key 获取集合的成员数
- SDIFF key1[key2..] 返回所有集合的差集 key1- key2 - …
- SDIFFSTORE destination key1[key2..] 在SDIFF的基础上,将结果保存到集合中==(覆盖)==。不能保存到其他类型key
- SINTER key1 [key2..] 返回所有集合的交集
- SINTERSTORE destination key1[key2..] 在SINTER的基础上,存储结果到集合中。覆盖
- SUNION key1 [key2..] 返回所有集合的并集
- SUNIONSTORE destination key1 [key2..] 在SUNION的基础上,存储结果到集合。覆盖
- SSCAN KEY [MATCH pattern] [COUNT count] 在大量数据环境下,遍历集合中元素,每次遍历部分
3、查询
- SISMEMBER key member 查询member元素是否是集合的成员,结果是无序的
- SRANDMEMBER key [count] 随机返回集合中count个成员,count缺省值为1
4、删除:
srem key value 删除集合中的某个元素
SPOP key [count] 随机移除并返回集合中count个成员,count缺省值为1
SMOVE source destination member 将source集合的成员member移动到destination集合
SREM key member1[member2..] 移除集合中一个/多个成员
---------------SADD--SCARD--SMEMBERS--SISMEMBER--------------------
127.0.0.1:6379> SADD myset m1 m2 m3 m4 # 向myset中增加成员 m1~m4
(integer) 4
127.0.0.1:6379> SCARD myset # 获取集合的成员数目
(integer) 4
127.0.0.1:6379> smembers myset # 获取集合中所有成员
1) "m4"
2) "m3"
3) "m2"
4) "m1"
127.0.0.1:6379> SISMEMBER myset m5 # 查询m5是否是myset的成员
(integer) 0 # 不是,返回0
127.0.0.1:6379> SISMEMBER myset m2
(integer) 1 # 是,返回1
127.0.0.1:6379> SISMEMBER myset m3
(integer) 1
---------------------SRANDMEMBER--SPOP----------------------------------
127.0.0.1:6379> SRANDMEMBER myset 3 # 随机返回3个成员
1) "m2"
2) "m3"
3) "m4"
127.0.0.1:6379> SRANDMEMBER myset # 随机返回1个成员
"m3"
127.0.0.1:6379> SPOP myset 2 # 随机移除并返回2个成员
1) "m1"
2) "m4"
# 将set还原到{m1,m2,m3,m4}
---------------------SMOVE--SREM----------------------------------------
127.0.0.1:6379> SMOVE myset newset m3 # 将myset中m3成员移动到newset集合
(integer) 1
127.0.0.1:6379> SMEMBERS myset
1) "m4"
2) "m2"
3) "m1"
127.0.0.1:6379> SMEMBERS newset
1) "m3"
127.0.0.1:6379> SREM newset m3 # 从newset中移除m3元素
(integer) 1
127.0.0.1:6379> SMEMBERS newset
(empty list or set)
# 下面开始是多集合操作,多集合操作中若只有一个参数默认和自身进行运算
# setx=>{m1,m2,m4,m6}, sety=>{m2,m5,m6}, setz=>{m1,m3,m6}
-----------------------------SDIFF------------------------------------
# 差集
127.0.0.1:6379> SDIFF setx sety setz # 等价于setx-sety-setz
1) "m4"
127.0.0.1:6379> SDIFF setx sety # setx - sety
1) "m4"
2) "m1"
127.0.0.1:6379> SDIFF sety setx # sety - setx
1) "m5"
-------------------------SINTER---------------------------------------
# 共同关注(交集)
127.0.0.1:6379> SINTER setx sety setz # 求 setx、sety、setx的交集
1) "m6"
127.0.0.1:6379> SINTER setx sety # 求setx sety的交集
1) "m2"
2) "m6"
-------------------------SUNION---------------------------------------
# 并集
127.0.0.1:6379> SUNION setx sety setz # setx sety setz的并集
1) "m4"
2) "m6"
3) "m3"
4) "m2"
5) "m1"
6) "m5"
127.0.0.1:6379> SUNION setx sety # setx sety 并集
1) "m4"
2) "m6"
3) "m2"
4) "m1"
5) "m5"
命令 | 描述 |
---|---|
SADD key member1[member2..] |
向集合中无序增加一个/多个成员 |
SCARD key |
获取集合的成员数 |
SMEMBERS key |
返回集合中所有的成员 |
SISMEMBER key member |
查询member元素是否是集合的成员,结果是无序的 |
SRANDMEMBER key [count] |
随机返回集合中count个成员,count缺省值为1 |
SPOP key [count] |
随机移除并返回集合中count个成员,count缺省值为1 |
SMOVE source destination member |
将source集合的成员member移动到destination集合 |
SREM key member1[member2..] |
移除集合中一个/多个成员 |
SDIFF key1[key2..] |
返回所有集合的差集 key1- key2 - … |
SDIFFSTORE destination key1[key2..] |
在SDIFF的基础上,将结果保存到集合中==(覆盖)==。不能保存到其他类型key噢! |
SINTER key1 [key2..] |
返回所有集合的交集 |
SINTERSTORE destination key1[key2..] |
在SINTER的基础上,存储结果到集合中。覆盖 |
SUNION key1 [key2..] |
返回所有集合的并集 |
SUNIONSTORE destination key1 [key2..] |
在SUNION的基础上,存储结果到及和张。覆盖 |
SSCAN KEY [MATCH pattern] [COUNT count] |
在大量数据环境下,使用此命令遍历集合中元素,每次遍历部分 |
Hash(哈希)
Redis hash 是一个string类型的field和value的映射表,hash特别适合用于存储对象。
可以看作Map集合,key-Map ——> key- !本质和String类型没有太大区别,还是一个简单的key-vlaue!
Set就是一种简化的Hash,只变动key,而value使用默认值填充。可以将一个Hash表作为一个对象进行存储,表中存放对象的信息。
新增
- hset key field value 会覆盖
- HMSET key field1 value1 [field2 value2..] 时将多个 field-value (域-值)对设置到哈希表 key 中。
- HSETNX key field value 只有在字段 field 不存在时,设置哈希表字段的值。
- HINCRBY key field n 为哈希表 key 中的指定字段的整数值加上增量n,并返回增量后结果 一样只适用于整数型字段
- HINCRBYFLOAT key field n 为哈希表 key 中的指定字段的浮点数值加上增量 n。
查找
- HEXISTS key field 查看哈希表 key 中,指定的字段是否存在。
- HSCAN key cursor [MATCH pattern] [COUNT count] 迭代哈希表中的键值对。
获取
hget key field 获取所有给定字段的值
HMGET key field1 [field2..] 获取所有给定字段的值
HGETALL key 获取在哈希表key 的所有字段和值
HKEYS key 获取哈希表key中所有的字段
HLEN key 获取哈希表中字段的数量
HVALS key 获取哈希表中所有值
删除
hdel key field 删除字段
HDEL key field1 [field2..] 删除哈希表key中一个/多个field字段
------------------------HSET--HMSET--HSETNX----------------
127.0.0.1:6379> HSET studentx name sakura # 将studentx哈希表作为一个对象 (key-value),设置name为sakura
(integer) 1
127.0.0.1:6379> HSET studentx name gyc # 重复设置field进行覆盖,并返回0
(integer) 0
127.0.0.1:6379> HSET studentx age 20 # 设置studentx的age为20
(integer) 1
127.0.0.1:6379> HMSET studentx sex 1 tel 15623667886 # 设置sex为1,tel为15623667886
OK
127.0.0.1:6379> HSETNX studentx name gyc # HSETNX 设置已存在的field 存在不设置
(integer) 0 # 失败
127.0.0.1:6379> HSETNX studentx email 12345@qq.com # 不存在可以设置
(integer) 1 # 成功
----------------------HEXISTS--------------------------------
127.0.0.1:6379> HEXISTS studentx name # name字段在studentx中是否存在
(integer) 1 # 存在
127.0.0.1:6379> HEXISTS studentx addr
(integer) 0 # 不存在
-------------------HGET--HMGET--HGETALL-----------
127.0.0.1:6379> HGET studentx name # 获取studentx中name字段的value
"gyc"
127.0.0.1:6379> HMGET studentx name age tel # 获取studentx中name、age、tel字段的value
1) "gyc"
2) "20"
3) "15623667886"
127.0.0.1:6379> HGETALL studentx # 获取studentx中所有的field及其value
1) "name"
2) "gyc"
3) "age"
4) "20"
5) "sex"
6) "1"
7) "tel"
8) "15623667886"
9) "email"
10) "12345@qq.com"
--------------------HKEYS--HLEN--HVALS--------------
127.0.0.1:6379> HKEYS studentx # 查看studentx中所有的field
1) "name"
2) "age"
3) "sex"
4) "tel"
5) "email"
127.0.0.1:6379> HLEN studentx # 查看studentx中的字段数量
(integer) 5
127.0.0.1:6379> HVALS studentx # 查看studentx中所有的value
1) "gyc"
2) "20"
3) "1"
4) "15623667886"
5) "12345@qq.com"
-------------------------HDEL--------------------------
127.0.0.1:6379> HDEL studentx sex tel # 删除studentx 中的sex、tel字段
HDEL studentx sex #删除hash指定key字段!对应的value值也就消失了!
(integer) 2
127.0.0.1:6379> HKEYS studentx
1) "name"
2) "age"
3) "email"
-------------HINCRBY--HINCRBYFLOAT------------------------
127.0.0.1:6379> HINCRBY studentx age 1 # studentx的age字段数值+1
(integer) 21
127.0.0.1:6379> HINCRBY studentx name 1 # 非整数字型字段不可用
(error) ERR hash value is not an integer
127.0.0.1:6379> HINCRBYFLOAT studentx weight 0.6 # weight字段增加0.6
"90.8"
命令 | 描述 |
---|---|
HSET key field value |
将哈希表 key 中的字段 field 的值设为 value 。重复设置同一个field会覆盖,返回0 |
HMSET key field1 value1 [field2 value2..] |
同时将多个 field-value (域-值)对设置到哈希表 key 中。 |
HSETNX key field value |
只有在字段 field 不存在时,设置哈希表字段的值。 |
HEXISTS key field |
查看哈希表 key 中,指定的字段是否存在。 |
HGET key field value |
获取存储在哈希表中指定字段的值 |
HMGET key field1 [field2..] |
获取所有给定字段的值 |
HGETALL key |
获取在哈希表key 的所有字段和值 |
HKEYS key |
获取哈希表key中所有的字段 |
HLEN key |
获取哈希表中字段的数量 |
HVALS key |
获取哈希表中所有值 |
HDEL key field1 [field2..] |
删除哈希表key中一个/多个field字段 |
HINCRBY key field n |
为哈希表 key 中的指定字段的整数值加上增量n,并返回增量后结果 一样只适用于整数型字段 |
HINCRBYFLOAT key field n |
为哈希表 key 中的指定字段的浮点数值加上增量 n。 |
HSCAN key cursor [MATCH pattern] [COUNT count] |
迭代哈希表中的键值对。 |
注意:
- Hash变更的数据user name age,尤其是用户信息之类的,经常变动的信息!
- Hash更适合于对象的存储,Sring更加适合字符串存储!
Zset(有序集合)
不同的是每个元素都会关联一个double类型的分数(score)。redis正是通过分数来为集合中的成员进行从小到大的排序。
score相同:按字典顺序排序
有序集合的成员是唯一的,但分数(score)却可以重复。
-------------------ZADD--ZCARD--ZCOUNT--------------
127.0.0.1:6379> ZADD myzset 1 m1 2 m2 3 m3 # 向有序集合myzset中添加成员m1 score=1 以及成员m2 score=2..
(integer) 2
127.0.0.1:6379> ZCARD myzset # 获取有序集合的成员数
(integer) 2
127.0.0.1:6379> ZCOUNT myzset 0 1 # 获取score在 [0,1]区间的成员数量
(integer) 1
127.0.0.1:6379> ZCOUNT myzset 0 -1 # 查看全部的值
(integer) 2
----------------ZINCRBY--ZSCORE--------------------------
127.0.0.1:6379> ZINCRBY myzset 5 m2 # 将成员m2的score +5
"7"
127.0.0.1:6379> ZSCORE myzset m1 # 获取成员m1的score
"1"
127.0.0.1:6379> ZSCORE myzset m2
"7"
--------------ZRANK--ZRANGE-----------------------------------
127.0.0.1:6379> ZRANK myzset m1 # 获取成员m1的索引,索引按照score排序,score相同索引值按字典顺序顺序增加
(integer) 0
127.0.0.1:6379> ZRANK myzset m2
(integer) 2
127.0.0.1:6379> ZRANGE myzset 0 1 # 获取索引在 0~1的成员
1) "m1"
2) "m3"
127.0.0.1:6379> ZRANGE myzset 0 -1 # 获取全部成员
1) "m1"
2) "m3"
3) "m2"
#testset=>{abc,add,amaze,apple,back,java,redis} score均为0
------------------ZRANGEBYLEX---------------------------------
127.0.0.1:6379> ZRANGEBYLEX testset - + # 返回所有成员
1) "abc"
2) "add"
3) "amaze"
4) "apple"
5) "back"
6) "java"
7) "redis"
127.0.0.1:6379> ZRANGEBYLEX testset - + LIMIT 0 3 # 分页 按索引显示查询结果的 0,1,2条记录
1) "abc"
2) "add"
3) "amaze"
127.0.0.1:6379> ZRANGEBYLEX testset - + LIMIT 3 3 # 显示 3,4,5条记录
1) "apple"
2) "back"
3) "java"
127.0.0.1:6379> ZRANGEBYLEX testset (- [apple # 显示 (-,apple] 区间内的成员
1) "abc"
2) "add"
3) "amaze"
4) "apple"
127.0.0.1:6379> ZRANGEBYLEX testset [apple [java # 显示 [apple,java]字典区间的成员
1) "apple"
2) "back"
3) "java"
-----------------------ZRANGEBYSCORE---------------------
127.0.0.1:6379> ZRANGEBYSCORE myzset 1 10 # 返回score在 [1,10]之间的的成员
1) "m1"
2) "m3"
3) "m2"
127.0.0.1:6379> ZRANGEBYSCORE myzset 1 5
1) "m1"
2) "m3"
127.0.0.1:6379> ZRANGEBYSCORE myzset -inf +inf #从负无穷到正无穷
127.0.0.1:6379> zadd salary 2500 6b92d6 #添加三个用户
(integer) 1
127.0.0.1: 6379> zadd salary 5000 dafran
(integer)
127.0.0.1: 6379> zadd salary 500 zhangsan
(integer) 1
# ZRANGEBYSCORE key min max
127.0.0.1:6379> ZRANGEBYSCORE sa1ary -inf +inf #显示全部的用户从小到大
1) "zhangsan"
2) "6b92d6"
3) "dafran"
127.0.0.1:6379> ZRANGEBYSCORE salary -inf +inf withscores #显示全部的用户并且附带成绩
1) "zhangsan"
2) "500”
3) "6b92d6"
4) "2500"
5) "dafran"
6) "5000"
127.0.0.1:6379> ZRANGEBYSCORE salary -inf 2500 withscores #显示工资小于2500员工的升序排序!
1) "zhangsan"
2) "500”
3) "6b92d6"
4) "2500"
--------------------ZLEXCOUNT-----------------------------
127.0.0.1:6379> ZLEXCOUNT testset - +
(integer) 7
127.0.0.1:6379> ZLEXCOUNT testset [apple [java
(integer) 3
------------------ZREM--ZREMRANGEBYLEX--ZREMRANGBYRANK--ZREMRANGEBYSCORE--------------------------------
127.0.0.1:6379> ZREM testset abc # 移除成员abc
(integer) 1
127.0.0.1:6379> ZREMRANGEBYLEX testset [apple [java # 移除字典区间[apple,java]中的所有成员
(integer) 3
127.0.0.1:6379> ZREMRANGEBYRANK testset 0 1 # 移除排名0~1的所有成员
(integer) 2
127.0.0.1:6379> ZREMRANGEBYSCORE myzset 0 3 # 移除score在 [0,3]的成员
(integer) 2
# testset=> {abc,add,apple,amaze,back,java,redis} score均为0
# myzset=> {(m1,1),(m2,2),(m3,3),(m4,4),(m7,7),(m9,9)}
----------------ZREVRANGE--ZREVRANGEBYSCORE--ZREVRANGEBYLEX-----------
127.0.0.1:6379> ZREVRANGE myzset 0 3 # 按score递减排序,然后按索引,返回结果的 0~3
1) "m9"
2) "m7"
3) "m4"
4) "m3"
127.0.0.1:6379> ZREVRANGE myzset 0 -1 #从大到小排序
127.0.0.1:6379> ZREVRANGE myzset 2 4 # 返回排序结果的 索引的2~4
1) "m4"
2) "m3"
3) "m2"
127.0.0.1:6379> ZREVRANGEBYSCORE myzset 6 2 # 按score递减顺序 返回集合中分数在[2,6]之间的成员
1) "m4"
2) "m3"
3) "m2"
127.0.0.1:6379> ZREVRANGEBYLEX testset [java (add # 按字典倒序 返回集合中(add,java]字典区间的成员
1) "java"
2) "back"
3) "apple"
4) "amaze"
-------------------------ZREVRANK------------------------------
127.0.0.1:6379> ZREVRANK myzset m7 # 按score递减顺序,返回成员m7索引
(integer) 1
127.0.0.1:6379> ZREVRANK myzset m2
(integer) 4
# mathscore=>{(xm,90),(xh,95),(xg,87)} 小明、小红、小刚的数学成绩
# enscore=>{(xm,70),(xh,93),(xg,90)} 小明、小红、小刚的英语成绩
-------------------ZINTERSTORE--ZUNIONSTORE-----------------------------------
127.0.0.1:6379> ZINTERSTORE sumscore 2 mathscore enscore # 将mathscore enscore进行合并 结果存放到sumscore
(integer) 3
127.0.0.1:6379> ZRANGE sumscore 0 -1 withscores # 合并后的score是之前集合中所有score的和
1) "xm"
2) "160"
3) "xg"
4) "177"
5) "xh"
6) "188"
127.0.0.1:6379> ZUNIONSTORE lowestscore 2 mathscore enscore AGGREGATE MIN # 取两个集合的成员score最小值作为结果的
(integer) 3
127.0.0.1:6379> ZRANGE lowestscore 0 -1 withscores
1) "xm"
2) "70"
3) "xg"
4) "87"
5) "xh"
6) "93"
新增
- zadd key score value(可多个)
- ZADD key score member1 [score2 member2] 向有序集合添加一个或多个成员,或者更新已存在成员的分数
- ZINCRBY key n member 有序集合中对指定成员的分数加上增量 n
获取
ZCARD key 获取有序集合的成员数
ZCOUNT key min max 计算在有序集合中指定区间score的成员数
ZSCORE key member 返回有序集中,成员的分数值
ZRANK key member 返回有序集合中指定成员的索引
ZRANGE key start end 通过索引区间返回有序集合成指定区间内的成员
ZRANGEBYLEX key min max 通过字典区间返回有序集合的成员
ZRANGEBYSCORE key min max 通过分数返回有序集合指定区间内的成员==-inf 和 +inf分别表示最小最大值,只支持开区间()==
ZLEXCOUNT key min max 在有序集合中计算指定字典区间内成员数量
ZREVRANGE key start end 返回有序集中指定区间内的成员,通过索引,分数从高到底
ZREVRANGEBYSCORRE key max min 返回有序集中指定分数区间内的成员,分数从高到低排序
ZREVRANGEBYLEX key max min 返回有序集中指定字典区间内的成员,按字典顺序倒序
ZREVRANK key member 返回有序集合中指定成员的排名,有序集成员按分数值递减(从大到小)排序
ZINTERSTORE destination numkeys key1 [key2 ..]- 计算给定的一个或多个有序集的交集并将结果集存储在新的有序集合 key 中,
- numkeys:表示参与运算的集合数,将score相加作为结果的score
ZUNIONSTORE destination numkeys key1 [key2..]- 计算给定的一个或多个有序集的交集并将结果集存储在新的有序集合 key 中
ZSCAN key cursor [MATCH pattern] [COUNT count]- 迭代有序集合中的元素(包括元素成员和元素分值)
移除
zrem key value 删除集合中的某个元素
ZREM key member1 [member2..] 移除有序集合中一个/多个成员
ZREMRANGEBYLEX key min max 移除有序集合中给定的字典区间的所有成员
ZREMRANGEBYRANK key start stop 移除有序集合中给定的排名区间的所有成员
ZREMRANGEBYSCORE key min max 移除有序集合中给定的分数区间的所有成员 | 命令 | 描述 | | —- | —- | |ZADD key score member1 [score2 member2]
| 向有序集合添加一个或多个成员,或者更新已存在成员的分数 | |ZCARD key
| 获取有序集合的成员数 | |ZCOUNT key min max
| 计算在有序集合中指定区间score的成员数 | |ZINCRBY key n member
| 有序集合中对指定成员的分数加上增量 n | |ZSCORE key member
| 返回有序集中,成员的分数值 | |ZRANK key member
| 返回有序集合中指定成员的索引 | |ZRANGE key start end
| 通过索引区间返回有序集合成指定区间内的成员 | |ZRANGEBYLEX key min max
| 通过字典区间返回有序集合的成员 | |ZRANGEBYSCORE key min max
| 通过分数返回有序集合指定区间内的成员==-inf 和 +inf分别表示最小最大值,只支持开区间()== | |ZLEXCOUNT key min max
| 在有序集合中计算指定字典区间内成员数量 | |ZREM key member1 [member2..]
| 移除有序集合中一个/多个成员 | |ZREMRANGEBYLEX key min max
| 移除有序集合中给定的字典区间的所有成员 | |ZREMRANGEBYRANK key start stop
| 移除有序集合中给定的排名区间的所有成员 | |ZREMRANGEBYSCORE key min max
| 移除有序集合中给定的分数区间的所有成员 | |ZREVRANGE key start end
| 返回有序集中指定区间内的成员,通过索引,分数从高到底 | |ZREVRANGEBYSCORRE key max min
| 返回有序集中指定分数区间内的成员,分数从高到低排序 | |ZREVRANGEBYLEX key max min
| 返回有序集中指定字典区间内的成员,按字典顺序倒序 | |ZREVRANK key member
| 返回有序集合中指定成员的排名,有序集成员按分数值递减(从大到小)排序 | |ZINTERSTORE destination numkeys key1 [key2 ..]
| 计算给定的一个或多个有序集的交集并将结果集存储在新的有序集合 key 中,numkeys:表示参与运算的集合数,将score相加作为结果的score | |ZUNIONSTORE destination numkeys key1 [key2..]
| 计算给定的一个或多个有序集的交集并将结果集存储在新的有序集合 key 中 | |ZSCAN key cursor [MATCH pattern\\] [COUNT count]
| 迭代有序集合中的元素(包括元素成员和元素分值) |
应用案例:
- set排序 存储班级成绩表 工资表排序!
- 普通消息,1.重要消息 2.带权重进行判断
- 排行榜应用实现,取Top N测试
三种特殊数据类型
geospatial(地理位置)
使用经纬度定位地理坐标并用一个有序集合zset保存,所以zset命令也可以使用
官方文档https://www.redis.net.cn/order/3685.html
----------------geoadd---------------------
#添加地理位置
#规则:南北两极无法直接添加
#一般用java程序一次导入
#有效经纬度
- 有效的经度从-180度到180度。
- 有效的纬度从-85.05112878度到85.05112878度。
#geoadd 参数 key 值(纬度、经度、名称)
#geoadd key longitud(经度) latitude(纬度) member [..]
127.0.0.1:6379> geoadd china:city 116.40 39.90 bejing
(integer) 1
127.0.0.1:6379> geoadd china:city 121.47 31.23 shanghai
(integer) 1
127.0.0.1:6379> geoadd china:city 106.
(error) ERR wrong number of arguments for 'geoadd' command
127.0.0.1:6379> geoadd china:city 106.50 29.53 chongqing 108.96 34.26 xian
(integer) 2
127.0.0.1:6379> geoadd china:city 114.13 22.54 shengzhen
(integer) 1
----------------geopos---------------------
# geopos key member [member..]获取集合中的一个/多个成员坐标
#获取集合中的一个/多个成员坐标
127.0.0.1:6379> geopos china:city bejing
1) 1) "116.39999896287918091"
2) "39.90000009167092543"
127.0.0.1:6379> geopos china:city xian
1) 1) "108.96000176668167114"
2) "34.25999964418929977"
----------------geodist---------------------
#geodist key member1 member2 [unit]
#返回两个给定位置之间的距离
#unit必须是以下单位的其中一个:
- **m** 表示单位为米 默认。
- **km** 表示单位为千米。
- **mi** 表示单位为英里。
- **ft** 表示单位为英尺。
127.0.0.1:6379> geodist china:city bejing xian km
"910.0565"
127.0.0.1:6379> geodist china:city xian chongqing
"575046.9885"
127.0.0.1:6379> geodist china:city xian chongqing ft
"1886637.1015"
127.0.0.1:6379> geodist china:city xian chongqing mi
"357.3185"
127.0.0.1:6379> geodist china:city xian chongqing m
"575046.9885"
127.0.0.1:6379> geodist china:city xian chongqing
"575046.9885"
----------------georadius---------------------
#georadius key longitude latitude radius m|km|mi|ft [WITHCOORD][WITHDIST] [WITHHASH] [COUNT count]
#以给定的经纬度为中心, 返回集合包含的位置元素当中, 与中心的距离不超过给定最大距离的所有位置元素。
#通过`georadius`就可以完成 **附近的人**功能
#withcoord:带上坐标
#withdist:带上距离,单位与半径单位相同
#COUNT n : 只显示前n个(按距离递增排序)
127.0.0.1:6379> GEORADIUS china:city 120 30 500 km withcoord withdist
# 查询经纬度(120,30)
1) 1) "hangzhou"
2) "29.4151"
3) 1) "120.20000249147415"
2) "30.199999888333501"
2) 1) "shanghai"
2) "205.3611"
3) 1) "121.40000134706497"
2) "31.400000253193539"
------------GEORADIUSBYMEMBER---------------------------
#找出位于指定元素周围的其他元素
#城市之间的定位
#GEORADIUSBYMEMBER key member radius...功能与GEORADIUS相同,
#只是中心位置不是具体的经纬度,而是使用结合中已有的成员作为中心点。
127.0.0.1:6379> georadiusbymember china:city bejing 1000 km
1) "bejing"
2) "xian"
127.0.0.1:6379> georadiusbymember china:city shanghai 400 km
1) "shanghai"
------------geohash---------------------------
#geohash key member1 [member2..]
#返回一个或多个位置元素的Geohash表示。
#使用Geohash位置52点整数编码。
#二维的经纬度转换为一维的字符串
#两字符串越近,代表距离越近
127.0.0.1:6379> geohash china:city yichang shanghai # 获取成员经纬坐标的geohash表示
1) "wmrjwbr5250"
2) "wtw6ds0y300"
127.0.0.1:6379> geohash china:city bejing
1) "wx4fbxxfke0"
底层实现原理是Zset,可以使用Zset来操作地理数据
127.0.0.1:6379> zrange china:city 0 -1
1) "chongqing"
2) "xian"
3) "shengzhen"
4) "shanghai"
5) "bejing"
127.0.0.1:6379> zrem china:city shengzhen
(integer) 1
127.0.0.1:6379> zrange china:city 0 -1
1) "chongqing"
2) "xian"
3) "shanghai"
4) "bejing"
127.0.0.1:6379>
命令 | 描述 |
---|---|
geoadd key longitud(经度) latitude(纬度) member [..] |
将具体经纬度的坐标存入一个有序集合 |
geopos key member [member..] |
获取集合中的一个/多个成员坐标 |
geodist key member1 member2 [unit] |
返回两个给定位置之间的距离。默认以米作为单位。 |
georadius key longitude latitude radius m|km|mi|ft [WITHCOORD][WITHDIST] [WITHHASH] [COUNT count] |
以给定的经纬度为中心, 返回集合包含的位置元素当中, 与中心的距离不超过给定最大距离的所有位置元素。 |
GEORADIUSBYMEMBER key member radius... |
功能与GEORADIUS相同,只是中心位置不是具体的经纬度,而是使用结合中已有的成员作为中心点。 |
geohash key member1 [member2..] |
返回一个或多个位置元素的Geohash表示。使用Geohash位置52点整数编码。 |
hyperloglog(基数统计)
Redis HyperLogLog 是用来做基数统计的算法,HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定的、并且是很小的。
花费 12 KB 内存,就可以计算接近 2^64 个不同元素的基数。
因为 HyperLogLog 只会根据输入元素来计算基数,而不会储存输入元素本身,所以 HyperLogLog 不能像集合那样,返回输入的各个元素。
其底层使用string数据类型
0.81%错误率!统计∪V任务,可以忽略不计的
什么是基数?
数据集中不重复的元素的个数。
应用场景:
网页的访问量(UV):一个用户多次访问,也只能算作一个人。
传统实现,存储用户的id,然后每次进行比较。当用户变多之后这种方式及其浪费空间,而我们的目的只是计数,Hyperloglog就能帮助我们利用最小的空间完成。
指令
----------PFADD--PFCOUNT---------------------
#PFADD key element1 [elememt2..]
#添加指定元素到 HyperLogLog 中
#PFCOUNT key [key]
#返回给定 HyperLogLog 的基数估算值。有误差
127.0.0.1:6379> PFADD myelemx a b c d e f g h i j k # 添加元素
(integer) 1
127.0.0.1:6379> type myelemx # hyperloglog底层使用String
string
127.0.0.1:6379> PFCOUNT myelemx # 统计myelemx的基数
(integer) 11
127.0.0.1:6379> PFADD myelemy i j k z m c b v p q s
(integer) 1
127.0.0.1:6379> PFCOUNT myelemy
(integer) 11
----------------PFMERGE-----------------------
#`PFMERGE destkey sourcekey [sourcekey..]`
#将多个 HyperLogLog 合并为一个 HyperLogLog
#destkey 新合成
#sourcekey 合成一
#sourcekey... 合成二、.....
127.0.0.1:6379> PFMERGE myelemz myelemx myelemy # 合并myelemx和myelemy 成为myelemz
OK
127.0.0.1:6379> PFCOUNT myelemz # 查看并集数量
(integer) 17
命令 | 描述 |
---|---|
PFADD key element1 [elememt2..] |
添加指定元素到 HyperLogLog 中 |
PFCOUNT key [key] |
返回给定 HyperLogLog 的基数估算值。 |
PFMERGE destkey sourcekey [sourcekey..] |
将多个 HyperLogLog 合并为一个 HyperLogLog |
bitmap(位图)
使用位存储,信息状态只有 0 和 1
Bitmap是一串连续的2进制数字(0或1),每一位所在的位置为偏移(offset),在bitmap上可执行AND,OR,XOR,NOT以及其它位操作。
一种数据结构
使用位存储,信息状态只有 0 和 1
Bitmap是一串连续的2进制数字(0或1),每一位所在的位置为偏移(offset)
在bitmap上可执行AND,OR,XOR,NOT以及其它位操作。
365 天 = 365 bit 1字节 = 8bit 46 个字节左右!
本质string
应用场景
签到统计、状态统计
命令 | 描述 |
---|---|
setbit key offset value |
为指定key的offset位设置值 |
getbit key offset |
获取offset位的值 |
bitcount key [start end] |
统计字符串被设置为1的bit数,也可以指定统计范围按字节 |
bitop operration destkey key[key..] |
对一个或多个保存二进制位的字符串 key 进行位元操作,并将结果保存到 destkey 上。 |
BITPOS key bit [start] [end] |
返回字符串里面第一个被设置为1或者0的bit位。start和end只能按字节,不能按位 |
------------setbit--getbit--------------
127.0.0.1:6379> setbit sign 0 1 # 设置sign的第0位为 1
(integer) 0
127.0.0.1:6379> setbit sign 2 1 # 设置sign的第2位为 1 不设置默认 是0
(integer) 0
127.0.0.1:6379> setbit sign 3 1
(integer) 0
127.0.0.1:6379> setbit sign 5 1
(integer) 0
127.0.0.1:6379> type sign
string
127.0.0.1:6379> getbit sign 2 # 获取第2位的数值
(integer) 1
127.0.0.1:6379> getbit sign 3
(integer) 1
127.0.0.1:6379> getbit sign 4 # 未设置默认是0
(integer) 0
-----------bitcount----------------------------
127.0.0.1:6379> BITCOUNT sign # 统计sign中为1的位数
(integer) 4
Jedis
- Jedis是Redis官方推荐的Java连接开发工具。
- 要在Java开发中使用好Redis中间件,必须对Jedis熟悉才能写成漂亮的代码。
1、导入依赖
<dependencies>
<!--Jedis-->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.2.0</version>
</dependency>
<!--fastjson-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
</dependencies>
2、编码测试:
连接数据库- 修改redis.conf的配置文件
vim /usr/local/bin/myconfig/redis.conf
1
将只绑定本地注释
bind 127.0.0.1
```bash
保护模式改为 noprotected-mode yes 修改
protected-mode no
3. 允许后台运行
```bash
daemonize no改为yes
开放端口6379firewall-cmd --zone=public --add-port=6379/tcp --permanet
1
5. 重启防火墙服务
```bash
systemctl restart firewalld.service
1
1. 阿里云服务器控制台配置安全组
2. 重启redis-server
```bash
[root@AlibabaECS bin]# redis-server myconfig/redis.conf
1
> 测试连接
**TestPing.java**
```java
public class TestPing {
public static void main(String[] args) {
Jedis jedis = new Jedis("192.168.xx.xxx", 6379);
String response = jedis.ping();
System.out.println(response); // PONG
}
}
常用Api
String
List
Set
Hash
Zset
和上面操作一样
操作事务
public class TestTX {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1",6379);
jedis.flushDB(); //清空数据库
JSONObject jsonObject = new JSONObject();
jsonObject.put("name","dafran");
jsonObject.put("title","chi");
//开启事务
Transaction multi = jedis.multi();
String result = jsonObject.toJSONString();
//jedis.watch(result);
try {
multi.set("user1",result);
multi.set("user2",result);
int i = 1/0; //代码抛出异常事务,执行失败!
multi.exec(); //执行事务
} catch (Exception e) {
multi.discard(); //放弃事务
e.printStackTrace();
} finally {
System.out.println(jedis.get("user1"));
System.out.println(jedis.get("user2"));
jedis.close(); //关闭事务
}
}
}
Redis.conf详解
容量单位不区分大小写,G和GB有区别
启动的时候,就通过配置文件来启动。
单位
1、配置文件unit单位对大小写不敏感,
包含
可以使用 include 组合多个配置问题
就是好比我们学习 Spring、Import、include
网络
bind 127.0.0.1 # 绑定的ip
protected-mode yes # 保护模式
port 6379 #端口设置
通用 GENERAL
daemonize yes #以守护进程的方式运行,默认是no,需要自己的开启为yes
pidfile /var/run/redis_6379.pid #如果以后台方式运行,就需要指定一个pid文件
# 日志
# Specify the server verbosity level.
# This can be one of:
# debug (a lot of information, useful for development/testing)
# verbose (many rarely useful info, but not a mess like the debug level)
# notice (moderately verbose, what you want in production probably) 生产环境
# warning (only very important / critical messages are logged)
loglevel notice
logfile "" # 日志输出文件名
databases 16 # 数据库的数量,默认为16个
always-show-logo yes # 是否总是显示logo
快照 SNAPSHOTTING
持久化,在规定的时间内,执行了多少次操作,则会持久化到文件 `.rdb ` `.aof`
由于Redis是基于内存的数据库,需要将数据由内存持久化到文件中,如果没有持久化,则会断电即失。
# 如果在900s内,如果至少1个key进行修改,我们就进行持久化操作
save 900 1
save 300 10
save 60 10000
# 持久化出错,是否还继续工作
stop-writes-on-bgsave-error yes
# 是否压缩 rdb 文件,需要消耗一些cpu资源
rdbcompression yes
# 保护rdb文件的时候,进行错误的检查校验
rdbchecksum yes
# rdb 文件保存目录
dir ./
主从复制 REPLICATION
################################# REPLICATION #################################
# Master-Replica replication. Use replicaof to make a Redis instance a copy of
# another Redis server. A few things to understand ASAP about Redis replication.
#
# +------------------+ +---------------+
# | Master | ---> | Replica |
# | (receive writes) | | (exact copy) |
# +------------------+ +---------------+
# 1) Redis replication is asynchronous, but you can configure a master to
# stop accepting writes if it appears to be not connected with at least
# a given number of replicas.
# 2) Redis replicas are able to perform a partial resynchronization with the
# master if the replication link is lost for a relatively small amount of
# time. You may want to configure the replication backlog size (see the next
# sections of this file) with a sensible value depending on your needs.
# 3) Replication is automatic and does not need user intervention. After a
# network partition replicas automatically try to reconnect to masters
# and resynchronize with them.
#
# replicaof <masterip> <masterport> # 配置主机ip和端口
# If the master is password protected (using the "requirepass" configuration
# directive below) it is possible to tell the replica to authenticate before
# starting the replication synchronization process, otherwise the master will
# refuse the replica request.
#
# masterauth <master-password> # 主机密码
安全
# requirepass foobared
# 可以进行密码设置,一般默认为空,而且都在命令中设置
127.0.0.1:6379> ping
PONG
127.0.0.1:6379> config get requirepass # 获取redis密码
1) "requirepass"
2) ""
127.0.0.1:6379> config set requirepass 123456 # 设置redis密码
OK
127.0.0.1:6379> config get requirepass # 发现所有的命令都没有权限
(error)NoAUTH Authentication required
127.0.0.1:6379> ping
(error) NoAUTH Authentication required
127.0.0.1:6379> auth 123456 # 使用密码进行登录
127.0.0.1:6379> config get requirepass
1) "requirepass"
2) "123456"
限制 CLIENTS
maxclients 10000 # 设置能连接上redis的最大客户端的数量
maxmemory <bytes> # redis 配置最大的内存容量
maxmemory-policy noeviction # 内存到达上限之后的处理策略
1、volatile-lru:只对设置了过期时间的key进行LRU(默认值)
2、al1keys-lru:删除lru算法的key
3、volatile-random:随机删除即将过期key
4、a11keys-random:随机删除
5、vo1ati1e-ttl:删除即将过期的
6、noeviction: 永不过期,返回错误
客户端连接相关
maxclients 10000 最大客户端数量
maxmemory <bytes> 最大内存限制
maxmemory-policy noeviction # 内存达到限制值的处理策略
redis 中的默认的过期策略是 volatile-lru 。
设置方式
config set maxmemory-policy volatile-lru
1
maxmemory-policy 六种方式
1、volatile-lru:只对设置了过期时间的key进行LRU(默认值)
2、allkeys-lru : 删除lru算法的key
3、volatile-random:随机删除即将过期key
4、allkeys-random:随机删除
5、volatile-ttl : 删除即将过期的
6、noeviction : 永不过期,返回错误
aof配置
APPEND ONLY MODE 模式
appendonly no # 默认是不开启aof模式的,默认是使用rdb方法持久化的,在大部分所有的情况下,rdb完全够用
appendfilename "appendonly.aof" # 持久化的文件的名字
# appendfsync always # 每秒修改都会 sync,消耗性能。
appendfsync everysec # 每秒执行一次 sync,可能会丢失这1秒的数据。
# appendfsync no # 不执行 sync,这个时候操作系统自己同步数据,速度最快。
SpringBoot整合
SpringBoot操作数据: spring-data( jpa、jdbc、mongodb、redis)
SpringData也是和 SpringBoot齐名的项目!
说明: 在 SpringBoot2.x 之后,原来使用的jedis 被替换为了 lettuce?
- jedis : 采用的直连,多个线程操作的话,是不安全的,如果想要避免不安全的,使用 jedis pool 连接池! 更像 BIO 模式
- lettuce : 采用netty,实例可以再多个线程中进行共享,不存在线程不安全的情况!可以减少线程数据 了,更像 NIO 模式
配置详解
学习SpringBoot自动配置的原理时,整合一个组件并进行配置一定会有一个自动配置类xxxAutoConfiguration,并且在spring.factories中也一定能找到这个类的完全限定名。Redis也不例外。
RedisAutoConfiguration源码
两个bean:RedisTemplate和StringRedisTemplate
当看到xxTemplate时可以对比RestTemplat、SqlSessionTemplate,通过使用这些Template来间接操作组件。那么这俩也不会例外。分别用于操作Redis和Redis中的String数据类型。
RedisProperties.class
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RedisOperations.class)
@EnableConfigurationProperties(RedisProperties.class)
@Import({ LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class })
public class RedisAutoConfiguration {
@Bean
@ConditionalOnMissingBean(name = "redisTemplate")
@ConditionalOnSingleCandidate(RedisConnectionFactory.class)
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
return template;
}
@Bean
@ConditionalOnMissingBean
@ConditionalOnSingleCandidate(RedisConnectionFactory.class)
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
StringRedisTemplate template = new StringRedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}
}
- 可配置的参数
RedisConnectionFactory接口两实现类
jedis 配置不全
lettuce配置完全
源码分析
@Bean
@ConditionalOnMissingBean(name = {"redisTemplate"}) // 可以自定义 redisTemplate 来进行·替换默认的
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory)
throws UnknownHostException {
// 默认的 RedisTemplate 没有过多的设置,redis 对象都是需要序列化的
// 两个泛型都是 Object, Object 的类型,需要强制转换 <String,Object>
RedisTemplate<Object, Object> template = new RedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}
@Bean
@ConditionalOnMissingBean // String是Redis中最常用类型,所以单独提出一个Bean
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
StringRedisTemplate template = new StringRedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}
整合测试
导入依赖<!--操作redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
配置连接# 配置Redis
spring:
redis:
host: 127.0.0.1
pool: 6379
# lettuce:
测试 ```java @SpringBootTest class Redis02SpringBootApplicationTests {@Autowired private RedisTemplate redisTemplate;
@Test void contextLoads() {
/*
redisTemplate 操作不同的数据类型
opsForValue 操作字符串 类似String
opsForHash() hash
opsForSet()
opsForList() 操作List 类似List
opsForZSet() 有序集合
opsForGeo() 地理位置
opsForHyperLogLog() 基数
*/
// 除了基本的操作,常用的方法都可以直接通过 redisTemplate 操作,比如事务、基本的CRUD
// 获取redis的连接对象
// RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
// connection.flushDb();
// connection.flushAll();
redisTemplate.opsForValue().set("name","dafran");
System.out.println(redisTemplate.opsForValue().get("name"));
}
}
![](https://gitee.com/dafran/pic-go/raw/master/img/image-20210415172703945.png#id=mfyaZ&originHeight=598&originWidth=1510&originalType=binary&status=done&style=none)
![](https://gitee.com/dafran/pic-go/raw/master/img/image-20210415173053714.png#id=t96Dq&originHeight=222&originWidth=1222&originalType=binary&status=done&style=none)
<a name="3bc2cbd1"></a>
#### 关于对象的保存
在User为未进行序列化,且传入的是对象
![](https://gitee.com/dafran/pic-go/raw/master/img/image-20210415204519582.png#id=y5eQq&originHeight=976&originWidth=1858&originalType=binary&status=done&style=none)
```java
@Data
@AllArgsConstructor
@NoArgsConstructor
@Component
// 所有的pojo都序列化
public class User implements Serializable {
private String name;
private Integer age;
}
自定义RedisTemplate
// 自定义RedisTemplate
@Bean
@SuppressWarnings("all")
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory){
//我们为了自己开发方便,一般直接使用 <String, Object>
RedisTemplate<String, Object> template = new RedisTemplate<String,Object>();
template.setConnectionFactory(factory);
// Json序列化配置
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
//转义
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
// String的序列化
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
// key采用String的序列化方式
template.setKeySerializer(stringRedisSerializer);
// hash的key也采用String的序列化方式
template.setHashKeySerializer(stringRedisSerializer);
// value序列化方式采用jackson
template.setValueSerializer(jackson2JsonRedisSerializer);
// hash的value序列化方式采用jackson
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
自定义RedisUntil
使用RedisTemplate需要频繁调用**.opForxxx**
然后才能进行对应的操作,这样使用起来代码效率低下,工作中一般不会这样使用,而是将这些常用的公共API抽取出来封装成为一个工具类,然后直接使用工具类来间接操作Redis,不但效率高并且易用。
工具类参考博客:
https://www.cnblogs.com/zeng1994/p/03303c805731afc9aa9c60dbbd32a323.html
https://blog.csdn.net/qq_36635434/article/details/103992863
https://www.cnblogs.com/zhzhlong/p/11434284.html
package cn.dafran.utils;
/**
* @Classname RedisUtils
* @Description TODO
* @Author Administrator
* @Version V1.0
*/
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@Component
public final class RedisUtil {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// =============================common============================
/**
* 指定缓存失效时间
* @param key 键
* @param time 时间(秒)
*/
public boolean expire(String key, long time) {
try {
if (time > 0) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 根据key 获取过期时间
* @param key 键 不能为null
* @return 时间(秒) 返回0代表为永久有效
*/
public long getExpire(String key) {
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}
/**
* 判断key是否存在
* @param key 键
* @return true 存在 false不存在
*/
public boolean hasKey(String key) {
try {
return redisTemplate.hasKey(key);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 删除缓存
* @param key 可以传一个值 或多个
*/
@SuppressWarnings("unchecked")
public void del(String... key) {
if (key != null && key.length > 0) {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete(CollectionUtils.arrayToList(key));
}
}
}
// ============================String=============================
/**
* 普通缓存获取
* @param key 键
* @return 值
*/
public Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}
/**
* 普通缓存放入
* @param key 键
* @param value 值
* @return true成功 false失败
*/
public boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 普通缓存放入并设置时间
* @param key 键
* @param value 值
* @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期
* @return true成功 false 失败
*/
public boolean set(String key, Object value, long time) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
} else {
set(key, value);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 递增
* @param key 键
* @param delta 要增加几(大于0)
*/
public long incr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递增因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, delta);
}
/**
* 递减
* @param key 键
* @param delta 要减少几(小于0)
*/
public long decr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递减因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, -delta);
}
// ================================Map=================================
/**
* HashGet
* @param key 键 不能为null
* @param item 项 不能为null
*/
public Object hget(String key, String item) {
return redisTemplate.opsForHash().get(key, item);
}
/**
* 获取hashKey对应的所有键值
* @param key 键
* @return 对应的多个键值
*/
public Map<Object, Object> hmget(String key) {
return redisTemplate.opsForHash().entries(key);
}
/**
* HashSet
* @param key 键
* @param map 对应多个键值
*/
public boolean hmset(String key, Map<String, Object> map) {
try {
redisTemplate.opsForHash().putAll(key, map);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* HashSet 并设置时间
* @param key 键
* @param map 对应多个键值
* @param time 时间(秒)
* @return true成功 false失败
*/
public boolean hmset(String key, Map<String, Object> map, long time) {
try {
redisTemplate.opsForHash().putAll(key, map);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value) {
try {
redisTemplate.opsForHash().put(key, item, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value, long time) {
try {
redisTemplate.opsForHash().put(key, item, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 删除hash表中的值
*
* @param key 键 不能为null
* @param item 项 可以使多个 不能为null
*/
public void hdel(String key, Object... item) {
redisTemplate.opsForHash().delete(key, item);
}
/**
* 判断hash表中是否有该项的值
*
* @param key 键 不能为null
* @param item 项 不能为null
* @return true 存在 false不存在
*/
public boolean hHasKey(String key, String item) {
return redisTemplate.opsForHash().hasKey(key, item);
}
/**
* hash递增 如果不存在,就会创建一个 并把新增后的值返回
*
* @param key 键
* @param item 项
* @param by 要增加几(大于0)
*/
public double hincr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, by);
}
/**
* hash递减
*
* @param key 键
* @param item 项
* @param by 要减少记(小于0)
*/
public double hdecr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, -by);
}
// ============================set=============================
/**
* 根据key获取Set中的所有值
* @param key 键
*/
public Set<Object> sGet(String key) {
try {
return redisTemplate.opsForSet().members(key);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 根据value从一个set中查询,是否存在
*
* @param key 键
* @param value 值
* @return true 存在 false不存在
*/
public boolean sHasKey(String key, Object value) {
try {
return redisTemplate.opsForSet().isMember(key, value);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将数据放入set缓存
*
* @param key 键
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSet(String key, Object... values) {
try {
return redisTemplate.opsForSet().add(key, values);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 将set数据放入缓存
*
* @param key 键
* @param time 时间(秒)
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSetAndTime(String key, long time, Object... values) {
try {
Long count = redisTemplate.opsForSet().add(key, values);
if (time > 0)
expire(key, time);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 获取set缓存的长度
*
* @param key 键
*/
public long sGetSetSize(String key) {
try {
return redisTemplate.opsForSet().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 移除值为value的
*
* @param key 键
* @param values 值 可以是多个
* @return 移除的个数
*/
public long setRemove(String key, Object... values) {
try {
Long count = redisTemplate.opsForSet().remove(key, values);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
// ===============================list=================================
/**
* 获取list缓存的内容
*
* @param key 键
* @param start 开始
* @param end 结束 0 到 -1代表所有值
*/
public List<Object> lGet(String key, long start, long end) {
try {
return redisTemplate.opsForList().range(key, start, end);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 获取list缓存的长度
*
* @param key 键
*/
public long lGetListSize(String key) {
try {
return redisTemplate.opsForList().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 通过索引 获取list中的值
*
* @param key 键
* @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
*/
public Object lGetIndex(String key, long index) {
try {
return redisTemplate.opsForList().index(key, index);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
*/
public boolean lSet(String key, Object value) {
try {
redisTemplate.opsForList().rightPush(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
* @param key 键
* @param value 值
* @param time 时间(秒)
*/
public boolean lSet(String key, Object value, long time) {
try {
redisTemplate.opsForList().rightPush(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @return
*/
public boolean lSet(String key, List<Object> value) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return
*/
public boolean lSet(String key, List<Object> value, long time) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 根据索引修改list中的某条数据
*
* @param key 键
* @param index 索引
* @param value 值
* @return
*/
public boolean lUpdateIndex(String key, long index, Object value) {
try {
redisTemplate.opsForList().set(key, index, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 移除N个值为value
*
* @param key 键
* @param count 移除多少个
* @param value 值
* @return 移除的个数
*/
public long lRemove(String key, long count, Object value) {
try {
Long remove = redisTemplate.opsForList().remove(key, count, value);
return remove;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
}
事务
Redis单条命令式保存原子性的,但是事务不保证原子性
Redis事务没有隔离级别的概念
Redis事务本质:一组命令的集合。
————————- 队列 set set set 执行 —————————-
事务中每条命令都会被序列化,执行过程中按顺序执行,不允许其他命令进行干扰。
- 一次性
- 顺序性
- 排他性
- Redis事务没有隔离级别的概念
- Redis单条命令是保证原子性的,但是事务不保证原子性!
所有的命令在事务中,并没有直接被执行!只有发起执行命令的时候才会执行,Exec
- 开启事务(
multi
) - 命令入队(……)
- 执行事务(
exec
)
事务操作
正常执行事务
127.0.0.1:6379> multi #开启事务
OK
127.0.0.1:6379> set k1 2
QUEUED
127.0.0.1:6379> set k2 3
QUEUED
127.0.0.1:6379> get k2
QUEUED
127.0.0.1:6379> exec #执行事务
1) OK
2) OK
3) "3"
取消事务(
**discurd**
)
127.0.0.1:6379> multi #开启事务
OK
127.0.0.1:6379> set k1 v1
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> DISCARD # 放弃事务
OK
127.0.0.1:6379> EXEC
(error) ERR EXEC without MULTI # 当前未开启事务
127.0.0.1:6379> get k1 # 被放弃事务中命令并未执行
(nil)
事务错误
编译型异常(代码有问题!命令有错!),事务中所有的命令都不会被执行
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set k1 v1
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> error k1 # 这是一条语法错误命令
(error) ERR unknown command `error`, with args beginning with: `k1`, # 会报错但是不影响后续命令入队
127.0.0.1:6379> get k2
QUEUED
127.0.0.1:6379> EXEC
(error) EXECABORT Transaction discarded because of previous errors. # 执行报错
127.0.0.1:6379> get k1
(nil) # 其他命令并没有被执行
运行时异常(1/0),如果事务队列中存在语法性,那么执行命令的时候,其他命令是可以正常执行的,错误命令抛出异常
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set k1 v1
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> INCR k1 # 这条命令逻辑错误(对字符串进行增量)
QUEUED
127.0.0.1:6379> get k2
QUEUED
127.0.0.1:6379> exec
1) OK
2) OK
3) (error) ERR value is not an integer or out of range # 运行时报错
4) "v2" # 其他命令正常执行
# 虽然中间有一条命令报错了,但是后面的指令依旧正常执行成功了。
# 所以说Redis单条指令保证原子性,但是Redis事务不能保证原子性。
事务监控
使用
watch key
监控指定数据,相当于乐观锁加锁。
悲观锁
- 无论干什么都加锁,再去操作
乐观锁
- 无论干什么都不加锁,如果出现了问题,再次更新值测试
- 获取version
- 更新时,对比version
- 使用
watch key
监控指定数据,相当于乐观锁加锁。
正常执行
127.0.0.1:6379> set money 100 # 设置余额:100
OK
127.0.0.1:6379> set use 0 # 支出使用:0
OK
127.0.0.1:6379> watch money # 监视money (上锁)
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> DECRBY money 20
QUEUED
127.0.0.1:6379> INCRBY use 20
QUEUED
127.0.0.1:6379> exec # 监视值没有被中途修改,事务正常执行
1) (integer) 80
2) (integer) 20
测试多线程修改值,使用watch可以当做redis的乐观锁操作(相当于getversion)
我们启动另外一个客户端模拟插队线程。
线程1:
127.0.0.1:6379> watch money # money上锁
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> DECRBY money 20
QUEUED
127.0.0.1:6379> INCRBY use 20
QUEUED
127.0.0.1:6379> # 此时事务并没有执行
模拟线程插队,线程2:
127.0.0.1:6379> INCRBY money 500 # 修改了线程一中监视的money
(integer) 600
12
回到线程1,执行事务:
127.0.0.1:6379> EXEC # 执行之前,另一个线程修改了我们的值,这个时候就会导致事务执行失败
(nil) # 没有结果,说明事务执行失败
127.0.0.1:6379> get money # 线程2 修改生效
"600"
127.0.0.1:6379> get use # 线程1事务执行失败,数值没有被修改
"0"
解锁获取最新值,然后再加锁进行事务。
unwatch
进行解锁。
127.0.0.1:6379> UNWATCH # 1.如果发现事务执行失败,就先解锁。
OK
127.0.0.1:6379> WATCH money # 2.获取最新的值,再次监控。(select version)
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> DECRBY money 1
QUEUED
127.0.0.1:6379> incrBY money 1
QUEUED
127.0.0.1:6379> exec # 3.针对监控的值是否发生变化,如果没有变化,那么就可以执行成功,否则失败。
1) (integer) 999
2) (integer) 1000
注意:每次提交执行exec后都会自动释放锁,不管是否成功
Redis持久化
- Redis是内存数据库,如果不将内存中的数据库状态保存到磁盘,那么一旦服务器进程退岀,服务器中的数据库状态也会消失。所以 Redis提供了持久化功能。
- 可以将redis内存中的数据持久化保存到硬盘的文件中。
- redis持久化机制:RDB和AOF
RDB(Redis Data Base)
在进行 `**RDB**` 的时候,`**redis**` 的主线程是不会做 `**io**` 操作的,主线程会 `**fork**` 一个子线程来完成该操作;
- Redis 调用forks。同时拥有父进程和子进程。
- 子进程将数据集写入到一个临时 RDB 文件中。
- 当子进程完成对新 RDB 文件的写入时,Redis 用新 RDB 文件替换原来的 RDB 文件,并删除旧的 RDB 文件。
这种工作方式使得 Redis 可以从写时复制(copy-on-write)机制中获益(因为是使用子进程进行写操作,而父进程依然可以接收来自客户端的请求。)
在指定的时间间隔内将内存中的数据集快照写入磁盘I也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存里。
Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。整个过程中,主进程是不进行任何IO操作的。这就确保了极高的性能。如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。**RDB的缺点是最后一次持久化后的数据可能丢失。**默认就是RDB,一般情况下不不要修改这个配置。
**有时候在生产环境会将这个文件进行备份!**
rdb保存文件是dump.rdb,都是在配置文件中快照中配置的。
触发机制
1、save的规则满足的情况下,会自动触发rdb规则
2、执行flushall命令,也会触发rdb规则!
3、退出redis,也会产生rdb文件。
save
使用 save
命令,会立刻对当前内存中的数据进行持久化 ,但是会阻塞,也就是不接受其他操作了;
由于
save
命令是同步命令,会占用Redis的主进程。若Redis数据非常多时,save
命令执行速度会非常慢,阻塞所有客户端的请求。
flushall命令
flushall
命令也会触发持久化 ;
触发持久化规则
满足配置条件中的触发条件 ;
可以通过配置文件对 Redis 进行设置, 让它在“ N 秒内数据集至少有 M 个改动”这一条件被满足时, 自动进行数据集保存操作。
bgsave
bgsave
是异步进行,进行持久化的时候,redis
还可以将继续响应客户端请求 ;
bgsave和save对比
命令 | save | bgsave |
---|---|---|
IO类型 | 同步 | 异步 |
阻塞? | 是 | 是(阻塞发生在fock(),通常非常快) |
复杂度 | O(n) | O(n) |
优点 | 不会消耗额外的内存 | 不阻塞客户端命令 |
缺点 | 阻塞客户端命令 | 需要fock子进程,消耗内存 |
如何恢复rdb文件
1、只需要将rdb文件放在redis启动目录就可以,redis启动的时候会自动检查dump.rdb恢复其中的数据。
2、查看需要存放的位置
127.0.0.1:6379> config get dir
1) "dir"
2) "/usr/local/bin"
127.0.0.1:6379> #如果在这个目录下存在dump.rdb文件,启动就会自动恢复其中的数据
几乎默认的配置就够用
优点:
1、适合大规模的数据恢复!
2、对数据的完整性要不高!
缺点:
1、需要一定的时间间隔进程操作!如果Redis意外岩机了,这个最后一次修改数据就没有了!
2、fork进程的时候,会占用一定的内存空间!
AOP(Append Only File)
将所有命令都记录下来,history,恢复的时候就把这个文件全部在执行一遍!
快照功能(RDB)并不是非常耐久(durable): 如果 Redis 因为某些原因而造成故障停机, 那么服务器将丢失最近写入、以及未保存到快照中的那些数据。 从 1.1 版本开始, Redis 增加了一种完全耐久的持久化方式: AOF 持久化。
以日志的形式来记录每个写操作,将Redis执行过的所有指令记录下来(读操作不记录),只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作
Aof保存的是appendonly.aof文件
默认不开启,需要自己手动配置,只需要将 appendonly 改为yes就开启了 aof,重启,redis 生效。
如果这个 aof 文件有错位,这时候 redis 是启动不起来的,我们需要修复这个aof文件
redis 给我们提供了一个工具redis-check-aof --fix
redis-check-aof --fix appendonly.aof
如果文件正常,重启就可以直接恢复了!
重写规则
aof 默认就是文件的无限追加,文件会越来越大!
如果 aof 文件大于 64m,太大了! fork一个新的进程来将文件进行重写!
优点
appendonly no # 默认是不开启aof模式的,默认是使用rdb方式持久化的,在大部分所有的情况下,rdb完全够用!
appendfilename "appendonly.aof" # 持久化的文件的名字
# appendfsync always # 每次修改都会 sync。消耗性能
appendfsync everysec # 每秒执行一次 sync,可能会丢失这1s的数据!
# appendfsync no # 不执行 sync,这个时候操作系统自己同步数据,速度最快!
# rewrite 重写,
- 每一次修改都会同步,文件的完整性会更加好
- 没秒同步一次,可能会丢失一秒的数据
- 从不同步,效率最高
缺点
- 相对于数据文件来说,aof远远大于rdb,修复速度比rdb慢!
- Aof运行效率也要比rdb慢,所以redis默认的配置就是rdb持久化
RDB 和 AOF 对比
RDB | AOF | |
---|---|---|
启动优先级 | 低 | 高 |
体积 | 小 | 大 |
恢复速度 | 快 | 慢 |
数据安全性 | 丢数据 | 根据策略决定 |
扩展:
1、RDB持久化方式能够在指定的时间间隔内对数据进行快照存储
2、AOF持久化方式
- 记录每次对服务器写的操作,当服务器重启的时候会重新执行这些命令来恢复原始的数据,
- AOF命令以Redis协议追加保存每次写的操作到文件末尾,
- Redis还能对AOF文件进行后台重写,使得AOF文件的体积不至于过大。
3、只做缓存,如果只希望数据在服务器运行的时候存在,也可以不使用任何持久化
4、同时开启两种持久化方式
- 在这种情况下,当redis重启的时候会优先载入AOF文件来恢复原始的数据,因为在通常情况下AOF文件保存的数据集要比RDB文件保存的数据集要完整。
- RDB的数据不实时,同时使用两者时服务器重启也只会找AOF文件,那要不要只使用AOF呢?作者建议不要,因为RDB更适合用于备份数据库(AOF在不断变化不好备份),快速重启,而且不会有AOF可能潜在的Bug,留着作为一个万一的手段。
5、性能建议
- 因为RDB文件只用作后备用途,建议只在Slave(从机)上持久化RDB文件,而且只要15分钟备份一次就够了,只保留save 900 1这条规则
- 如果Enable AOF
- 好处是在最恶劣情况下也只会丢失不超过两秒数据,启动脚本较简单只load自己的AOF文件就可以了,
- 代价一是带来了持续的IO,二是AOF rewrite的最后将rewrite过程中产生的新数据写到新文件造成的阻塞几乎是不可避免的。只要硬盘许可,应该尽量减少AOF rewrite的频率,AOF重写的基础大小默认值64M太小了,可以设到5G以上,默认超过原大小100%大小重写可以改到适当的数值。
- 如果不Enable AOF
- 仅靠Master-Slave Repllcation实现高可用性也可以,能省掉一大笔IO,也减少了rewrite时带来的系统波动。
- 代价是如果Master/Slave同时倒掉,会丢失十几分钟的数据,启动脚本也要比较两个Master/Slave中的RDB文件,载入较新的那个,微博就是这种架构。
Redis发布订阅
Redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。
Redis客户端可以订阅任意数量的频道。
订阅/发布消息图:
第一个:消息发送者,第二个:频道第三个:消息订阅者!
下图展示了频道channel1,以及订阅这个频道的三个客户端—client2、client5和client1之间的关系
当有新消息通过PUBLISH命令发送给频道channel1时,这个消息就会被发送给订阅它的三个客户端:
命令
这些命令被广泛用于构建即时通信应用,比如网络聊天室(chatroom)和实时广播、实时提醒等。
命令 | 描述 |
---|---|
PSUBSCRIBE pattern [pattern..] |
订阅一个或多个符合给定模式的频道。 |
PUNSUBSCRIBE pattern [pattern..] |
退订一个或多个符合给定模式的频道。 |
PUBSUB subcommand [argument[argument]] |
查看订阅与发布系统状态。 |
PUBLISH channel message |
向指定频道发布消息 |
SUBSCRIBE channel [channel..] |
订阅给定的一个或多个频道。 |
SUBSCRIBE channel [channel..] |
退订一个或多个频道 |
测试
订阅端
127.0.0.1:6379> SUBSCRIBE dafran # 订阅一个频道 dafran
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "dafran"
3) (integer) 1
# 等待读取推送的消息
1) "message" # 消息
2) "dafran" # 频道名
3) "hello" # 消息内容
1) "message"
2) "dafran"
3) "this is a message"
发送端
127.0.0.1:6379> PUBLISH dafran "hello" # 发布者发布消息到频道
(integer) 1
127.0.0.1:6379> PUBLISH dafran "this is a message" # 发布者发布消息到频道
(integer) 1
-----------------查看活跃的频道------------
127.0.0.1:6379> PUBSUB channels
1) "dafran"
原理
Redis是使用C实现的,通过分析Redis源码里的pubsub.c文件,了解发布和订阅机制的底层实现,籍此加深对Redis的理解。
Redis通过PUBLISH、SUBSCRIBE和PSUBSCRIBE等命令实现发布和订阅功能。
通过SUBSCRIBE命令订阅某频道后,redis-server里维护了一个字典,字典的键就是一个个channel,而字典的值则是一个频道(链表),链表中保存了所有订阅这个channel的客户端。SUBSCRIBE命令的关键,就是将客户端添加到给定channel的订阅链表中。
通过PUBLISH命令向订阅者发送消息,redis-server会使用给定的频道作为键,在它所维护的channel字典中查找记录了订阅这个频道的所有客户端的链表,遍历这个链表,将消息发布给所有订阅者。
Pub/Sub从字面上理解就是发布(Publish)与订阅(Subscribe),在Redis中,你可以设定对某一个key值进行消息发布及消息订阅,当一个key值上进行了消息发布后,所有订阅它的客户端都会收到相应的消息。这一功能最明显的用法就是用作实时消息系统,比如普通的即时聊天,群聊等功能。
缺点
- 如果一个客户端订阅了频道,但自己读取消息的速度却不够快的话,那么不断积压的消息会使redis输出缓冲区的体积变得越来越大,这可能使得redis本身的速度变慢,甚至直接崩溃。
- 这和数据传输可靠性有关,如果在订阅方断线,那么他将会丢失所有在短线期间发布者发布的消息。
应用
1、实时消息系统!
2、事实聊天!(频道当做聊天室,将信息回显给所有人即可!)
3、订阅,关注系统都是可以的!
稍微复杂的场景使用消息中间件MQ
Reds主从复制
概念
- 主从复制,是指将一台Redis服务器的数据,复制到其他的Redis服务器。
- 前者称为主节点(master/leader),后者称为从节点(slave/follower);
- 数据的复制是单向的,只能由主节点到从节点。
- Master以写为主,Slave以读为主。
- 默认情况下,每台Redis服务器都是主节点;
- 且一个主节点可以有多个从节点(或没有从节点),但一个从节点只能有一个主节点。
主从复制的作用主要包括:
1、数据穴余:主从复制实现了数据的热备份,是持久化之外的一种数据穴余方式。
2、故障恢复:当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复;实际上是一种服务的允余。
3、负载均衡:在主从复制的基础上,配合读写分离,可以由主节点提供写服务,由从节点提供读服务(即写Redis数据时应用连接主节点,读Redis数据时应用连接从节点),分担服务器负载;尤其是在写少读多的场景下,通过多个从节点分担读负载,可以大大提高Redis服务器的并发量。
4、高可用(集群)基石:除了上述作用以外,主从复制还是哨兵和集群能够实施的基础,因此说主从复制是Redis高可用的基础。
一般来说,要将Redis运用于工程项目中,只使用一台Redis是万万不能的(宕机),原因如下:
1、从结构上,单个Redis服务器会发生单点故障,并且一台服务器需要处理所有的请求负载,压力较大;
2、从容量上,单个Redis服务器内存容量有限,就算一台Redis服务器内存容量为256G,也不能将所有内存用作Redis存储内存,一般来说,单台Redis最大使用内存不应该超过20G。
电商网站上的商品,一般都是一次上传,无数次浏览的,说专业点也就是”多读少写”。
主从复制,读写分离。80%的情况下都是在进行读操作,减缓服务器的压力,架构中经常使用。
环境配置
只配置从库,不用配置主库!
127.0.0.1:6379> info replication # 查看当前库的信息
# Replication
role:master # 角色 master
connected_slaves:0 # 没有从机
master_repl_offset:0
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0
127.0.0.1:6379>
复制三个
[root@iZ2vc9j72d3f0ypio6oewrZ bin]# cd dafranRedis/
[root@iZ2vc9j72d3f0ypio6oewrZ dafranRedis]# ls
redis.conf
[root@iZ2vc9j72d3f0ypio6oewrZ dafranRedis]# cp redis.conf redis79.conf
[root@iZ2vc9j72d3f0ypio6oewrZ dafranRedis]# cp redis.conf redis80.conf
[root@iZ2vc9j72d3f0ypio6oewrZ dafranRedis]# cp redis.conf redis81.conf
[root@iZ2vc9j72d3f0ypio6oewrZ dafranRedis]# ls
redis79.conf redis80.conf redis81.conf redis.conf
既然需要启动多个服务,就需要多个配置文件。每个配置文件对应修改以下信息:
都是daemonize yes
- 端口号
- pid文件名
- 日志文件名
- rdb文件名
修改完毕之后,启动3个redis服务器,
[root@iZ2vc9j72d3f0ypio6oewrZ ~]# cd /usr/local/bin
[root@iZ2vc9j72d3f0ypio6oewrZ bin]# redis-server dafranRedis/redis79.conf
[root@iZ2vc9j72d3f0ypio6oewrZ ~]# cd /usr/local/bin
[root@iZ2vc9j72d3f0ypio6oewrZ bin]# redis-server dafranRedis/redis80.conf
[root@iZ2vc9j72d3f0ypio6oewrZ ~]# cd /usr/local/bin
[root@iZ2vc9j72d3f0ypio6oewrZ bin]# redis-server dafranRedis/redis81.conf
可以通过进程信息查看当前库信息
[root@iZ2vc9j72d3f0ypio6oewrZ bin]# ps -ef|grep redis
root 26399 1 0 16:01 ? 00:00:00 redis-server *:6379
root 27992 1 0 16:02 ? 00:00:00 redis-server *:6380
root 29376 1 0 16:02 ? 00:00:00 redis-server *:6381
root 32585 26895 0 16:03 pts/0 00:00:00 grep --color=auto redis
一主二从
默认情况下,每台Redis服务器都是主节点,一般只用配从机
# 在从机中配置
127.0.0.1:6380> slaveof 127.0.0.1 6379 # SLAVEOF host 6379
OK
127.0.0.1:6380> info replication
# Replication
role:slave # 当前角色为从机
master_host:127.0.0.1 # 查看主机信息
master_port:6379
master_link_status:up
master_last_io_seconds_ago:0
master_sync_in_progress:0
slave_repl_offset:14
slave_priority:100
slave_read_only:1
connected_slaves:0
master_replid:dfc4b8b3eb155c50ee4e89966ab06c3a5bcd9985
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:14
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:14
# 在主机中查看
127.0.0.1:6379> info replication
# Replication
role:master # 当前角色为主机
connected_slaves:2 # 从机配置信息
slave0:ip=127.0.0.1,port=6380,state=online,offset=210,lag=0
slave1:ip=127.0.0.1,port=6381,state=online,offset=210,lag=0
master_replid:dfc4b8b3eb155c50ee4e89966ab06c3a5bcd9985
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:210
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:210
真实的从主配置应该在配置文件中配置,这样的话是永久的,命令只是暂时的
使用规则
从机只能读,不能写,主机可读可写但是多用于写。
主机写入
从机只能读取内容
当主机断电宕机后,默认情况下从机的角色不会发生变化 ,集群中只是失去了写操作,当主机恢复以后,又会连接上从机恢复原状。
当从机断电宕机后,若不是使用配置文件配置的从机,再次启动后变为主机是无法获取之前主机的数据的,若此时重新配置称为从机,又可以获取到主机的所有数据。这里就要提到一个同步原理。
第二条中提到,默认情况下,主机故障后,不会出现新的主机,有两种方式可以产生新的主机:- 从机手动执行命令
slaveof no one
,这样执行以后从机会独立出来成为一个主机 - 使用哨兵模式(自动选举)
- 从机手动执行命令
如果没有主机了,这个时候选择出来一个主机,手动!
如果主机断开了连接,可以使用SLAVEOF no one
让从机变成主机!其他的节点就可以手动连接到最新的主节点(手动)!如果这个时候原主机修复了,那么就重新连接。
复制原理
- Slave启动成功连接到 master后会发送一个sync同步命令
- Master接到命令,启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令,在后台进程执行完毕之后,master将传送整个数据文件到save,并完成一次完全同步。
- 全量复制:而save服务在接收到数据库文件数据后,将其存盘并加载到内存中
- 增量复制:Master继续将新的所有收集到的修改命令依次传给 slave,完成同步
- 只要是重新连接 master,一次完全同步(全量复制)将被自动执行,数据一定可以在从机中看见
层层链路
上一个M链接下一个 S
配置:
可以看出虽然修改链路这种结构,但依然还是从机,即使在主机宕机后还是不能充当主机。
手动配置新主机
- 如果主机断开了连接,
- 从节点可以使用
SLAVEOF no one
让自己变成主机! - 其他的节点就可以手动连接到最新的这个主节点(手动)!
- 即使这个时候主机修复了,也需要重新配置。
哨兵模式
(自动选主机的模式)
概述
主从切换技术的方法是:当主服务器宕机后,需要手动把一台从服务器切换为主服务器,这就需要人工干预,费事费力,还会造成一段时间内服务不可用。这不是一种推荐的方式,更多时候,我们优先考虑哨兵模式。 Redis从2.8开始正式提供了Senηtine(哨兵)架构来解决这个问题
谋朝篡位的自动版,能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库。
哨兵模式是一种特殊的模式,首先Redis提供了哨兵的命令,哨兵是一个独立的进程,作为进程,它会独立运行。其原理是**哨兵通过发送命令,等待Redis服务器响应,从而监控运行的多个 Redis实例**。
这里的哨兵有两个作用
- 通过发送命令,让 Redis服务器返回监控其运行状态,包括主服务器和从服务器。
当哨兵监测到 master宕机,会自动将 slave切换成 master,然后通过发布订阅模式通知其他的从服务器,修改配置文件,让它们切换主机。
然而一个哨兵进程对Redis服务器进行监控,可能会出现问题,为此,我们可以使用多个哨兵进行监控。各个哨兵之间还会进行监控,这样就形成了多哨兵模式。
假设主服务器容机,哨兵1先检测到这个结果,系统并不会马上进行failover过程,仅仅是哨兵1主观的认为主服务器不可用,这个现象成为**主观下线**。当后面的哨兵也检测到主服务器不可用,并且数量达到一定值时,那么哨兵之间就会进行一次投票,投票的结果由一个哨兵发起,进行**failover[故障转移]**操作。切换成功后,就会通过**发布订阅模式**,让各个哨兵把自己监控的从服务器实现切换主机,这个过程称为**客观下线**。
测试
目前状态,一主二从
1、配置哨兵配置文件 sentinel.conf
# sentinel monitor 被监控的名称[随意起] host port 1
sentinel monitor myredis 127.0.0.1 6379 1
后面的这个数字1,代表主机宕机,slave投票看哪个从机接替成为主机,票数最多的,就会成为主机。
2、启动哨兵
redis-sentinel dafranRedis/sentinel.conf
[root@iZ2vc9j72d3f0ypio6oewrZ bin]# redis-sentinel dafranRedis/sentinel.conf
21841:X 21 Apr 2021 10:59:12.697 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
21841:X 21 Apr 2021 10:59:12.697 # Redis version=5.0.12, bits=64, commit=00000000, modified=0, pid=21841, just started
21841:X 21 Apr 2021 10:59:12.697 # Configuration loaded
_._
_.-``__ ''-._
_.-`` `. `_. ''-._ Redis 5.0.12 (00000000/0) 64 bit
.-`` .-```. ```\/ _.,_ ''-._
( ' , .-` | `, ) Running in sentinel mode
|`-._`-...-` __...-.``-._|'` _.-'| Port: 26379
| `-._ `._ / _.-' | PID: 21841
`-._ `-._ `-./ _.-' _.-'
|`-._`-._ `-.__.-' _.-'_.-'|
| `-._`-._ _.-'_.-' | http://redis.io
`-._ `-._`-.__.-'_.-' _.-'
|`-._`-._ `-.__.-' _.-'_.-'|
| `-._`-._ _.-'_.-' |
`-._ `-._`-.__.-'_.-' _.-'
`-._ `-.__.-' _.-'
`-._ _.-'
`-.__.-'
21841:X 21 Apr 2021 10:59:12.699 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128.
21841:X 21 Apr 2021 10:59:12.701 # Sentinel ID is 8097e6e892f54ee04ea0e628891fd60d4bb73f7a
21841:X 21 Apr 2021 10:59:12.701 # +monitor master myredis 127.0.0.1 6379 quorum 1
21841:X 21 Apr 2021 10:59:12.701 * +slave slave 127.0.0.1:6380 127.0.0.1 6380 @ myredis 127.0.0.1 6379
21841:X 21 Apr 2021 10:59:12.703 * +slave slave 127.0.0.1:6381 127.0.0.1 6381 @ myredis 127.0.0.1 6379
如果Master节点断开了,这个时候就会从从机中随机选择一个服务器!(这里面有一个投票算法!)
哨兵日志
+failover-state-select-slave master myredis 127.0.0.1 6379 # 故障转移
+switch-master myredis 127.0.0.1 6379 127.0.0.1 6380 # 选择出新的主机转移
如果主机此时回来了,只能归并到新的主机下,当做从机,这就是哨兵模式的规则!
特点
优点:
1、哨兵集群,基于主从复制模式,所有的主从配置优点,它全有
2、主从可以切换,故障可以转移,系统的可用性就会更好
3、哨兵模式就是主从模式的升级,手动到自动,更加健壮!
缺点:
1、Redis不好啊在线扩容的,集群容量一旦到达上限,在线扩容就十分麻烦!
2、实现哨兵模式的配置其实是很麻烦的,里面有很多选择!
全部配置
# Example sentinel.conf
# 哨兵sentinel实例运行的端口 默认26379
port 26379
# 哨兵sentinel的工作目录
dir/tmp
# 哨兵sentinel监控的redis主节点的 ip port
# master-name 可以自己命名的主节点名字 只能由字母A-z、数字0-9 、这三个字符".-_"组成。
# quorum 当这些quorum个数sentinel哨兵认为master主节点失联 那么这时 客观上认为主节点失联了
# sentinel monitor <master-name> <ip> <redis-port> <quorum>
sentinel monitor mymaster 127.0.0.1 6379 1
# 当在Redis实例中开启了requirepass foobared 授权密码 这样所有连接Redis实例的客户端都要提供密码
# 设置哨兵sentinel 连接主从的密码 注意必须为主从设置一样的验证密码
# sentinel auth-pass <master-name> <password>
sentinel auth-pass mymaster MySUPER--secret-0123passw0rd
# 指定多少毫秒之后 主节点没有应答哨兵sentinel 此时 哨兵主观上认为主节点下线 默认30秒
# sentinel down-after-milliseconds <master-name> <milliseconds>
sentinel down-after-milliseconds mymaster 30000
# 这个配置项指定了在发生failover主备切换时最多可以有多少个slave同时对新的master进行 同步,
这个数字越小,完成failover所需的时间就越长,
但是如果这个数字越大,就意味着越 多的slave因为replication而不可用。
可以通过将这个值设为 1 来保证每次只有一个slave 处于不能处理命令请求的状态。
# sentinel parallel-syncs <master-name> <numslaves>
sentinel parallel-syncs mymaster 1
# 故障转移的超时时间 failover-timeout 可以用在以下这些方面:
# 1. 同一个sentinel对同一个master两次failover之间的间隔时间。
# 2. 当一个slave从一个错误的master那里同步数据开始计算时间。直到slave被纠正为向正确的master那里同步数据时。
# 3.当想要取消一个正在进行的failover所需要的时间。
# 4.当进行failover时,配置所有slaves指向新的master所需的最大时间。不过,即使过了这个超时,slaves依然会被正确配置为指向master,但是就不按parallel-syncs所配置的规则来了
# 默认三分钟
# sentinel failover-timeout <master-name> <milliseconds>
sentinel failover-timeout mymaster 180000
# SCRIPTS EXECUTION
# 配置当某一事件发生时所需要执行的脚本,可以通过脚本来通知管理员,例如当系统运行不正常时发邮件通知相关人员。
# 对于脚本的运行结果有以下规则:
# 若脚本执行后返回1,那么该脚本稍后将会被再次执行,重复次数目前默认为10
# 若脚本执行后返回2,或者比2更高的一个返回值,脚本将不会重复执行。
# 如果脚本在执行过程中由于收到系统中断信号被终止了,则同返回值为1时的行为相同。
# 一个脚本的最大执行时间为60s,如果超过这个时间,脚本将会被一个SIGKILL信号终止,之后重新执行。
# 通知型脚本:当sentinel有任何警告级别的事件发生时(比如说redis实例的主观失效和客观失效等等),将会去调用这个脚本,
# 这时这个脚本应该通过邮件,SMS等方式去通知系统管理员关于系统不正常运行的信息。调用该脚本时,将传给脚本两个参数,
# 一个是事件的类型,
# 一个是事件的描述。
# 如果sentinel.conf配置文件中配置了这个脚本路径,那么必须保证这个脚本存在于这个路径,并且是可执行的,否则sentinel无法正常启动成功。
# 通知脚本
# sentinel notification-script <master-name> <script-path>
sentinel notification-script mymaster /var/redis/notify.sh
# 客户端重新配置主节点参数脚本
# 当一个master由于failover而发生改变时,这个脚本将会被调用,通知相关的客户端关于master地址已经发生改变的信息。
# 以下参数将会在调用脚本时传给脚本:
# <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>
# 目前<state>总是“failover”,
# <role>是“leader”或者“observer”中的一个。
# 参数 from-ip, from-port, to-ip, to-port是用来和旧的master和新的master(即旧的slave)通信的
# 这个脚本应该是通用的,能被多次调用,不是针对性的。
# sentinel client-reconfig-script <master-name> <script-path>
sentinel client-reconfig-script mymaster /var/redis/reconfig.sh
Redis缓存穿透和雪崩
服务的高可用问题
Redis缓存的使用,极大的提升了应用程序的性能和效率,特别是数据查询方面。但同时,它也带来了一些问题。其中,最要害的冋题,就是数据的—致性问题,从严格意义上讲,这个问题无解。如果对数据的—致性要求很高,那么就不能使用缓存。
另外的一些典型问题就是,缓存穿透、缓存雪崩和缓存击穿。目前,业界也都有比较流行的解决方案。
例如客户端请求查询的是2号用户,但缓存中只有1号用户,那客户端就会请求mysql数据库中查询,而数据库中页,没有2号用户。就会一直频繁的请求查询2号用户。
在默认情况下,用户请求数据时,会先在缓存(Redis)中查找,若没找到即缓存未命中,再在数据库中进行查找,数量少可能问题不大,可是一旦大量的请求数据(例如秒杀场景)缓存都没有命中的话,就会全部转移到数据库上,造成数据库极大的压力,就有可能导致数据库崩溃。网络安全中也有人恶意使用这种手段进行攻击被称为洪水攻击。
缓存穿透(查不到)
概述
缓存穿透的概念很简单,用户想要査询一个数据,发现 redis內存数据库没有,也就是缓存没有命中,于是向持久层数据库査询。发现也没有,于是本次査询失败。当用户很多的时候,缓存都没有命中(秒杀场景),于是都去请求了持久层数据库。这会给持久层数据库造成很大的压力,这时候就相当于出现了缓存穿透。
解决方案
布隆过滤器
布隆过滤器是一种数据结构,对所有可能査询的参数以hash形式存储,在控制层先进行校验,不符合则丟弃,从而避免了对底层存储系统的查询压力
[https://blog.csdn.net/qq_33709582/article/details/108407706](https://blog.csdn.net/qq_33709582/article/details/108407706)
[https://blog.csdn.net/jiaomeng/article/details/1495500](https://blog.csdn.net/jiaomeng/article/details/1495500)
缓存空对象
当存储层不命中后,即使返回的空对象也将其缓存起来,同时会设置一个过期时间,之后再访问这个数据将会从缓存中获取,保护了后端数据源;
但是这种方法会存在两个问题
1、如果空值能够被缓存起来,这就意味着缓存需要更多的空间存储更多的键,因为这当中可能会有很多的空值的键;
2、即使对空值设置了过期时间,还是会存在缓存层和存储层的数据会有一段时间窗口的不一致,这对于需要保持一致性的业务会有影响;
缓存击穿(量太大,缓存过期)
概述
相较于缓存穿透,缓存击穿的目的性更强,一个存在的key,在缓存过期的一刻,同时有大量的请求,这些请求都会击穿到DB,造成瞬时DB请求量大、压力骤增。这就是缓存被击穿,只是针对其中某个key的缓存不可用而导致击穿,但是其他的key依然可以使用缓存响应。
比如热搜排行上,一个热点新闻被同时大量访问就可能导致缓存击穿。
解决方案
设置热点数据永不过期
这样就不会出现热点数据过期的情况,但是当Redis内存空间满的时候也会清理部分数据,而且此种方案会占用空间,一旦热点数据多了起来,就会占用部分空间。
加互斥锁(分布式锁)
在访问key之前,采用SETNX(set if not exists)来设置另一个短期key来锁住当前key的访问,访问结束再删除该短期key。保证同时刻只有一个线程访问。这样对锁的要求就十分高。
缓存雪崩(缓存击穿的升级)
概念
缓存雪崩,是指在某一个时间段,缓存集中过期失效。Redis 宕机!
产生雪崩的原因之一,马上就要到双十二零点,很快就会迎来一波抢购,这波商品时间比较集中的放入了缓存,假设缓存一个小时。那么到了凌晨一点钟的时候,这批商品的缓存就都过期了。而对这批商品的访问查询,都落到了数据库上,
双十一:掉一些服务,(保证主要的服务可用)
对于数据库而言,就会产生周期性的压力波峰。于是所有的请求都会达到存储层,存储层的调用量会暴增,造成存储层也会挂掉的情况。
其实集中过期,倒不是非常致命,比较致命的缓存雪崩,是缓存服务器某个节点宕机或断网。因为自然形成的缓存雪崩。一定是在某个时间段集中创建缓存,这个时候,数据库也是可以顶住压力的。无非就是对数据库产生周期性的压力而已。而缓存服务节点的宕机,对数据库服务器造成的压力是不可预知的,很有可能瞬间就把数据库压垮。
解决方案
redis高可用
这个思想的含义是,既然redis有可能挂掉,那可以多增设几台redis,这样一台挂掉之后其他的还可以继续工作,其实就是搭建的集群。(异地多活)
限流降级
这个解决方案的思想是,在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对 某个key只允许一个线程查询数据和写缓存,其他线程等待。
数据预热
含义就是在正式部署之前,先把可能的数据先预先访问一遍,这样部分可能大量访问的数据就会加载到缓存中。在即将发生大并发访问前手动触发加载缓存不同的key,设置不同的过期时间,让缓存失效的时间点尽量均匀。