引入依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.3</version></dependency>
修改配置
rocketmq:name-server: 192.168.128.102:9876producer:group: demo_producer
生产者
@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/producer")public String send(){User user = new User("beyond", 12);rocketMQTemplate.convertAndSend("demo-topic", user);return JSON.toJSONString(user);}
消费者
@Component@RocketMQMessageListener(topic = "demo-topic", consumerGroup = "demo_consumer")public class DemoConsumers implements RocketMQListener<User> {@Overridepublic void onMessage(User user) {System.out.println("Consumers接收消息:" + user.toString());}}
其他消息
异步发送
rocketMQTemplate.asyncSend("async-topic", user, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("send success:" + sendResult);}@Overridepublic void onException(Throwable e) {System.out.println("send fail:" + e);}});
单向发送
rocketMQTemplate.sendOneWay(“topic9”,user);
延时消息
rocketMQTemplate.syncSend(“topic9”, MessageBuilder.withPayload(“test delay”).build(),2000,2);
批量发送
List<Message> msgList = new ArrayList<>();msgList.add(new Message("batch-topic", "tag1", "msg1".getBytes()));msgList.add(new Message("batch-topic", "tag1", "msg2".getBytes()));msgList.add(new Message("batch-topic", "tag1", "msg3".getBytes()));rocketMQTemplate.syncSend("batch-topic",msgList,1000);
Tag过滤
消费者
@RocketMQMessageListener(topic = “topic9”,consumerGroup = “group1”,selectorExpression = “tag1”)
Sql过滤
@RocketMQMessageListener(topic = “topic9”,consumerGroup = “group1”, selectorExpression = “age>18” ,selectorType= SelectorType.SQL92)
改消息模式
@RocketMQMessageListener(topic = “topic9”,consumerGroup = “group1”,messageModel = MessageModel.BROADCASTING)
