一、新建工程 rocketmq-springboot-producer-demo 以及 rocketmq-springboot-consumer-demo

任何第三方库和SpringBoot进行整合,都是这三步

  • 改pom (导入这个组件的依赖)
  • 写yml (写这个组件的一些配置)
  • 修改主启动类

    二、修改pom.xml,添加依赖

    完整依赖如下,截止至 2020.7.13号,rocketmq-spring-boot-starter的最新版为2.1.0 ```xml <?xml version=”1.0” encoding=”UTF-8”?> <project xmlns=”http://maven.apache.org/POM/4.0.0

    1. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    2. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    1. <artifactId>mq-demo</artifactId>
    2. <groupId>cn.spectrumrpc</groupId>
    3. <version>1.0-SNAPSHOT</version>

    4.0.0 rocketmq-springboot-demo org.springframework.boot spring-boot-starter 2.2.2.RELEASE org.apache.rocketmq rocketmq-spring-boot-starter 2.1.0

  1. <a name="eusr0"></a>
  2. ## 三、修改application.yml
  3. ```yaml
  4. rocketmq:
  5. name-server: 127.0.0.1:9876
  6. producer:
  7. group: default-group

四、编写测试用例

在SpringBoot中,都是采用xxxTemplate来进行封装。
所以采用RocketMQTemplate 来发送消息

4.1 同步发送消息

  1. @Autowired
  2. private RocketMQTemplate rocketMQTemplate;
  3. public void sendSyncMessage() {
  4. rocketMQTemplate.syncSend("syncTopic","hello,rocketmq-springboot");
  5. }

4.2 异步发送消息

  1. public void sendAsyncMessage() {
  2. rocketMQTemplate.asyncSend("asyncTopic", "hello,async", new SendCallback() {
  3. @Override
  4. public void onSuccess(SendResult sendResult) {
  5. System.out.println("onSuccess:" + sendResult);
  6. }
  7. @Override
  8. public void onException(Throwable throwable) {
  9. throwable.printStackTrace();
  10. }
  11. });
  12. }

发送结果:

  1. onSuccessSendResult [sendStatus=SEND_OK, msgId=FE80000000000000E8AD87FFFE95B346000018B4AAC2457020CC0000, offsetMsgId=AC11000100002A9F000000000000F735, messageQueue=MessageQueue [topic=asyncTopic, brokerName=broker-a, queueId=3], queueOffset=0]

4.3 单向发送消息

  1. public void sendOneWay() {
  2. rocketMQTemplate.sendOneWay("oneWayTopic", "onewayMessage");
  3. }

4.4 消费端消费消息

通过 @RocketMQMessageListener 注解,并继承 RocketMQListener,就可以监听 生产者发送的消息
其中 onMessage 即为业务处理的方法。

  1. @Component
  2. @RocketMQMessageListener(topic = "asyncTopic",consumerGroup = "springboot-mq-consumer-1")
  3. public class MQConsumer implements RocketMQListener<String> {
  4. @Override
  5. public void onMessage(String s) {
  6. System.out.println("info = " + s);
  7. }
  8. }

@RocketMQMessageListener 中有许多的属性,最常用的为 topic:指定消费的主题, consumerGroup指定当前消费者所在的组(同一个消费组)。
其中rocketmq的消费者有两种消费模式,负载均衡和轮询。在 @RocketMQMessageListener 注解中,是通过messageModel这个属性进行配置的,如下。(其余的更多属性,在后续的案例中再进行解释)

  1. // 广播
  2. @RocketMQMessageListener(topic = "asyncTopic",consumerGroup = "springboot-mq-consumer-1"
  3. ,messageModel = MessageModel.BROADCASTING)
  4. // 轮循
  5. @RocketMQMessageListener(topic = "asyncTopic",consumerGroup = "springboot-mq-consumer-1"
  6. ,messageModel = MessageModel.CLUSTERING)

4.5 发送顺序消息

生产者代码

调用带Orderly的api即可

  1. public void sendOrderly() {
  2. List<Order> orders = Order.buildOrders();
  3. for (Order order : orders) {
  4. rocketMQTemplate.syncSendOrderly("orderTopic",order, String.valueOf(order.getOrderId()));
  5. }
  6. }

如果期望异步发送消息,采用

  1. public void sendOrderly() {
  2. List<Order> orders = Order.buildOrders();
  3. for (Order order : orders) {
  4. rocketMQTemplate.asyncSendOrderly("orderTopic", order, String.valueOf(order.getOrderId()), new SendCallback() {
  5. @Override
  6. public void onSuccess(SendResult sendResult) {
  7. System.out.println("onSuccess, result = " + sendResult);
  8. }
  9. @Override
  10. public void onException(Throwable throwable) {
  11. throwable.printStackTrace();
  12. }
  13. });
  14. }
  15. }

消费者代码

