引入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
修改配置
rocketmq:
  name-server: 192.168.128.102:9876
  producer:
    group: demo_producer
生产者
/** Template the snippets in these notes use to publish messages; populated through {@code @Autowired}. */
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
 * Handles GET {@code /producer}: publishes a sample {@code User} to {@code demo-topic}
 * through {@code rocketMQTemplate} and echoes that user back as JSON.
 *
 * @return the JSON text of the user that was handed to the template
 */
@GetMapping("/producer")
public String send() {
    final User payload = new User("beyond", 12);
    rocketMQTemplate.convertAndSend("demo-topic", payload);

    final String json = JSON.toJSONString(payload);
    return json;
}
消费者
/**
 * Listener registered for topic {@code demo-topic} under consumer group
 * {@code demo_consumer}; each received {@code User} is printed to standard output.
 */
@Component
@RocketMQMessageListener(consumerGroup = "demo_consumer", topic = "demo-topic")
public class DemoConsumers implements RocketMQListener<User> {

    /**
     * Prints the received user, prefixed with a fixed label.
     *
     * @param user the message body delivered to this listener
     */
    @Override
    public void onMessage(User user) {
        String line = "Consumers接收消息:" + user.toString();
        System.out.println(line);
    }
}
其他消息
异步发送
// Asynchronous send: the outcome is reported to the callback instead of being returned.
SendCallback callback = new SendCallback() {
    /** Prints the failure that was reported for the send. */
    @Override
    public void onException(Throwable e) {
        System.out.println("send fail:" + e);
    }

    /** Prints the result that was reported for the send. */
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.println("send success:" + sendResult);
    }
};
rocketMQTemplate.asyncSend("async-topic", user, callback);
单向发送
rocketMQTemplate.sendOneWay("topic9", user);
延时消息
rocketMQTemplate.syncSend("topic9", MessageBuilder.withPayload("test delay").build(), 2000, 2);
批量发送
// Batch send. The batch overload of RocketMQTemplate.syncSend takes a collection of
// Spring messages (org.springframework.messaging.Message). A list of native RocketMQ
// Message objects does not match that overload and would instead be sent as ONE
// message whose payload is the whole list. The Spring type is fully qualified here so
// it cannot be confused with org.apache.rocketmq.common.message.Message.
List<org.springframework.messaging.Message<String>> msgList = new ArrayList<>();
msgList.add(MessageBuilder.withPayload("msg1").build());
msgList.add(MessageBuilder.withPayload("msg2").build());
msgList.add(MessageBuilder.withPayload("msg3").build());
// The tag goes in the destination using the "topic:tag" form; 1000 is the send timeout in ms.
rocketMQTemplate.syncSend("batch-topic:tag1", msgList, 1000);
Tag过滤
消费者
@RocketMQMessageListener(topic = "topic9", consumerGroup = "group1", selectorExpression = "tag1")
Sql过滤
@RocketMQMessageListener(topic = "topic9", consumerGroup = "group1", selectorExpression = "age>18", selectorType = SelectorType.SQL92)
改消息模式
@RocketMQMessageListener(topic = "topic9", consumerGroup = "group1", messageModel = MessageModel.BROADCASTING)