📓 官方文档:https://github.com/apache/rocketmq/tree/master/docs/cn

1. 安装指南

1.1 Windows Docker安装

由于是公司的电脑,为了方便,我使用Win10的docker进行部署,结果发现不太行。

Windows10安装wls2
参考:https://docs.microsoft.com/zh-cn/windows/wsl/install-win10
要求:

  • x64:版本 1903 或更高版本,采用 内部版本 18362 或更高版本。
  • ARM64:版本 2004 或更高版本,采用 内部版本 19041 或更高版本。

检查版本:

  1. winver

启用虚拟机:

  1. dism.exe /online /enable-feature /featurename:VirtualMachinePlatform /all /norestart

下载Linux内核更新包:

将WSL2设置为默认版本:

  1. wsl --set-default-version 2

Windows10安装Docker
地址:https://www.docker.com/products/docker-desktop
image.png
点击Download for Winodws,一路傻瓜式安装即可。

Docker安装RocketMQ
创建五个挂载目录:

  1. mkdir -p D:\environment\rocketmq\namesrv\logs
  2. mkdir -p D:\environment\rocketmq\namesrv\store
  3. mkdir -p D:\environment\rocketmq\broker\conf
  4. mkdir -p D:\environment\rocketmq\broker\logs
  5. mkdir -p D:\environment\rocketmq\broker\store

创建broker配置文件broker.conf:

  1. # 所属集群名称,如果节点较多可以配置多个
  2. brokerClusterName = DefaultCluster
  3. # broker名称,master和slave使用相同的名称,表明他们的主从关系
  4. brokerName = broker-a
  5. # 0表示master,>0表示slave
  6. brokerId = 0
  7. # 表示几点做消息删除动作,默认是凌晨4点
  8. deleteWhen = 04
  9. # 在磁盘上保留消息的时长,单位是小时
  10. fileReservedTime = 48
  11. # 有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制
  12. brokerRole = ASYNC_MASTER
  13. # 刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功状态ASYNC_FLUSH不需要
  14. flushDiskType = ASYNC_FLUSH
  15. # 设置broker节点所在服务器的ip地址
  16. brokerIP1 = 172.18.0.4

创建docker-compose.yml文件进行容器编排:

  1. version: '3.5'
  2. services:
  3. rmqnamesrv:
  4. image: foxiswho/rocketmq:server
  5. container_name: rmqnamesrv
  6. ports:
  7. - 9876:9876
  8. volumes:
  9. - D:/environment/rocketmq/namesrv/logs:/opt/logs
  10. - D:/environment/rocketmq/namesrv/store:/opt/store
  11. networks:
  12. rmq:
  13. aliases:
  14. - rmqnamesrv
  15. rmqbroker:
  16. image: foxiswho/rocketmq:broker
  17. container_name: rmqbroker
  18. ports:
  19. - 10909:10909
  20. - 10911:10911
  21. volumes:
  22. - D:/environment/rocketmq/broker/logs:/opt/logs
  23. - D:/environment/rocketmq/broker/store:/opt/store
  24. - D:/environment/rocketmq/broker/conf/broker.conf:/etc/rocketmq/broker.conf
  25. environment:
  26. NAMESRV_ADDR: "rmqnamesrv:9876"
  27. JAVA_OPTS: " -Duser.home=/opt"
  28. JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
  29. command: mqbroker -c /etc/rocketmq/broker.conf
  30. depends_on:
  31. - rmqnamesrv
  32. networks:
  33. rmq:
  34. aliases:
  35. - rmqbroker
  36. rmqconsole:
  37. image: styletang/rocketmq-console-ng
  38. container_name: rmqconsole
  39. ports:
  40. - 8080:8080
  41. environment:
  42. JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
  43. depends_on:
  44. - rmqnamesrv
  45. networks:
  46. rmq:
  47. aliases:
  48. - rmqconsole
  49. networks:
  50. rmq:
  51. name: rmq
  52. driver: bridge

最后在该文件目录下启动:

  1. docker-compose up -d

完成后在浏览器输入:http://192.168.9.113:8080
image.png

1.2 Windows10 本机安装