消费端,需要在@RocketMQMessageListener注解上,修改一个属性,标明这个消费者是顺序消费,否则,还是会乱序消费。 需要指定 consumeMode = ConsumeMode.ORDERLY,指明顺序消费,指定这个,相当于 MessageListenerOrderly 来监听

  1. @Component
  2. @RocketMQMessageListener(topic = "orderTopic",consumerGroup = "orderly-consumer", consumeMode = ConsumeMode.ORDERLY)
  3. public class OrderlyConsumer implements RocketMQListener<Order> {
  4. @Override
  5. public void onMessage(Order order) {
  6. System.out.println("receive order: " + order + "\t currentThread" + Thread.currentThread().getName());
  7. }
  8. }

测试结果:

可以看到同一个OrderId的消息,被同一个线程处理。
image.png

4.6 发送延时消息

生产者:

使用如下的api,第四个参数就是延时的级别

  1. public SendResult syncSend(String destination, Message<?> message,
  2. long timeout, int delayLevel){
  3. }

具体的示例如下:

  1. public void sendDelayMessage() {
  2. Message<String> message = MessageBuilder.withPayload("delayMessage").build();
  3. rocketMQTemplate.syncSend("delayTopic", message, 1000, 3);
  4. }

消费者:

消费者端和之前没有任何差别。

  1. @Component
  2. @RocketMQMessageListener(topic = "delayTopic",consumerGroup = "delay-consumer")
  3. public class DelayConsumer implements RocketMQListener<MessageExt> {
  4. @Override
  5. public void onMessage(MessageExt message) {
  6. System.out.println("currentTime = " + System.currentTimeMillis() + "");
  7. System.out.println("recevice message = " + message + "");
  8. }
  9. }

4.7 批量发送消息

生产者:

  1. public void batchMessage() {
  2. List<Message> messages = new ArrayList<>();
  3. for (int i = 0; i < 10; i++) {
  4. messages.add(MessageBuilder.withPayload("batchMessage " + i).build());
  5. }
  6. rocketMQTemplate.syncSend("batchTopic", messages, 1000);
  7. }

消费者无差别

4.8 过滤消息

生产者:

  1. public void filterMessage(){
  2. for (int i = 0; i < 10; i++) {
  3. HashMap<String, Object> map = new HashMap<>();
  4. map.put("a", i);
  5. MessageHeaders messageHeaders = new MessageHeaders(map);
  6. Message<String> message = MessageBuilder.createMessage("filter message " + i, messageHeaders);
  7. rocketMQTemplate.syncSend("filterTopic", message);
  8. }
  9. }

消费者:

主要添加 @RocketMQMessageListener 的 selectorType 为 SelectorType.SQL92。
selectorExpression 为自己想要过滤的SQL语法,例如此例子中,添加的属性为a,判断a的范围。

  1. @Component
  2. @RocketMQMessageListener(topic = "filterTopic",consumerGroup = "filter-consumer"
  3. ,selectorType = SelectorType.SQL92, selectorExpression = "a between 0 and 3")
  4. public class FilterConsumer implements RocketMQListener<String> {
  5. @Override
  6. public void onMessage(String s) {
  7. System.out.println("on message, message body: " + s);
  8. }
  9. }

4.9 事务消息

五、RocketMQMessageListener 的属性解释

  1. @Target({ElementType.TYPE})
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Documented
  4. public @interface RocketMQMessageListener {
  5. // rocketmq 的 namesrv地址
  6. String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
  7. String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
  8. String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
  9. String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
  10. String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
  11. // 当前消费者的所在组
  12. String consumerGroup();
  13. // 当前消费者所消费的主题
  14. String topic();
  15. // 消费者过滤消息的方式,默认为TAG,还有SQL92这种类型
  16. SelectorType selectorType() default SelectorType.TAG;
  17. // 消费者过滤消息的表达式,默认是*,即所有消息都拿过来,
  18. // 结合上默认的TAG的话,默认就是所有TAG都拿
  19. String selectorExpression() default "*";
  20. // 消费者的默认,默认为并发争抢消费,
  21. // 还有另一种模式是ConsumeMode.ORDERLY,即顺序消费
  22. ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
  23. // 消息的模式, 默认是负载均衡模式,即多个客户端一起完成这个消息的消费
  24. // 还有另一种模式是 MessageModel.BROADCASTING 即订阅模式,所有人看到所有消息
  25. MessageModel messageModel() default MessageModel.CLUSTERING;
  26. // 消费者的最大线程数
  27. int consumeThreadMax() default 64;
  28. // 消费者的超时时间,默认30S
  29. long consumeTimeout() default 30000L;
  30. String accessKey() default "${rocketmq.consumer.access-key:}";
  31. String secretKey() default "${rocketmq.consumer.secret-key:}";
  32. boolean enableMsgTrace() default true;
  33. String customizedTraceTopic() default "${rocketmq.consumer.customized-trace-topic:}";
  34. String nameServer() default "${rocketmq.name-server:}";
  35. String accessChannel() default "${rocketmq.access-channel:}";
  36. }