ykkj-spring-boot-starter-mq技术组件,基于 Redis 实现分布式消息队列:

  • 使用 Stream特性,提供【集群】消费的能力。
  • 使用 Pub/Sub特性,提供【广播】消费的能力。

    1. 集群消费

    集群消费,是指消息发送到 Redis 时,有且只会被一个消费者(应用 JVM 实例)收到,然后消费成功。如下图所示:
    image.png

    1.1 使用场景

    集群消费在项目中的使用场景,主要是提供可靠的、可堆积的异步任务的能力。例如说:

  • 短信模块,使用它异步发送短信-module/system/mq/consumer/sms/SmsSendConsumer.java。

  • 邮件模块,使用它异步发送邮件-/module/system/mq/consumer/mail/MailSendConsumer.java。

相比 《开发指南 —— 异步任务》 来说,Spring Async 在 JVM 实例重启时,会导致未执行完的任务丢失。而集群消费,因为消息是存储在 Redis 中,所以不会存在该问题。

1.2 实现源码

集群消费基于 Redis Stream 实现:

  • 实现 framework/mq/core/stream/AbstractStreamMessage.java抽象类,定义【集群】消息。
  • 使用 framework/mq/core/RedisMQTemplate.java的 #send(message)方法,发送消息。
  • 实现 /framework/mq/core/stream/AbstractStreamMessageListener.java接口,消费消息。

最终使用 framework/mq/config/YkkjMQAutoConfiguration.java配置类,扫描所有的 AbstractStreamMessageListener 监听器,初始化对应的消费者。如下图所示:
image.png

1.3 实战案例

以短信模块异步发送短息为例子,讲解集群消费的使用。
image.png

1.3.1 引入依赖

在 ykkj-module-system-biz 模块中,引入 ykkj-spring-boot-starter-mq 技术组件。如下所示:

  1. <dependency>
  2. <groupId>cn.iocoder.boot</groupId>
  3. <artifactId>yudao-spring-boot-starter-mq</artifactId>
  4. </dependency>

1.3.2 SmsSendMessage

在 ykkj-module-system-biz 的 module/system/mq/message/sms/包下,创建 SmsSendMessage类,继承 AbstractStreamMessage 抽象类,短信发送消息。代码如下图:
image.png

1.3.3 SmsProducer

① 在 ykkj-module-system-biz 的 module/system/mq/producer/sms/包下,创建 SmsProducer类,SmsSendMessage 的 Producer 生产者,核心是使用 RedisMQTemplate 发送 SmsSendMessage 消息。代码如下图:
image.png
② 发送短信时,需要使用 SmsProducer 发送消息。如下图所示:
image.png

1.3.4 SmsSendConsumer

在 ykkj-module-system-biz 的 module/system/mq/consumer/sms/包下,创建 SmsSendConsumer类,SmsSendMessage 的 Consumer 消费者。代码如下图:
image.png

2. 广播消费

广播消费,是指消息发送到 Redis 时,所有消费者(应用 JVM 实例)收到,然后消费成功。如下图所示:
image.png

2.1 使用场景

例如说,在应用中,缓存了数据字典等配置表在内存中,可以通过 Redis 广播消费,实现每个应用节点都消费消息,刷新本地内存的缓存。
又例如说,我们基于 WebSocket 实现了 IM 聊天,在我们给用户主动发送消息时,因为我们不知道用户连接的是哪个提供 WebSocket 的应用,所以可以通过 Redis 广播消费。每个应用判断当前用户是否是和自己提供的 WebSocket 服务连接,如果是,则推送消息给用户。

2.2 实现源码

广播消费基于 Redis Pub/Sub 实现:

  • 实现 framework/mq/core/pubsub/AbstractChannelMessage.java抽象类,定义【广播】消息。
  • 使用 framework/mq/core/RedisMQTemplate.java的 #send(message)方法,发送消息。
  • 实现 framework/mq/core/pubsub/AbstractChannelMessageListener.java接口,消费消息。

最终使用 framework/mq/config/YkkjMQAutoConfiguration.java配置类,扫描所有的 AbstractChannelMessageListener 监听器,初始化对应的消费者。如下图所示:
image.png