选择下载:https://rocketmq.apache.org/release_notes/
推荐版本:4.6.0
下载后解压即可,新建环境变量ROCKETMQ_HOME为解压文件夹所在路径。
然后bin目录,分别启动注册服务和代理服务:

  1. # 启动注册服务
  2. $ start .\mqnamesrv.cmd
  3. Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
  4. Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
  5. The Name Server boot success. serializeType=JSON
  6. # 启动代理服务
  7. $ start .\mqbroker.cmd -n 127.0.0.1:9876
  8. The broker[NS9041695, 172.17.70.177:10911] boot success. serializeType=JSON

下载web控制台:https://github.com/apache/rocketmq-externals
进入子项目:rocketmq-console
修改\src\main\resources下的的application.properties:

  1. rocketmq.config.namesrvAddr=127.0.0.1:9876

修改pom,增加插件:

  1. <!-- Maven package skip test -->
  2. <plugin>
  3. <groupId>org.apache.maven.plugins</groupId>
  4. <artifactId>maven-surefire-plugin</artifactId>
  5. <version>2.22.0</version>
  6. <configuration>
  7. <skipTests>true</skipTests>
  8. </configuration>
  9. </plugin>

打包:

  1. mvn clean package

为了避免启动麻烦,编写一个批处理文件start.bat

  1. @echo off
  2. echo Start up NameServer ...
  3. start mqnamesrv.cmd
  4. echo Start up BrokerServer ...
  5. start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
  6. echo Start uip RocketMQ Console ...
  7. java -jar D:\environment\windows-rocketmq\rocketmq-console\target\rocketmq-console-ng-2.0.0.jar
  8. pause

运行start.bat,结果如下:
image.png

2. 概念特性

2.1 相关概念

消息模型(Message Model)
RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。

消息生产者(Producer)负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。

消息消费者(Consumer)负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

主题(Topic)表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。

代理服务器(Broker Server)
消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

名字服务(Name Server)
名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。

拉取式消费(Pull Consumer)
Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。

推动式消费(Push Consumer)
Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。

生产者组(Producer Group)
同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。

消费者组(Consumer Group)
同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。

集群消费(Clustering)
集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。

广播消费(Broadcasting)
广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。

普通顺序消息(Normal Ordered Message)
普通顺序消费模式下,消费者通过同一个消息队列( Topic 分区,称作 Message Queue) 收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。

严格顺序消息(Strictly Ordered Message)
严格顺序消息模式下,消费者收到的所有消息均是有顺序的。

消息(Message)
消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。

标签(Tag)
为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。

2.2 基本特性

  • 订阅发布:生产者向topic发送消息;消费者关注topic中带有某些tag的消息,进而从该topic消费数据
  • 消息顺序:消息消费时按照发送的顺序进行消费,分为全局顺序和分区顺序。
  • 消息过滤:消费者可以根据Tag进行消息过滤,也支持自定义属性过滤。
  • 消息可靠性:支持消息的高可靠。下面四种情况属于硬件资源可立即回复情况,能保证消息不丢失或者丢失少量数据:Broker非正常关闭;Broker异常Crash;OS Crash;机器掉电,但是能立即恢复供电情况。另有两种情况属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失:机器无法开机(可能是cpu、主板、内存等关键设备损坏);磁盘设备损坏。
  • 至少一次:每个消息必须投递一次。消费者先Pull消息到本地,消费完成后才向服务器返回ack,如果没有消费一定不会ack消息。
  • 回溯消费:已经消息成功的消息需要重新消费,支持按照时间回溯消费,时间维度精确到毫秒。
  • 事务消息:因公本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。通过事务消息能达到分布式事务的最终一致。
  • 定时消息:消息共发送到Broker后,不会立即被消费,等到特定时间投递给topic。
  • 消息重试:RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。
  • 消息重投:消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在RocketMQ中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。
  • 流量控制:生产者流控,因为broker处理能力达到瓶颈,不会尝试消息重投;消费者流控,因为消费能力达到瓶颈,降低拉取频率。
  • 死信队列:RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在RocketMQ中,可以通过使用console控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。

3. 架构设计

3.1 技术架构

rocketmq_architecture_1.png
RocketMQ架构主要分为四部分,如上图所示:

  1. Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
  2. Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可满足大多数用户的需求。
  3. NameServer:NameServer是一个非常简单的Topic路由注册中心,支持Broker的动态注册于发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检测Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Consumer通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步路由信息,Producer、Consumer仍然可以动态感知Broker的路由的信息。
  4. BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。
    1. Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
    2. Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息。
    3. Store Service:高可用服务,提供Master Broker和Slave Broker之间的数据同步功能。
    4. Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。

