原文 出自 图灵学院 四期 RocketMQ的讲义 ,我自己整理了一下,然后听课的时候自己也记了一些笔记,整理完了发到了博客上.
注意点
在使用SpringBoot的starter集成包时,要特别注意版本。因为SpringBoot集成RocketMQ的starter依赖是由Spring社区提供的,目前正在快速迭代的过程当中,不同版本之间的差距非常大,甚至基础的底层对象都会经常有改动。例如如果使用rocketmq-spring-boot-starter:2.0.4版本开发的代码,升级到目前最新的rocketmq-spring-boot-starter:2.1.1后,基本就用不了了。
SpringBoot的convertAndSend方法内部是将String类型的msg转成了org.springframework.messaging.Message对象
这个Message对象是Spring包下的,不是RocketMQ包下的了,Spring把RocketMQ的原生Message对象进行了二次封装
this.rocketMQTemplate.convertAndSend(topic, msg);
第二个就是Spring把tag和topic放到了一起,rocketMQTemplate.sendMessageInTransaction方法第一个参数是topic:tags的格式
String destination = topic + ":" + tags[i % tags.length];//这里发送事务消息时,还是会转换成RocketMQ的Message对象,再调用RocketMQ的API完成事务消息机制。SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, message, destination);
第三个点就是rocketmq-spring-boot-starter版本要比RocketMQ原生版本更新慢,也就是说比如说RocketMQ出了个新的特性,此时rocketmq-spring-boot-starter版本就没有这个新特性,等过段时间才会有这些特性
第四个点就是rocketmq-spring-boot-starter的注解改变的比较频繁,可能这个版本注解是这样的,下个版本注解就是另外一个样子的了.不管怎么变化,肯定不会跳过RocketMQ的那些基本功能的,比如说事务消息 过滤消息 延迟消息 消息重试等等.
代码
依赖
<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.1</version><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-core</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-webmvc</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.1.6.RELEASE</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><version>2.1.6.RELEASE</version></dependency></dependencies>
启动类
@SpringBootApplicationpublic class RocketMQSBApplication {public static void main(String[] args) {SpringApplication.run(RocketMQSBApplication.class, args);}}
Controller
package com.roy.rocketmq.controller;import com.roy.rocketmq.basic.SpringProducer;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;@RestController@RequestMapping("/MQTest")public class MQTestController {private final String topic = "TestTopic";@Resourceprivate SpringProducer producer;/*** 发送普通消息** @param message* @return*/@RequestMapping("/sendMessage")public String sendMessage(String message) {producer.sendMessage(topic, message);return "消息发送完成";}//这个发送事务消息的例子中有很多问题,需要注意下。@RequestMapping("/sendTransactionMessage")public String sendTransactionMessage(String message) throws InterruptedException {producer.sendMessageInTransaction(topic, message);return "消息发送完成";}}
config
package com.roy.rocketmq.config;import org.apache.commons.lang3.StringUtils;import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;import org.apache.rocketmq.spring.support.RocketMQHeaders;import org.apache.rocketmq.spring.support.RocketMQUtil;import org.springframework.messaging.Message;import org.springframework.messaging.converter.StringMessageConverter;import java.util.concurrent.ConcurrentHashMap;/*** 事务消息监听器* 关于@RocketMQTransactionListener 这个注解,有点奇怪。2.0.4版本中,是需要指定txProducerGroup指向一个消息发送者组。不同的组可以有不同的事务消息逻辑。* 但是到了2.1.1版本,只能指定rocketMQTemplateBeanMame,也就是说如果你有多个发送者组需要有不同的事务消息逻辑,那就需要定义多个RocketMQTemplate。* 而且这个版本中,虽然重现了我们在原生API中的事务消息逻辑,但是测试过程中还是发现一些奇怪的特性,用的时候要注意点。**///@RocketMQTransactionListener(txProducerGroup = "springBootGroup2")//事务消息rocketMQTemplateBeanName指向rocketMQTemplate的名字@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")public class MyTransactionImpl implements RocketMQLocalTransactionListener {private final ConcurrentHashMap<Object, Message> localTrans = new ConcurrentHashMap<>();/*** 查询本地事务*/@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {Object transId = msg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TRANSACTION_ID);String destination = arg.toString();localTrans.put(transId, msg);//这个msg的实现类是GenericMessage,里面实现了toString方法//在Header中自定义的RocketMQHeaders.TAGS属性,到这里就没了。但是RocketMQHeaders.TRANSACTION_ID这个属性就还在。//而message的Header里面会默认保存RocketMQHeaders里的属性,但是都会加上一个RocketMQHeaders.PREFIX前缀System.out.println("executeLocalTransaction msg = " + msg);//转成RocketMQ的Message对象//为什么要转成RocketMQ的Message对象?原因是因为Spring的Message没有tag属性org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(), "UTF-8", destination, msg);String tags = message.getTags();if (StringUtils.contains(tags, "TagA")) {return RocketMQLocalTransactionState.COMMIT;} else if (StringUtils.contains(tags, "TagB")) {return RocketMQLocalTransactionState.ROLLBACK;} else {return RocketMQLocalTransactionState.UNKNOWN;}}//延迟检查的时间间隔要有点奇怪。@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {//获取事务IdString transId = msg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TRANSACTION_ID).toString();Message originalMessage = localTrans.get(transId);//这里能够获取到自定义的transaction_id属性System.out.println("checkLocalTransaction msg = " + originalMessage);//获取标签时,自定义的RocketMQHeaders.TAGS拿不到,但是框架会封装成一个带RocketMQHeaders.PREFIX的属性// String tags = msg.getHeaders().get(RocketMQHeaders.TAGS).toString();//获取tagsString tags = msg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TAGS).toString();if (StringUtils.contains(tags, "TagC")) {return RocketMQLocalTransactionState.COMMIT;} else if (StringUtils.contains(tags, "TagD")) {return RocketMQLocalTransactionState.ROLLBACK;} else {return RocketMQLocalTransactionState.UNKNOWN;}}}
package com.roy.rocketmq.config;import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;import org.apache.rocketmq.spring.core.RocketMQTemplate;/*** 自定义RocketMQTemplate* 你可以自己扩展原生的RocketMQTemplate属性,对@ExtRocketMQTemplateConfiguration注解里面的属性进行定制*/@ExtRocketMQTemplateConfiguration()public class ExtRocketMQTemplate extends RocketMQTemplate {}
生产者消费者
package com.roy.rocketmq.basic;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.apache.rocketmq.spring.support.RocketMQHeaders;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** 生产者**/@Componentpublic class SpringProducer {@Resourceprivate RocketMQTemplate rocketMQTemplate;public void sendMessage(String topic, String msg) {// 把字符串转成Message对象,this.rocketMQTemplate.convertAndSend(topic, msg);}public void sendMessageInTransaction(String topic, String msg) throws InterruptedException {String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 10; i++) {//尝试在Header中加入一些自定义的属性。Message<String> message = MessageBuilder.withPayload(msg).setHeader(RocketMQHeaders.TRANSACTION_ID, "TransID_" + i)//发到事务监听器里后,这个自己设定的TAGS属性会丢失。但是上面那个属性不会丢失。.setHeader(RocketMQHeaders.TAGS, tags[i % tags.length])//MyProp在事务监听器里也能拿到,为什么就单单这个RocketMQHeaders.TAGS拿不到?这只能去调源码了。.setHeader("MyProp", "MyProp_" + i).build();String destination = topic + ":" + tags[i % tags.length];//这里发送事务消息时,还是会转换成RocketMQ的Message对象,再调用RocketMQ的API完成事务消息机制。// 参数3说明: 参数三会传递给MyTransactionImpl类的executeLocalTransaction方法的arg参数上.SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, message, destination);System.out.printf("%s%n", sendResult);Thread.sleep(10);}}}
package com.roy.rocketmq.basic;import org.apache.rocketmq.spring.annotation.ConsumeMode;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;/*** 注意下@RocketMQMessageListener这个注解的其他属性**/@Component//声明一个消费者只需要声明一个注释@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic", consumeMode = ConsumeMode.CONCURRENTLY)public class SpringConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received message : " + message);}}
application.properties配置文件
application.properties
rocketmq.name-server=zjj101:9876;zjj102:9876;zjj103:9876rocketmq.producer.group=springBootGroup
启动和演示
启动 SpringBoot的RocketMQSBApplication启动类
开始测试:
普通消息: get请求 http://localhost:8080/MQTest/sendMessage?message=demoData
事务消息: get请求 http://localhost:8080/MQTest/sendTransactionMessage?message=demoData
其它类型的消息
String springTopic = "TestTopic";//发送字符消息SendResult sendResult = rocketMQTemplate.syncSend(springTopic, "Hello, World!");System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);//同步发送sendResult = rocketMQTemplate.syncSend(springTopic, new User().setUserAge((byte) 18).setUserName("Kitty"));System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);//同步发送sendResult = rocketMQTemplate.syncSend(springTopic, MessageBuilder.withPayload(new User().setUserAge((byte) 21).setUserName("Lester")).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE).build());System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);/*** 异步发送*/rocketMQTemplate.asyncSend(springTopic, new OrderPaidEvent("T_001", new BigDecimal("88.00")), new SendCallback() {@Overridepublic void onSuccess(SendResult var1) {System.out.printf("async onSucess SendResult=%s %n", var1);}@Overridepublic void onException(Throwable var1) {System.out.printf("async onException Throwable=%s %n", var1);}});//发送指定TAG的消息rocketMQTemplate.convertAndSend(springTopic + ":tag0", "I'm from tag0"); // tag0 will not be consumer-selectedSystem.out.printf("syncSend topic %s tag %s %n", springTopic, "tag0");rocketMQTemplate.convertAndSend(springTopic + ":tag1", "I'm from tag1");System.out.printf("syncSend topic %s tag %s %n", springTopic, "tag1");//同步发送消息并且返回一个String类型的结果。String replyString = rocketMQTemplate.sendAndReceive(springTopic, "request string", String.class);System.out.printf("send %s and receive %s %n", "request string", replyString);//同步发送消息并且返回一个Byte数组类型的结果。byte[] replyBytes = rocketMQTemplate.sendAndReceive(springTopic, MessageBuilder.withPayload("request byte[]").build(), byte[].class, 3000);System.out.printf("send %s and receive %s %n", "request byte[]", new String(replyBytes));//同步发送一个带hash参数的请求(排序消息),并返回一个User类型的结果User requestUser = new User().setUserAge((byte) 9).setUserName("requestUserName");User replyUser = rocketMQTemplate.sendAndReceive(springTopic, requestUser, User.class, "order-id");System.out.printf("send %s and receive %s %n", requestUser, replyUser);//同步发送一个带延迟级别的消息(延迟消息),并返回一个泛型结果ProductWithPayload<String> replyGenericObject = rocketMQTemplate.sendAndReceive(springTopic, "request generic",new TypeReference<ProductWithPayload<String>>() {}.getType(), 30000, 2);System.out.printf("send %s and receive %s %n", "request generic", replyGenericObject);//异步发送消息,返回String类型结果。rocketMQTemplate.sendAndReceive(springTopic, "request string", new RocketMQLocalRequestCallback<String>() {@Overridepublic void onSuccess(String message) {System.out.printf("send %s and receive %s %n", "request string", message);}@Overridepublic void onException(Throwable e) {e.printStackTrace();}});//异步发送消息,并返回一个User类型的结果。rocketMQTemplate.sendAndReceive(springTopic, new User().setUserAge((byte) 9).setUserName("requestUserName"), new RocketMQLocalRequestCallback<User>() {@Overridepublic void onSuccess(User message) {System.out.printf("send user object and receive %s %n", message.toString());}@Overridepublic void onException(Throwable e) {e.printStackTrace();}}, 5000);//发送批量消息List<Message> msgs = new ArrayList<Message>();for (int i = 0; i < 10; i++) {msgs.add(MessageBuilder.withPayload("Hello RocketMQ Batch Msg#" + i).setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());}SendResult sr = rocketMQTemplate.syncSend(springTopic, msgs, 60000);System.out.printf("--- Batch messages send result :" + sr);
码云代码地址
https://gitee.com/zjj19941/ZJJ_RocketMQ/tree/master/TuLing4-RocketMQ-Demo/SpringBoot-rocketmq
