image.png1) 概念

1.1) 何为MQ

  • MQ(Message Queue)消息队列,是一种用来保存消息数据的队列
    • 消息(Message): 服务器间的业务请求
    • 队列(Queue):数据结构的一种,特征为 “先进先出”

image.png

1.2) MQ的作用/优点

  • 应用解耦(异步消息发送)
  • 快速应用变更维护(异步消息发送)
    • image.png
  • 流量削峰(削峰填谷)(异步消息发送)

    • image.png

      1.3) MQ的缺点

  • 系统可用性降低

  • 系统复杂度提高
  • 异步消息机制

    • 消息顺序性
    • 消息丢失
    • 消息一致性
    • 消息重复使用

      1.4) 常用MQ产品

  • ActiveMQ:java语言实现,万级数据吞吐量,处理速度ms级,主从架构,成熟度高

  • RabbitMQ :erlang语言实现,万级数据吞吐量,处理速度us级,主从架构,
  • RocketMQ :java语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能强大,扩展性强
  • kafka :scala语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能较少,应用于大数据较多

1.5) RocketMQ

1.5.1) 基本概念

  • RocketMQ是阿里开源的一款非常优秀中间件产品,脱胎于阿里的另一款队列技术MetaQ,后捐赠给Apache基金会作为一款孵化技术,仅仅经历了一年多的时间就成为Apache基金会的顶级项目。并且它现在已经在阿里内部被广泛的应用,并且经受住了多次双十一的这种极致场景的压力(2017年的双十一,RocketMQ流转的消息量达到了万亿级,峰值TPS达到5600万)
  • 解决所有缺点

image.png

1.5.2)名词解释

  • topic
    • 消息主题,一级消息类型,通过Topic对消息进行分类
  • 消息(Message)
    • 消息队列中信息传递的载体。
  • Tag
    • 消息标签,二级消息类型,用来进一步区分某个Topic下的消息分类
  • Producer
    • 消息生产者,也称为消息发布者,负责生产并发送消息
  • Producer实例
    • Producer的一个对象实例,不同的Producer实例可以运行在不同进程内或者不同机器上。Producer实例线程安全,可在同一进程内多线程之间共享。
  • Consumer
    • 消息消费者,也称为消息订阅者,负责接收并消费消息。可分为两类:
      • Push Consumer:消息由消息队列RocketMQ版推送至Consumer。
      • Pull Consumer:该类Consumer主动从消息队列RocketMQ版拉取消息。目前仅TCP Java SDK支持该类Consumer
  • Consumer实例
    • Consumer的一个对象实例,不同的Consumer实例可以运行在不同进程内或者不同机器上。一个Consumer实例内配置线程池消费消息。
  • Group
    • 一类Producer或Consumer,这类Producer或Consumer通常生产或消费同一类消息,且消息发布或订阅的逻辑一致。
  • Group ID

    • Group的标识。

      2) 环境搭建

      2.1) 方案一

      2.1.1) 上传压缩包

  • 步骤1:安装JDK(1.8)

    1. 1)解压 jdk
    2. tar -zxvf jdk-8u171-linux-x64.tar.gz
    3. 2)配置环境变量
    4. >vim /etc/profile
    5. export JAVA_HOME=/opt/jdk1.8.0_171
    6. export PATH=$PATH:${JAVA_HOME}/bin
    7. 3)重新加载配置
    8. >source /etc/profile
    9. >java -version

    如果安装完毕 jdk 后 java -version 看到的是 openjdk(需要删除)因为 操作系统默认已经安装了 opendjdk

    # 查看
    rpm -qa | grep java
    # 删除(把上一个命令看到的所有的jdk文件 用 如下命令删除)
    rpm -e --nodeps java-1.8.0-openjdk-1.8.0.232.b09-0.el7_7.x86_64
    rpm -e --nodeps java-1.8.0-openjdk-headless-1.8.0.232.b09-0.el7_7.x86_64
    rpm -e --nodeps java-1.7.0-openjdk-headless-1.7.0.241-2.6.20.0.el7_7.x86_64
    rmp -e --nodeps java-1.7.0-openjdk-1.7.0.241-2.6.20.0.el7_7.x86_64
    rpm -e --nodeps java-1.7.0-openjdk-1.7.0.241-2.6.20.0.el7_7.x86_64
    

    image.png

  • 步骤2:上传压缩包(zip)

    • yum -y install lrzsz
    • rz
  • 步骤3:解压缩unzip rocketmq-all-4.5.2-bin-release.zip
  • 步骤4:修改目录名称mv rocketmq-all-4.5.2-bin-release rocketmq

    2.1.2) 启动服务器

  • 步骤1:启动命名服务器(bin目录下)

    • sh mqnamesrv
  • 步骤2:启动消息服务器(bin目录下)
    • 修改_runbroker.sh_文件中有关内存的配置(调整的与当前虚拟机内存匹配即可,推荐256m-128m)

