1. 消息中间件
消息中间件:消息+中间件
消息:消息即为数据,数据就会有规划,有长度,有大小。
java 提供了一套标准 JMS(java message server)
- JMS 消息主体(Body)
- JMS提供五种消息主体的形式,每种形式通过消息接口定义:
- StreamMessage
- 消息整体主体包含流式Java原生值,它是连续地被填充和读取的。
- MapMessage
- 消息整体主体包含键值对集合,其中键为字符串,值为Java原生类型。条目访问可被计算器连续地或者名称随机地访问,它的顺序并不一定。
- TextMessage
- 消息整体主体包含一个Java String 对象。
- ObjectMessage
- 消息整体主体包含一个Serializable 对象,如果需要使用集合对象,确保JDK 1.2或更高。
- BytesMessage
中间件:为我们提供发送消息的程序或者服务.
为什么要使用消息中间件
- 解耦
- 异步
- 削峰
2. RocketMQ 整体认识
整体认识:远程通讯,发送消息,存储消息
概念
- Producer:消息生产者,负责产生消息,一般由业务系统负责产生消息
- Consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费
- Topic:消息主题,负责标记一类消息,生产者将消息发送到 Topic,消费者从该 Topic 消费消息
- Broker:消息中转角色,负责存储消息,转发消息,一般也称为
Server
,在 JMS 规范中称为 Provider- 生产者生产消息到
Broker
,消费者从Broker
拉取消息并消费。
- 生产者生产消息到
- NameServer:服务发现 Server,用于生产者和消费者获取 Broker 的服务;
- 类似于 kafka 中 zookeeper,或者 springcloud 中 Eureka,是一个注册中心。主要提供两个功能:Broker管理 和 路由信息管理 。
- 说白了就是
Broker
会将自己的信息注册到NameServer
中,此时NameServer
就存放了很多Broker
的信息( Broker 的路由表),消费者和生产者就从NameServer
中获取路由表然后照着路由表的信息和对应的Broker
进行通信(生产者和消费者定期会向NameServer
去查询相关的Broker
的信息)。
- 生产者组中的生产者会向主题发送消息,而 主题中存在多个队列,生产者每次生产消息之后是指定主题中的某个队列发送消息的。
- 集群消费模式下,一个消费者集群多台机器共同消费一个
topic
的多个队列,一个队列只会被这个消费者组里的一个消费者消费。如果某个消费者挂掉,此组内其它消费者会接替挂掉的消费者继续消费 - 所以一般来讲要控制 消费者组中的消费者个数和主题中队列个数相同 。
- 每个消费组在每个队列上维护一个消费位置
- 因为我们刚刚画的仅仅是一个消费者组,我们知道在发布订阅模式中一般会涉及到多个消费者组,而每个消费者组在每个队列中的消费位置都是不同的。如果此时有多个消费者组,那么消息被一个消费者组消费完之后是不会删除的(因为其它消费者组也需要呀),它仅仅是为每个消费者组维护一个消费位移(offset) ,每次消费者组消费完会返回一个成功的响应,然后队列再把维护的消费位移加一,这样就不会出现刚刚消费过的消息再一次被消费了。
为什么一个主题中需要维护多个队列 ?答案是 提高并发能力 。
:::info
所以总结来说,RocketMQ
通过使用在一个 **Topic**
中配置多个队列并且每个队列维护每个消费者组的消费位置 实现了 主题模式/发布订阅模式 。
一个 **Topic**
分布在多个 **Broker**
上,一个 **Broker**
可以配置多个 **Topic**
,它们是多对多的关系。
如果某个 Topic
消息量很大,应该给它多配置几个队列 (上文中提到了提高并发能力),并且 尽量多分布在不同 **Broker**
上,以减轻某个 **Broker**
的压力 。Topic
消息量都比较均匀的情况下,如果某个 broker
上的队列越多,则该 broker
压力越大。
:::
Rocketmq模块划分
名称 | 作用 |
---|---|
broker | broker模块:c和p端消息存储逻辑 |
client | 客户端api:produce、consumer端 接受与发送api |
common | 公共组件:常量、基类、数据结构 |
tools | 运维tools:命令行工具模块 |
store | 存储模块:消息、索引、commitlog存储 |
namesrv | 服务管理模块:服务注册topic等信息存储 |
remoting | 远程通讯模块:netty+fastjson |
logappender | 日志适配模块 |
example | Demo列子 |
filtersrv | 消息过滤器模块 |
srvutil | 辅助模块 |
filter | 过滤模块:消息过滤模块 |
distribution | 部署、运维相关zip包中的代码 |
openmessaging | 兼容openmessaging分布式消息模块 |
RocketMQ 高可用
**01**
:我们的Broker
做了集群并且还进行了主从部署 ,由于消息分布在各个Broker
上,一旦某个Broker
宕机,则该Broker
上的消息读写都会受到影响。所以Rocketmq
提供了master/slave
的结构,salve
定时从master
同步数据(同步刷盘或者异步刷盘),如果master
宕机,则**slave**
提供消费服务,但是不能写入消息 (后面我还会提到哦)。**02**
:为了保证HA
,我们的NameServer
也做了集群部署,但是请注意它是 去中心化 的。也就意味着它没有主节点,你可以很明显地看出NameServer
的所有节点是没有进行Info Replicate
的,在RocketMQ
中是通过 单个 Broker 和所有 NameServer 保持长连接 ,并且在每隔**30**
秒Broker
会向所有Nameserver
发送心跳,心跳包含了自身的Topic
配置信息,这个步骤就对应这上面的Routing Info
**03**
:在生产者需要向Broker
发送消息的时候,需要先从**NameServer**
获取关于**Broker**
的路由信息,然后通过 轮询 的方法去向每个队列中生产数据 以达到 负载均衡 的效果。**04**
:消费者通过**NameServer**
获取所有**Broker**
的路由信息后,向**Broker**
发送**Pull**
请求来获取消息数据。Consumer
可以以两种模式启动 :- 广播(Broadcast):一条消息会发送给 同一个消费组中的所有消费者
- 集群(Cluster):消息只会发送给一个消费者。
3. RocketMQ 特性
3.1 Product 端
发送方式:
**Sync**
:同步的发送方式,会等待发送结果后才返回**Async**
:异步的发送方式,发送完后,立刻返回。Client 在拿到 Broker 的响应结果后,会回调指定的SendCallback
. 这个 API 也可以指定 Timeout,不指定也是默认的3000ms
.**Oneway**
:比较简单,发出去后,什么都不管直接返回。Ps:日志
发送结果:
org.apache.rocketmq.client.producer.SendStatus
**SEND_OK**
:消息发送成功**FLUSH_DISK_TIMEOUT**
:消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失**FLUSH_SLAVE_TIMEOUT**
:消息发送成功,但是服务器同步到 Slave 时超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失**SLAVE_NOT_AVAILABLE**
:消息发送成功,但是此时 slave 不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢。
普通消息
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
- 准备工作 mesasge、网络相关、线程相关
- 从 namesrv 获取 topic 路由(缓存机制)
- 组装数据,broker 需要的序列化数据(json)
- Netty发送(源码)
定时消息
定时消息是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。(第三方 job 步长)
**org.apache.rocketmq.store.config.MessageStoreConfig#messageDelayLevel**
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
producer.setNamesrvAddr("192.168.0.31:9876");
// Launch producer
producer.start();
int totalMessagesToSend =3;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
//设置延迟消息的级别
message.setDelayTimeLevel(3);
// Send the message
producer.send(message);
}
// Shutdown producer after use.
producer.shutdown();
顺序消息
**org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl**
场景:订单 → 下单 → 支付 → 配送 → 签收
底层原理:4个队列,一个订单下面不同状态的消息是顺序的只需要发到一个队列中org.apache.rocketmq.client.producer.MessageQueueSelector
如何选择一个队列:
一个 topic 下有多个队列,为了保证发送有序,RocketMQ 提供了 MessageQueueSelector 队列选择机制,他有三种实现:
我们可使用 Hash 取模法,让同一个订单发送到同一个队列中,再使用同步发送,只有同个订单的创建消息发送成功,再发送支付消息。这样,我们保证了发送有序。**RocketMQ**
在主题上是无序的、它只有在队列层面才是保证有序 的。
这又扯到两个概念——普通顺序 和 严格顺序 。
- 所谓普通顺序是指:消费者通过 同一个消费队列收到的消息是有顺序的 ,不同消息队列收到的消息则可能是无顺序的。普通顺序消息在
Broker
重启情况下不会保证消息顺序性 (短暂时间) 。 - 所谓严格顺序是指:消费者收到的 所有消息 均是有顺序的。严格顺序消息 即使在异常情况下也会保证消息的顺序性 。
- 严格顺序看起来虽好,实现它可会付出巨大的代价。如果你使用严格顺序模式,
**Broker**
集群中只要有一台机器不可用,则整个集群都不可用。你还用啥?现在主要场景也就在binlog
同步。
- 严格顺序看起来虽好,实现它可会付出巨大的代价。如果你使用严格顺序模式,
一般而言,我们的 MQ
都是能容忍短暂的乱序,所以推荐使用普通顺序模式。
那么,我们现在使用了 普通顺序模式 ,我们从上面学习知道了在 Producer
生产消息的时候会进行轮询(取决你的负载均衡策略)来向同一主题的不同消息队列发送消息。那么如果此时我有几个消息分别是同一个订单的创建、支付、发货,在轮询的策略下这 三个消息会被发送到不同队列 ,因为在不同的队列此时就无法使用 RocketMQ
带来的队列有序特性来保证消息有序性了。
那么,怎么解决呢?
其实很简单,我们需要处理的仅仅是将同一语义下的消息放入同一个队列(比如这里是同一个订单),那我们就可以使用 Hash取模法 来保证同一个订单在同一个队列中就行了。
producer 生产消息
一般消息是通过轮询所有队列来发送的(负载均衡策略),顺序消息可以根据业务,比如说订单号相同的消息发送到同一个队列。
for (int i = 1; i <= 5; i++) {
Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes());
// defaultMQProducerImpl.send(msg, selector, arg)
SendResult sendResult = producer.send(msg, (mqs, msg1, arg) -> {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}, 0);
System.out.println(sendResult);
}
consumer 消费消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");
consumer.setNamesrvAddr("192.168.0.31:9876");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicOrderTest", "*");
/**
* 实现了 MessageListenerOrderly 表示一个队列只会被一个线程取到
*,第二个线程无法访问这个队列
*/
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
// 设置自动提交
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
System.out.println(msg + ",内容:" + new String(msg.getBody()));
}
try {
TimeUnit.SECONDS.sleep(5L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
});
consumer.start();
事务消息
事务:ACID XA 二段阶段 三阶段 TCC 阿里
如今比较常见的分布式事务实现有 2PC、TCC 和事务消息(half 半消息机制)。每一种实现都有其特定的使用场景,但是也有各自的问题,都不是完美的解决方案。
在 RocketMQ
中使用的是 事务消息加上事务反查机制 来解决分布式事务问题的。我画了张图,大家可以对照着图进行理解。
在第一步发送的 half 消息 ,它的意思是 在事务提交之前,对于消费者来说,这个消息是不可见的 。
那么,如何做到写入消息但是对用户不可见呢?RocketMQ 事务消息的做法是:如果消息是
**half**
消息,将备份原消息的主题与消息消费队列,然后 改变主题 为**RMQ_SYS_TRANS_HALF_TOPIC**
。由于消费组未订阅该主题,故消费端无法消费 half 类型的消息,然后 RocketMQ 会开启一个定时任务,从 Topic 为**RMQ_SYS_TRANS_HALF_TOPIC**
中拉取消息进行消费,根据 生产者组去获取一个服务提供者来发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。
你可以试想一下,如果没有从第5步开始的 事务反查机制 ,如果出现网路波动第4步没有发送成功,这样就会产生 MQ 不知道是不是需要给消费者消费的问题,他就像一个无头苍蝇一样。在 RocketMQ
中就是使用的上述的事务反查来解决的,而在 Kafka
中通常是直接抛出一个异常让用户来自行解决。
你还需要注意的是,在 MQ Server
指向系统 B 的操作已经和系统 A 不相关了,也就是说在消息队列中的分布式事务是——本地事务和存储消息到消息队列才是同一个事务。这样也就产生了事务的最终一致性,因为整个过程是异步的,每个系统只要保证它自己那一部分的事务就行了。
3.2 Consumer 端
两种消费模式
- Push 方式:MQServer 主动向消费端推送
- rocketmq 已经提供了很全面的实现, consumer 通过长轮询拉取消息后回调
MessageListener
接口实现完成消费, 应用系统只要 MessageListener 完成业务逻辑即可
- rocketmq 已经提供了很全面的实现, consumer 通过长轮询拉取消息后回调
- Pull 方式:即消费端在需要时,主动到 MQServer 拉取
- 完全由业务系统去控制,定时拉取消息,指定队列消费等等, 当然这里需要业务系统根据自己的业务需求去实现。
但在具体实现时,Push 和 Pull 模式都是采用消费端主动拉取的方式(pull)。
每个队列的消费进度,是存储在队列归属的 broker 节点上的
这两种模式分别对应的是 DefaultMQPushConsumer
类和 DefaultMQPullConsumer
类
org.apache.rocketmq.client.impl.consumer.PullMessageService#run
消费模型
org.apache.rocketmq.common.protocol.heartbeat.MessageModel
- BROADCASTING(广播模式)
- 一条消息被多个
Consumer
消费,即使这些Consumer
属于同一个Consumer Group
,消息也会被Consumer Group
中的每个Consumer
都消费一次。在广播消费中的Consumer Group
概念可以认为在消息划分方面无意义。
- 一条消息被多个
- CLUSTERING(集群模式)
- 一个
Consumer Group
中的Consumer
实例平均分摊消费消息。例如某个Topic
有 9 条消息,其中一个Consumer Group
有 3 个实例(可能是 3 个进程,或者 3 台机器),那么每个实例只消费其中的 3 条消息。
- 一个
消费选择
org.apache.rocketmq.common.consumer.ConsumeFromWhere
**CONSUME_FROM_LAST_OFFSET**
:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费**CONSUME_FROM_FIRST_OFFSET**
:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费**CONSUME_FROM_TIMESTAMP**
:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费
以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在 broker 端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始
消息重复幂等
RocketMQ 无法避免消息重复,所以如果业务对消费重复非常敏感,务必要在业务层面去重。
你可以使用 写入 **Redis**
来保证,因为 Redis
的 key
和 value
就是天然支持幂等的。当然还有使用 数据库插入法 ,基于数据库的唯一键来保证重复数据不会被插入多条。
不过最主要的还是需要 根据特定场景使用特定的解决方案 。
消息过滤
enablePropertyFilter=true
- Status=1 需要
- status=2不需要
消息堆积问题
产生消息堆积的根源其实就只有两个:生产者生产太快或消费者消费太慢。
我们可以从多个角度去思考解决这个问题,当流量到峰值的时候是因为生产者生产太快,我们可以使用一些 限流降级 的方法,当然你也可以增加多个消费者实例去水平扩展增加消费能力来匹配生产的激增。如果消费者消费过慢的话,我们可以先检查 是否是消费者出现了大量的消费错误 ,或者打印一下日志查看是否是哪一个线程卡死,出现了锁资源不释放等等的问题。
当然,最快速解决消息堆积问题的方法还是增加消费者实例,不过 同时你还需要增加每个主题的队列数量 。 别忘了在
RocketMQ
中,一个队列只会被一个消费者消费 ,如果你仅仅是增加消费者实例就会出现我一开始给你画架构图的那种情况。
回溯消息
回溯消费是指 Consumer
已经消费成功的消息,由于业务上需求需要重新消费,在RocketMQ
中, Broker
在向Consumer
投递成功消息后,消息仍然需要保留 。并且重新消费一般是按照时间维度,例如由于 Consumer
系统故障,恢复后需要重新消费1小时前的数据,那么 Broker
要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ
支持按照时间回溯消费,时间维度精确到毫秒。
3.3 Namesrv 端
Namesrv 入口:org.apache.rocketmq.namesrv.NamesrvStartup
Namesrv 名称服务,是没有状态可集群横向扩展。可以理解为一个注册中心, 整个 Namesrv 的代码非常简单,主要包含两块功能:
- 管理一些 KV 的配置
- 管理一些 Topic、Broker 的注册信息
大致提供服务为:
- 每个 broker 启动的时候会向 namesrv 注册
- Producer 发送消息的时候根据 topic 获取路由到 broker 的信息
- Consumer 根据 topic 到 namesrv 获取 topic 的路由到 broker 的信息
3.4 Broker 端
程序入口:org.apache.rocketmq.broker.BrokerStartup
4. 存储 & 索引
存储文件结构org.apache.rocketmq.store.index.IndexFile
索引文件由:索引文件头(IndexHeader)+( 槽位 Slot )+(消息的索引内容)三部分构成。
- beginTimestamp 8 位 long 类型,索引文件构建第一个索引的消息落在 broker 的时间
- endTimestamp 8 位 long 类型,索引文件构建最后一个索引消息落 broker 时间
- beginPhyOffset 8 位 long 类型,索引文件构建第一个索引的消息 commitLog 偏移量endPhyOffset 8 位 long 类型,索引文件构建最后一个索引消息
- commitLog 偏移量hashSlotCount 4 位 int 类型,构建索引占用的槽位数(这个值貌似没有具体作用)
indexCount 4 位 int 类型,索引文件中构建的索引个数
槽位 slot
默认每个文件配置的 slot 个数为 500 万个, 每个 slot 是 4 位的 int 类型数据计算消息的对应的**slotPos=Math.abs(keyHash)%hashSlotNum**
消息在 IndexFile 中的偏移量,Slot 存储的值为消息个数索引
消息的索引内容是 20 位定长内容的数据
- 4 位 int 值, 存储的是 key 的 hash 值
- 8 位 long 值 存储的是消息在 commitlog 的物理偏移量 phyOffset
- 4 位 int 值 存储了当前消息跟索引文件中第一个消息在 broker 落地的时间差
- 4 位 int 值 如果存在 hash 冲突,存储的是上一个消息的索引地址
4.1 RocketMQ 的刷盘机制
同步刷盘 & 异步刷盘
- 同步刷盘:同步刷盘时需要等待一个刷盘成功的
**ACK**
,同步刷盘对MQ
消息可靠性来说是一种不错的保障,但是 性能上会有较大影响 ,一般地适用于金融等特定业务场景。 - 异步刷盘:异步刷盘往往是开启一个线程去异步地执行刷盘操作。消息刷盘采用后台异步线程提交的方式进行, 降低了读写延迟 ,提高了
MQ
的性能和吞吐量,一般适用于如发验证码等对于消息保证要求不太高的业务场景。
一般地,异步刷盘只有在 **Broker**
意外宕机的时候会丢失部分数据,你可以设置 Broker
的参数 **FlushDiskType**
来调整你的刷盘策略( **ASYNC_FLUSH**
或者 **SYNC_FLUSH**
)。
4.2 同步复制和异步复制
上面的同步刷盘和异步刷盘是在单个结点层面的,而同步复制和异步复制主要是指的 Borker
主从模式下,主节点返回消息给客户端的时候是否需要同步从节点。
- 同步复制: 也叫 “同步双写”,也就是说,只有消息同步双写到
**主从**
结点上时才返回写入成功 。 - 异步复制: 消息写入主节点之后就直接返回写入成功 。
那么,异步复制会不会也像异步刷盘那样影响消息的可靠性呢?
答案是不会的,因为两者就是不同的概念,对于消息可靠性是通过不同的刷盘策略保证的,而像异步同步复制策略仅仅是影响到了 可用性 。
为什么呢?其主要原因是 **RocketMQ**
是不支持自动主从切换的,当主节点挂掉之后,生产者就不能再给这个主节点生产消息了。
比如这个时候采用异步复制的方式,在主节点还未发送完需要同步的消息的时候主节点挂掉了,这个时候从节点就少了一部分消息。但是此时生产者无法再给主节点生产消息了,消费者可以自动切换到从节点进行消费(仅仅是消费),所以在主节点挂掉的时间只会产生主从结点短暂的消息不一致的情况,降低了可用性,而当主节点重启之后,从节点那部分未来得及复制的消息还会继续复制。
在单主从架构中,如果一个主节点挂掉了,那么也就意味着整个系统不能再生产了。那么这个可用性的问题能否解决呢?一个主从不行那就多个主从的呗,别忘了在我们最初的架构图中,每个 Topic
是分布在不同 Broker
中的。
但是这种复制方式同样也会带来一个问题,那就是无法保证 严格顺序 。在上文中我们提到了如何保证的消息顺序性是通过将一个语义的消息发送在同一个队列中,使用 **Topic**
下的队列来保证顺序性的。
如果此时我们主节点 A 负责的是订单 A 的一系列语义消息,然后它挂了,这样其他节点是无法代替主节点 A 的,如果我们任意节点都可以存入任何消息,那就没有顺序性可言了。
而在 RocketMQ
中采用了 **Dledger**
解决这个问题。他要求在写入消息的时候,要求至少消息复制到半数以上的节点之后,才给客⼾端返回写⼊成功,并且它是⽀持通过选举来动态切换主节点的。这里我就不展开说明了,读者可以自己去了解。
也不是说
Dledger
是个完美的方案,至少在Dledger
选举过程中是无法提供服务的,而且他必须要使用三个节点或以上,如果多数节点同时挂掉他也是无法保证可用性的,而且要求消息复制板书以上节点的效率和直接异步复制还是有一定的差距的。
4.3 存储机制
但是,在 Topic
中的 队列是以什么样的形式存在的?队列中的消息又是如何进行存储持久化的呢? 还未解决,其实这里涉及到了 RocketMQ
是如何设计它的存储结构了。我首先想大家介绍 RocketMQ
消息存储架构中的三大角色:CommitLog
、ConsumeQueue
和 IndexFile
。
**CommitLog**
: 消息主体以及元数据的存储主体,存储Producer
端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G
,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为 00000000001073741824,起始偏移量为 1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。**ConsumeQueue**
: 消息消费队列,引入的目的主要是提高消息消费的性能(我们再前面也讲了),由于RocketMQ
是基于主题Topic
的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog
文件中根据Topic
检索消息是非常低效的。Consumer
即可根据ConsumeQueue
来查找待消费的消息。其中,ConsumeQueue
(逻辑消费队列)作为消费消息的索引,保存了指定Topic
下的队列消息在CommitLog
中的起始物理偏移量**offset**
,消息大小**size**
和消息**Tag**
的**HashCode**
值。**consumequeue**
文件可以看成是基于**topic**
的**commitlog**
索引文件,故consumequeue
文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}
。- 同样
consumequeue
文件采取定长设计,每一个条目共20个字节,分别为 8 字节的commitlog
物理偏移量、4 字节的消息长度、8 字节 taghashcode
,单个文件由 30W 个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue
文件大小约 5.72M;
- 同样
**IndexFile**
:IndexFile
(索引文件)提供了一种可以通过 key 或时间区间来查询消息的方法。这里只做科普不做详细介绍。
总结来说,整个消息存储的结构,最主要的就是 CommitLoq
和 ConsumeQueue
。而 ConsumeQueue
你可以大概理解为 Topic
中的队列。
RocketMQ
采用的是 混合型的存储结构 ,即为 **Broker**
单个实例下所有的队列共用一个日志数据文件来存储消息。有意思的是在同样高并发的 Kafka
中会为每个 Topic
分配一个存储文件。这就有点类似于我们有一大堆书需要装上书架,RockeMQ
是不分书的种类直接成批的塞上去的,而 Kafka
是将书本放入指定的分类区域的。
而 RocketMQ
为什么要这么做呢?原因是 提高数据的写入效率 ,不分 Topic
意味着我们有更大的几率获取 成批 的消息进行数据写入,但也会带来一个麻烦就是读取消息的时候需要遍历整个大文件,这是非常耗时的。
所以,在 RocketMQ
中又使用了 ConsumeQueue
作为每个队列的索引文件来 提升读取消息的效率。我们可以直接根据队列的消息序号,计算出索引的全局位置(索引序号*索引固定⻓度20),然后直接读取这条索引,再根据索引中记录的消息的全局位置,找到消息。
讲到这里,你可能对 RockeMQ
的存储架构还有些模糊,没事,我们结合着图来理解一下。
首先,在最上面的那一块就是我刚刚讲的你现在可以直接 把 **ConsumerQueue**
理解为 **Queue**
。
在图中最左边说明了 红色方块 代表被写入的消息,虚线方块代表等待被写入的。左边的生产者发送消息会指定 Topic
、QueueId
和具体消息内容,而在 Broker
中管你是哪门子消息,他直接全部顺序存储到了 CommitLog。
而根据生产者指定的 Topic
和 QueueId
将这条消息本身在 CommitLog
的偏移(offset),消息本身大小,和tag的hash值存入对应的 ConsumeQueue
索引文件中。而在每个队列中都保存了 ConsumeOffset
即每个消费者组的消费位置(我在架构那里提到了,忘了的同学可以回去看一下),而消费者拉取消息进行消费的时候只需要根据 ConsumeOffset
获取下一个未被消费的消息就行了。
为什么 CommitLog
文件要设计成固定大小的长度呢?提醒:内存映射机制。
5. 序列化
在RocketMQ中,RemotingCommand 这个类在消息传输过程中对所有数据内容的封装,不但包含了所有的数据结构,还包含了编码解码操作org.apache.rocketmq.remoting.protocol.RemotingCommand
6. RocketMQ 配置详解
Broker 参数
# | 参数名 | 默认值 | 说明 |
---|---|---|---|
2 | listenPort | 10911 | broker的服务端口号,作为对producer和consumer使用服务的端口号 |
3 | namesrvAddr | null | namesrv的ip地址。格式: ip:port;ip:port |
4 | brokerIP1 | 本机IP | broker所在的机器ip,默认不用设置,如果机器有多个网卡,需要手动设置 |
5 | brokerName | 本机主机名 | 作用为一组master与slave通过brokerName是否相同来标示,通过brokerId来区分master还是slave |
6 | brokerClusterName | DefaultCluster | 整个broker集群的名字,创建topic时需要指定。 |
7 | brokerId | 0 | 0:master 非0:slave |
8 | storePathCommitLog | $HOME/store/commitlog/ | commitLog存储路径 |
9 | storePathConsumerQueue | $HOME/store/consumequeue/ | 消费队列存储路径 |
10 | mapedFileSizeCommitLog | 1024 1024 1024(1G) | commitLog每个文件的大小,默认1G |
11 | deleteWhen | 4 | 删除文件时间点,默认凌晨 4点 |
12 | fileReservedTime | 72 | 文件保留时间,默认72小时. |
13 | brokerRole | ASYNC_MASTER | Broker 的角色 ASYNC_MASTER 异步复制Master SYNC_MASTER 同步双写Master SLAVE |
14 | flushDiskType | ASYNC_FLUSH | 刷盘方式 ASYNC_FLUSH 异步刷盘 SYNC_FLUSH 同步刷盘 |
15 | defaultTopicQueueNums | 4 | 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数。 |
16 | autoCreateTopicEnable | true | 是否自动创建topic。 |
17 | autoCreateSubscriptionGroup | true | 是否允许Broker自动创建订阅组,建议线下开启,线上关闭 |
18 | rejectTransactionMessage | false | 是否拒绝事务消息接入 |
19 | etchNamesrvAddrByAddressServer | false | 是否从web服务器获取Name Server地址,针对大规模的Broker集群建议使用这种方式 |
20 | storePathIndex | $HOME/store/index | 消息索引存储路径 |
21 | storeCheckpoint | $HOME/store/checkpoint | checkpoint文件存储路径 |
22 | abortFile | $HOME/store/abort | abort文件存储路径 |
23 | maxTransferBytesOnMessageInMemory | 262144 | 单次Pull消息(内存)传输的最大字节数 |
24 | maxTransferCountOnMessageInMemory | 32 | 单次Pull消息(内存)传输的最大条数 |
25 | maxTransferBytesOnMessageInDisk | 65536 | 单次Pull消息(磁盘)传输的最大字节数 |
26 | maxTransferCountOnMessageInDisk | 8 | 单次Pull消息(磁盘)传输的最大条数 |
27 | messageIndexEnable | true | 是否开启消息索引功能 |
28 | messageIndexSafe | false | 是否提供安全的消息索引机制,索引保证不丢 |
29 | haMasterAddress | 在Slave上直接设置Master地址,默认从Name Server上自动获取,也可以手工强制配置 | |
30 | cleanFileForciblyEnable | true | 磁盘满、且无过期文件情况下 TRUE 表示强制删除文件,优先保证服务可用 FALSE 标记服务不可用,文件不删除 |
Consumer 参数
DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPullConsumer 都继承与 ClientConfig类,ClientConfig 为客户端的公共配置类。客户端的配置都是 get、se t形式,每个参数都可以用spring 来配置,也可以在代码中配置,例如namesrvAddr这个参数可以这样配置,其他参数同理。producer.setNamesrvAddr("192.168.0.1:9876");
# | 参数名 | 默认值 | 说明 |
---|---|---|---|
1 | namesrvAddr | Name Server地址列表,多个NameServer地址用分号隔开 | |
2 | clientIP | 本机IP | 客户端本机IP地址,某些机器会发生无法识别客户端IP地址情况,需要应用在代码中强制指定 |
3 | instanceName | DEFAULT | 客户端实例名称,客户端创建的多个Producer、Consumer实际是共用一个内部实例(这个实例包含网络连接、线程资源等) |
4 | clientCallbackExecutorThreads | 4 | 通信层异步回调线程数 |
5 | pollNameServerInteval | 30000 | 轮询Name Server间隔时间,单位毫秒 |
6 | heartbeatBrokerInterval | 30000 | 向Broker发送心跳间隔时间,单位毫秒 |
7 | persistConsumerOffsetInterval | 5000 | 持久化Consumer消费进度间隔时间,单位毫秒 |
Producer 参数
■Producer配置 | |||
---|---|---|---|
# | 参数名 | 默认值 | 说明 |
1 | producerGroup | DEFAULT_PRODUCER | Producer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组 |
2 | createTopicKey | TBW102 | 在发送消息时,自动创建服务器不存在的topic,需要指定Key。 |
3 | defaultTopicQueueNums | 4 | 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 |
4 | sendMsgTimeout | 10000 | 发送消息超时时间,单位毫秒 |
5 | compressMsgBodyOverHowmuch | 4096 | 消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节 |
6 | retryAnotherBrokerWhenNotStoreOK | FALSE | 如果发送消息返回sendResult,但是sendStatus!=SEND_OK,是否重试发送 |
7 | maxMessageSize | 131072 | 客户端限制的消息大小,超过报错,同时服务端也会限制 |
8 | transactionCheckListener | 事务消息回查监听器,如果发送事务消息,必须设置 | |
9 | checkThreadPoolMinSize | 1 | Broker回查Producer事务状态时,线程池大小 |
10 | checkThreadPoolMaxSize | Broker回查Producer事务状态时,线程池大小 | |
11 | checkRequestHoldMax | 2000 | Broker回查Producer事务状态时,Producer本地缓冲请求队列大小 |
■Push Consumer配置 | |||
# | 参数名 | 默认值 | 说明 |
1 | consumerGroup | DEFAULT_CONSUMER | Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组 |
2 | messageModel | CLUSTERING | 消息模型,支持以下两种 1、集群消费(CLSUTER) 2、广播消费(BROADCASTING) |
3 | consumeFromWhere | CONSUME_FROM_LAST_OFFSET | Consumer启动后,默认从什么位置开始消费 1、CONSUME_FROM_LAST_OFFSET:默认策略,从该队列最尾开始消费,即跳过历史消息 2、CONSUME_FROM_FIRST_OFFSET:从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍 3、CONSUME_FROM_TIMESTAMP:从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前 |
4 | allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Rebalance算法实现策略 |
5 | subscription | {} | 订阅关系 |
6 | messageListener | 消息监听器 | |
7 | offsetStore | 消费进度存储 | |
8 | consumeThreadMin | 10 | 消费线程池数量 |
9 | consumeThreadMax | 20 | 消费线程池数量 |
10 | consumeConcurrentlyMaxSpan | 2000 | 单队列并行消费允许的最大跨度 |
11 | pullThresholdForQueue | 1000 | 拉消息本地队列缓存消息最大数 |
12 | pullInterval | 0 | 拉消息间隔,由于是长轮询,所以为0,但是如果应用为了流控,也可以设置大于0的值,单位毫秒 |
13 | consumeMessageBatchMaxSize | 1 | 批量消费,一次消费多少条消息 |
14 | pullBatchSize | 32 | 批量拉消息,一次最多拉多少条 |
■Pull Consumer配置 | |||
# | 参数名 | 默认值 | 说明 |
1 | consumerGroup | DEFAULT_CONSUMER | Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组 |
2 | brokerSuspendMaxTimeMillis | 20000 | 长轮询,Consumer拉消息请求在Broker挂起最长时间,单位毫秒 |
3 | consumerTimeoutMillisWhenSuspend | 30000 | 长轮询,Consumer拉消息请求在Broker挂起超过指定时间,客户端认为超时,单位毫秒 |
4 | consumerPullTimeoutMillis | 10000 | 非长轮询,拉消息超时时间,单位毫秒 |
5 | messageModel | BROADCASTING | 消息模型,支持以下两种 1、集群消费 2、广播消费 |
6 | messageQueueListener | 监听队列变化 | |
7 | offsetStore | 消费进度存储 | |
8 | registerTopics | [] | 注册的topic集合 |
9 | allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Rebalance算法实现策略 |
Meesage 数据结构
Message 数据结构各个字段都可以通过get、set方式访问,例如访问topic:
msg.getTopic();
msg.setTopic("test");
字段名 | 默认值 | 必填 | 说明 |
---|---|---|---|
Topic | null | ○ | 线下环境不需要申请,线上环境需要申请后才能使用 |
Body | null | ○ | 二进制形式,序列化由应用决定,Producer与Consumer要协商好序列化形式。 |
Tags | null | 类似于Gmail为每封邮件设置的标签,方便服务器过滤使用。目前只支持每个消息设置一个tag,所以也可以类比为Notify的MessageType概念。 | |
Keys | null | 代表这条消息的业务关键词,服务器会根据keys创建哈希索引,设置后,可以再Console系统根据Topic、Keys来查询消息,由于是哈希索引,请尽可能保证key唯一,例如订单号,商品ID等。 | |
Flag | 0 | 完全由应用来设置,RocketMQ不做敢于。 | |
DelayTimeLevel | 0 | 消息延时级别,0表示不延时,大于0会延时特定的时间才会被消费。 | |
WaitStoreMsgOK | TRUE | 表示消息是否在服务器罗盘后才返回应答。 |
7. RocketMQ 开发指南
8. RocketMQ 最佳实战
8.1 Producer 最佳实践
- 一个应用尽可能用一个 Topic,消息子类型用 tags 来标识,tags 可以由应用自由设置。只有发送消息设置了 tags,消费方在订阅消息时,才可以利用 tags 在 broker 做消息过滤。
- 每个消息在业务层面的唯一标识码,要设置到 keys 字段,方便将来定位消息丢失问题。由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。
- 消息发送成功或者失败,要打印消息日志,务必要打印 sendresult 和 key 字段。
- 对于消息不可丢失应用,务必要有消息重发机制。例如:消息发送失败,存储到数据库,能有定时程序尝试重发或者人工触发重发。
- 某些应用如果不关注消息是否发送成功,请直接使用
sendOneWay
方法发送消息。
8.2 Consumer 最佳实践
- 消费过程要做到幂等(即消费端去重)
- 尽量使用批量方式消费方式,可以很大程度上提高消费吞吐量。
- 优化每条消息消费过程
8.3 其他配置
线上应该关闭 autoCreateTopicEnable
,即在配置文件中将其设置为 false
。
RocketMQ在发送消息时,会首先获取路由信息。如果是新的消息,由于 MQServer 上面还没有创建对应的 Topic
,这个时候,如果上面的配置打开的话,会返回默认 TOPIC 的(RocketMQ会在每台broker
上面创建名为TBW102
的TOPIC)路由信息,然后Producer
会选择一台Broker
发送消息,选中的broker
在存储消息时,发现消息的topic
还没有创建,就会自动创建topic
。后果就是:以后所有该TOPIC的消息,都将发送到这台broker
上,达不到负载均衡的目的。
所以基于目前 RocketMQ 的设计,建议关闭自动创建 TOPIC 的功能,然后根据消息量的大小,手动创建 TOPIC。