RocketMQ

RocketMQ 是阿里巴巴开源的分布式消息中间件,现在是 Apache 的一个顶级项目。

1. RocketMQ 介绍

RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。
具有以下特点:

  • 能够保证严格的消息顺序
  • 提供丰富的消息拉取模式
  • 高效的订阅者水平扩展能力
  • 实时的消息订阅机制
  • 亿级消息堆积能力

    2. RocketMQ 环境搭建

    RocketMQ 学习示例是在 linux 环境下安装

    2.1. 资源下载

最新版本 4.9.2(2021.10.18)

RocketMQ 下载地址:https://rocketmq.apache.org/release_notes/release-notes-4.9.2/
解压后的目录结构如下:

  1. apache-rocketmq
  2. ├── LICENSE
  3. ├── NOTICE
  4. ├── README.md
  5. ├── benchmark
  6. ├── bin
  7. ├── conf
  8. └── lib

2.2. Linux 环境安装

2.2.1. 环境要求

  • Linux 64位操作系统
  • 64bit JDK 1.8+

    2.2.2. 安装

  1. 上传文件到Linux系统 ```

    上传

    rz
  1. 2. 解压到安装目录

解压

unzip rocketmq-all-4.4.0-bin-release.zip

移动到指定的目录(如需要)

mv rocketmq-all-4.4.0-bin-release /xx/xxx

  1. #### 2.2.3. 启动服务
  2. - 进入 rocketmq 的安装目录下的bin目录

cd ./xxx/rocketmq-all-4.4.0-bin-release/bin

  1. ##### 2.2.3.1. 修改配置文件
  2. 启动前,需要修改两个配置文件。修改服务占用的内存

vim runbroker.sh

JAVA_OPT=”${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g”

修改为JAVA_OPT=”${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m”

vim runserver.sh

JAVA_OPT=”${JAVA_OPT} -server -Xms4g -Xmx2g -Xmn -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m”

修改为 JAVA_OPT=”${JAVA_OPT} -server -Xms256m -Xmx128m -Xmn -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m”

  1. ##### 2.2.3.2. 启动 NameServer

nohup ./mqnamesrv &

  1. 只要进程不报错,就应该是启动成功了,可以查看一下日志或者查看进程来确认是否成功启动

查看日志

tail -f /root/logs/rocketmqlogs/namesrv.log

查看进程

netstat -an | grep 9876

  1. ![](https://gitee.com/moonzero/images/raw/master/code-note/20220106095659154_18638.png)
  2. ##### 2.2.3.3. 启动 Broker
  3. 启动 Broker,需要指定一下 NameServer 的地址和端口

nohup ./mqbroker -n localhost:9876 &

  1. 只要进程不报错,就应该是启动成功了,可以查看一下日志或者查看进程来确认是否成功启动

tail -f /root/logs/rocketmqlogs/broker.log

  1. ![](https://gitee.com/moonzero/images/raw/master/code-note/20220106105736976_27327.png)
  2. #### 2.2.4. 测试
  3. 官方文档提供了一个测试脚本。_注:以下命令是在RocketMQ根目录执行_<br />发送消息:

export NAMESRV_ADDR=localhost:9876 bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

  1. 发送成功后显示:SendResult [sendStatus=SEND_OK, msgId= …<br />接收消息:

export NAMESRV_ADDR=localhost:9876 bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

  1. 接收成功后显示:ConsumeMessageThread_%d Receive New Messages: [MessageExt
  2. #### 2.2.5. 关闭服务
  3. RocketMQ 根目录执行以下命令,关闭 RocketMQ

bin/mqshutdown broker bin/mqshutdown namesrv

  1. ### 2.3. windows 环境安装
  2. #### 2.3.1. 安装
  3. - 将下载的 rocketmq-all-4.9.2-bin-release.zip 到无中文无空格的目录下
  4. - 新增环境变量 `ROCKETMQ_HOME`,指定 rocketmq 解压的根目录
  5. ![](https://gitee.com/moonzero/images/raw/master/code-note/20220106140222376_32572.png)
  6. > 注意:RocketMQ 的环境变量名称必须是 `ROCKETMQ_HOME`,因为启动的脚本是用此名称。
  7. ![](https://gitee.com/moonzero/images/raw/master/code-note/20220106140415853_2542.png)
  8. - 修改 `根目录/conf/broker.conf` 配置文件,添加如下配置:

enablePropertyFilter=true

指定 NameServer 的地址,把 borker 与 NameServer 关联起来

namesrvAddr=127.0.0.1:9876

  1. ![](https://gitee.com/moonzero/images/raw/master/code-note/20220106140735203_1753.png)<br />![](https://gitee.com/moonzero/images/raw/master/code-note/20220106141213858_12615.png)
  2. #### 2.3.2. 启动服务
  3. 先进入 RocketMQ bin 目录下
  4. ##### 2.3.2.1. 修改配置文件
  5. 启动前,需要修改两个配置文件。修改服务占用的内存
  6. - 修改 runbroker.cmd

set “JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g”

修改为 set “JAVA_OPT=%JAVA_OPT% -server -Xms256m -Xmx256m”

  1. - 修改 runserver.cmd

set “JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m”

修改为 set “JAVA_OPT=%JAVA_OPT% -server -Xms256m -Xmx320m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m”

  1. - 修改服务的日志保存位置(可选),日志的配置文件位置是:rocketmq-4.9.2\conf\logback_xxx.xml,共3个文件
  2. ##### 2.3.2.2. 启动 NameServer
  3. - 方式一:直接进入`/根目录/bin/`,双击 mqnamesrv.cmd 启动
  4. - 方式二:命令行启动

cd /d E:\deployment-environment\RocketMQ\rocketmq-4.9.2\bin mqnamesrv.cmd

  1. ![](https://gitee.com/moonzero/images/raw/master/code-note/20220106141603810_9695.png)
  2. ##### 2.3.2.3. 启动 Broker
  3. 进入 `/根目录/bin/` 目录,通过命令行启动 broker

cd /d E:\deployment-environment\RocketMQ\rocketmq-4.9.2\bin mqbroker.cmd -c ../conf/broker.conf

  1. ![](https://gitee.com/moonzero/images/raw/master/code-note/20220106141809993_9760.png)
  2. #### 2.3.3. 关闭服务
  3. 直接关闭命令行窗口即可
  4. ### 2.4. RocketMQ 控制台安装(windows环境)
  5. RocketMQ 控制台 github 仓库地址:[https://github.com/apache/rocketmq-externals](https://github.com/apache/rocketmq-externals)<br />原下载地址:[https://github.com/apache/rocketmq-externals/releases](https://github.com/apache/rocketmq-externals/releases)(已失效)
  6. > 没有打包好的版本下载,在github仓库中,选择`release-rocketmq-console`的分支,然后克隆或者打包下载到本地即可
  7. - 修改项目的配置

修改配置文件 rocketmq-console\src\main\resources\application.properties

server.port=7777 # 项目部署端口号 rocketmq.config.namesrvAddr=192.168.12.132:9876 # nameserv的地址,注意防火墙要开启9876端口 rocketmq.config.dataPath=E:/logs/tmp/rocketmq-console/data # 项目的临时配置文件

  1. - 修改 \src\main\resources\logback.xml 中日志保存位置,默认保存在`${user.dir}`
  2. - 将工程打成jar包后,再启动项目

进入控制台项目根目录,将工程打成jar包

mvn clean package -Dmaven.test.skip=true

启动控制台

java -jar target/rocketmq-console-ng-1.0.0.jar

  1. > 注:也可以不修改原配置文件,在启动命令中,指定项目部署端口号和NameServer的地址

cd /d E:\deployment-environment\RocketMQ\rocketmq-console\target\

启动控制台

java -jar rocketmq-console-ng-1.0.0.jar —server.port=7777 —rocketmq.config.namesrvAddr=127.0.0.1:9876

  1. - 根据配置的端口号,访问控制台
  2. ![](https://gitee.com/moonzero/images/raw/master/code-note/20220106154521415_7732.png)
  3. ## 3. RocketMQ 的架构及概念
  4. ### 3.1. 技术架构
  5. ![](https://gitee.com/moonzero/images/raw/master/code-note/20220106111459581_10004.png)<br />RocketMQ架构上主要分为四部分,如上图所示:
  6. - roducer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
  7. - Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
  8. - NameServerNameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后ProducerConumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。
  9. - BrokerServerBroker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。
  10. 1. Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
  11. 1. Client Manager:负责管理客户端(Producer/Consumer)和维护ConsumerTopic订阅信息
  12. 1. Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
  13. 1. HA Service:高可用服务,提供Master Broker Slave Broker之间的数据同步功能。
  14. 1. Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。
  15. ![](https://gitee.com/moonzero/images/raw/master/code-note/20220106111617801_5835.png)
  16. ### 3.2. 基本概念
  17. - 消息模型(Message Model
  18. RocketMQ主要由 ProducerBrokerConsumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 BrokerMessage Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。
  19. - 消息生产者(Producer
  20. 负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
  21. - 消息消费者(Consumer
  22. 负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
  23. - 主题(Topic
  24. 表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
  25. - 代理服务器(Broker Server
  26. 消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
  27. - 名字服务(Name Server
  28. 名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。
  29. - 拉取式消费(Pull Consumer
  30. Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。
  31. - 推动式消费(Push Consumer
  32. Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。
  33. - 生产者组(Producer Group
  34. 同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
  35. - 消费者组(Consumer Group
  36. 同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的TopicRocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。
  37. - 集群消费(Clustering
  38. 集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。
  39. - 广播消费(Broadcasting
  40. 广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。
  41. - 普通顺序消息(Normal Ordered Message
  42. 普通顺序消费模式下,消费者通过同一个消息队列( Topic 分区,称作 Message Queue 收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。
  43. - 严格顺序消息(Strictly Ordered Message
  44. 严格顺序消息模式下,消费者收到的所有消息均是有顺序的。
  45. - 消息(Message
  46. 消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message IDKey查询消息的功能。
  47. - 标签(Tag
  48. 为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
  49. ## 4. RocketMQ 快速开始
  50. ### 4.1. 相关依赖
  51. 示例使用 SpringBoot 项目
org.springframework.boot spring-boot-starter-parent 2.1.13.RELEASE org.apache.rocketmq rocketmq-spring-boot-starter 2.1.1 org.springframework.boot spring-boot-starter-test test
  1. ### 4.2. RocketMQ 相关的配置
  2. 修改项目application.yml配置文件,增加 RocketMQ 相关的配置
  3. - 消息发送者配置:

RocketMQ 配置

rocketmq: name-server: 127.0.0.1:9876 # RocketMQ 服务的地址 producer: group: producer-example # 生产者组名称(按实际命名)

  1. - 消息消费者配置:

RocketMQ 配置

rocketmq: name-server: 127.0.0.1:9876 # RocketMQ 服务的地址

  1. ### 4.3. 使用 RocketMQ 原生的 API 方式
  2. #### 4.3.1. 发送消息
  3. 使用 RocketMQ 发送消息步骤如下:
  4. 1. 创建消息生产者,指定生产者所属的组名
  5. 1. 指定 Nameserver 地址
  6. 1. 启动生产者
  7. 1. 创建消息对象,指定主题、标签和消息体
  8. 1. 发送消息
  9. 1. 关闭生产者

import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.junit.Test;

@Test public void basicTest() throws Exception { // 1. 创建消息生产者,并且设置生产组名 DefaultMQProducer producer = new DefaultMQProducer(“my-producer-group”);

  1. // 2. 为生产者设置 NameServer 的地址
  2. producer.setNamesrvAddr("127.0.0.1:9876");
  3. // 3. 启动生产者
  4. producer.start();
  5. /*
  6. * 4. 构建消息对象
  7. * String topic 消息的主题名
  8. * String tags 消息的标签名
  9. * byte[] body 消息的内容,字节数组
  10. */
  11. Message message = new Message("myTopic", "myTag", ("Test RocketMQ Message").getBytes());
  12. /*
  13. * 5. 发送消息
  14. * Message msg 消息对象
  15. * long timeout 超时时间
  16. */
  17. SendResult result = producer.send(message, 10000);
  18. System.out.println(result);
  19. // 6. 关闭生产者
  20. producer.shutdown();

}

  1. #### 4.3.2. 接收消息
  2. 使用 RocketMQ 接收消息步骤:
  3. 1. 创建消息消费者,指定消费者所属的组名
  4. 1. 指定 Nameserver 地址
  5. 1. 指定消费者订阅的主题和标签
  6. 1. 设置回调函数,编写处理消息的方法
  7. 1. 启动消息消费者

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import org.junit.Test;

import java.util.List;

@Test public void basicTest() throws Exception { // 1. 创建消费者,并且为其指定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“my-consumer-group”);

  1. // 2. 为消费者设置 NameServer 的地址
  2. consumer.setNamesrvAddr("127.0.0.1:9876");
  3. /*
  4. * 3. 指定消费者订阅的主题和标签
  5. * String topic 消息主题名称
  6. * String subExpression 消息标签名称(`*`代表所有标签)
  7. */
  8. consumer.subscribe("myTopic", "*");
  9. // 4. 设置一个回调函数,并在函数中编写接收到消息之后的处理方法
  10. consumer.registerMessageListener(new MessageListenerConcurrently() {
  11. // 处理获取到的消息
  12. @Override
  13. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  14. // 消费逻辑
  15. System.out.println("Message ===> " + list);
  16. // 返回消费成功状态
  17. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  18. }
  19. });
  20. // 5. 启动消费者
  21. consumer.start();
  22. System.out.println("启动消费者成功了");
  23. System.in.read();

}

  1. ### 4.4. Spring Boot 方式
  2. #### 4.4.1. 发送消息
  3. 使用 `RocketMQTemplate` 对象发送消息

@RestController public class ProducerController { private final static Logger logger = LoggerFactory.getLogger(ProducerController.class);

  1. @Autowired
  2. private RocketMQTemplate rocketMQTemplate;
  3. @GetMapping("sendMessage/{msg}")
  4. public String sendMessage(@PathVariable String msg) {
  5. logger.info("开始发送消息");
  6. // 发送异步消息
  7. rocketMQTemplate.asyncSend("spring-boot-test-topic", msg, new SendCallback() {
  8. @Override
  9. public void onSuccess(SendResult sendResult) {
  10. logger.info("发送消息成功,结果:{}", sendResult);
  11. }
  12. @Override
  13. public void onException(Throwable e) {
  14. logger.error("发送消息异常:{}", e);
  15. }
  16. });
  17. return "send message success!" + msg;
  18. }

}

  1. #### 4.4.2. 接收消息
  2. RocketMQ 支持两种消息模式:
  3. - 广播消费:每个消费者实例都会收到消息,也就是一条消息可以被每个消费者实例处理;
  4. - 集群消费:一条消息只能被一个消费者实例消费

/**

  • Spring Boot 方式接收消息。
  • 消息消费者需要 实现 RocketMQListener 接口。泛型 T 是消息的类型 */ @Service // @RocketMQMessageListener 注解用于配置消费者相关信息 @RocketMQMessageListener(

    1. consumerGroup = "consume-example", // 消费者分组
    2. topic = "spring-boot-test-topic", // 要消费的主题
    3. consumeMode = ConsumeMode.CONCURRENTLY, // 消费模式:无序和有序,默认是无序
    4. messageModel = MessageModel.CLUSTERING // 消息模式:广播和集群,默认是集群

    ) public class ConsumeService implements RocketMQListener { private final static Logger logger = LoggerFactory.getLogger(ConsumeService.class);

    @Override public void onMessage(String message) {

    1. logger.info("消息消费者接收到的信息:{}", message);

    } }

  1. ## 5. 不同类型的消息
  2. 引入 RocketMQ 依赖
org.apache.rocketmq rocketmq-spring-boot-starter 2.1.1
  1. > 注:示例依赖 rocketmq-starter 2.0.2 版本时,发送消息时如果当前topic不存在,会报No route info of this topic这个异常问题。高版本默认自动创建topic
  2. 注入 `RocketMQTemplate`

@Autowired private RocketMQTemplate rocketMQTemplate;

  1. ### 5.1. 普通消息
  2. RocketMQ 提供三种方式来发送普通消息:可靠同步发送、可靠异步发送和单向发送。
  3. #### 5.1.1. 可靠同步发送
  4. 同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。<br />此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。

// 同步消息测试 @Test public void testSyncSend() { /*

  1. * public SendResult syncSend(String destination, Object payload, long timeout)
  2. * 发送同步消息,在等待消息发送结果的返回之前,会一直阻塞
  3. * String destination 消息主题和标签。格式:`topicName:tags`
  4. * Object payload 消息体
  5. * long timeout 超时时间,单位ms
  6. */
  7. SendResult result =
  8. rocketMQTemplate.syncSend("MessageType-test-topic:sync", "这是一条同步消息", 10000);
  9. System.out.println(result);

}

  1. #### 5.1.2. 可靠异步发送
  2. 异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。发送方通过回调接口接收服务器响应,并对响应结果进行处理。<br />异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

// 异步消息测试 @Test public void testAsyncSend() throws InterruptedException { /*

  1. * public void asyncSend(String destination, Object payload, SendCallback sendCallback)
  2. * 发送异步消息,发送消息之后,程序会继续往下执行,不会等待结果的返回
  3. * String destination 消息主题和标签。格式:`topicName:tags`
  4. * Object payload 消息体
  5. * SendCallback sendCallback 异步消息发送成功的回调函数
  6. */
  7. rocketMQTemplate.asyncSend("MessageType-test-topic:async", "这是一条异步消息", new SendCallback() {
  8. // 成功响应的回调
  9. @Override
  10. public void onSuccess(SendResult result) {
  11. System.out.println(result);
  12. }
  13. // 异常响应的回调
  14. @Override
  15. public void onException(Throwable throwable) {
  16. System.out.println(throwable);
  17. }
  18. });
  19. System.out.println("异步消息发送成功");
  20. // 线程睡眠,目的等待异步消息发送成功后的回调函数
  21. Thread.sleep(30000);

}

  1. #### 5.1.3. 单向发送
  2. 单向发送是指发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。<br />适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

// 单向消息 @Test public void testOneWay() { /*

  1. * public void sendOneWay(String destination, Object payload)
  2. * 发送消息,不等待服务器回应且没有回调函数触发
  3. * String destination 消息主题和标签。格式:`topicName:tags`
  4. * Object payload 消息体
  5. */
  6. rocketMQTemplate.sendOneWay("MessageType-test-topic:oneway", "这是一条单向消息");

}

  1. #### 5.1.4. 三种发送方式的对比
  2. |
  3. 发送方式
  4. | 发送 TPS
  5. | 发送结果反馈
  6. | 可靠性
  7. |
  8. | --- | --- | --- | --- |
  9. |
  10. 同步发送
  11. |
  12. |
  13. | 不丢失
  14. |
  15. |
  16. 异步发送
  17. |
  18. |
  19. | 不丢失
  20. |
  21. |
  22. 单向发送
  23. | 最快
  24. |
  25. | 可能丢失
  26. |
  27. ### 5.2. 顺序消息
  28. 顺序消息是消息队列提供的一种严格按照顺序来发布和消费的消息类型。<br />![](https://gitee.com/moonzero/images/raw/master/code-note/20220107093611691_28724.png)<br />**RocketMQ 默认发送是分布同一个主题中不同的队列中**

// 顺序消息测试 @Test public void testSendOrderly() throws InterruptedException { /*

  1. * public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout)
  2. * 发送同步顺序消息,在等待消息发送结果的返回之前,会一直阻塞
  3. * String destination 消息主题和标签。格式:`topicName:tags`
  4. * Object payload 消息体
  5. * String hashKey 用于选择队列的 hashkey
  6. * long timeout 超时时间,单位ms
  7. */
  8. SendResult result =
  9. rocketMQTemplate.syncSendOrderly("MessageType-test-topic:sync", "这是一条同步消息", "hk", 10000);
  10. System.out.println(result);
  11. /*
  12. * public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback)
  13. * 发送异步顺序消息,发送消息之后,程序会继续往下执行,不会等待结果的返回
  14. * String destination 消息主题和标签。格式:`topicName:tags`
  15. * Object payload 消息体
  16. * String hashKey 用于选择队列的 hashkey。
  17. * SendCallback sendCallback 异步消息发送成功的回调函数
  18. */
  19. rocketMQTemplate.asyncSendOrderly("MessageType-test-topic:async", "这是一条异步消息", "hk", new SendCallback() {
  20. // 成功响应的回调
  21. @Override
  22. public void onSuccess(SendResult result) {
  23. System.out.println(result);
  24. }
  25. // 异常响应的回调
  26. @Override
  27. public void onException(Throwable throwable) {
  28. System.out.println(throwable);
  29. }
  30. });
  31. /*
  32. * public void sendOneWayOrderly(String destination, Object payload, String hashKey)
  33. * 发送单向顺序消息,不等待服务器回应且没有回调函数触发
  34. * String destination 消息主题和标签。格式:`topicName:tags`
  35. * Object payload 消息体
  36. * String hashKey 用于选择队列的 hashkey。
  37. */
  38. rocketMQTemplate.sendOneWayOrderly("MessageType-test-topic:oneway", "这是一条单向消息", "hk");
  39. // 线程睡眠,目的等待异步消息发送成功后的回调函数
  40. Thread.sleep(30000);

}

  1. ### 5.3. 事务消息
  2. RocketMQ 提供了事务消息,通过事务消息就能达到分布式事务的最终一致。
  3. #### 5.3.1. 事务消息交互流程
  4. ![](https://gitee.com/moonzero/images/raw/master/code-note/20220107101823197_5114.png)<br />**相关概念**:
  5. - 半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了 RocketMQ 服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
  6. - 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,RocketMQ 服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(`Commit` 或是 `Rollback`),该询问过程即消息回查。
  7. **事务消息发送步骤**:
  8. 1. 发送方将半事务消息发送至 RocketMQ 服务端。
  9. 1. RocketMQ 服务端将消息持久化之后,向发送方返回 Ack 确认消息已经发送成功,此时消息为半事务消息。
  10. 1. 发送方开始执行本地事务逻辑。
  11. 1. 发送方根据本地事务执行结果向服务端提交二次确认(`Commit` 或是 `Rollback`),服务端收到 `Commit` 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 `Rollback` 状态则删除半事务消息,订阅方将不会接受该消息。
  12. **事务消息回查步骤**:
  13. 1. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
  14. 1. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  15. 1. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。
  16. #### 5.3.2. 基础使用示例

@RestController public class TxMessageController {

  1. @Autowired
  2. private RocketMQTemplate rocketMQTemplate;
  3. @GetMapping("tx_message_example")
  4. public String txMessageExample() {
  5. System.out.println("======= 程序开始 =======");
  6. Product product = new Product();
  7. product.setId("001");
  8. product.setProductName("秋天的兰花");
  9. product.setPrice(BigDecimal.TEN);
  10. product.setProductDesc("オータム オーキッド");
  11. /*
  12. * public TransactionSendResult sendMessageInTransaction(final String destination, final Message<?> message, final Object arg)
  13. * 发送半事务消息。
  14. * final String destination 消息主题和标签。格式:`topicName:tags`
  15. * final Message<?> message 消息内容
  16. * final Object arg 在执行本地事务方法中传入的参数
  17. */
  18. rocketMQTemplate.sendMessageInTransaction(
  19. "tx_topic",
  20. MessageBuilder.withPayload(product).setHeader("tx_id", "tx-id-001").build(),
  21. product
  22. );
  23. return product.getProductName();
  24. }

}

  1. 创建事务消息监听实现类。需要继承 RocketMQLocalTransactionListener 接口,实现 executeLocalTransaction checkLocalTransaction 方法。

@Service @RocketMQTransactionListener public class TxMessageServiceListener implements RocketMQLocalTransactionListener { /**

  1. * 执行本地事务
  2. *
  3. * @param msg
  4. * @param arg
  5. * @return
  6. */
  7. @Override
  8. public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
  9. System.out.println("executeLocalTransaction 方法调用,执行本地事务...");
  10. String txId = (String) msg.getHeaders().get("tx_id");
  11. Product product = (Product) arg;
  12. System.out.println("executeLocalTransaction 方法获取到的消息体:" + txId);
  13. System.out.println("executeLocalTransaction 方法获取到的参数:" + product);
  14. // 模拟本地一些业务逻辑(30s)
  15. try {
  16. Thread.sleep(30000);
  17. return RocketMQLocalTransactionState.COMMIT;
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. return RocketMQLocalTransactionState.ROLLBACK;
  21. }
  22. }
  23. /**
  24. * 消息回查
  25. *
  26. * @param msg
  27. * @return
  28. */
  29. @Override
  30. public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
  31. System.out.println("checkLocalTransaction 方法调用,进行消息回查...");
  32. String txId = (String) msg.getHeaders().get("tx_id");
  33. System.out.println("checkLocalTransaction 方法获取到的消息体:" + txId);
  34. // 模拟本地一些业务逻辑(10s),如:查询本地数据库,是否已经操作成功 orderDao.findById(txId)
  35. try {
  36. Thread.sleep(10000);
  37. // 如果确认本地事务成功,则提交
  38. return RocketMQLocalTransactionState.COMMIT;
  39. } catch (InterruptedException e) {
  40. e.printStackTrace();
  41. return RocketMQLocalTransactionState.ROLLBACK;
  42. }
  43. }

}

  1. #### 5.3.3. @RocketMQTransactionListener 注解与 sendMessageInTransaction 在版本升级的变化
  2. 依赖:
org.apache.rocketmq rocketmq-spring-boot-starter 2.1.0
  1. - 2.0.2 升级 2.1.0 版本,发送事务消息的 `sendMessageInTransaction` 方法参数个数从4个变成3

// * 2.0.2 版本 * /**

  • Send Spring Message in Transaction *
  • @param txProducerGroup the validate txProducerGroup name, set null if using the default name
  • @param destination destination formats: topicName:tags
  • @param message message {@link org.springframework.messaging.Message}
  • @param arg ext arg
  • @return TransactionSendResult
  • @throws MessagingException */ public TransactionSendResult sendMessageInTransaction(final String txProducerGroup, final String destination, final Message<?> message, final Object arg) throws MessagingException { try {
    1. TransactionMQProducer txProducer = this.stageMQProducer(txProducerGroup);
    2. org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
    3. charset, destination, message);
    4. return txProducer.sendMessageInTransaction(rocketMsg, arg);
    } catch (MQClientException e) {
    1. throw RocketMQUtil.convert(e);
    } }

// * 2.1.0 版本 * /**

  • Send Spring Message in Transaction *
  • @param destination destination formats: topicName:tags
  • @param message message {@link org.springframework.messaging.Message}
  • @param arg ext arg
  • @return TransactionSendResult
  • @throws MessagingException */ public TransactionSendResult sendMessageInTransaction(final String destination, final Message<?> message, final Object arg) throws MessagingException { try {
    1. if (((TransactionMQProducer) producer).getTransactionListener() == null) {
    2. throw new IllegalStateException("The rocketMQTemplate does not exist TransactionListener");
    3. }
    4. org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
    5. return producer.sendMessageInTransaction(rocketMsg, arg);
    } catch (MQClientException e) {
    1. throw RocketMQUtil.convert(e);
    } }
  1. - 2.0.2 升级 2.1.0 版本,`@RocketMQTransactionListener` 注解移除了 `txProducerGroup` 属性

@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Component public @interface RocketMQTransactionListener { /**

  1. * Set ExecutorService params -- corePoolSize
  2. */
  3. int corePoolSize() default 1;
  4. /**
  5. * Set ExecutorService params -- maximumPoolSize
  6. */
  7. int maximumPoolSize() default 1;
  8. /**
  9. * Set ExecutorService params -- keepAliveTime
  10. */
  11. long keepAliveTime() default 1000 * 60; //60ms
  12. /**
  13. * Set ExecutorService params -- blockingQueueSize
  14. */
  15. int blockingQueueSize() default 2000;
  16. /**
  17. * Set rocketMQTemplate bean name, the default is rocketMQTemplate.
  18. * if use ExtRocketMQTemplate, can set ExtRocketMQTemplate bean name.
  19. */
  20. String rocketMQTemplateBeanName() default "rocketMQTemplate";

}

  1. rocketmq-spring-boot-starter 低于 2.1.0 以前的项目中,可以用多个 `@RocketMQTransactionListener` 来监听不同的 `txProducerGroup` 来发送不同类型的事务消息到topic,但是现在在一个项目中,如果在一个项目中写了多个 `@RocketMQTransactionListener`,项目将不能启动,启动会报

java.lang.IllegalStateException: rocketMQTemplate already exists RocketMQLocalTransactionListener

`` 所以发送事务消息:在客户端,首先用户需要实现RocketMQLocalTransactionListener接口,并在接口类上注解声明@RocketMQTransactionListener,实现确认和回查方法;然后再使用资源模板RocketMQTemplate,调用方法sendMessageInTransaction()来进行消息的发布。注意:从 RocketMQ-Spring 2.1.0 版本之后,注解@RocketMQTransactionListener不能设置txProducerGroupaksk,这些值均与对应的RocketMQTemplate` 保持一致。