官方文档
Link
POM
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId></dependency>
application.yml
服务提供者
spring: cloud: stream: rocketmq: binder: # RocketMQ 服务器地址 namesrv-addr: 192.168.1.161:9876 bindings: # 这里是个 Map 类型参数,{} 为 YAML 中 Map 的行内写法 output1: {destination: test-topic1, content-type: application/json} output2: {destination: test-topic2, content-type: application/json}
服务消费者
spring:
application:
name: rocketmq-consumer
cloud:
stream:
rocketmq:
binder:
namesrv-addr: 192.168.1.161:9876
bindings:
# 顺序消费
input1: {consumer.orderly: true}
# 异步消费标签为tagStr的消息
input2: {consumer.orderly: false, consumer.tags: tagStr}
bindings:
input1: {destination: test-topic1, content-type: text/plain, group: test-group1, consumer.maxAttempts: 1}
# maxAttempts:最大尝试次数,concurrency:线程池个数
input2: {destination: test-topic2, content-type: text/plain, group: test-group2, consumer.maxAttempts: 1, consumer.concurrency: 20}
消息处理
消息发送
public interface MySource {
@Output("output1")
MessageChannel output1();
@Output("output2")
MessageChannel output2();
}
@Service
public class ProviderService {
@Autowired
private MySource source;
/**
* 发送消息
* @param msg 消息
* @throws Exception
*/
public void send1(String msg) throws Exception {
source.output1().send(MessageBuilder.withPayload(msg).build());
}
/**
* 发送消息并设置标签
* @param msg 消息
* @param tag 消息标签
* @throws Exception
*/
public void send1(String msg,String tag) throws Exception {
Map<String, Object> headers = new HashMap<>();
headers.put(MessageConst.PROPERTY_TAGS, tag);
source.output1().send(MessageBuilder.createMessage(msg, new MessageHeaders(headers)));
}
public void send2(String msg) throws Exception {
source.output2().send(MessageBuilder.withPayload(msg).build());
}
}
消息接收
public interface MySink {
@Input("input1")
SubscribableChannel input1();
@Input("input2")
SubscribableChannel input2();
}
@Service
public class ConsumerReceive {
@StreamListener("input1")
public void receiveInput1(String receiveMsg) {
System.out.println("input1 receive: " + receiveMsg);
}
@StreamListener("input2")
public void receiveInput2(String receiveMsg) {
System.out.println("input2 receive: " + receiveMsg);
}
}
@SpringBootApplication
@EnableBinding({MySink.class})
public class RocketMQConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMQConsumerApplication.class, args);
}
}