引入依赖

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-spring-boot-starter</artifactId>
  4. <version>2.0.3</version>
  5. </dependency>

修改配置

  1. rocketmq:
  2. name-server: 192.168.128.102:9876
  3. producer:
  4. group: demo_producer

生产者

  1. @Autowired
  2. private RocketMQTemplate rocketMQTemplate;
  3. @GetMapping("/producer")
  4. public String send(){
  5. User user = new User("beyond", 12);
  6. rocketMQTemplate.convertAndSend("demo-topic", user);
  7. return JSON.toJSONString(user);
  8. }

消费者

  1. @Component
  2. @RocketMQMessageListener(topic = "demo-topic", consumerGroup = "demo_consumer")
  3. public class DemoConsumers implements RocketMQListener<User> {
  4. @Override
  5. public void onMessage(User user) {
  6. System.out.println("Consumers接收消息:" + user.toString());
  7. }
  8. }

其他消息

异步发送

  1. rocketMQTemplate.asyncSend("async-topic", user, new SendCallback() {
  2. @Override
  3. public void onSuccess(SendResult sendResult) {
  4. System.out.println("send success:" + sendResult);
  5. }
  6. @Override
  7. public void onException(Throwable e) {
  8. System.out.println("send fail:" + e);
  9. }
  10. });

单向发送

rocketMQTemplate.sendOneWay("topic9", user);

延时消息（参数 2000 为发送超时时间，单位毫秒；最后一个参数 2 为延时等级，默认等级 2 对应延时 5 秒）

rocketMQTemplate.syncSend("topic9", MessageBuilder.withPayload("test delay").build(), 2000, 2);

批量发送

  1. List<Message> msgList = new ArrayList<>();
  2. msgList.add(new Message("batch-topic", "tag1", "msg1".getBytes()));
  3. msgList.add(new Message("batch-topic", "tag1", "msg2".getBytes()));
  4. msgList.add(new Message("batch-topic", "tag1", "msg3".getBytes()));
  5. rocketMQTemplate.syncSend("batch-topic",msgList,1000);

Tag过滤

消费者

@RocketMQMessageListener(topic = "topic9", consumerGroup = "group1", selectorExpression = "tag1")

SQL过滤（需要在 Broker 配置中开启 enablePropertyFilter=true）

@RocketMQMessageListener(topic = "topic9", consumerGroup = "group1", selectorExpression = "age>18", selectorType = SelectorType.SQL92)

修改消息模式（默认为集群消费 CLUSTERING，此处改为广播消费 BROADCASTING）

@RocketMQMessageListener(topic = "topic9", consumerGroup = "group1", messageModel = MessageModel.BROADCASTING)