部署架构

Snipaste_2022-03-21_12-00-10.png

工作流程

  1. 启动NameServer, NameServer启动后开始监听端口, 等待Broker、Producer、Consumer连接
  2. 启动Broker时,Broker会将所有的NameServer建立并保持长连接,然后每30秒向NameServer定时发送心跳包
  3. 发送消息前,可以先创建Topic,创建Topic时需要指定该Topic要存储在那些Broker上,在创建Topic时也会将Topic与Broker的关系写入到NameServer中。(可选,也可在发送消息时创建)
  4. Producer发送消息,启动时先跟NameServer集群中的其中一个建立长连接,并从NameServer中获取路由信息,即当前发送的Topic消息的Queue与Broker的地址(IP+Port)的映射关系。然后根据算法策略从队列中选择一个Queue,与队列所在的Broker建立长连接从而想Broker发送消息。在获取到路由信息后,Producer会首先将路由新缓存到本地,再每30秒从NameServer更新一次路由信息
  5. Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取其所订阅的Topic的路由信息,然后根据算法策略从路由信息中获取到所要消费的Queue,然后直接跟Broker建立长连接,开始消费其中的消息。Consumer在获取到路由信息后,同样也会每30秒从NameSever更新一次路由信息。不过不同于Producer的是,Consumer还会向Broker发送心跳,以确保Broker的存活状态

    消费模式

    1. //设置消费者模式为广播模式 BROADCASTING(默认为负载均衡 CLUSTERING)
    2. consumer.setMessageModel(MessageModel.BROADCASTING);

同步消息

即时效性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功)

  1. //发送消息
  2. SentResult result = producer.send(msg);
  3. //输出发送结果
  4. System.out.pringln(result);

异步消息

即时性较弱,带需要有回执的消息,例如订单中的某些信息

  1. //发送消息
  2. producer.send(msg,new SendCallback(){
  3. //表示成功后的返回结果
  4. public void onSuccess(SendResult sendResult){
  5. System.out.pringln(sendResult);
  6. }
  7. //表示失败后的返回结果
  8. public void onException(Throwable t){
  9. System.out.pringln(t);
  10. }
  11. });
  12. //由于发送消息过快 需要等待10秒
  13. TimeUnit.SECONDS.sleep(10);

单向消息

不需要有回执的消息,例如日志类消息

  1. //发送消息
  2. producer.sendOneway(msg);

延时消息

  1. //设置当前消息的延时发送的时间 3(上图中的下标为3的值)
  2. msg.setDelayTimeLevel(3);
  3. //发送消息
  4. producer.sendOneway(msg);

批量消息

  • 消息内容的总长度不超过4M
  • 消息内容的总长度
    • topic:字符串字节数
    • boyd:字节数组长度
    • 消息追加的属性:key与value的对应字符串字节数
    • 日志:固定20字节
  1. //Prodicer 生产者
  2. public class Producer{
  3. public static void main(Stirng[] args) throws Exception{
  4. //创建producer对象
  5. DefaultMQProducer producer = new DefaultMQProducer("group1");
  6. //设置命名服务器地址
  7. producer.setNamesrvAddr("192.168.95.1:9876");
  8. //创建要发送的对象
  9. List<Message> msgList = new ArrayList<Message>();
  10. msgList.add(new Message("topic1","测试1".getBytes("UTF-8")));
  11. msgList.add(new Message("topic1","测试2".getBytes("UTF-8")));
  12. msgList.add(new Message("topic1","测试3".getBytes("UTF-8")));
  13. //启动发送服务
  14. producer.start();
  15. //发送消息
  16. SentResult result = producer.send(msgList);
  17. //输出发送结果
  18. System.out.pringln(result);
  19. //关闭服务
  20. producer.shutdown();
  21. }
  22. }

消息过滤

tag过滤

  1. //创建消息时添加tag
  2. Message msg = new Message("topic1","tag1","测试".getBytes("UTF-8"));
  3. //接收消息时过滤tag *为所有tag
  4. consumer.subscribe("topic1","tag1");
  5. //接收消息时添加多个tag
  6. consumer.subscribe("topic1","tag1 || tag2");