image.png

  • sh mqbroker -n localhost:9876

    2.1.3) 测试服务器环境

  • 步骤1:配置命名服务器地址
    • export NAMESRV_ADDR=localhost:9876
  • 步骤2:启动生产者程序客户端(bin目录下)
    • sh tools.sh org.apache.rocketmq.example.quickstart.Producer
    • 启动后产生大量日志信息(注意该信息是测试程序中自带的,不具有通用性,仅供学习查阅参考)
  • 步骤3:启动消费者程序客户端(bin目录下)
    • sh tools.sh org.apache.rocketmq.example.quickstart.Consumer

      2.2) 方案二(使用提供的虚拟机)

      参考《前置课虚拟机使用》

      3) 消息收发

      3.0) 消息发送与接收开发流程

  1. 谁来发?
  2. 发给谁?
  3. 怎么发?
  4. 发什么?
  5. 发的结果是什么?
  6. 打扫战场

    3.1) 基于Java环境构建消息发送与消息接收基础程序

    3.1.1) 单生产者单消费者(OneToOne)

    生产者

    步骤1:导入坐标
    <dependencies>
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-client</artifactId>
             <version>4.5.2</version>
         </dependency>
     </dependencies>
    
    步骤2:编写发送消息的程序 ```java package com.itheima.base;

import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message;

/**

  • 生产者,生产消息 *
  • @author liqp
  • @version 1.0 */ public class Producer { public static void main(String[] args) throws Exception {

     /*
     谁来发?
     发给谁?
     怎么发?
     发什么?
     发的结果是什么?
     打扫战场
      */
     //1.创建一个发送消息的对象
     DefaultMQProducer producer = new DefaultMQProducer("group1");
     //2.设定发送的命名服务器地址
     producer.setNamesrvAddr("192.168.200.169:9876");
    
     //3.1 启动发送的服务
     producer.start();
    
     //4. 创建要发送的消息对象,指定topic,指定消息内容
     Message msg = new Message("topic1", "hello rocketmq".getBytes("UTF-8"));
    
     //3.2 发送消息,获取发送结果
     SendResult result = producer.send(msg);
     System.out.println("返回结果" + result);
     //5. 关闭连接
     producer.shutdown();
    

    } }

<a name="286s9"></a>
#### 消费者
```java
package com.itheima.base;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * 消费者,消费消息
 *
 * @author liqp
 * @version 1.0
 */
public class Consumer {

    public static void main(String[] args) throws Exception {
        //1.创建一个接收消息的对象Consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.设定接收的命名服务器地址
        consumer.setNamesrvAddr("192.168.200.169:9876");
        //3.设置接收消息对应的topic,对应的sub标签为任意*
        consumer.subscribe("topic1", "*");
        //4.开启监听,用于接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt messageExt : list) {
                    System.out.println("消息" + new String(messageExt.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动接收消息的服务
        consumer.start();
        System.out.println("接收消息服务已开启运行");
    }
}

3.1.2) 单生产者多消费者(OneToMany)

生产者(发送多条消息)

//1.创建一个发送消息的对象
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.设定发送的命名服务器地址
producer.setNamesrvAddr("192.168.200.169:9876");

//3.1 启动发送的服务
producer.start();
for (int i = 0; i < 10; i++) {
    //4. 创建要发送的消息对象,指定topic,指定消息内容
    Message msg = new Message("topic1", ("hello rocketmq" + i).getBytes("UTF-8"));

    //3.2 发送消息,获取发送结果
    SendResult result = producer.send(msg);
    System.out.println("返回结果" + result);
}
//5. 关闭连接
producer.shutdown();

消费者(多消费者)

负载均衡模式/集群模式(默认)

consumer.setMessageModel(MessageModel.CLUSTERING);

广播模式

consumer.setMessageModel(MessageModel.BROADCASTING);

3.1.3) 多生产者多消费者(ManyToMany)

