原文 出自 图灵学院 四期 RocketMQ的讲义 ,我自己整理了一下,然后听课的时候自己也记了一些笔记,整理完了发到了博客上.

注意点

在使用SpringBoot的starter集成包时,要特别注意版本。因为SpringBoot集成RocketMQ的starter依赖是由Spring社区提供的,目前正在快速迭代的过程当中,不同版本之间的差距非常大,甚至基础的底层对象都会经常有改动。例如如果使用rocketmq-spring-boot-starter:2.0.4版本开发的代码,升级到目前最新的rocketmq-spring-boot-starter:2.1.1后,基本就用不了了。

SpringBoot的convertAndSend方法内部是将String类型的msg转成了org.springframework.messaging.Message对象

这个Message对象是Spring包下的,不是RocketMQ包下的了,Spring把RocketMQ的原生Message对象进行了二次封装

  1. this.rocketMQTemplate.convertAndSend(topic, msg);

第二个就是Spring把tag和topic放到了一起,rocketMQTemplate.sendMessageInTransaction方法第一个参数是topic:tags的格式

  1. String destination = topic + ":" + tags[i % tags.length];
  2. //这里发送事务消息时,还是会转换成RocketMQ的Message对象,再调用RocketMQ的API完成事务消息机制。
  3. SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, message, destination);

第三个点就是rocketmq-spring-boot-starter版本要比RocketMQ原生版本更新慢,也就是说比如说RocketMQ出了个新的特性,此时rocketmq-spring-boot-starter版本就没有这个新特性,等过段时间才会有这些特性

第四个点就是rocketmq-spring-boot-starter的注解改变的比较频繁,可能这个版本注解是这样的,下个版本注解就是另外一个样子的了.不管怎么变化,肯定不会跳过RocketMQ的那些基本功能的,比如说事务消息 过滤消息 延迟消息 消息重试等等.

代码

依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.rocketmq</groupId>
  4. <artifactId>rocketmq-spring-boot-starter</artifactId>
  5. <version>2.1.1</version>
  6. <exclusions>
  7. <exclusion>
  8. <groupId>org.springframework.boot</groupId>
  9. <artifactId>spring-boot-starter</artifactId>
  10. </exclusion>
  11. <exclusion>
  12. <groupId>org.springframework</groupId>
  13. <artifactId>spring-core</artifactId>
  14. </exclusion>
  15. <exclusion>
  16. <groupId>org.springframework</groupId>
  17. <artifactId>spring-webmvc</artifactId>
  18. </exclusion>
  19. </exclusions>
  20. </dependency>
  21. <dependency>
  22. <groupId>org.springframework.boot</groupId>
  23. <artifactId>spring-boot-starter-web</artifactId>
  24. <version>2.1.6.RELEASE</version>
  25. </dependency>
  26. <dependency>
  27. <groupId>org.springframework.boot</groupId>
  28. <artifactId>spring-boot-starter-test</artifactId>
  29. <version>2.1.6.RELEASE</version>
  30. </dependency>
  31. </dependencies>

启动类

  1. @SpringBootApplication
  2. public class RocketMQSBApplication {
  3. public static void main(String[] args) {
  4. SpringApplication.run(RocketMQSBApplication.class, args);
  5. }
  6. }

Controller

  1. package com.roy.rocketmq.controller;
  2. import com.roy.rocketmq.basic.SpringProducer;
  3. import org.springframework.web.bind.annotation.RequestMapping;
  4. import org.springframework.web.bind.annotation.RestController;
  5. import javax.annotation.Resource;
  6. @RestController
  7. @RequestMapping("/MQTest")
  8. public class MQTestController {
  9. private final String topic = "TestTopic";
  10. @Resource
  11. private SpringProducer producer;
  12. /**
  13. * 发送普通消息
  14. *
  15. * @param message
  16. * @return
  17. */
  18. @RequestMapping("/sendMessage")
  19. public String sendMessage(String message) {
  20. producer.sendMessage(topic, message);
  21. return "消息发送完成";
  22. }
  23. //这个发送事务消息的例子中有很多问题,需要注意下。
  24. @RequestMapping("/sendTransactionMessage")
  25. public String sendTransactionMessage(String message) throws InterruptedException {
  26. producer.sendMessageInTransaction(topic, message);
  27. return "消息发送完成";
  28. }
  29. }

