Q1:JVM内存限制,超出阻塞队列上限
Q2:数据安全,没有持久化,阻塞队列数据丢失
A:消息队列
认识消息队列
消息队列包含3个角色:
- 消息队列:存储和管理消息,也被称为消息代理
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息
Redis提供三种各不同方式实现消息队列:
- list结构:基于List结构模拟消息队列
- PubSub:基本的点对点消息模型
- Stream:比较完善的消息队列模型
基于List实现消息队列
用Redis里的List模拟JVM的List
优点:
- 利用Redis存储,不受限JVM内存上限
- 基于Redis持久化机制,数据安全性有保证
- 可以满足消息有序性
缺点:
- 无法避免消息丢失
- 只支持单消费者
PubSub实现消息队列
PubSub(发布订阅):消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
优点:
- 采用发布订阅模型,支持多生产、多消费
缺点:
- 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出时数据丢失
基于Stream的消息队列
Stream是Redis5.0引入的一种新的数据类型,可以实现一个功能非常完善的消息队列。


命令特点:
- 消息可回溯
- 一个消息可以被多个消费者读取
- 可以阻塞读取
- 有消息漏读风险
Stream的消费者组模式



命令特点:
- 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读风险
- 有消息确认机制,保证消息至少被消费一次
对比
基于Stream消息队列实现异步秒杀
需求:
- 创建Stream消息队列,名为stream.orders
- 修改之前秒杀下单Lua脚本,在认定有抢购资格后,直接向strem.orders添加消息,内容包含voucherId、userId、orderId
- 项目启动时,开启一个线程任务,尝试获取stream.orders中消息,完成下单
XGROUP CREATE stream.orders g1 0 MKSTREAM
— 1.3 订单id
local orderId = ARGV[3] … — 3.6.发送消息到队列中,XADD strea.orders k1 v1 k2 v2
redis.call(‘xadd’,’stream.orders’,’‘,’userId’, userId, ‘voucherId’, voucherId, ‘id’, orderId)
private class VoucherOrderHandler implements Runnable{@Overridepublic void run() {String queueName = "stream.orders";while(true){try {//1.获取消息队列中的订单信息 XREAMGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));//2.判断订单信息是否为空if(list == null || list.isEmpty()){//如果null,说明没有消息,继续下一次循环continue;}//解析消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);//3.创建订单createVoucherOrder(voucherOrder);//4.确认消息 XACK s1 g1 idstringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());}catch(Exception e){log.error("处理订单异常", e);handlePendingList();}}}private void handlePendingList() {String queueName = "stream.orders";while(true){try {//1.获取pending-list中的订单信息 XREAMGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));//2.判断订单信息是否为空if(list == null || list.isEmpty()){//如果null,说明pending-list没有异常消息,结束循环break;}//解析消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);//3.创建订单createVoucherOrder(voucherOrder);//4.确认消息 XACK s1 g1 idstringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());}catch(Exception e){log.error("处理订单异常", e);try {Thread.sleep(100);} catch (InterruptedException ex) {ex.printStackTrace();}}}}}
