Q1:JVM内存限制,超出阻塞队列上限
Q2:数据安全,没有持久化,阻塞队列数据丢失
A:消息队列

认识消息队列

消息队列包含3个角色:

  • 消息队列:存储和管理消息,也被称为消息代理
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列获取消息并处理消息

Redis提供三种各不同方式实现消息队列:

  • list结构:基于List结构模拟消息队列
  • PubSub:基本的点对点消息模型
  • Stream:比较完善的消息队列模型

基于List实现消息队列

用Redis里的List模拟JVM的List
优点:

  • 利用Redis存储,不受限JVM内存上限
  • 基于Redis持久化机制,数据安全性有保证
  • 可以满足消息有序性

缺点:

  • 无法避免消息丢失
  • 只支持单消费者

PubSub实现消息队列

PubSub(发布订阅):消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
image.png
优点:

  • 采用发布订阅模型,支持多生产、多消费

缺点:

  • 不支持数据持久化
  • 无法避免消息丢失
  • 消息堆积有上限,超出时数据丢失

基于Stream的消息队列

Stream是Redis5.0引入的一种新的数据类型,可以实现一个功能非常完善的消息队列。
image.png
image.png
image.png
命令特点:

  • 消息可回溯
  • 一个消息可以被多个消费者读取
  • 可以阻塞读取
  • 有消息漏读风险

Stream的消费者组模式

image.png

image.png
image.png
命令特点:

  • 消息可回溯
  • 可以多消费者争抢消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏读风险
  • 有消息确认机制,保证消息至少被消费一次

对比

image.png

基于Stream消息队列实现异步秒杀

需求:

  1. 创建Stream消息队列,名为stream.orders
  2. 修改之前秒杀下单Lua脚本,在认定有抢购资格后,直接向strem.orders添加消息,内容包含voucherId、userId、orderId
  3. 项目启动时,开启一个线程任务,尝试获取stream.orders中消息,完成下单

XGROUP CREATE stream.orders g1 0 MKSTREAM

— 1.3 订单id
local orderId = ARGV[3] … — 3.6.发送消息到队列中,XADD strea.orders k1 v1 k2 v2
redis.call(‘xadd’,’stream.orders’,’
‘,’userId’, userId, ‘voucherId’, voucherId, ‘id’, orderId)

  1. private class VoucherOrderHandler implements Runnable{
  2. @Override
  3. public void run() {
  4. String queueName = "stream.orders";
  5. while(true){
  6. try {
  7. //1.获取消息队列中的订单信息 XREAMGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
  8. List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
  9. Consumer.from("g1", "c1"),
  10. StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
  11. StreamOffset.create(queueName, ReadOffset.lastConsumed())
  12. );
  13. //2.判断订单信息是否为空
  14. if(list == null || list.isEmpty()){
  15. //如果null,说明没有消息,继续下一次循环
  16. continue;
  17. }
  18. //解析消息
  19. MapRecord<String, Object, Object> record = list.get(0);
  20. Map<Object, Object> value = record.getValue();
  21. VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
  22. //3.创建订单
  23. createVoucherOrder(voucherOrder);
  24. //4.确认消息 XACK s1 g1 id
  25. stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
  26. }catch(Exception e){
  27. log.error("处理订单异常", e);
  28. handlePendingList();
  29. }
  30. }
  31. }
  32. private void handlePendingList() {
  33. String queueName = "stream.orders";
  34. while(true){
  35. try {
  36. //1.获取pending-list中的订单信息 XREAMGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0
  37. List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
  38. Consumer.from("g1", "c1"),
  39. StreamReadOptions.empty().count(1),
  40. StreamOffset.create(queueName, ReadOffset.from("0"))
  41. );
  42. //2.判断订单信息是否为空
  43. if(list == null || list.isEmpty()){
  44. //如果null,说明pending-list没有异常消息,结束循环
  45. break;
  46. }
  47. //解析消息
  48. MapRecord<String, Object, Object> record = list.get(0);
  49. Map<Object, Object> value = record.getValue();
  50. VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
  51. //3.创建订单
  52. createVoucherOrder(voucherOrder);
  53. //4.确认消息 XACK s1 g1 id
  54. stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
  55. }catch(Exception e){
  56. log.error("处理订单异常", e);
  57. try {
  58. Thread.sleep(100);
  59. } catch (InterruptedException ex) {
  60. ex.printStackTrace();
  61. }
  62. }
  63. }
  64. }
  65. }