多生产者产生的消息可以被同一个消费者消费,也可以被多个消费者消费

3.2) 发送不同类型的消息

3.2.1) 同步消息

即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功)
image.png

SendResult result = producer.send(msg);

3.2.2) 异步消息

即时性较弱,但需要有回执的消息,例如订单中的某些信息

image.png

for (int i = 0; i < 10; i++) {
    Message msg = new Message("topic1", ("hello rocketmq" + i).getBytes(StandardCharsets.UTF_8));
    //同步消息
    /*SendResult result = producer.send(msg);
            System.out.println("返回结果" + result);*/

    producer.send(msg, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println(sendResult);
        }

        @Override
        public void onException(Throwable t) {
            System.out.println(t);
        }
    });
}
TimeUnit.SECONDS.sleep(10);

3.2.3) 单向消息

不需要有回执的消息,例如日志类消息
image.png

producer.sendOneway(msg);

3.3) 特殊的消息发送

3.3.1) 延时消息

  • 消息发送时并不直接发送到消息服务器,而是根据设定的等待时间到达,起到延时到达的缓冲作用

    Message msg = new Message("topic3",("延时消息:hello rocketmq "+i).getBytes("UTF-8"));
    //设置当前消息的延时效果
    msg.setDelayTimeLevel(3);
    SendResult result = producer.send(msg);
    System.out.println("返回结果:"+result);
    
  • 目前支持的消息时间

    • 秒级:1,5,10,30
    • 分级:1~10,20,30
    • 时级:1,2
    • 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2

      3.3.2) 批量消息

  • 发送批量消息

    List<Message> msgList = new ArrayList<Message>();
    SendResult send = producer.send(msgList);
    
  • 消息内容总长度不超过4M

  • 消息内容总长度包含如下:

    • topic(字符串字节数)
    • body (字节数组长度)
    • 消息追加的属性(key与value对应字符串字节数)
    • 日志(固定20字节)

      3.4) 特殊的消息接收

      3.4.1) 消息过滤

      tag过滤/分类过滤

      生产者
      Message msg = new Message("topic6","tag2",("消息过滤按照tag:hello rocketmq 2").getBytes("UTF-8"));
      
      消费者
      //接收消息的时候,除了制定topic,还可以指定接收的tag,*代表任意tag, 通过 || 分割可以指定多个tag
      consumer.subscribe("topic6","tag1 || tag2");
      

      sql过滤/属性过滤/语法过滤

  • 设置broker支持sql过滤

    • vim /root/rocket/conf/broker.conf
    • 在文件内追加内容enablePropertyFilter=true
    • 重启服务
      • cd /root/rocket
      • docker-compose up
        生产者
        //为消息添加属性
        msg.putUserProperty("vip","1");
        msg.putUserProperty("age","20");
        
        消费者
        //使用消息选择器来过滤对应的属性,语法格式为类SQL语法
        consumer.subscribe("topic7", MessageSelector.bySql("age >= 18"));
        

        3.5) 消息发送与接收顺序控制

        3.5.1) 错乱的消息顺序

        image.png

        3.5.2)顺序消息

        生产者

Order.java

package com.itheima.order.domain;

public class Order {
    private String id;
    private String msg;

    @Override
    public String toString() {
        return "Order{ id='" + id + ", msg='" + msg + '}';
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }
}
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.200.169:9876");
producer.start();