rocketmq_architecture_2.png

3.2 部署架构

rocketmq_architecture_3.png

Rocket网络部署特点:

  • NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
  • Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。 注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。
  • Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
  • Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。

结合部署架构图,描述集群工作流程:

  • 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
  • Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
  • 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
  • Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
  • Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

4. 学习DEMO

4.1 引入依赖

maven:

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-client</artifactId>
  4. <version>4.6.0</version>
  5. </dependency>

gradle:

  1. compile 'org.apache.rocketmq:rocketmq-client:4.6.0'

4.2 基本样例

4.2.1 发送同步消息

这种可靠性同步发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

  1. /**
  2. * Producer端发送同步消息
  3. * <p>消息通知,短信通知
  4. *
  5. * @author KHighness
  6. * @date 2021-07-09
  7. */
  8. public class SyncProducer {
  9. public static void main(String[] args) throws MQClientException,
  10. UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
  11. // 实例化消息生产者Producer
  12. DefaultMQProducer producer = new DefaultMQProducer("k-highness");
  13. // 设置NameServer地址
  14. producer.setNamesrvAddr("127.0.0.1:9876");
  15. // 启动Producer实例
  16. producer.start();
  17. for (int i = 0; i < 10; i ++) {
  18. // 创建消息,并指定Topic、Tag和消息体
  19. Message msg = new Message(
  20. "TopicTest" /* Topic */,
  21. "TagA" /* Tag */,
  22. ("Hello Rocket " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message Body */
  23. );
  24. // 发送一个消息到Broker
  25. SendResult sendResult = producer.send(msg, 100000);
  26. // 通过sendResult返回消息是否成功送达
  27. System.out.printf("%s%n", sendResult);
  28. }
  29. // 停止发送消息,关闭Producer实例
  30. producer.shutdown();
  31. }
  32. }

4.2.2 发送异步消息

异步消息通常用在对相应时间敏感的业务场景,即发送端不能长时间地等待Broker的响应。

  1. /**
  2. * Producer端发送异步消息
  3. * <p>时间敏感的业务场景
  4. *
  5. * @author KHighness
  6. * @date 2021-07-09
  7. */
  8. public class AsyncProducer {
  9. public static void main(String[] args) throws MQClientException,
  10. UnsupportedEncodingException, RemotingException, InterruptedException {
  11. // 实例化消息生产者Producer
  12. DefaultMQProducer producer = new DefaultMQProducer("k-highness");
  13. // 设置NameServer的地址
  14. producer.setNamesrvAddr("127.0.0.1:9876");
  15. // 启动Producer实例
  16. producer.start();
  17. producer.setRetryTimesWhenSendFailed(0);
  18. int messageCount = 10;
  19. // 根据消息数量实例化倒计时计算器
  20. final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
  21. for (int i = 0; i < messageCount; i++) {
  22. final int index = i;
  23. // 创建消息,并指定Topic、Tag和消息体
  24. Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
  25. // SendCallback接收异步返回结果的回调
  26. producer.send(msg, new SendCallback() {
  27. @Override
  28. public void onSuccess(SendResult sendResult) {
  29. System.out.printf("%-10d Success %s %n", index, sendResult.getMsgId());
  30. }
  31. @Override
  32. public void onException(Throwable throwable) {
  33. System.out.printf("%-10d Exception %s %n", index, throwable);
  34. throwable.printStackTrace();
  35. }
  36. });
  37. // 等待5S
  38. countDownLatch.await(5, TimeUnit.SECONDS);
  39. // 如果不再发送消息。关闭Producer实例
  40. producer.shutdown();
  41. }
  42. }
  43. }

4.2.3 单向发送消息