sql过滤(属性过滤)

需要在服务器中开启支持sql语法
broker.conf中 添加enablePropertyFilter=ture
sh mqbroker -n localhost:9876 -c ../conf/brocker.conf启动

  1. //创建消息时添加tag
  2. Message msg = new Message("topic1","测试".getBytes("UTF-8"));
  3. msg.putUserProperty("vip","1");
  4. msg.putUserProperty("age","18");
  5. //接收消息属性过滤 支持sql语法
  6. consumer.subscribe("topic1",MussageSelector.bySql("vip >= 1"));

消息队列顺序

  1. //Producer
  2. for(Order order : OrderList){
  3. Message msg = new Message("topic1",orger.toString().getBytes());
  4. //发送时指定对应的消息队列选择器
  5. SentResult result = producer.send(msg,new MessageQueueSelector(){
  6. //设置当前消息发送时的队列
  7. public MessageQueue select(List<MessageQueue> list, Message message, Object o){
  8. //根据发送的消息选择消息队列
  9. //根据id选择消息队列的 并返回->id得到int值(id的hashCode取模于消息队列的总数)
  10. int mqIndex = order.getId().hashCode() % list.size();
  11. return list.get(mqIndex);
  12. }
  13. });
  14. }
  15. //Consumer
  16. //使用MessageListenerOrderly接口后,对消息队列的处理又一个消息队列多个线程服务器,转化为一个消息队列一个线程服务
  17. consumer.registerMessageListener(new MessageListenerOrderly(){
  18. public ConsumerOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext){
  19. for(MessageExt msg : list){
  20. System.out.pringln(Thread.currentThread().getName() + "消息: " + new String(msg));
  21. }
  22. return ConsumeOrderlyStatus.SUCCESS;
  23. }
  24. });

事务消息

  1. //Prodicer 生产者
  2. public class Producer{
  3. public static void main(Stirng[] args) throws Exception{
  4. //创建producer对象
  5. TransactionMQProducer producer = new TransactionMQProducer("group1");
  6. //设置命名服务器地址
  7. producer.setNamesrvAddr("192.168.95.1:9876");
  8. //添加本地事务对应的监听
  9. producer.setTransactionListener(new TransactionListener(){
  10. //正常事务过程
  11. public LocationTransactionState executeLocalTransaction(Message message, Object o){
  12. //COMMIT_MESSAGE/提交状态 ROLLBACK_MESSAGE/回滚状态 UNKNOW/中间状态
  13. return LocalTransactionState.COMMIT_MESSAGE;
  14. }
  15. //事务补偿过程
  16. public LocationTransactionState checkLocalTransaction(MessageExt messageExt){
  17. System.out.pringln("事务补偿过程");
  18. return null;
  19. }
  20. });
  21. //启动发送服务
  22. producer.start();
  23. //创建要发送的对象
  24. Message msg = new Message("topic1","测试".getBytes("UTF-8"));
  25. //发送消息
  26. SentResult result = producer.sendMessageInTransaction(msg,null);
  27. //输出发送结果
  28. System.out.pringln(result);
  29. //事务补偿过程中必须保障服务器在运行中,否则将无法进行正常的事务补偿
  30. //producer.shutdown();
  31. }
  32. }

RocketMQ集群

  • 单击
    • 一个broker提供服务(宕机服务瘫痪)
  • 集群
    • 多个broker提供服务(单机宕机后消息无法及时被消费)
    • 多个master多个slave
      • master到slave消息同步方式为同步(较异步方式性能略低,消息无延迟)
      • master到slave消息同步方式为异步(较同步方式性能略高,数据略有延迟)

消息的存储

ACK(Acknowledge character)

顺序写与零拷贝

  • 随机写
  • 顺序写
  • 零拷贝

    消息存储结构

  • MQ数据存储区域包含

    • 消息数据存储区域
      • topic
      • queueID
      • message
    • 消费逻辑队列
      • minOffset
      • maxOffset
      • consumerOffset
    • 索引
      • key索引
      • 创建时间缩影

