1: Spring Boot integration

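Mind the versions: releases differ considerably, so code developed against rocketmq-spring-boot-starter:2.0.4 may not carry over unchanged to the 2.1.1 used here.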

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-webmvc</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.1.6.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
    </dependencies>

rocketmq-spring-boot-starter:2.1.1 pulls in Spring Boot 2.0.5.RELEASE, so the Spring Boot dependency is excluded and upgraded here.

Configuration file:

# NameServer address
rocketmq.name-server=192.168.232.128:9876
# Default producer group
rocketmq.producer.group=springBootGroup

Producer

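    import javax.annotation.Resource;

    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;

    @Component
    public class SpringProducer {
        @Resource
        private RocketMQTemplate rocketMQTemplate;

        // Example: send a normal message
        public void sendMessage(String topic, String msg) {
            this.rocketMQTemplate.convertAndSend(topic, msg);
        }

        // Example: send transactional messages
        public void sendMessageInTransaction(String topic, String msg) throws InterruptedException {
            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 10; i++) {
                Message<String> message = MessageBuilder.withPayload(msg).build();
                // The tag is part of the destination, in the form Topic:Tag
                String destination = topic + ":" + tags[i % tags.length];
                // The third argument is handed to the transaction listener as arg
                SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, message, destination);
                System.out.printf("%s%n", sendResult);
                Thread.sleep(10);
            }
        }
    }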

Consumer

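    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Component;

    @Component
    @RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic")
    public class SpringConsumer implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            System.out.println("Received message : " + message);
        }
    }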

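Listener annotation

RocketMQMessageListener

  • selectorType and selectorExpression customize message filtering
  • consumeMode chooses concurrent or ordered consumption
  • messageModel chooses broadcasting or clustering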
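
To make the filtering attributes more concrete, here is a minimal sketch of how a tag-style selectorExpression such as "TagA || TagB" can be read: tags are separated by "||", and "*" accepts every tag. The class TagSelectorSketch is a hypothetical name used for illustration only; it models the matching rule and is not the code that rocketmq-spring or the broker runs.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

/** Illustrative model of tag-style selector matching; not the rocketmq-spring implementation. */
final class TagSelectorSketch {
    private final boolean matchAll;
    private final Set<String> tags;

    TagSelectorSketch(String selectorExpression) {
        // "*" (or an empty expression) is treated as "subscribe to every tag".
        String expr = selectorExpression == null ? "" : selectorExpression.trim();
        this.matchAll = expr.isEmpty() || expr.equals("*");
        // Otherwise the expression is a list of tags separated by "||".
        this.tags = Arrays.stream(expr.split("\\|\\|"))
                .map(String::trim)
                .filter(t -> !t.isEmpty())
                .collect(Collectors.toSet());
    }

    /** Returns true when a message carrying this tag would pass the selector. */
    boolean matches(String tag) {
        return matchAll || (tag != null && tags.contains(tag));
    }
}
```

With this model, new TagSelectorSketch("TagA || TagB") accepts messages tagged TagA or TagB and rejects TagC, which is the behaviour one would expect from a listener declared with that selectorExpression.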

Transaction listener

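    import java.util.concurrent.ConcurrentHashMap;

    // Assumption: StringUtils is the commons-lang3 class; the original does not show its import.
    import org.apache.commons.lang3.StringUtils;
    import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
    import org.apache.rocketmq.spring.support.RocketMQUtil;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.converter.StringMessageConverter;

    @RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
    public class MyTransactionImpl implements RocketMQLocalTransactionListener {
        private ConcurrentHashMap<Object, String> localTrans = new ConcurrentHashMap<>();

        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            Object id = msg.getHeaders().get("id");
            // arg is the destination (Topic:Tag) passed by the producer
            String destination = arg.toString();
            localTrans.put(id, destination);
            // Convert to the rocketmq-client Message in order to read the tag
            org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(), "UTF-8", destination, msg);
            String tags = message.getTags();
            if (StringUtils.contains(tags, "TagA")) {
                return RocketMQLocalTransactionState.COMMIT;
            } else if (StringUtils.contains(tags, "TagB")) {
                return RocketMQLocalTransactionState.ROLLBACK;
            } else {
                return RocketMQLocalTransactionState.UNKNOWN;
            }
        }

        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            // The Spring Boot message object has no transactionId property, unlike the native API.
            // String destination = localTrans.get(msg.getTransactionId());
            return RocketMQLocalTransactionState.COMMIT;
        }
    }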
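
The listener above always answers COMMIT when the broker checks back on an unresolved transaction. As a sketch of one alternative, the outcome of each local transaction could be recorded under a business key and looked up during the check. Everything here is an assumption for illustration: LocalTransactionLog and the key names are hypothetical, the State enum only mirrors the three RocketMQLocalTransactionState values, and how the key reaches the listener (for example as a custom message header) is left to the application.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: remembers the outcome of each local transaction by business key. */
final class LocalTransactionLog {
    /** Mirrors the three states a transaction listener can return. */
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    private final Map<String, State> outcomes = new ConcurrentHashMap<>();

    /** Called from the execute step once the local transaction has finished. */
    void record(String businessKey, boolean succeeded) {
        outcomes.put(businessKey, succeeded ? State.COMMIT : State.ROLLBACK);
    }

    /** Called from the check step; a key with no recorded outcome stays UNKNOWN. */
    State check(String businessKey) {
        return outcomes.getOrDefault(businessKey, State.UNKNOWN);
    }
}
```

An in-memory map is lost on restart, so a real application would more likely consult a database table or the business record itself; the map only keeps the sketch short.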

Summary

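  • Once Spring Boot includes the org.apache.rocketmq:rocketmq-spring-boot-starter dependency, the built-in RocketMQTemplate can be used to talk to RocketMQ. The related properties all start with rocketmq. The full set of configuration options is in the class org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.
  • The Message object in the Spring Boot dependency and the Message object in rocketmq-client are two different types, and they are very easy to mix up. For example, the TAG property of the rocketmq-client Message does not exist on the Spring Message. The tag has moved into the send destination, where it is given together with the topic in the form Topic:Tag.
  • One last reminder: always watch the versions. rocketmq-spring-boot-starter usually lags slightly behind RocketMQ releases, and mismatched versions cause many odd problems. Apache maintains an official rocketmq-spring sample at https://github.com/apache/rocketmq-spring.git, which is worth consulting when versions change.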
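
Since the tag now lives in the destination string, a small helper can make the Topic:Tag convention explicit. The following is a minimal sketch; Destination is a hypothetical class introduced here, not part of rocketmq-spring, and it assumes the destination is split on the first ":" only.

```java
/** Hypothetical value type for a "Topic:Tag" destination string. */
final class Destination {
    final String topic;
    final String tag; // empty when the destination carries no tag

    private Destination(String topic, String tag) {
        this.topic = topic;
        this.tag = tag;
    }

    /** Splits on the first ':' only, so everything after it is treated as the tag. */
    static Destination parse(String destination) {
        String[] parts = destination.split(":", 2);
        return new Destination(parts[0], parts.length > 1 ? parts[1] : "");
    }

    /** Builds the string form that is passed as the destination when sending. */
    static String of(String topic, String tag) {
        return tag == null || tag.isEmpty() ? topic : topic + ":" + tag;
    }
}
```

For example, Destination.of("TestTopic", "TagA") yields the string "TestTopic:TagA", which is the same form the producer example builds by hand.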

2: Spring Cloud Stream integration

Official site:


sample:
https://github.com/spring-cloud/spring-cloud-stream-samples/

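A unified message-driven framework provided by the Spring community.
Goal: a single programming model that connects to every MQ middleware.
Currently integrated:
[figure: image.png]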

Maven dependencies

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-acl</artifactId>
            <version>4.7.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
            <version>2.2.3.RELEASE</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-acl</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.3.3.RELEASE</version>
        </dependency>
    </dependencies>

Application class:

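    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.cloud.stream.messaging.Source;

    // Bind the Source (output) and Sink (input) channel interfaces
    @EnableBinding({Source.class, Sink.class})
    @SpringBootApplication
    public class ScRocketMQApplication {
        public static void main(String[] args) {
            SpringApplication.run(ScRocketMQApplication.class, args);
        }
    }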

Configuration file:

# Generic Spring Cloud Stream settings start with spring.cloud.stream
spring.cloud.stream.bindings.input.destination=TestTopic
spring.cloud.stream.bindings.input.group=scGroup
spring.cloud.stream.bindings.output.destination=TestTopic

# RocketMQ-specific settings start with spring.cloud.stream.rocketmq
spring.cloud.stream.rocketmq.binder.nameserver=192.168.232.128:9876;192.168.232.129:9876

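Each binding corresponds to one message channel. The input binding is defined in Sink.class and corresponds to the consumer.
The output binding is defined in Source.class and corresponds to the producer.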
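
The relationship between binding names and destinations can be pictured with a toy in-memory model. This is only a sketch of the naming idea (binding name maps to destination, and an output and an input that share a destination are connected); BindingSketch and its methods are hypothetical and have nothing to do with how Spring Cloud Stream is implemented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy in-memory model of bindings; illustrates the naming only, not Spring Cloud Stream itself. */
final class BindingSketch {
    // binding name -> destination, as in spring.cloud.stream.bindings.<name>.destination
    private final Map<String, String> destinations = new HashMap<>();
    // destination -> listeners attached through an input-style binding
    private final Map<String, List<Consumer<String>>> listeners = new HashMap<>();

    void bind(String bindingName, String destination) {
        destinations.put(bindingName, destination);
    }

    /** Consumer side: the role played by the "input" binding of Sink. */
    void subscribe(String bindingName, Consumer<String> listener) {
        String destination = destinations.get(bindingName);
        listeners.computeIfAbsent(destination, d -> new ArrayList<>()).add(listener);
    }

    /** Producer side: the role played by the "output" binding of Source. */
    void send(String bindingName, String payload) {
        String destination = destinations.get(bindingName);
        listeners.getOrDefault(destination, new ArrayList<>()).forEach(l -> l.accept(payload));
    }
}
```

Binding both "input" and "output" to TestTopic, as the configuration above does, is what lets a message sent on the output channel arrive at the listener on the input channel.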

Producer:

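    import java.util.HashMap;
    import java.util.Map;
    import javax.annotation.Resource;
    import org.apache.rocketmq.common.message.MessageConst;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageHeaders;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;

    @Component
    public class ScProducer {
        @Resource
        private Source source;
        public void sendMessage(String msg) {
            Map<String, Object> headers = new HashMap<>();
            // Here the tag is passed as a message header
            headers.put(MessageConst.PROPERTY_TAGS, "testTag");
            MessageHeaders messageHeaders = new MessageHeaders(headers);
            Message<String> message = MessageBuilder.createMessage(msg, messageHeaders);
            this.source.output().send(message);
        }
    }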

Consumer

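    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.stereotype.Component;

    @Component
    public class ScConsumer {
        @StreamListener(Sink.INPUT)
        public void onMessage(String message) {
            System.out.println("received message:" + message + " from binding:" + Sink.INPUT);
        }
    }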

Summary

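  1. Use it with caution for RocketMQ; how well it works with Kafka and RabbitMQ is not yet clear to me.
  2. It isolates the concrete messaging implementation, which makes it easy to switch the underlying middleware.
  3. Few middleware products are wrapped so far. The latest spring-cloud-starter-stream-rocketmq release, 2.2.3.RELEASE, still bundles rocketmq-client 4.4.0, which is a very large gap, and its documentation on RocketMQ is sparse.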

    Code

    rocketMQ-demo.7z