//创建要执行的业务队列
List<Order> orderList = new ArrayList<Order>();

Order order11 = new Order();
order11.setId("a");
order11.setMsg("主单-1");
orderList.add(order11);

Order order12 = new Order();
order12.setId("a");
order12.setMsg("子单-2");
orderList.add(order12);

Order order13 = new Order();
order13.setId("a");
order13.setMsg("支付-3");
orderList.add(order13);

Order order14 = new Order();
order14.setId("a");
order14.setMsg("推送-4");
orderList.add(order14);

Order order21 = new Order();
order21.setId("b");
order21.setMsg("主单-1");
orderList.add(order21);

Order order22 = new Order();
order22.setId("b");
order22.setMsg("子单-2");
orderList.add(order22);

Order order31 = new Order();
order31.setId("c");
order31.setMsg("主单-1");
orderList.add(order31);

Order order32 = new Order();
order32.setId("c");
order32.setMsg("子单-2");
orderList.add(order32);

Order order33 = new Order();
order33.setId("c");
order33.setMsg("支付-3");
orderList.add(order33);

//设置消息进入到指定的消息队列中
for(final Order order : orderList){
    Message msg = new Message("orderTopic",order.toString().getBytes());
    //发送时要指定对应的消息队列选择器
    SendResult result = producer.send(msg, new MessageQueueSelector() {
        //设置当前消息发送时使用哪一个消息队列
        public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
            //根据发送的信息不同,选择不同的消息队列
            //根据id来选择一个消息队列的对象,并返回->id得到int值
            int mqIndex = order.getId().hashCode() % list.size();
            return list.get(mqIndex);
        }
    }, null);

    System.out.println(result);
}

producer.shutdown();

消费者

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.200.169:9876");
consumer.subscribe("orderTopic","*");

//使用单线程的模式从消息队列中取数据,一个线程绑定一个消息队列
consumer.registerMessageListener(new MessageListenerOrderly() {
    //使用MessageListenerOrderly接口后,对消息队列的处理由一个消息队列多个线程服务,转化为一个消息队列一个线程服务
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
        for(MessageExt msg : list){
            System.out.println(Thread.currentThread().getName()+"  消息:"+new String(msg.getBody()));
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});

consumer.start();
System.out.println("接收消息服务已开启运行");

3.6) 事务消息

3.6.1) 正常事务过程与事务补偿过程

image.png

3.6.2)事务消息状态

  • 提交状态:允许进入队列,此消息与非事务消息无区别
  • 回滚状态:不允许进入队列,此消息等同于未发送过
  • 中间状态:完成了half消息的发送,未对MQ进行二次状态确认
  • 注意:事务消息仅与生产者有关,与消费者无关


3.6.3) 发送事务消息

测试方法

public static void main(String[] args) throws Exception {
    //事务补偿过程
    unknowMsg();

    //事务提交过程
    //commitMsg();

    //事务回滚过程
    //rollbackMsg();

}

事务补偿过程

/**
     * 事务补偿过程
     * @throws Exception
     */
    public static void unknowMsg() throws Exception {
        //事务消息使用的生产者是TransactionMQProducer
        TransactionMQProducer producer = new TransactionMQProducer("group1");
        producer.setNamesrvAddr("192.168.200.169:9876");
        //添加本地事务对应的监听
        producer.setTransactionListener(new TransactionListener() {
            //正常事务过程
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                //中间状态
                return LocalTransactionState.UNKNOW;
            }

            //事务补偿过程
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.println("事务补偿过程执行");
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        producer.start();

        Message msg = new Message("topic12", ("事务消息:hello rocketmq ").getBytes("UTF-8"));
        SendResult result = producer.sendMessageInTransaction(msg, null);
        System.out.println("返回结果:" + result);
        //事务补偿过程必须保障服务器在运行过程中,否则将无法进行正常的事务补偿
//      producer.shutdown();
    }

image.png

事务提交过程

/**
     * 事务提交过程
     *
     * @throws Exception
     */
    public static void commitMsg() throws Exception {
        //事务消息使用的生产者是TransactionMQProducer
        TransactionMQProducer producer = new TransactionMQProducer("group1");
        producer.setNamesrvAddr("192.168.200.169:9876");
        //添加本地事务对应的监听
        producer.setTransactionListener(new TransactionListener() {
            //正常事务过程
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                //事务提交状态
                return LocalTransactionState.COMMIT_MESSAGE;
            }

            //事务补偿过程
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                return null;
            }
        });
        producer.start();

        Message msg = new Message("topic8", ("事务消息:hello rocketmq ").getBytes("UTF-8"));
        SendResult result = producer.sendMessageInTransaction(msg, null);
        System.out.println("返回结果:" + result);
        producer.shutdown();
    }