这种方式主要用在不特别关心发送结果的场景,例如日志发送。

  1. /**
  2. * Producer端发送单向消息
  3. * <p>不关心发送结果的场景
  4. *
  5. * @author KHighness
  6. * @date 2021-07-09
  7. */
  8. public class OnewayProducer {
  9. public static void main(String[] args) throws MQClientException,
  10. UnsupportedEncodingException, RemotingException, InterruptedException {
  11. // 实例化消息生产者Producer
  12. DefaultMQProducer producer = new DefaultMQProducer("k-highness");
  13. // 设置NameServer的地址
  14. producer.setNamesrvAddr("127.0.0.1:9876");
  15. // 启动Producer实例
  16. producer.start();
  17. for (int i = 0; i < 10; i++) {
  18. // 创建消息,并指定Topic、Tag和消息体
  19. Message msg = new Message("TopicTest", "TagA", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
  20. // 发送单向消息,没有任何返回结果
  21. producer.sendOneway(msg);
  22. }
  23. // 关闭Producer实例
  24. producer.shutdown();
  25. }
  26. }

4.2.4 消费消息

  1. /**
  2. * Consumer消费消息
  3. *
  4. * @author KHighness
  5. * @date 2021-07-09
  6. */
  7. public class Consumer {
  8. public static void main(String[] args) throws MQClientException {
  9. // 实例化消息消费者Consumer
  10. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("k-highness");
  11. // 设置NameServer的地址
  12. consumer.setNamesrvAddr("127.0.0.1:9876");
  13. // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
  14. consumer.subscribe("TopicTest", "*");
  15. // 注册回调实现类来处理从broker拉取的消息
  16. consumer.registerMessageListener((MessageListenerConcurrently) (msgs, consumeConcurrentlyContext) -> {
  17. System.out.printf("%s Receive New Message: %s %n", Thread.currentThread().getName(), msgs);
  18. // 标记该消息已经被成功消息
  19. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  20. });
  21. // 启动消费者实例
  22. consumer.start();
  23. System.out.printf("Consumer Started.%n");
  24. }
  25. }

4.3 顺序消息

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会才去Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费不能保证顺序。但是如果控制发送的顺序消息只能依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

4.3.1 顺序消息生产

  1. /**
  2. * Producer发送顺序消息
  3. *
  4. * @author KHighness
  5. * @date 2021-07-10
  6. */
  7. public class Producer {
  8. public static void main(String[] args) throws MQClientException,
  9. RemotingException, InterruptedException, MQBrokerException {
  10. DefaultMQProducer producer = new DefaultMQProducer("k-highness");
  11. producer.setNamesrvAddr("127.0.0.1:9876");
  12. producer.start();
  13. String[] tags = new String[]{"TagA", "TagB", "TagC"};
  14. // 订单列表
  15. List<OrderStep> orderList = new Producer().buildOrders();
  16. Date date = new Date();
  17. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
  18. String dataStr = sdf.format(date);
  19. for (int i = 0; i < 10; i++) {
  20. // 加上时间前缀
  21. String body = dataStr + " Hello RocketMQ " + orderList.get(i);
  22. Message msg = new Message("TopicTest", tags[i % tags.length],
  23. "KEY" + i, body.getBytes(StandardCharsets.UTF_8));
  24. SendResult sendResult = producer.send(msg, (list, message, arg) -> {
  25. Long id = (Long) arg;
  26. long index = id % list.size();
  27. return list.get((int) index);
  28. }, orderList.get(i).getOrderId());
  29. System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
  30. sendResult.getSendStatus(),
  31. sendResult.getMessageQueue().getQueueId(),
  32. body));
  33. }
  34. producer.shutdown();
  35. }
  36. /**
  37. * 订单步骤
  38. */
  39. private static class OrderStep {
  40. private long orderId;
  41. private String desc;
  42. public long getOrderId() {
  43. return orderId;
  44. }
  45. public void setOrderId(long orderId) {
  46. this.orderId = orderId;
  47. }
  48. public String getDesc() {
  49. return desc;
  50. }
  51. public void setDesc(String desc) {
  52. this.desc = desc;
  53. }
  54. @Override
  55. public String toString() {
  56. return new StringJoiner(", ", OrderStep.class.getSimpleName() + "[", "]")
  57. .add("orderId=" + orderId)
  58. .add("desc='" + desc + "'")
  59. .toString();
  60. }
  61. }
  62. /**
  63. * 生成模拟订单数据
  64. */
  65. private List<OrderStep> buildOrders() {
  66. List<OrderStep> orderList = new ArrayList<OrderStep>();
  67. OrderStep orderDemo = new OrderStep();
  68. orderDemo.setOrderId(15103111039L);
  69. orderDemo.setDesc("创建");
  70. orderList.add(orderDemo);
  71. orderDemo = new OrderStep();
  72. orderDemo.setOrderId(15103111065L);
  73. orderDemo.setDesc("创建");
  74. orderList.add(orderDemo);
  75. orderDemo = new OrderStep();
  76. orderDemo.setOrderId(15103111039L);
  77. orderDemo.setDesc("付款");
  78. orderList.add(orderDemo);
  79. orderDemo = new OrderStep();
  80. orderDemo.setOrderId(15103117235L);
  81. orderDemo.setDesc("创建");
  82. orderList.add(orderDemo);
  83. orderDemo = new OrderStep();
  84. orderDemo.setOrderId(15103111065L);
  85. orderDemo.setDesc("付款");
  86. orderList.add(orderDemo);
  87. orderDemo = new OrderStep();
  88. orderDemo.setOrderId(15103117235L);
  89. orderDemo.setDesc("付款");
  90. orderList.add(orderDemo);
  91. orderDemo = new OrderStep();
  92. orderDemo.setOrderId(15103111065L);
  93. orderDemo.setDesc("完成");
  94. orderList.add(orderDemo);
  95. orderDemo = new OrderStep();
  96. orderDemo.setOrderId(15103111039L);
  97. orderDemo.setDesc("推送");
  98. orderList.add(orderDemo);
  99. orderDemo = new OrderStep();
  100. orderDemo.setOrderId(15103117235L);
  101. orderDemo.setDesc("完成");
  102. orderList.add(orderDemo);
  103. orderDemo = new OrderStep();
  104. orderDemo.setOrderId(15103111039L);
  105. orderDemo.setDesc("完成");
  106. orderList.add(orderDemo);
  107. return orderList;
  108. }
  109. }

