1、快速实战

这部分我们看下SpringBoot如何快速集成RocketMQ。
在使用SpringBoot的starter集成包时,要特别注意版本。因为SpringBoot集成RocketMQ的starter依赖是由Spring社区提供的,目前正在快速迭代的过程当中,不同版本之间的差距非常大,甚至基础的底层对象都会经常有改动。例如如果使用rocketmq-spring-boot-starter:2.0.4版本开发的代码,升级到目前最新的rocketmq-spring-boot-starter:2.1.1后,基本就用不了了。
我们创建一个maven工程,引入关键依赖:

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.rocketmq</groupId>
  4. <artifactId>rocketmq-spring-boot-starter</artifactId>
  5. <version>2.1.1</version>
  6. <exclusions>
  7. <exclusion>
  8. <groupId>org.springframework.boot</groupId>
  9. <artifactId>spring-boot-starter</artifactId>
  10. </exclusion>
  11. <exclusion>
  12. <groupId>org.springframework</groupId>
  13. <artifactId>spring-core</artifactId>
  14. </exclusion>
  15. <exclusion>
  16. <groupId>org.springframework</groupId>
  17. <artifactId>spring-webmvc</artifactId>
  18. </exclusion>
  19. </exclusions>
  20. </dependency>
  21. <dependency>
  22. <groupId>org.springframework.boot</groupId>
  23. <artifactId>spring-boot-starter-web</artifactId>
  24. <version>2.1.6.RELEASE</version>
  25. </dependency>
  26. <dependency>
  27. <groupId>io.springfox</groupId>
  28. <artifactId>springfox-swagger-ui</artifactId>
  29. <version>2.9.2</version>
  30. </dependency>
  31. <dependency>
  32. <groupId>io.springfox</groupId>
  33. <artifactId>springfox-swagger2</artifactId>
  34. <version>2.9.2</version>
  35. </dependency>
  36. </dependencies>

rocketmq-spring-boot-starter:2.1.1引入的SpringBoot包版本是2.0.5.RELEASE,这里把SpringBoot的依赖包升级了一下。
然后我们以SpringBoot的方式,快速创建一个简单的Demo
启动类:

  1. @SpringBootApplication
  2. public class RocketMQScApplication {
  3. public static void main(String[] args) {
  4. SpringApplication.run(RocketMQScApplication.class,args);
  5. }
  6. }

配置文件 application.properties

  1. #NameServer地址
  2. rocketmq.name-server=192.168.232.128:9876
  3. #默认的消息生产者组
  4. rocketmq.producer.group=springBootGroup

消息生产者

  1. package com.roy.rocket.basic;
  2. import org.apache.rocketmq.client.exception.MQClientException;
  3. import org.apache.rocketmq.client.producer.SendResult;
  4. import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
  5. import org.apache.rocketmq.spring.core.RocketMQTemplate;
  6. import org.springframework.messaging.Message;
  7. import org.springframework.messaging.support.MessageBuilder;
  8. import org.springframework.stereotype.Component;
  9. import javax.annotation.Resource;
  10. import java.io.UnsupportedEncodingException;
  11. /**
  12. * @author :dlz
  13. * @date :Created in 2020/10/22
  14. * @description:
  15. **/
  16. @Component
  17. public class SpringProducer {
  18. @Resource
  19. private RocketMQTemplate rocketMQTemplate;
  20. //发送普通消息的示例
  21. public void sendMessage(String topic,String msg){
  22. this.rocketMQTemplate.convertAndSend(topic,msg);
  23. }
  24. //发送事务消息的示例
  25. public void sendMessageInTransaction(String topic,String msg) throws InterruptedException {
  26. String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
  27. for (int i = 0; i < 10; i++) {
  28. Message<String> message = MessageBuilder.withPayload(msg).build();
  29. String destination =topic+":"+tags[i % tags.length];
  30. SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, message,destination);
  31. System.out.printf("%s%n", sendResult);
  32. Thread.sleep(10);
  33. }
  34. }
  35. }

