官方文档

Link

POM

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

application.yml

服务提供者

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          # RocketMQ 服务器地址
          namesrv-addr: 192.168.1.161:9876
      bindings:
        # 这里是个 Map 类型参数,{} 为 YAML 中 Map 的行内写法
        output1: {destination: test-topic1, content-type: application/json}
        output2: {destination: test-topic2, content-type: application/json}

服务消费者

spring:
  application:
    name: rocketmq-consumer
  cloud:
    stream:
      rocketmq:
        binder:
          namesrv-addr: 192.168.1.161:9876
        bindings:
          # 顺序消费
          input1: {consumer.orderly: true}
          # 并发消费(非顺序),只接收标签为 tagStr 的消息
          input2: {consumer.orderly: false, consumer.tags: tagStr}
      bindings:
        input1: {destination: test-topic1, content-type: text/plain, group: test-group1, consumer.maxAttempts: 1}
        # maxAttempts:最大尝试次数,concurrency:并发消费线程数
        input2: {destination: test-topic2, content-type: text/plain, group: test-group2, consumer.maxAttempts: 1, consumer.concurrency: 20}

消息处理

消息发送

/**
 * Binding interface that declares the output channels used for sending messages.
 * Each accessor is annotated with {@code @Output} and the channel name it is bound to.
 */
public interface MySource {
    /**
     * Returns the message channel declared under the name {@code output1}.
     *
     * @return the channel annotated as {@code output1}
     */
    @Output("output1")
    MessageChannel output1();

    /**
     * Returns the message channel declared under the name {@code output2}.
     *
     * @return the channel annotated as {@code output2}
     */
    @Output("output2")
    MessageChannel output2();
}
/**
 * Service that publishes string messages through the channels declared by {@link MySource}.
 * Both channels offer a plain send and a send that attaches a message tag header.
 */
@Service
public class ProviderService {

    @Autowired
    private MySource source;

    /**
     * Sends a message on the {@code output1} channel.
     *
     * @param msg message payload
     * @throws Exception declared by the signature; any failure raised while sending propagates
     */
    public void send1(String msg) throws Exception {
        source.output1().send(MessageBuilder.withPayload(msg).build());
    }

    /**
     * Sends a message on the {@code output1} channel with a tag header attached.
     *
     * @param msg message payload
     * @param tag message tag, stored under the {@code MessageConst.PROPERTY_TAGS} header
     * @throws Exception declared by the signature; any failure raised while sending propagates
     */
    public void send1(String msg, String tag) throws Exception {
        source.output1().send(MessageBuilder.createMessage(msg, tagHeaders(tag)));
    }

    /**
     * Sends a message on the {@code output2} channel.
     *
     * @param msg message payload
     * @throws Exception declared by the signature; any failure raised while sending propagates
     */
    public void send2(String msg) throws Exception {
        source.output2().send(MessageBuilder.withPayload(msg).build());
    }

    /**
     * Sends a message on the {@code output2} channel with a tag header attached.
     * Use this overload when the receiving side subscribes with a tag filter
     * (for example {@code consumer.tags}); an untagged message would not match such a filter.
     *
     * @param msg message payload
     * @param tag message tag, stored under the {@code MessageConst.PROPERTY_TAGS} header
     * @throws Exception declared by the signature; any failure raised while sending propagates
     */
    public void send2(String msg, String tag) throws Exception {
        source.output2().send(MessageBuilder.createMessage(msg, tagHeaders(tag)));
    }

    /** Builds message headers containing only the tag entry, keyed by {@code MessageConst.PROPERTY_TAGS}. */
    private static MessageHeaders tagHeaders(String tag) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(MessageConst.PROPERTY_TAGS, tag);
        return new MessageHeaders(headers);
    }

}

消息接收

/**
 * Binding interface that declares the input channels used for receiving messages.
 * Each accessor is annotated with {@code @Input} and the channel name it is bound to.
 */
public interface MySink {

    /**
     * Returns the subscribable channel declared under the name {@code input1}.
     *
     * @return the channel annotated as {@code input1}
     */
    @Input("input1")
    SubscribableChannel input1();

    /**
     * Returns the subscribable channel declared under the name {@code input2}.
     *
     * @return the channel annotated as {@code input2}
     */
    @Input("input2")
    SubscribableChannel input2();
}
/**
 * Message handlers for the {@code input1} and {@code input2} channels.
 * Each handler writes the received text to standard output with a channel-specific prefix.
 */
@Service
public class ConsumerReceive {

    /**
     * Handles a message delivered to the {@code input1} listener.
     *
     * @param receiveMsg the received message text
     */
    @StreamListener("input1")
    public void receiveInput1(String receiveMsg) {
        final String line = "input1 receive: " + receiveMsg;
        System.out.println(line);
    }

    /**
     * Handles a message delivered to the {@code input2} listener.
     *
     * @param receiveMsg the received message text
     */
    @StreamListener("input2")
    public void receiveInput2(String receiveMsg) {
        final String line = "input2 receive: " + receiveMsg;
        System.out.println(line);
    }
}
/**
 * Entry point of the consumer application.
 * The {@code @EnableBinding} annotation names {@code MySink} as the binding interface to enable.
 */
@SpringBootApplication
@EnableBinding({MySink.class})
public class RocketMQConsumerApplication {
    /**
     * Starts the application by handing this class and the command-line arguments
     * to {@code SpringApplication.run}.
     *
     * @param args command-line arguments, passed through unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(RocketMQConsumerApplication.class, args);
    }
}