刷盘机制

更改配置文件broker.properties

  • 同步刷盘:安全性高,效率低,速度慢(适用于对数据安全要求较高的业务)
    • flushDiskType=SYNC-FLUSH
  • 异步刷盘:安全性低,效率高,速度快(适用于对数据处理速度要求较高的业务)

    • flushDiskType=ASYNC-FLUSH

      高可用

  • nameserver

    • 无状态+全服务器注册
  • 消息服务器
    • 主从架构(2M-2S)
  • 消息生成
    • 生产者将相同的topic绑定到多个group组,保障master挂掉后,其他master扔可正常进行消息接收
  • 消息消费
    • RocketMQ自身会根据master的压力确认是否由master承担消息读取的功能,当master繁忙时候,自动切换slave承担数据读取的工作

主从数据复制

更改配置文件broker.properties

  • 同步复制
    • brokerRole=SYNC_MASTER
    • master接到消息后,先复制到slave,然后反馈给生产者写操作成功
    • 优点:数据安全,不丢数据,出现故障容易恢复
    • 缺点:影响数据吞吐量,整体性能低
  • 异步复制

    • brokerRole=SYNC_MASTER
    • master接到消息后,立即返回给生成者操作成功,当消息达到一定量后再异步复制到slave
    • 优点:数据吞吐量大,操作延迟低,性能高
    • 缺点:数据不安全,会出现数据丢失的现象,一旦master出现故障,从上次数据同步到故障时间的数据丢失

      负载均衡

  • Producer负载均衡

    • 内部实现了不同broker集群中对同一topic对应消息队列的负载均衡
  • Consumer负载均衡

    • 平均分配
    • 循环平均分配

      消息重试

  • 当消息消费后未正常返回消费成功的信息将启动消息重试机制

  • 消息重试机制
    • 顺序消息
      • 当消费者消费消息失败后,RocketMQ会自动进行消息重试(每次间隔时间为1秒)
      • 注意:应用会出现消息消费被阻塞的情况。因此,要对顺序消息进行消费情况的监控,避免阻塞的现象的发生
    • 无序消息
      • 无序消息包括普通消息,定时消息,延时消息,事务消息
      • 无序消息重试仅适用于负载均衡(集群)模型下的消息消费,不适用于广播模式下的消息消费
      • 为保障无序消息的消费,MQ设定了合理的消息重试间隔
    • 死信队列
      • 当消息消费重试达到指定次数(默认16次)后,MQ将无法被正常消费的消息称为死信消息(Dead-Letter Message)
      • 死信消息不会被直接抛弃,而是保存到一个全新的队列中,该队列称为死信队列(Dead-Letter Queue)
      • 死信队列特征
        • 归属某个组(Group ID),而不归属Topic,也不归属消费者
        • 一个死信队列中可以包含同一个组下的多个Topic中的死信消息
        • 死信队列不会进行初始化,当第一个死信出现后,此队列首次初始化
      • 死信队列中消息特征
        • 不会被再次重复消费
        • 死信队列中的消息有效期为3天,达到时限后将被清除
      • 死信处理
        • 在监控平台中,通过查找死信,获取死信的messageId,然后通过ID对死信进行精准消费

消息重复消费

  • 消息重复消费原因
    • 生产者发送了重复的消息
      • 网络丢包
      • 生产者宕机
    • 消息服务器投递了重复的消息
      • 网络闪断
    • 动态的负载过程
      • 网络闪断/丢包
      • broker重启
      • 订阅方应用重启(消费者)
      • 客户端扩容/缩容
  • 消息幂等
    • 对同一条消息,无论消费多少次,结构保持一致,称为消息幂等性
    • 解决方案:
      • 使用业务ID作为消息的key
      • 在消息消费时,客户端对key做判定,未使用过放行,使用过抛弃
    • 注意:messageID由RocketMQ产生,messageID并不具有唯一性,不能作为幂等判定条件