出自 图灵学院 ,我自己学了一下,然后自己做了个笔记,再结合老师的讲义,整理了一下,写了个博客

概述

SpringCloudStream是Spring社区提供的一个统一的消息驱动框架,目的是想要以一个统一的编程模型来对接所有的MQ消息中间件产品。我们还是来看看SpringCloudStream如何来集成RocketMQ。

SpringCloudStream更牛逼的事情就是解耦,假如说我以后换MQ了,我把RocketMQ换成RabbitMQ了,或者Kafka了,我代码不需要任何改动,只需要换Maven依赖和properties配置文件即可.

使用SpringCloudStream可以让我们更多的经历关注我们的业务而不是各种MQ产品配置

注意点

  • 关于SpringCloudStream。这是一套几乎通用的消息中间件编程框架,例如从对接RocketMQ换到对接Kafka,业务代码几乎不需要动,只需要更换pom依赖并且修改配置文件就行了。但是,由于各个MQ产品都有自己的业务模型,差距非常大,所以使用使用SpringCloudStream时要注意业务模型转换。并且在实际使用中,要非常注意各个MQ的个性化配置属性。例如RocketMQ的个性化属性都是以spring.cloud.stream.rocketmq开头,只有通过这些属性才能用上RocketMQ的延迟消息、排序消息、事务消息等个性化功能。
  • SpringCloudStream是Spring社区提供的一套统一框架,但是官方目前只封装了kafka、kafka Stream、RabbitMQ的具体依赖。而RocketMQ的依赖是交由厂商自己维护的,也就是由阿里巴巴自己来维护。这个维护力度显然是有不小差距的。所以一方面可以看到之前在使用SpringBoot时着重强调的版本问题,在使用SpringCloudStream中被放大了很多。spring-cloud-starter-stream-rocketmq目前最新的2.2.3.RELEASE版本中包含的rocketmq-client版本还是4.4.0。这个差距就非常大了。另一方面,RocketMQ这帮大神不屑于写文档的问题也特别严重,SpringCloudStream中关于RocketMQ的个性化配置几乎很难找到完整的文档。
  • 总之,对于RocketMQ来说,SpringCloudStream目前来说还并不是一个非常好的集成方案。这方面跟kafka和Rabbit还没法比。所以使用时要慎重。

代码

pom依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.rocketmq</groupId>
  4. <artifactId>rocketmq-client</artifactId>
  5. <version>4.7.1</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.rocketmq</groupId>
  9. <artifactId>rocketmq-acl</artifactId>
  10. <version>4.7.1</version>
  11. </dependency>
  12. <!--spring-cloud-starter-stream-rocketmq里面的RocketMQ版本太老了,这里排除掉,然后单独引用新的版本-->
  13. <dependency>
  14. <groupId>com.alibaba.cloud</groupId>
  15. <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
  16. <version>2.2.3.RELEASE</version>
  17. <exclusions>
  18. <exclusion>
  19. <groupId>org.apache.rocketmq</groupId>
  20. <artifactId>rocketmq-client</artifactId>
  21. </exclusion>
  22. <exclusion>
  23. <groupId>org.apache.rocketmq</groupId>
  24. <artifactId>rocketmq-acl</artifactId>
  25. </exclusion>
  26. </exclusions>
  27. </dependency>
  28. <dependency>
  29. <groupId>org.springframework.boot</groupId>
  30. <artifactId>spring-boot-starter-web</artifactId>
  31. <version>2.3.3.RELEASE</version>
  32. </dependency>
  33. </dependencies>

application.properties配置文件

  1. #消息的生产者 input来自sink.class
  2. spring.cloud.stream.bindings.input.destination=TestTopic
  3. spring.cloud.stream.bindings.input.group=scGroup
  4. # 消息的发送者 output 是source.class
  5. spring.cloud.stream.bindings.output.destination=TestTopic
  6. # NameServer的地址
  7. spring.cloud.stream.rocketmq.binder.name-server=zjj101:9876;zjj102:9876;zjj103:9876

Controller

  1. package com.roy.scrocket.controller;
  2. import com.roy.scrocket.basic.ScProducer;
  3. import org.springframework.web.bind.annotation.RequestMapping;
  4. import org.springframework.web.bind.annotation.RestController;
  5. import javax.annotation.Resource;
  6. @RestController
  7. @RequestMapping("/MQTest")
  8. public class MQTestController {
  9. @Resource
  10. private ScProducer producer;
  11. /**
  12. * 发送消息
  13. *
  14. * @param message
  15. * @return
  16. */
  17. @RequestMapping("/sendMessage")
  18. public String sendMessage(String message) {
  19. producer.sendMessage(message);
  20. return "消息发送完成";
  21. }
  22. }

启动类

  1. package com.roy.scrocket;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. import org.springframework.cloud.stream.annotation.EnableBinding;
  5. import org.springframework.cloud.stream.messaging.Sink;
  6. import org.springframework.cloud.stream.messaging.Source;
  7. //EnableBinding 的意思是
  8. @EnableBinding({Source.class, Sink.class})
  9. @SpringBootApplication
  10. public class ScRocketMQApplication {
  11. public static void main(String[] args) {
  12. SpringApplication.run(ScRocketMQApplication.class, args);
  13. }
  14. }

消费者

  1. package com.roy.scrocket.basic;
  2. import org.springframework.cloud.stream.annotation.StreamListener;
  3. import org.springframework.cloud.stream.messaging.Sink;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. public class ScConsumer {
  7. @StreamListener(Sink.INPUT)
  8. public void onMessage(String messsage) {
  9. System.out.println("received message:" + messsage + " from binding:" +
  10. Sink.INPUT);
  11. }
  12. }

生产者

  1. package com.roy.scrocket.basic;
  2. import org.apache.rocketmq.common.message.MessageConst;
  3. import org.springframework.cloud.stream.messaging.Source;
  4. import org.springframework.messaging.Message;
  5. import org.springframework.messaging.MessageHeaders;
  6. import org.springframework.messaging.support.MessageBuilder;
  7. import org.springframework.stereotype.Component;
  8. import javax.annotation.Resource;
  9. import java.util.HashMap;
  10. import java.util.Map;
  11. @Component
  12. public class ScProducer {
  13. @Resource
  14. private Source source;
  15. public void sendMessage(String msg) {
  16. Map<String, Object> headers = new HashMap<>();
  17. headers.put(MessageConst.PROPERTY_TAGS, "testTag");
  18. MessageHeaders messageHeaders = new MessageHeaders(headers);
  19. Message<String> message = MessageBuilder.createMessage(msg, messageHeaders);
  20. //发送消息
  21. this.source.output().send(message);
  22. }
  23. }

启动测试

  1. 启动ScRocketMQApplication,然后执行
  2. get请求: http://localhost:8080/MQTest/sendMessage?message=demoData