事务回滚

/**
     * 事务回滚
     *
     * @throws Exception
     */
    public static void rollbackMsg() throws Exception {
        //事务消息使用的生产者是TransactionMQProducer
        TransactionMQProducer producer = new TransactionMQProducer("group1");
        producer.setNamesrvAddr("192.168.200.169:9876");
        //添加本地事务对应的监听
        producer.setTransactionListener(new TransactionListener() {
            //正常事务过程
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                //事务回滚状态
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }

            //事务补偿过程
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                return null;
            }
        });
        producer.start();

        Message msg = new Message("topic9", ("事务消息:hello rocketmq ").getBytes("UTF-8"));
        SendResult result = producer.sendMessageInTransaction(msg, null);
        System.out.println("返回结果:" + result);
        producer.shutdown();
    }

4) 集群搭建

4.1)集群结构与特征分析

4.1.1)集群结构

  • 单机
    • 一个broker提供服务(宕机后服务瘫痪)
  • 集群

    • 多个broker提供服务(单机宕机后消息无法及时被消费)
    • 多个master多个slave

      • master到slave消息同步方式为同步(较异步方式性能略低,消息无延迟)
      • master到slave消息同步方式为异步(较同步方式性能略高,数据略有延迟)

            ![image.png](https://cdn.nlark.com/yuque/0/2020/png/317350/1606472919510-2281ce48-721a-40b5-b0d4-ad59693aa27e.png#height=90&id=lgY7o&margin=%5Bobject%20Object%5D&name=image.png&originHeight=179&originWidth=407&originalType=binary&ratio=1&size=15282&status=done&style=none&width=203.5)
        

        4.1.2)工作流程

        image.png

  • 步骤1:NameServer启动,开启监听,等待broker、producer与consumer连接

  • 步骤2:broker启动,根据配置信息,连接所有的NameServer,并保持长连接
  • 步骤2补充:如果broker中有现存数据, NameServer将保存topic与broker关系
  • 步骤3:producer发信息,连接某个NameServer,并建立长连接
  • 步骤4:producer发消息
    • 步骤4.1若果topic存在,由NameServer直接分配
    • 步骤4.2如果topic不存在,由NameServer创建topic与broker关系,并分配
  • 步骤5:producer在broker的topic选择一个消息队列(从列表中选择)
  • 步骤6:producer与broker建立长连接,用于发送消息
  • 步骤7:producer发送消息

  • comsumer工作流程同producer

image.png

4.2)双主双从集群搭建

4.2.1) 集群架构

image.png

4.2.2)搭建过程

1) 配置主机名称(未来就可以根据主机名找到对应的服务器了)
vim /etc/hosts

# nameserver
192.168.184.128 rocketmq-nameserver1
192.168.184.129 rocketmq-nameserver2
# broker
192.168.184.128 rocketmq-master1
192.168.184.129 rocketmq-slave2
192.168.184.129 rocketmq-master2
192.168.184.128 rocketmq-slave1

配置完毕后重启网卡,应用配置
systemctl restart network
2) 关闭防火墙

# 关闭防火墙
systemctl stop firewalld.service 
# 查看防火墙的状态
firewall-cmd --state 
# 禁止firewall开机启动
systemctl disable firewalld.service

3) 配置jdk
详见 2.1.1) 步骤1
4) 配置服务器环境
将rocketmq 解压至跟目录 /

