一、新建工程 rocketmq-springboot-producer-demo 以及 rocketmq-springboot-consumer-demo
任何第三方库和SpringBoot进行整合,都是这三步
- 改pom (导入这个组件的依赖)
- 写yml (写这个组件的一些配置)
-
二、修改pom.xml,添加依赖
完整依赖如下,截止至 2020.7.13号,rocketmq-spring-boot-starter的最新版为2.1.0 ```xml <?xml version=”1.0” encoding=”UTF-8”?> <project xmlns=”http://maven.apache.org/POM/4.0.0“
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>mq-demo</artifactId>
<groupId>cn.spectrumrpc</groupId>
<version>1.0-SNAPSHOT</version>
4.0.0 rocketmq-springboot-demo org.springframework.boot spring-boot-starter 2.2.2.RELEASE org.apache.rocketmq rocketmq-spring-boot-starter 2.1.0
<a name="eusr0"></a>
## 三、修改application.yml
```yaml
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: default-group
四、编写测试用例
在SpringBoot中,都是采用xxxTemplate来进行封装。
所以采用RocketMQTemplate 来发送消息
4.1 同步发送消息
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendSyncMessage() {
rocketMQTemplate.syncSend("syncTopic","hello,rocketmq-springboot");
}
4.2 异步发送消息
public void sendAsyncMessage() {
rocketMQTemplate.asyncSend("asyncTopic", "hello,async", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("onSuccess:" + sendResult);
}
@Override
public void onException(Throwable throwable) {
throwable.printStackTrace();
}
});
}
发送结果:
onSuccess:SendResult [sendStatus=SEND_OK, msgId=FE80000000000000E8AD87FFFE95B346000018B4AAC2457020CC0000, offsetMsgId=AC11000100002A9F000000000000F735, messageQueue=MessageQueue [topic=asyncTopic, brokerName=broker-a, queueId=3], queueOffset=0]
4.3 单向发送消息
public void sendOneWay() {
rocketMQTemplate.sendOneWay("oneWayTopic", "onewayMessage");
}
4.4 消费端消费消息
通过 @RocketMQMessageListener 注解,并继承 RocketMQListener,就可以监听 生产者发送的消息
其中 onMessage 即为业务处理的方法。
@Component
@RocketMQMessageListener(topic = "asyncTopic",consumerGroup = "springboot-mq-consumer-1")
public class MQConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("info = " + s);
}
}
@RocketMQMessageListener 中有许多的属性,最常用的为 topic:指定消费的主题, consumerGroup指定当前消费者所在的组(同一个消费组)。
其中rocketmq的消费者有两种消费模式,负载均衡和轮询。在 @RocketMQMessageListener 注解中,是通过messageModel这个属性进行配置的,如下。(其余的更多属性,在后续的案例中再进行解释)
// 广播
@RocketMQMessageListener(topic = "asyncTopic",consumerGroup = "springboot-mq-consumer-1"
,messageModel = MessageModel.BROADCASTING)
// 轮循
@RocketMQMessageListener(topic = "asyncTopic",consumerGroup = "springboot-mq-consumer-1"
,messageModel = MessageModel.CLUSTERING)
4.5 发送顺序消息
生产者代码
调用带Orderly的api即可
public void sendOrderly() {
List<Order> orders = Order.buildOrders();
for (Order order : orders) {
rocketMQTemplate.syncSendOrderly("orderTopic",order, String.valueOf(order.getOrderId()));
}
}
如果期望异步发送消息,采用
public void sendOrderly() {
List<Order> orders = Order.buildOrders();
for (Order order : orders) {
rocketMQTemplate.asyncSendOrderly("orderTopic", order, String.valueOf(order.getOrderId()), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("onSuccess, result = " + sendResult);
}
@Override
public void onException(Throwable throwable) {
throwable.printStackTrace();
}
});
}
}
消费者代码
消费端,需要在@RocketMQMessageListener注解上,修改一个属性,标明这个消费者是顺序消费,否则,还是会乱序消费。 需要指定 consumeMode = ConsumeMode.ORDERLY,指明顺序消费,指定这个,相当于 MessageListenerOrderly 来监听
@Component
@RocketMQMessageListener(topic = "orderTopic",consumerGroup = "orderly-consumer", consumeMode = ConsumeMode.ORDERLY)
public class OrderlyConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
System.out.println("receive order: " + order + "\t currentThread" + Thread.currentThread().getName());
}
}
测试结果:
4.6 发送延时消息
生产者:
使用如下的api,第四个参数就是延时的级别
public SendResult syncSend(String destination, Message<?> message,
long timeout, int delayLevel){
}
具体的示例如下:
public void sendDelayMessage() {
Message<String> message = MessageBuilder.withPayload("delayMessage").build();
rocketMQTemplate.syncSend("delayTopic", message, 1000, 3);
}
消费者:
消费者端和之前没有任何差别。
@Component
@RocketMQMessageListener(topic = "delayTopic",consumerGroup = "delay-consumer")
public class DelayConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
System.out.println("currentTime = " + System.currentTimeMillis() + "");
System.out.println("recevice message = " + message + "");
}
}
4.7 批量发送消息
生产者:
public void batchMessage() {
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 10; i++) {
messages.add(MessageBuilder.withPayload("batchMessage " + i).build());
}
rocketMQTemplate.syncSend("batchTopic", messages, 1000);
}
4.8 过滤消息
生产者:
public void filterMessage(){
for (int i = 0; i < 10; i++) {
HashMap<String, Object> map = new HashMap<>();
map.put("a", i);
MessageHeaders messageHeaders = new MessageHeaders(map);
Message<String> message = MessageBuilder.createMessage("filter message " + i, messageHeaders);
rocketMQTemplate.syncSend("filterTopic", message);
}
}
消费者:
主要添加 @RocketMQMessageListener 的 selectorType 为 SelectorType.SQL92。
selectorExpression 为自己想要过滤的SQL语法,例如此例子中,添加的属性为a,判断a的范围。
@Component
@RocketMQMessageListener(topic = "filterTopic",consumerGroup = "filter-consumer"
,selectorType = SelectorType.SQL92, selectorExpression = "a between 0 and 3")
public class FilterConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("on message, message body: " + s);
}
}
4.9 事务消息
五、RocketMQMessageListener 的属性解释
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMQMessageListener {
// rocketmq 的 namesrv地址
String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
// 当前消费者的所在组
String consumerGroup();
// 当前消费者所消费的主题
String topic();
// 消费者过滤消息的方式,默认为TAG,还有SQL92这种类型
SelectorType selectorType() default SelectorType.TAG;
// 消费者过滤消息的表达式,默认是*,即所有消息都拿过来,
// 结合上默认的TAG的话,默认就是所有TAG都拿
String selectorExpression() default "*";
// 消费者的默认,默认为并发争抢消费,
// 还有另一种模式是ConsumeMode.ORDERLY,即顺序消费
ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
// 消息的模式, 默认是负载均衡模式,即多个客户端一起完成这个消息的消费
// 还有另一种模式是 MessageModel.BROADCASTING 即订阅模式,所有人看到所有消息
MessageModel messageModel() default MessageModel.CLUSTERING;
// 消费者的最大线程数
int consumeThreadMax() default 64;
// 消费者的超时时间,默认30S
long consumeTimeout() default 30000L;
String accessKey() default "${rocketmq.consumer.access-key:}";
String secretKey() default "${rocketmq.consumer.secret-key:}";
boolean enableMsgTrace() default true;
String customizedTraceTopic() default "${rocketmq.consumer.customized-trace-topic:}";
String nameServer() default "${rocketmq.name-server:}";
String accessChannel() default "${rocketmq.access-channel:}";
}