RocketMQ核心概念

  1. 消息模型(Message Model)
    RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。
    Springboot集成RocketMQ - 图1
    消息生产者(Producer):
    负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
    消息消费者(Consumer):
    负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
    主题(Topic):
    表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
    消息队列(Message Queue):
    正真存放消息的地方,每一个主题下面会有多个队列
    标签(Tag):
    为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
    组(Group):
    生产者组(Producer Group)
    同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
    消费者组(Consumer Group)
    同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。
  2. 部署结构RocketMQ的服务端有两部分组成:
    1. 代理服务器(Broker Server)
      消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
    2. 名字服务(Name Server)
      名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。

Springboot集成RocketMQ - 图2

消息类型

  1. 普通消息
    消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。
  2. 普通顺序消息
    普通顺序消费模式下,消费者通过同一个消费队列收到的消息是有顺序的,不同消息队列 收到的消息则可能是无顺序的。
  3. 严格顺序消
    严格顺序消息模式下,消费者收到的所有消息均是有顺序的。
  4. 延时消息
    是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。 broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel。注意,messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:
    · level == 0,消息为非延迟消息
    · 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
    · level > maxLevel,则level== maxLevel,例如level==20,延迟2h
    定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入 特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有 相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消 息写入真实的topic。
    需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。
  5. 事务消息
    RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义 到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
    Springboot集成RocketMQ - 图3

    消息传播模式

  6. 集群模式(Clustering)
    集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。

  7. 广播模式(Broadcasting)
    广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。

    Springboot集成RocketMQ

  8. 导包

    1. <dependency>
    2. <groupId>org.apache.rocketmq</groupId>
    3. <artifactId>rocketmq-spring-boot-starter</artifactId>
    4. <version>2.1.1</version>
    5. </dependency>
  9. 发送消息

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    //1,发送同步消息
    @GetMapping("/send/{msg}")
    public String sendMessage(@PathVariable("msg") String msg) {
     //携带tag与key
     Message<String> message = MessageBuilder.withPayload(msg)
             .setHeader("KEYS", UUID.randomUUID().toString())
             .build();
     rocketMQTemplate.send("msgtest:order_pay", message);
     return "同步发送成功";
    }
    //2,发送异步消息
    @GetMapping("/send2/{msg}")
    public String sendMessage2(@PathVariable("msg") String msg) {
     Map<String, Object> maps = new HashMap<>();
     aps.put("KEYS", UUID.randomUUID().toString());
     rocketMQTemplate.asyncSend("msgtest:order_pay", 
     MessageBuilder.withPayload(new MessageDto(msg)).setHeader("KEYS",                                                                          UUID.randomUUID().toString()).build(),
     new SendCallback() {    
         @Override
         public void onSuccess(SendResult sendResult) {
             System.out.println("发送成功: " + sendResult);
         }
         @Override
         public void onException(Throwable e) {
             System.out.println("发送失败: " + e.getMessage());
         }
     });
     return "异步发送成功!";
    }
    //3,发送单项消息
    @GetMapping("/send3/{msg}")
    public String sendMessage3(@PathVariable("msg") String msg) {
     Map<String, Object> maps = new HashMap<>();
     maps.put("KEYS", UUID.randomUUID().toString());
     rocketMQTemplate.sendOneWay("msgtest:order_pay",MessageBuilder.withPayload(
         new MessageDto(msg)).setHeader("KEYS", UUID.randomUUID().toString()).build());
     return "发送单项消息成功!";
    }
    //4,发送顺序消息
    @GetMapping("/send5")
    public String sendMessage5() {
     Map<String, Object> maps = new HashMap<>();
     maps.put("KEYS", UUID.randomUUID().toString());
     for(int i = 0; i<10;i++) {
         String msg = "order: " + i + " : " + System.currentTimeMillis();
         rocketMQTemplate.syncSendOrderly(
                 "msgtest:order_pay", 
                 MessageBuilder.withPayload(new MessageDto(msg)).setHeader("KEYS", 
                         UUID.randomUUID().toString()).build(), 
                 //hash值匹配一个消息队列的,所有消息的hash值现在一样,会放入一个队列中
                 "message"
         );
     }
     return "发送顺序消息成功!";
    }
    //5,发送延时消息
    @GetMapping("/send4/{msg}")
    public String sendMessage4(@PathVariable("msg") String msg) {
     Map<String, Object> maps = new HashMap<>();
     maps.put("KEYS", UUID.randomUUID().toString());
     rocketMQTemplate.syncSend("msgtest:order_pay", 
             MessageBuilder.withPayload(new  MessageDto(msg)).setHeader("KEYS", 
                         UUID.randomUUID().toString()).build(), 
             3000, 
             //延迟等级
             2);
     return "发送延迟消息成功!";
    }
    
  10. 消费消息

    @Component
    @RocketMQMessageListener(consumerGroup = "order-group",topic = "msgtest",
        selectorExpression = "order_pay"
        //设置顺序(单线程)消费,默认是多线程异步消费
        //,consumeMode = ConsumeMode.ORDERLY
        //广播模式
        //,messageModel = MessageModel.CLUSTERING
    )
    public class ConsumerListener implements RocketMQListener<MessageDto> {
     @Override
     public void onMessage(MessageDto message) {
         System.out.println("消费消息");
         System.out.println(message);
     }
    }
    
  11. 处理事务消息

    //发送事务消息
    @GetMapping("/send6")
    public String sendMessage6() {
     Map<String, Object> maps = new HashMap<>();
     aps.put("KEYS", UUID.randomUUID().toString());
     //自定义该消息的事务ID
     String txId = UUID.randomUUID().toString();
      //处理事务业务逻辑需要用到的对象
     User user = new User(2000, "张三丰", 100);
     //发送半消息
     rocketMQTemplate.sendMessageInTransaction("msgtest:order_pay", 
             MessageBuilder.withPayload(new MessageDto(2000+""))
                    .setHeader("KEYS", UUID.randomUUID().toString())
                    .setHeader("txId", txId).build(), 
             user);
     return "发送半消息成功!";
    }
    
    //2,本地事务执行以及回查程序
    @Component
    @RocketMQTransactionListener
    public class LocalTransactionMessageListener implements RocketMQLocalTransactionListener {
     //逻辑当中需要的业务处理对象
     @Autowired
     private UserService us;
     //用来保存事务执行成功的日志mapper对象
     @Autowired
     private RocketTransactionLogMapper rocketTransactionLogMapper;
     //执行本地事务逻辑
     @Override
     public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
         System.out.println("执行本地事务");
         User user = (User)arg;
         String txId = msg.getHeaders().get("txId", String.class);
         try {
             us.addUser(txId, user);
             System.out.println("本地事务执行成功");
             return RocketMQLocalTransactionState.COMMIT;
         }catch(Exception e) {
             e.printStackTrace();
             System.out.println("本地事务执行失败");
             return RocketMQLocalTransactionState.ROLLBACK;
         }
     }
     /*
     事务回查的时候需要调用的回查方法,通过日志里面保存的事务日志去检查事务执行情况
     根据事务日志id去查询日志
     */
     @Override
     public RocketMQLocalTransactionState checkLocalTransactionMessage msg) {
         System.out.println("broker没拿到回执消息,执行本地事务检查");
         String txId = msg.getHeaders().get("txId", String.class);
         RocketTransactionLog rs = rocketTransactionLogMapper.selectOne(
                 new QueryWrapper<RocketTransactionLog>().eq("r_txid", txId));
         if(rs == null) return RocketMQLocalTransactionState.ROLLBACK;
         return RocketMQLocalTransactionState.COMMIT;
     }
    }
    
  12. 事务消息可以自定义模板对象

    @ExtRocketMQTemplateConfiguration
    public class UserRocketMqTemplate extends RocketMQTemplate {
    }