消息消费者

  1. package com.roy.rocket.basic;
  2. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
  3. import org.apache.rocketmq.spring.core.RocketMQListener;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * @author :dlz
  7. * @date :Created in 2020/10/22
  8. * @description:
  9. **/
  10. @Component
  11. @RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic")
  12. public class SpringConsumer implements RocketMQListener<String> {
  13. @Override
  14. public void onMessage(String message) {
  15. System.out.println("Received message : "+ message);
  16. }
  17. }

SpringBoot集成RocketMQ,消费者部分的核心就在这个@RocketMQMessageListener注解上。所有消费者的核心功能也都会集成到这个注解中。所以我们还要注意下这个注解里面的属性:
例如:消息过滤可以由里面的selectorType属性和selectorExpression来定制
消息有序消费还是并发消费则由consumeMode属性定制。
消费者是集群部署还是广播部署由messageModel属性定制。
然后关于事务消息,还需要配置一个事务消息监听器:

  1. package com.roy.rocket.config;
  2. import org.apache.commons.lang3.StringUtils;
  3. import org.apache.rocketmq.client.producer.LocalTransactionState;
  4. import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
  5. import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
  6. import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
  7. import org.apache.rocketmq.spring.support.RocketMQUtil;
  8. import org.springframework.messaging.Message;
  9. import org.springframework.messaging.converter.StringMessageConverter;
  10. import java.util.concurrent.ConcurrentHashMap;
  11. /**
  12. * @author :楼兰
  13. * @date :Created in 2020/11/5
  14. * @description:
  15. **/
  16. @RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
  17. public class MyTransactionImpl implements RocketMQLocalTransactionListener {
  18. private ConcurrentHashMap<Object, String> localTrans = new ConcurrentHashMap<>();
  19. @Override
  20. public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
  21. Object id = msg.getHeaders().get("id");
  22. String destination = arg.toString();
  23. localTrans.put(id,destination);
  24. org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(),"UTF-8",destination, msg);
  25. String tags = message.getTags();
  26. if(StringUtils.contains(tags,"TagA")){
  27. return RocketMQLocalTransactionState.COMMIT;
  28. }else if(StringUtils.contains(tags,"TagB")){
  29. return RocketMQLocalTransactionState.ROLLBACK;
  30. }else{
  31. return RocketMQLocalTransactionState.UNKNOWN;
  32. }
  33. }
  34. @Override
  35. public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
  36. //SpringBoot的消息对象中,并没有transactionId这个属性。跟原生API不一样。
  37. // String destination = localTrans.get(msg.getTransactionId());
  38. return RocketMQLocalTransactionState.COMMIT;
  39. }
  40. }

这样我们启动应用后,就能够通过访问 http://localhost:8080/MQTest/sendMessage?message=123 接口来发送一条简单消息。并在SpringConsumer中消费到。
也可以通过访问http://localhost:8080/MQTest/sendTransactionMessage?message=123 ,来发送一条事务消息。
这里可以看到,对事务消息,SpringBoot进行封装时,就缺少了transactionId,这在事务控制中是非常关键的。
如果提示没有主题,那可以先创建一个主题。

2、其他更多消息类型:

对于其他的消息类型,文档中就不一一记录了。具体可以参见源码中的junit测试案例。

3、总结:

  • SpringBoot 引入org.apache.rocketmq:rocketmq-spring-boot-starter依赖后,就可以通过内置的RocketMQTemplate来与RocketMQ交互。相关属性都以rockemq.开头。具体所有的配置信息可以参见org.apache.rocketmq.spring.autoconfigure.RocketMQProperties这个类。
  • SpringBoot依赖中的Message对象和RocketMQ-client中的Message对象是两个不同的对象,这在使用的时候要非常容易弄错。例如RocketMQ-client中的Message里的TAG属性,在SpringBoot依赖中的Message中就没有。Tag属性被移到了发送目标中,与Topic一起,以Topic:Tag的方式指定。
  • 最后强调一次,一定要注意版本。rocketmq-spring-boot-starter的更新进度一般都会略慢于RocketMQ的版本更新,并且版本不同会引发很多奇怪的问题。apache有一个官方的rocketmq-spring示例,地址:https://github.com/apache/rocketmq-spring.git 以后如果版本更新了,可以参考下这个示例代码。