出自 图灵学院 ,我自己学了一下,然后自己做了个笔记,再结合老师的讲义,整理了一下,写了个博客
概述
SpringCloudStream是Spring社区提供的一个统一的消息驱动框架,目的是想要以一个统一的编程模型来对接所有的MQ消息中间件产品。我们还是来看看SpringCloudStream如何来集成RocketMQ。
SpringCloudStream更牛逼的事情就是解耦,假如说我以后换MQ了,我把RocketMQ换成RabbitMQ了,或者Kafka了,我代码不需要任何改动,只需要换Maven依赖和properties配置文件即可.
使用SpringCloudStream可以让我们更多的经历关注我们的业务而不是各种MQ产品配置
注意点
- 关于SpringCloudStream。这是一套几乎通用的消息中间件编程框架,例如从对接RocketMQ换到对接Kafka,业务代码几乎不需要动,只需要更换pom依赖并且修改配置文件就行了。但是,由于各个MQ产品都有自己的业务模型,差距非常大,所以使用使用SpringCloudStream时要注意业务模型转换。并且在实际使用中,要非常注意各个MQ的个性化配置属性。例如RocketMQ的个性化属性都是以spring.cloud.stream.rocketmq开头,只有通过这些属性才能用上RocketMQ的延迟消息、排序消息、事务消息等个性化功能。
 - SpringCloudStream是Spring社区提供的一套统一框架,但是官方目前只封装了kafka、kafka Stream、RabbitMQ的具体依赖。而RocketMQ的依赖是交由厂商自己维护的,也就是由阿里巴巴自己来维护。这个维护力度显然是有不小差距的。所以一方面可以看到之前在使用SpringBoot时着重强调的版本问题,在使用SpringCloudStream中被放大了很多。spring-cloud-starter-stream-rocketmq目前最新的2.2.3.RELEASE版本中包含的rocketmq-client版本还是4.4.0。这个差距就非常大了。另一方面,RocketMQ这帮大神不屑于写文档的问题也特别严重,SpringCloudStream中关于RocketMQ的个性化配置几乎很难找到完整的文档。
 - 总之,对于RocketMQ来说,SpringCloudStream目前来说还并不是一个非常好的集成方案。这方面跟kafka和Rabbit还没法比。所以使用时要慎重。
 
代码
pom依赖
<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.7.1</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-acl</artifactId><version>4.7.1</version></dependency><!--spring-cloud-starter-stream-rocketmq里面的RocketMQ版本太老了,这里排除掉,然后单独引用新的版本--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId><version>2.2.3.RELEASE</version><exclusions><exclusion><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId></exclusion><exclusion><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-acl</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.3.3.RELEASE</version></dependency></dependencies>
application.properties配置文件
#消息的生产者 input来自sink.classspring.cloud.stream.bindings.input.destination=TestTopicspring.cloud.stream.bindings.input.group=scGroup# 消息的发送者 output 是source.classspring.cloud.stream.bindings.output.destination=TestTopic# NameServer的地址spring.cloud.stream.rocketmq.binder.name-server=zjj101:9876;zjj102:9876;zjj103:9876
Controller
package com.roy.scrocket.controller;import com.roy.scrocket.basic.ScProducer;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;@RestController@RequestMapping("/MQTest")public class MQTestController {@Resourceprivate ScProducer producer;/*** 发送消息** @param message* @return*/@RequestMapping("/sendMessage")public String sendMessage(String message) {producer.sendMessage(message);return "消息发送完成";}}
启动类
package com.roy.scrocket;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.cloud.stream.annotation.EnableBinding;import org.springframework.cloud.stream.messaging.Sink;import org.springframework.cloud.stream.messaging.Source;//EnableBinding 的意思是@EnableBinding({Source.class, Sink.class})@SpringBootApplicationpublic class ScRocketMQApplication {public static void main(String[] args) {SpringApplication.run(ScRocketMQApplication.class, args);}}
消费者
package com.roy.scrocket.basic;import org.springframework.cloud.stream.annotation.StreamListener;import org.springframework.cloud.stream.messaging.Sink;import org.springframework.stereotype.Component;@Componentpublic class ScConsumer {@StreamListener(Sink.INPUT)public void onMessage(String messsage) {System.out.println("received message:" + messsage + " from binding:" +Sink.INPUT);}}
生产者
package com.roy.scrocket.basic;import org.apache.rocketmq.common.message.MessageConst;import org.springframework.cloud.stream.messaging.Source;import org.springframework.messaging.Message;import org.springframework.messaging.MessageHeaders;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;import javax.annotation.Resource;import java.util.HashMap;import java.util.Map;@Componentpublic class ScProducer {@Resourceprivate Source source;public void sendMessage(String msg) {Map<String, Object> headers = new HashMap<>();headers.put(MessageConst.PROPERTY_TAGS, "testTag");MessageHeaders messageHeaders = new MessageHeaders(headers);Message<String> message = MessageBuilder.createMessage(msg, messageHeaders);//发送消息this.source.output().send(message);}}
启动测试
启动ScRocketMQApplication,然后执行get请求: http://localhost:8080/MQTest/sendMessage?message=demoData