# 解压
unzip rocketmq-all-4.5.2-bin-release.zip
# 修改目录名称
mv rocketmq-all-4.5.2-bin-release rocketmq

vim /etc/profile

#set rocketmq
ROCKETMQ_HOME=/rocketmq
PATH=$PATH:$ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PATH

配置完毕后重启网卡,应用配置
source /etc/profile
5) 创建集群服务器的数据存储目录
主节点创建四个目录/ 从节点四个目录

mkdir /rocketmq/store
mkdir /rocketmq/store/commitlog
mkdir /rocketmq/store/consumequeue
mkdir /rocketmq/store/index


mkdir /rocketmq-slave/store
mkdir /rocketmq-slave/store/commitlog
mkdir /rocketmq-slave/store/consumequeue
mkdir /rocketmq-slave/store/index

注意master与slave如果在同一个虚拟机中部署,需要将存储目录区分开
6) 修改配置
不同的节点,应该修改不同的配置,文件夹也应该不一样

cd r/ocketmq/conf/2m-2s-sync
vim  broker-a.proerties
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48

#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/rocketmq/store-slave
#commitLog 存储路径
storePathCommitLog=/rocketmq/store-slave/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/rocketmq/store-slave/consumequeue
#消息索引存储路径
storePathIndex=/rocketmq/store-slave/index
#checkpoint 文件存储路径
storeCheckpoint=/rocketmq/store-slave/checkpoint
#abort 文件存储路径
abortFile=/rocketmq


#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

检查启动内存 (nameserver 和broker 均需要修改)

vim /rocketmq/bin/runbroker.sh
vim /rocketmq/bin/runserver.sh

# 开发环境配置 JVM Configuration
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"

启动(bin 目录)

nohup sh mqnamesrv &
nohup sh mqbroker -c ../conf/2m-2s-syncbroker-a.properties &
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b-s.properties &

5) rocketmq-console

rocketmq-console是一款基于java环境开发的(springboot)的管理控制台工具
获取地址:https://github.com/apache/rocketmq-externals

6) 高级特性

6.1) 持久化与持久化介质

6.1.1)数据库持久化

ActiveMQ 使用了数据库的消息存储
缺点:数据库瓶颈将成为MQ瓶颈
image.png

6.1.2)文件系统持久化

不用数据库,直接用文件存储
(RocketMQ/Kafka/RabbitMQ)
image.png

6.2)顺序写与零拷贝

6.2.1)MQ 高效的消息存储与读写方式

  • SSD(Solid State Disk)
    • 随机写(100KB/s)
    • 顺序写(600MB/s)1秒1部电影

image.png

6.2.2)零拷贝技术

1) 通过启动时初始化话文件大小来保证 占用固定的磁盘空间,保证磁盘读写速度
2) 零拷贝”技术
    数据传输由传统的4次复制简化成3次复制(如下图),减少1次复制过程
    Java语言中使用MappedByteBuffer类实现了该技术
    要求:预留存储空间,用于保存数据(1G存储空间起步)

image.png

6.3)消息存储结构

消息数据存储区域
    topic
    queueId
    message
消费逻辑队列
    minOffset
    maxOffset
    consumerOffset
索引
    key索引
    创建时间索引
    ……

image.png

6.4) 刷盘机制

6.4.1) 同步刷盘

1)生产者发送消息到MQ,MQ接到消息数据
2)MQ挂起生产者发送消息的线程
3)MQ将消息数据写入内存
4)内存数据写入硬盘
5)磁盘存储后返回SUCCESS
6)MQ恢复挂起的生产者线程
7)发送ACK到生产者

image.png

6.4.2) 异步刷盘

1)生产者发送消息到MQ,MQ接到消息数据
2)MQ将消息数据写入内存
3)发送ACK到生产者
--等消息量多了--
4)内存数据写入硬盘

image.png

6.4.3) 同步刷盘/ 异步刷盘 优缺点对比

同步刷盘:安全性高,效率低,速度慢(适用于对数据安全要求较高的业务)
异步刷盘:安全性低,效率高,速度快(适用于对数据处理速度要求较高的业务)