config

  1. package com.roy.rocketmq.config;
  2. import org.apache.commons.lang3.StringUtils;
  3. import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
  4. import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
  5. import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
  6. import org.apache.rocketmq.spring.support.RocketMQHeaders;
  7. import org.apache.rocketmq.spring.support.RocketMQUtil;
  8. import org.springframework.messaging.Message;
  9. import org.springframework.messaging.converter.StringMessageConverter;
  10. import java.util.concurrent.ConcurrentHashMap;
  11. /**
  12. * 事务消息监听器
  13. * 关于@RocketMQTransactionListener 这个注解,有点奇怪。2.0.4版本中,是需要指定txProducerGroup指向一个消息发送者组。不同的组可以有不同的事务消息逻辑。
  14. * 但是到了2.1.1版本,只能指定rocketMQTemplateBeanMame,也就是说如果你有多个发送者组需要有不同的事务消息逻辑,那就需要定义多个RocketMQTemplate。
  15. * 而且这个版本中,虽然重现了我们在原生API中的事务消息逻辑,但是测试过程中还是发现一些奇怪的特性,用的时候要注意点。
  16. **/
  17. //@RocketMQTransactionListener(txProducerGroup = "springBootGroup2")
  18. //事务消息rocketMQTemplateBeanName指向rocketMQTemplate的名字
  19. @RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
  20. public class MyTransactionImpl implements RocketMQLocalTransactionListener {
  21. private final ConcurrentHashMap<Object, Message> localTrans = new ConcurrentHashMap<>();
  22. /**
  23. * 查询本地事务
  24. */
  25. @Override
  26. public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
  27. Object transId = msg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TRANSACTION_ID);
  28. String destination = arg.toString();
  29. localTrans.put(transId, msg);
  30. //这个msg的实现类是GenericMessage,里面实现了toString方法
  31. //在Header中自定义的RocketMQHeaders.TAGS属性,到这里就没了。但是RocketMQHeaders.TRANSACTION_ID这个属性就还在。
  32. //而message的Header里面会默认保存RocketMQHeaders里的属性,但是都会加上一个RocketMQHeaders.PREFIX前缀
  33. System.out.println("executeLocalTransaction msg = " + msg);
  34. //转成RocketMQ的Message对象
  35. //为什么要转成RocketMQ的Message对象?原因是因为Spring的Message没有tag属性
  36. org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(), "UTF-8", destination, msg);
  37. String tags = message.getTags();
  38. if (StringUtils.contains(tags, "TagA")) {
  39. return RocketMQLocalTransactionState.COMMIT;
  40. } else if (StringUtils.contains(tags, "TagB")) {
  41. return RocketMQLocalTransactionState.ROLLBACK;
  42. } else {
  43. return RocketMQLocalTransactionState.UNKNOWN;
  44. }
  45. }
  46. //延迟检查的时间间隔要有点奇怪。
  47. @Override
  48. public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
  49. //获取事务Id
  50. String transId = msg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TRANSACTION_ID).toString();
  51. Message originalMessage = localTrans.get(transId);
  52. //这里能够获取到自定义的transaction_id属性
  53. System.out.println("checkLocalTransaction msg = " + originalMessage);
  54. //获取标签时,自定义的RocketMQHeaders.TAGS拿不到,但是框架会封装成一个带RocketMQHeaders.PREFIX的属性
  55. // String tags = msg.getHeaders().get(RocketMQHeaders.TAGS).toString();
  56. //获取tags
  57. String tags = msg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TAGS).toString();
  58. if (StringUtils.contains(tags, "TagC")) {
  59. return RocketMQLocalTransactionState.COMMIT;
  60. } else if (StringUtils.contains(tags, "TagD")) {
  61. return RocketMQLocalTransactionState.ROLLBACK;
  62. } else {
  63. return RocketMQLocalTransactionState.UNKNOWN;
  64. }
  65. }
  66. }
  1. package com.roy.rocketmq.config;
  2. import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
  3. import org.apache.rocketmq.spring.core.RocketMQTemplate;
  4. /**
  5. * 自定义RocketMQTemplate
  6. * 你可以自己扩展原生的RocketMQTemplate属性,对@ExtRocketMQTemplateConfiguration注解里面的属性进行定制
  7. */
  8. @ExtRocketMQTemplateConfiguration()
  9. public class ExtRocketMQTemplate extends RocketMQTemplate {
  10. }

