SpringCloudStream是Spring社区提供的一个统一的消息驱动框架,目的是想要以一个统一的编程模型来对接所有的MQ消息中间件产品。我们还是来看看SpringCloudStream如何来集成RocketMQ。
1、快速实战
创建Maven工程,引入依赖:
<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.7.1</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-acl</artifactId><version>4.7.1</version></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId><version>2.2.3.RELEASE</version><exclusions><exclusion><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId></exclusion><exclusion><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-acl</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.3.3.RELEASE</version></dependency></dependencies>
应用启动类:
import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.cloud.stream.annotation.EnableBinding;import org.springframework.cloud.stream.messaging.Sink;import org.springframework.cloud.stream.messaging.Source;/*** @author :楼兰* @date :Created in 2020/10/22* @description:**/@EnableBinding({Source.class, Sink.class})@SpringBootApplicationpublic class ScRocketMQApplication {public static void main(String[] args) {SpringApplication.run(ScRocketMQApplication.class,args);}}
注意这个@EnableBinding({Source.class, Sink.class})注解,这是SpringCloudStream引入的Binder配置。
然后增加配置文件application.properties
#ScStream通用的配置以spring.cloud.stream开头spring.cloud.stream.bindings.input.destination=TestTopicspring.cloud.stream.bindings.input.group=scGroupspring.cloud.stream.bindings.output.destination=TestTopic#rocketMQ的个性化配置以spring.cloud.stream.rocketmq开头#spring.cloud.stream.rocketmq.binder.name-server=192.168.232.128:9876;192.168.232.129:9876;192.168.232.130:9876spring.cloud.stream.rocketmq.binder.name-server=192.168.232.128:9876
SpringCloudStream中,一个binding对应一个消息通道。这其中配置的input,是在Sink.class中定义的,对应一个消息消费者。而output,是在Source.class中定义的,对应一个消息生产者。
然后就可以增加消息消费者:
package com.roy.scrocket.basic;import org.springframework.cloud.stream.annotation.StreamListener;import org.springframework.cloud.stream.messaging.Sink;import org.springframework.stereotype.Component;/*** @author :楼兰* @date :Created in 2020/10/22* @description:**/@Componentpublic class ScConsumer {@StreamListener(Sink.INPUT)public void onMessage(String messsage){System.out.println("received message:"+messsage+" from binding:"+ Sink.INPUT);}}
消息生产者:
package com.roy.scrocket.basic;import org.apache.rocketmq.common.message.MessageConst;import org.springframework.cloud.stream.messaging.Source;import org.springframework.messaging.Message;import org.springframework.messaging.MessageHeaders;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;import javax.annotation.Resource;import java.util.HashMap;import java.util.Map;/*** @author :楼兰* @date :Created in 2020/10/22* @description:**/@Componentpublic class ScProducer {@Resourceprivate Source source;public void sendMessage(String msg){Map<String, Object> headers = new HashMap<>();headers.put(MessageConst.PROPERTY_TAGS, "testTag");MessageHeaders messageHeaders = new MessageHeaders(headers);Message<String> message = MessageBuilder.createMessage(msg, messageHeaders);this.source.output().send(message);}}
最后增加一个Controller类用于测试:
package com.roy.scrocket.controller;import com.roy.scrocket.basic.ScProducer;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;/*** @author :楼兰* @date :Created in 2020/10/27* @description:**/@RestController@RequestMapping("/MQTest")public class MQTestController {@Resourceprivate ScProducer producer;@RequestMapping("/sendMessage")public String sendMessage(String message){producer.sendMessage(message);return "消息发送完成";}}
启动应用后,就可以访问http://localhost:8080/MQTest/sendMessage?message=123,给RocketMQ发送一条消息到TestTopic,并在ScConsumer中消费到了。
2、总结
- 关于SpringCloudStream。这是一套几乎通用的消息中间件编程框架,例如从对接RocketMQ换到对接Kafka,业务代码几乎不需要动,只需要更换pom依赖并且修改配置文件就行了。但是,由于各个MQ产品都有自己的业务模型,差距非常大,所以使用使用SpringCloudStream时要注意业务模型转换。并且在实际使用中,要非常注意各个MQ的个性化配置属性。例如RocketMQ的个性化属性都是以spring.cloud.stream.rocketmq开头,只有通过这些属性才能用上RocketMQ的延迟消息、排序消息、事务消息等个性化功能。
- SpringCloudStream是Spring社区提供的一套统一框架,但是官方目前只封装了kafka、kafka Stream、RabbitMQ的具体依赖。而RocketMQ的依赖是交由厂商自己维护的,也就是由阿里巴巴自己来维护。这个维护力度显然是有不小差距的。所以一方面可以看到之前在使用SpringBoot时着重强调的版本问题,在使用SpringCloudStream中被放大了很多。spring-cloud-starter-stream-rocketmq目前最新的2.2.3.RELEASE版本中包含的rocketmq-client版本还是4.4.0。这个差距就非常大了。另一方面,RocketMQ这帮大神不屑于写文档的问题也特别严重,SpringCloudStream中关于RocketMQ的个性化配置几乎很难找到完整的文档。
- 总之,对于RocketMQ来说,SpringCloudStream目前来说还并不是一个非常好的集成方案。这方面跟kafka和Rabbit还没法比。所以使用时要慎重。
