原文 出自 图灵学院 四期 RocketMQ的讲义 ,我自己整理了一下,然后听课的时候自己也记了一些笔记,整理完了发到了博客上.
注意点
在使用SpringBoot的starter集成包时,要特别注意版本。因为SpringBoot集成RocketMQ的starter依赖是由Spring社区提供的,目前正在快速迭代的过程当中,不同版本之间的差距非常大,甚至基础的底层对象都会经常有改动。例如如果使用rocketmq-spring-boot-starter:2.0.4版本开发的代码,升级到目前最新的rocketmq-spring-boot-starter:2.1.1后,基本就用不了了。
SpringBoot的convertAndSend方法内部是将String类型的msg转成了org.springframework.messaging.Message对象
这个Message对象是Spring包下的,不是RocketMQ包下的了,Spring把RocketMQ的原生Message对象进行了二次封装
this.rocketMQTemplate.convertAndSend(topic, msg);
第二个就是Spring把tag和topic放到了一起,rocketMQTemplate.sendMessageInTransaction方法第一个参数是topic:tags的格式
String destination = topic + ":" + tags[i % tags.length];
//这里发送事务消息时,还是会转换成RocketMQ的Message对象,再调用RocketMQ的API完成事务消息机制。
SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, message, destination);
第三个点就是rocketmq-spring-boot-starter版本要比RocketMQ原生版本更新慢,也就是说比如说RocketMQ出了个新的特性,此时rocketmq-spring-boot-starter版本就没有这个新特性,等过段时间才会有这些特性
第四个点就是rocketmq-spring-boot-starter的注解改变的比较频繁,可能这个版本注解是这样的,下个版本注解就是另外一个样子的了.不管怎么变化,肯定不会跳过RocketMQ的那些基本功能的,比如说事务消息 过滤消息 延迟消息 消息重试等等.
代码
依赖
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.1.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>2.1.6.RELEASE</version>
</dependency>
</dependencies>
启动类
@SpringBootApplication
public class RocketMQSBApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMQSBApplication.class, args);
}
}
Controller
package com.roy.rocketmq.controller;
import com.roy.rocketmq.basic.SpringProducer;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
@RequestMapping("/MQTest")
public class MQTestController {
private final String topic = "TestTopic";
@Resource
private SpringProducer producer;
/**
* 发送普通消息
*
* @param message
* @return
*/
@RequestMapping("/sendMessage")
public String sendMessage(String message) {
producer.sendMessage(topic, message);
return "消息发送完成";
}
//这个发送事务消息的例子中有很多问题,需要注意下。
@RequestMapping("/sendTransactionMessage")
public String sendTransactionMessage(String message) throws InterruptedException {
producer.sendMessageInTransaction(topic, message);
return "消息发送完成";
}
}
config
package com.roy.rocketmq.config;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.StringMessageConverter;
import java.util.concurrent.ConcurrentHashMap;
/**
* 事务消息监听器
* 关于@RocketMQTransactionListener 这个注解,有点奇怪。2.0.4版本中,是需要指定txProducerGroup指向一个消息发送者组。不同的组可以有不同的事务消息逻辑。
* 但是到了2.1.1版本,只能指定rocketMQTemplateBeanMame,也就是说如果你有多个发送者组需要有不同的事务消息逻辑,那就需要定义多个RocketMQTemplate。
* 而且这个版本中,虽然重现了我们在原生API中的事务消息逻辑,但是测试过程中还是发现一些奇怪的特性,用的时候要注意点。
**/
//@RocketMQTransactionListener(txProducerGroup = "springBootGroup2")
//事务消息rocketMQTemplateBeanName指向rocketMQTemplate的名字
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class MyTransactionImpl implements RocketMQLocalTransactionListener {
private final ConcurrentHashMap<Object, Message> localTrans = new ConcurrentHashMap<>();
/**
* 查询本地事务
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Object transId = msg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TRANSACTION_ID);
String destination = arg.toString();
localTrans.put(transId, msg);
//这个msg的实现类是GenericMessage,里面实现了toString方法
//在Header中自定义的RocketMQHeaders.TAGS属性,到这里就没了。但是RocketMQHeaders.TRANSACTION_ID这个属性就还在。
//而message的Header里面会默认保存RocketMQHeaders里的属性,但是都会加上一个RocketMQHeaders.PREFIX前缀
System.out.println("executeLocalTransaction msg = " + msg);
//转成RocketMQ的Message对象
//为什么要转成RocketMQ的Message对象?原因是因为Spring的Message没有tag属性
org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(), "UTF-8", destination, msg);
String tags = message.getTags();
if (StringUtils.contains(tags, "TagA")) {
return RocketMQLocalTransactionState.COMMIT;
} else if (StringUtils.contains(tags, "TagB")) {
return RocketMQLocalTransactionState.ROLLBACK;
} else {
return RocketMQLocalTransactionState.UNKNOWN;
}
}
//延迟检查的时间间隔要有点奇怪。
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
//获取事务Id
String transId = msg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TRANSACTION_ID).toString();
Message originalMessage = localTrans.get(transId);
//这里能够获取到自定义的transaction_id属性
System.out.println("checkLocalTransaction msg = " + originalMessage);
//获取标签时,自定义的RocketMQHeaders.TAGS拿不到,但是框架会封装成一个带RocketMQHeaders.PREFIX的属性
// String tags = msg.getHeaders().get(RocketMQHeaders.TAGS).toString();
//获取tags
String tags = msg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TAGS).toString();
if (StringUtils.contains(tags, "TagC")) {
return RocketMQLocalTransactionState.COMMIT;
} else if (StringUtils.contains(tags, "TagD")) {
return RocketMQLocalTransactionState.ROLLBACK;
} else {
return RocketMQLocalTransactionState.UNKNOWN;
}
}
}
package com.roy.rocketmq.config;
import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
/**
* 自定义RocketMQTemplate
* 你可以自己扩展原生的RocketMQTemplate属性,对@ExtRocketMQTemplateConfiguration注解里面的属性进行定制
*/
@ExtRocketMQTemplateConfiguration()
public class ExtRocketMQTemplate extends RocketMQTemplate {
}
生产者消费者
package com.roy.rocketmq.basic;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 生产者
**/
@Component
public class SpringProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
public void sendMessage(String topic, String msg) {
// 把字符串转成Message对象,
this.rocketMQTemplate.convertAndSend(topic, msg);
}
public void sendMessageInTransaction(String topic, String msg) throws InterruptedException {
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
//尝试在Header中加入一些自定义的属性。
Message<String> message = MessageBuilder.withPayload(msg)
.setHeader(RocketMQHeaders.TRANSACTION_ID, "TransID_" + i)
//发到事务监听器里后,这个自己设定的TAGS属性会丢失。但是上面那个属性不会丢失。
.setHeader(RocketMQHeaders.TAGS, tags[i % tags.length])
//MyProp在事务监听器里也能拿到,为什么就单单这个RocketMQHeaders.TAGS拿不到?这只能去调源码了。
.setHeader("MyProp", "MyProp_" + i)
.build();
String destination = topic + ":" + tags[i % tags.length];
//这里发送事务消息时,还是会转换成RocketMQ的Message对象,再调用RocketMQ的API完成事务消息机制。
// 参数3说明: 参数三会传递给MyTransactionImpl类的executeLocalTransaction方法的arg参数上.
SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, message, destination);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
}
}
}
package com.roy.rocketmq.basic;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* 注意下@RocketMQMessageListener这个注解的其他属性
**/
@Component
//声明一个消费者只需要声明一个注释
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic", consumeMode = ConsumeMode.CONCURRENTLY)
public class SpringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Received message : " + message);
}
}
application.properties配置文件
application.properties
rocketmq.name-server=zjj101:9876;zjj102:9876;zjj103:9876
rocketmq.producer.group=springBootGroup
启动和演示
启动 SpringBoot的RocketMQSBApplication启动类
开始测试:
普通消息: get请求 http://localhost:8080/MQTest/sendMessage?message=demoData
事务消息: get请求 http://localhost:8080/MQTest/sendTransactionMessage?message=demoData
其它类型的消息
String springTopic = "TestTopic";
//发送字符消息
SendResult sendResult = rocketMQTemplate.syncSend(springTopic, "Hello, World!");
System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);
//同步发送
sendResult = rocketMQTemplate.syncSend(springTopic, new User().setUserAge((byte) 18).setUserName("Kitty"));
System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);
//同步发送
sendResult = rocketMQTemplate.syncSend(springTopic, MessageBuilder.withPayload(
new User().setUserAge((byte) 21).setUserName("Lester")).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE).build());
System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);
/**
* 异步发送
*/
rocketMQTemplate.asyncSend(springTopic, new OrderPaidEvent("T_001", new BigDecimal("88.00")), new SendCallback() {
@Override
public void onSuccess(SendResult var1) {
System.out.printf("async onSucess SendResult=%s %n", var1);
}
@Override
public void onException(Throwable var1) {
System.out.printf("async onException Throwable=%s %n", var1);
}
});
//发送指定TAG的消息
rocketMQTemplate.convertAndSend(springTopic + ":tag0", "I'm from tag0"); // tag0 will not be consumer-selected
System.out.printf("syncSend topic %s tag %s %n", springTopic, "tag0");
rocketMQTemplate.convertAndSend(springTopic + ":tag1", "I'm from tag1");
System.out.printf("syncSend topic %s tag %s %n", springTopic, "tag1");
//同步发送消息并且返回一个String类型的结果。
String replyString = rocketMQTemplate.sendAndReceive(springTopic, "request string", String.class);
System.out.printf("send %s and receive %s %n", "request string", replyString);
//同步发送消息并且返回一个Byte数组类型的结果。
byte[] replyBytes = rocketMQTemplate.sendAndReceive(springTopic, MessageBuilder.withPayload("request byte[]").build(), byte[].class, 3000);
System.out.printf("send %s and receive %s %n", "request byte[]", new String(replyBytes));
//同步发送一个带hash参数的请求(排序消息),并返回一个User类型的结果
User requestUser = new User().setUserAge((byte) 9).setUserName("requestUserName");
User replyUser = rocketMQTemplate.sendAndReceive(springTopic, requestUser, User.class, "order-id");
System.out.printf("send %s and receive %s %n", requestUser, replyUser);
//同步发送一个带延迟级别的消息(延迟消息),并返回一个泛型结果
ProductWithPayload<String> replyGenericObject = rocketMQTemplate.sendAndReceive(springTopic, "request generic",
new TypeReference<ProductWithPayload<String>>() {
}.getType(), 30000, 2);
System.out.printf("send %s and receive %s %n", "request generic", replyGenericObject);
//异步发送消息,返回String类型结果。
rocketMQTemplate.sendAndReceive(springTopic, "request string", new RocketMQLocalRequestCallback<String>() {
@Override
public void onSuccess(String message) {
System.out.printf("send %s and receive %s %n", "request string", message);
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
}
});
//异步发送消息,并返回一个User类型的结果。
rocketMQTemplate.sendAndReceive(springTopic, new User().setUserAge((byte) 9).setUserName("requestUserName"), new RocketMQLocalRequestCallback<User>() {
@Override
public void onSuccess(User message) {
System.out.printf("send user object and receive %s %n", message.toString());
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
}
}, 5000);
//发送批量消息
List<Message> msgs = new ArrayList<Message>();
for (int i = 0; i < 10; i++) {
msgs.add(MessageBuilder.withPayload("Hello RocketMQ Batch Msg#" + i).
setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
}
SendResult sr = rocketMQTemplate.syncSend(springTopic, msgs, 60000);
System.out.printf("--- Batch messages send result :" + sr);
码云代码地址
https://gitee.com/zjj19941/ZJJ_RocketMQ/tree/master/TuLing4-RocketMQ-Demo/SpringBoot-rocketmq