生产者消费者

  1. package com.roy.rocketmq.basic;
  2. import org.apache.rocketmq.client.producer.SendResult;
  3. import org.apache.rocketmq.spring.core.RocketMQTemplate;
  4. import org.apache.rocketmq.spring.support.RocketMQHeaders;
  5. import org.springframework.messaging.Message;
  6. import org.springframework.messaging.support.MessageBuilder;
  7. import org.springframework.stereotype.Component;
  8. import javax.annotation.Resource;
  9. /**
  10. * 生产者
  11. **/
  12. @Component
  13. public class SpringProducer {
  14. @Resource
  15. private RocketMQTemplate rocketMQTemplate;
  16. public void sendMessage(String topic, String msg) {
  17. // 把字符串转成Message对象,
  18. this.rocketMQTemplate.convertAndSend(topic, msg);
  19. }
  20. public void sendMessageInTransaction(String topic, String msg) throws InterruptedException {
  21. String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
  22. for (int i = 0; i < 10; i++) {
  23. //尝试在Header中加入一些自定义的属性。
  24. Message<String> message = MessageBuilder.withPayload(msg)
  25. .setHeader(RocketMQHeaders.TRANSACTION_ID, "TransID_" + i)
  26. //发到事务监听器里后,这个自己设定的TAGS属性会丢失。但是上面那个属性不会丢失。
  27. .setHeader(RocketMQHeaders.TAGS, tags[i % tags.length])
  28. //MyProp在事务监听器里也能拿到,为什么就单单这个RocketMQHeaders.TAGS拿不到?这只能去调源码了。
  29. .setHeader("MyProp", "MyProp_" + i)
  30. .build();
  31. String destination = topic + ":" + tags[i % tags.length];
  32. //这里发送事务消息时,还是会转换成RocketMQ的Message对象,再调用RocketMQ的API完成事务消息机制。
  33. // 参数3说明: 参数三会传递给MyTransactionImpl类的executeLocalTransaction方法的arg参数上.
  34. SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, message, destination);
  35. System.out.printf("%s%n", sendResult);
  36. Thread.sleep(10);
  37. }
  38. }
  39. }
  1. package com.roy.rocketmq.basic;
  2. import org.apache.rocketmq.spring.annotation.ConsumeMode;
  3. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
  4. import org.apache.rocketmq.spring.core.RocketMQListener;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * 注意下@RocketMQMessageListener这个注解的其他属性
  8. **/
  9. @Component
  10. //声明一个消费者只需要声明一个注释
  11. @RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic", consumeMode = ConsumeMode.CONCURRENTLY)
  12. public class SpringConsumer implements RocketMQListener<String> {
  13. @Override
  14. public void onMessage(String message) {
  15. System.out.println("Received message : " + message);
  16. }
  17. }

application.properties配置文件

application.properties

  1. rocketmq.name-server=zjj101:9876;zjj102:9876;zjj103:9876
  2. rocketmq.producer.group=springBootGroup

启动和演示

启动 SpringBoot的RocketMQSBApplication启动类

开始测试:

普通消息: get请求 http://localhost:8080/MQTest/sendMessage?message=demoData

事务消息: get请求 http://localhost:8080/MQTest/sendTransactionMessage?message=demoData

