部署架构
工作流程
- 启动NameServer, NameServer启动后开始监听端口, 等待Broker、Producer、Consumer连接
- 启动Broker时,Broker会将所有的NameServer建立并保持长连接,然后每30秒向NameServer定时发送心跳包
- 发送消息前,可以先创建Topic,创建Topic时需要指定该Topic要存储在那些Broker上,在创建Topic时也会将Topic与Broker的关系写入到NameServer中。(可选,也可在发送消息时创建)
- Producer发送消息,启动时先跟NameServer集群中的其中一个建立长连接,并从NameServer中获取路由信息,即当前发送的Topic消息的Queue与Broker的地址(IP+Port)的映射关系。然后根据算法策略从队列中选择一个Queue,与队列所在的Broker建立长连接从而想Broker发送消息。在获取到路由信息后,Producer会首先将路由新缓存到本地,再每30秒从NameServer更新一次路由信息
- Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取其所订阅的Topic的路由信息,然后根据算法策略从路由信息中获取到所要消费的Queue,然后直接跟Broker建立长连接,开始消费其中的消息。Consumer在获取到路由信息后,同样也会每30秒从NameSever更新一次路由信息。不过不同于Producer的是,Consumer还会向Broker发送心跳,以确保Broker的存活状态
消费模式
//设置消费者模式为广播模式 BROADCASTING(默认为负载均衡 CLUSTERING)
consumer.setMessageModel(MessageModel.BROADCASTING);
同步消息
即时效性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功)
//发送消息
SentResult result = producer.send(msg);
//输出发送结果
System.out.pringln(result);
异步消息
即时性较弱,带需要有回执的消息,例如订单中的某些信息
//发送消息
producer.send(msg,new SendCallback(){
//表示成功后的返回结果
public void onSuccess(SendResult sendResult){
System.out.pringln(sendResult);
}
//表示失败后的返回结果
public void onException(Throwable t){
System.out.pringln(t);
}
});
//由于发送消息过快 需要等待10秒
TimeUnit.SECONDS.sleep(10);
单向消息
不需要有回执的消息,例如日志类消息
//发送消息
producer.sendOneway(msg);
延时消息
//设置当前消息的延时发送的时间 3(上图中的下标为3的值)
msg.setDelayTimeLevel(3);
//发送消息
producer.sendOneway(msg);
批量消息
- 消息内容的总长度不超过4M
- 消息内容的总长度
- topic:字符串字节数
- boyd:字节数组长度
- 消息追加的属性:key与value的对应字符串字节数
- 日志:固定20字节
//Prodicer 生产者
public class Producer{
public static void main(Stirng[] args) throws Exception{
//创建producer对象
DefaultMQProducer producer = new DefaultMQProducer("group1");
//设置命名服务器地址
producer.setNamesrvAddr("192.168.95.1:9876");
//创建要发送的对象
List<Message> msgList = new ArrayList<Message>();
msgList.add(new Message("topic1","测试1".getBytes("UTF-8")));
msgList.add(new Message("topic1","测试2".getBytes("UTF-8")));
msgList.add(new Message("topic1","测试3".getBytes("UTF-8")));
//启动发送服务
producer.start();
//发送消息
SentResult result = producer.send(msgList);
//输出发送结果
System.out.pringln(result);
//关闭服务
producer.shutdown();
}
}
消息过滤
tag过滤
//创建消息时添加tag
Message msg = new Message("topic1","tag1","测试".getBytes("UTF-8"));
//接收消息时过滤tag *为所有tag
consumer.subscribe("topic1","tag1");
//接收消息时添加多个tag
consumer.subscribe("topic1","tag1 || tag2");
sql过滤(属性过滤)
需要在服务器中开启支持sql语法broker.conf
中 添加enablePropertyFilter=ture
用sh mqbroker -n localhost:9876 -c ../conf/brocker.conf
启动
//创建消息时添加tag
Message msg = new Message("topic1","测试".getBytes("UTF-8"));
msg.putUserProperty("vip","1");
msg.putUserProperty("age","18");
//接收消息属性过滤 支持sql语法
consumer.subscribe("topic1",MussageSelector.bySql("vip >= 1"));
消息队列顺序
//Producer
for(Order order : OrderList){
Message msg = new Message("topic1",orger.toString().getBytes());
//发送时指定对应的消息队列选择器
SentResult result = producer.send(msg,new MessageQueueSelector(){
//设置当前消息发送时的队列
public MessageQueue select(List<MessageQueue> list, Message message, Object o){
//根据发送的消息选择消息队列
//根据id选择消息队列的 并返回->id得到int值(id的hashCode取模于消息队列的总数)
int mqIndex = order.getId().hashCode() % list.size();
return list.get(mqIndex);
}
});
}
//Consumer
//使用MessageListenerOrderly接口后,对消息队列的处理又一个消息队列多个线程服务器,转化为一个消息队列一个线程服务
consumer.registerMessageListener(new MessageListenerOrderly(){
public ConsumerOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext){
for(MessageExt msg : list){
System.out.pringln(Thread.currentThread().getName() + "消息: " + new String(msg));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
事务消息
//Prodicer 生产者
public class Producer{
public static void main(Stirng[] args) throws Exception{
//创建producer对象
TransactionMQProducer producer = new TransactionMQProducer("group1");
//设置命名服务器地址
producer.setNamesrvAddr("192.168.95.1:9876");
//添加本地事务对应的监听
producer.setTransactionListener(new TransactionListener(){
//正常事务过程
public LocationTransactionState executeLocalTransaction(Message message, Object o){
//COMMIT_MESSAGE/提交状态 ROLLBACK_MESSAGE/回滚状态 UNKNOW/中间状态
return LocalTransactionState.COMMIT_MESSAGE;
}
//事务补偿过程
public LocationTransactionState checkLocalTransaction(MessageExt messageExt){
System.out.pringln("事务补偿过程");
return null;
}
});
//启动发送服务
producer.start();
//创建要发送的对象
Message msg = new Message("topic1","测试".getBytes("UTF-8"));
//发送消息
SentResult result = producer.sendMessageInTransaction(msg,null);
//输出发送结果
System.out.pringln(result);
//事务补偿过程中必须保障服务器在运行中,否则将无法进行正常的事务补偿
//producer.shutdown();
}
}
RocketMQ集群
- 单击
- 一个broker提供服务(宕机服务瘫痪)
- 集群
- 多个broker提供服务(单机宕机后消息无法及时被消费)
- 多个master多个slave
- master到slave消息同步方式为同步(较异步方式性能略低,消息无延迟)
- master到slave消息同步方式为异步(较同步方式性能略高,数据略有延迟)
消息的存储
顺序写与零拷贝
- 随机写
- 顺序写
-
消息存储结构
MQ数据存储区域包含
- 消息数据存储区域
- topic
- queueID
- message
- 消费逻辑队列
- minOffset
- maxOffset
- consumerOffset
- 索引
- key索引
- 创建时间缩影
- …
- 消息数据存储区域
刷盘机制
更改配置文件broker.properties
- 同步刷盘:安全性高,效率低,速度慢(适用于对数据安全要求较高的业务)
flushDiskType=SYNC-FLUSH
异步刷盘:安全性低,效率高,速度快(适用于对数据处理速度要求较高的业务)
nameserver
- 无状态+全服务器注册
- 消息服务器
- 主从架构(2M-2S)
- 消息生成
- 生产者将相同的topic绑定到多个group组,保障master挂掉后,其他master扔可正常进行消息接收
- 消息消费
- RocketMQ自身会根据master的压力确认是否由master承担消息读取的功能,当master繁忙时候,自动切换slave承担数据读取的工作
主从数据复制
更改配置文件broker.properties
- 同步复制
brokerRole=SYNC_MASTER
- master接到消息后,先复制到slave,然后反馈给生产者写操作成功
- 优点:数据安全,不丢数据,出现故障容易恢复
- 缺点:影响数据吞吐量,整体性能低
异步复制
Producer负载均衡
- 内部实现了不同broker集群中对同一topic对应消息队列的负载均衡
Consumer负载均衡
当消息消费后未正常返回消费成功的信息将启动消息重试机制
- 消息重试机制
- 顺序消息
- 当消费者消费消息失败后,RocketMQ会自动进行消息重试(每次间隔时间为1秒)
- 注意:应用会出现消息消费被阻塞的情况。因此,要对顺序消息进行消费情况的监控,避免阻塞的现象的发生
- 无序消息
- 无序消息包括普通消息,定时消息,延时消息,事务消息
- 无序消息重试仅适用于负载均衡(集群)模型下的消息消费,不适用于广播模式下的消息消费
- 为保障无序消息的消费,MQ设定了合理的消息重试间隔
- 死信队列
- 当消息消费重试达到指定次数(默认16次)后,MQ将无法被正常消费的消息称为死信消息(Dead-Letter Message)
- 死信消息不会被直接抛弃,而是保存到一个全新的队列中,该队列称为死信队列(Dead-Letter Queue)
- 死信队列特征
- 归属某个组(Group ID),而不归属Topic,也不归属消费者
- 一个死信队列中可以包含同一个组下的多个Topic中的死信消息
- 死信队列不会进行初始化,当第一个死信出现后,此队列首次初始化
- 死信队列中消息特征
- 不会被再次重复消费
- 死信队列中的消息有效期为3天,达到时限后将被清除
- 死信处理
- 在监控平台中,通过查找死信,获取死信的messageId,然后通过ID对死信进行精准消费
- 顺序消息
消息重复消费
- 消息重复消费原因
- 生产者发送了重复的消息
- 网络丢包
- 生产者宕机
- 消息服务器投递了重复的消息
- 网络闪断
- 动态的负载过程
- 网络闪断/丢包
- broker重启
- 订阅方应用重启(消费者)
- 客户端扩容/缩容
- 生产者发送了重复的消息
- 消息幂等
- 对同一条消息,无论消费多少次,结构保持一致,称为消息幂等性
- 解决方案:
- 使用业务ID作为消息的key
- 在消息消费时,客户端对key做判定,未使用过放行,使用过抛弃
- 注意:messageID由RocketMQ产生,messageID并不具有唯一性,不能作为幂等判定条件