- 一、NoSQL数据库介绍
- 二、Redis概述安装
- 三、常用五大数据类型
- 四、Redis配置文件介绍
- 五、Redis新数据类型
- 六、Redis_Jedis测试
- 七、整合Springboot
- 八、Redis事务操作
- 4. 事务冲突问题
- 5. 秒杀案例
- 4. 解决库存遗留问题
- 八、Redis持久化
一、NoSQL数据库介绍
1. 技术发展
技术的分类
- 解决功能性的问题:Java、Jsp、RDBMS、Tomcat、HTML、Linux、JDBC、SVN
- 解决扩展性的问题:Struts、Spring、SpringMVC、Hibernate、Mybatis
- 解决性能的问题:NoSQL、Java线程、Hadoop、Nginx、MQ、ElasticSearch
1. Web1.0时代
Web1.0的时代,数据访问量很有限,用一夫当关的高性能的单点服务器可以解决大部分问题
2. Web2.0时代
随着Web2.0的时代的到来,用户访问量大幅度提升,同时产生了大量的用户数据。加上后来的智能移动设备的普及,所有的互联网平台都面临了巨大的性能挑战
3. 解决CPU及内存压力
4. 解决IO压力
2. NoSQL数据库
1. NoSQL数据库概述
NoSQL(NoSQL = Not Only SQL ),意即“不仅仅是SQL”,泛指非关系型的数据库。
NoSQL 不依赖业务逻辑方式存储,而以简单的key-value模式存储。因此大大的增加了数据库的扩展能力。
- 不遵循SQL标准
- 不支持ACID
- 远超于SQL的性能
2. NoSQL适用场景
- 对数据高并发的读写
- 海量数据的读写
- 对数据高可扩展性的
3. NoSQL不适用场景
- 需要事务支持
- 基于sql的结构化查询存储,处理复杂的关系,需要即席查询
用不着sql的和用了sql也不行的情况,请考虑用NoSql
3. 常用的NoSQL数据库
1. Memcache
- 很早出现的NoSql数据库
- 数据都在内存中,一般不持久化
- 支持简单的key-value模式,支持类型单一
- 一般是作为缓存数据库辅助持久化的数据库
2. Redis
- 几乎覆盖了Memcached的绝大部分功能
- 数据都在内存中,支持持久化,主要用作备份恢复
- 除了支持简单的key-value模式,还支持多种数据结构的存储,比如 list、set、hash、zset等。
- 一般是作为缓存数据库辅助持久化的数据库
3. MongoDB
- 高性能、开源、模式自由(schema free)的文档型数据库
- 数据都在内存中, 如果内存不足,把不常用的数据保存到硬盘
- 虽然是key-value模式,但是对value(尤其是json)提供了丰富的查询功能
- 支持二进制数据及大型对象
- 可以根据数据的特点替代RDBMS ,成为独立的数据库。或者配合RDBMS,存储特定的数据。
3. 行式存储数据库(大数据时代)
1. 行式数据库
2. 列式数据库
3. 常用的海量型数据库
1. Hbase
HBase是Hadoop项目中的数据库。它用于需要对大量的数据进行随机、实时的读写操作的场景中。
HBase的目标就是处理数据量非常庞大的表,可以用普通的计算机处理超过10亿行数据,还可处理有数百万列元素的数据表。
2. Cassandra[kəˈsændrə]
Apache Cassandra是一款免费的开源NoSQL数据库,其设计目的在于管理由大量商用服务器构建起来的庞大集群上的海量数据集**(数据量通常达到PB级别)**。在众多显著特性当中,Cassandra最为卓越的长处是对写入及读取操作进行规模调整,而且其不强调主集群的设计思路能够以相对直观的方式简化各集群的创建与扩展流程。
4. 图关系型数据库
主要应用:社会关系,公共交通网络,地图及网络拓谱(n*(n-1)/2)
5. DB-Engines 数据库排名
http://db-engines.com/en/ranking
二、Redis概述安装
- Redis是一个开源的key-value存储系统。
- 和Memcached类似,它支持存储的value类型相对更多,包括string(字符串)、list(链表)、set(集合)、zset(sorted set —有序集合)和hash(哈希类型)。
- 这些数据类型都支持push/pop、add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。
- 在此基础上,Redis支持各种不同方式的排序。
- 与memcached一样,为了保证效率,数据都是缓存在内存中。
- 区别的是Redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件。
- 并且在此基础上实现了master-slave(主从)同步。
1. 应用场景
1. 配合关系型数据库做高速缓存
2. Redis安装
Redis官方网站 | Redis中文官方网站 |
---|---|
http://redis.io | http://redis.cn/ |
1. 安装版本
- 6.2.1 for Linux(redis-6.2.1.tar.gz)
- 不用考虑在windows环境下对Redis的支持
2. 安装步骤
1. 准备工作:下载安装最新版的gcc编译器
安装C语言的运行环境
yum install centos-release-scl scl-utils-build
yum install -y devtoolset-8-toolchain
scl enable devtoolset-8 bash
测试 gcc版本
gcc --version
2. 下载redis-6.2.1.tar.gz放/opt目录
3. 解压命令:tar -zxvf redis-6.2.1.tar.gz
4. 解压完成后进入目录:cd redis-6.2.1
5. 在redis-6.2.1目录下再次执行make命令(只是编译好)
如果没有准备好C语言编译环境,make 会报错—Jemalloc/jemalloc.h:没有那个文件
解决方案:运行make distclean
在redis-6.2.1目录下再次执行make命令(只是编译好)
6. 跳过make test 继续执行: make install
3. 安装目录:/usr/local/bin
查看默认安装路径
redis-benchmark:性能测试工具,可以在自己本子运行,看看自己本子性能如何
redis-check-aof:修复有问题的AOF文件,rdb和aof后面讲
redis-check-dump:修复有问题的dump.rdb文件
redis-sentinel:Redis集群使用
redis-server:Redis服务器启动命令
redis-cli:客户端,操作入口
4. 前台启动(不推荐)
前台启动,命令行窗口不能关闭,否则服务器停止
5. 后台启动
1. 备份 redis.conf
拷贝一份redis.conf到其他目录
cp /opt/redis-3.2.5/redis.conf /myredis
2. 后台启动设置daemonize no改成yes
修改redis.conf(128行)文件将里面的daemonize no 改成 yes,让服务在后台启动
3. Redis启动
redis-server/myredis/redis.conf
4. 用客户端访问:redis-cli
5. 多个端口可以:redis-cli -p6379
6. 测试验证: ping
7. Redis关闭
单实例关闭:_redis-cli shutdown_
也可以进入终端后再关闭
多实例关闭,指定端口关闭:redis-cli -p 6379 shutdown
3. Redis介绍相关知识
默认16个数据库,类似数组下标从0开始,初始默认使用0号库
使用命令 select <dbid>
来切换数据库
统一密码管理,所有库同样密码
dbsize
查看当前数据库的key的数量flushdb
清空当前库flushall
通杀全部库
Redis**是单线程+多路IO复用技术
**多路复用是指使用一个线程来检查多个文件描述符(Socket)的就绪状态,比如调用select和poll函数,传入多个文件描述符,如果有一个文件描述符就绪,则返回,否则阻塞直到超时。得到就绪状态后进行真正的操作可以在同一个线程里执行,也可以启动线程执行(比如使用线程池)
串行 vs 多线程+锁(memcached) vs 单线程+多路IO复用(Redis)(与Memcache三点不同: 支持多数据类型,支持持久化,单线程+多路IO复用)
4. Docker使用Redis
1. 获取镜像
docker pull redis
也可以指定版本号,不加版本号默认获取最新版本,也可以使用 docker search redis 查看镜像来源
docker pull redis:4.0.1
[root@localhost docker]# docker search redis
NAME DESCRIPTION STARS OFFICIAL AUTOMATED
redis Redis is an open source key-value store that… 7486 [OK]
bitnami/redis Bitnami Redis Docker Image 130 [OK]
sameersbn/redis 78 [OK]
grokzen/redis-cluster Redis cluster 3.0, 3.2, 4.0 & 5.0 62
rediscommander/redis-commander Alpine image for redis-commander - Redis man… 31 [OK]
...
2. 查看本地镜像
[root@localhost docker]# docker images | grep redis
redis latest de25a81a5a0b 2 weeks ago 98.2MB
hub.c.163.com/library/redis latest d4f259423416 2 years ago 106MB
3. 修改配置文件
1. 创建配置文件目录存放redis.conf,文件从官网下载
2. 创建文件夹,新建配置文件贴入从官网下载的配置文件并修改
mkdir /usr/local/docker/redis
vi /usr/local/docker/redis/redis.conf
3. 修改启动默认配置(从上至下依次)
bind 127.0.0.1 #注释掉这部分,这是限制redis只能本地访问
protected-mode no #默认yes,开启保护模式,限制为本地访问
daemonize no#默认no,改为yes意为以守护进程方式启动,可后台运行,除非kill进程,改为yes会使配置文件方式启动redis失败
databases 16 #数据库个数(可选),我修改了这个只是查看是否生效。。
dir ./ #输入本地redis数据库存放文件夹(可选)
appendonly yes #redis持久化(可选)
4. docker启动redis
1. 快速启动
docker run -itd --name redis-test -p 6379:6379 redis
参数说明:
-p 6379:6379
:映射容器服务的 6379 端口到宿主机的 6379 端口。外部可以直接通过宿主机ip:6379 访问到 Redis 的服务。
2. 配置文件启动
docker run -p 6379:6379 --name myredis -v /usr/local/docker/redis/redis.conf:/etc/redis/redis.conf -v /usr/local/docker/redis/data:/data -d redis redis-server /etc/redis/redis.conf --appendonly yes
参数解释说明:
-p 6379:6379
端口映射:前表示主机部分,:后表示容器部分。--name myredis
指定该容器名称,查看和进行操作都比较方便。-v
挂载目录,重要: 配置文件映射,docker镜像redis 默认无配置文件。-d redis
表示后台启动redisredis-server /etc/redis/redis.conf
以配置文件启动redis,加载容器内的conf文件,最终找到的是挂载的目录/usr/local/docker/redis/redis.conf,重要: docker 镜像reids 默认 无配置文件启动--appendonly yes
开启redis 持久化
5. 查看是否运行成功
[root@localhost docker]# docker ps -a | grep redis
602659d1b445 redis "docker-entrypoint.s…" 54 minutes ago Up 54 minutes 0.0.0.0:6379->6379/tcp modest_ramanujan
65bb6e6cab9f redis "docker-entrypoint.s…" About an hour a
三、常用五大数据类型
哪里去获得redis常见数据类型操作命令http://www.redis.cn/commands.html
1. Redis键(Key)
keys *
查看当前库所有key (匹配:keys *1)exists key
判断某个key是否存在type key
查看你的key是什么类型del key
删除指定的key数据unlink key
根据value选择非阻塞删除(仅将keys从keyspace元数据中删除,真正的删除会在后续异步操作)expire key 10
为给定的key设置过期时间(10秒)ttl key
查看还有多少秒过期,-1表示永不过期,-2表示已过期select
命令切换数据库dbsize
查看当前数据库的key的数量flushdb
清空当前库flushall
通杀全部库
2. Redis字符串(String)
1. 简介
String是Redis最基本的类型,你可以理解成与Memcached一模一样的类型,一个key对应一个value。
String类型是二进制安全的。意味着Redis的string可以包含任何数据。比如jpg图片或者序列化的对象。
String类型是Redis最基本的数据类型,一个Redis中字符串value最多可以是512M
2. 常用命令
set <key> <value>
: 添加键值对get <key>
:查询对应键值append <key> <value>
:将给定的追加到原值的末尾 strlen <key>
:获得值的长度setnx <key> <value>
:只有在 key 不存在时 设置 key的值incr <key>
:将 key 中储存的数字值增1(只能对数字值操作,如果为空,新增值为1)decr <key>
:将 key 中储存的数字值减1(只能对数字值操作,如果为空,新增值为-1)incrby / decrby <key> <步长>
:将key 中储存的数字值增减。自定义步长
原子性:
所谓原子操作是指不会被线程调度机制打断的操作;
这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)
- 在单线程中, 能够在单条指令中完成的操作都可以认为是”原子操作”,因为中断只能发生于指令之间。
- 在多线程中,不能被其它进程(线程)打断的操作就叫原子操作。
Redis单命令的原子性主要得益于Redis的单线程。
mset <key1> <value1> <key2> <value2> .....
: 同时设置一个或多个key-value对mget <key1> <key2> <key3> .....
:同时获取一个或多个valuemsetnx <key1> <value1> <key2> <value2> .....
:同时设置一个或多个key-value 对,当且仅当所有给定 key 都不存在。getrange <key> <起始位置> <结束位置>
:获得值的范围,类似java中的substring,前包,后包setrange <key> <起始位置> <value>
:用覆写 所储存的字符串值,从<起始位置>开始(索引从0开始)。 setex <key> <过期时间> <value>
:设置键值的同时,设置过期时间,单位秒。getset <key><value>
:以新换旧,设置了新值同时获得旧值。
3. 数据结构
String的数据结构为简单动态字符串(Simple Dynamic String,缩写SDS)。是可以修改的字符串,内部结构实现上类似于Java的ArrayList,采用预分配冗余空间的方式来减少内存的频繁分配
如图中所示,内部为当前字符串实际分配的空间capacity一般要高于实际字符串长度len。当字符串长度小于1M时,扩容都是加倍现有的空间,如果超过1M,扩容时一次只会多扩1M的空间。需要注意的是字符串最大长度为512M
3. Redis列表(List)
1. 简介
单键多值
Redis 列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边)
它的底层实际是个双向链表,对两端的操作性能很高,通过索引下标的操作中间的节点性能会较差
2. 常用命令
lpush/rpush <key> <value1> <value2> <value3> ....
:从左边/右边插入一个或多个值。lpop/rpop <key>
:从左边/右边吐出一个值。值在键在,值光键亡。rpoplpush <key1> <key2>
:从列表右边吐出一个值,插到 列表左边。 lrange <key> <start> <stop>
:按照索引下标获得元素(从左到右),lrange mylist 0 -1
:0左边第一个,-10右边第一个,(0 -1表示获取所有)lindex <key> <index>
:按照索引下标获得元素(从左到右)llen <key>
:获得列表长度linsert <key> before <value> <newvalue>
:在的后面插入 插入值 lrem <key> <n> <value>
:从左边删除n个value(从左到右)lset <key> <index> <value>
:将列表key下标为index的值替换成value
3. 数据结构
List的数据结构为快速链表quickList,首先在列表元素较少的情况下会使用一块连续的内存存储,这个结构是ziplist,也即是压缩列表。
它将所有的元素紧挨着一起存储,分配的是一块连续的内存。
当数据量比较多的时候才会改成quicklist。
因为普通的链表需要的附加指针空间太大,会比较浪费空间。比如这个列表里存的只是int类型的数据,结构上还需要两个额外的指针prev和next。
Redis将链表和ziplist结合起来组成了quicklist。也就是将多个ziplist使用双向指针串起来使用。这样既满足了快速的插入删除性能,又不会出现太大的空间冗余。
4. Redis集合(Set)
1. 简介
Redis set对外提供的功能与list类似是一个列表的功能,特殊之处在于set是可以自动排重的,当你需要存储一个列表数据,又不希望出现重复数据时,set是一个很好的选择,并且set提供了判断某个成员是否在一个set集合内的重要接口,这个也是list所不能提供的。
Redis的Set是string类型的无序集合。它底层其实是一个value为null的hash表,所以添加,删除,查找的复杂度都是O(1)
一个算法,随着数据的增加,执行时间的长短,如果是O(1),数据增加,查找数据的时间不变
2. 常用命令
sadd <key> <value1> <value2> .....
:将一个或多个 member 元素加入到集合 key 中,已经存在的 member元素将被忽略smembers <key>
:取出该集合的所有值。sismember <key> <value>
:判断集合是否为含有该 值,有1,没有0 scard<key>
:返回该集合的元素个数。srem <key> <value1> <value2> ....
:删除集合中的某个元素。spop <key>
:随机从该集合中吐出一个值srandmember <key> <n>
:随机从该集合中取出n个值。不会从集合中删除smove <source> <destination> <value>
:把集合中一个值从一个集合移动到另一个集合sinter <key1> <key2>
:返回两个集合的交集元素。sunion <key1> <key2>
:返回两个集合的并集元素。sdiff <key1><key2>
:返回两个集合的差集元素(key1中的,不包含key2中的)
3. 数据结构
Set数据结构是dict字典,字典是用哈希表实现的。
Java中HashSet的内部实现使用的是HashMap,只不过所有的value都指向同一个对象。Redis的set结构也是一样,它的内部也使用hash结构,所有的value都指向同一个内部值。
5. Redis哈希(Hash)
1. 简介
Redis hash 是一个键值对集合。
Redis hash是一个string类型的field和value的映射表,hash特别适合用于存储对象。
类似Java里面的Map
用户ID为查找的key,存储的value用户对象包含姓名,年龄,生日等信息,如果用普通的key/value结构来存储
主要有以下2种存储方式:
方式一
图一:每次修改用户的某个属性需要,先反序列化改好后再序列化回去。开销较大。
图二:用户ID数据冗余
方式二
通过 key(用户ID) + field(属性标签) 就可以操作对应属性数据了,既不需要重复存储数据,也不会带来序列化和并发修改控制的问题
**
2. 常用命令
hset <key> <field> <value>
:给集合中的 键赋值 hget <key1> <field>
:从集合 取出 value hmset <key1> <field1> <value1> <field2> <value2> ...
:批量设置hash的值hexists <key1> <field>
:查看哈希表 key 中,给定域 field 是否存在。hkeys <key>
:列出该hash集合的所有fieldhvals <key>
:列出该hash集合的所有valuehincrby <key> <field> <increment>
:为哈希表 key 中的域 field 的值加上增量 1 -1hsetnx <key> <field> <value>
:将哈希表 key 中的域 field 的值设置为 value ,当且仅当域 field不存在
3. 数据结构
Hash类型对应的数据结构是两种:ziplist(压缩列表),hashtable(哈希表)。当field-value长度较短且个数较少时,使用ziplist,否则使用hashtable
6. Redis有序集合Zset(sorted set)
1. 简介
Redis有序集合zset与普通集合set非常相似,是一个没有重复元素的字符串集合
不同之处是有序集合的每个成员都关联了一个评分(**score)**,这个评分(score)被用来按照从最低分到最高分的方式排序集合中的成员。集合的成员是唯一的,但是评分可以是重复了
因为元素是有序的, 所以你也可以很快的根据评分(score)或者次序(position)来获取一个范围的元素。
访问有序集合的中间元素也是非常快的,因此你能够使用有序集合作为一个没有重复成员的智能列表。
2. 常用命令
zadd <key> <score1> <value1> <score2> <value2> …
:将一个或多个 member 元素及其 score 值加入到有序集 key当中。zrange <key> <start> <stop> [WITHSCORES]
:返回有序集 key 中,下标在之间的元素带WITHSCORES,可以让分数一起和值返回到结果集。 zrangebyscore key minmax[withscores] [limit offset count]
:返回有序集 key 中,所有 score 值介于 min和 max 之间(包括等于 min或 max )的成员。有序集成员按 score 值递增(从小到大)次序排列。z``rev``rangebyscore <key> ``maxmin`` [withscores] [limit offset count]
:同上,改为从大到小排列。zincrby <key> <increment> <value>
为元素的score加上增量zrem <key> <value>
:删除该集合下,指定值的元素zcount <key> <min> <max>
:统计该集合,分数区间内的元素个数zrank <key> <value>
:返回该值在集合中的排名,从0开始。
3. 数据结构
SortedSet(zset)是Redis提供的一个非常特别的数据结构,一方面它等价于Java的数据结构Map
zset底层使用了两个数据结构
- hash:hash的作用就是关联元素value和权重score,保障元素value的唯一性,可以通过元素value找到相应的score值
- 跳跃表:跳跃表的目的在于给元素value排序,根据score的范围获取元素列表
4. 跳跃表(跳表)
简介
有序集合在生活中比较常见,例如根据成绩对学生排名,根据得分对玩家排名等。对于有序集合的底层实现,可以用数组、平衡树、链表等。数组不便元素的插入、删除;平衡树或红黑树虽然效率高但结构复杂;链表查询需要遍历所有效率低。Redis采用的是跳跃表。跳跃表效率堪比红黑树,实现远比红黑树简单。
实例
对比有序链表和跳跃表,从链表中查询出51
- 有序列表
要查找值为51的元素,需要从第一个元素开始依次查找、比较才能找到。共需要6次比较。
- 跳跃表
从第2层开始,1节点比51节点小,向后比较。
21节点比51节点小,继续向后比较,后面就是NULL了,所以从21节点向下到第1层
在第1层,41节点比51节点小,继续向后,61节点比51节点大,所以从41向下
在第0层,51节点为要查找的节点,节点被找到,共查找4次。
四、Redis配置文件介绍
创建配置文件目录存放redis.conf,文件从官网下载
1. Units单位
配置大小单位,开头定义了一些基本的度量单位,只支持bytes,不支持bit,并且对大小写不敏感
2. Includes包含
类似jsp中的include,多实例的情况可以把公用的配置文件提取出来
3. 网络相关配置
1. bind
默认情况 bind=127.0.0.1
只能接受本机的访问请求
不写的情况下,无限制接受任何ip地址的访问
生产环境肯定要写你应用服务器的地址;服务器是需要远程访问的,所以需要将其注释掉
如果开启了protected-mode,那么在没有设定bind ip且没有设密码的情况下,Redis只允许接受本机的响应
2. protected-mode
将本机访问保护模式设置no(yes:仅允许本机访问)
3. port
端口号,默认 6379
4. tcp-backlog
设置tcp的backlog,backlog其实是一个连接队列,backlog队列总和 = 未完成三次握手队列 + 已经完成三次握手队列
在高并发环境下你需要一个高backlog值来避免慢客户端连接问题
Linux内核会将这个值减小到/proc/sys/net/core/somaxconn的值(128),所以需要确认增大/proc/sys/net/core/somaxconn和/proc/sys/net/ipv4/tcp_max_syn_backlog(128)两个值来达到想要的效果
5. timeout
一个空闲的客户端维持多少秒会关闭,0表示关闭该功能。即永不关闭
6. tcp-keepalive
对访问客户端的一种心跳检测,每个n秒检测一次
单位为秒,如果设置为0,则不会进行Keepalive检测,建议设置成60
4. General通用
1. daemonize
是否为后台进程,设置为yes
守护进程,后台启动
2. pidfile
存放pid文件的位置,每个实例会产生一个不同的pid文件
3. loglevel
指定日志记录级别,Redis总共支持四个级别:debug、verbose、notice、warning,默认为notice
四个级别根据使用阶段来选择,生产环境选择notice 或者warning
4. logfile
日志文件名称
5. database
设定库的数量 默认16,默认数据库为0,可以使用SELECT
5. Security安全
1. 普通安装设置密码
方式一:通过配置文件进行设置
去掉行前的注释,并修改密码为所需的密码,保存文件
requirepass myRedis
重启redis
sudo service redis restart
#或者
sudo service redis stop
sudo redis-server /etc/redis.conf
这个时候尝试登录redis,发现可以登上,但是执行具体命令是提示操作不允许
redis-cli -h 127.0.0.1 -p 6379
redis 127.0.0.1:6379>
redis 127.0.0.1:6379> keys *
(error) ERR operation not permitted
redis 127.0.0.1:6379> select 1
(error) ERR operation not permitted
redis 127.0.0.1:6379[1]>
尝试用密码登录并执行具体的命令看到可以成功执行
redis-cli -h 127.0.0.1 -p 6379 -a myRedis
redis 127.0.0.1:6379> keys *
1) "myset"
2) "mysortset"
redis 127.0.0.1:6379> select 1
OK
redis 127.0.0.1:6379[1]> config get requirepass
1) "requirepass"
2) "myRedis"
方式二:通过命令行进行配置
redis 127.0.0.1:6379[1]> config set requirepass my_redis
OK
redis 127.0.0.1:6379[1]> config get requirepass
1) "requirepass"
2) "my_redis"
无需重启redis
使用第一步中配置文件中配置的老密码登录redis,会发现原来的密码已不可用,操作被拒绝
redis-cli -h 127.0.0.1 -p 6379 -a myRedis
redis 127.0.0.1:6379> config get requirepass
(error) ERR operation not permitted
使用修改后的密码登录redis,可以执行相应操作
redis-cli -h 127.0.0.1 -p 6379 -a my_redis
redis 127.0.0.1:6379> config get requirepass
1) "requirepass"
2) "my_redis
尝试重启一下redis,用新配置的密码登录redis执行操作,发现新的密码失效,redis重新使用了配置文件中的密码
sudo service redis restart
Stopping redis-server: [ OK ]
Starting redis-server: [ OK ]
redis-cli -h 127.0.0.1 -p 6379 -a my_redis
redis 127.0.0.1:6379> config get requirepass
(error) ERR operation not permitted
redis-cli -h 127.0.0.1 -p 6379 -a myRedis
redis 127.0.0.1:6379> config get requirepass
1) "requirepass"
2) "myRedis"
master配置了密码,slave如何配置
若master配置了密码则slave也要配置相应的密码参数否则无法进行正常复制的。
slave中配置文件内找到如下行,移除注释,修改密码即可
#masterauth mstpassword
2. Docker安装设置密码
开启容器并设置密码
docker run -d --name redis-test -p 6379:6379 redis:6.2.1 --requirepass "Gmf2373271519..."
进入容器使用密码开启redis
docker exec -it redis-test redis-cli -a Gmf2373271519...
6. Limit限制
1. maxclients
- 设置redis同时可以与多少个客户端进行连接。
- 默认情况下为10000个客户端。
- 如果达到了此限制,redis则会拒绝新的连接请求,并且向这些连接请求方发出“max number of clients reached”以作回应
2. maxmemory
- 建议必须设置,否则,将内存占满,造成服务器宕机
- 设置redis可以使用的内存量。一旦到达内存使用上限,redis将会试图移除内部数据,移除规则可以通过maxmemory-policy来指定。
- 如果redis无法根据移除规则来移除内存中的数据,或者设置了“不允许移除”,那么redis则会针对那些需要申请内存的指令返回错误信息,比如SET、LPUSH等。
- 但是对于无内存申请的指令,仍然会正常响应,比如GET等。如果你的redis是主redis(说明你的redis有从redis),那么在设置内存使用上限时,需要在系统中留出一些内存空间给同步队列缓存,只有在你设置的是“不移除”的情况下,才不用考虑这个因素
3. maxmemory-policy
- volatile-lru:使用LRU算法移除key,只对设置了过期时间的键;(最近最少使用)
- allkeys-lru:在所有集合key中,使用LRU算法移除key
- volatile-random:在过期集合中移除随机的key,只对设置了过期时间的键
- allkeys-random:在所有集合key中,移除随机的key
- volatile-ttl:移除那些TTL值最小的key,即那些最近要过期的key
- noeviction:不进行移除。针对写操作,只是返回错误信息
4. maxmemory-samples
- 设置样本数量,LRU算法和最小TTL算法都并非是精确的算法,而是估算值,所以你可以设置样本的大小,redis默认会检查这么多个key并选择其中LRU的那个。
- 一般设置3到7的数字,数值越小样本越不准确,但性能消耗越小
五、Redis新数据类型
1. Bitmaps
1. 简介
现代计算机用二进制(位)作为信息的基础单位, 1个字节等于8位, 例如“abc”字符串是由3个字节组成, 但实际在计算机存储时将其用二进制表示,“abc”分别对应的ASCII码分别是97、 98、 99, 对应的二进制分别是01100001、 01100010和01100011,如下图
理地使用操作位能够有效地提高内存使用率和开发效率。
Redis提供了Bitmaps这个“数据类型”可以实现对位的操作:
- Bitmaps本身不是一种数据类型, 实际上它就是字符串(key-value) , 但是它可以对字符串的位进行操作
- Bitmaps单独提供了一套命令, 所以在Redis中使用Bitmaps和使用字符串的方法不太相同。 可以把Bitmaps想象成一个以位为单位的数组, 数组的每个单元只能存储0和1, 数组的下标在Bitmaps中叫做偏移量
2. 常用命令
1. setbit
setbit <key> <offset> <value>
:设置Bitmaps中某个偏移量的值(0或1)
注:偏移量从0开始算起
实例
每个独立用户是否访问过网站存放在Bitmaps中, 将访问的用户记做1, 没有访问的用户记做0, 用偏移量作为用户的id
设置键的第offset个位的值(从0算起) , 假设现在有20个用户,userid=1, 6, 11, 15, 19的用户对网站进行了访问, 那么当前Bitmaps初始化结果如图
unique:users:20201106代表2020-11-06这天的独立访问用户的Bitmaps
注意:
- 很多应用的用户id以一个指定数字(例如10000) 开头, 直接将用户id和Bitmaps的偏移量对应势必会造成一定的浪费, 通常的做法是每次做setbit操作时将用户id减去这个指定数字
- 在第一次初始化Bitmaps时, 假如偏移量非常大, 那么整个初始化过程执行会比较慢, 可能会造成Redis的阻塞
2. getbit
getbit <key> <offset>
:获取Bitmaps中某个偏移量的值
注:获取键的第offset位的值(从0开始算)
实例
获取id=8的用户是否在2020-11-06这天访问过, 返回0说明没有访问过:
注:因为100根本不存在,所以也是返回0
3. bitcount
统计字符串被设置为1的bit数。一般情况下,给定的整个字符串都会被进行计数,通过指定额外的 start 或 end 参数,可以让计数只在特定的位上进行。start 和 end 参数的设置,都可以使用负数值:比如 -1 表示最后一个位,而 -2 表示倒数第二个位,start、end 是指bit组的字节的下标数,二者皆包含。bitcount <key> [start end]
:统计字符串从start字节到end字节比特值为1的数量
实例
计算2022-11-06这天的独立访问用户数量
start和end代表起始和结束字节数, 下面操作计算用户id在第1个字节到第3个字节之间的独立访问用户数,对应的用户id是11, 15, 19
举例: K1 【01000001 01000000 00000000 00100001】,对应【0,1,2,3】
- bitcount K1 1 2 : 统计下标1、2字节组中bit=1的个数,即 01000000 00000000 —》bitcount K1 1 2 —》1
- bitcount K1 1 3 : 统计下标1、2字节组中bit=1的个数,即 01000000 00000000 00100001 —》bitcount K1 1 3 —》3
- bitcount K1 0 -2 : 统计下标0到下标倒数第2,字节组中bit=1的个数,即01000001 01000000 00000000 —》bitcount K1 0 -2 —》3
注:redis的setbit设置或清除的是bit位置,而bitcount计算的是byte位置。
4. bitop
bitop and(or/not/xor) <destkey> [key…]
bitop是一个复合操作, 它可以做多个Bitmaps的and(交集) 、 or(并集) 、 not(非) 、 xor(异或) 操作并将结果保存在destkey中
实例
2020-11-04 日访问网站的userid=1,2,5,9
setbit unique:users:20201104 1 1
setbit unique:users:20201104 2 1
setbit unique:users:20201104 5 1
setbit unique:users:20201104 9 1
2020-11-03 日访问网站的userid=0,1,4,9
setbit unique:users:20201103 0 1
setbit unique:users:20201103 1 1
setbit unique:users:20201103 4 1
setbit unique:users:20201103 9 1
计算出两天都访问过网站的用户数量
bitop and unique:users:and:20201104_03 unique:users:20201103 unique:users:20201104
计算出任意一天都访问过网站的用户数量(例如月活跃就是类似这种),可以使用or求并集
3. Bitmaps与Set对比
假设网站有1亿用户, 每天独立访问的用户有5千万, 如果每天用集合类型和Bitmaps分别存储活跃用户可以得到表
set和Bitmaps存储一天活跃用户对比 | |||
---|---|---|---|
数据类型 | 每个用户id占用空间 | 需要存储的用户量 | 全部内存量 |
集合类型 | 64位 | 50000000 | 64位*50000000 = 400MB |
Bitmaps | 1位 | 100000000 | 1位*100000000 = 12.5MB |
很明显, 这种情况下使用Bitmaps能节省很多的内存空间,尤其是随着时间推移节省的内存还是非常可观的
set和Bitmaps存储独立用户空间对比 | |||
---|---|---|---|
数据类型 | 一天 | 一个月 | 一年 |
集合类型 | 400MB | 12GB | 144GB |
Bitmaps | 12.5MB | 375MB | 4.5GB |
但Bitmaps并不是万金油, 假如该网站每天的独立访问用户很少, 例如只有10万(大量的僵尸用户) , 那么两者的对比如下表所示, 很显然, 这时候使用Bitmaps就不太合适了, 因为基本上大部分位都是0
set和Bitmaps存储一天活跃用户对比(独立用户比较少) | |||
---|---|---|---|
数据类型 | 每个userid占用空间 | 需要存储的用户量 | 全部内存量 |
集合类型 | 64位 | 100000 | 64位*100000 = 800KB |
Bitmaps | 1位 | 100000000 | 1位*100000000 = 12.5MB |
2. HyperLogLog
1. 简介
在工作当中,我们经常会遇到与统计相关的功能需求,比如统计网站PV(PageView页面访问量),可以使用Redis的incr、incrby轻松实现。
但像UV(UniqueVisitor,独立访客)、独立IP数、搜索记录数等需要去重和计数的问题如何解决?这种求集合中不重复元素个数的问题称为基数问题。
解决基数问题有很多种方案:
- 数据存储在MySQL表中,使用distinct count计算不重复个数
- 使用Redis提供的hash、set、bitmaps等数据结构来处理
以上的方案结果精确,但随着数据不断增加,导致占用空间越来越大,对于非常大的数据集是不切实际的。
能否能够降低一定的精度来平衡存储空间?Redis推出了HyperLogLog
Redis HyperLogLog 是用来做基数统计的算法,HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定的、并且是很小的。
在 Redis 里面,每个 HyperLogLog 键只需要花费 12 KB 内存,就可以计算接近 2^64 个不同元素的基数。这和计算基数时,元素越多耗费内存就越多的集合形成鲜明对比。
但是,因为 HyperLogLog 只会根据输入元素来计算基数,而不会储存输入元素本身,所以 HyperLogLog 不能像集合那样,返回输入的各个元素。
什么是基数?
比如数据集 {1, 3, 5, 7, 5, 7, 8}, 那么这个数据集的基数集为{1, 3, 5 ,7, 8}, 基数(不重复元素)为5。 基数估计就是在误差可接受的范围内,快速计算基数(基数 = 个数总和 - 重复个数)
2. 命令
1. pfadd
pfadd <key> <element> [element ...]
:添加指定元素到 HyperLogLog 中
实例
将所有元素添加到指定HyperLogLog数据结构中。如果执行命令后HLL估计的近似基数发生变化,则返回1,否则返回0
2. pfcount
pfcount <key> [key ...]
:计算HLL的近似基数,可以计算多个HLL,比如用HLL存储每天的UV,计算一周的UV可以使用7天的UV合并计算即可
3. pfmerge
pfmerge <destkey> <sourcekey> [sourcekey ...]
:将一个或多个HLL合并后的结果存储在另一个HLL中,比如每月活跃用户可以使用每天的活跃用户来合并计算可得
3. Geospatial
1. 简介
Redis 3.2 中增加了对GEO类型的支持。GEO,Geographic,地理信息的缩写。该类型,就是元素的2维坐标,在地图上就是经纬度。redis基于该类型,提供了经纬度设置,查询,范围查询,距离查询,经纬度Hash等常见操作
2. 命令
1. geoadd
geoadd <key> <longitude> <latitude> <member> [longitude latitude member...]
:添加地理位置(经度,纬度,名称)
实例
geoadd china:city 121.47 31.23 shanghai
geoadd china:city 106.50 29.53 chongqing 114.05 22.52 shenzhen 116.38 39.90 beijing
注意:
- 两极无法直接添加,一般会下载城市数据,直接通过 Java 程序一次性导入。
- 有效的经度从 -180 度到 180 度。有效的纬度从 -85.05112878 度到 85.05112878 度。
- 当坐标位置超出指定范围时,该命令将会返回一个错误。
- 已经添加的数据,是无法再次往里面添加的。
2. geopos
geopos <key> <member> [member...]
:获得指定地区的坐标值
实例
3. geodist
geodist <key> <member1> <member2> [m|km|ft|mi]
:获取两个位置之间的直线距离
实例
获取两个位置之间的直线距离
单位:
- m 表示单位为米[默认值]。
- km 表示单位为千米。
- mi 表示单位为英里。
- ft 表示单位为英尺。
如果用户没有显式地指定单位参数, 那么 GEODIST 默认使用米作为单位
4. georadius
georadius <key> <longitude> <latitude> radius m|km|ft|mi
:以给定的经纬度为中心,找出某一半径内的元素(经度 纬度 距离 单位)
六、Redis_Jedis测试
1. Jedis简介
Jedis:一款java操作redis数据库的工具,其中操作不同数据结构的方法与Redis命令操作相同
2. Jedis所需要的Jar包
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.2.0</version>
</dependency>
3. 连接Redis注意事项
- 禁用Linux的防火墙:Linux(CentOS7)里执行命令
- 阿里云开启设置Redis安全组
- redis.conf 中注释掉 bind 127.0.0.1 ,然后 protected-mode no
4. 使用演示
public class TestJedis {
@Test
public void test1(){
Jedis jedis = new Jedis("39.108.59.214", 6379);
Set<String> keys = jedis.keys("*");
for (String key : keys) {
System.out.println(key);
}
jedis.geoadd("china:city", 153.15,74.81,"unknown");
jedis.geoadd("china:city", 121.47,31.23,"shanghai");
List<GeoCoordinate> geopos = jedis.geopos("china:city", "shanghai", "unknown");
for (GeoCoordinate geopo : geopos) {
System.out.println("经度:" + geopo.getLatitude()+ "\t纬度:" + geopo.getLongitude());
}
}
}
所有的 redis 命令在 jedis 中都有,在这里不多赘述,详记redis命令即可
七、整合Springboot
1. pom.xml
<!--Springboot已经内置了redis的客户端-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- spring2.X集成redis所需common-pool2-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.9.0</version>
</dependency>
2. application.yaml
spring:
redis:
#Redis服务器地址
port: 6379
#Redis服务器连接端口
host: 39.108.59.214
#Redis服务器连接密码
password: Gmf2373271519...
#Redis数据库索引(默认为0)
database: 0
#连接超时时间(毫秒)
connect-timeout: 1800000
jedis:
pool:
#连接池最大连接数(使用负值表示没有限制)
max-active: 8
#最大阻塞等待时间(负数表示没限制)
max-wait: -1
#连接池中的最大空闲连接
max-idle: 5
#连接池中的最小空闲连接
min-idle: 0
3. Redis配置类
@EnableCaching
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
template.setConnectionFactory(factory);
//key序列化方式
template.setKeySerializer(redisSerializer);
//value序列化
template.setValueSerializer(jackson2JsonRedisSerializer);
//value hashmap序列化
template.setHashValueSerializer(jackson2JsonRedisSerializer);
return template;
}
@Bean
public CacheManager cacheManager(RedisConnectionFactory factory) {
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
//解决查询缓存转换异常的问题
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
// 配置序列化(解决乱码的问题),过期时间600秒
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofSeconds(600))
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
.disableCachingNullValues();
RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
.cacheDefaults(config)
.build();
return cacheManager;
}
}
4. Redis工具类
@Component
public final class RedisUtil {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// =============================common============================
/**
* @param key 键
* @param time 时间(秒)
* @return boolean
* @description 指定缓存失效时间
*/
public boolean expire(String key, long time) {
try {
if (time > 0) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 根据key 获取过期时间
*
* @param key 键 不能为null
* @return 时间(秒) 返回0代表为永久有效
*/
public long getExpire(String key) {
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}
/**
* 判断key是否存在
*
* @param key 键
* @return true 存在 false不存在
*/
public boolean hasKey(String key) {
try {
return redisTemplate.hasKey(key);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 删除缓存
*
* @param key 可以传一个值 或多个
*/
@SuppressWarnings("unchecked")
public void del(String... key) {
if (key != null && key.length > 0) {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key));
}
}
}
// ============================String=============================
/**
* 普通缓存获取
*
* @param key 键
* @return 值
*/
public Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}
/**
* 普通缓存放入
*
* @param key 键
* @param value 值
* @return true成功 false失败
*/
public boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 普通缓存放入并设置时间
*
* @param key 键
* @param value 值
* @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期
* @return true成功 false 失败
*/
public boolean set(String key, Object value, long time) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
} else {
set(key, value);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 递增
*
* @param key 键
* @param delta 要增加几(大于0)
* @return
*/
public long incr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递增因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, delta);
}
/**
* 递减
*
* @param key 键
* @param delta 要减少几(小于0)
* @return
*/
public long decr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递减因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, -delta);
}
// ================================Map=================================
/**
* HashGet
*
* @param key 键 不能为null
* @param item 项 不能为null
* @return 值
*/
public Object hget(String key, String item) {
return redisTemplate.opsForHash().get(key, item);
}
/**
* 获取hashKey对应的所有键值
*
* @param key 键
* @return 对应的多个键值
*/
public Map<Object, Object> hmget(String key) {
return redisTemplate.opsForHash().entries(key);
}
/**
* HashSet
*
* @param key 键
* @param map 对应多个键值
* @return true 成功 false 失败
*/
public boolean hmset(String key, Map<String, Object> map) {
try {
redisTemplate.opsForHash().putAll(key, map);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* HashSet 并设置时间
*
* @param key 键
* @param map 对应多个键值
* @param time 时间(秒)
* @return true成功 false失败
*/
public boolean hmset(String key, Map<String, Object> map, long time) {
try {
redisTemplate.opsForHash().putAll(key, map);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value) {
try {
redisTemplate.opsForHash().put(key, item, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 向一张hash表中放入数据,并设置超时时间(如果不存在将创建)
*
* @param key 键
* @param item 项
* @param value 值
* @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value, long time) {
try {
redisTemplate.opsForHash().put(key, item, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 删除hash表中的值
*
* @param key 键 不能为null
* @param item 项 可以使多个 不能为null
*/
public void hdel(String key, Object... item) {
redisTemplate.opsForHash().delete(key, item);
}
/**
* 判断hash表中是否有该HashKey的值
*
* @param key 键 不能为null
* @param item 项 不能为null
* @return true 存在 false不存在
*/
public boolean hHasKey(String key, String item) {
return redisTemplate.opsForHash().hasKey(key, item);
}
/**
* hash递增 如果不存在,就会创建一个 并把新增后的值返回
*
* @param key 键
* @param item 项
* @param by 要增加几(大于0)
* @return
*/
public double hincr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, by);
}
/**
* hash递减
*
* @param key 键
* @param item 项
* @param by 要减少记(小于0)
* @return
*/
public double hdecr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, -by);
}
// ============================set=============================
/**
* 根据key获取Set中的所有值
*
* @param key 键
* @return
*/
public Set<Object> sGet(String key) {
try {
return redisTemplate.opsForSet().members(key);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 根据value从一个set中查询,是否存在
*
* @param key 键
* @param value 值
* @return true 存在 false不存在
*/
public boolean sHasKey(String key, Object value) {
try {
return redisTemplate.opsForSet().isMember(key, value);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将数据放入set缓存
*
* @param key 键
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSet(String key, Object... values) {
try {
return redisTemplate.opsForSet().add(key, values);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 将set数据放入缓存
*
* @param key 键
* @param time 时间(秒)
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSetAndTime(String key, long time, Object... values) {
try {
Long count = redisTemplate.opsForSet().add(key, values);
if (time > 0)
expire(key, time);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 获取set缓存的长度
*
* @param key 键
* @return
*/
public long sGetSetSize(String key) {
try {
return redisTemplate.opsForSet().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 移除值为value的
*
* @param key 键
* @param values 值 可以是多个
* @return 移除的个数
*/
public long setRemove(String key, Object... values) {
try {
Long count = redisTemplate.opsForSet().remove(key, values);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
// ===============================list=================================
/**
* 获取list缓存的内容
*
* @param key 键
* @param start 开始
* @param end 结束 0 到 -1代表所有值
* @return
*/
public List<Object> lGet(String key, long start, long end) {
try {
return redisTemplate.opsForList().range(key, start, end);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 获取list缓存的长度
*
* @param key 键
* @return
*/
public long lGetListSize(String key) {
try {
return redisTemplate.opsForList().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 通过索引 获取list中的值
*
* @param key 键
* @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
* @return
*/
public Object lGetIndex(String key, long index) {
try {
return redisTemplate.opsForList().index(key, index);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @return
*/
public boolean lSet(String key, Object value) {
try {
redisTemplate.opsForList().rightPush(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return
*/
public boolean lSet(String key, Object value, long time) {
try {
redisTemplate.opsForList().rightPush(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @return
*/
public boolean lSet(String key, List<Object> value) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return
*/
public boolean lSet(String key, List<Object> value, long time) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 根据索引修改list中的某条数据
*
* @param key 键
* @param index 索引
* @param value 值
* @return
*/
public boolean lUpdateIndex(String key, long index, Object value) {
try {
redisTemplate.opsForList().set(key, index, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 移除N个值为value
*
* @param key 键
* @param count 移除多少个
* @param value 值
* @return 移除的个数
*/
public long lRemove(String key, long count, Object value) {
try {
Long remove = redisTemplate.opsForList().remove(key, count, value);
return remove;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
}
八、Redis事务操作
1. Redis事务的定义
Redis事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
Redis事务的主要作用就是串联多个命令防止别的命令插队。
2. 事务操作的相关命令——Multi、Exec、discard
从输入Multi命令开始,输入的命令都会依次进入命令队列中,但不会执行,直到输入Exec后,Redis会将之前的命令队列中的命令依次执行。
组队的过程中可以通过discard来放弃组队。
使用案例
3. 事务的错误处理
情况一
组队中某个命令出现了报告错误,执行时整个的所有队列都会被取消
情况二
如果执行阶段某个命令报出了错误,则只有报错的命令不会被执行,而其他的命令都会执行,不会回滚。
4. 事务冲突问题
1. 例子
- 一个请求想给金额减8000
- 一个请求想给金额减5000
- 一个请求想给金额减1000
2. 悲观锁
悲观锁(Pessimistic Lock), 顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁
3. 乐观锁
乐观锁(Optimistic Lock), 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量。Redis就是利用这种check-and-set机制实现事务的
4. 命令
WATCH key [key ...]
:在执行multi之前,先执行watch key1 [key2],可以监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断unwatch
:取消 WATCH 命令对所有 key 的监视(如果在执行 WATCH 命令之后,EXEC 命令或DISCARD 命令先被执行了的话,那么就不需要再执行UNWATCH 了)
# 监视 key ,且事务成功执行
redis> WATCH lock lock_times
OK
redis> MULTI
OK
redis> SET lock "huangz"
QUEUED
redis> INCR lock_times
QUEUED
redis> EXEC
1) OK
2) (integer) 1
# 监视 key ,且事务被打断
redis> WATCH lock lock_times
OK
redis> MULTI
OK
redis> SET lock "joe" # 就在这时,另一个客户端修改了 lock_times 的值
QUEUED
redis> INCR lock_times
QUEUED
redis> EXEC # 因为 lock_times 被修改, joe 的事务执行失败
(nil)
5. 事务三特性
- 单独的隔离操作
- 事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
- 没有隔离级别的概念
- 队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被实际执行
- 不保证原子性
- 事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚
5. 秒杀案例
1. 解决计数器和人员记录的事务操作
引入Thymeleaf
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
<version>2.4.5</version>
</dependency>
seckill.html
<div>
<form method="get" action="/seckill">
<input type="hidden" value="1000" name="productId"/>
<input type="submit" value="开始秒杀"/>
</form>
</div>
SekillController.java
@Controller
public class SeckillController {
@Autowired
RedisUtil redisUtil;
@RequestMapping("/seckill.jhtml")
public String seckillHTML() {
return "seckill";
}
@GetMapping("/seckill")
@ResponseBody
public String seckill(String productId) {
//随机生成用户ID
int userId = new Random().nextInt(50000);
//库存商品key
String productKey = "product:" + productId;
//商品用户Key
String userKey = "user:" + productId;
Object productAmount = redisUtil.get(productKey);
//判断秒杀活动是否开启
if (productAmount == null) {
return "秒杀暂未开启";
}
RedisTemplate<String, Object> redisTemplate = redisUtil.getRedisTemplate();
//判断当前用户是否参与过秒杀活动
if (redisTemplate.opsForSet().isMember(userKey, userId)) {
return "你已参加过该活动";
}
//判断库存是否足够(库存 > 0)
if (Integer.parseInt(productAmount.toString()) <= 0) {
return "秒杀失败,库存不足";
}
//库存减少-1
redisTemplate.opsForValue().decrement(productKey);
//将用户id添加至秒杀成功列表中
redisUtil.sSet(userKey,userId);
return "秒杀成功";
}
}
但是这样的代码并没有考虑到并发的问题,从而可能引发多卖的情况
2. 秒杀并发模拟
使用Jmeter并发测试
开始测试运行,查看Redis中的数据情况
发现库存出现了负数超卖的情况,甚至在高并发量的情况下Redis出现了等待Timeout情况
3. 解决超卖问题
使用乐观锁淘汰用户,解决超卖问题
修改代码使用乐观锁事务
@Controller
public class SeckillController {
@Autowired
RedisUtil redisUtil;
@RequestMapping("/seckill.jhtml")
public String seckillHTML() {
return "seckill";
}
@GetMapping("/seckill")
@ResponseBody
public String seckill(String productId) {
final String[] msg = {"秒杀成功"};
RedisTemplate<String, Object> redisTemplate = redisUtil.getRedisTemplate();
/*
* 通过 SessionCallback 操作 Redis 时,会从当前线程获得 Redis Connection ,如果获取不到,则会去“创建”一个 Redis Connection
* 并绑定到当前线程中。这样,我们在该 Redis Connection 开启 Redis Transaction 后,在该线程的所有操作,都可以在这个 Transaction 中,
* 最后交由 Spring 事务管理器统一提供或回滚 Transaction
* */
redisTemplate.execute(new SessionCallback<Object>() {
@Override
public Object execute(RedisOperations operations) throws DataAccessException {
//随机生成用户ID
int userId = new Random().nextInt(50000);
//库存商品key
String productKey = "product:" + productId;
//商品用户Key
String userKey = "user:" + productId;
//监视商品库存
operations.watch(productKey);
Object productAmount = redisUtil.get(productKey);
//判断秒杀活动是否开启
if (productAmount == null) {
operations.unwatch();
msg[0] = "秒杀暂未开启";
return "秒杀暂未开启";
}
//判断当前用户是否参与过秒杀活动
if (operations.opsForSet().isMember(userKey, userId)) {
operations.unwatch();
msg[0] = "你已参加过该活动";
return "你已参加过该活动";
}
//判断库存是否足够(库存 > 0)
if (Integer.parseInt(productAmount.toString()) <= 0) {
operations.unwatch();
msg[0] = "秒杀失败,库存不足";
return "秒杀失败,库存不足";
}
//开始事务
operations.multi();
//库存减少-1
operations.opsForValue().decrement(productKey);
//将用户id添加至秒杀成功列表中
operations.opsForSet().add(userKey,userId);
//执行事务,自动解除watch状态,修改版本
return operations.exec();
}
});
return msg[0];
}
}
**
4. 解决库存遗留问题
1. 问题描述
假设存在100库存,当所有的线程(假设为2000)并发执行秒杀操作,那么redis乐观锁机制将会保证只有一个线程成功,其余都将失败,那么将会遗留99个库存
2. LUA脚本
Lua 是一个小巧的脚本语言,Lua脚本可以很容易的被C/C++代码调用,也可以反过来调用C/C++的函数,Lua并没有提供强大的库,一个完整的Lua解释器不过200k,所以Lua不适合作为开发独立应用程序的语言,而是作为嵌入式脚本语言。
很多应用程序、游戏使用LUA作为自己的嵌入式脚本语言,以此来实现可配置性、可扩展性。
这其中包括魔兽争霸地图、魔兽世界、博德之门、愤怒的小鸟等众多游戏插件或外挂。
官方网址:https://www.w3cschool.cn/lua/
3. LUA脚本在Redis中的优势
将复杂的或者多步的redis操作,写为一个脚本,一次提交给redis执行,减少反复连接redis的次数。提升性能。
LUA脚本是类似redis事务,有一定的原子性,不会被其他命令插队,可以完成一些redis事务性的操作。
但是注意redis的lua脚本功能,只有在Redis 2.6以上的版本才可以使用。利用lua脚本淘汰用户,解决超卖问题。
redis 2.6版本以后,通过lua脚本解决争抢问题,实际上是redis 利用其单线程的特性,用任务队列的方式解决多任务并发问题
**
4. 使用Lua脚本解决库存遗留问题
编写Lua脚本代码
seckill.lua
local userid=KEYS[1];
local prodid=KEYS[2];
local qtkey="product:"..prodid;
local usersKey="user:"..prodid;
local userExists=redis.call("SISMEMBER",usersKey,userid);
if tonumber(userExists)==1 then
return 2;
end
local num= redis.call("GET" ,qtkey);
if tonumber(num)<=0 then
return 0;
else
redis.call("DECR",qtkey);
redis.call("SADD",usersKey,userid);
end
return 1;
添加Lua配置类
LuaConfig.java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource;
@Configuration
public class LuaConfig {
@Bean
public DefaultRedisScript<Long> secKillScript() {
DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>();
defaultRedisScript.setResultType(Long.class);
defaultRedisScript.setScriptSource(
new ResourceScriptSource(new ClassPathResource("lua/Test.lua")));
return defaultRedisScript;
}
}
修改原代码
seckillController.java
@Autowired
DefaultRedisScript script;
/**
* @param productId
* @return java.lang.String
* @description 使用Lua脚本解决库存遗留问题
*/
@GetMapping("/seckillLua")
@ResponseBody
public String seckillLua(String productId) {
RedisTemplate<String, Object> redisTemplate = redisUtil.getRedisTemplate();
//随机生成用户ID
int userId = new Random().nextInt(50000);
ArrayList<String> keys = new ArrayList<>();
keys.add(userId + "");
keys.add(productId);
Long result = (Long) redisTemplate.execute(script, keys);
if (result == 0) {
System.out.println("库存不足");
return "库存不足";
} else if (result == 1) {
System.out.println("秒杀成功");
return "秒杀成功";
} else if (result == 2) {
System.out.println("已经秒杀过");
return "已经秒杀过";
} else {
System.out.println("未知错误");
return "未知错误";
}
}
测试
测试使用40并发量同时秒杀
测试结果
检查库存状况
使用Lua脚本的方式不会出现超卖,并且库存可以全部卖完
八、Redis持久化
官网介绍:http://www.redis.io
Redis 提供了2个不同形式的持久化方式。
- RDB(Redis DataBase)
- AOF(Append Of File)
1. RDB(Redis DataBase)
1. 介绍
RDB是指在指定的时间间隔内将内存中的数据集快照写入磁盘, 也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存里