其它类型的消息

  1. String springTopic = "TestTopic";
  2. //发送字符消息
  3. SendResult sendResult = rocketMQTemplate.syncSend(springTopic, "Hello, World!");
  4. System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);
  5. //同步发送
  6. sendResult = rocketMQTemplate.syncSend(springTopic, new User().setUserAge((byte) 18).setUserName("Kitty"));
  7. System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);
  8. //同步发送
  9. sendResult = rocketMQTemplate.syncSend(springTopic, MessageBuilder.withPayload(
  10. new User().setUserAge((byte) 21).setUserName("Lester")).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE).build());
  11. System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);
  12. /**
  13. * 异步发送
  14. */
  15. rocketMQTemplate.asyncSend(springTopic, new OrderPaidEvent("T_001", new BigDecimal("88.00")), new SendCallback() {
  16. @Override
  17. public void onSuccess(SendResult var1) {
  18. System.out.printf("async onSucess SendResult=%s %n", var1);
  19. }
  20. @Override
  21. public void onException(Throwable var1) {
  22. System.out.printf("async onException Throwable=%s %n", var1);
  23. }
  24. });
  25. //发送指定TAG的消息
  26. rocketMQTemplate.convertAndSend(springTopic + ":tag0", "I'm from tag0"); // tag0 will not be consumer-selected
  27. System.out.printf("syncSend topic %s tag %s %n", springTopic, "tag0");
  28. rocketMQTemplate.convertAndSend(springTopic + ":tag1", "I'm from tag1");
  29. System.out.printf("syncSend topic %s tag %s %n", springTopic, "tag1");
  30. //同步发送消息并且返回一个String类型的结果。
  31. String replyString = rocketMQTemplate.sendAndReceive(springTopic, "request string", String.class);
  32. System.out.printf("send %s and receive %s %n", "request string", replyString);
  33. //同步发送消息并且返回一个Byte数组类型的结果。
  34. byte[] replyBytes = rocketMQTemplate.sendAndReceive(springTopic, MessageBuilder.withPayload("request byte[]").build(), byte[].class, 3000);
  35. System.out.printf("send %s and receive %s %n", "request byte[]", new String(replyBytes));
  36. //同步发送一个带hash参数的请求(排序消息),并返回一个User类型的结果
  37. User requestUser = new User().setUserAge((byte) 9).setUserName("requestUserName");
  38. User replyUser = rocketMQTemplate.sendAndReceive(springTopic, requestUser, User.class, "order-id");
  39. System.out.printf("send %s and receive %s %n", requestUser, replyUser);
  40. //同步发送一个带延迟级别的消息(延迟消息),并返回一个泛型结果
  41. ProductWithPayload<String> replyGenericObject = rocketMQTemplate.sendAndReceive(springTopic, "request generic",
  42. new TypeReference<ProductWithPayload<String>>() {
  43. }.getType(), 30000, 2);
  44. System.out.printf("send %s and receive %s %n", "request generic", replyGenericObject);
  45. //异步发送消息,返回String类型结果。
  46. rocketMQTemplate.sendAndReceive(springTopic, "request string", new RocketMQLocalRequestCallback<String>() {
  47. @Override
  48. public void onSuccess(String message) {
  49. System.out.printf("send %s and receive %s %n", "request string", message);
  50. }
  51. @Override
  52. public void onException(Throwable e) {
  53. e.printStackTrace();
  54. }
  55. });
  56. //异步发送消息,并返回一个User类型的结果。
  57. rocketMQTemplate.sendAndReceive(springTopic, new User().setUserAge((byte) 9).setUserName("requestUserName"), new RocketMQLocalRequestCallback<User>() {
  58. @Override
  59. public void onSuccess(User message) {
  60. System.out.printf("send user object and receive %s %n", message.toString());
  61. }
  62. @Override
  63. public void onException(Throwable e) {
  64. e.printStackTrace();
  65. }
  66. }, 5000);
  67. //发送批量消息
  68. List<Message> msgs = new ArrayList<Message>();
  69. for (int i = 0; i < 10; i++) {
  70. msgs.add(MessageBuilder.withPayload("Hello RocketMQ Batch Msg#" + i).
  71. setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
  72. }
  73. SendResult sr = rocketMQTemplate.syncSend(springTopic, msgs, 60000);
  74. System.out.printf("--- Batch messages send result :" + sr);

码云代码地址

https://gitee.com/zjj19941/ZJJ_RocketMQ/tree/master/TuLing4-RocketMQ-Demo/SpringBoot-rocketmq