4.3.2 顺序消费消息

  1. /**
  2. * Consumer等待延时消息
  3. *
  4. * @author KHighness
  5. * @date 2021-07-12
  6. */
  7. public class ScheduledMessageConsumer {
  8. public static void main(String[] args) throws MQClientException {
  9. // 实例化消费者
  10. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
  11. consumer.setNamesrvAddr("127.0.0.1:9876");
  12. // 订阅Topics
  13. consumer.subscribe("TestTopic", "*");
  14. // 注册消息监听者
  15. consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
  16. for (MessageExt msg : msgs) {
  17. System.out.printf("Receive message [msgId=%s, msg=%s] %dms later\n", msg.getMsgId(),
  18. new String(msg.getBody()), System.currentTimeMillis() - msg.getBornTimestamp());
  19. }
  20. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  21. });
  22. // 启动消费者
  23. consumer.start();
  24. }
  25. }

4.4 延时消息

4.4.1 启动消费者等待传入订阅消息

  1. /**
  2. * Consumer等待延时消息
  3. *
  4. * @author KHighness
  5. * @date 2021-07-12
  6. */
  7. public class ScheduledMessageConsumer {
  8. public static void main(String[] args) throws MQClientException {
  9. // 实例化消费者
  10. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
  11. consumer.setNamesrvAddr("127.0.0.1:9876");
  12. // 订阅Topics
  13. consumer.subscribe("TestTopic", "*");
  14. // 注册消息监听者
  15. consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
  16. for (MessageExt msg : msgs) {
  17. System.out.printf("Receive message [msgId=%s, msg=%s] %dms later\n", msg.getMsgId(),
  18. new String(msg.getBody()), System.currentTimeMillis() - msg.getBornTimestamp());
  19. }
  20. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  21. });
  22. // 启动消费者
  23. consumer.start();
  24. }
  25. }

4.4.2 发送延时消息

  1. /**
  2. * Producer发送延时消息
  3. *
  4. * @author KHighness
  5. * @date 2021-07-12
  6. */
  7. public class ScheduledMessageProducer {
  8. public static void main(String[] args) throws MQClientException,
  9. RemotingException, InterruptedException, MQBrokerException {
  10. // 实例化生产者
  11. DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
  12. producer.setNamesrvAddr("127.0.0.1:9876");
  13. // 启动生产者
  14. producer.start();
  15. int totalMessageToSend = 10;
  16. for (int i = 0; i < totalMessageToSend; i++) {
  17. Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
  18. // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间)DelayTimeLevel
  19. message.setDelayTimeLevel(3);
  20. // 发送消息
  21. producer.send(message);
  22. }
  23. // 关闭生产者
  24. producer.shutdown();
  25. }
  26. }

4.4.3 延时消息的使用场景

比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

4.4.4 延时消息是使用限制

  1. // org/apache/rocketmq/store/config/MessageStoreConfig.java
  2. private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18 消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关,详见代码SendMessageProcessor.java

4.5 批量消息