6.4.4) 配置方式

#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH

6.5)高可用与主从复制方案

6.5.1)高可用方案

nameserver

nameserver ,通过无状态+全服务器注册 来保证即使一个宕机了也能提供所有的服务

消息服务器

主从架构(2M-2S) ,即使又一台服务器宕机, 服务依旧可以正常提供
注意: master 一旦宕机,slave 只提供消费服务,不能写入新的消息(slave 不会升级为master)

消息生产(开发人员写代码时保障)

生产者将相同的topic绑定到多个group组,保障master挂掉后,其他master仍可正常进行消息接收

消息消费

RocketMQ自身会根据master的压力确认是否由master承担消息读取的功能,当master繁忙时候,自动切换由slave承担数据读取的工作

6.5.2)主从复制方案

同步复制

master接到消息后,先复制到slave,然后反馈给生产者写操作成功
优点:数据安全,不丢数据,出现故障容易恢复
缺点:影响数据吞吐量,整体性能低

异步复制

master接到消息后,立即返回给生产者写操作成功,当消息达到一定量后再异步复制到slave
优点:数据吞吐量大,操作延迟低,性能高
缺点:数据不安全,会出现数据丢失的现象,一旦master出现故障,从上次数据同步到故障时间的数据将丢失

配置(配置在启动时 -c 指定的配置文件中 broker.conf)

#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER

6.6)负载均衡

  • Producer负载均衡
    内部实现了不同broker集群中对同一topic对应消息队列的负载均衡
  • Consumer负载均衡
    平均分配
    循环平均分配

    6.7)消息重试

    当消息消费后未正常返回消费成功的信息将启动消息重试机制

    6.7.1) 顺序消息重试

    当消费者消费消息失败后,RocketMQ会自动进行消息重试(每次间隔时间为 1 秒)
    注意:应用会出现消息消费被阻塞的情况,因此,要对顺序消息的消费情况进行监控,避免阻塞现象的发生
    

    6.7.2) 无序消息重试

    无序消息包括普通消息、定时消息、延时消息、事务消息
    无序消息重试仅适用于负载均衡(集群)模型下的消息消费,不适用于广播模式下的消息消费
    为保障无序消息的消费,MQ设定了合理的消息重试间隔时长
    
    image.png

    6.8)死信队列

    6.8.1)概念

    当消息消费重试到达了指定次数(默认16次)后,MQ将无法被正常消费的消息称为死信消息(Dead-Letter Message)
    死信消息不会被直接抛弃,而是保存到了一个全新的队列中,该队列称为死信队列(Dead-Letter Queue)
    

    6.8.2) 死信队列特征

    ```
  • 归属某一个组(Gourp Id),而不归属Topic,也不归属消费者
  • 一个死信队列中可以包含同一个组下的多个Topic中的死信消息
  • 死信队列不会进行默认初始化,当第一个死信出现后,此队列首次初始化
    <a name="piVcX"></a>
    ### 6.8.3)死信队列中消息特征
    
  • 不会被再次重复消费
  • 死信队列中的消息有效期为3天,达到时限后将被清除
    <a name="HdPmQ"></a>
    ### 6.8.4) 死信处理
    
    在监控平台中,通过查找死信,获取死信的messageId,然后通过id对死信进行精准消费 ```

6.9)重复消费与消息幂等

6.9.1)重复消费原因

1 生产者发送了重复的消息
    网络闪断
    生产者宕机
2 消息服务器投递了重复的消息
    网络闪断
3 动态的负载均衡过程
    网络闪断/抖动
    broker重启
    订阅方应用重启(消费者)
    客户端扩容
    客户端缩容

6.9.2)消息幂等

对同一条消息,无论消费多少次,结果保持一致,称为消息幂等性

6.9.3)解决方案

- 使用业务id作为消息的key
- 在消费消息时,客户端对key做判定,未使用过放行,使用过抛弃

注意:messageId由RocketMQ产生,messageId并不具有唯一性,不能作用幂等判定条件