SpringCloudStream是Spring社区提供的一个统一的消息驱动框架,目的是想要以一个统一的编程模型来对接所有的MQ消息中间件产品。我们还是来看看SpringCloudStream如何来集成RocketMQ。

1、快速实战

创建Maven工程,引入依赖:

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.rocketmq</groupId>
  4. <artifactId>rocketmq-client</artifactId>
  5. <version>4.7.1</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.rocketmq</groupId>
  9. <artifactId>rocketmq-acl</artifactId>
  10. <version>4.7.1</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>com.alibaba.cloud</groupId>
  14. <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
  15. <version>2.2.3.RELEASE</version>
  16. <exclusions>
  17. <exclusion>
  18. <groupId>org.apache.rocketmq</groupId>
  19. <artifactId>rocketmq-client</artifactId>
  20. </exclusion>
  21. <exclusion>
  22. <groupId>org.apache.rocketmq</groupId>
  23. <artifactId>rocketmq-acl</artifactId>
  24. </exclusion>
  25. </exclusions>
  26. </dependency>
  27. <dependency>
  28. <groupId>org.springframework.boot</groupId>
  29. <artifactId>spring-boot-starter-web</artifactId>
  30. <version>2.3.3.RELEASE</version>
  31. </dependency>
  32. </dependencies>

应用启动类:

  1. import org.springframework.boot.SpringApplication;
  2. import org.springframework.boot.autoconfigure.SpringBootApplication;
  3. import org.springframework.cloud.stream.annotation.EnableBinding;
  4. import org.springframework.cloud.stream.messaging.Sink;
  5. import org.springframework.cloud.stream.messaging.Source;
  6. /**
  7. * @author :楼兰
  8. * @date :Created in 2020/10/22
  9. * @description:
  10. **/
  11. @EnableBinding({Source.class, Sink.class})
  12. @SpringBootApplication
  13. public class ScRocketMQApplication {
  14. public static void main(String[] args) {
  15. SpringApplication.run(ScRocketMQApplication.class,args);
  16. }
  17. }

注意这个@EnableBinding({Source.class, Sink.class})注解,这是SpringCloudStream引入的Binder配置。
然后增加配置文件application.properties

  1. #ScStream通用的配置以spring.cloud.stream开头
  2. spring.cloud.stream.bindings.input.destination=TestTopic
  3. spring.cloud.stream.bindings.input.group=scGroup
  4. spring.cloud.stream.bindings.output.destination=TestTopic
  5. #rocketMQ的个性化配置以spring.cloud.stream.rocketmq开头
  6. #spring.cloud.stream.rocketmq.binder.name-server=192.168.232.128:9876;192.168.232.129:9876;192.168.232.130:9876
  7. spring.cloud.stream.rocketmq.binder.name-server=192.168.232.128:9876

SpringCloudStream中,一个binding对应一个消息通道。这其中配置的input,是在Sink.class中定义的,对应一个消息消费者。而output,是在Source.class中定义的,对应一个消息生产者。
然后就可以增加消息消费者:

  1. package com.roy.scrocket.basic;
  2. import org.springframework.cloud.stream.annotation.StreamListener;
  3. import org.springframework.cloud.stream.messaging.Sink;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * @author :楼兰
  7. * @date :Created in 2020/10/22
  8. * @description:
  9. **/
  10. @Component
  11. public class ScConsumer {
  12. @StreamListener(Sink.INPUT)
  13. public void onMessage(String messsage){
  14. System.out.println("received message:"+messsage+" from binding:"+ Sink.INPUT);
  15. }
  16. }

消息生产者:

  1. package com.roy.scrocket.basic;
  2. import org.apache.rocketmq.common.message.MessageConst;
  3. import org.springframework.cloud.stream.messaging.Source;
  4. import org.springframework.messaging.Message;
  5. import org.springframework.messaging.MessageHeaders;
  6. import org.springframework.messaging.support.MessageBuilder;
  7. import org.springframework.stereotype.Component;
  8. import javax.annotation.Resource;
  9. import java.util.HashMap;
  10. import java.util.Map;
  11. /**
  12. * @author :楼兰
  13. * @date :Created in 2020/10/22
  14. * @description:
  15. **/
  16. @Component
  17. public class ScProducer {
  18. @Resource
  19. private Source source;
  20. public void sendMessage(String msg){
  21. Map<String, Object> headers = new HashMap<>();
  22. headers.put(MessageConst.PROPERTY_TAGS, "testTag");
  23. MessageHeaders messageHeaders = new MessageHeaders(headers);
  24. Message<String> message = MessageBuilder.createMessage(msg, messageHeaders);
  25. this.source.output().send(message);
  26. }
  27. }

最后增加一个Controller类用于测试:

  1. package com.roy.scrocket.controller;
  2. import com.roy.scrocket.basic.ScProducer;
  3. import org.springframework.web.bind.annotation.RequestMapping;
  4. import org.springframework.web.bind.annotation.RestController;
  5. import javax.annotation.Resource;
  6. /**
  7. * @author :楼兰
  8. * @date :Created in 2020/10/27
  9. * @description:
  10. **/
  11. @RestController
  12. @RequestMapping("/MQTest")
  13. public class MQTestController {
  14. @Resource
  15. private ScProducer producer;
  16. @RequestMapping("/sendMessage")
  17. public String sendMessage(String message){
  18. producer.sendMessage(message);
  19. return "消息发送完成";
  20. }
  21. }

启动应用后,就可以访问http://localhost:8080/MQTest/sendMessage?message=123,给RocketMQ发送一条消息到TestTopic,并在ScConsumer中消费到了。

2、总结

  • 关于SpringCloudStream。这是一套几乎通用的消息中间件编程框架,例如从对接RocketMQ换到对接Kafka,业务代码几乎不需要动,只需要更换pom依赖并且修改配置文件就行了。但是,由于各个MQ产品都有自己的业务模型,差距非常大,所以使用使用SpringCloudStream时要注意业务模型转换。并且在实际使用中,要非常注意各个MQ的个性化配置属性。例如RocketMQ的个性化属性都是以spring.cloud.stream.rocketmq开头,只有通过这些属性才能用上RocketMQ的延迟消息、排序消息、事务消息等个性化功能。
  • SpringCloudStream是Spring社区提供的一套统一框架,但是官方目前只封装了kafka、kafka Stream、RabbitMQ的具体依赖。而RocketMQ的依赖是交由厂商自己维护的,也就是由阿里巴巴自己来维护。这个维护力度显然是有不小差距的。所以一方面可以看到之前在使用SpringBoot时着重强调的版本问题,在使用SpringCloudStream中被放大了很多。spring-cloud-starter-stream-rocketmq目前最新的2.2.3.RELEASE版本中包含的rocketmq-client版本还是4.4.0。这个差距就非常大了。另一方面,RocketMQ这帮大神不屑于写文档的问题也特别严重,SpringCloudStream中关于RocketMQ的个性化配置几乎很难找到完整的文档。
  • 总之,对于RocketMQ来说,SpringCloudStream目前来说还并不是一个非常好的集成方案。这方面跟kafka和Rabbit还没法比。所以使用时要慎重。